Merged
85 changes: 80 additions & 5 deletions agents-core/vision_agents/core/agents/agents.py
@@ -129,6 +129,9 @@ def __init__(
self.conversation: Optional[Conversation] = None
self._user_conversation_handle: Optional[StreamHandle] = None
self._agent_conversation_handle: Optional[StreamHandle] = None

# Track pending transcripts for turn-based response triggering
self._pending_user_transcripts: Dict[str, str] = {}

# Merge plugin events BEFORE subscribing to any events
for plugin in [stt, tts, turn_detection, vad, llm]:
@@ -663,16 +666,56 @@ async def _process_track(self, track_id: str, track_type: str, participant):

async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None:
"""Handle turn detection events."""
# In realtime mode, the LLM handles turn detection, interruption, and responses itself
if self.realtime_mode:
return

if isinstance(event, TurnStartedEvent):
# TODO: Implement TTS pause/resume functionality
# For now, TTS will continue playing - this should be improved
self.logger.info(
f"👉 Turn started - participant speaking {event.speaker_id} : {event.confidence}"
)
# Interrupt TTS when user starts speaking (barge-in)
if event.speaker_id and event.speaker_id != self.agent_user.id:
if self.tts:
self.logger.info(
f"👉 Turn started - interrupting TTS for participant {event.speaker_id}"
)
try:
await self.tts.stop_audio()
except Exception as e:
self.logger.error(f"Error stopping TTS: {e}")
else:
self.logger.info(
f"👉 Turn started - participant speaking {event.speaker_id} : {event.confidence}"
)
else:
# Agent itself started speaking - this is normal
self.logger.debug(
f"👉 Turn started - agent speaking {event.speaker_id}"
)
elif isinstance(event, TurnEndedEvent):
self.logger.info(
f"👉 Turn ended - participant {event.speaker_id} finished (duration: {event.duration}, confidence: {event.confidence})"
)

# When turn detection is enabled, trigger LLM response when user's turn ends
# This is the signal that the user has finished speaking and expects a response
if event.speaker_id and event.speaker_id != self.agent_user.id:
# Get the accumulated transcript for this speaker
transcript = self._pending_user_transcripts.get(event.speaker_id, "")

if transcript and transcript.strip():
self.logger.info(f"🤖 Triggering LLM response after turn ended for {event.speaker_id}")

# Create participant object if we have metadata
participant = None
if hasattr(event, 'custom') and event.custom:
# Try to extract participant info from custom metadata
participant = event.custom.get('participant')

# Trigger LLM response with the complete transcript
if self.llm:
await self.simple_response(transcript, participant)

# Clear the pending transcript for this speaker
self._pending_user_transcripts[event.speaker_id] = ""
Comment on lines +699 to +718
⚠️ Potential issue | 🟠 Major

Don't assume transcripts arrive before TurnEnded.

Turn detection emits TurnEndedEvent as soon as silence is detected, but STT final transcripts can land a beat later (we call turn_detection.process_audio before stt.process_audio). When that happens, Line 701 sees an empty transcript, we skip simple_response, and nothing ever re-triggers once the transcript finally shows up in _on_transcript. Result: user turns are silently dropped whenever STT lags behind turn detection—major functional regression.

Please track “turn ended but transcript pending” state. One way:

@@
-        self._pending_user_transcripts: Dict[str, str] = {}
+        self._pending_user_transcripts: Dict[str, str] = {}
+        self._pending_turn_completions: set[str] = set()
@@ def _on_turn_event(...):
-            if event.speaker_id and event.speaker_id != self.agent_user.id:
+            if event.speaker_id and event.speaker_id != self.agent_user.id:
                 transcript = self._pending_user_transcripts.get(event.speaker_id, "")
 
-                if transcript and transcript.strip():
+                if transcript and transcript.strip():
                     ...
                     self._pending_user_transcripts[event.speaker_id] = ""
+                    self._pending_turn_completions.discard(event.speaker_id)
+                else:
+                    self._pending_turn_completions.add(event.speaker_id)
@@ def _on_transcript(...):
-            if user_id not in self._pending_user_transcripts:
+            if user_id not in self._pending_user_transcripts:
                 ...
             else:
                 ...
 
+            if user_id in getattr(self, "_pending_turn_completions", set()):
+                participant = getattr(event, "user_metadata", None)
+                await self.simple_response(self._pending_user_transcripts[user_id], participant)
+                self._pending_user_transcripts[user_id] = ""
+                self._pending_turn_completions.discard(user_id)

Any equivalent solution that ensures a late-arriving transcript still fires the LLM response works.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In agents-core/vision_agents/core/agents/agents.py around lines 699–718, the TurnEnded handling assumes the final STT transcript is already available and skips triggering the LLM when the transcript arrives late. Fix this by recording that a turn ended for the speaker when TurnEnded is received (e.g., add speaker_id to a pending_turns set, or mark a flag alongside the empty transcript). Then, in the transcript arrival path (_on_transcript, or wherever transcripts are written to _pending_user_transcripts), check for that pending-turn-ended marker and, if present, call simple_response(transcript, participant) and clear both the pending marker and the stored transcript. Ensure you still call simple_response immediately when TurnEnded sees a non-empty transcript, and avoid double-calling by clearing the marker after handling.


async def _on_partial_transcript(
self, event: STTPartialTranscriptEvent | RealtimePartialTranscriptEvent
@@ -727,6 +770,38 @@ async def _on_transcript(self, event: STTTranscriptEvent | RealtimeTranscriptEvent
)
self.conversation.complete_message(self._user_conversation_handle)
self._user_conversation_handle = None

# In realtime mode, the LLM handles everything itself (STT, turn detection, responses)
# Skip our manual LLM triggering logic
if self.realtime_mode:
return

# Determine how to handle LLM triggering based on turn detection
if self.turn_detection is not None:
# With turn detection: accumulate transcripts and wait for TurnEndedEvent
# Store/append the transcript for this user
if user_id not in self._pending_user_transcripts:
self._pending_user_transcripts[user_id] = event.text
else:
# Append to existing transcript (user might be speaking in chunks)
self._pending_user_transcripts[user_id] += " " + event.text

self.logger.debug(
f"📝 Accumulated transcript for {user_id} (waiting for turn end): "
f"{self._pending_user_transcripts[user_id][:100]}..."
)
else:
# Without turn detection: trigger LLM immediately on transcript completion
# This is the traditional STT -> LLM flow
if self.llm:
self.logger.info("🤖 Triggering LLM response immediately (no turn detection)")

# Get participant from event metadata
participant = None
if hasattr(event, "user_metadata"):
participant = event.user_metadata

await self.simple_response(event.text, participant)

async def _on_stt_error(self, error):
"""Handle STT service errors."""
@@ -311,10 +311,13 @@ async def _process_turn_prediction(
f"Turn completed detected for user {user_id} (confidence: {probability:.3f})"
)

# If this user was speaking, emit turn ended
if self._current_speaker == user_id:
self._emit_turn_event(TurnEvent.TURN_ENDED, event_data)
self._current_speaker = None
# User finished speaking - emit turn ended
# Set them as current speaker if they weren't already (in case we missed the start)
if self._current_speaker != user_id:
self._current_speaker = user_id

self._emit_turn_event(TurnEvent.TURN_ENDED, event_data)
self._current_speaker = None

else:
# Turn is still in progress
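
To make the behavioral change above concrete: under the old guard, a completed-turn prediction for a speaker whose TurnStarted was never observed emitted nothing, so the turn was silently swallowed; now TURN_ENDED always fires. A toy harness, with the detector's internals and event type simplified to plain Python (names here are stand-ins, not the real module's types):

```python
# Toy model of the guard change in _process_turn_prediction.
def handle_turn_completed(state: dict, user_id: str, emit) -> None:
    if state["current_speaker"] != user_id:
        # We missed (or never received) TURN_STARTED for this speaker;
        # adopt them as current speaker so the end event can still fire.
        state["current_speaker"] = user_id
    emit("TURN_ENDED", user_id)
    state["current_speaker"] = None

events = []
state = {"current_speaker": None}  # TURN_STARTED was never observed
handle_turn_completed(state, "user-42", lambda name, uid: events.append((name, uid)))
assert events == [("TURN_ENDED", "user-42")]  # the turn is no longer dropped
```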
31 changes: 11 additions & 20 deletions examples/01_simple_agent_example/pyproject.toml
@@ -6,31 +6,22 @@ requires-python = ">=3.13"
 # put only what this example needs
 dependencies = [
     "python-dotenv>=1.0",
-    "stream-agents-plugins-deepgram",
-    "stream-agents-plugins-elevenlabs",
-    "stream-agents-plugins-anthropic",
-    "stream-agents-plugins-getstream",
+    "vision-agents-plugins-deepgram",
+    "vision-agents-plugins-elevenlabs",
+    "vision-agents-plugins-anthropic",
+    "vision-agents-plugins-getstream",
+    "getstream-plugins-common",
-    "stream-agents",
+    "vision-agents",
     "openai>=1.101.0",
-    "krisp-audio>=1.4.0; sys_platform == 'darwin' and platform_machine == 'aarch64'",
-    "krisp-audio>=1.4.0; sys_platform == 'win32'",
-    "krisp-audio>=1.4.0; sys_platform == 'linux' and platform_machine == 'x86_64'",
-    "krisp-audio>=1.4.0; sys_platform == 'linux' and platform_machine == 'aarch64'",
     "anthropic>=0.66.0",
     "google-genai>=1.33.0",
     "fal-client>=0.5.3",
 ]
 
 [tool.uv.sources]
-krisp-audio = [
-    { path = "../../agents-core/stream_agents/core/turn_detection/krisp/krisp_audio-1.4.0-cp313-cp313-macosx_12_0_arm64.whl", marker = "sys_platform == 'darwin' and platform_machine == 'aarch64'" },
-    { path = "../../agents-core/stream_agents/core/turn_detection/krisp/krisp_audio-1.4.0-cp313-cp313-linux_aarch64.whl", marker = "sys_platform == 'linux' and platform_machine == 'aarch64'" },
-    { path = "../../agents-core/stream_agents/core/turn_detection/krisp/krisp_audio-1.4.0-cp313-cp313-linux_x86_64.whl", marker = "sys_platform == 'linux' and platform_machine == 'x86_64'" },
-    { path = "../../agents-core/stream_agents/core/turn_detection/krisp/krisp_audio-1.4.0-cp313-cp313-win_amd64.whl", marker = "sys_platform == 'win32'" }
-]
-"stream-agents-plugins-deepgram" = {path = "../../plugins/deepgram", editable=true}
-"stream-agents-plugins-elevenlabs" = {path = "../../plugins/elevenlabs", editable=true}
-"stream-agents-plugins-anthropic" = {path = "../../plugins/anthropic", editable=true}
-"stream-agents-plugins-getstream" = {path = "../../plugins/getstream", editable=true}
+"vision-agents-plugins-deepgram" = {path = "../../plugins/deepgram", editable=true}
+"vision-agents-plugins-elevenlabs" = {path = "../../plugins/elevenlabs", editable=true}
+"vision-agents-plugins-anthropic" = {path = "../../plugins/anthropic", editable=true}
+"vision-agents-plugins-getstream" = {path = "../../plugins/getstream", editable=true}
 
-"stream-agents" = {path = "../../agents-core", editable=true}
+"vision-agents" = {path = "../../agents-core", editable=true}
2 changes: 2 additions & 0 deletions examples/01_simple_agent_example/simple_agent_example.py
@@ -8,6 +8,7 @@
 from vision_agents.plugins import elevenlabs, deepgram, openai, getstream
 from vision_agents.core import agents, cli
 from vision_agents.core.events import CallSessionParticipantJoinedEvent
+from vision_agents.core.turn_detection import FalTurnDetection
 
 logging.basicConfig(
     level=logging.INFO,
@@ -37,6 +38,7 @@ async def start_agent() -> None:
         llm=llm,
         tts=elevenlabs.TTS(),
         stt=deepgram.STT(),
+        turn_detection=FalTurnDetection(buffer_duration=2.0, confidence_threshold=0.5),  # Enable turn detection with FAL
         #vad=silero.VAD(),
         # realtime version (vad, tts and stt not needed)
         # llm=openai.Realtime()