Skip to content
26 changes: 21 additions & 5 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -1667,7 +1667,6 @@ def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
started_speaking_at = fut.result() or time.time()
except BaseException:
return

self._session._update_agent_state(
"speaking",
start_time=started_speaking_at,
Expand Down Expand Up @@ -1921,6 +1920,8 @@ async def _pipeline_reply_task_impl(
await text_tee.aclose()
return

self._session._update_agent_state("thinking")

reply_started_at = time.time()

async def _read_text(
Expand Down Expand Up @@ -1952,10 +1953,17 @@ def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
"""
nonlocal started_speaking_at
try:
# Check future state before getting result
# Note: This callback is registered via add_done_callback, so fut.done() should always be True
if fut.cancelled():
return

if not fut.done():
return

started_speaking_at = fut.result() or time.time()
except BaseException:
except (asyncio.CancelledError, Exception):
return

self._session._update_agent_state(
"speaking",
start_time=started_speaking_at,
Expand Down Expand Up @@ -2321,10 +2329,18 @@ def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
"""
nonlocal started_speaking_at
try:
# Check future state before getting result
# Note: This callback is registered via add_done_callback, so fut.done() should always be True
# However, we check anyway for safety
if fut.cancelled():
return

if not fut.done():
return

started_speaking_at = fut.result() or time.time()
except BaseException:
except (asyncio.CancelledError, Exception):
return

self._session._update_agent_state(
"speaking",
start_time=started_speaking_at,
Expand Down
7 changes: 3 additions & 4 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,10 +1196,9 @@ def _update_agent_state(

old_state = self._agent_state
self._agent_state = state
self.emit(
"agent_state_changed",
AgentStateChangedEvent(old_state=old_state, new_state=state),
)

event = AgentStateChangedEvent(old_state=old_state, new_state=state)
self.emit("agent_state_changed", event)

def _update_user_state(
self, state: UserState, *, last_speaking_time: float | None = None
Expand Down
6 changes: 6 additions & 0 deletions livekit-agents/livekit/agents/voice/avatar/_datastream_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import json
import math
import time
from collections.abc import AsyncIterator
from dataclasses import asdict
from typing import Any, Callable, Union
Expand Down Expand Up @@ -134,6 +135,11 @@ async def capture_frame(self, frame: rtc.AudioFrame) -> None:
},
)
self._pushed_duration = 0.0
# Trigger playback_started event when first frame is captured
# This is needed for first_frame_fut to complete
# DataStreamAudioOutput is the end of the audio chain (next_in_chain=None),
# so it must trigger playback_started itself since no downstream component will
self.on_playback_started(created_at=time.time())
await self._stream_writer.write(bytes(frame.data))
self._pushed_duration += frame.duration

Expand Down
6 changes: 6 additions & 0 deletions livekit-agents/livekit/agents/voice/avatar/_queue_io.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import time
from collections.abc import AsyncIterator
from typing import Literal, Union

Expand Down Expand Up @@ -37,6 +38,11 @@ async def capture_frame(self, frame: rtc.AudioFrame) -> None:
await super().capture_frame(frame)
if not self._capturing:
self._capturing = True
# Trigger playback_started event when first frame is captured
# This is needed for first_frame_fut to complete
# QueueAudioOutput is the end of the audio chain (next_in_chain=None),
# so it must trigger playback_started itself since no downstream component will
self.on_playback_started(created_at=time.time())

await self._data_ch.send(frame)

Expand Down