Skip to content

Commit 93dff59

Browse files
committed
fix: closes tailing streams in bidi classes.
Always put `None` into the request queue when closing a bidi stream. This ensures that the request queue is always signaled as closed, even if the underlying gRPC call object is not yet available.
1 parent a4b291f commit 93dff59

File tree

4 files changed

+29
-11
lines changed

4 files changed

+29
-11
lines changed

google/api_core/bidi.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,13 +281,12 @@ def open(self):
281281

282282
def close(self):
283283
"""Closes the stream."""
284-
if self.call is None:
285-
return
284+
if self.call is not None:
285+
self.call.cancel()
286286

287+
# Put None in request queue to signal termination.
287288
self._request_queue.put(None)
288-
self.call.cancel()
289289
self._request_generator = None
290-
self._initial_request = None
291290
self._callbacks = []
292291
# Don't set self.call to None. Keep it around so that send/recv can
293292
# raise the error.

google/api_core/bidi_async.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,12 @@ async def open(self) -> None:
197197

198198
async def close(self) -> None:
199199
"""Closes the stream."""
200-
if self.call is None:
201-
return
200+
if self.call is not None:
201+
self.call.cancel()
202202

203+
# Put None in request queue to signal termination.
203204
await self._request_queue.put(None)
204-
self.call.cancel()
205205
self._request_generator = None
206-
self._initial_request = None
207206
self._callbacks = []
208207
# Don't set self.call to None. Keep it around so that send/recv can
209208
# raise the error.

tests/asyncio/test_bidi_async.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,20 @@ async def test_close(self):
252252
assert bidi_rpc.pending_requests == 1
253253
assert await bidi_rpc._request_queue.get() is None
254254
# ensure request and callbacks are cleaned up
255-
assert bidi_rpc._initial_request is None
255+
assert not bidi_rpc._callbacks
256+
257+
@pytest.mark.asyncio
258+
async def test_close_with_no_rpc(self):
259+
bidi_rpc = bidi_async.AsyncBidiRpc(None)
260+
261+
await bidi_rpc.close()
262+
263+
assert bidi_rpc.call is None
264+
assert bidi_rpc.is_active is False
265+
# ensure the request queue was signaled to stop.
266+
assert bidi_rpc.pending_requests == 1
267+
assert await bidi_rpc._request_queue.get() is None
268+
# ensure request and callbacks are cleaned up
256269
assert not bidi_rpc._callbacks
257270

258271
@pytest.mark.asyncio

tests/unit/test_bidi.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,13 +298,20 @@ def test_close(self):
298298
assert bidi_rpc.pending_requests == 1
299299
assert bidi_rpc._request_queue.get() is None
300300
# ensure request and callbacks are cleaned up
301-
assert bidi_rpc._initial_request is None
302301
assert not bidi_rpc._callbacks
303302

304-
def test_close_no_rpc(self):
303+
def test_close_with_no_rpc(self):
305304
bidi_rpc = bidi.BidiRpc(None)
306305
bidi_rpc.close()
307306

307+
assert bidi_rpc.call is None
308+
assert bidi_rpc.is_active is False
309+
# ensure the request queue was signaled to stop.
310+
assert bidi_rpc.pending_requests == 1
311+
assert bidi_rpc._request_queue.get() is None
312+
# ensure request and callbacks are cleaned up
313+
assert not bidi_rpc._callbacks
314+
308315
def test_send(self):
309316
rpc, call = make_rpc()
310317
bidi_rpc = bidi.BidiRpc(rpc)

0 commit comments

Comments
 (0)