
Slow memory leak in RedisChannelLayer.receive_buffer due to lack of cleanup/expiry #212

Closed
ryanpetrello opened this issue Jul 24, 2020 · 22 comments

Comments

@ryanpetrello
Contributor

ryanpetrello commented Jul 24, 2020

I'm observing what looks like a slow memory leak in channels-redis in a project I maintain that uses Daphne (Ansible AWX):

/var/lib/awx/venv/awx/bin/pip3 freeze | egrep "chan|daphne|Twisted"
channels==2.4.0
channels-redis==2.4.2
daphne==2.4.1
Twisted==20.3.0

https://github.com/ansible/awx/blob/devel/awx/main/consumers.py#L114

This looks quite similar to a recently reported (and resolved) memory leak: django/channels#1181

After noticing a slow memory leak in Daphne at high volumes of (large) messages:

[screenshot: graph of Daphne memory usage growing steadily over time]

I decided to add some instrumentation to my routing code with tracemalloc to try to spot the leak:

diff --git a/awx/main/routing.py b/awx/main/routing.py
index 2866d46ed0..71e40c693f 100644
--- a/awx/main/routing.py
+++ b/awx/main/routing.py
@@ -1,3 +1,6 @@
+from datetime import datetime
+import tracemalloc
+
 import redis
 import logging

@@ -10,11 +13,16 @@ from channels.routing import ProtocolTypeRouter, URLRouter
 from . import consumers


+TRACE_INTERVAL = 60 * 5  # take a tracemalloc snapshot every 5 minutes
+
+
 logger = logging.getLogger('awx.main.routing')


 class AWXProtocolTypeRouter(ProtocolTypeRouter):
     def __init__(self, *args, **kwargs):
+        tracemalloc.start()
         super().__init__(*args, **kwargs)


+class SnapshottedEventConsumer(consumers.EventConsumer):
+
+    last_snapshot = datetime.min
+
+    async def connect(self):
+        if (datetime.now() - SnapshottedEventConsumer.last_snapshot).total_seconds() > TRACE_INTERVAL:
+            snapshot = tracemalloc.take_snapshot()
+            top_stats = snapshot.statistics('lineno')
+            top_stats = [
+                stat for stat in top_stats[:25]
+                if 'importlib._bootstrap_external' not in str(stat)
+            ]
+            for stat in top_stats[:25]:
+                logger.error('[TRACE] ' + str(stat))
+            SnapshottedEventConsumer.last_snapshot = datetime.now()
+        await super().connect()
+
+
 websocket_urlpatterns = [
-    url(r'websocket/$', consumers.EventConsumer),
+    url(r'websocket/$', SnapshottedEventConsumer),
 ]

...and what I found was that we were leaking the actual messages somewhere:

2020-07-23 18:20:44,628 ERROR    awx.main.routing [TRACE] /var/lib/awx/venv/awx/lib64/python3.6/site-packages/channels_redis/core.py:782: size=121 MiB, count=201990, average=626 B
2020-07-23 18:20:44,630 ERROR    awx.main.routing [TRACE] /usr/lib64/python3.6/asyncio/base_events.py:295: size=7628 KiB, count=71910, average=109 B
2020-07-23 18:20:44,630 ERROR    awx.main.routing [TRACE] /usr/lib64/python3.6/asyncio/base_events.py:615: size=342 KiB, count=663, average=528 B
2020-07-23 18:20:44,631 ERROR    awx.main.routing [TRACE] /var/lib/awx/venv/awx/lib64/python3.6/site-packages/redis/client.py:91: size=191 KiB, count=1791, average=109 B
2020-07-23 18:20:44,632 ERROR    awx.main.routing [TRACE] /usr/lib64/python3.6/tracemalloc.py:65: size=142 KiB, count=2020, average=72 B

I spent some time reading over https://github.com/django/channels_redis/blob/master/channels_redis/core.py and was immediately suspicious of self.receive_buffer. In my reproduction, here's what I'm seeing:

  1. I can open a new browser tab that establishes a new websocket connection.
  2. In the background, I'm generating very high volumes of data (in our case, the data represents the output of a very noisy Ansible playbook) that is going over the websocket and being printed to the user in their browser.
  3. Part of the way through this process, I'm closing the browser, and disconnecting.
  4. In the meantime, however, self.receive_buffer[channel] for the prior browser client is still growing, and these messages never end up being freed. I can open subsequent browser sessions, establish new websocket connections, and using an interactive debugger, I can see that the channel layer's receive_buffer is keeping around stale messages for old channels which will never be delivered (or freed).

I think the root of this issue is that the self.receive_buffer data structure doesn't have any form of internal cleanup or expiry (which is supported by this code comment):

# TODO: message expiry?

In practice, having only a send / receive interface might make this a hard/expensive problem to solve (tracking and managing in-memory expirations). One thing that stands out to me about the Redis channel layer is that it does do expiration in redis, but once messages are loaded into memory, there doesn't seem to be any sort of expiration logic for the receive_buffer data structure (and I'm able to see it grow in an unbounded way fairly easily in my reproduction).
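
To make that concrete, here's a rough sketch of the kind of in-memory housekeeping I have in mind. This is purely illustrative and not the channels_redis implementation; apart from the expiry setting, every name below is made up:

import time
from collections import defaultdict, deque


class ExpiringReceiveBuffer:
    """Hypothetical in-memory buffer that expires stale messages per channel."""

    def __init__(self, expiry=60):
        self.expiry = expiry
        # channel name -> deque of (timestamp, message) pairs
        self.buffers = defaultdict(deque)

    def put(self, channel, message):
        self.buffers[channel].append((time.monotonic(), message))

    def get(self, channel):
        # A real layer would await an asyncio queue here; this sketch just pops.
        self.prune()
        queue = self.buffers.get(channel)
        return queue.popleft()[1] if queue else None

    def prune(self):
        # Drop messages older than `expiry` and forget channels left empty,
        # so buffers for abandoned channels don't grow forever.
        cutoff = time.monotonic() - self.expiry
        for channel in list(self.buffers):
            queue = self.buffers[channel]
            while queue and queue[0][0] < cutoff:
                queue.popleft()
            if not queue:
                del self.buffers[channel]

Something along these lines, called periodically or on each receive, would keep the buffer bounded even when a client disappears mid-stream.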

@ryanpetrello
Contributor Author

I'm attempting to work around this issue in the AWX project itself at the moment by adding some code which tracks disconnections, and prevents the redis channel layer from continuing to handle messages for that specific channel:

ansible/awx#7719

It's worth mentioning, however, that this workaround probably isn't perfect, and I'm guessing it still sometimes leaks a little bit of data. In my opinion, what's really needed is some mechanism for channels_redis to proactively clean up this data structure after messages have expired.
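
For reference, the general shape of that workaround (heavily simplified here; the real AWX patch is more involved, and the consumer name is only illustrative) is to clear whatever the layer has buffered in memory for a channel as soon as its websocket closes:

from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.layers import get_channel_layer


class CleanedUpConsumer(AsyncJsonWebsocketConsumer):
    async def disconnect(self, code):
        layer = get_channel_layer()
        # receive_buffer is an internal detail of RedisChannelLayer; reaching
        # into it from application code is exactly the kind of housekeeping
        # the layer itself ought to own.
        if hasattr(layer, "receive_buffer"):
            layer.receive_buffer.pop(self.channel_name, None)
        await super().disconnect(code)

This only clears what's already buffered at disconnect time; if the channel somehow stays in a group, new messages can still pile up afterwards, which is why I think the fix really belongs inside the layer.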

Aside from a change in the Redis channel layer to add housekeeping to this data structure, can you think of anything else I could do here?

@carltongibson
Member

carltongibson commented Jul 24, 2020

Hi @ryanpetrello. First up, can you update to 3.0.1? There were changes in 4c88088 related to expiration. (I haven't looked yet to see whether that's related, but...)

@ryanpetrello
Contributor Author

ryanpetrello commented Jul 24, 2020

@carltongibson I can give that a go, but I think the issue here isn't that keys aren't getting expired in redis, it's that the self.receive_buffer data structure inside of Daphne's memory space is growing without bounds.

@ryanpetrello
Contributor Author

ryanpetrello commented Jul 24, 2020

I'll spend some time in the next day or so trying to make a simple reproducer for this outside of Ansible AWX with 3.0.1.

@carltongibson
Member

Hi @ryanpetrello, yes, I realised that on reading in more depth.

If you can pin down a minimal case that would be awesome. Thanks.

@ryanpetrello
Contributor Author

ryanpetrello commented Jul 24, 2020

@carltongibson,

I'm able to reproduce this in a simple Django/Channels project (outside of AWX). Here's a repo with instructions for reproducing it:

https://github.com/ryanpetrello/channels-redis-leak/

~ pip3 freeze | egrep "chan|daphne|Twisted"
channels==2.4.0
channels-redis==3.0.1
daphne==2.5.0
django-channels==0.7.0
leaky-channels==0.0.0
Twisted==20.3.0

ryanpetrello added a commit to ryanpetrello/channels_redis that referenced this issue Jul 27, 2020
@carltongibson
Member

@ryanpetrello Super. Thanks!

If you fancy, take a look at django/channels#165, which does a rewrite here. I haven't had the bandwidth to look at that yet, but your input would be good, and at worst it may inspire something.

@ryanpetrello
Contributor Author

ryanpetrello commented Jul 27, 2020

To be honest, @carltongibson, now that I'm reading it, this issue sounds like a duplicate of #384

I'm unsure if it's worth tracking there or here. I'll give #165 a whirl and see if it fixes the memory leak for me, though.

@carltongibson
Member

No, let's keep this open. Best to triangulate in these slow burners.

@ryanpetrello
Contributor Author

ryanpetrello commented Jul 27, 2020

@carltongibson I tried the patch at #165 and it also leaks for me.

#165 (comment)

^ @davidfstr noticed the same thing - the suggested rewrite also leaks.

I do agree that the patch you linked to is simpler, but it still leaks - I think the only way around this issue is to implement some form of TTL and expiration for in-memory buffers. I'm currently tinkering with this idea.

@carltongibson
Member

Ok, super. Thanks for giving it a run.

If you have input here, that would be super. My attention is entirely on updating Channels for ASGI 3's single callables and documenting use with Django 3.1, so I do need a bit of input if this is to be resolved any time soon.

@ryanpetrello
Contributor Author

@carltongibson here's my attempt at this:

#213

ryanpetrello added a commit to ryanpetrello/channels_redis that referenced this issue Jul 27, 2020
@ryanpetrello
Contributor Author

@davidfstr I believe I'm encountering a duplicate of the issue you reported at #384

Did you ever come up with a solution other than periodically restarting Daphne?

ryanpetrello added a commit to ryanpetrello/channels_redis that referenced this issue Aug 20, 2020
respect the capacity setting so that the receive_buffer does not grow
without bounds

see: django#212
@ryanpetrello
Contributor Author

Hey @carltongibson,

I've been running a patch like this one (ansible/awx#7950) in production for almost a week now, and am no longer seeing the memory leak outlined in my reproducer:

#219
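
Roughly, the idea behind that patch (sketched here for illustration, not copied verbatim from #219) is to make each per-channel buffer respect the layer's capacity setting by discarding the oldest buffered message once it's full:

import asyncio


class BoundedQueue(asyncio.Queue):
    def put_nowait(self, item):
        if self.full():
            # If we're at capacity, the consumer for this channel has most
            # likely stopped reading; dropping the oldest message is better
            # than letting the buffer grow without bounds.
            self.get_nowait()
        return super().put_nowait(item)

If the layer builds its receive_buffer from queues like this (for example, defaultdict(lambda: BoundedQueue(capacity))), each channel can hold at most capacity buffered messages.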

@peteryin21

I've been following this thread, as I'm also seeing memory leak issues. We are using channels + channels_redis + uvicorn.

One thing I wanted to add to the discussion is why we are seeing this receive_buffer growth in the first place. I'm actually a bit confused how @ryanpetrello can recreate this simply by opening a ws connection and then disconnecting. From my read of core.py, which channels' receive-buffer queues a message gets added to is determined by which channels are currently in the group, and normally on websocket disconnect we do remove the channel from the group. I would expect the memory leak to occur when the websocket connection has dropped but channels doesn't detect it for some reason, so the channel stays in the group and its receive buffer keeps being populated.
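
As a toy model of that failure mode (hypothetical names, not the channels_redis code): group_send fans each message out to every channel still registered in the group, so a channel that missed its group_discard just keeps accumulating messages that nothing will ever read.

# "dead-channel" missed its group_discard, so every group_send keeps
# appending to a buffer that no consumer will ever drain.
groups = {"events": {"alive-channel", "dead-channel"}}
receive_buffer = {}


def group_send(group, message):
    for channel in groups[group]:
        receive_buffer.setdefault(channel, []).append(message)


for i in range(1000):
    group_send("events", {"type": "notify", "seq": i})

print({channel: len(messages) for channel, messages in receive_buffer.items()})
# -> {'alive-channel': 1000, 'dead-channel': 1000}, but only one of them is read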

In our application, this is what we observe, with the following error triggering a climb in memory usage at https://github.com/encode/uvicorn/blob/master/uvicorn/protocols/websockets/websockets_impl.py#L157

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/websockets/protocol.py", line 827, in transfer_data
    message = await self.read_message()
  File "/usr/local/lib/python3.7/site-packages/websockets/protocol.py", line 895, in read_message
    frame = await self.read_data_frame(max_size=self.max_size)
  File "/usr/local/lib/python3.7/site-packages/websockets/protocol.py", line 971, in read_data_frame
    frame = await self.read_frame(max_size)
  File "/usr/local/lib/python3.7/site-packages/websockets/protocol.py", line 1051, in read_frame
    extensions=self.extensions,
  File "/usr/local/lib/python3.7/site-packages/websockets/framing.py", line 105, in read
    data = await reader(2)
  File "/usr/local/lib/python3.7/asyncio/streams.py", line 679, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.7/asyncio/streams.py", line 473, in _wait_for_data
    await self._waiter
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 153, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
  File "/usr/local/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/uvicorn/middleware/asgi2.py", line 7, in __call__
    await instance(receive, send)
  File "/usr/local/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/usr/local/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/usr/local/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
    [receive, self.channel_receive], self.dispatch
  File "/usr/local/lib/python3.7/site-packages/channels/utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "./acs_broker/websocket/consumers.py", line 40, in dispatch
    await self.send_json({"mutation": mutation_key, "data": event["data"]})
  File "/usr/local/lib/python3.7/site-packages/channels/generic/websocket.py", line 273, in send_json
    await super().send(text_data=await self.encode_json(content), close=close)
  File "/usr/local/lib/python3.7/site-packages/channels/generic/websocket.py", line 211, in send
    await super().send({"type": "websocket.send", "text": text_data})
  File "/usr/local/lib/python3.7/site-packages/channels/consumer.py", line 81, in send
    await self.base_send(message)
  File "/usr/local/lib/python3.7/site-packages/channels/sessions.py", line 236, in send
    return await self.real_send(message)
  File "/usr/local/lib/python3.7/site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 211, in asgi_send
    await self.send(data)
  File "/usr/local/lib/python3.7/site-packages/websockets/protocol.py", line 555, in send
    await self.ensure_open()
  File "/usr/local/lib/python3.7/site-packages/websockets/protocol.py", line 812, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: code = 1006 (connection closed abnormally [internal]), no reason

I also verified the hypothesis that memory grows when a channel is never removed from a group on disconnect by overriding the websocket_disconnect method on my consumer. I opened 2 new tabs with WS connections, and closed one, and the memory started growing.

    async def websocket_disconnect(self, message):
        """
        Called when a WebSocket connection is closed. Base level so you don't
        need to call super() all the time.
        """
        logger.info(f"WEBSOCKET DISCONNECT CALLED for {self.channel_name}")
        # Don't remove the channel from the group
        # try:
        #     for group in self.groups:
        #         await self.channel_layer.group_discard(group, self.channel_name)
        # except AttributeError:
        #     raise Exception("BACKEND is unconfigured or doesn't support groups")
        await self.disconnect(message["code"])
        raise StopConsumer()

So overall, my understanding of what's happening is:

  1. A cancellation error in our ASGI app during the ws handshake (for an unknown reason) closes the websocket connection and throws an ASGI error in uvicorn.
  2. This disconnection doesn't propagate to our Django consumer, so we never remove the channel from our group.
  3. WS messages therefore keep getting sent to the disconnected channel, populating its receive buffer, which grows indefinitely.

I think this workaround can serve as a patch to avoid indefinite memory leakage, but I'm curious if there are any ideas about the root cause, and if there would be any way to ensure that a ws connection drop can always propagate to channels so we can remove that channel from the group.

@ryanpetrello
Contributor Author

ryanpetrello commented Aug 26, 2020

Hey @peteryin21 - thanks for the detailed reply.

For what it's worth, I'm not using uvicorn, and I don't see any evidence of an uncaught concurrent.futures._base.CancelledError being raised in my logs.

I agree with your overall understanding; there's a comment thread on a related issue in channels that has similar/related thoughts:

#384
#384

I don't think this patch is a perfect solution, but it has allowed me to get memory growth under control without constantly restarting Daphne. I'm wondering if it might also help you:

#219

ryanpetrello added a commit to ryanpetrello/channels_redis that referenced this issue Sep 2, 2020
respect the capacity setting so that the receive_buffer does not grow
without bounds

see: django#212

carltongibson pushed a commit that referenced this issue Sep 3, 2020
respect the capacity setting so that the receive_buffer does not grow
without bounds

see: #212
@ryanpetrello
Contributor Author

[links to a tweet noting that the fix shipped in channels_redis 3.1]

@carltongibson
Member

Thanks for the report and the fix! 🥇

@yuri1992

Which version of channels_redis is fixing the memory leak?

@davidfstr

Which version of channels_redis is fixing the memory leak?

3.1, according to the tweet linked above
