Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Pubsub does not auto-reconnect with get_message() #1130

Open
luhn opened this issue Sep 16, 2021 · 6 comments · May be fixed by #1156
Open

Pubsub does not auto-reconnect with get_message() #1130

luhn opened this issue Sep 16, 2021 · 6 comments · May be fixed by #1156

Comments

@luhn
Copy link

luhn commented Sep 16, 2021

Counterpart of redis/redis-py#1572

Run the following script:

import itertools
import traceback
import asyncio

import aioredis


async def consumer(pubsub):
    await pubsub.subscribe('test')
    while True:
        try:
            message = await pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0,
            )
            if message:
                print(f'Receive: {message}')
        except asyncio.CancelledError:
            raise
        except Exception:
            traceback.print_exc()
            await asyncio.sleep(1.0)


async def producer(redis):
    for i in itertools.count():
        try:
            print(f'Publish {i}')
            await redis.publish('test', str(i))
        except Exception:
            traceback.print_exc()
        await asyncio.sleep(1.0)


loop = asyncio.get_event_loop()
redis = aioredis.from_url('redis://', decode_responses=True)
loop.create_task(consumer(redis.pubsub()))
loop.run_until_complete(producer(redis))

While running, stop the Redis server and then start it again.

Expected behavior: publish() and get_message() will fail while server is stopped, then succeed again after server restarts.

Observed behavior: publish() resumes working, but get_message() continues to fail with the following traceback:

Traceback (most recent call last):
  File "/Users/luhn/Code/aioredis-py/test.py", line 12, in consumer
    message = await pubsub.get_message(
  File "/Users/luhn/Code/aioredis-py/aioredis/client.py", line 4154, in get_message
    response = await self.parse_response(block=False, timeout=timeout)
  File "/Users/luhn/Code/aioredis-py/aioredis/client.py", line 4034, in parse_response
    if not block and not await conn.can_read(timeout=timeout):
  File "/Users/luhn/Code/aioredis-py/aioredis/connection.py", line 850, in can_read
    return await self._parser.can_read(timeout)
  File "/Users/luhn/Code/aioredis-py/aioredis/connection.py", line 453, in can_read
    return await self.read_from_socket(timeout=timeout, raise_on_timeout=False)
  File "/Users/luhn/Code/aioredis-py/aioredis/connection.py", line 464, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
aioredis.exceptions.ConnectionError: Connection closed by server.
@Andrew-Chen-Wang
Copy link
Collaborator

This is great code, but I believe we had a discussion before about how this was not the job of redis-py or aioredis. In my personal belief, I agree with this since people may want to be able to have a custom implementation for server failing.

@luhn
Copy link
Author

luhn commented Sep 17, 2021

Currently when a connection breaks, get_message() behaves differently than Redis.publish(), PubSub.subscribe(), or whatever your favorite command might be. That's why I'm reporting a bug. It should work exactly the same as everything else.

There's even a whole connect callback that will resubscribe to all subscribe topics upon reconnection, so obviously the implementation is meant to reconnect.

@Andrew-Chen-Wang
Copy link
Collaborator

I don't see where publish has a reconnection option. subscribe needs reconnection since it's long living, whereas publish (which I didn't see have a reconnect option) and get_message are single execution commands.

@luhn
Copy link
Author

luhn commented Sep 17, 2021

Maybe "reconnect" is the wrong word. All get_message needs to do is conn.disconnect() when a ConnectionError occurs. Connection will handle the rest.

I'm not sure what publish() does behind the scenes, but the effect is the same—The next call to publish() will use a new connection.

@luhn
Copy link
Author

luhn commented Sep 30, 2021

FWIW, the equivalent issue+PR I opened with redis-py has been merged. redis/redis-py#1572

@Andrew-Chen-Wang
Copy link
Collaborator

@luhn apologies, I never had a chance to respond, especially after I made sense of "Maybe "reconnect" is the wrong word". Thanks for also linking to redis-py. I'll probably be skipping your PR since I'll be making a huge PR port from redis-py tomorrow.

Thanks for creating the issue though!

Andrew-Chen-Wang added a commit that referenced this issue Oct 8, 2021
Auto-reconnect PubSub on get_message (redis/redis-py#1574)

* Thank you Theron Luhn <@luhn>!!!
* Fixes #1130

Signed-off-by: Andrew-Chen-Wang <acwangpython@gmail.com>
@Andrew-Chen-Wang Andrew-Chen-Wang linked a pull request Oct 8, 2021 that will close this issue
5 tasks
@Andrew-Chen-Wang Andrew-Chen-Wang linked a pull request Nov 20, 2021 that will close this issue
5 tasks
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
2 participants