Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature #271/refactor locking api #279

Merged
merged 3 commits into from
Jun 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions aiocache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import asyncio

from aiocache import serializers
from aiocache._lock import _RedLock
from aiocache.log import logger


Expand Down Expand Up @@ -461,12 +460,6 @@ def _build_key(self, key, namespace=None):
return "{}{}".format(self.namespace, key)
return key

def _redlock(self, key, lease):
return _RedLock(self, key, lease)

async def _redlock_release(self, key, value):
raise NotImplementedError()

def get_connection(self):
return _Conn(self)

Expand Down
3 changes: 2 additions & 1 deletion aiocache/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from aiocache.log import logger
from aiocache import SimpleMemoryCache, caches
from aiocache.lock import RedLock


class cached:
Expand Down Expand Up @@ -146,7 +147,7 @@ async def decorator(self, f, *args, **kwargs):
if value is not None:
return value

async with self.cache._redlock(key, self.lease):
async with RedLock(self.cache, key, self.lease):
value = await self.get_from_cache(key)
if value is not None:
return value
Expand Down
83 changes: 66 additions & 17 deletions aiocache/_lock.py → aiocache/lock.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
import asyncio
import uuid

from typing import Type, Union, Any

class _RedLock:
from aiocache.base import BaseCache


class RedLock:
"""
Implementation of [Redlock](https://redis.io/topics/distlock)
Implementation of `Redlock <https://redis.io/topics/distlock>`_
with a single instance because aiocache is focused on single
instance cache.

This locking has some limitations and shouldn't be used in **bold**
This locking has some limitations and shouldn't be used in
situations where consistency is critical. Those locks are aimed for
performance reasons where failing on locking from time to time
is acceptable. TLDR: do NOT use this if you need real resource
exclusion.

Couple of considerations with the implementation:

- If the lease expires and there are clients waiting, all of them
- If the lease expires and there are calls waiting, all of them
will pass (blocking just happens for the first time).
- When a new client arrives, it will wait always at most lease
time. This means that the client could end up blocked longer
- When a new call arrives, it will wait always at most lease
time. This means that the call could end up blocked longer
than needed in case the lease from the blocker expires.

Backend specific implementation:
Expand All @@ -31,16 +35,35 @@ class _RedLock:
- Memcached follows the same approach with a difference. Due
to memcached lacking a way to execute the operation get and
delete commands atomically, any client is able to release the
lock. This is a limitation that can't be fixed safely.
lock. This is a limitation that can't be fixed without introducing
race conditions.
- Memory implementation is not distributed, it will only apply
to the process running. Say you have 4 processes running
APIs with aiocache, the locking will apply only per process
(still useful to reduce load per process).

Example usage::

from aiocache import RedisCache
from aiocache.lock import RedLock

cache = RedisCache()
async with RedLock(cache, 'key', lease=1): # Calls will wait here
result = await cache.get('key')
if result is not None:
return result
result = await super_expensive_function()
await cache.set('key', result)

In the example, first call will start computing the ``super_expensive_function``
while consecutive calls will block at most 1 second. If the blocking lasts for
more than 1 second, the calls will proceed to also calculate the
result of ``super_expensive_function``.
"""

_EVENTS = {}

def __init__(self, client, key, lease):
def __init__(self, client: Type[BaseCache], key: str, lease: Union[int, float]):
self.client = client
self.key = self.client._build_key(key + '-lock')
self.lease = lease
Expand All @@ -56,14 +79,14 @@ async def _acquire(self):
self.key,
self._value,
ttl=self.lease)
_RedLock._EVENTS[self.key] = asyncio.Event()
RedLock._EVENTS[self.key] = asyncio.Event()
except ValueError:
await self._wait_for_release()

async def _wait_for_release(self):
try:
await asyncio.wait_for(
_RedLock._EVENTS[self.key].wait(),
RedLock._EVENTS[self.key].wait(),
self.lease)
except asyncio.TimeoutError:
pass
Expand All @@ -76,25 +99,48 @@ async def __aexit__(self, exc_type, exc_value, traceback):
async def _release(self):
removed = await self.client._redlock_release(self.key, self._value)
if removed:
_RedLock._EVENTS.pop(self.key).set()
RedLock._EVENTS.pop(self.key).set()
return removed


class _OptimisticLock:
class OptimisticLock:
"""
Implementation of
[optimistic lock](https://en.wikipedia.org/wiki/Optimistic_concurrency_control)
`optimistic lock <https://en.wikipedia.org/wiki/Optimistic_concurrency_control>`_

Optimistic locking assumes multiple transactions can happen at the same time
and they will only fail if before finish, conflicting modifications with other
transactions are found, producing a roll back.

Finding a conflict will end up raising an `aiocache.exceptions.OptimisticLockError`
Finding a conflict will end up raising an `aiocache.lock.OptimisticLockError`
exception. A conflict happens when the value at the storage is different from
the one we retrieved when the lock started.

Example usage::

cache = RedisCache()

# The value stored in 'key' will be checked here
async with OptimisticLock(cache, 'key') as lock:
result = await super_expensive_call()
await lock.cas(result)

If any other call sets the value of ``key`` before the ``lock.cas`` is called,
an :class:`aiocache.lock.OptimisticLockError` will be raised. A way to make
the same call crash would be to change the value inside the lock like::

cache = RedisCache()

# The value stored in 'key' will be checked here
async with OptimisticLock(cache, 'key') as lock:
result = await super_expensive_call()
await cache.set('random_value') # This will make the `lock.cas` call fail
await lock.cas(result)

If the lock is created with a key that does not exist, there will never be conflicts.
"""

def __init__(self, client, key):
def __init__(self, client: Type[BaseCache], key: str):
self.client = client
self.key = key
self.ns_key = self.client._build_key(key)
Expand All @@ -110,10 +156,13 @@ async def _acquire(self):
async def __aexit__(self, exc_type, exc_value, traceback):
pass

async def cas(self, value, **kwargs):
async def cas(self, value: Any, **kwargs) -> True:
"""
Checks and sets the specified value for the locked key. If the value has changed
since the lock was created, it will raise an OptimisticLockError exception.
since the lock was created, it will raise an :class:`aiocache.lock.OptimisticLockError`
exception.

:raises: :class:`aiocache.lock.OptimisticLockError`
"""
success = await self.client.set(self.key, value, _cas_token=self._token, **kwargs)
if not success:
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Contents
plugins
configuration
decorators
locking
testing

Indices and tables
Expand Down
29 changes: 29 additions & 0 deletions docs/locking.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
.. _locking:

Locking
=======


.. WARNING::
The implementations provided are **NOT** intended for consistency/synchronization purposes. If you need a locking mechanism focused on consistency, consider implementing your mechanism based on more serious tools like https://zookeeper.apache.org/.


There are a couple of locking implementations that can help you protect against `cache stampede effects <https://en.wikipedia.org/wiki/Cache_stampede>`_:


.. _redlock:

RedLock
-------

.. autoclass:: aiocache.lock.RedLock
:members:


.. _optimisticlock:

OptimisticLock
--------------

.. autoclass:: aiocache.lock.OptimisticLock
:members:
47 changes: 47 additions & 0 deletions examples/optimistic_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import asyncio
import logging
import random

from aiocache import RedisCache
from aiocache.lock import OptimisticLock, OptimisticLockError


# Module-level logger for the demo; warnings print without extra logging config.
logger = logging.getLogger(__name__)
# Cache against a local Redis instance; keys are stored under the 'main' namespace.
cache = RedisCache(endpoint='127.0.0.1', port=6379, namespace='main')


async def expensive_function():
    """Simulate a slow computation: sleep a random 0-2s, then return 'result'."""
    logger.warning('Expensive is being executed...')
    delay = random.uniform(0, 2)
    await asyncio.sleep(delay)
    return 'result'


async def my_view():
    """Compute a value under an optimistic lock and warn if the CAS fails.

    The lock snapshots the stored value for 'key' on entry; ``cas`` only
    succeeds if nobody changed it in the meantime.
    """
    async with OptimisticLock(cache, 'key') as lock:
        computed = await expensive_function()
        try:
            await lock.cas(computed)
        except OptimisticLockError:
            # Another coroutine wrote 'key' first; our snapshot is stale.
            msg = 'I failed setting the value because it is different since the lock started!'
            logger.warning(msg)
    return computed


async def concurrent():
    """Seed 'key', then race three my_view() calls against each other."""
    # Every coroutine snapshots 'initial_value' when its lock starts; only the
    # first cas() to land succeeds — the others see a changed value and fail.
    await cache.set('key', 'initial_value')
    views = [my_view() for _ in range(3)]
    await asyncio.gather(*views)


def test_redis():
    """Drive the concurrent demo on the event loop, then clean up the demo key."""
    loop = asyncio.get_event_loop()
    for coro in (concurrent(), cache.delete('key'), cache.close()):
        loop.run_until_complete(coro)


if __name__ == '__main__':
    test_redis()
43 changes: 43 additions & 0 deletions examples/redlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import asyncio
import logging

from aiocache import RedisCache
from aiocache.lock import RedLock


# Module-level logger for the demo; warnings print without extra logging config.
logger = logging.getLogger(__name__)
# Cache against a local Redis instance; keys are stored under the 'main' namespace.
cache = RedisCache(endpoint='127.0.0.1', port=6379, namespace='main')


async def expensive_function():
    """Simulate a slow computation: sleep one second, then return 'result'."""
    logger.warning('Expensive is being executed...')
    await asyncio.sleep(1)
    return 'result'


async def my_view():
    """Return the cached value for 'key', computing and storing it on a miss.

    RedLock keeps concurrent callers from stampeding ``expensive_function``:
    later calls block (at most the 2-second lease) until the first caller has
    filled the cache.
    """
    async with RedLock(cache, 'key', lease=2):  # Wait at most 2 seconds
        value = await cache.get('key')
        if value is None:
            # Cache miss: we are the caller that pays for the computation.
            value = await expensive_function()
            await cache.set('key', value)
        else:
            logger.info('Found the value in the cache hurray!')
        return value


async def concurrent():
    """Fire three my_view() calls at once; the lock serializes the cache fill."""
    await asyncio.gather(*(my_view() for _ in range(3)))


def test_redis():
    """Drive the concurrent demo on the event loop, then clean up the demo key."""
    loop = asyncio.get_event_loop()
    for coro in (concurrent(), cache.delete('key'), cache.close()):
        loop.run_until_complete(coro)


if __name__ == '__main__':
    test_redis()
2 changes: 1 addition & 1 deletion requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pytest-mock==1.6.0
codecov==2.0.9
sphinx==1.6.2
marshmallow==2.13.5
asynctest==0.10.0
git+https://github.com/MartiusWeb/asynctest@async_magic
pystache==0.5.4

aiohttp==2.1.0
Expand Down
46 changes: 0 additions & 46 deletions tests/acceptance/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,52 +178,6 @@ async def test_clear_with_namespace_memory(self, memory_cache):

assert await memory_cache.exists(pytest.KEY, namespace="test") is False

@pytest.mark.asyncio
async def test_locking_dogpile(self, mocker, cache):
mocker.spy(cache, 'get')
mocker.spy(cache, 'set')
mocker.spy(cache, '_add')
mocker.spy(cache, '_redlock_release')

async def dummy():
res = await cache.get(pytest.KEY)
if res is not None:
return res

async with cache._redlock(pytest.KEY, lease=5):
res = await cache.get(pytest.KEY)
if res is not None:
return res
await asyncio.sleep(0.1)
await cache.set(pytest.KEY, "value")

await asyncio.gather(dummy(), dummy(), dummy(), dummy())
assert cache._add.call_count == 4
assert cache._redlock_release.call_count == 4
assert cache.get.call_count == 8
assert cache.set.call_count == 1

@pytest.mark.asyncio
async def test_locking_dogpile_lease_expiration(self, mocker, cache):
mocker.spy(cache, 'get')
mocker.spy(cache, 'set')

async def dummy():
res = await cache.get(pytest.KEY)
if res is not None:
return res

async with cache._redlock(pytest.KEY, lease=1):
res = await cache.get(pytest.KEY)
if res is not None:
return res
await asyncio.sleep(1.1)
await cache.set(pytest.KEY, "value")

await asyncio.gather(dummy(), dummy(), dummy(), dummy())
assert cache.get.call_count == 8
assert cache.set.call_count == 4


class TestMemcachedCache:

Expand Down
Loading