Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion livekit-agents/livekit/agents/voice/speech_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from opentelemetry import context as otel_context

from .. import llm, utils
from ..log import logger

INTERRUPTION_TIMEOUT = 5.0 # seconds


class SpeechHandle:
Expand Down Expand Up @@ -35,6 +38,8 @@ def __init__(self, *, speech_id: str, allow_interruptions: bool) -> None:
self._num_steps = 1
self._agent_turn_context: otel_context.Context | None = None

self._interrupt_timeout_handle: asyncio.TimerHandle | None = None

self._item_added_callbacks: set[Callable[[llm.ChatItem], None]] = set()
self._done_callbacks: set[Callable[[SpeechHandle], None]] = set()

Expand Down Expand Up @@ -172,9 +177,22 @@ def _cancel(self) -> SpeechHandle:
if self.done():
return self

with contextlib.suppress(asyncio.InvalidStateError):
if not self._interrupt_fut.done():
self._interrupt_fut.set_result(None)

def _on_timeout() -> None:
logger.error(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check the done state again here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both _mark_done and the timeout handle are sync method, the _on_timeout is only called when _mark_done isn't called.

"speech not done in time after interruption, cancelling the speech arbitrarily.",
extra={"speech_id": self._id, "timeout": INTERRUPTION_TIMEOUT},
)
for task in self._tasks:
task.cancel()
self._mark_done()

self._interrupt_timeout_handle = asyncio.get_event_loop().call_later(
INTERRUPTION_TIMEOUT, _on_timeout
)

return self

def _add_item_added_callback(self, callback: Callable[[llm.ChatItem], Any]) -> None:
Expand Down Expand Up @@ -224,6 +242,10 @@ def _mark_done(self) -> None:
if self._generations:
self._mark_generation_done() # preemptive generation could be cancelled before being scheduled

if self._interrupt_timeout_handle is not None:
self._interrupt_timeout_handle.cancel()
self._interrupt_timeout_handle = None

def _mark_scheduled(self) -> None:
with contextlib.suppress(asyncio.InvalidStateError):
self._scheduled_fut.set_result(None)