From ac37dbec8f9c6416448ebae63ce4917d81d2b6be Mon Sep 17 00:00:00 2001 From: Alexey Popravka Date: Fri, 28 Apr 2017 21:43:37 +0300 Subject: [PATCH] fix pubsub receiver stop() --- aioredis/pubsub.py | 19 ++++++++++++++----- tests/pubsub_receiver_test.py | 6 ++++-- tests/py35_pubsub_receiver_test.py | 3 +-- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/aioredis/pubsub.py b/aioredis/pubsub.py index 4a85a34ab..f40fa1993 100644 --- a/aioredis/pubsub.py +++ b/aioredis/pubsub.py @@ -280,7 +280,7 @@ def get(self, *, encoding=None, decoder=None): * tuple of three elements: pattern channel, (target channel & message); - * or None in case Receiver is stopped. + * or None in case Receiver is not active or has just been stopped. :raises aioredis.ChannelClosedError: If listener is stopped and all messages have been received. @@ -290,7 +290,10 @@ def get(self, *, encoding=None, decoder=None): if not self._running: # inactive but running raise ChannelClosedError() return - ch, msg = yield from self._queue.get() + obj = yield from self._queue.get() + if obj is EndOfStream: + return + ch, msg = obj if ch.is_pattern: dest_ch, msg = msg if encoding is not None: @@ -318,6 +321,8 @@ def is_active(self): """Returns True if listener has any active subscription.""" if not self._queue.empty(): return True + # NOTE: this expression requires at least one subscriber + # to return True; return (self._running and any(ch.is_active for ch in self._refs.values())) @@ -328,6 +333,7 @@ def stop(self): so you must call unsubscribe before stopping this listener. """ self._running = False + self._put_nowait(EndOfStream, sender=None) if PY_35: def iter(self, *, encoding=None, decoder=None): @@ -346,11 +352,14 @@ def iter(self, *, encoding=None, decoder=None): # internal methods def _put_nowait(self, data, *, sender): - if not self._running: - logger.warning("Pub/Sub listener message after stop: %r, %r", + if not self._running and data is not EndOfStream: + logger.warning("Pub/Sub listener message after stop:" + " sender: %r, data: %r", sender, data) return - self._queue.put_nowait((sender, data)) + if data is not EndOfStream: + data = (sender, data) + self._queue.put_nowait(data) if self._waiter is not None: fut, self._waiter = self._waiter, None if fut.done(): diff --git a/tests/pubsub_receiver_test.py b/tests/pubsub_receiver_test.py index 329be9952..67cde4162 100644 --- a/tests/pubsub_receiver_test.py +++ b/tests/pubsub_receiver_test.py @@ -181,13 +181,15 @@ def test_stopped(create_connection, server, loop): yield from asyncio.sleep(0, loop=loop) assert len(cm.output) == 1 + # Receiver must have 1 EndOfStream message warn_messaege = ( "WARNING:aioredis:Pub/Sub listener message after stop: " - "<_Sender name:b'channel:1', is_pattern:False, receiver:" - ">, b'Hello'" + "sender: <_Sender name:b'channel:1', is_pattern:False, receiver:" + ">, data: b'Hello'" ) assert cm.output == [warn_messaege] + assert (yield from mpsc.get()) is None with pytest.raises(ChannelClosedError): yield from mpsc.get() res = yield from mpsc.wait_message() diff --git a/tests/py35_pubsub_receiver_test.py b/tests/py35_pubsub_receiver_test.py index bbf3ce494..5eaa317b5 100644 --- a/tests/py35_pubsub_receiver_test.py +++ b/tests/py35_pubsub_receiver_test.py @@ -41,7 +41,6 @@ async def coro(mpsc): async def test_pubsub_receiver_call_stop_with_empty_queue( create_redis, server, loop): sub = await create_redis(server.tcp_address, loop=loop) - pub = await create_redis(server.tcp_address, loop=loop) mpsc = Receiver(loop=loop) @@ -50,7 +49,7 @@ async def test_pubsub_receiver_call_stop_with_empty_queue( now = loop.time() loop.call_later(.5, mpsc.stop) - async for _ in mpsc.iter(): + async for i in mpsc.iter(): # noqa (flake8 bug with async for) assert False, "StopAsyncIteration not raised" dt = loop.time() - now assert dt <= 1.5