diff --git a/channels_redis/core.py b/channels_redis/core.py index b3e5a98..bc6e496 100644 --- a/channels_redis/core.py +++ b/channels_redis/core.py @@ -384,7 +384,10 @@ async def receive(self, channel): # Ensure all tasks are cancelled if we are cancelled. # Also see: https://bugs.python.org/issue23859 for task in tasks: - task.cancel() + if not task.cancel(): + assert task.done() + if task.result() is True: + self.receive_lock.release() raise diff --git a/tests/test_core.py b/tests/test_core.py index c9c085f..dbf2401 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -321,3 +321,25 @@ async def test_group_send_capacity(channel_layer): with pytest.raises(asyncio.TimeoutError): async with async_timeout.timeout(1): await channel_layer.receive(channel) + + +@pytest.mark.asyncio +async def test_receive_cancel(channel_layer): + """ + Makes sure we can cancel a receive without blocking + """ + channel_layer = RedisChannelLayer(capacity=10) + channel = await channel_layer.new_channel() + delay = 0 + while delay < 0.01: + await channel_layer.send(channel, {"type": "test.message", "text": "Ahoy-hoy!"}) + + task = asyncio.ensure_future(channel_layer.receive(channel)) + await asyncio.sleep(delay) + task.cancel() + delay += 0.001 + + try: + await asyncio.wait_for(task, None) + except asyncio.CancelledError: + pass