From 98f359f24c024aa928f90e3b3efab35d122c68a5 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 30 May 2019 13:15:42 +0200 Subject: [PATCH] Add optional reopen throttling to ResumableBidiRpc --- api_core/google/api_core/bidi.py | 24 +++++++++++++++++++++--- api_core/tests/unit/test_bidi.py | 16 ++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/api_core/google/api_core/bidi.py b/api_core/google/api_core/bidi.py index 18b7cea2ecefe..fed8d6f03d2ed 100644 --- a/api_core/google/api_core/bidi.py +++ b/api_core/google/api_core/bidi.py @@ -389,15 +389,29 @@ def should_recover(exc): whenever an error is encountered on the stream. metadata Sequence[Tuple(str, str)]: RPC metadata to include in the request. + throttle_reopen (bool): If ``True``, throttling will be applied to + stream reopen calls. Defaults to ``False``. """ - def __init__(self, start_rpc, should_recover, initial_request=None, metadata=None): + def __init__( + self, + start_rpc, + should_recover, + initial_request=None, + metadata=None, + throttle_reopen=False, + ): super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata) self._should_recover = should_recover self._operational_lock = threading.RLock() self._finalized = False self._finalize_lock = threading.Lock() + if throttle_reopen: + self._reopen_throttle = _Throttle(entry_cap=5, time_window=10) + else: + self._reopen_throttle = None + def _finalize(self, result): with self._finalize_lock: if self._finalized: @@ -440,7 +454,11 @@ def _reopen(self): # retryable error. try: - self.open() + if self._reopen_throttle: + with self._reopen_throttle: + self.open() + else: + self.open() # If re-opening or re-calling the method fails for any reason, # consider it a terminal error and finalize the stream. except Exception as exc: @@ -639,7 +657,7 @@ def start(self): thread = threading.Thread( name=_BIDIRECTIONAL_CONSUMER_NAME, target=self._thread_main, - args=(ready,) + args=(ready,), ) thread.daemon = True thread.start() diff --git a/api_core/tests/unit/test_bidi.py b/api_core/tests/unit/test_bidi.py index 889caa8bfb90a..934522b8a2338 100644 --- a/api_core/tests/unit/test_bidi.py +++ b/api_core/tests/unit/test_bidi.py @@ -514,6 +514,22 @@ def test_reopen_failure_on_rpc_restart(self): assert bidi_rpc.is_active is False callback.assert_called_once_with(error2) + def test_using_throttle_on_reopen_requests(self): + call = CallStub([]) + start_rpc = mock.create_autospec( + grpc.StreamStreamMultiCallable, instance=True, return_value=call + ) + should_recover = mock.Mock(spec=["__call__"], return_value=True) + bidi_rpc = bidi.ResumableBidiRpc( + start_rpc, should_recover, throttle_reopen=True + ) + + patcher = mock.patch.object(bidi_rpc._reopen_throttle.__class__, "__enter__") + with patcher as mock_enter: + bidi_rpc._reopen() + + mock_enter.assert_called_once() + def test_send_not_open(self): bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)