Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: memory leak in bidi classes #770

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions google/api_core/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ def close(self):
self._request_queue.put(None)
self.call.cancel()
self._request_generator = None
self._initial_request = None
self._callbacks = []
# Don't set self.call to None. Keep it around so that send/recv can
# raise the error.

Expand Down Expand Up @@ -717,6 +719,7 @@ def stop(self):
_LOGGER.warning("Background thread did not exit.")

self._thread = None
self._on_response = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes you can't restart an instance after it has been stopped. The test cases and docstrings point to that being the case, but it's not completely clear


@property
def is_active(self):
Expand Down
7 changes: 7 additions & 0 deletions tests/unit/test_bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ def test_close(self):
# ensure the request queue was signaled to stop.
assert bidi_rpc.pending_requests == 1
assert bidi_rpc._request_queue.get() is None
# ensure request and callbacks are cleaned up
assert bidi_rpc._initial_request is None
assert not bidi_rpc._callbacks

def test_close_no_rpc(self):
bidi_rpc = bidi.BidiRpc(None)
Expand Down Expand Up @@ -623,6 +626,8 @@ def cancel_side_effect():
assert bidi_rpc.pending_requests == 1
assert bidi_rpc._request_queue.get() is None
assert bidi_rpc._finalized
assert bidi_rpc._initial_request is None
assert not bidi_rpc._callbacks

def test_reopen_failure_on_rpc_restart(self):
error1 = ValueError("1")
Expand Down Expand Up @@ -777,6 +782,7 @@ def on_response(response):
consumer.stop()

assert consumer.is_active is False
assert consumer._on_response is None

def test_wake_on_error(self):
should_continue = threading.Event()
Expand Down Expand Up @@ -884,6 +890,7 @@ def close_side_effect():

consumer.stop()
assert consumer.is_active is False
assert consumer._on_response is None

# calling stop twice should not result in an error.
consumer.stop()
Loading