diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py index ea67571..4519463 100644 --- a/oslo_concurrency/lockutils.py +++ b/oslo_concurrency/lockutils.py @@ -87,6 +87,43 @@ ReaderWriterLock = fasteners.ReaderWriterLock """ +class FairLocks(object): + """A garbage collected container of fair locks. + + This collection internally uses a weak value dictionary so that when a + lock is no longer in use (by any threads) it will automatically be + removed from this container by the garbage collector. + """ + + def __init__(self): + self._locks = weakref.WeakValueDictionary() + self._lock = threading.Lock() + + def get(self, name): + """Gets (or creates) a lock with a given name. + + :param name: The lock name to get/create (used to associate + previously created names with the same lock). + + Returns a newly constructed lock (or an existing one if it was + already created for the given name). + """ + with self._lock: + try: + return self._locks[name] + except KeyError: + rwlock = ReaderWriterLock() + self._locks[name] = rwlock + return rwlock + + +fair_locks = FairLocks() + + +def internal_fair_lock(name): + return fair_locks.get(name) + + class Semaphores(object): """A garbage collected container of semaphores. @@ -170,7 +207,7 @@ def internal_lock(name, semaphores=None): @contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None, - do_log=True, semaphores=None, delay=0.01): + do_log=True, semaphores=None, delay=0.01, fair=False): """Context based lock This function yields a `threading.Semaphore` instance (if we don't use @@ -200,16 +237,22 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, :param delay: Delay between acquisition attempts (in seconds). + :param fair: Whether or not we want a "fair" lock where contending lockers + will get the lock in the order in which they tried to acquire it. + .. versionchanged:: 0.2 Added *do_log* optional parameter. .. 
versionchanged:: 0.3 Added *delay* and *semaphores* optional parameters. """ - int_lock = internal_lock(name, semaphores=semaphores) + if fair: + int_lock = internal_fair_lock(name).write_lock() + else: + int_lock = internal_lock(name, semaphores=semaphores) with int_lock: if do_log: - LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name}) + LOG.debug('Acquired lock "%(lock)s"', {'lock': name}) try: if external and not CONF.oslo_concurrency.disable_process_locking: ext_lock = external_lock(name, lock_file_prefix, lock_path) @@ -222,11 +265,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, yield int_lock finally: if do_log: - LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name}) + LOG.debug('Releasing lock "%(lock)s"', {'lock': name}) def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, - semaphores=None, delay=0.01): + semaphores=None, delay=0.01, fair=False): """Synchronization decorator. Decorating a method like so:: @@ -261,7 +304,8 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, t2 = None try: with lock(name, lock_file_prefix, external, lock_path, - do_log=False, semaphores=semaphores, delay=delay): + do_log=False, semaphores=semaphores, delay=delay, + fair=fair): t2 = timeutils.now() LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: ' 'waited %(wait_secs)0.3fs',