Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
fix pubsub receiver stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
popravich committed Apr 28, 2017
1 parent 2da8651 commit ec40c88
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
19 changes: 14 additions & 5 deletions aioredis/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def get(self, *, encoding=None, decoder=None):
* tuple of three elements: pattern channel, (target channel & message);
* or None in case Receiver is stopped.
* or None in case Receiver is not active or has just been stopped.
:raises aioredis.ChannelClosedError: If listener is stopped
and all messages have been received.
Expand All @@ -290,7 +290,10 @@ def get(self, *, encoding=None, decoder=None):
if not self._running: # inactive but running
raise ChannelClosedError()
return
ch, msg = yield from self._queue.get()
obj = yield from self._queue.get()
if obj is EndOfStream:
return
ch, msg = obj
if ch.is_pattern:
dest_ch, msg = msg
if encoding is not None:
Expand Down Expand Up @@ -318,6 +321,8 @@ def is_active(self):
"""Returns True if listener has any active subscription."""
if not self._queue.empty():
return True
# NOTE: this expression requires at least one subscriber
# to return True;
return (self._running and
any(ch.is_active for ch in self._refs.values()))

Expand All @@ -328,6 +333,7 @@ def stop(self):
so you must call unsubscribe before stopping this listener.
"""
self._running = False
self._put_nowait(EndOfStream, sender=None)

if PY_35:
def iter(self, *, encoding=None, decoder=None):
Expand All @@ -346,11 +352,14 @@ def iter(self, *, encoding=None, decoder=None):
# internal methods

def _put_nowait(self, data, *, sender):
if not self._running:
logger.warning("Pub/Sub listener message after stop: %r, %r",
if not self._running and data is not EndOfStream:
logger.warning("Pub/Sub listener message after stop:"
" sender: %r, data: %r",
sender, data)
return
self._queue.put_nowait((sender, data))
if data is not EndOfStream:
data = (sender, data)
self._queue.put_nowait(data)
if self._waiter is not None:
fut, self._waiter = self._waiter, None
if fut.done():
Expand Down
6 changes: 4 additions & 2 deletions tests/pubsub_receiver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,15 @@ def test_stopped(create_connection, server, loop):
yield from asyncio.sleep(0, loop=loop)

assert len(cm.output) == 1
# Receiver must have 1 EndOfStream message
warn_messaege = (
"WARNING:aioredis:Pub/Sub listener message after stop: "
"<_Sender name:b'channel:1', is_pattern:False, receiver:"
"<Receiver is_active:False, senders:1, qsize:0>>, b'Hello'"
"sender: <_Sender name:b'channel:1', is_pattern:False, receiver:"
"<Receiver is_active:True, senders:1, qsize:1>>, data: b'Hello'"
)
assert cm.output == [warn_messaege]

assert (yield from mpsc.get()) is None
with pytest.raises(ChannelClosedError):
yield from mpsc.get()
res = yield from mpsc.wait_message()
Expand Down

0 comments on commit ec40c88

Please sign in to comment.