Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import inspect
import os
import warnings
from dataclasses import dataclass, field
from typing import Any, Callable, Generic, cast, get_args

Expand Down Expand Up @@ -720,19 +721,37 @@ def run_sync(
conversation_id = kwargs.get("conversation_id")
session = kwargs.get("session")

# Python 3.14 no longer creates a default loop implicitly, so we inspect the running loop.
# Python 3.14 stopped implicitly wiring up a default event loop
# when synchronous code touches asyncio APIs for the first time.
# Several of our synchronous entry points (for example the Redis/SQLAlchemy session helpers)
# construct asyncio primitives like asyncio.Lock during __init__,
# which binds them to whatever loop happens to be the thread's default at that moment.
# To keep those locks usable we must ensure that run_sync reuses that same default loop
# instead of hopping over to a brand-new asyncio.run() loop.
try:
loop = asyncio.get_running_loop()
already_running_loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
already_running_loop = None

if loop is not None:
if already_running_loop is not None:
# This method is only expected to run when no loop is already active.
raise RuntimeError(
"AgentRunner.run_sync() cannot be called when an event loop is already running."
)

return asyncio.run(
policy = asyncio.get_event_loop_policy()
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
try:
default_loop = policy.get_event_loop()
except RuntimeError:
default_loop = policy.new_event_loop()
policy.set_event_loop(default_loop)

# We intentionally leave the default loop open even if we had to create one above. Session
# instances and other helpers stash loop-bound primitives between calls and expect to find
# the same default loop every time run_sync is invoked on this thread.
return default_loop.run_until_complete(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Cancel pending task when run_sync is aborted

The new implementation leaves the coroutine returned by self.run() attached to the default loop and simply calls default_loop.run_until_complete(...) (lines 751‑754). When the loop is interrupted before completion—e.g. a user presses Ctrl+C and triggers KeyboardInterruptrun_until_complete unwinds immediately and does not cancel the task it created. Because we now keep the loop open for reuse, that unfinished task remains pending on the loop and resumes the next time run_sync is invoked, so an aborted agent run keeps running concurrently with the next run. This regression is easy to reproduce: start run_sync, hit Ctrl+C to abort it, then call run_sync again and observe the first run continue. The previous asyncio.run call avoided this by creating a fresh loop per invocation and tearing it down on interruption. Please wrap the scheduled coroutine in a task and cancel/await it when run_until_complete exits with an exception (especially KeyboardInterrupt) so that aborted runs do not linger on the shared loop.

Useful? React with 👍 / 👎.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex i've resolved this issue. can you review the changes again?

self.run(
starting_agent,
input,
Expand Down
73 changes: 73 additions & 0 deletions tests/test_agent_runner_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
from collections.abc import Generator

import pytest

from agents.run import AgentRunner


@pytest.fixture
def fresh_event_loop_policy() -> Generator[asyncio.AbstractEventLoopPolicy, None, None]:
policy_before = asyncio.get_event_loop_policy()
new_policy = asyncio.DefaultEventLoopPolicy()
asyncio.set_event_loop_policy(new_policy)
try:
yield new_policy
finally:
asyncio.set_event_loop_policy(policy_before)


def test_run_sync_reuses_existing_default_loop(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()
observed_loops: list[asyncio.AbstractEventLoop] = []

async def fake_run(self, *_args, **_kwargs):
observed_loops.append(asyncio.get_running_loop())
return object()

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

test_loop = asyncio.new_event_loop()
fresh_event_loop_policy.set_event_loop(test_loop)

try:
runner.run_sync(object(), "input")
assert observed_loops and observed_loops[0] is test_loop
finally:
fresh_event_loop_policy.set_event_loop(None)
test_loop.close()


def test_run_sync_creates_default_loop_when_missing(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()
observed_loops: list[asyncio.AbstractEventLoop] = []

async def fake_run(self, *_args, **_kwargs):
observed_loops.append(asyncio.get_running_loop())
return object()

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

fresh_event_loop_policy.set_event_loop(None)

runner.run_sync(object(), "input")
created_loop = observed_loops[0]
assert created_loop is fresh_event_loop_policy.get_event_loop()

fresh_event_loop_policy.set_event_loop(None)
created_loop.close()


def test_run_sync_errors_when_loop_already_running(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()

async def fake_run(self, *_args, **_kwargs):
return object()

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

async def invoke():
with pytest.raises(RuntimeError):
runner.run_sync(object(), "input")

asyncio.run(invoke())
Loading