Skip to content

Commit

Permalink
Single connection for multi_cached
Browse files Browse the repository at this point in the history
  • Loading branch information
argaen committed Jun 4, 2017
1 parent 0e9c11c commit 53d9d4a
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 67 deletions.
89 changes: 54 additions & 35 deletions aiocache/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class cached:
An example would be endpoint and port for the RedisCache. You can send those args as
kwargs and they will be propagated accordingly.
Each call will reuse the same connection for all of its cache operations. If you expect high
concurrency for the decorated function, it is safer to configure large pool sizes
(when using memcached or redis).
:param ttl: int seconds to store the function call. Default is None which means no expiration.
:param key: str value to set as key for the function return. Takes precedence over
key_from_attr param. If key and key_from_attr are not passed, it will use module_name
Expand Down Expand Up @@ -48,6 +52,10 @@ def __init__(
self._plugins = plugins
self._kwargs = kwargs

@property
def conn(self):
    # Read-only accessor for the connection object stored on this decorator
    # (presumably acquired via self.cache.get_connection() — the surrounding
    # diff shows both call sites; confirm against the full file).
    """Return the cache connection held by this decorator instance."""
    return self._conn

def __call__(self, f):

if self.alias:
Expand All @@ -56,14 +64,15 @@ def __call__(self, f):
self.cache = _get_cache(
cache=self._cache, serializer=self._serializer,
plugins=self._plugins, **self._kwargs)
self._conn = self.cache.get_connection()

@functools.wraps(f)
async def wrapper(*args, **kwargs):
return await self.decorator(f, *args, **kwargs)
return wrapper

async def decorator(self, f, *args, **kwargs):
self._conn = self.cache.get_connection()

async with self._conn:
key = self.get_cache_key(f, args, kwargs)

Expand All @@ -74,7 +83,6 @@ async def decorator(self, f, *args, **kwargs):
result = await f(*args, **kwargs)

await self.set_in_cache(key, result)
asyncio.ensure_future(self.cache.close())

return result

Expand All @@ -94,7 +102,7 @@ def _key_from_args(self, func, args, kwargs):

async def get_from_cache(self, key):
try:
value = await self._conn.get(key)
value = await self.conn.get(key)
if value is not None:
asyncio.ensure_future(self.cache.close())
return value
Expand All @@ -103,7 +111,7 @@ async def get_from_cache(self, key):

async def set_in_cache(self, key, value):
try:
await self._conn.set(key, value, ttl=self.ttl)
await self.conn.set(key, value, ttl=self.ttl)
except Exception:
logger.exception("Couldn't set %s in key %s, unexpected error", value, key)

Expand All @@ -117,6 +125,9 @@ class cached_stampede(cached):
An example would be endpoint and port for the RedisCache. You can send those args as
kwargs and they will be propagated accordingly.
This decorator doesn't reuse connections, because doing so would keep a connection
locked while waiting for the first call to finish computing, which is counterproductive.
:param lease: int seconds to lock function call to avoid cache stampede effects.
If 0 or None, no locking happens (default is 2). redis and memory backends support
float ttls
Expand All @@ -142,24 +153,26 @@ def __init__(
super().__init__(**kwargs)
self.lease = lease

@property
def conn(self):
    # Overrides the base property to hand back the cache itself instead of a
    # single pooled connection: per the class docstring, holding one
    # connection while the redlock waits would block it counterproductively.
    """Return the cache object, so each operation acquires its own connection."""
    return self.cache

async def decorator(self, f, *args, **kwargs):
async with self._conn:
key = self.get_cache_key(f, args, kwargs)
key = self.get_cache_key(f, args, kwargs)

value = await self.get_from_cache(key)
if value is not None:
return value

async with self.conn._redlock(key, self.lease):
value = await self.get_from_cache(key)
if value is not None:
return value

async with self.cache._redlock(key, self.lease):
value = await self.get_from_cache(key)
if value is not None:
return value

result = await f(*args, **kwargs)
result = await f(*args, **kwargs)

await self.set_in_cache(key, result)
await self.set_in_cache(key, result)

asyncio.ensure_future(self.cache.close())
return result


Expand Down Expand Up @@ -188,6 +201,10 @@ class multi_cached:
If the attribute specified to be the key is an empty list, the cache will be ignored and
the function will be called as expected.
Each call will reuse the same connection for all of its cache operations. If you expect high
concurrency for the decorated function, it is safer to configure large pool sizes
(when using memcached or redis).
:param keys_from_attr: arg or kwarg name from the function containing an iterable to use
as keys to index in the cache.
:param key_builder: Callable that allows to change the format of the keys before storing.
Expand All @@ -211,6 +228,7 @@ def __init__(
self.ttl = ttl
self.alias = alias
self.cache = None
self._conn = None

self._cache = cache
self._serializer = serializer
Expand All @@ -232,25 +250,26 @@ async def wrapper(*args, **kwargs):
return wrapper

async def decorator(self, f, *args, **kwargs):
missing_keys = []
partial = {}
keys = self.get_cache_keys(f, args, kwargs)

values = await self.get_from_cache(*keys)
for key, value in zip(keys, values):
if value is None:
missing_keys.append(key)
else:
partial[key] = value
kwargs[self.keys_from_attr] = missing_keys
if values and None not in values:
return partial

result = await f(*args, **kwargs)
result.update(partial)

await self.set_in_cache(result)
asyncio.ensure_future(self.cache.close())
self._conn = self.cache.get_connection()
async with self._conn:
missing_keys = []
partial = {}
keys = self.get_cache_keys(f, args, kwargs)

values = await self.get_from_cache(*keys)
for key, value in zip(keys, values):
if value is None:
missing_keys.append(key)
else:
partial[key] = value
kwargs[self.keys_from_attr] = missing_keys
if values and None not in values:
return partial

result = await f(*args, **kwargs)
result.update(partial)

await self.set_in_cache(result)

return result

Expand All @@ -263,7 +282,7 @@ async def get_from_cache(self, *keys):
if not keys:
return []
try:
values = await self.cache.multi_get(keys)
values = await self._conn.multi_get(keys)
if None not in values:
asyncio.ensure_future(self.cache.close())
return values
Expand All @@ -273,6 +292,6 @@ async def get_from_cache(self, *keys):

async def set_in_cache(self, result):
try:
await self.cache.multi_set([(k, v) for k, v in result.items()], ttl=self.ttl)
await self._conn.multi_set([(k, v) for k, v in result.items()], ttl=self.ttl)
except Exception:
logger.exception("Couldn't set %s, unexpected error", result)
4 changes: 2 additions & 2 deletions tests/acceptance/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ async def test_cached_stampede(self, mocker, cache):
decorator(stub)(1),
decorator(stub)(1))

cache.get.assert_called_with('acceptance.test_decoratorsstub(1,)[]', _conn=mock.ANY)
cache.get.assert_called_with('acceptance.test_decoratorsstub(1,)[]')
assert cache.get.call_count == 4
cache.set.assert_called_with(
'acceptance.test_decoratorsstub(1,)[]', mock.ANY, _conn=mock.ANY, ttl=10)
'acceptance.test_decoratorsstub(1,)[]', mock.ANY, ttl=10)
assert cache.set.call_count == 1

@pytest.mark.asyncio
Expand Down
Loading

0 comments on commit 53d9d4a

Please sign in to comment.