From 53d9d4acf306de269bf855d16931b0d3e49453a5 Mon Sep 17 00:00:00 2001
From: argaen
Date: Sun, 4 Jun 2017 18:39:57 +0200
Subject: [PATCH] Single connection for multi_cached

---
 aiocache/decorators.py              | 89 ++++++++++++++++----------
 tests/acceptance/test_decorators.py |  4 +-
 tests/ut/test_decorators.py         | 99 ++++++++++++++++++++---------
 3 files changed, 125 insertions(+), 67 deletions(-)

diff --git a/aiocache/decorators.py b/aiocache/decorators.py
index 2d603fd1..2aa55776 100644
--- a/aiocache/decorators.py
+++ b/aiocache/decorators.py
@@ -14,6 +14,10 @@ class cached:
     An example would be endpoint and port for the RedisCache. You can send those args as
     kwargs and they will be propagated accordingly.
 
+    Each call will use the same connection for all the cache calls. If you expect high
+    concurrency for the function you are decorating, it is safer to set a high pool size
+    (when using memcached or redis).
+
     :param ttl: int seconds to store the function call. Default is None which means no expiration.
     :param key: str value to set as key for the function return. Takes precedence over
         key_from_attr param. If key and key_from_attr are not passed, it will use module_name
@@ -48,6 +52,10 @@ def __init__(
         self._plugins = plugins
         self._kwargs = kwargs
 
+    @property
+    def conn(self):
+        return self._conn
+
     def __call__(self, f):
 
         if self.alias:
@@ -56,7 +64,6 @@ def __call__(self, f):
             self.cache = _get_cache(
                 cache=self._cache, serializer=self._serializer,
                 plugins=self._plugins, **self._kwargs)
-        self._conn = self.cache.get_connection()
 
         @functools.wraps(f)
         async def wrapper(*args, **kwargs):
@@ -64,6 +71,8 @@ async def wrapper(*args, **kwargs):
         return wrapper
 
     async def decorator(self, f, *args, **kwargs):
+        self._conn = self.cache.get_connection()
+
         async with self._conn:
             key = self.get_cache_key(f, args, kwargs)
 
@@ -74,7 +83,6 @@ async def decorator(self, f, *args, **kwargs):
 
             result = await f(*args, **kwargs)
             await self.set_in_cache(key, result)
-            asyncio.ensure_future(self.cache.close())
 
         return result
 
@@ -94,7 +102,7 @@ def _key_from_args(self, func, args, kwargs):
 
     async def get_from_cache(self, key):
         try:
-            value = await self._conn.get(key)
+            value = await self.conn.get(key)
             if value is not None:
                 asyncio.ensure_future(self.cache.close())
             return value
@@ -103,7 +111,7 @@ async def get_from_cache(self, key):
 
     async def set_in_cache(self, key, value):
         try:
-            await self._conn.set(key, value, ttl=self.ttl)
+            await self.conn.set(key, value, ttl=self.ttl)
         except Exception:
             logger.exception("Couldn't set %s in key %s, unexpected error", value, key)
 
@@ -117,6 +125,9 @@ class cached_stampede(cached):
     An example would be endpoint and port for the RedisCache. You can send those args as
     kwargs and they will be propagated accordingly.
 
+    This decorator doesn't reuse connections because it would hold the connection while it is
+    locked waiting for the first call to finish computing, which is counterproductive.
+
     :param lease: int seconds to lock function call to avoid cache stampede effects.
         If 0 or None, no locking happens (default is 2).
         redis and memory backends support float ttls
@@ -142,24 +153,26 @@ def __init__(
         super().__init__(**kwargs)
         self.lease = lease
 
+    @property
+    def conn(self):
+        return self.cache
+
     async def decorator(self, f, *args, **kwargs):
-        async with self._conn:
-            key = self.get_cache_key(f, args, kwargs)
+        key = self.get_cache_key(f, args, kwargs)
+
+        value = await self.get_from_cache(key)
+        if value is not None:
+            return value
 
+        async with self.conn._redlock(key, self.lease):
             value = await self.get_from_cache(key)
             if value is not None:
                 return value
 
-            async with self.cache._redlock(key, self.lease):
-                value = await self.get_from_cache(key)
-                if value is not None:
-                    return value
-
-                result = await f(*args, **kwargs)
+            result = await f(*args, **kwargs)
 
-                await self.set_in_cache(key, result)
+            await self.set_in_cache(key, result)
 
-        asyncio.ensure_future(self.cache.close())
-
         return result
 
@@ -188,6 +201,10 @@ class multi_cached:
     If the attribute specified to be the key is an empty list, the cache will be ignored and
     the function will be called as expected.
 
+    Each call will use the same connection for all the cache calls. If you expect high
+    concurrency for the function you are decorating, it is safer to set a high pool size
+    (when using memcached or redis).
+
     :param keys_from_attr: arg or kwarg name from the function containing an iterable to use
         as keys to index in the cache.
     :param key_builder: Callable that allows to change the format of the keys before storing.
@@ -211,6 +228,7 @@ def __init__(
         self.ttl = ttl
         self.alias = alias
         self.cache = None
+        self._conn = None
 
         self._cache = cache
         self._serializer = serializer
@@ -232,25 +250,26 @@ async def wrapper(*args, **kwargs):
         return wrapper
 
     async def decorator(self, f, *args, **kwargs):
-        missing_keys = []
-        partial = {}
-        keys = self.get_cache_keys(f, args, kwargs)
-
-        values = await self.get_from_cache(*keys)
-        for key, value in zip(keys, values):
-            if value is None:
-                missing_keys.append(key)
-            else:
-                partial[key] = value
-        kwargs[self.keys_from_attr] = missing_keys
-        if values and None not in values:
-            return partial
-
-        result = await f(*args, **kwargs)
-        result.update(partial)
-
-        await self.set_in_cache(result)
-        asyncio.ensure_future(self.cache.close())
+        self._conn = self.cache.get_connection()
+        async with self._conn:
+            missing_keys = []
+            partial = {}
+            keys = self.get_cache_keys(f, args, kwargs)
+
+            values = await self.get_from_cache(*keys)
+            for key, value in zip(keys, values):
+                if value is None:
+                    missing_keys.append(key)
+                else:
+                    partial[key] = value
+            kwargs[self.keys_from_attr] = missing_keys
+            if values and None not in values:
+                return partial
+
+            result = await f(*args, **kwargs)
+            result.update(partial)
+
+            await self.set_in_cache(result)
 
         return result
 
@@ -263,7 +282,7 @@ async def get_from_cache(self, *keys):
         if not keys:
             return []
         try:
-            values = await self.cache.multi_get(keys)
+            values = await self._conn.multi_get(keys)
             if None not in values:
                 asyncio.ensure_future(self.cache.close())
             return values
@@ -273,6 +292,6 @@ async def get_from_cache(self, *keys):
 
     async def set_in_cache(self, result):
         try:
-            await self.cache.multi_set([(k, v) for k, v in result.items()], ttl=self.ttl)
+            await self._conn.multi_set([(k, v) for k, v in result.items()], ttl=self.ttl)
         except Exception:
             logger.exception("Couldn't set %s, unexpected error", result)
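
For context, a minimal usage sketch of the decorators changed above, assuming the default
SimpleMemoryCache backend; the function names (fetch_user, fetch_users) and the ttl values are
illustrative only, not part of the patch. With this change every decorated call acquires one
connection via cache.get_connection() and reuses it for its whole get/set (or
multi_get/multi_set) round trip.

import asyncio

from aiocache import cached, multi_cached


@cached(ttl=10)  # no backend given -> SimpleMemoryCache; extra kwargs are forwarded to the backend
async def fetch_user(user_id):
    return {"id": user_id}


@multi_cached(keys_from_attr="ids", ttl=10)
async def fetch_users(ids=None):
    # multi_cached expects a dict indexed by the keys it received; cached entries are
    # merged back in by the decorator, so only the missing ids reach this function.
    return {user_id: {"id": user_id} for user_id in ids}


async def main():
    await fetch_user(1)            # one connection for the get + set pair
    await fetch_users(ids=[1, 2])  # one connection for multi_get + multi_set


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())
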
diff --git a/tests/acceptance/test_decorators.py b/tests/acceptance/test_decorators.py
index 483fb07c..645b13a7 100644
--- a/tests/acceptance/test_decorators.py
+++ b/tests/acceptance/test_decorators.py
@@ -58,10 +58,10 @@ async def test_cached_stampede(self, mocker, cache):
             decorator(stub)(1),
             decorator(stub)(1))
 
-        cache.get.assert_called_with('acceptance.test_decoratorsstub(1,)[]', _conn=mock.ANY)
+        cache.get.assert_called_with('acceptance.test_decoratorsstub(1,)[]')
         assert cache.get.call_count == 4
         cache.set.assert_called_with(
-            'acceptance.test_decoratorsstub(1,)[]', mock.ANY, _conn=mock.ANY, ttl=10)
+            'acceptance.test_decoratorsstub(1,)[]', mock.ANY, ttl=10)
         assert cache.set.call_count == 1
 
     @pytest.mark.asyncio
diff --git a/tests/ut/test_decorators.py b/tests/ut/test_decorators.py
index b045f336..d0c3938a 100644
--- a/tests/ut/test_decorators.py
+++ b/tests/ut/test_decorators.py
@@ -8,6 +8,7 @@
 from asynctest import Mock, CoroutineMock, ANY
 
 from aiocache import cached, cached_stampede, multi_cached, SimpleMemoryCache
+from aiocache.base import _Conn
 
 
 async def stub(*args, value=None, seconds=0, **kwargs):
@@ -26,7 +27,9 @@ def decorator(self, mocker, mock_cache):
 
     @pytest.fixture
     def decorator_call(self, decorator):
-        yield decorator(stub)
+        d = decorator(stub)
+        decorator._conn = decorator.cache.get_connection()
+        yield d
 
     @pytest.fixture(autouse=True)
     def spy_stub(self, mocker):
@@ -94,7 +97,6 @@ async def test_calls_get_and_returns(self, decorator, decorator_call):
     async def test_get_from_cache_returns(self, decorator, decorator_call):
         decorator.cache.get = CoroutineMock(return_value=1)
         assert await decorator.get_from_cache("key") == 1
-        assert decorator.cache.close.call_count == 1
 
     @pytest.mark.asyncio
     async def test_get_from_cache_exception(self, decorator, decorator_call):
@@ -105,7 +107,6 @@ async def test_get_from_cache_exception(self, decorator, decorator_call):
     async def test_get_from_cache_none(self, decorator, decorator_call):
         decorator.cache.get = CoroutineMock(return_value=None)
         assert await decorator.get_from_cache("key") is None
-        assert decorator.cache.close.call_count == 0
 
     @pytest.mark.asyncio
     async def test_get_from_cache_conn(self, decorator, decorator_call):
@@ -125,7 +126,6 @@ async def test_calls_fn_set_when_get_none(self, mocker, decorator, decorator_cal
         assert decorator.get_from_cache.call_count == 1
         decorator.set_in_cache.assert_called_with("stub()[('value', 'value')]", "value")
         stub.assert_called_once_with(value="value")
-        assert decorator.cache.close.call_count == 1
 
     @pytest.mark.asyncio
     async def test_calls_fn_raises_exception(self, mocker, decorator, decorator_call):
@@ -189,7 +189,23 @@ async def test_reuses_connection(self, mocker, decorator, decorator_call):
             "stub()[('value', 'value')]", _conn=decorator._conn._conn)
         decorator.cache.set.assert_called_with(
             "stub()[('value', 'value')]", 'value', _conn=decorator._conn._conn, ttl=None)
-        assert decorator.cache.close.call_count == 1
+
+    @pytest.mark.asyncio
+    async def test_different_connection_per_call(self, mocker, decorator, decorator_call):
+        decorator.cache.get = CoroutineMock(return_value=None)
+        conn1, conn2 = _Conn(decorator.cache), _Conn(decorator.cache)
+        mocker.spy(conn1, 'get')
+        mocker.spy(conn1, 'set')
+        mocker.spy(conn2, 'get')
+        mocker.spy(conn2, 'set')
+        decorator.cache.get_connection = Mock(side_effect=[conn1, conn2])
+        await decorator_call(value="value")
+        await decorator_call(value="value")
+
+        assert conn1.get.call_count == 1
+        assert conn1.set.call_count == 1
+        assert conn2.get.call_count == 1
+        assert conn2.set.call_count == 1
 
 
 class TestCachedStampede:
@@ -229,7 +245,7 @@ async def test_calls_get_and_returns(self, decorator, decorator_call):
 
         await decorator_call()
 
-        decorator.cache.get.assert_called_with('stub()[]', _conn=ANY)
+        decorator.cache.get.assert_called_with('stub()[]')
         assert decorator.cache.set.call_count == 0
         assert stub.call_count == 0
 
@@ -249,7 +265,7 @@ async def test_calls_redlock(self, decorator, decorator_call):
         assert decorator.cache.get.call_count == 2
         assert decorator.cache._redlock.call_count == 1
         decorator.cache.set.assert_called_with(
-            "stub()[('value', 'value')]", "value", _conn=ANY, ttl=None)
+            "stub()[('value', 'value')]", "value", ttl=None)
         stub.assert_called_once_with(value="value")
 
     @pytest.mark.asyncio
@@ -263,20 +279,19 @@ async def test_calls_locked_client(self, decorator, decorator_call):
         assert decorator.cache.get.call_count == 4
         assert decorator.cache._redlock.call_count == 2
         decorator.cache.set.assert_called_with(
-            "stub()[('value', 'value')]", "value", _conn=ANY, ttl=None)
+            "stub()[('value', 'value')]", "value", ttl=None)
         assert stub.call_count == 1
 
     @pytest.mark.asyncio
-    async def test_reuses_connection(self, mocker, decorator, decorator_call):
+    async def test_doesnt_reuse_connection(self, mocker, decorator, decorator_call):
         decorator.cache.get = CoroutineMock(return_value=None)
         await decorator_call(value="value")
 
-        assert decorator._conn._conn is not None
+        assert decorator._conn is None
         decorator.cache.get.assert_called_with(
-            "stub()[('value', 'value')]", _conn=decorator._conn._conn)
+            "stub()[('value', 'value')]")
         decorator.cache.set.assert_called_with(
-            "stub()[('value', 'value')]", 'value', _conn=decorator._conn._conn, ttl=None)
-        assert decorator.cache.close.call_count == 1
+            "stub()[('value', 'value')]", 'value', ttl=None)
 
 
 async def stub_dict(*args, keys=None, **kwargs):
@@ -297,7 +312,9 @@ def decorator(self, mocker, mock_cache):
 
     @pytest.fixture
     def decorator_call(self, decorator):
-        yield decorator(stub_dict)
+        d = decorator(stub_dict)
+        decorator._conn = decorator.cache.get_connection()
+        yield d
 
     @pytest.fixture(autouse=True)
     def spy_stub_dict(self, mocker):
@@ -353,36 +370,32 @@ async def test_get_from_cache(self, decorator, decorator_call):
         decorator.cache.multi_get = CoroutineMock(return_value=[1, 2, 3])
 
         assert await decorator.get_from_cache('a', 'b', 'c') == [1, 2, 3]
-        decorator.cache.multi_get.assert_called_with(('a', 'b', 'c'))
-        assert decorator.cache.close.call_count == 1
+        decorator.cache.multi_get.assert_called_with(('a', 'b', 'c'), _conn=ANY)
 
     @pytest.mark.asyncio
     async def test_get_from_cache_no_keys(self, decorator, decorator_call):
         assert await decorator.get_from_cache() == []
         assert decorator.cache.multi_get.call_count == 0
-        assert decorator.cache.close.call_count == 0
-
-    @pytest.mark.asyncio
-    async def test_get_from_cache_with_nones_no_close(self, decorator, decorator_call):
-        decorator.cache.multi_get = CoroutineMock(return_value=[1, None, 3])
-
-        assert await decorator.get_from_cache('a', 'b', 'c') == [1, None, 3]
-        decorator.cache.multi_get.assert_called_with(('a', 'b', 'c'))
-        assert decorator.cache.close.call_count == 0
 
     @pytest.mark.asyncio
     async def test_get_from_cache_exception(self, decorator, decorator_call):
         decorator.cache.multi_get = CoroutineMock(side_effect=Exception)
 
         assert await decorator.get_from_cache('a', 'b', 'c') == [None, None, None]
-        decorator.cache.multi_get.assert_called_with(('a', 'b', 'c'))
-        assert decorator.cache.close.call_count == 0
+        decorator.cache.multi_get.assert_called_with(('a', 'b', 'c'), _conn=ANY)
+
+    @pytest.mark.asyncio
+    async def test_get_from_cache_conn(self, decorator, decorator_call):
+        decorator._conn._conn = Mock()
+        decorator.cache.multi_get = CoroutineMock(return_value=[1, 2, 3])
+
+        assert await decorator.get_from_cache('a', 'b', 'c') == [1, 2, 3]
+        decorator.cache.multi_get.assert_called_with(('a', 'b', 'c'), _conn=decorator._conn._conn)
 
     @pytest.mark.asyncio
     async def test_calls_no_keys(self, decorator, decorator_call):
         await decorator_call(keys=[])
 
         assert decorator.cache.multi_get.call_count == 0
-        assert decorator.cache.close.call_count == 1
         assert stub_dict.call_count == 1
 
     @pytest.mark.asyncio
@@ -395,7 +408,6 @@ async def test_returns_from_multi_set(self, mocker, decorator, decorator_call):
         decorator.get_from_cache.assert_called_once_with('a', 'b')
         assert decorator.set_in_cache.call_count == 0
         assert stub_dict.call_count == 0
-        assert decorator.cache.close.call_count == 1
 
     @pytest.mark.asyncio
     async def test_calls_fn_multi_set_when_multi_get_none(self, mocker, decorator, decorator_call):
@@ -408,7 +420,6 @@ async def test_calls_fn_multi_set_when_multi_get_none(self, mocker, decorator, d
         decorator.get_from_cache.assert_called_once_with('a', 'b')
         decorator.set_in_cache.assert_called_with(ret)
         stub_dict.assert_called_once_with(1, keys=['a', 'b'], value="value")
-        assert decorator.cache.close.call_count == 1
 
     @pytest.mark.asyncio
     async def test_calls_fn_with_only_missing_keys(self, mocker, decorator, decorator_call):
@@ -470,3 +481,31 @@ async def what(self, keys=None, what=1):
         assert what.__name__ == "what"
         assert str(inspect.signature(what)) == '(self, keys=None, what=1)'
         assert inspect.getfullargspec(what.__wrapped__).args == ['self', 'keys', 'what']
+
+    @pytest.mark.asyncio
+    async def test_reuses_connection(self, mocker, decorator, decorator_call):
+        decorator.cache.multi_get = CoroutineMock(return_value=[None])
+        await decorator_call(keys=[pytest.KEY])
+
+        assert decorator._conn._conn is not None
+        decorator.cache.multi_get.assert_called_with(
+            ('key',), _conn=decorator._conn._conn)
+        decorator.cache.multi_set.assert_called_with(
+            [('key', None)], _conn=decorator._conn._conn, ttl=0)
+
+    @pytest.mark.asyncio
+    async def test_different_connection_per_call(self, mocker, decorator, decorator_call):
+        decorator.cache.get = CoroutineMock(return_value=None)
+        conn1, conn2 = _Conn(decorator.cache), _Conn(decorator.cache)
+        mocker.spy(conn1, 'multi_get')
+        mocker.spy(conn1, 'multi_set')
+        mocker.spy(conn2, 'multi_get')
+        mocker.spy(conn2, 'multi_set')
+        decorator.cache.get_connection = Mock(side_effect=[conn1, conn2])
+        await decorator_call(keys=[pytest.KEY])
+        await decorator_call(keys=[pytest.KEY])
+
+        assert conn1.multi_get.call_count == 1
+        assert conn1.multi_set.call_count == 1
+        assert conn2.multi_get.call_count == 1
+        assert conn2.multi_set.call_count == 1
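
The docstring additions above recommend a larger pool when the decorated function sees high
concurrency, since every in-flight call now holds one pooled connection for its full cache
round trip. A rough sketch of that scenario, assuming the Redis backend; the endpoint, the
pool_max_size kwarg and the load_profile function are illustrative assumptions here (the
decorator simply forwards unknown kwargs to the backend, so use whatever pool options your
backend accepts). cached_stampede is the exception: as its docstring says, it does not hold a
connection while waiting on the lock.

import asyncio

from aiocache import RedisCache, cached


# Pool sized for the expected burst, since each concurrent call checks out a connection
# for the duration of its get/set round trip.
@cached(ttl=10, cache=RedisCache, endpoint="127.0.0.1", port=6379, pool_max_size=50)
async def load_profile(user_id):
    return {"id": user_id}


async def burst():
    # 50 concurrent calls -> up to 50 connections in use at the same time.
    await asyncio.gather(*(load_profile(i) for i in range(50)))


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(burst())
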