diff --git a/google/api_core/bidi.py b/google/api_core/bidi.py index bed4c70e..270ad091 100644 --- a/google/api_core/bidi.py +++ b/google/api_core/bidi.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Bi-directional streaming RPC helpers.""" +"""Helpers for synchronous bidirectional streaming RPCs.""" import collections import datetime @@ -22,6 +22,7 @@ import time from google.api_core import exceptions +from google.api_core.bidi_base import BidiRpcBase _LOGGER = logging.getLogger(__name__) _BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream" @@ -36,21 +37,6 @@ class _RequestQueueGenerator(object): otherwise open-ended set of requests to send through a request-streaming (or bidirectional) RPC. - The reason this is necessary is because gRPC takes an iterator as the - request for request-streaming RPCs. gRPC consumes this iterator in another - thread to allow it to block while generating requests for the stream. - However, if the generator blocks indefinitely gRPC will not be able to - clean up the thread as it'll be blocked on `next(iterator)` and not be able - to check the channel status to stop iterating. This helper mitigates that - by waiting on the queue with a timeout and checking the RPC state before - yielding. - - Finally, it allows for retrying without swapping queues because if it does - pull an item off the queue when the RPC is inactive, it'll immediately put - it back and then exit. This is necessary because yielding the item in this - case will cause gRPC to discard it. In practice, this means that the order - of messages is not guaranteed. If such a thing is necessary it would be - easy to use a priority queue. Example:: @@ -62,12 +48,6 @@ class _RequestQueueGenerator(object): print(response) q.put(...) - Note that it is possible to accomplish this behavior without "spinning" - (using a queue timeout). One possible way would be to use more threads to - multiplex the grpc end event with the queue, another possible way is to - use selectors and a custom event/queue object. Both of these approaches - are significant from an engineering perspective for small benefit - the - CPU consumed by spinning is pretty minuscule. Args: queue (queue_module.Queue): The request queue. @@ -96,6 +76,31 @@ def _is_active(self): return self.call is None or self.call.is_active() def __iter__(self): + # The reason this is necessary is because gRPC takes an iterator as the + # request for request-streaming RPCs. gRPC consumes this iterator in + # another thread to allow it to block while generating requests for + # the stream. However, if the generator blocks indefinitely gRPC will + # not be able to clean up the thread as it'll be blocked on + # `next(iterator)` and not be able to check the channel status to stop + # iterating. This helper mitigates that by waiting on the queue with + # a timeout and checking the RPC state before yielding. + # + # Finally, it allows for retrying without swapping queues because if + # it does pull an item off the queue when the RPC is inactive, it'll + # immediately put it back and then exit. This is necessary because + # yielding the item in this case will cause gRPC to discard it. In + # practice, this means that the order of messages is not guaranteed. + # If such a thing is necessary it would be easy to use a priority + # queue. + # + # Note that it is possible to accomplish this behavior without + # "spinning" (using a queue timeout). One possible way would be to use + # more threads to multiplex the grpc end event with the queue, another + # possible way is to use selectors and a custom event/queue object. + # Both of these approaches are significant from an engineering + # perspective for small benefit - the CPU consumed by spinning is + # pretty minuscule. + if self._initial_request is not None: if callable(self._initial_request): yield self._initial_request() @@ -201,7 +206,7 @@ def __repr__(self): ) -class BidiRpc(object): +class BidiRpc(BidiRpcBase): """A helper for consuming a bi-directional streaming RPC. This maps gRPC's built-in interface which uses a request iterator and a @@ -227,6 +232,8 @@ class BidiRpc(object): rpc.send(example_pb2.StreamingRpcRequest( data='example')) + rpc.close() + This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`. Args: @@ -240,40 +247,14 @@ class BidiRpc(object): the request. """ - def __init__(self, start_rpc, initial_request=None, metadata=None): - self._start_rpc = start_rpc - self._initial_request = initial_request - self._rpc_metadata = metadata - self._request_queue = queue_module.Queue() - self._request_generator = None - self._is_active = False - self._callbacks = [] - self.call = None - - def add_done_callback(self, callback): - """Adds a callback that will be called when the RPC terminates. - - This occurs when the RPC errors or is successfully terminated. - - Args: - callback (Callable[[grpc.Future], None]): The callback to execute. - It will be provided with the same gRPC future as the underlying - stream which will also be a :class:`grpc.Call`. - """ - self._callbacks.append(callback) - - def _on_call_done(self, future): - # This occurs when the RPC errors or is successfully terminated. - # Note that grpc's "future" here can also be a grpc.RpcError. - # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 - # that `grpc.RpcError` is also `grpc.call`. - for callback in self._callbacks: - callback(future) + def _create_queue(self): + """Create a queue for requests.""" + return queue_module.Queue() def open(self): """Opens the stream.""" if self.is_active: - raise ValueError("Can not open an already open stream.") + raise ValueError("Cannot open an already open stream.") request_generator = _RequestQueueGenerator( self._request_queue, initial_request=self._initial_request @@ -322,7 +303,7 @@ def send(self, request): request (protobuf.Message): The request to send. """ if self.call is None: - raise ValueError("Can not send() on an RPC that has never been open()ed.") + raise ValueError("Cannot send on an RPC stream that has never been opened.") # Don't use self.is_active(), as ResumableBidiRpc will overload it # to mean something semantically different. @@ -343,20 +324,15 @@ def recv(self): protobuf.Message: The received message. """ if self.call is None: - raise ValueError("Can not recv() on an RPC that has never been open()ed.") + raise ValueError("Cannot recv on an RPC stream that has never been opened.") return next(self.call) @property def is_active(self): - """bool: True if this stream is currently open and active.""" + """True if this stream is currently open and active.""" return self.call is not None and self.call.is_active() - @property - def pending_requests(self): - """int: Returns an estimate of the number of queued requests.""" - return self._request_queue.qsize() - def _never_terminate(future_or_error): """By default, no errors cause BiDi termination.""" @@ -544,7 +520,7 @@ def _send(self, request): call = self.call if call is None: - raise ValueError("Can not send() on an RPC that has never been open()ed.") + raise ValueError("Cannot send on an RPC that has never been opened.") # Don't use self.is_active(), as ResumableBidiRpc will overload it # to mean something semantically different. @@ -563,7 +539,7 @@ def _recv(self): call = self.call if call is None: - raise ValueError("Can not recv() on an RPC that has never been open()ed.") + raise ValueError("Cannot recv on an RPC that has never been opened.") return next(call) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py new file mode 100644 index 00000000..d73b4c98 --- /dev/null +++ b/google/api_core/bidi_async.py @@ -0,0 +1,244 @@ +# Copyright 2025, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Asynchronous bi-directional streaming RPC helpers.""" + +import asyncio +import logging +from typing import Callable, Optional, Union + +from grpc import aio + +from google.api_core import exceptions +from google.api_core.bidi_base import BidiRpcBase + +from google.protobuf.message import Message as ProtobufMessage + + +_LOGGER = logging.getLogger(__name__) + + +class _AsyncRequestQueueGenerator: + """_AsyncRequestQueueGenerator is a helper class for sending asynchronous + requests to a gRPC stream from a Queue. + + This generator takes asynchronous requests off a given `asyncio.Queue` and + yields them to gRPC. + + It's useful when you have an indeterminate, indefinite, or otherwise + open-ended set of requests to send through a request-streaming (or + bidirectional) RPC. + + Example:: + + requests = _AsyncRequestQueueGenerator(q) + call = await stub.StreamingRequest(requests) + requests.call = call + + async for response in call: + print(response) + await q.put(...) + + Args: + queue (asyncio.Queue): The request queue. + initial_request (Union[ProtobufMessage, + Callable[[], ProtobufMessage]]): The initial request to + yield. This is done independently of the request queue to allow for + easily restarting streams that require some initial configuration + request. + """ + + def __init__( + self, + queue: asyncio.Queue, + initial_request: Optional[ + Union[ProtobufMessage, Callable[[], ProtobufMessage]] + ] = None, + ) -> None: + self._queue = queue + self._initial_request = initial_request + self.call: Optional[aio.Call] = None + + def _is_active(self) -> bool: + """Returns true if the call is not set or not completed.""" + # Note: there is a possibility that this starts *before* the call + # property is set. So we have to check if self.call is set before + # seeing if it's active. We need to return True if self.call is None. + # See https://github.com/googleapis/python-api-core/issues/560. + return self.call is None or not self.call.done() + + async def __aiter__(self): + # The reason this is necessary is because it lets the user have + # control on when they would want to send requests proto messages + # instead of sending all of them initially. + # + # This is achieved via asynchronous queue (asyncio.Queue), + # gRPC awaits until there's a message in the queue. + # + # Finally, it allows for retrying without swapping queues because if + # it does pull an item off the queue when the RPC is inactive, it'll + # immediately put it back and then exit. This is necessary because + # yielding the item in this case will cause gRPC to discard it. In + # practice, this means that the order of messages is not guaranteed. + # If preserving order is necessary it would be easy to use a priority + # queue. + if self._initial_request is not None: + if callable(self._initial_request): + yield self._initial_request() + else: + yield self._initial_request + + while True: + item = await self._queue.get() + + # The consumer explicitly sent "None", indicating that the request + # should end. + if item is None: + _LOGGER.debug("Cleanly exiting request generator.") + return + + if not self._is_active(): + # We have an item, but the call is closed. We should put the + # item back on the queue so that the next call can consume it. + await self._queue.put(item) + _LOGGER.debug( + "Inactive call, replacing item on queue and exiting " + "request generator." + ) + return + + yield item + + +class AsyncBidiRpc(BidiRpcBase): + """A helper for consuming a async bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + Example:: + + initial_request = example_pb2.StreamingRpcRequest( + setting='example') + rpc = AsyncBidiRpc( + stub.StreamingRpc, + initial_request=initial_request, + metadata=[('name', 'value')] + ) + + await rpc.open() + + while rpc.is_active: + print(await rpc.recv()) + await rpc.send(example_pb2.StreamingRpcRequest( + data='example')) + + await rpc.close() + + This does *not* retry the stream on errors. + + Args: + start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to + start the RPC. + initial_request (Union[ProtobufMessage, + Callable[[], ProtobufMessage]]): The initial request to + yield. This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. + """ + + def _create_queue(self) -> asyncio.Queue: + """Create a queue for requests.""" + return asyncio.Queue() + + async def open(self) -> None: + """Opens the stream.""" + if self.is_active: + raise ValueError("Cannot open an already open stream.") + + request_generator = _AsyncRequestQueueGenerator( + self._request_queue, initial_request=self._initial_request + ) + try: + call = await self._start_rpc(request_generator, metadata=self._rpc_metadata) + except exceptions.GoogleAPICallError as exc: + # The original `grpc.aio.AioRpcError` (which is usually also a + # `grpc.aio.Call`) is available from the ``response`` property on + # the mapped exception. + self._on_call_done(exc.response) + raise + + request_generator.call = call + + # TODO: api_core should expose the future interface for wrapped + # callables as well. + if hasattr(call, "_wrapped"): # pragma: NO COVER + call._wrapped.add_done_callback(self._on_call_done) + else: + call.add_done_callback(self._on_call_done) + + self._request_generator = request_generator + self.call = call + + async def close(self) -> None: + """Closes the stream.""" + if self.call is None: + return + + await self._request_queue.put(None) + self.call.cancel() + self._request_generator = None + self._initial_request = None + self._callbacks = [] + # Don't set self.call to None. Keep it around so that send/recv can + # raise the error. + + async def send(self, request: ProtobufMessage) -> None: + """Queue a message to be sent on the stream. + + If the underlying RPC has been closed, this will raise. + + Args: + request (ProtobufMessage): The request to send. + """ + if self.call is None: + raise ValueError("Cannot send on an RPC stream that has never been opened.") + + if not self.call.done(): + await self._request_queue.put(request) + else: + # calling read should cause the call to raise. + await self.call.read() + + async def recv(self) -> ProtobufMessage: + """Wait for a message to be returned from the stream. + + If the underlying RPC has been closed, this will raise. + + Returns: + ProtobufMessage: The received message. + """ + if self.call is None: + raise ValueError("Cannot recv on an RPC stream that has never been opened.") + + return await self.call.read() + + @property + def is_active(self) -> bool: + """Whether the stream is currently open and active.""" + return self.call is not None and not self.call.done() diff --git a/google/api_core/bidi_base.py b/google/api_core/bidi_base.py new file mode 100644 index 00000000..9288fda4 --- /dev/null +++ b/google/api_core/bidi_base.py @@ -0,0 +1,88 @@ +# Copyright 2025, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may obtain a copy of the License at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base class for bi-directional streaming RPC helpers.""" + + +class BidiRpcBase: + """A base class for consuming a bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + This does *not* retry the stream on errors. + + Args: + start_rpc (Union[grpc.StreamStreamMultiCallable, + grpc.aio.StreamStreamMultiCallable]): The gRPC method used + to start the RPC. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. + """ + + def __init__(self, start_rpc, initial_request=None, metadata=None): + self._start_rpc = start_rpc + self._initial_request = initial_request + self._rpc_metadata = metadata + self._request_queue = self._create_queue() + self._request_generator = None + self._callbacks = [] + self.call = None + + def _create_queue(self): + """Create a queue for requests.""" + raise NotImplementedError("`_create_queue` is not implemented.") + + def add_done_callback(self, callback): + """Adds a callback that will be called when the RPC terminates. + + This occurs when the RPC errors or is successfully terminated. + + Args: + callback (Union[Callable[[grpc.Future], None], Callable[[Any], None]]): + The callback to execute after gRPC call completed (success or + failure). + + For sync streaming gRPC: Callable[[grpc.Future], None] + + For async streaming gRPC: Callable[[Any], None] + """ + self._callbacks.append(callback) + + def _on_call_done(self, future): + # This occurs when the RPC errors or is successfully terminated. + # Note that grpc's "future" here can also be a grpc.RpcError. + # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 + # that `grpc.RpcError` is also `grpc.Call`. + # for asynchronous gRPC call it would be `grpc.aio.AioRpcError` + + # Note: sync callbacks can be limiting for async code, because you can't + # await anything in a sync callback. + for callback in self._callbacks: + callback(future) + + @property + def is_active(self): + """True if the gRPC call is not done yet.""" + raise NotImplementedError("`is_active` is not implemented.") + + @property + def pending_requests(self): + """Estimate of the number of queued requests.""" + return self._request_queue.qsize() diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py new file mode 100644 index 00000000..696113db --- /dev/null +++ b/tests/asyncio/test_bidi_async.py @@ -0,0 +1,305 @@ +# Copyright 2025, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import asyncio + +from unittest import mock + +try: + from unittest.mock import AsyncMock +except ImportError: # pragma: NO COVER + from mock import AsyncMock # type: ignore + + +import pytest + +try: + from grpc import aio +except ImportError: # pragma: NO COVER + pytest.skip("No GRPC", allow_module_level=True) + +from google.api_core import bidi_async +from google.api_core import exceptions + +# TODO: remove this when droppping support for "Python 3.10" and below. +if sys.version_info < (3, 10): # type: ignore[operator] + + def aiter(obj): + return obj.__aiter__() + + async def anext(obj): + return await obj.__anext__() + + +@pytest.mark.asyncio +class Test_AsyncRequestQueueGenerator: + async def test_bounded_consume(self): + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = False + + q = asyncio.Queue() + await q.put(mock.sentinel.A) + await q.put(mock.sentinel.B) + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + items = [] + gen_aiter = aiter(generator) + + items.append(await anext(gen_aiter)) + items.append(await anext(gen_aiter)) + + # At this point, the queue is empty. The next call to anext will sleep. + # We make the call inactive. + call.done.return_value = True + + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(anext(gen_aiter), timeout=0.01) + + assert items == [mock.sentinel.A, mock.sentinel.B] + + async def test_yield_initial_and_exit(self): + q = asyncio.Queue() + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator( + q, initial_request=mock.sentinel.A + ) + generator.call = call + + assert await anext(aiter(generator)) == mock.sentinel.A + + async def test_yield_initial_callable_and_exit(self): + q = asyncio.Queue() + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator( + q, initial_request=lambda: mock.sentinel.A + ) + generator.call = call + + assert await anext(aiter(generator)) == mock.sentinel.A + + async def test_exit_when_inactive_with_item(self): + q = asyncio.Queue() + await q.put(mock.sentinel.A) + + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + with pytest.raises( + StopAsyncIteration, + ): + assert await anext(aiter(generator)) + + # Make sure it put the item back. + assert not q.empty() + assert await q.get() == mock.sentinel.A + + async def test_exit_when_inactive_empty(self): + q = asyncio.Queue() + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(anext(aiter(generator)), timeout=0.01) + + async def test_exit_with_stop(self): + q = asyncio.Queue() + await q.put(None) + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = False + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + with pytest.raises(StopAsyncIteration): + assert await anext(aiter(generator)) + + +def make_async_rpc(): + """Makes a mock async RPC used to test Bidi classes.""" + call = mock.create_autospec(aio.StreamStreamCall, instance=True) + rpc = AsyncMock() + + def rpc_side_effect(request, metadata=None): + call.done.return_value = False + return call + + rpc.side_effect = rpc_side_effect + + def cancel_side_effect(): + call.done.return_value = True + return True + + call.cancel.side_effect = cancel_side_effect + call.read = AsyncMock() + + return rpc, call + + +class AsyncClosedCall: + def __init__(self, exception): + self.exception = exception + + def done(self): + return True + + async def read(self): + raise self.exception + + +class TestAsyncBidiRpc: + def test_initial_state(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + assert bidi_rpc.is_active is False + + def test_done_callbacks(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + callback = mock.Mock(spec=["__call__"]) + + bidi_rpc.add_done_callback(callback) + bidi_rpc._on_call_done(mock.sentinel.future) + + callback.assert_called_once_with(mock.sentinel.future) + + @pytest.mark.asyncio + @pytest.mark.skipif( + sys.version_info < (3, 8), # type: ignore[operator] + reason="Versions of Python below 3.8 don't provide support for assert_awaited_once", + ) + async def test_metadata(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc, metadata=mock.sentinel.A) + assert bidi_rpc._rpc_metadata == mock.sentinel.A + + await bidi_rpc.open() + assert bidi_rpc.call == call + rpc.assert_awaited_once() + assert rpc.call_args.kwargs["metadata"] == mock.sentinel.A + + @pytest.mark.asyncio + async def test_open(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + + await bidi_rpc.open() + + assert bidi_rpc.call == call + assert bidi_rpc.is_active + call.add_done_callback.assert_called_once_with(bidi_rpc._on_call_done) + + @pytest.mark.asyncio + async def test_open_error_already_open(self): + rpc, _ = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + + await bidi_rpc.open() + + with pytest.raises(ValueError): + await bidi_rpc.open() + + @pytest.mark.asyncio + async def test_open_error_call_error(self): + rpc, _ = make_async_rpc() + expected_exception = exceptions.GoogleAPICallError( + "test", response=mock.sentinel.response + ) + rpc.side_effect = expected_exception + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + callback = mock.Mock(spec=["__call__"]) + bidi_rpc.add_done_callback(callback) + + with pytest.raises(exceptions.GoogleAPICallError) as exc_info: + await bidi_rpc.open() + + assert exc_info.value == expected_exception + callback.assert_called_once_with(mock.sentinel.response) + + @pytest.mark.asyncio + async def test_close(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + await bidi_rpc.open() + + await bidi_rpc.close() + + call.cancel.assert_called_once() + assert bidi_rpc.call is call + assert bidi_rpc.is_active is False + # ensure the request queue was signaled to stop. + assert bidi_rpc.pending_requests == 1 + assert await bidi_rpc._request_queue.get() is None + # ensure request and callbacks are cleaned up + assert bidi_rpc._initial_request is None + assert not bidi_rpc._callbacks + + @pytest.mark.asyncio + async def test_close_no_rpc(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + await bidi_rpc.close() + + @pytest.mark.asyncio + async def test_send(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + await bidi_rpc.open() + + await bidi_rpc.send(mock.sentinel.request) + + assert bidi_rpc.pending_requests == 1 + assert await bidi_rpc._request_queue.get() is mock.sentinel.request + + @pytest.mark.asyncio + async def test_send_not_open(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + + with pytest.raises(ValueError): + await bidi_rpc.send(mock.sentinel.request) + + @pytest.mark.asyncio + async def test_send_dead_rpc(self): + error = ValueError() + bidi_rpc = bidi_async.AsyncBidiRpc(None) + bidi_rpc.call = AsyncClosedCall(error) + + with pytest.raises(ValueError): + await bidi_rpc.send(mock.sentinel.request) + + @pytest.mark.asyncio + async def test_recv(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + bidi_rpc.call = mock.create_autospec(aio.Call, instance=True) + bidi_rpc.call.read = AsyncMock(return_value=mock.sentinel.response) + + response = await bidi_rpc.recv() + + assert response == mock.sentinel.response + + @pytest.mark.asyncio + async def test_recv_not_open(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + + with pytest.raises(ValueError): + await bidi_rpc.recv()