Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api_core): finalize during close of 'ResumableBidiRpc'. #9337

Merged
merged 3 commits into from
Oct 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion api_core/google/api_core/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,10 @@ def _recv(self):
def recv(self):
return self._recoverable(self._recv)

def close(self):
self._finalize(None)
super(ResumableBidiRpc, self).close()

@property
def is_active(self):
"""bool: True if this stream is currently open and active."""
Expand Down Expand Up @@ -698,7 +702,11 @@ def stop(self):
if self._thread is not None:
# Resume the thread to wake it up in case it is sleeping.
self.resume()
self._thread.join()
# The daemonized thread may itself block, so don't wait
# for it longer than a second.
self._thread.join(1.0)
tseaver marked this conversation as resolved.
Show resolved Hide resolved
if self._thread.is_alive(): # pragma: NO COVER
_LOGGER.warning("Background thread did not exit.")
tseaver marked this conversation as resolved.
Show resolved Hide resolved

self._thread = None

Expand Down
25 changes: 25 additions & 0 deletions api_core/tests/unit/test_bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,31 @@ def test_recv_failure(self):
assert bidi_rpc.is_active is False
assert call.cancelled is True

def test_close(self):
call = mock.create_autospec(_CallAndFuture, instance=True)

def cancel_side_effect():
call.is_active.return_value = False

call.cancel.side_effect = cancel_side_effect
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, return_value=call
)
should_recover = mock.Mock(spec=["__call__"], return_value=False)
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
bidi_rpc.open()

bidi_rpc.close()

should_recover.assert_not_called()
call.cancel.assert_called_once()
assert bidi_rpc.call == call
assert bidi_rpc.is_active is False
# ensure the request queue was signaled to stop.
assert bidi_rpc.pending_requests == 1
assert bidi_rpc._request_queue.get() is None
assert bidi_rpc._finalized

def test_reopen_failure_on_rpc_restart(self):
error1 = ValueError("1")
error2 = ValueError("2")
Expand Down