diff --git a/redis/client.py b/redis/client.py index 1c702fd96d..3d15d3051d 100644 --- a/redis/client.py +++ b/redis/client.py @@ -420,7 +420,12 @@ def lock(self, name, timeout=None, sleep=0.1): when the lock is in blocking mode and another client is currently holding the lock. """ - return Lock(self, name, timeout=timeout, sleep=sleep) + from distutils.version import StrictVersion + version = self.info()['redis_version'] + if StrictVersion(version) < StrictVersion('2.6.0'): + return Lock(self, name, timeout=timeout, sleep=sleep) + else: + return LuaLock(self, name, timeout=timeout, sleep=sleep) def pubsub(self, shard_hint=None): """ @@ -2211,3 +2216,94 @@ def release(self): self.acquired_until = None if delete_lock: self.redis.delete(self.name) + + +class LuaLock(object): + """ + Like Lock, but using key expiration and LUA for transactions. + + Backward compatible with Lock, + except for not keeping released locks in redis. + + NOTE: Requires Redis 2.6+ + """ + # KEYS[1] - lock name, ARGV[1] - timeout, ARGV[2] - timeout at + LUA_LOCK_ACQUIRE_SCRIPT = """ + local ret = redis.call('setnx', KEYS[1], ARGV[2]) + if (ret == 1) then + redis.call('expire', KEYS[1], ARGV[1]) + return true + end + return false + """ + LUA_LOCK_RELEASE_SCRIPT = """ + if (redis.call('get', KEYS[1])) then + redis.call('del', KEYS[1]) + return true + end + return false + """ + LUA_LOCK_ACQUIRE = None + LUA_LOCK_RELEASE = None + + def __init__(self, redis, name, timeout=None, sleep=0.1): + """ + Create a new Lock instance named ``name`` using the Redis client + supplied by ``redis``. + + ``timeout`` indicates a maximum life for the lock. + By default, it will remain locked until release() is called. + + ``sleep`` indicates the amount of time to sleep per loop iteration + when the lock is in blocking mode and another client is currently + holding the lock. + """ + self.redis = redis + self.name = name + self.acquired_until = None + self.timeout = timeout + self.sleep = sleep + if self.timeout and self.sleep > self.timeout: + raise LockError("'sleep' must be less than 'timeout'") + # If the Acquire/Release scripts are not in redis- load them + if not callable(LuaLock.LUA_LOCK_ACQUIRE): + LuaLock.LUA_LOCK_ACQUIRE = redis.register_script( + LuaLock.LUA_LOCK_ACQUIRE_SCRIPT) + if not callable(LuaLock.LUA_LOCK_RELEASE): + LuaLock.LUA_LOCK_RELEASE = redis.register_script( + LuaLock.LUA_LOCK_RELEASE_SCRIPT) + + def __enter__(self): + return self.acquire() + + def __exit__(self, exc_type, exc_value, traceback): + self.release() + + def acquire(self, blocking=True): + """ + Use Redis to hold a shared, distributed lock named ``name``. + Returns True once the lock is acquired. + + If ``blocking`` is False, always return immediately. If the lock + was acquired, return True, otherwise return False. + """ + sleep = self.sleep + timeout = self.timeout if self.timeout else Lock.LOCK_FOREVER + while 1: + timeout_at = mod_time.time() + self.timeout if self.timeout \ + else Lock.LOCK_FOREVER + if LuaLock.LUA_LOCK_ACQUIRE(keys=[self.name], + args=[int(timeout), timeout_at]): + self.acquired_until = timeout_at + return True + if not blocking: + return False + mod_time.sleep(sleep) + + def release(self): + """ + Releases the already acquired lock + """ + if not LuaLock.LUA_LOCK_RELEASE(keys=[self.name]): + raise ValueError("Cannot release an unlocked lock") + self.acquired_until = None