Robust_connection and RuntimeError "Writer is None" #288

Open
rsaleev opened this issue Jan 13, 2020 · 15 comments

Comments

@rsaleev

rsaleev commented Jan 13, 2020

Is it expected that robust_connection, with reconnection_interval=0.5 and timeout=2, throws an exception when the connection can't be re-established during exchange.publish(Message(...))?

RuntimeError: Writer is None
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/lib/python3.7/asyncio/tasks.py:596> exception=RuntimeError('Writer is None')>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/tasks.py", line 223, in __step
    result = coro.send(None)
  File "/usr/lib/python3.7/asyncio/tasks.py", line 603, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.7/dist-packages/aiormq/tools.py", line 86, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.7/dist-packages/aiormq/connection.py", line 136, in drain
    raise RuntimeError("Writer is %r" % self.writer)
RuntimeError: Writer is None
socket.send() raised exception. 
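The "Task exception was never retrieved" line is asyncio's own warning, not an aio-pika message: it is logged when a Task that finished with an exception is garbage-collected without anything having awaited it or called `Task.exception()`. In this traceback the failed task wraps aiormq's `drain()`. For background tasks your own code creates, a minimal plain-asyncio sketch (the helper names `spawn` and `log_task_exception` are hypothetical, not aio-pika API) retrieves and logs such errors with a done callback:

```python
import asyncio
import logging

logger = logging.getLogger("background-tasks")


def log_task_exception(task: asyncio.Task) -> None:
    """Done callback that retrieves a task's exception and logs it."""
    if task.cancelled():
        return  # exception() would raise CancelledError on a cancelled task
    exc = task.exception()  # retrieving it prevents "never retrieved"
    if exc is not None:
        logger.error("background task failed: %r", exc)


def spawn(coro) -> asyncio.Task:
    """Create a task whose failure is always logged instead of silently lost."""
    task = asyncio.ensure_future(coro)
    task.add_done_callback(log_task_exception)
    return task


async def failing_write() -> None:
    # Stand-in for a write that fails because the transport is gone
    raise RuntimeError("Writer is None")


async def demo() -> None:
    spawn(failing_write())
    # Give the event loop a few iterations to run the task and then its callback
    for _ in range(3):
        await asyncio.sleep(0)
```

Awaiting or gathering the task also retrieves the exception; the callback only guarantees the error is never dropped without a trace.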

When I use just a Connection class and wrap the connection with:

import asyncio
from aio_pika import ExchangeType, connect

# Method of the publisher class (the name "establish" is illustrative); user, password,
# host, loop, exchange, exchange_type, queue, queue_name and bind are attributes set elsewhere
async def establish(self):
    # Keep trying until the broker accepts the connection
    while self.cnx is None:
        try:
            self.cnx = await connect(f"amqp://{self.user}:{self.password}@{self.host}/", loop=self.loop, timeout=2)
        except (ConnectionError, asyncio.TimeoutError):
            # ConnectionError covers the Refused/Reset/Aborted subclasses; a timed-out
            # attempt is retried too, and the pause avoids a busy loop while the broker is down
            await asyncio.sleep(0.5)
    self.connected = True
    self.ch = await self.cnx.channel()
    exchange_types = {'fanout': ExchangeType.FANOUT, 'topic': ExchangeType.TOPIC, 'direct': ExchangeType.DIRECT}
    if self.exchange_type in exchange_types:
        self.ex = await self.ch.declare_exchange(self.exchange, exchange_types[self.exchange_type])
    if self.queue is not None:
        self.queue = await self.ch.declare_queue(self.queue_name, durable=True)
        await self.queue.bind(self.ex, self.bind)
    return self

With this approach I see no exceptions. I use it to reconnect to the server if the connection is lost while sending messages. I know I've reinvented the wheel, but it works as expected.
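The reconnect loop above retries until `connect()` succeeds. A slightly more general sketch of the same idea adds a capped exponential back-off between attempts and an optional attempt limit. It is written against any async connect function so it runs without a broker; the helper name `connect_with_backoff` and the delay values are illustrative assumptions, not aio-pika API, and the default `retry_on` assumes a timed-out attempt raises `asyncio.TimeoutError`.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def connect_with_backoff(
    connect: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, asyncio.TimeoutError),
    initial_delay: float = 0.5,
    max_delay: float = 30.0,
    max_attempts: int = 0,  # 0 means retry forever
) -> T:
    """Call connect() until it succeeds, sleeping with capped exponential back-off."""
    delay = initial_delay
    attempt = 0
    while True:
        attempt += 1
        try:
            return await connect()
        except retry_on:
            if max_attempts and attempt >= max_attempts:
                raise  # give up and surface the last error
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, up to max_delay
```

With aio-pika, `connect` could be `lambda: aio_pika.connect(url, timeout=2)`, where `url` is your broker URL. `ConnectionError` already covers `ConnectionRefusedError`, `ConnectionResetError` and `ConnectionAbortedError`, so listing them separately is unnecessary.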

@remram44
Contributor

I'm also getting RuntimeError: Writer is None a lot under network load. I also get ChannelInvalidStateError: <Channel: "1"> closed from publish().

@remram44
Contributor

2020-11-19 22:41:15,434 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,435 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,435 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,436 INFO aio_pika.robust_connection: Connection to amqp://datamart:******@rabbitmq:5672// closed. Reconnecting after 5 seconds.
2020-11-19 22:41:15,437 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,437 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,438 ERROR asyncio: Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/local/lib/python3.7/asyncio/tasks.py:623> exception=RuntimeError('Writer is None')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.7/site-packages/aiormq/tools.py", line 86, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.7/site-packages/aiormq/connection.py", line 138, in drain
    raise RuntimeError("Writer is %r" % self.writer)
RuntimeError: Writer is None

@nhumrich
Contributor

Any update on this issue?

@mosquito
Owner

@nhumrich Unfortunately not; there is too much that still has to be investigated.

@nhumrich
Contributor

@mosquito So, I tracked this down to being an issue with the RobustChannel not actually being robust. When a channel dies, you can call await channel.reopen() and it fixes everything, but I would expect RobustChannel to handle this itself. Instead, the channel just dies, and reopen has to be called manually. Close callbacks aren't called when a channel is closed non-gracefully, so you can't just add a callback to re-open.

You can reproduce by creating a channel, then doing something that will close the channel such as binding a queue that doesn't exist to an exchange.
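Until the library handles this itself, the manual workaround described above can be wrapped in a small helper: run an operation, and if it fails because the channel is no longer usable, call `reopen()` and retry once. This is a sketch, not aio-pika API; `call_with_reopen` is hypothetical. It is duck-typed so it runs without a broker. With aio-pika you would pass the channel and `retry_on=(aiormq.exceptions.ChannelInvalidStateError,)`, the exception shown in the tracebacks in this thread. Blindly reopening can also hide genuine errors, so keep `retry_on` narrow.

```python
import asyncio
from typing import Any, Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def call_with_reopen(
    channel: Any,
    operation: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
) -> T:
    """Run operation(); if it fails with one of retry_on, reopen the channel and retry once."""
    try:
        return await operation()
    except retry_on:
        await channel.reopen()  # re-create the server-side channel
        return await operation()  # a second failure propagates to the caller
```

Pass an operation that uses the channel at call time, for example `lambda: channel.declare_queue(name, passive=True)`, so the retry runs against the reopened channel.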

@Artimi
Copy link

Artimi commented Jun 16, 2021

Just for the record: I hit this when, due to a bug in my program, I did not ack a message. 30 minutes later I saw this in the RabbitMQ log:

2021-06-15 11:16:00.456 [warning] <0.6808.6> Consumer ctag1.cf784dcdb9f4fd0e427d8d29bc731e18 on channel 1 has timed out waiting for delivery acknowledgement. Timeout used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
2021-06-15 11:16:00.458 [error] <0.6808.6> Channel error on connection <0.6799.6> (10.233.64.217:57120 -> 10.233.64.223:5672, vhost: '/', user: 'user'), channel 1:
operation none caused a channel exception precondition_failed: delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more

And then I got this exception:

Task exception was never retrieved
future: <Task finished name='Task-222840' coro=<MessagingToPubsubAdapter.remove_mapping() done, defined at /usr/local/lib/python3.8/site-packages/rabbit_messaging/adapters.py:133> exception=ChannelInvalidStateError('writer is None')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/rabbit_messaging/adapters.py", line 136, in remove_mapping
    await self._consumer.unsubscribe(routing_key)
  File "/usr/local/lib/python3.8/site-packages/rabbit_messaging/consumer.py", line 122, in unsubscribe
    await self._queue.unbind(self._exchanges[schema], routing_key)
  File "/usr/local/lib/python3.8/site-packages/aio_pika/robust_queue.py", line 102, in unbind
    result = await super().unbind(
  File "/usr/local/lib/python3.8/site-packages/aio_pika/queue.py", line 184, in unbind
    return await asyncio.wait_for(
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 455, in wait_for
    return await fut
  File "/usr/local/lib/python3.8/site-packages/aiormq/channel.py", line 751, in queue_unbind
    return await self.rpc(
  File "/usr/local/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/usr/local/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/usr/local/lib/python3.8/site-packages/aiormq/channel.py", line 121, in rpc
    raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None

I "fixed" it by acking all messages, but the underlying problem remains.
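Here the broker closed the channel because a delivery stayed unacknowledged past the consumer acknowledgement timeout (1800000 ms, i.e. 30 minutes), and the next call on that channel then failed with `writer is None`. One way to avoid leaving deliveries unacked is to settle every message whether the handler succeeds or fails. aio-pika provides similar behavior through the `IncomingMessage.process()` async context manager; the duck-typed sketch below (the name `settled` is hypothetical) shows the idea for any message object with `ack()` and `reject()` coroutines, so it can run without a broker.

```python
import asyncio
import contextlib
from typing import Any, AsyncIterator


@contextlib.asynccontextmanager
async def settled(message: Any, requeue: bool = False) -> AsyncIterator[Any]:
    """Ack the message if the block succeeds, reject it if the block raises."""
    try:
        yield message
    except Exception:
        # On any error, reject so the delivery is never left unacknowledged
        await message.reject(requeue=requeue)
        raise
    else:
        await message.ack()
```

Inside a consumer callback this reads as `async with settled(message): ...`, and the handler's exception still propagates after the reject.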

@kaya-zekioglu

> @mosquito So, I tracked this down to being an issue with the RobustChannel not actually being robust. When a channel dies, you can call await channel.reopen() and it fixes everything, but I would expect RobustChannel to handle this itself. Instead, the channel just dies, and reopen has to be called manually. Close callbacks aren't called when a channel is closed non-gracefully, so you can't just add a callback to re-open.
>
> You can reproduce by creating a channel, then doing something that will close the channel such as binding a queue that doesn't exist to an exchange.

I am encountering the same situation. Minimal reproduction:

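# Run in the asyncio REPL (python -m asyncio), which supports top-level await;
# url and the queue names below are placeholders
import contextlib
import aio_pika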

connection = await aio_pika.connect_robust(url)
exit_stack = contextlib.AsyncExitStack()
await exit_stack.enter_async_context(connection)
channel = await connection.channel()
await channel.declare_queue(subject_that_exists, passive=True)  # works fine
await channel.declare_queue(subject_that_doesnt_exist, passive=True)  # raises aiormq.exceptions.ChannelNotFoundEntity
await channel.declare_queue(subject_that_exists, passive=True)  # raises aiormq.exceptions.ChannelInvalidStateError: writer is None
await channel.reopen()
await channel.declare_queue(subject_that_exists, passive=True)  # works fine

@mosquito
Owner

mosquito commented Aug 6, 2021

@kaya-zekioglu That's a good example, but it doesn't reproduce this issue. robust_connection protects against network issues, but your example shows a failure in business logic. RabbitMQ closes the channel when the client performs an invalid operation; that is correct behavior, not a bug. When you call the reopen() method manually, you force the channel to be re-created.

As described in the original report, this issue occurs while a reconnection is being performed.

Of course, I could change it to call reopen() whenever the channel is closed for any reason, but that might have unexpected side effects.
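Because a failed passive declare closes the channel it was sent on, one common pattern is to run such existence checks on a short-lived channel so the long-lived one is unaffected. This is a duck-typed sketch, assuming a connection whose `channel()` coroutine returns a channel with `declare_queue(name, passive=True)`, `close()` and an `is_closed` attribute, as aio-pika's channels have; the helper `queue_exists` is hypothetical, and with aio-pika `not_found` would be `(aiormq.exceptions.ChannelNotFoundEntity,)`.

```python
import asyncio
from typing import Any, Tuple, Type


async def queue_exists(
    connection: Any,
    name: str,
    not_found: Tuple[Type[BaseException], ...],
) -> bool:
    """Passively declare `name` on a throwaway channel and report whether it exists."""
    channel = await connection.channel()
    try:
        await channel.declare_queue(name, passive=True)
        return True
    except not_found:
        # Only this throwaway channel is closed by the broker, not the main one
        return False
    finally:
        if not channel.is_closed:
            await channel.close()
```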

@themanifold

> Of course, I could change it to call reopen() whenever the channel is closed for any reason, but that might have unexpected side effects.

@mosquito How about you make it optional?

@mosquito
Owner

mosquito commented Aug 6, 2021

@themanifold I think that's the wrong way to do it.

@kaya-zekioglu

> @kaya-zekioglu That's a good example, but it doesn't reproduce this issue. robust_connection protects against network issues, but your example shows a failure in business logic. RabbitMQ closes the channel when the client performs an invalid operation; that is correct behavior, not a bug. When you call the reopen() method manually, you force the channel to be re-created.
>
> As described in the original report, this issue occurs while a reconnection is being performed.
>
> Of course, I could change it to call reopen() whenever the channel is closed for any reason, but that might have unexpected side effects.

Thank you for the explanation. I understand now that what I encountered is not necessarily related to this issue.

@bokolob

bokolob commented Nov 24, 2021

Hello. I have the same issue, but my consumer is started with the no_ack flag.

In spite of this, I get 'operation none caused a channel exception precondition_failed: delivery acknowledgement on channel 1 timed out.'

Then the channel is closed.

@bokolob

bokolob commented Nov 24, 2021

Is it safe to have different consumers (with different queues) on the same channel?

@mosquito
Owner

mosquito commented Nov 24, 2021

> Is it safe to have different consumers (with different queues) on the same channel?

@bokolob Yes, it's legal and safe. This issue will be fixed in #381, I hope 😅.

Sorry, everyone, this is a really big piece of work that I still have to do. Unfortunately, work and private life take up more time than we would like, and there is not much free time left, especially for changes as big as #381.

@ltalirz

ltalirz commented Aug 3, 2022

Since #436 (rework of #381) was released in aio-pika 7.0, has anyone checked whether it has resolved this issue?

ltalirz added a commit to aiidateam/plumpy that referenced this issue Aug 3, 2022
aio-pika 7.0 may address an important issue concerning the `robust_connection`.
mosquito/aio-pika#288