Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions src/agents/run.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import asyncio
import contextlib
import inspect
import os
import warnings
from dataclasses import dataclass, field
from typing import Any, Callable, Generic, cast, get_args

Expand Down Expand Up @@ -720,19 +722,37 @@ def run_sync(
conversation_id = kwargs.get("conversation_id")
session = kwargs.get("session")

# Python 3.14 no longer creates a default loop implicitly, so we inspect the running loop.
# Python 3.14 stopped implicitly wiring up a default event loop
# when synchronous code touches asyncio APIs for the first time.
# Several of our synchronous entry points (for example the Redis/SQLAlchemy session helpers)
# construct asyncio primitives like asyncio.Lock during __init__,
# which binds them to whatever loop happens to be the thread's default at that moment.
# To keep those locks usable we must ensure that run_sync reuses that same default loop
# instead of hopping over to a brand-new asyncio.run() loop.
try:
loop = asyncio.get_running_loop()
already_running_loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
already_running_loop = None

if loop is not None:
if already_running_loop is not None:
# This method is only expected to run when no loop is already active.
raise RuntimeError(
"AgentRunner.run_sync() cannot be called when an event loop is already running."
)

return asyncio.run(
policy = asyncio.get_event_loop_policy()
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
try:
default_loop = policy.get_event_loop()
except RuntimeError:
default_loop = policy.new_event_loop()
policy.set_event_loop(default_loop)

# We intentionally leave the default loop open even if we had to create one above. Session
# instances and other helpers stash loop-bound primitives between calls and expect to find
# the same default loop every time run_sync is invoked on this thread.
task = default_loop.create_task(
self.run(
starting_agent,
input,
Expand All @@ -746,6 +766,15 @@ def run_sync(
)
)

try:
return default_loop.run_until_complete(task)
except BaseException:
Comment on lines 746 to 775

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge finalize async generators before returning

The new run_sync implementation stops using asyncio.run/Runner, but it never performs the cleanup that Runner.close() used to do. After the default_loop.run_until_complete(task) call the code simply returns, so _asyncgens is never drained with loop.shutdown_asyncgens(). Any async generators created inside AgentRunner.run (or user hooks/tools) that aren’t fully consumed now keep their finally blocks from running unless another loop run happens later. You can reproduce the regression by invoking run_sync on a coroutine that does g = agen(); await g.asend(None) where agen has a finally: cleanup. Before this change cleanup would run as soon as run_sync returned; now it never runs because the loop is left open and the registered async generators are never closed. This leaks resources and makes async generators accumulate across runs. Please call default_loop.run_until_complete(default_loop.shutdown_asyncgens()) (and then return the result) so the loop still stays open but async generators are finalized after every synchronous run.

Useful? React with 👍 / 👎.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex improved this; can you review the changes again?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex Review: Didn't find any major issues. What shall we delve into next?

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

if not task.done():
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
default_loop.run_until_complete(task)
raise

def run_streamed(
self,
starting_agent: Agent[TContext],
Expand Down
120 changes: 120 additions & 0 deletions tests/test_agent_runner_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import asyncio
from collections.abc import Generator
from typing import Any

import pytest

from agents.agent import Agent
from agents.run import AgentRunner


@pytest.fixture
def fresh_event_loop_policy() -> Generator[asyncio.AbstractEventLoopPolicy, None, None]:
policy_before = asyncio.get_event_loop_policy()
new_policy = asyncio.DefaultEventLoopPolicy()
asyncio.set_event_loop_policy(new_policy)
try:
yield new_policy
finally:
asyncio.set_event_loop_policy(policy_before)


def test_run_sync_reuses_existing_default_loop(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()
observed_loops: list[asyncio.AbstractEventLoop] = []

async def fake_run(self, *_args, **_kwargs):
observed_loops.append(asyncio.get_running_loop())
return object()

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

test_loop = asyncio.new_event_loop()
fresh_event_loop_policy.set_event_loop(test_loop)

try:
runner.run_sync(Agent(name="test-agent"), "input")
assert observed_loops and observed_loops[0] is test_loop
finally:
fresh_event_loop_policy.set_event_loop(None)
test_loop.close()


def test_run_sync_creates_default_loop_when_missing(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()
observed_loops: list[asyncio.AbstractEventLoop] = []

async def fake_run(self, *_args, **_kwargs):
observed_loops.append(asyncio.get_running_loop())
return object()

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

fresh_event_loop_policy.set_event_loop(None)

runner.run_sync(Agent(name="test-agent"), "input")
created_loop = observed_loops[0]
assert created_loop is fresh_event_loop_policy.get_event_loop()

fresh_event_loop_policy.set_event_loop(None)
created_loop.close()


def test_run_sync_errors_when_loop_already_running(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()

async def fake_run(self, *_args, **_kwargs):
return object()

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

async def invoke():
with pytest.raises(RuntimeError):
runner.run_sync(Agent(name="test-agent"), "input")

asyncio.run(invoke())


def test_run_sync_cancels_task_when_interrupted(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()

async def fake_run(self, *_args, **_kwargs):
await asyncio.sleep(3600)

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

test_loop = asyncio.new_event_loop()
fresh_event_loop_policy.set_event_loop(test_loop)

created_tasks: list[asyncio.Task[Any]] = []
original_create_task = test_loop.create_task

def capturing_create_task(coro):
task = original_create_task(coro)
created_tasks.append(task)
return task

original_run_until_complete = test_loop.run_until_complete
call_count = {"value": 0}

def interrupt_once(future):
call_count["value"] += 1
if call_count["value"] == 1:
raise KeyboardInterrupt()
return original_run_until_complete(future)

monkeypatch.setattr(test_loop, "create_task", capturing_create_task)
monkeypatch.setattr(test_loop, "run_until_complete", interrupt_once)

try:
with pytest.raises(KeyboardInterrupt):
runner.run_sync(Agent(name="test-agent"), "input")

assert created_tasks, "Expected run_sync to schedule a task."
assert created_tasks[0].done()
assert created_tasks[0].cancelled()
assert call_count["value"] >= 2
finally:
monkeypatch.undo()
fresh_event_loop_policy.set_event_loop(None)
test_loop.close()