Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,12 @@ def lock(self, name, timeout=None, sleep=0.1):
when the lock is in blocking mode and another client is currently
holding the lock.
"""
return Lock(self, name, timeout=timeout, sleep=sleep)
from distutils.version import StrictVersion
version = self.info()['redis_version']
if StrictVersion(version) < StrictVersion('2.6.0'):
return Lock(self, name, timeout=timeout, sleep=sleep)
else:
return LuaLock(self, name, timeout=timeout, sleep=sleep)

def pubsub(self, shard_hint=None):
"""
Expand Down Expand Up @@ -2211,3 +2216,94 @@ def release(self):
self.acquired_until = None
if delete_lock:
self.redis.delete(self.name)


class LuaLock(object):
"""
Like Lock, but using key expiration and LUA for transactions.

Backward compatible with Lock,
except for not keeping released locks in redis.

NOTE: Requires Redis 2.6+
"""
# KEYS[1] - lock name, ARGV[1] - timeout, ARGV[2] - timeout at
LUA_LOCK_ACQUIRE_SCRIPT = """
local ret = redis.call('setnx', KEYS[1], ARGV[2])
if (ret == 1) then
redis.call('expire', KEYS[1], ARGV[1])
return true
end
return false
"""
LUA_LOCK_RELEASE_SCRIPT = """
if (redis.call('get', KEYS[1])) then
redis.call('del', KEYS[1])
return true
end
return false
"""
LUA_LOCK_ACQUIRE = None
LUA_LOCK_RELEASE = None

def __init__(self, redis, name, timeout=None, sleep=0.1):
"""
Create a new Lock instance named ``name`` using the Redis client
supplied by ``redis``.

``timeout`` indicates a maximum life for the lock.
By default, it will remain locked until release() is called.

``sleep`` indicates the amount of time to sleep per loop iteration
when the lock is in blocking mode and another client is currently
holding the lock.
"""
self.redis = redis
self.name = name
self.acquired_until = None
self.timeout = timeout
self.sleep = sleep
if self.timeout and self.sleep > self.timeout:
raise LockError("'sleep' must be less than 'timeout'")
# If the Acquire/Release scripts are not in redis- load them
if not callable(LuaLock.LUA_LOCK_ACQUIRE):
LuaLock.LUA_LOCK_ACQUIRE = redis.register_script(
LuaLock.LUA_LOCK_ACQUIRE_SCRIPT)
if not callable(LuaLock.LUA_LOCK_RELEASE):
LuaLock.LUA_LOCK_RELEASE = redis.register_script(
LuaLock.LUA_LOCK_RELEASE_SCRIPT)

def __enter__(self):
return self.acquire()

def __exit__(self, exc_type, exc_value, traceback):
self.release()

def acquire(self, blocking=True):
"""
Use Redis to hold a shared, distributed lock named ``name``.
Returns True once the lock is acquired.

If ``blocking`` is False, always return immediately. If the lock
was acquired, return True, otherwise return False.
"""
sleep = self.sleep
timeout = self.timeout if self.timeout else Lock.LOCK_FOREVER
while 1:
timeout_at = mod_time.time() + self.timeout if self.timeout \
else Lock.LOCK_FOREVER
if LuaLock.LUA_LOCK_ACQUIRE(keys=[self.name],
args=[int(timeout), timeout_at]):
self.acquired_until = timeout_at
return True
if not blocking:
return False
mod_time.sleep(sleep)

def release(self):
"""
Releases the already acquired lock
"""
if not LuaLock.LUA_LOCK_RELEASE(keys=[self.name]):
raise ValueError("Cannot release an unlocked lock")
self.acquired_until = None