Queue.cancel() does not work in consume mode #253

Open
Kaveshnikov opened this issue Sep 23, 2019 · 8 comments


Kaveshnikov commented Sep 23, 2019

Hi. I tried aio-pika with aiohttp and noticed that queue.cancel() hangs indefinitely. Receiving and sending messages work normally. channel.close() in consume mode hangs too. The mentioned methods work normally in client mode.

aio-pika==6.1.2
aiormq==2.7.5
pamqp==2.3.0
aiohttp==3.6.0
RabbitMQ 3.7.18

Code example:

async def on_message(message: aio_pika.IncomingMessage):
    async with message.process():
        print(message.body.decode())

async def consume(app: web.Application):
    connection: aio_pika.RobustConnection = app['mq']
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue: aio_pika.RobustQueue = await channel.declare_queue('hello', durable=True)
    consumer_tag = await queue.consume(on_message)

    yield
    # it hangs here
    await queue.cancel(consumer_tag, nowait=True)
    await channel.close()


async def init_mq(app: web.Application) -> None:
    connection: aio_pika.RobustConnection = await aio_pika.connect(
        host=HOST,
        port=PORT,
        login=LOGIN,
        password=PASSWORD,
        timeout=CONNECTION_TIMEOUT,
    )
    app['mq'] = connection

    yield
    await app['mq'].close()

def create_app() -> web.Application:
    app: web.Application = web.Application()
    app.cleanup_ctx.append(init_mq)
    app.cleanup_ctx.append(consume)

    return app

if __name__ == '__main__':
    app: web.Application = create_app()
    web.run_app(app, host='localhost', port=8080)
@Kaveshnikov (Author)

@mosquito

@mosquito (Owner)

Could you provide a RabbitMQ traffic dump?

Hint command: tcpdump -w traff.pcap -pni any "port 5672"

@Kaveshnikov (Author)


ghost commented Oct 20, 2019

Hey there!

Although my code is slightly different, I believe it's the same issue:

import asyncio

import aio_pika
from aiohttp import web


async def listen_to_rmq(app):
    fixture_id = "1"
    conn = await aio_pika.connect_robust("amqp://guest:guest@localhost/")

    ch = await conn.channel()

    ex = await ch.declare_exchange(fixture_id)
    q = await ch.declare_queue(name=fixture_id, exclusive=True)
    await q.bind(ex)

    try:
        async with q.iterator() as q_iter:
            async for msg in q_iter:
                async with msg.process():
                    print(msg.body.decode())
    except asyncio.CancelledError:
        pass
    finally:
        await q.unbind(ex, fixture_id)
        await q.delete()
        await ex.delete()
        await ch.close()


async def start_background_tasks(app):
    app['rmq_listener'] = asyncio.create_task(listen_to_rmq(app))


async def cleanup_background_tasks(app):
    app['rmq_listener'].cancel()
    await app['rmq_listener']


if __name__ == "__main__":
    app = web.Application()
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    web.run_app(app)


moznuy commented Jan 23, 2020

I think I found the problem. I have encountered a similar issue in the past. It's related to the way aiohttp handles stopping (and asyncio.run(), since aiohttp's run_app is analogous).
aiohttp web.run_app:

    except (GracefulExit, KeyboardInterrupt):  # pragma: no cover
        pass
    finally:
        _cancel_all_tasks(loop)
        if sys.version_info >= (3, 6):  # don't use PY_36 to pass mypy
            loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

The problem is that the reader task is cancelled before the queue.cancel() RPC response is received. You can see this if you add

logging.basicConfig(level=logging.DEBUG)

to the above examples:

 9180.18 -    aiormq.connection -    DEBUG - Reader task cancelled:
Traceback (most recent call last):
  File "/home/lamar/.envs/testPika/lib/python3.8/site-packages/aiormq/connection.py", line 380, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/lamar/.envs/testPika/lib/python3.8/site-packages/aiormq/connection.py", line 332, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

So await queue.cancel() waits forever.
My understanding of _cancel_all_tasks(loop) is limited, but as far as I can tell:

  1. All tasks are marked as cancelled.
  2. The aiohttp _run_app task is scheduled after aiormq.connection.__reader.
  3. The reader task runs and returns.
  4. The _run_app task waits for our shutdown code - queue.cancel() - but it gets stuck forever, because now nobody can resolve the response future (illustrated by the sketch below).
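
To see this deadlock in isolation, here is a minimal, self-contained sketch (reader and rpc_call are made-up names for illustration, not aiormq APIs): the reader task is the only thing that can resolve a pending RPC response future, so cancelling it first leaves the caller waiting forever.

import asyncio


async def reader(response: asyncio.Future) -> None:
    # Stands in for aiormq's frame reader: the only place where
    # RPC response futures ever get resolved.
    try:
        await asyncio.sleep(10)  # pretend to wait for a frame from the socket
        response.set_result("ok")
    except asyncio.CancelledError:
        # Cancelled before the response arrived: nobody can resolve `response` now.
        raise


async def rpc_call(response: asyncio.Future) -> str:
    # Stands in for queue.cancel(): send a request, then await the response future.
    return await response


async def main() -> None:
    response = asyncio.get_running_loop().create_future()
    reader_task = asyncio.create_task(reader(response))

    reader_task.cancel()      # roughly what _cancel_all_tasks() does to the reader
    await asyncio.sleep(0)    # let the cancellation take effect

    try:
        # Without the timeout this await would hang forever; wait_for is only
        # here so the demo terminates instead of hanging.
        await asyncio.wait_for(rpc_call(response), timeout=1)
    except asyncio.TimeoutError:
        print("rpc_call() is stuck: its response future can no longer be resolved")


if __name__ == "__main__":
    asyncio.run(main())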

I think this issue is related to aio-libs/aiohttp#3593


moznuy commented Jan 23, 2020

I'm not very familiar with the inner workings of either aio_pika or aiormq, but after taking a peek at aiormq.connection.Connection.__reader(), maybe something like this could work:

import asyncio


async def __reader(all_rpc_futures_are: asyncio.Future):
    cancellation_in_progress = False

    try:
        while True:  # not self.reader.at_eof():
            # Keep draining frames after a cancellation request until every
            # outstanding RPC future has been resolved.
            if cancellation_in_progress and all_rpc_futures_are.done():
                break
            try:
                await asyncio.sleep(0.1)  # stands in for the real frame-reading work
            except asyncio.CancelledError:
                print('Socket received cancellation')
                cancellation_in_progress = True
    finally:
        print('Socket close')


# Somewhere in the Connection.close() workflow:
await asyncio.gather(<all rpc futures>)
all_rpc_futures_are.set_result(None)
# Or:
gathering_future = asyncio.gather(<all rpc futures>)
gathering_future.add_done_callback(lambda _: all_rpc_futures_are.set_result(None))

Some points that might be important:

  1. I don't know whether __reader() could be cancelled from anywhere other than the
    Connection.close() workflow or aiohttp's (and asyncio.run()'s) _cancel_all_tasks().
  2. This solution should not wait endlessly once cancellation_in_progress == True. Maybe one could add a timeout to Connection.close(timeout=None) and some logic to set an exception on all_rpc_futures_are (see the sketch below).
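
A rough sketch of point 2, assuming a hypothetical close_with_timeout helper (not an actual aiormq API) that bounds the wait and fails the remaining futures explicitly:

import asyncio
from typing import Iterable, Optional


async def close_with_timeout(rpc_futures: Iterable[asyncio.Future],
                             all_rpc_futures_are: asyncio.Future,
                             timeout: Optional[float] = None) -> None:
    # Wait for outstanding RPC response futures, but only up to `timeout` seconds.
    pending = {f for f in rpc_futures if not f.done()}
    if pending:
        _done, still_pending = await asyncio.wait(pending, timeout=timeout)
        for fut in still_pending:
            # Fail the stragglers instead of leaving their callers hanging forever.
            fut.set_exception(asyncio.TimeoutError("connection closed before RPC response"))
    # Signal the reader that it may finish shutting down.
    if not all_rpc_futures_are.done():
        all_rpc_futures_are.set_result(None)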


multun commented Jan 27, 2020

@moznuy I think #3805 fixed the issue


moznuy commented Jan 27, 2020

@multun Yes, I am quite happy that aiohttp now waits for cleanup tasks before cancelling all tasks.
But with some slight changes to the example above:

import asyncio
import logging

import aio_pika


logging.basicConfig(format='%(relativeCreated)8.2f - %(name)20s - %(levelname)8s - %(message)s', level=logging.DEBUG)


async def on_message(msg):
    async with msg.process():
        logging.info("Received: %s", msg.body.decode())
        await asyncio.sleep(0.1)


async def main():
    fixture_id = "test_queue"
    conn: aio_pika.Connection = await aio_pika.connect("amqp://guest:guest@localhost/")

    ch: aio_pika.Channel = await conn.channel()
    await ch.set_qos(prefetch_count=3)

    q = await ch.declare_queue(name=fixture_id, durable=True)
    tag = await q.consume(callback=on_message)
    try:
        await asyncio.sleep(1000)
    finally:
        logging.debug("Before cancel")
        await q.cancel(tag)
        logging.debug("After cancel")
        await conn.close()


if __name__ == "__main__":
    asyncio.run(main())

And you still get a hang on await q.cancel(), even without aiohttp:

   56.75 -              asyncio -    DEBUG - Using selector: EpollSelector
   62.51 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost/">
   62.68 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   64.48 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
   65.18 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
   66.43 -                 root -     INFO - Received: Hello 65308
   66.49 -                 root -     INFO - Received: Hello 65309
   66.53 -                 root -     INFO - Received: Hello 65310
  170.34 -                 root -     INFO - Received: Hello 65311
  171.25 -                 root -     INFO - Received: Hello 65312
  171.46 -                 root -     INFO - Received: Hello 65313
  273.44 -                 root -     INFO - Received: Hello 65314
  275.26 -                 root -     INFO - Received: Hello 65315
  275.51 -                 root -     INFO - Received: Hello 65316
  377.35 -                 root -     INFO - Received: Hello 65317
  379.21 -                 root -     INFO - Received: Hello 65318
  379.81 -                 root -     INFO - Received: Hello 65319
  481.31 -                 root -     INFO - Received: Hello 65320
  483.42 -                 root -     INFO - Received: Hello 65321
  484.16 -                 root -     INFO - Received: Hello 65322
  583.59 -                 root -     INFO - Received: Hello 65323
  584.77 -                 root -     INFO - Received: Hello 65324
  585.51 -                 root -     INFO - Received: Hello 65325
  686.03 -                 root -     INFO - Received: Hello 65326
  686.11 -                 root -     INFO - Received: Hello 65327
  686.56 -                 root -     INFO - Received: Hello 65328
  722.92 -    aiormq.connection -    DEBUG - Reader task cancelled:
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 599, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 567, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1819, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/connection.py", line 377, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/connection.py", line 329, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError
  723.99 -                 root -    DEBUG - Before cancel

...And nothing....

Because asyncio.run() works similarly to aiohttp's run_app before the #3805 fix.

One small note: I tried a bunch of examples before settling on this one because I wasn't getting consistent results: sometimes it was 50/50, sometimes the program would stop cleanly, and sometimes adding or removing a single line (logging.debug("Before cancel")) would flip the behaviour completely, from 0% to 100%.
