Skip to content

Commit 9530548

Browse files
authored
feat: support for async bidi streaming apis (#836)
1 parent 8168988 commit 9530548

File tree

4 files changed

+676
-63
lines changed

4 files changed

+676
-63
lines changed

google/api_core/bidi.py

Lines changed: 39 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Bi-directional streaming RPC helpers."""
15+
"""Helpers for synchronous bidirectional streaming RPCs."""
1616

1717
import collections
1818
import datetime
@@ -22,6 +22,7 @@
2222
import time
2323

2424
from google.api_core import exceptions
25+
from google.api_core.bidi_base import BidiRpcBase
2526

2627
_LOGGER = logging.getLogger(__name__)
2728
_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream"
@@ -36,21 +37,6 @@ class _RequestQueueGenerator(object):
3637
otherwise open-ended set of requests to send through a request-streaming
3738
(or bidirectional) RPC.
3839
39-
The reason this is necessary is because gRPC takes an iterator as the
40-
request for request-streaming RPCs. gRPC consumes this iterator in another
41-
thread to allow it to block while generating requests for the stream.
42-
However, if the generator blocks indefinitely gRPC will not be able to
43-
clean up the thread as it'll be blocked on `next(iterator)` and not be able
44-
to check the channel status to stop iterating. This helper mitigates that
45-
by waiting on the queue with a timeout and checking the RPC state before
46-
yielding.
47-
48-
Finally, it allows for retrying without swapping queues because if it does
49-
pull an item off the queue when the RPC is inactive, it'll immediately put
50-
it back and then exit. This is necessary because yielding the item in this
51-
case will cause gRPC to discard it. In practice, this means that the order
52-
of messages is not guaranteed. If such a thing is necessary it would be
53-
easy to use a priority queue.
5440
5541
Example::
5642
@@ -62,12 +48,6 @@ class _RequestQueueGenerator(object):
6248
print(response)
6349
q.put(...)
6450
65-
Note that it is possible to accomplish this behavior without "spinning"
66-
(using a queue timeout). One possible way would be to use more threads to
67-
multiplex the grpc end event with the queue, another possible way is to
68-
use selectors and a custom event/queue object. Both of these approaches
69-
are significant from an engineering perspective for small benefit - the
70-
CPU consumed by spinning is pretty minuscule.
7151
7252
Args:
7353
queue (queue_module.Queue): The request queue.
@@ -96,6 +76,31 @@ def _is_active(self):
9676
return self.call is None or self.call.is_active()
9777

9878
def __iter__(self):
79+
# This is necessary because gRPC takes an iterator as the
80+
# request for request-streaming RPCs. gRPC consumes this iterator in
81+
# another thread to allow it to block while generating requests for
82+
# the stream. However, if the generator blocks indefinitely gRPC will
83+
# not be able to clean up the thread as it'll be blocked on
84+
# `next(iterator)` and not be able to check the channel status to stop
85+
# iterating. This helper mitigates that by waiting on the queue with
86+
# a timeout and checking the RPC state before yielding.
87+
#
88+
# Finally, it allows for retrying without swapping queues because if
89+
# it does pull an item off the queue when the RPC is inactive, it'll
90+
# immediately put it back and then exit. This is necessary because
91+
# yielding the item in this case will cause gRPC to discard it. In
92+
# practice, this means that the order of messages is not guaranteed.
93+
# If strict ordering is required, it would be easy to use a priority
94+
# queue.
95+
#
96+
# Note that it is possible to accomplish this behavior without
97+
# "spinning" (using a queue timeout). One possible way would be to use
98+
# more threads to multiplex the grpc end event with the queue, another
99+
# possible way is to use selectors and a custom event/queue object.
100+
# Both of these approaches require significant engineering
101+
# effort for small benefit - the CPU consumed by spinning is
102+
# pretty minuscule.
103+
99104
if self._initial_request is not None:
100105
if callable(self._initial_request):
101106
yield self._initial_request()
@@ -201,7 +206,7 @@ def __repr__(self):
201206
)
202207

203208

204-
class BidiRpc(object):
209+
class BidiRpc(BidiRpcBase):
205210
"""A helper for consuming a bi-directional streaming RPC.
206211
207212
This maps gRPC's built-in interface which uses a request iterator and a
@@ -227,6 +232,8 @@ class BidiRpc(object):
227232
rpc.send(example_pb2.StreamingRpcRequest(
228233
data='example'))
229234
235+
rpc.close()
236+
230237
This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`.
231238
232239
Args:
@@ -240,40 +247,14 @@ class BidiRpc(object):
240247
the request.
241248
"""
242249

243-
def __init__(self, start_rpc, initial_request=None, metadata=None):
244-
self._start_rpc = start_rpc
245-
self._initial_request = initial_request
246-
self._rpc_metadata = metadata
247-
self._request_queue = queue_module.Queue()
248-
self._request_generator = None
249-
self._is_active = False
250-
self._callbacks = []
251-
self.call = None
252-
253-
def add_done_callback(self, callback):
254-
"""Adds a callback that will be called when the RPC terminates.
255-
256-
This occurs when the RPC errors or is successfully terminated.
257-
258-
Args:
259-
callback (Callable[[grpc.Future], None]): The callback to execute.
260-
It will be provided with the same gRPC future as the underlying
261-
stream which will also be a :class:`grpc.Call`.
262-
"""
263-
self._callbacks.append(callback)
264-
265-
def _on_call_done(self, future):
266-
# This occurs when the RPC errors or is successfully terminated.
267-
# Note that grpc's "future" here can also be a grpc.RpcError.
268-
# See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
269-
# that `grpc.RpcError` is also `grpc.call`.
270-
for callback in self._callbacks:
271-
callback(future)
250+
def _create_queue(self):
251+
"""Create a queue for requests."""
252+
return queue_module.Queue()
272253

273254
def open(self):
274255
"""Opens the stream."""
275256
if self.is_active:
276-
raise ValueError("Can not open an already open stream.")
257+
raise ValueError("Cannot open an already open stream.")
277258

278259
request_generator = _RequestQueueGenerator(
279260
self._request_queue, initial_request=self._initial_request
@@ -322,7 +303,7 @@ def send(self, request):
322303
request (protobuf.Message): The request to send.
323304
"""
324305
if self.call is None:
325-
raise ValueError("Can not send() on an RPC that has never been open()ed.")
306+
raise ValueError("Cannot send on an RPC stream that has never been opened.")
326307

327308
# Don't use self.is_active, as ResumableBidiRpc will override it
328309
# to mean something semantically different.
@@ -343,20 +324,15 @@ def recv(self):
343324
protobuf.Message: The received message.
344325
"""
345326
if self.call is None:
346-
raise ValueError("Can not recv() on an RPC that has never been open()ed.")
327+
raise ValueError("Cannot recv on an RPC stream that has never been opened.")
347328

348329
return next(self.call)
349330

350331
@property
351332
def is_active(self):
352-
"""bool: True if this stream is currently open and active."""
333+
"""True if this stream is currently open and active."""
353334
return self.call is not None and self.call.is_active()
354335

355-
@property
356-
def pending_requests(self):
357-
"""int: Returns an estimate of the number of queued requests."""
358-
return self._request_queue.qsize()
359-
360336

361337
def _never_terminate(future_or_error):
362338
"""By default, no errors cause BiDi termination."""
@@ -544,7 +520,7 @@ def _send(self, request):
544520
call = self.call
545521

546522
if call is None:
547-
raise ValueError("Can not send() on an RPC that has never been open()ed.")
523+
raise ValueError("Cannot send on an RPC that has never been opened.")
548524

549525
# Don't use self.is_active, as ResumableBidiRpc will override it
550526
# to mean something semantically different.
@@ -563,7 +539,7 @@ def _recv(self):
563539
call = self.call
564540

565541
if call is None:
566-
raise ValueError("Can not recv() on an RPC that has never been open()ed.")
542+
raise ValueError("Cannot recv on an RPC that has never been opened.")
567543

568544
return next(call)
569545

0 commit comments

Comments
 (0)