
Reconnect does not work with version 8.2.4 but works with version 8.2.3 (8.2.2, 8.2.1, 8.2.0 as well) #505

Closed
smfx opened this issue Nov 6, 2022 · 12 comments · Fixed by #533

Comments

@smfx

smfx commented Nov 6, 2022

Hello!

First of all, thank you for your great work! I may have found a bug in aio-pika 8.2.4. I'm using Windows, RabbitMQ in a Docker image, and Python 3.9.

Code to reproduce:

import asyncio

import aio_pika
from aio_pika import ExchangeType
from aio_pika.abc import AbstractIncomingMessage
import aiorun


async def _consume(message: AbstractIncomingMessage) -> None:
    async with message.process(requeue=True, ignore_processed=True):
        print(
            f"[x] {message.routing_key}"
            f"[x] Decoded {message.body.decode()}"
        )


async def _check_tasks() -> None:
    while True:
        tasks = asyncio.all_tasks()
        print(f"Number of tasks: {len(tasks)}")
        for task in tasks:
            print(f"Task: {task.get_name()} {task.get_coro()}")
        print("================================")
        await asyncio.sleep(10)


async def _receive() -> None:
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=20)
    exchange = await channel.declare_exchange(
        "broker_publish",
        ExchangeType.TOPIC,
    )
    queue = await channel.declare_queue(
        "broker_events",
        durable=False,
        exclusive=False,
        auto_delete=False,
    )
    await queue.bind(exchange, routing_key="#")
    await queue.consume(_consume)


async def main() -> None:
    asyncio.create_task(_receive())
    asyncio.create_task(_check_tasks())


if __name__ == "__main__":
    aiorun.run(
        main(),
        stop_on_unhandled_errors=False,
    )

Steps:

  1. Run the script (with aio-pika 8.2.4). You will see 7 running tasks (I think on Linux it would be 6).

  2. Publish a message to the queue. You will see it in the terminal.

  3. Restart the Docker container with RabbitMQ.

  4. There are only 6 running tasks after the reconnect (Channel._reader is missing).

  5. Try to publish another message. Nothing appears in the terminal, and the RabbitMQ management UI shows no consumers on this queue.

  6. Repeat steps 1-5 with aio-pika 8.2.0. The number of tasks stays the same after the reconnect, and the script receives the message.
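Steps 1 and 4 come down to spotting whether a `Channel._reader` task is still alive. A minimal sketch of turning that check into code, assuming the coroutine's `__qualname__` is a good enough signal (it is what the task dumps in this thread print); `has_task_for`, `demo` and the stand-in `Channel` class are hypothetical names, not aio-pika API:

```python
import asyncio
import contextlib
from typing import Tuple


def has_task_for(qualname_suffix: str) -> bool:
    """True if some unfinished task runs a coroutine whose qualified name
    ends with qualname_suffix (e.g. "Channel._reader").

    Must be called while an event loop is running.
    """
    return any(
        task.get_coro().__qualname__.endswith(qualname_suffix)
        for task in asyncio.all_tasks()
    )


class Channel:
    """Hypothetical stand-in whose _reader mimics a background reader task."""

    async def _reader(self) -> None:
        await asyncio.sleep(3600)


async def demo() -> Tuple[bool, bool]:
    channel = Channel()
    reader = asyncio.create_task(channel._reader())
    before = has_task_for("Channel._reader")  # pending tasks are listed
    reader.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await reader
    after = has_task_for("Channel._reader")  # finished tasks are not listed
    return before, after


print(asyncio.run(demo()))  # (True, False)
```

In the reproduction script, a call like `has_task_for("Channel._reader")` inside `_check_tasks()` after the broker restart could replace reading the printed task list by eye.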

I can provide more information if needed. Thank you very much!

@smfx smfx changed the title Reconnect does not work with version 8.2.4 but working with version 8.2.0 Reconnect does not work with version 8.2.4 but works with version 8.2.0 Nov 6, 2022
@mosquito
Owner

mosquito commented Nov 7, 2022

Hi @smfx, thanks for your kind words.
I tried to reproduce your issue on macOS, and it worked for me with the code from the master branch:

Number of tasks: 4
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-2 <coroutine object _receive at 0x101b97300>
Task: Task-4 <coroutine object Connection.connect at 0x101b97760>
Task: Task-1 <coroutine object main at 0x1017cd310>
================================
Number of tasks: 7
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-73 <coroutine object StreamReader.readexactly at 0x101b97680>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-21 <coroutine object Connection.__writer at 0x101b97990>
Task: Task-24 <coroutine object Channel._reader at 0x101b976f0>
Task: Task-22 <coroutine object Connection.__heartbeat at 0x101b97c30>
Task: Task-20 <coroutine object Connection.__reader at 0x101b97920>
================================
# <---- call `docker restart aio_pika_rabbitmq` here
Unexpected connection close from remote "amqp://guest:******@127.0.0.1:5672/", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Number of tasks: 3
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-84 <coroutine object OneShotCallback.__task_inner at 0x101c052a0>
================================
Connection attempt to "amqp://guest:******@127.0.0.1/" failed: Server connection unexpectedly closed. Reconnecting after 5 seconds.
Connection attempt to "amqp://guest:******@127.0.0.1/" failed: Server connection unexpectedly closed. Reconnecting after 5 seconds.
Number of tasks: 3
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-84 <coroutine object OneShotCallback.__task_inner at 0x101c052a0>
================================
Connection attempt to "amqp://guest:******@127.0.0.1/" failed: Server connection unexpectedly closed. Reconnecting after 5 seconds.
Connection attempt to "amqp://guest:******@127.0.0.1/" failed: Server connection unexpectedly closed. Reconnecting after 5 seconds.
Number of tasks: 3
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-84 <coroutine object OneShotCallback.__task_inner at 0x101c052a0>
================================
Number of tasks: 6
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-123 <coroutine object Connection.__heartbeat at 0x101c04580>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-122 <coroutine object Connection.__writer at 0x101c060a0>
Task: Task-121 <coroutine object Connection.__reader at 0x101c06030>
Task: Task-125 <coroutine object StreamReader.readexactly at 0x101b97840>
================================
Number of tasks: 6
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-123 <coroutine object Connection.__heartbeat at 0x101c04580>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-122 <coroutine object Connection.__writer at 0x101c060a0>
Task: Task-121 <coroutine object Connection.__reader at 0x101c06030>
Task: Task-125 <coroutine object StreamReader.readexactly at 0x101b97840>
================================

To be honest, I modified your code a bit so I didn't have to install aiorun:

import asyncio

import aio_pika
from aio_pika import ExchangeType
from aio_pika.abc import AbstractIncomingMessage


async def _consume(message: AbstractIncomingMessage) -> None:
    async with message.process(requeue=True, ignore_processed=True):
        print(f"[x] {message.routing_key}"
              f"[x] Decoded {message.body.decode()}")


async def _check_tasks() -> None:
    while True:
        tasks = asyncio.all_tasks()
        print(f"Number of tasks: {len(tasks)}")
        for task in tasks:
            print(f"Task: {task.get_name()} {task.get_coro()}")
        print("================================")
        await asyncio.sleep(10)


async def _receive() -> None:
    connection = await aio_pika.connect_robust()
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=20)
    exchange = await channel.declare_exchange("broker_publish",
                                              ExchangeType.TOPIC)
    queue = await channel.declare_queue("broker_events", durable=False,
                                        exclusive=False, auto_delete=False)
    await queue.bind(exchange, routing_key="#")
    await queue.consume(_consume)


async def main() -> None:
    await asyncio.gather(_receive(), _check_tasks())


if __name__ == "__main__":
    asyncio.run(main())

@mosquito
Owner

mosquito commented Nov 7, 2022

Maybe the problem only occurs on Windows. I have nowhere to check that right now, but maybe I will later. If you manage to find out the details, feel free to ping me here.

@smfx
Author

smfx commented Nov 7, 2022

Thank you for your assistance! But it seems the problem shows up in your output too. You had the following tasks before the reconnect:

Number of tasks: 7
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-73 <coroutine object StreamReader.readexactly at 0x101b97680>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-21 <coroutine object Connection.__writer at 0x101b97990>
Task: Task-24 <coroutine object **Channel._reader** at 0x101b976f0>
Task: Task-22 <coroutine object Connection.__heartbeat at 0x101b97c30>
Task: Task-20 <coroutine object Connection.__reader at 0x101b97920>

Pay attention to Channel._reader. After the `docker restart aio_pika_rabbitmq` call, this task is gone:

Number of tasks: 6
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-123 <coroutine object Connection.__heartbeat at 0x101c04580>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-122 <coroutine object Connection.__writer at 0x101c060a0>
Task: Task-121 <coroutine object Connection.__reader at 0x101c06030>
Task: Task-125 <coroutine object StreamReader.readexactly at 0x101b97840>

So if you publish a message to the exchange after the reconnect, it won't be processed (with aio-pika 8.2.0 it would be). In fact, I can see that the queue isn't restored after the Docker restart. Maybe this is intentional, but my application just stops receiving messages after a Docker restart, with no errors :) I will try to find a Mac for testing :)

@dvf
Contributor

dvf commented Nov 13, 2022

@smfx to reproduce, have you tried killing the RabbitMQ process in the Docker container?

It's likely that RabbitMQ is respecting the SIGTERM sent by `docker restart` and shutting down gracefully.

@mosquito
Owner

I've already found the cause: the garbage collector removes the channel object, and sometimes the connection too. In the example above, once the function returns, neither the channel nor the connection is referenced anymore.
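A stripped-down model of that mechanism, assuming only what is stated above: the connection holds its channels through weak references, so once the function that opened a channel returns, nothing keeps the channel alive and there is nothing left to restore on reconnect. `Connection`, `Channel`, `receive_and_forget` and `receive_and_keep` are hypothetical stand-ins, not aio-pika code:

```python
import gc
import weakref
from typing import List


class Channel:
    """Hypothetical stand-in for a robust channel; not aio-pika's class."""

    def __init__(self, number: int) -> None:
        self.number = number


class Connection:
    """Hypothetical stand-in that only holds its channels weakly."""

    def __init__(self) -> None:
        self._channels = weakref.WeakSet()
        self._next_number = 1

    def channel(self) -> Channel:
        ch = Channel(self._next_number)
        self._next_number += 1
        self._channels.add(ch)
        return ch

    def channels_to_restore(self) -> List[int]:
        # After a reconnect, only channels that are still alive can be reopened
        return sorted(ch.number for ch in self._channels)


def receive_and_forget(connection: Connection) -> None:
    # Like _receive() in the report: the channel lives only in a local variable
    channel = connection.channel()


def receive_and_keep(connection: Connection, keep: List[Channel]) -> None:
    # Workaround: store the channel somewhere that outlives the function
    keep.append(connection.channel())


connection = Connection()
receive_and_forget(connection)
gc.collect()  # CPython frees it on return already; collect() makes it explicit
print(connection.channels_to_restore())  # []

long_lived: List[Channel] = []
receive_and_keep(connection, long_lived)
gc.collect()
print(connection.channels_to_restore())  # [2]
```

Applied to the report's `_receive()`, the analogous change would be to keep `connection` and `channel` referenced (return them, or store them on a long-lived object) instead of letting them go out of scope. That is an inference from the explanation above, not a documented aio-pika requirement.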

@smfx
Author

smfx commented Nov 14, 2022

Hi!

I looked a little bit more:

  1. I reproduced this behavior on Mac.
  2. The problem appears in version 8.2.4; everything is fine in 8.2.3. I can compare the two versions this weekend and try to figure out a fix if needed.
  3. I've been thinking about how I could test these situations on the client side, so that a version update doesn't bring such bugs back. I tried writing a test with the testcontainers library.
    @mosquito Do you have any thoughts on this? I would really appreciate advice on how to test such situations. Or is testing a third-party library pointless?
    On Windows, the test runs if you set the TC_HOST=localhost environment variable.
Test code using testcontainers:
import asyncio
from typing import Dict

from aio_pika.abc import AbstractIncomingMessage
from aio_pika import ExchangeType, connect_robust, Message
from testcontainers.rabbitmq import RabbitMqContainer
import pytest
import json


incoming_message_list = []


class TestConsumer:
    """Silly test consumer."""

    def __init__(
        self,
        rmq_connection_str: str,
        exchange_name: str,
        exchange_type: ExchangeType,
        queue_name: str,
    ):
        self._rmq_connection_str = rmq_connection_str
        self._exchange_name = exchange_name
        self._exchange_type = exchange_type
        self._queue_name = queue_name

        self._connection = None
        self._queue = None
        self._queue_consumer_tag = None

    async def start_consuming(self) -> None:
        """Connecting to queue and waiting for messages."""
        self._connection = await connect_robust(
            self._rmq_connection_str,
        )
        channel = await self._connection.channel()
        await channel.set_qos(1)
        exchange = await channel.declare_exchange(
            self._exchange_name,
            self._exchange_type,
        )
        self._queue = await channel.declare_queue(
            self._queue_name,
            durable=False,
            exclusive=False,
            auto_delete=False,
        )
        await self._queue.bind(exchange, routing_key="#")
        self._queue_consumer_tag = await self._queue.consume(self._consume)

    async def _consume(
        self, message: AbstractIncomingMessage
    ) -> None:
        """Message handler."""
        async with message.process():
            incoming_message_list.append(message.body.decode())


async def publish_message(
    message: Dict,
    connection_string: str,
    exchange_name: str,
    exchange_type: ExchangeType = ExchangeType.TOPIC,
) -> None:
    """Sends message to exchange."""
    connection = await connect_robust(connection_string)
    # Close the publisher connection once the message has been sent
    async with connection:
        channel = await connection.channel()
        exchange = await channel.declare_exchange(
            exchange_name,
            exchange_type,
        )
        await exchange.publish(
            Message(json.dumps(message).encode()),
            routing_key="test"
        )


@pytest.mark.asyncio
async def test_receive_message_after_reconnect():
    """Check that messages are still being delivered after reconnecting."""
    rmq_container = RabbitMqContainer()
    # Map the ports explicitly so they don't change after the restart
    rmq_container.ports = {5672: 5672}
    with rmq_container as rmq:
        # Arrange
        connection_string = f"amqp://guest:guest@" \
                            f"{rmq.get_container_host_ip()}:" \
                            f"{rmq.get_exposed_port(rmq.RABBITMQ_NODE_PORT)}/"
        exchange_name = "test_exchange"
        queue_name = "test_queue"

        consumer = TestConsumer(
            rmq_connection_str=connection_string,
            exchange_name=exchange_name,
            exchange_type=ExchangeType.TOPIC,
            queue_name=queue_name,
        )
        await consumer.start_consuming()
        # Waiting consumer connection
        await asyncio.sleep(5)
        rmq.get_wrapped_container().restart()
        # Waiting rmq restart
        await asyncio.sleep(15)
        # Act
        connection_string = f"amqp://guest:guest@" \
                            f"{rmq.get_container_host_ip()}:" \
                            f"{rmq.get_exposed_port(rmq.RABBITMQ_NODE_PORT)}/"
        await publish_message(
            {"test": "message111"},
            connection_string=connection_string,
            exchange_name=exchange_name,
        )
        await asyncio.sleep(0.1)

        # Assert
        assert len(incoming_message_list) == 1

        # Cleanup
        incoming_message_list.clear()

@smfx
Author

smfx commented Nov 14, 2022

@dvf Hello! No, I haven't tried that; restarting the container is easier for me :) But I also connected to RabbitMQ on a remote server (a K8s cluster) and simulated a network failure by switching off Wi-Fi, and the problem remains.

@smfx smfx changed the title Reconnect does not work with version 8.2.4 but works with version 8.2.0 Reconnect does not work with version 8.2.4 but works with version 8.2.3 (8.2.2, 8.2.1, 8.2.0 as well) Nov 14, 2022
@mosquito
Owner

@smfx see this #505 (comment)

@olii

olii commented Apr 3, 2023

> I already found the reason, the fact is that the garbage collector removes the channel object and sometimes the connection. in the above example, after the function ends, neither the channel nor the connection remains referenced.

So the workaround is to keep the robust connection and robust channel in variables in my application code? Shouldn't there be some other way than using weak references in the robust connection?
https://github.com/mosquito/aio-pika/blob/master/aio_pika/robust_connection.py#L51

@MartinWallgren

We are seeing the same thing in version 9.0.7.

If we don't hold a reference to the RobustChannel, we always fail to resume consuming after connection issues with RabbitMQ.

@MartinWallgren

Why do we need weak references for channels in RobustConnection?
Is there a risk of stale channel instances lingering there?
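The trade-off behind these two questions can be pictured with a hypothetical alternative design (not what aio-pika or #533 actually implements): if the connection held its channels strongly, a channel the application stopped referencing would still be there to restore after a reconnect, but every close path would have to deregister the channel explicitly, or closed channels would accumulate. `StrongRegistry` and `Channel` below are illustrative stand-ins only:

```python
from typing import Set


class Channel:
    """Hypothetical stand-in for a channel owned by a connection."""

    def __init__(self, registry: "StrongRegistry") -> None:
        self._registry = registry
        self.closed = False

    def close(self) -> None:
        self.closed = True
        # With strong references, removal has to be explicit;
        # forgetting it is how stale channels would linger.
        self._registry.discard(self)


class StrongRegistry:
    """Hypothetical design: the connection keeps every open channel alive."""

    def __init__(self) -> None:
        self._channels: Set[Channel] = set()

    def open(self) -> Channel:
        channel = Channel(self)
        self._channels.add(channel)
        return channel

    def discard(self, channel: Channel) -> None:
        self._channels.discard(channel)

    def __len__(self) -> int:
        return len(self._channels)


registry = StrongRegistry()
registry.open()           # result dropped, yet the channel survives
closed = registry.open()
closed.close()            # explicitly deregistered on close
print(len(registry))      # 1
```

A weak registry avoids that bookkeeping automatically, at the cost described earlier in this thread: a channel nobody else references disappears before it can be restored.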

@mosquito
Owner

@MartinWallgren this has already been reworked in #533


5 participants