From 958e9ff01cd3be93081876a29557c6a043d6b867 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Fri, 27 Sep 2019 14:43:55 -0400 Subject: [PATCH 1/3] fix(api_core): finalize during close of 'ResumableBidiRpc Avoid blocking for ill-behaved daemon threads during BiDi shutdown. Closes #8616, #9008. --- api_core/google/api_core/bidi.py | 8 +++++++- api_core/tests/unit/test_bidi.py | 25 +++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/api_core/google/api_core/bidi.py b/api_core/google/api_core/bidi.py index f73c7c9dfabc..456542c98679 100644 --- a/api_core/google/api_core/bidi.py +++ b/api_core/google/api_core/bidi.py @@ -561,6 +561,10 @@ def _recv(self): def recv(self): return self._recoverable(self._recv) + def close(self): + self._finalize(None) + super(ResumableBidiRpc, self).close() + @property def is_active(self): """bool: True if this stream is currently open and active.""" @@ -698,7 +702,9 @@ def stop(self): if self._thread is not None: # Resume the thread to wake it up in case it is sleeping. self.resume() - self._thread.join() + self._thread.join(1.0) + if self._thread.is_alive(): + _LOGGER.warning("Background thread did not exit.") self._thread = None diff --git a/api_core/tests/unit/test_bidi.py b/api_core/tests/unit/test_bidi.py index 4d185d3158e4..52215cbde22f 100644 --- a/api_core/tests/unit/test_bidi.py +++ b/api_core/tests/unit/test_bidi.py @@ -597,6 +597,31 @@ def test_recv_failure(self): assert bidi_rpc.is_active is False assert call.cancelled is True + def test_close(self): + call = mock.create_autospec(_CallAndFuture, instance=True) + + def cancel_side_effect(): + call.is_active.return_value = False + + call.cancel.side_effect = cancel_side_effect + start_rpc = mock.create_autospec( + grpc.StreamStreamMultiCallable, instance=True, return_value=call + ) + should_recover = mock.Mock(spec=["__call__"], return_value=False) + bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover) + bidi_rpc.open() + + bidi_rpc.close() + + should_recover.assert_not_called() + call.cancel.assert_called_once() + assert bidi_rpc.call == call + assert bidi_rpc.is_active is False + # ensure the request queue was signaled to stop. + assert bidi_rpc.pending_requests == 1 + assert bidi_rpc._request_queue.get() is None + assert bidi_rpc._finalized + def test_reopen_failure_on_rpc_restart(self): error1 = ValueError("1") error2 = ValueError("2") From 1ec9a37f0d94b3a90a08714a92373e35959c78d0 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Fri, 27 Sep 2019 17:35:43 -0400 Subject: [PATCH 2/3] Add comment explaining the timeout to 'thread.join'. --- api_core/google/api_core/bidi.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api_core/google/api_core/bidi.py b/api_core/google/api_core/bidi.py index 456542c98679..34ecc3b2d16b 100644 --- a/api_core/google/api_core/bidi.py +++ b/api_core/google/api_core/bidi.py @@ -702,6 +702,8 @@ def stop(self): if self._thread is not None: # Resume the thread to wake it up in case it is sleeping. self.resume() + # The daemonized thread may itself block, so don't wait + # for it longer than a second. self._thread.join(1.0) if self._thread.is_alive(): _LOGGER.warning("Background thread did not exit.") From 3a05c19c368b8ecfc795913b339e5bd40d327d3a Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 1 Oct 2019 10:48:36 -0400 Subject: [PATCH 3/3] Skip coverage for logging the nosferatu thread. --- api_core/google/api_core/bidi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api_core/google/api_core/bidi.py b/api_core/google/api_core/bidi.py index 34ecc3b2d16b..b171a4112a31 100644 --- a/api_core/google/api_core/bidi.py +++ b/api_core/google/api_core/bidi.py @@ -705,7 +705,7 @@ def stop(self): # The daemonized thread may itself block, so don't wait # for it longer than a second. self._thread.join(1.0) - if self._thread.is_alive(): + if self._thread.is_alive(): # pragma: NO COVER _LOGGER.warning("Background thread did not exit.") self._thread = None