Skip to content

Commit

Permalink
Add optional reopen throttling to ResumableBidiRpc
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed Jun 11, 2019
1 parent 7bc6bf7 commit 98f359f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
24 changes: 21 additions & 3 deletions api_core/google/api_core/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,29 @@ def should_recover(exc):
whenever an error is encountered on the stream.
metadata Sequence[Tuple(str, str)]: RPC metadata to include in
the request.
throttle_reopen (bool): If ``True``, throttling will be applied to
stream reopen calls. Defaults to ``False``.
"""

def __init__(self, start_rpc, should_recover, initial_request=None, metadata=None):
def __init__(
self,
start_rpc,
should_recover,
initial_request=None,
metadata=None,
throttle_reopen=False,
):
super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata)
self._should_recover = should_recover
self._operational_lock = threading.RLock()
self._finalized = False
self._finalize_lock = threading.Lock()

if throttle_reopen:
self._reopen_throttle = _Throttle(entry_cap=5, time_window=10)
else:
self._reopen_throttle = None

def _finalize(self, result):
with self._finalize_lock:
if self._finalized:
Expand Down Expand Up @@ -440,7 +454,11 @@ def _reopen(self):
# retryable error.

try:
self.open()
if self._reopen_throttle:
with self._reopen_throttle:
self.open()
else:
self.open()
# If re-opening or re-calling the method fails for any reason,
# consider it a terminal error and finalize the stream.
except Exception as exc:
Expand Down Expand Up @@ -639,7 +657,7 @@ def start(self):
thread = threading.Thread(
name=_BIDIRECTIONAL_CONSUMER_NAME,
target=self._thread_main,
args=(ready,)
args=(ready,),
)
thread.daemon = True
thread.start()
Expand Down
16 changes: 16 additions & 0 deletions api_core/tests/unit/test_bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,22 @@ def test_reopen_failure_on_rpc_restart(self):
assert bidi_rpc.is_active is False
callback.assert_called_once_with(error2)

def test_using_throttle_on_reopen_requests(self):
call = CallStub([])
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, return_value=call
)
should_recover = mock.Mock(spec=["__call__"], return_value=True)
bidi_rpc = bidi.ResumableBidiRpc(
start_rpc, should_recover, throttle_reopen=True
)

patcher = mock.patch.object(bidi_rpc._reopen_throttle.__class__, "__enter__")
with patcher as mock_enter:
bidi_rpc._reopen()

mock_enter.assert_called_once()

def test_send_not_open(self):
bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)

Expand Down

0 comments on commit 98f359f

Please sign in to comment.