Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cached supports stampede locking #249

Merged
merged 1 commit into from
May 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aiocache/backends/memcached.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)

def _build_key(self, key, namespace=None):
    """Build the namespaced key and encode it to bytes for memcached.

    Memcached keys may not contain spaces, so every space in the
    namespaced key is replaced with an underscore before encoding.

    :param key: str key to namespace.
    :param namespace: optional str namespace overriding the default.
    :returns: bytes key safe to send to memcached.
    """
    # Single pass: namespace via the parent, sanitize, then encode.
    # (The previous intermediate assignment was a dead store.)
    ns_key = super()._build_key(key, namespace=namespace).replace(' ', '_')
    return str.encode(ns_key)

def __repr__(self): # pragma: no cover
Expand Down
47 changes: 38 additions & 9 deletions aiocache/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,34 @@ def _get_cache(


def cached(
ttl=0, key=None, key_from_attr=None, cache=SimpleMemoryCache,
serializer=None, plugins=None, alias=None, noself=False, **kwargs):
ttl=None, key=None, key_from_attr=None, cache=SimpleMemoryCache,
serializer=None, plugins=None, alias=None, noself=False, stampede_lease=None,
**kwargs):
"""
Caches the functions return value into a key generated with module_name, function_name and args.

In some cases you will need to send more args to configure the cache object.
An example would be endpoint and port for the RedisCache. You can send those args as
kwargs and they will be propagated accordingly.

:param ttl: int seconds to store the function call. Default is 0 which means no expiration.
:param ttl: int seconds to store the function call. Default is None which means no expiration.
:param key: str value to set as key for the function return. Takes precedence over
key_from_attr param. If key and key_from_attr are not passed, it will use module_name
+ function_name + args + kwargs
:param key_from_attr: arg or kwarg name from the function to use as a key.
:param key_from_attr: str arg or kwarg name from the function to use as a key.
:param cache: cache class to use when calling the ``set``/``get`` operations.
Default is ``aiocache.SimpleMemoryCache``.
:param serializer: serializer instance to use when calling the ``dumps``/``loads``.
Default is pulled from the cache class being used.
:param plugins: plugins to use when calling the cmd hooks
:param plugins: list plugins to use when calling the cmd hooks
Default is pulled from the cache class being used.
:param alias: str specifying the alias to load the config from. If alias is passed, other config
parameters are ignored. New cache is created every time.
:param noself: if you are decorating a class function, by default self is also used to generate
the key. This will result in same function calls done by different class instances to use
different cache keys. Use noself=True if you want to ignore it.
:param noself: bool if you are decorating a class function, by default self is also used to
generate the key. This will result in same function calls done by different class instances
to use different cache keys. Use noself=True if you want to ignore it.
:param stampede_lease: int seconds to lock function call to avoid cache stampede effects.
If 0 or None, (default) no locking happens.
"""
cache_kwargs = kwargs

Expand All @@ -61,7 +64,12 @@ async def wrapper(*args, **kwargs):
return value

except Exception:
logger.exception("Unexpected error with %s", cache_instance)
logger.exception("Unexpected error with %s", cache)

if stampede_lease:
async with cache_instance._redlock(cache_key, stampede_lease):
return await _cache_or_func(
cache_instance, cache_key, func(*args, **kwargs), ttl)

result = await func(*args, **kwargs)

Expand All @@ -77,6 +85,27 @@ async def wrapper(*args, **kwargs):
return cached_decorator


async def _cache_or_func(cache, key, func, ttl):
try:
value = await cache.get(key)
if value is not None:
asyncio.ensure_future(cache.close())
return value

except Exception:
logger.exception("Unexpected error with %s", cache)

result = await func

try:
await cache.set(key, result, ttl=ttl)
except Exception:
logger.exception("Unexpected error with %s", cache)

asyncio.ensure_future(cache.close())
return result


def multi_cached(
keys_from_attr, key_builder=None, ttl=0, cache=SimpleMemoryCache,
serializer=None, plugins=None, alias=None, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion tests/acceptance/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async def dummy():
res = await cache.get(pytest.KEY)
if res is not None:
return res
await asyncio.sleep(1)
await asyncio.sleep(0.1)
await cache.set(pytest.KEY, "value")

await asyncio.gather(dummy(), dummy(), dummy(), dummy())
Expand Down
111 changes: 111 additions & 0 deletions tests/acceptance/test_decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import asyncio
import sys
import pytest
import random

from unittest import mock

from aiocache import cached, multi_cached


async def return_dict(keys=None):
    """Map each requested key to its position rendered as a string.

    Falls back to the keys ['a', 'd', 'z', 'y'] when ``keys`` is falsy.
    """
    selected = keys or ['a', 'd', 'z', 'y']
    return {key: str(index) for index, key in enumerate(selected)}


async def stub(*args, key=None, seconds=0, **kwargs):
    """Sleep ``seconds``, then return ``str(key)`` when a truthy key is
    given, otherwise a random integer in [1, 50] rendered as a string."""
    await asyncio.sleep(seconds)
    return str(key) if key else str(random.randint(1, 50))


class TestCachedDecorator:
    """Acceptance tests for the ``cached`` decorator, including stampede
    locking behavior."""

    @pytest.fixture(autouse=True)
    def default_cache(self, mocker, cache):
        # Route every decorator through the fixture-provided cache.
        mocker.patch("aiocache.decorators._get_cache", return_value=cache)

    @pytest.mark.asyncio
    async def test_cached_ttl(self, mocker, cache):
        mocker.spy(sys.modules[__name__], 'stub')
        decorator = cached(ttl=1)

        first = await decorator(stub)(1)
        second = await decorator(stub)(1)

        # Both calls share the generated key; the entry expires with the ttl.
        assert await cache.get('stubstub(1,){}') == first == second
        await asyncio.sleep(1)
        assert await cache.get('stubstub(1,){}') is None

    @pytest.mark.asyncio
    async def test_cached_key_from_attr(self, mocker, cache):
        mocker.spy(sys.modules[__name__], 'stub')
        decorator = cached(key_from_attr="key")

        first = await decorator(stub)(key='key')
        second = await decorator(stub)(key='key')

        assert await cache.get('key') == first == second

    @pytest.mark.asyncio
    async def test_cached_key(self, mocker, cache):
        decorator = cached(key="key")

        first = await decorator(stub)()
        second = await decorator(stub)()

        assert await cache.get('key') == first == second

    @pytest.mark.asyncio
    async def test_cached_stampede(self, mocker, cache):
        mocker.spy(cache, 'get')
        mocker.spy(cache, 'set')
        mocker.spy(sys.modules[__name__], 'stub')
        decorator = cached(ttl=10, stampede_lease=2)

        await asyncio.gather(
            decorator(stub)(1),
            decorator(stub)(1))

        # With the lease held long enough, only one caller computes/stores.
        cache.get.assert_called_with('stubstub(1,){}')
        assert cache.get.call_count == 4
        cache.set.assert_called_with('stubstub(1,){}', mock.ANY, ttl=10)
        assert cache.set.call_count == 1

    @pytest.mark.asyncio
    async def test_locking_dogpile_lease_expiration(self, mocker, cache):
        mocker.spy(cache, 'get')
        mocker.spy(cache, 'set')
        mocker.spy(sys.modules[__name__], 'stub')
        decorator = cached(ttl=10, stampede_lease=1)

        await asyncio.gather(
            decorator(stub)(1, seconds=2),
            decorator(stub)(1, seconds=2))

        # The lease expires before the slow call finishes, so both
        # callers end up computing and storing.
        assert cache.get.call_count == 4
        assert cache.set.call_count == 2


class TestMultiCachedDecorator:
    """Acceptance tests for the ``multi_cached`` decorator."""

    @pytest.fixture(autouse=True)
    def default_cache(self, mocker, cache):
        # Route every decorator through the fixture-provided cache.
        mocker.patch("aiocache.decorators._get_cache", return_value=cache)

    @pytest.mark.asyncio
    async def test_multi_cached(self, mocker, cache):
        decorator = multi_cached('keys')

        expected_keys = {'a', 'd', 'z', 'y'}
        await decorator(return_dict)(keys=expected_keys)

        # Every returned entry must have been stored under its own key.
        for key in expected_keys:
            assert await cache.get(key) is not None
3 changes: 3 additions & 0 deletions tests/ut/backends/test_memcached.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,6 @@ def test_build_key_bytes(self, set_test_namespace, memcached_cache, namespace, e

def test_build_key_no_namespace(self, memcached_cache):
    # Without a namespace the key is passed through and byte-encoded.
    expected = pytest.KEY.encode()
    assert memcached_cache._build_key(pytest.KEY, namespace=None) == expected

def test_build_key_no_spaces(self, memcached_cache):
    # Spaces are invalid in memcached keys; they become underscores.
    built = memcached_cache._build_key('hello world')
    assert built == b'hello_world'
4 changes: 3 additions & 1 deletion tests/ut/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest
import asynctest

from aiocache.base import BaseCache
from aiocache.base import BaseCache, API
from aiocache import caches, RedisCache, MemcachedCache
from aiocache.plugins import BasePlugin
from aiocache.serializers import DefaultSerializer
Expand Down Expand Up @@ -57,6 +57,8 @@ def mock_cache(mocker):
cache = MockCache()
cache.timeout = 0.002
mocker.spy(cache, '_build_key')
for cmd in API.CMDS:
mocker.spy(cache, cmd.__name__)
cache.serializer = asynctest.Mock(spec=DefaultSerializer)
cache.plugins = [asynctest.Mock(spec=BasePlugin)]
return cache
Expand Down
1 change: 1 addition & 0 deletions tests/ut/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ async def test_redlock_release(self, base_cache):
with pytest.raises(NotImplementedError):
await base_cache._redlock_release(pytest.KEY, 20)

@pytest.mark.asyncio
async def test_close(self, base_cache):
    # The base implementation of close is a no-op returning None.
    result = await base_cache._close()
    assert result is None

Expand Down
Loading