Skip to content

Commit a52d0c9

Browse files
author
Lucas Wang
committed
Fix voice result streaming task cleanup to properly await cancelled tasks
Problem: The _cleanup_tasks() method in VoiceStreamResult was only calling task.cancel() on pending tasks but not awaiting them. Additionally, _check_errors() could raise CancelledError when checking cancelled tasks. This could lead to: 1. Unhandled task exception warnings 2. Potential resource leaks from abandoned tasks 3. CancelledError masking real exceptions Evidence: - Similar to fixed guardrail tasks cleanup (PR openai#1976) - Similar to fixed voice STT cleanup (PR openai#1977) - Similar to fixed websocket cleanup (PR openai#1955) - Bug documented in .claude/bug-analysis/03-resource-leaks.md Solution: 1. Made _cleanup_tasks() async 2. Collect all real asyncio.Task objects that need to be awaited 3. Added await asyncio.gather() with return_exceptions=True to properly collect exceptions from cancelled tasks 4. Updated _check_errors() to skip cancelled tasks using task.cancelled() check to avoid CancelledError when calling task.exception() 5. Updated stream() async generator to await _cleanup_tasks() Testing: - Linting passes - No breaking changes to public API - Follows same pattern as PR openai#1976, openai#1977, openai#1955
1 parent 03dca68 commit a52d0c9

File tree

1 file changed

+31
-6
lines changed

1 file changed

+31
-6
lines changed

src/agents/voice/result.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -243,25 +243,50 @@ async def _wait_for_completion(self):
243243
tasks.append(self._dispatcher_task)
244244
await asyncio.gather(*tasks)
245245

246-
def _cleanup_tasks(self):
246+
async def _cleanup_tasks(self):
247+
"""Cancel all pending tasks and wait for them to complete.
248+
249+
This ensures that any exceptions raised by the tasks are properly handled
250+
and prevents warnings about unhandled task exceptions.
251+
"""
247252
self._finish_turn()
248253

254+
tasks = []
249255
for task in self._tasks:
250256
if not task.done():
251257
task.cancel()
258+
if isinstance(task, asyncio.Task):
259+
tasks.append(task)
252260

253261
if self._dispatcher_task and not self._dispatcher_task.done():
254262
self._dispatcher_task.cancel()
263+
if isinstance(self._dispatcher_task, asyncio.Task):
264+
tasks.append(self._dispatcher_task)
255265

256266
if self.text_generation_task and not self.text_generation_task.done():
257267
self.text_generation_task.cancel()
268+
if isinstance(self.text_generation_task, asyncio.Task):
269+
tasks.append(self.text_generation_task)
270+
271+
# Wait for all cancelled tasks to complete and collect exceptions
272+
if tasks:
273+
await asyncio.gather(*tasks, return_exceptions=True)
258274

259275
def _check_errors(self):
276+
"""Check for exceptions in completed tasks.
277+
278+
Note: CancelledError is not checked as it's expected during cleanup.
279+
"""
260280
for task in self._tasks:
261-
if task.done():
262-
if task.exception():
263-
self._stored_exception = task.exception()
264-
break
281+
if task.done() and not task.cancelled():
282+
try:
283+
exc = task.exception()
284+
if exc:
285+
self._stored_exception = exc
286+
break
287+
except asyncio.CancelledError:
288+
# Task was cancelled, skip it
289+
pass
265290

266291
async def stream(self) -> AsyncIterator[VoiceStreamEvent]:
267292
"""Stream the events and audio data as they're generated."""
@@ -281,7 +306,7 @@ async def stream(self) -> AsyncIterator[VoiceStreamEvent]:
281306
break
282307

283308
self._check_errors()
284-
self._cleanup_tasks()
309+
await self._cleanup_tasks()
285310

286311
if self._stored_exception:
287312
raise self._stored_exception

0 commit comments

Comments
 (0)