Skip to content

Commit 695de92

Browse files
committed
fix: closes tailing streams in bidi classes.
Always put `None` into the request queue when closing a bidi stream. This ensures that the request queue is always signaled as closed, even if the underlying gRPC call object is not yet available.
1 parent a4b291f commit 695de92

File tree

4 files changed

+31
-7
lines changed

4 files changed

+31
-7
lines changed

google/api_core/bidi.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,11 +281,11 @@ def open(self):
281281

282282
def close(self):
283283
"""Closes the stream."""
284-
if self.call is None:
285-
return
284+
if self.call is not None:
285+
self.call.cancel()
286286

287+
# Put None in request queue to signal termination.
287288
self._request_queue.put(None)
288-
self.call.cancel()
289289
self._request_generator = None
290290
self._initial_request = None
291291
self._callbacks = []

google/api_core/bidi_async.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,11 @@ async def open(self) -> None:
197197

198198
async def close(self) -> None:
199199
"""Closes the stream."""
200-
if self.call is None:
201-
return
200+
if self.call is not None:
201+
self.call.cancel()
202202

203+
# Put None in request queue to signal termination.
203204
await self._request_queue.put(None)
204-
self.call.cancel()
205205
self._request_generator = None
206206
self._initial_request = None
207207
self._callbacks = []

tests/asyncio/test_bidi_async.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,21 @@ async def test_close(self):
255255
assert bidi_rpc._initial_request is None
256256
assert not bidi_rpc._callbacks
257257

258+
@pytest.mark.asyncio
259+
async def test_close_with_no_rpc(self):
260+
bidi_rpc = bidi_async.AsyncBidiRpc(None)
261+
262+
await bidi_rpc.close()
263+
264+
assert bidi_rpc.call is None
265+
assert bidi_rpc.is_active is False
266+
# ensure the request queue was signaled to stop.
267+
assert bidi_rpc.pending_requests == 1
268+
assert await bidi_rpc._request_queue.get() is None
269+
# ensure request and callbacks are cleaned up
270+
assert bidi_rpc._initial_request is None
271+
assert not bidi_rpc._callbacks
272+
258273
@pytest.mark.asyncio
259274
async def test_close_no_rpc(self):
260275
bidi_rpc = bidi_async.AsyncBidiRpc(None)

tests/unit/test_bidi.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,10 +301,19 @@ def test_close(self):
301301
assert bidi_rpc._initial_request is None
302302
assert not bidi_rpc._callbacks
303303

304-
def test_close_no_rpc(self):
304+
def test_close_with_no_rpc(self):
305305
bidi_rpc = bidi.BidiRpc(None)
306306
bidi_rpc.close()
307307

308+
assert bidi_rpc.call is None
309+
assert bidi_rpc.is_active is False
310+
# ensure the request queue was signaled to stop.
311+
assert bidi_rpc.pending_requests == 1
312+
assert bidi_rpc._request_queue.get() is None
313+
# ensure request and callbacks are cleaned up
314+
assert bidi_rpc._initial_request is None
315+
assert not bidi_rpc._callbacks
316+
308317
def test_send(self):
309318
rpc, call = make_rpc()
310319
bidi_rpc = bidi.BidiRpc(rpc)

0 commit comments

Comments
 (0)