Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion livekit-agents/livekit/agents/voice/room_io/_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from ..transcription import find_micro_track_id


class _InterruptedError(Exception):
pass


class _ParticipantAudioOutput(io.AudioOutput):
def __init__(
self,
Expand Down Expand Up @@ -57,6 +61,9 @@ def __init__(
self._playback_enabled = asyncio.Event()
self._playback_enabled.set()

self._first_frame: rtc.AudioFrame | None = None
self._first_frame_fut: asyncio.Future[None] | None = None

async def _publish_track(self) -> None:
async with self._lock:
track = rtc.LocalAudioTrack.create_audio_track(self._track_name, self._audio_source)
Expand Down Expand Up @@ -97,7 +104,22 @@ async def capture_frame(self, frame: rtc.AudioFrame) -> None:
await self._flush_task

for f in self._audio_bstream.push(frame.data):
if self._pushed_duration == 0:
self._first_frame = f
self._first_frame_fut = asyncio.Future[None]()

await self._audio_buf.send(f)

# wait for the first frame to be captured
if self._first_frame and self._first_frame_fut:
try:
await self._first_frame_fut
except _InterruptedError:
continue
finally:
self._first_frame = None
self._first_frame_fut = None

self._pushed_duration += f.duration

def flush(self) -> None:
Expand Down Expand Up @@ -167,6 +189,10 @@ async def _wait_buffered_audio() -> None:

self._pushed_duration = 0
self._interrupted_event.clear()
if self._first_frame_fut and not self._first_frame_fut.done():
self._first_frame_fut.set_exception(_InterruptedError())
self._first_frame = None
self._first_frame_fut = None
self.on_playback_finished(playback_position=pushed_duration, interrupted=interrupted)

async def _forward_audio(self) -> None:
Expand All @@ -177,13 +203,21 @@ async def _forward_audio(self) -> None:
# TODO(long): save the frames in the queue and play them later
# TODO(long): ignore frames from previous syllable

if self._interrupted_event.is_set() or self._pushed_duration == 0:
if self._interrupted_event.is_set() or (
self._pushed_duration == 0 and not self._first_frame
):
if self._interrupted_event.is_set() and self._flush_task:
await self._flush_task

# ignore frames if interrupted
continue

if self._first_frame and self._first_frame_fut and not self._first_frame_fut.done():
if frame is self._first_frame:
self._first_frame_fut.set_result(None)
self._first_frame = None
self._first_frame_fut = None

await self._audio_source.capture_frame(frame)

def _on_reconnected(self) -> None:
Expand Down