-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make aio wait_key more robust #128
Conversation
bmerry
commented
Nov 17, 2021
- Work around an aioredis bug
- Make pubsub reconnection more robust
If the pubsub connection is allowed to unsubscribe from everything, then later (after health check interval expires) tries to resubscribe, it can end up with a bogus message. The workaround is belt-and-braces: always subscribe to some dummy channel (with a name constructed to be unlikely to conflict with any sane use), and also detect and ignore the bogus messages.
- Catch builtin ConnectionError class in addition to aioredis.ConnectionError. - Always sleep after a connection error, even if there was no connection object. Without this it was spamming, presumably because aioredis had itself removed the connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor suggestion, otherwise looks solid.
@@ -289,12 +299,14 @@ async def _run_pubsub(self) -> None: | |||
""" | |||
try: | |||
loop = asyncio.get_event_loop() | |||
# Ensure we are always subscribed to something, as a workaround for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is only on the connection dealing with subscriptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is also the synchronous telstate redis backend. In theory that might also be vulnerable to redis/redis-py#1720, but it uses a dedicated connection for each call to wait_keys
and hence shouldn't run into problems unless something stalls one of the initial subscriptions for more than 30 seconds - highly unlikely and would probably cause the socket timeout to expire anyway. The aioredis backend was changed to multiplex over a single pub-sub connection because something (I think ingest) needed to monitor a large number of keys and having a connection per wait_key was running into some limit.
katsdptelstate/aio/redis.py
Outdated
@@ -34,6 +34,9 @@ | |||
|
|||
logger = logging.getLogger(__name__) | |||
_QueueItem = Tuple[bytes, Optional[bytes]] | |||
# Note: this must be valid UTF-8, because aioredis decodes it if it needs to | |||
# reconnect to the server. | |||
_dummy_channel = b'\0katsdptelstate-internal0\001' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be _DUMMY_CHANNEL
since it's a literal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quite right - fixed! For some reason I thought about it when I wrote the code and convinced myself that it should be lower-case, but I have no idea why.
It is a constant.
Since the suggested change was a purely mechanical one I'm going to go ahead and merge. |