From c09e3608ec0ba8a951a9fc7fffab39d84839e405 Mon Sep 17 00:00:00 2001 From: Tommaso Barbugli Date: Mon, 20 Oct 2025 16:09:09 +0200 Subject: [PATCH 1/2] ensure user agent is initialized before joining the call --- .../vision_agents/core/agents/agents.py | 152 +++++++------ .../vision_agents/core/events/manager.py | 210 ++++++++++-------- .../simple_agent_example.py | 30 +-- .../golf_coach_example.py | 16 +- .../gemini_realtime_github_mcp_demo.py | 8 +- .../gemini_live_example.py | 13 +- 6 files changed, 244 insertions(+), 185 deletions(-) diff --git a/agents-core/vision_agents/core/agents/agents.py b/agents-core/vision_agents/core/agents/agents.py index bdfba57e..babe3728 100644 --- a/agents-core/vision_agents/core/agents/agents.py +++ b/agents-core/vision_agents/core/agents/agents.py @@ -50,6 +50,7 @@ def _log_task_exception(task: asyncio.Task): except Exception: logger.exception("Error in background task") + class Agent: """ Agent class makes it easy to build your own video AI. @@ -102,6 +103,7 @@ def __init__( self.instructions = instructions self.edge = edge self.agent_user = agent_user + self._agent_user_initialized = False # only needed in case we spin threads self._root_span = trace.get_current_span() @@ -124,7 +126,11 @@ def __init__( self._call_context_token: CallContextToken | None = None # Initialize MCP manager if servers are provided - self.mcp_manager = MCPManager(self.mcp_servers, self.llm, self.logger) if self.mcp_servers else None + self.mcp_manager = ( + MCPManager(self.mcp_servers, self.llm, self.logger) + if self.mcp_servers + else None + ) # we sync the user talking and the agent responses to the conversation # because we want to support streaming responses and can have delta updates for both @@ -132,7 +138,7 @@ def __init__( self.conversation: Optional[Conversation] = None self._user_conversation_handle: Optional[StreamHandle] = None self._agent_conversation_handle: Optional[StreamHandle] = None - + # Track pending transcripts for turn-based response triggering self._pending_user_transcripts: Dict[str, str] = {} @@ -153,7 +159,7 @@ def __init__( self._current_frame = None self._interval_task = None self._callback_executed = False - self._track_tasks : Dict[str, asyncio.Task] = {} + self._track_tasks: Dict[str, asyncio.Task] = {} self._connection: Optional[Connection] = None self._audio_track: Optional[aiortc.AudioStreamTrack] = None self._video_track: Optional[VideoStreamTrack] = None @@ -194,8 +200,9 @@ def subscribe(self, function): """ return self.events.subscribe(function) - async def join(self, call: Call) -> "AgentSessionContextManager": + await self.create_user() + # TODO: validation. join can only be called once with self.tracer.start_as_current_span("join"): if self._is_running: @@ -311,9 +318,9 @@ async def close(self): for processor in self.processors: processor.close() - + # Stop all video forwarders - if hasattr(self, '_video_forwarders'): + if hasattr(self, "_video_forwarders"): for forwarder in self._video_forwarders: try: await forwarder.stop() @@ -382,16 +389,18 @@ def clear_call_logging_context(self) -> None: clear_call_context(self._call_context_token) self._call_context_token = None - async def create_user(self): - """Create the agent user in the edge provider, if required. + async def create_user(self) -> None: + """Create the agent user in the edge provider, if required.""" + + if self._agent_user_initialized: + return None - Returns: - Provider-specific user creation response. - """ with self.tracer.start_as_current_span("edge.create_user"): - if self.agent_user.id == "": - self.agent_user.id = str(uuid4()) - return await self.edge.create_user(self.agent_user) + if not self.agent_user.id: + self.agent_user.id = f"agent-{uuid4()}" + await self.edge.create_user(self.agent_user) + + return None async def _handle_output_text_delta(self, event: LLMResponseChunkEvent): """Handle partial LLM response text deltas.""" @@ -499,23 +508,30 @@ async def _on_agent_say(self, event: events.AgentSayEvent): ) self.logger.error(f"Error in agent say: {e}") - async def say(self, text: str, user_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None): + async def say( + self, + text: str, + user_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ): """ Make the agent say something using TTS. - + This is a convenience method that sends an AgentSayEvent to trigger TTS synthesis. - + Args: text: The text for the agent to say user_id: Optional user ID for the speech metadata: Optional metadata to include with the speech """ - self.events.send(events.AgentSayEvent( - plugin_name="agent", - text=text, - user_id=user_id or self.agent_user.id, - metadata=metadata - )) + self.events.send( + events.AgentSayEvent( + plugin_name="agent", + text=text, + user_id=user_id or self.agent_user.id, + metadata=metadata, + ) + ) def _setup_turn_detection(self): if self.turn_detection: @@ -571,12 +587,11 @@ async def _reply_to_audio( continue await processor.process_audio(audio_bytes, participant.user_id) - # when in Realtime mode call the Realtime directly (non-blocking) if self.realtime_mode and isinstance(self.llm, Realtime): # TODO: this behaviour should be easy to change in the agent class asyncio.create_task(self.llm.simple_audio_response(pcm_data)) - #task.add_done_callback(lambda t: print(f"Task (send_audio_pcm) error: {t.exception()}")) + # task.add_done_callback(lambda t: print(f"Task (send_audio_pcm) error: {t.exception()}")) # Process audio through STT elif self.stt: self.logger.debug(f"🎵 Processing audio from {participant}") @@ -591,14 +606,12 @@ async def _process_track(self, track_id: str, track_type: int, participant): # subscribe to the video track track = self.edge.add_track_subscriber(track_id) if not track: - self.logger.error( - f"Failed to subscribe to {track_id}" - ) + self.logger.error(f"Failed to subscribe to {track_id}") return # Import VideoForwarder from ..utils.video_forwarder import VideoForwarder - + # Create a SHARED VideoForwarder for the RAW incoming track # This prevents multiple recv() calls competing on the same track raw_forwarder = VideoForwarder( @@ -609,9 +622,9 @@ async def _process_track(self, track_id: str, track_type: int, participant): ) await raw_forwarder.start() self.logger.info("🎥 Created raw VideoForwarder for track %s", track_id) - + # Track forwarders for cleanup - if not hasattr(self, '_video_forwarders'): + if not hasattr(self, "_video_forwarders"): self._video_forwarders = [] self._video_forwarders.append(raw_forwarder) @@ -620,7 +633,9 @@ async def _process_track(self, track_id: str, track_type: int, participant): if self._video_track: # We have a video publisher (e.g., YOLO processor) # Create a separate forwarder for the PROCESSED video track - self.logger.info("🎥 Forwarding PROCESSED video frames to Realtime provider") + self.logger.info( + "🎥 Forwarding PROCESSED video frames to Realtime provider" + ) processed_forwarder = VideoForwarder( self._video_track, # type: ignore[arg-type] max_buffer=30, @@ -629,23 +644,28 @@ async def _process_track(self, track_id: str, track_type: int, participant): ) await processed_forwarder.start() self._video_forwarders.append(processed_forwarder) - + if isinstance(self.llm, Realtime): # Send PROCESSED frames with the processed forwarder - await self.llm._watch_video_track(self._video_track, shared_forwarder=processed_forwarder) + await self.llm._watch_video_track( + self._video_track, shared_forwarder=processed_forwarder + ) else: # No video publisher, send raw frames self.logger.info("🎥 Forwarding RAW video frames to Realtime provider") if isinstance(self.llm, Realtime): - await self.llm._watch_video_track(track, shared_forwarder=raw_forwarder) - + await self.llm._watch_video_track( + track, shared_forwarder=raw_forwarder + ) hasImageProcessers = len(self.image_processors) > 0 # video processors - pass the raw forwarder (they process incoming frames) for processor in self.video_processors: try: - await processor.process_video(track, participant.user_id, shared_forwarder=raw_forwarder) + await processor.process_video( + track, participant.user_id, shared_forwarder=raw_forwarder + ) except Exception as e: self.logger.error( f"Error in video processor {type(processor).__name__}: {e}" @@ -654,13 +674,15 @@ async def _process_track(self, track_id: str, track_type: int, participant): # Use raw forwarder for image processors - only if there are image processors if not hasImageProcessers: # No image processors, just keep the connection alive - self.logger.info("No image processors, video processing handled by video processors only") + self.logger.info( + "No image processors, video processing handled by video processors only" + ) return - + # Initialize error tracking counters timeout_errors = 0 consecutive_errors = 0 - + while True: try: # Use the raw forwarder instead of competing for track.recv() @@ -672,7 +694,6 @@ async def _process_track(self, track_id: str, track_type: int, participant): consecutive_errors = 0 if hasImageProcessers: - img = video_frame.to_image() for processor in self.image_processors: @@ -683,7 +704,6 @@ async def _process_track(self, track_id: str, track_type: int, participant): f"Error in image processor {type(processor).__name__}: {e}" ) - else: self.logger.warning("🎥VDP: Received empty frame") consecutive_errors += 1 @@ -698,14 +718,16 @@ async def _process_track(self, track_id: str, track_type: int, participant): await asyncio.sleep(backoff_delay) # Cleanup and logging - self.logger.info(f"🎥VDP: Video processing loop ended for track {track_id} - timeouts: {timeout_errors}, consecutive_errors: {consecutive_errors}") + self.logger.info( + f"🎥VDP: Video processing loop ended for track {track_id} - timeouts: {timeout_errors}, consecutive_errors: {consecutive_errors}" + ) async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None: """Handle turn detection events.""" # In realtime mode, the LLM handles turn detection, interruption, and responses itself if self.realtime_mode: return - + if isinstance(event, TurnStartedEvent): # Interrupt TTS when user starts speaking (barge-in) if event.speaker_id and event.speaker_id != self.agent_user.id: @@ -730,26 +752,28 @@ async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None self.logger.info( f"👉 Turn ended - participant {event.speaker_id} finished (duration: {event.duration}, confidence: {event.confidence})" ) - + # When turn detection is enabled, trigger LLM response when user's turn ends # This is the signal that the user has finished speaking and expects a response if event.speaker_id and event.speaker_id != self.agent_user.id: # Get the accumulated transcript for this speaker transcript = self._pending_user_transcripts.get(event.speaker_id, "") - + if transcript and transcript.strip(): - self.logger.info(f"🀖 Triggering LLM response after turn ended for {event.speaker_id}") - + self.logger.info( + f"🀖 Triggering LLM response after turn ended for {event.speaker_id}" + ) + # Create participant object if we have metadata participant = None - if hasattr(event, 'custom') and event.custom: + if hasattr(event, "custom") and event.custom: # Try to extract participant info from custom metadata - participant = event.custom.get('participant') - + participant = event.custom.get("participant") + # Trigger LLM response with the complete transcript if self.llm: await self.simple_response(transcript, participant) - + # Clear the pending transcript for this speaker self._pending_user_transcripts[event.speaker_id] = "" @@ -806,12 +830,12 @@ async def _on_transcript(self, event: STTTranscriptEvent | RealtimeTranscriptEve ) self.conversation.complete_message(self._user_conversation_handle) self._user_conversation_handle = None - + # In realtime mode, the LLM handles everything itself (STT, turn detection, responses) # Skip our manual LLM triggering logic if self.realtime_mode: return - + # Determine how to handle LLM triggering based on turn detection if self.turn_detection is not None: # With turn detection: accumulate transcripts and wait for TurnEndedEvent @@ -821,7 +845,7 @@ async def _on_transcript(self, event: STTTranscriptEvent | RealtimeTranscriptEve else: # Append to existing transcript (user might be speaking in chunks) self._pending_user_transcripts[user_id] += " " + event.text - + self.logger.debug( f"📝 Accumulated transcript for {user_id} (waiting for turn end): " f"{self._pending_user_transcripts[user_id][:100]}..." @@ -830,21 +854,21 @@ async def _on_transcript(self, event: STTTranscriptEvent | RealtimeTranscriptEve # Without turn detection: trigger LLM immediately on transcript completion # This is the traditional STT -> LLM flow if self.llm: - self.logger.info("🀖 Triggering LLM response immediately (no turn detection)") - + self.logger.info( + "🀖 Triggering LLM response immediately (no turn detection)" + ) + # Get participant from event metadata participant = None if hasattr(event, "user_metadata"): participant = event.user_metadata - + await self.simple_response(event.text, participant) async def _on_stt_error(self, error): """Handle STT service errors.""" self.logger.error(f"❌ STT Error: {error}") - - @property def realtime_mode(self) -> bool: """Check if the agent is in Realtime mode. @@ -869,8 +893,7 @@ def publish_audio(self) -> bool: @property def publish_video(self) -> bool: - """Whether the agent should publish an outbound video track. - """ + """Whether the agent should publish an outbound video track.""" return len(self.video_publishers) > 0 def _needs_audio_or_video_input(self) -> bool: @@ -1000,7 +1023,9 @@ def _prepare_rtc(self): else: framerate = 48000 stereo = True # Default to stereo for WebRTC - self._audio_track = self.edge.create_audio_track(framerate=framerate, stereo=stereo) + self._audio_track = self.edge.create_audio_track( + framerate=framerate, stereo=stereo + ) if self.tts: self.tts.set_output_track(self._audio_track) @@ -1012,7 +1037,6 @@ def _prepare_rtc(self): self._video_track = video_publisher.publish_video_track() self.logger.info("🎥 Video track initialized from video publisher") - def _truncate_for_logging(self, obj, max_length=200): """Truncate object string representation for logging to prevent spam.""" obj_str = str(obj) diff --git a/agents-core/vision_agents/core/events/manager.py b/agents-core/vision_agents/core/events/manager.py index 8e9b3669..8aa0e621 100644 --- a/agents-core/vision_agents/core/events/manager.py +++ b/agents-core/vision_agents/core/events/manager.py @@ -21,47 +21,46 @@ def _truncate_event_for_logging(event, max_length=200): """ Truncate event data for logging to prevent log spam. - + Args: event: The event object to truncate max_length: Maximum length of the string representation - + Returns: Truncated string representation of the event """ event_str = str(event) - + # Special handling for audio data arrays - if hasattr(event, 'pcm_data') and hasattr(event.pcm_data, 'samples'): + if hasattr(event, "pcm_data") and hasattr(event.pcm_data, "samples"): # Replace the full array with a summary samples = event.pcm_data.samples array_summary = f"array([{samples[0]}, {samples[1]}, ..., {samples[-1]}], dtype={samples.dtype}, size={len(samples)})" event_str = event_str.replace(str(samples), array_summary) - + # If the event is still too long, truncate it if len(event_str) > max_length: # Find a good truncation point (end of a field) truncate_at = max_length - 20 # Leave room for "... (truncated)" - while truncate_at > 0 and event_str[truncate_at] not in [',', ')', '}']: + while truncate_at > 0 and event_str[truncate_at] not in [",", ")", "}"]: truncate_at -= 1 - + if truncate_at > 0: event_str = event_str[:truncate_at] + "... (truncated)" else: - event_str = event_str[:max_length-20] + "... (truncated)" - - return event_str + event_str = event_str[: max_length - 20] + "... (truncated)" + return event_str class EventManager: """ A comprehensive event management system for handling asynchronous event-driven communication. - + The EventManager provides a centralized way to register events, subscribe handlers, and process events asynchronously. It supports event queuing, error handling, and automatic exception event generation. - + Features: - Event registration and validation - Handler subscription with type hints @@ -69,42 +68,42 @@ class EventManager: - Error handling with automatic exception events - Support for Union types in handlers - Event queuing and batch processing - + Example: ```python from vision_agents.core.events.manager import EventManager from vision_agents.core.vad.events import VADSpeechStartEvent, VADSpeechEndEvent from vision_agents.core.stt.events import STTTranscriptEvent from vision_agents.core.tts.events import TTSAudioEvent - + # Create event manager manager = EventManager() - + # Register events manager.register(VADSpeechStartEvent) manager.register(VADSpeechEndEvent) manager.register(STTTranscriptEvent) manager.register(TTSAudioEvent) - + # Subscribe to VAD events @manager.subscribe async def handle_speech_start(event: VADSpeechStartEvent): print(f"Speech started with probability {event.speech_probability}") - + @manager.subscribe async def handle_speech_end(event: VADSpeechEndEvent): print(f"Speech ended after {event.total_speech_duration_ms}ms") - + # Subscribe to STT events @manager.subscribe async def handle_transcript(event: STTTranscriptEvent): print(f"Transcript: {event.text} (confidence: {event.confidence})") - + # Subscribe to multiple event types using Union @manager.subscribe async def handle_audio_events(event: VADSpeechStartEvent | VADSpeechEndEvent): print(f"VAD event: {event.type}") - + # Send events manager.send(VADSpeechStartEvent( plugin_name="silero", @@ -116,20 +115,20 @@ async def handle_audio_events(event: VADSpeechStartEvent | VADSpeechEndEvent): text="Hello world", confidence=0.98 )) - + # Before shutdown, ensure all events are processed await manager.shutdown() ``` - + Args: ignore_unknown_events (bool): If True, unknown events are ignored rather than raising errors. Defaults to True. """ - + def __init__(self, ignore_unknown_events: bool = True): """ Initialize the EventManager. - + Args: ignore_unknown_events (bool): If True, unknown events are ignored rather than raising errors. Defaults to True. @@ -149,51 +148,55 @@ def __init__(self, ignore_unknown_events: bool = True): self.register(ConnectionOkEvent) self.register(ConnectionErrorEvent) self.register(ConnectionClosedEvent) - + # Start background processing task self._start_processing_task() def register(self, event_class, ignore_not_compatible=False): """ Register an event class for use with the event manager. - + Event classes must: - Have a name ending with 'Event' - Have a 'type' attribute (string) - + Example: ```python from vision_agents.core.vad.events import VADSpeechStartEvent from vision_agents.core.stt.events import STTTranscriptEvent - + manager = EventManager() manager.register(VADSpeechStartEvent) manager.register(STTTranscriptEvent) ``` - + Args: event_class: The event class to register ignore_not_compatible (bool): If True, log warning instead of raising error for incompatible classes. Defaults to False. - + Raises: ValueError: If event_class doesn't meet requirements and ignore_not_compatible is False """ - if event_class.__name__.endswith('Event') and hasattr(event_class, 'type'): + if event_class.__name__.endswith("Event") and hasattr(event_class, "type"): self._events[event_class.type] = event_class logger.debug(f"Registered new event {event_class} - {event_class.type}") - elif event_class.__name__.endswith('BaseEvent'): + elif event_class.__name__.endswith("BaseEvent"): return elif not ignore_not_compatible: - raise ValueError(f"Provide valid class that ends on '*Event' and 'type' attribute: {event_class}") + raise ValueError( + f"Provide valid class that ends on '*Event' and 'type' attribute: {event_class}" + ) else: - logger.warning(f"Provide valid class that ends on '*Event' and 'type' attribute: {event_class}") + logger.warning( + f"Provide valid class that ends on '*Event' and 'type' attribute: {event_class}" + ) - def merge(self, em: 'EventManager'): + def merge(self, em: "EventManager"): # Stop the processing task in the merged manager if em._processing_task and not em._processing_task.done(): em._processing_task.cancel() - + # Merge all data from the other manager self._events.update(em._events) self._modules.update(em._modules) @@ -211,29 +214,31 @@ def merge(self, em: 'EventManager'): em._silent_events = self._silent_events em._processing_task = None # Clear the stopped task reference - def register_events_from_module(self, module, prefix='', ignore_not_compatible=True): + def register_events_from_module( + self, module, prefix="", ignore_not_compatible=True + ): """ Register all event classes from a module. - + Automatically discovers and registers all classes in a module that: - Have names ending with 'Event' - Have a 'type' attribute (optionally matching the prefix) - + Example: ```python # Register all VAD events from the core module from vision_agents.core import vad manager.register_events_from_module(vad.events, prefix="plugin.vad") - + # Register all TTS events from the core module from vision_agents.core import tts manager.register_events_from_module(tts.events, prefix="plugin.tts") - + # Register all events from a plugin module from vision_agents.plugins.silero import events as silero_events manager.register_events_from_module(silero_events, prefix="plugin.silero") ``` - + Args: module: The Python module to scan for event classes prefix (str): Optional prefix to filter event types. Only events with @@ -242,7 +247,9 @@ def register_events_from_module(self, module, prefix='', ignore_not_compatible=T for incompatible classes. Defaults to True. """ for name, class_ in module.__dict__.items(): - if name.endswith('Event') and (not prefix or getattr(class_, 'type', '').startswith(prefix)): + if name.endswith("Event") and ( + not prefix or getattr(class_, "type", "").startswith(prefix) + ): self.register(class_, ignore_not_compatible=ignore_not_compatible) self._modules.setdefault(module.__name__, []).append(class_) @@ -265,20 +272,20 @@ def _generate_import_file(self): def unsubscribe(self, function): """ Unsubscribe a function from all event types. - + Removes the specified function from all event handler lists. This is useful for cleaning up handlers that are no longer needed. - + Example: ```python @manager.subscribe async def speech_handler(event: VADSpeechStartEvent): print("Speech started") - + # Later, unsubscribe the handler manager.unsubscribe(speech_handler) ``` - + Args: function: The function to unsubscribe from all event types. """ @@ -292,30 +299,30 @@ async def speech_handler(event: VADSpeechStartEvent): def subscribe(self, function): """ Subscribe a function to handle specific event types. - + The function must have type hints indicating which event types it handles. Supports both single event types and Union types for handling multiple event types. - + Example: ```python # Single event type @manager.subscribe async def handle_speech_start(event: VADSpeechStartEvent): print(f"Speech started with probability {event.speech_probability}") - + # Multiple event types using Union @manager.subscribe async def handle_audio_events(event: VADSpeechStartEvent | VADSpeechEndEvent): print(f"VAD event: {event.type}") ``` - + Args: function: The async function to subscribe as an event handler. Must have type hints for event parameters. - + Returns: The decorated function (for use as decorator). - + Raises: RuntimeError: If handler has multiple separate event parameters (use Union instead) KeyError: If event type is not registered and ignore_unknown_events is False @@ -338,44 +345,57 @@ async def handle_audio_events(event: VADSpeechStartEvent | VADSpeechEndEvent): event_type = getattr(sub_event, "type", None) if subscribed and not is_union: - raise RuntimeError("Multiple seperated events per handler are not supported, use Union instead") + raise RuntimeError( + "Multiple seperated events per handler are not supported, use Union instead" + ) if event_type in self._events: subscribed = True self._handlers.setdefault(event_type, []).append(function) - module_name = getattr(function, '__module__', 'unknown') - logger.info(f"Handler {function.__name__} from {module_name} registered for event {event_type}") + module_name = getattr(function, "__module__", "unknown") + logger.info( + f"Handler {function.__name__} from {module_name} registered for event {event_type}" + ) elif not self._ignore_unknown_events: - raise KeyError(f"Event {sub_event} - {event_type} is not registered.") + raise KeyError( + f"Event {sub_event} - {event_type} is not registered." + ) else: - module_name = getattr(function, '__module__', 'unknown') - logger.info(f"Event {sub_event} - {event_type} is not registered – skipping handler {function.__name__} from {module_name}.") + module_name = getattr(function, "__module__", "unknown") + logger.info( + f"Event {sub_event} - {event_type} is not registered – skipping handler {function.__name__} from {module_name}." + ) return function def _prepare_event(self, event): # Handle dict events - convert to event class if isinstance(event, dict): - event_type = event.get('type', '') + event_type = event.get("type", "") try: event_class = self._events[event_type] event = event_class.from_dict(event, infer_missing=True) # type: ignore[attr-defined] except Exception: logger.exception(f"Can't convert dict {event} to event class, skipping") return - + # Handle raw protobuf messages - wrap in BaseEvent subclass # Check for protobuf DESCRIPTOR but exclude already-wrapped BaseEvent subclasses - elif (hasattr(event, 'DESCRIPTOR') and hasattr(event.DESCRIPTOR, 'full_name') - and not hasattr(event, 'event_id')): # event_id is unique to BaseEvent + elif ( + hasattr(event, "DESCRIPTOR") + and hasattr(event.DESCRIPTOR, "full_name") + and not hasattr(event, "event_id") + ): # event_id is unique to BaseEvent proto_type = event.DESCRIPTOR.full_name - + # Look up the registered event class by protobuf type proto_event_class = self._events.get(proto_type) - if proto_event_class and hasattr(proto_event_class, 'from_proto'): + if proto_event_class and hasattr(proto_event_class, "from_proto"): try: event = proto_event_class.from_proto(event) except Exception: - logger.exception(f"Failed to convert protobuf {proto_type} to event class {proto_event_class}") + logger.exception( + f"Failed to convert protobuf {proto_type} to event class {proto_event_class}" + ) return else: # No matching event class found @@ -384,10 +404,10 @@ def _prepare_event(self, event): return else: raise RuntimeError(f"Protobuf event not registered: {proto_type}") - + # Validate event is registered (handles both BaseEvent and generated protobuf events) - if hasattr(event, 'type') and event.type in self._events: - #logger.info(f"Received event {_truncate_event_for_logging(event)}") + if hasattr(event, "type") and event.type in self._events: + # logger.info(f"Received event {_truncate_event_for_logging(event)}") return event elif self._ignore_unknown_events: logger.info(f"Event not registered {_truncate_event_for_logging(event)}") @@ -397,7 +417,7 @@ def _prepare_event(self, event): def silent(self, event_class): """ Silence logging for an event class from being processed. - + Args: event_class: The event class to silence """ @@ -406,11 +426,11 @@ def silent(self, event_class): def send(self, *events): """ Send one or more events for processing. - + Events are added to the queue and will be processed by the background - processing task. If an event handler raises an exception, an ExceptionEvent + processing task. If an event handler raises an exception, an ExceptionEvent is automatically created and queued for processing. - + Example: ```python # Send single event @@ -419,13 +439,13 @@ def send(self, *events): speech_probability=0.95, activation_threshold=0.5 )) - + # Send multiple events manager.send( VADSpeechStartEvent(plugin_name="silero", speech_probability=0.95), STTTranscriptEvent(plugin_name="deepgram", text="Hello world") ) - + # Send event from dictionary manager.send({ "type": "plugin.vad_speech_start", @@ -433,27 +453,26 @@ def send(self, *events): "speech_probability": 0.95 }) ``` - + Args: *events: One or more event objects or dictionaries to send. Events can be instances of registered event classes or dictionaries with a 'type' field that matches a registered event type. - + Raises: RuntimeError: If event type is not registered and ignore_unknown_events is False """ for event in events: event = self._prepare_event(event) if event: - #logger.info(f"🎯 EventManager.send: {event.__class__.__name__} - {event.type}") self._queue.append(event) - + async def wait(self, timeout: float = 10.0): """ Wait for all queued events to be processed. - + This is useful in tests to ensure events are processed before assertions. - + Args: timeout: Maximum time to wait for processing to complete """ @@ -463,19 +482,19 @@ async def wait(self, timeout: float = 10.0): if self._handler_tasks: await asyncio.wait(list(self._handler_tasks.values())) - + def _start_processing_task(self): """Start the background event processing task.""" if self._processing_task and not self._processing_task.done(): return # Already running - + loop = asyncio.get_running_loop() self._processing_task = loop.create_task(self._process_events_loop()) async def _process_events_loop(self): """ Background task that continuously processes events from the queue. - + This task runs until shutdown is requested and processes all events in the queue. It's shielded from cancellation to ensure all events are processed before shutdown. @@ -488,12 +507,18 @@ async def _process_events_loop(self): await self._process_single_event(event) except asyncio.CancelledError as exc: cancelled_exc = exc - logger.info(f"Event processing task was cancelled, processing remaining events, {len(self._queue)}") + logger.info( + f"Event processing task was cancelled, processing remaining events, {len(self._queue)}" + ) await self._process_single_event(event) elif cancelled_exc: raise cancelled_exc else: - cleanup_ids = set(task_id for task_id, task in self._handler_tasks.items() if task.done()) + cleanup_ids = set( + task_id + for task_id, task in self._handler_tasks.items() + if task.done() + ) for task_id in cleanup_ids: self._handler_tasks.pop(task_id) await asyncio.sleep(0.0001) @@ -503,17 +528,20 @@ async def _run_handler(self, handler, event): return await handler(event) except Exception as exc: self._queue.appendleft(ExceptionEvent(exc, handler)) # type: ignore[arg-type] - module_name = getattr(handler, '__module__', 'unknown') - logger.exception(f"Error calling handler {handler.__name__} from {module_name} for event {event.type}") + module_name = getattr(handler, "__module__", "unknown") + logger.exception( + f"Error calling handler {handler.__name__} from {module_name} for event {event.type}" + ) async def _process_single_event(self, event): """Process a single event.""" for handler in self._handlers.get(event.type, []): - module_name = getattr(handler, '__module__', 'unknown') + module_name = getattr(handler, "__module__", "unknown") if event.type not in self._silent_events: - logger.info(f"Called handler {handler.__name__} from {module_name} for event {event.type}") + logger.info( + f"Called handler {handler.__name__} from {module_name} for event {event.type}" + ) loop = asyncio.get_running_loop() handler_task = loop.create_task(self._run_handler(handler, event)) self._handler_tasks[uuid.uuid4()] = handler_task - diff --git a/examples/01_simple_agent_example/simple_agent_example.py b/examples/01_simple_agent_example/simple_agent_example.py index 3b063e66..e27724a3 100644 --- a/examples/01_simple_agent_example/simple_agent_example.py +++ b/examples/01_simple_agent_example/simple_agent_example.py @@ -7,24 +7,28 @@ load_dotenv() + async def start_agent() -> None: llm = openai.LLM(model="gpt-4o-mini") # create an agent to run with Stream's edge, openAI llm agent = Agent( edge=getstream.Edge(), # low latency edge. clients for React, iOS, Android, RN, Flutter etc. - agent_user=User(name="My happy AI friend", id="agent"), # the user object for the agent (name, image etc) + agent_user=User( + name="My happy AI friend", id="agent" + ), # the user object for the agent (name, image etc) instructions="You're a voice AI assistant. Keep responses short and conversational. Don't use special characters or formatting. Be friendly and helpful.", processors=[], # processors can fetch extra data, check images/audio data or transform video # llm with tts & stt. if you use a realtime (sts capable) llm the tts, stt and vad aren't needed llm=llm, tts=cartesia.TTS(), stt=deepgram.STT(), - turn_detection=smart_turn.TurnDetection(buffer_duration=2.0, confidence_threshold=0.5), # Enable turn detection with FAL/ Smart turn - #vad=silero.VAD(), + turn_detection=smart_turn.TurnDetection( + buffer_duration=2.0, confidence_threshold=0.5 + ), # Enable turn detection with FAL/ Smart turn + # vad=silero.VAD(), # realtime version (vad, tts and stt not needed) # llm=openai.Realtime() ) - await agent.create_user() # Create a call call = agent.edge.client.video.call("default", str(uuid4())) @@ -37,15 +41,15 @@ async def start_agent() -> None: # Example 1: standardized simple response # await agent.llm.simple_response("chat with the user about the weather.") # Example 2: use native openAI create response - # await llm.create_response(input=[ - # { - # "role": "user", - # "content": [ - # {"type": "input_text", "text": "Tell me a short poem about this image"}, - # {"type": "input_image", "image_url": f"https://images.unsplash.com/photo-1757495361144-0c2bfba62b9e?q=80&w=2340&auto=format&fit=crop&ixlib=rb-4.1.0&ixid=M3wxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8fA%3D%3D"}, - # ], - # } - # ],) + # await llm.create_response(input=[ + # { + # "role": "user", + # "content": [ + # {"type": "input_text", "text": "Tell me a short poem about this image"}, + # {"type": "input_image", "image_url": f"https://images.unsplash.com/photo-1757495361144-0c2bfba62b9e?q=80&w=2340&auto=format&fit=crop&ixlib=rb-4.1.0&ixid=M3wxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8fA%3D%3D"}, + # ], + # } + # ],) # run till the call ends await agent.finish() diff --git a/examples/02_golf_coach_example/golf_coach_example.py b/examples/02_golf_coach_example/golf_coach_example.py index bb74f9e4..40ba968c 100644 --- a/examples/02_golf_coach_example/golf_coach_example.py +++ b/examples/02_golf_coach_example/golf_coach_example.py @@ -11,16 +11,16 @@ async def start_agent() -> None: agent = Agent( - edge=getstream.Edge(), # use stream for edge video transport + edge=getstream.Edge(), # use stream for edge video transport agent_user=User(name="AI golf coach"), - instructions="Read @golf_coach.md", # read the golf coach markdown instructions - llm=gemini.Realtime(fps=10), # Careful with FPS can get expensive + instructions="Read @golf_coach.md", # read the golf coach markdown instructions + llm=gemini.Realtime(fps=10), # Careful with FPS can get expensive # llm=openai.Realtime(fps=10), use this to switch to openai - processors=[ultralytics.YOLOPoseProcessor(model_path="yolo11n-pose.pt")], # realtime pose detection with yolo + processors=[ + ultralytics.YOLOPoseProcessor(model_path="yolo11n-pose.pt") + ], # realtime pose detection with yolo ) - await agent.create_user() - # create a call, some other video networks call this a room call = agent.edge.client.video.call("default", str(uuid4())) @@ -28,7 +28,9 @@ async def start_agent() -> None: with await agent.join(call): await agent.edge.open_demo(call) # all LLMs support a simple_response method and a more advanced native method (so you can always use the latest LLM features) - await agent.llm.simple_response(text="Say hi. After the user does their golf swing offer helpful feedback.") + await agent.llm.simple_response( + text="Say hi. After the user does their golf swing offer helpful feedback." + ) # Gemini's native API is available here # agent.llm.send_realtime_input(text="Hello world") await agent.finish() # run till the call ends diff --git a/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py b/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py index da8315d6..bedee409 100644 --- a/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py +++ b/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py @@ -56,15 +56,11 @@ async def start_agent(): model="gemini-2.5-flash-native-audio-preview-09-2025", api_key=google_api_key ) - # Create real edge transport and agent user - edge = getstream.Edge() - agent_user = User(name="GitHub AI Assistant", id="github-agent") - # Create agent with GitHub MCP server and Gemini Realtime LLM agent = Agent( - edge=edge, + edge=getstream.Edge(), llm=llm, - agent_user=agent_user, + agent_user=User(name="GitHub AI Assistant", id="github-agent"), instructions="You are a helpful AI assistant with access to GitHub via MCP server. You can help with GitHub operations like creating issues, managing pull requests, searching repositories, and more. Keep responses conversational and helpful. When you need to perform GitHub operations, use the available MCP tools.", processors=[], mcp_servers=[github_server], diff --git a/examples/other_examples/gemini_live_realtime/gemini_live_example.py b/examples/other_examples/gemini_live_realtime/gemini_live_example.py index cf328221..e4084e11 100644 --- a/examples/other_examples/gemini_live_realtime/gemini_live_example.py +++ b/examples/other_examples/gemini_live_realtime/gemini_live_example.py @@ -4,23 +4,28 @@ from dotenv import load_dotenv from getstream import AsyncStream + +from vision_agents.core.edge.types import User from vision_agents.core.agents import Agent from vision_agents.plugins import gemini, getstream load_dotenv() -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s [call_id=%(call_id)s] %(name)s: %(message)s") +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s [call_id=%(call_id)s] %(name)s: %(message)s", +) logger = logging.getLogger(__name__) async def start_agent() -> None: client = AsyncStream() - agent_user = await client.create_user(name="My happy AI friend") - agent = Agent( edge=getstream.Edge(), - agent_user=agent_user, # the user object for the agent (name, image etc) + agent_user=User( + name="My happy AI friend" + ), # the user object for the agent (name, image etc) instructions="Read @voice-agent.md", llm=gemini.Realtime(), processors=[], # processors can fetch extra data, check images/audio data or transform video From 377e759dcede425b7181e5fff22f8f5e94c08f97 Mon Sep 17 00:00:00 2001 From: Tommaso Barbugli Date: Mon, 20 Oct 2025 21:32:29 +0200 Subject: [PATCH 2/2] wip --- agents-core/vision_agents/core/agents/agents.py | 1 + .../09_github_mcp_demo/gemini_realtime_github_mcp_demo.py | 3 --- examples/other_examples/09_github_mcp_demo/github_mcp_demo.py | 3 --- .../09_github_mcp_demo/openai_realtime_github_mcp_demo.py | 3 --- 4 files changed, 1 insertion(+), 9 deletions(-) diff --git a/agents-core/vision_agents/core/agents/agents.py b/agents-core/vision_agents/core/agents/agents.py index 83dfc112..e5c581cd 100644 --- a/agents-core/vision_agents/core/agents/agents.py +++ b/agents-core/vision_agents/core/agents/agents.py @@ -529,6 +529,7 @@ async def create_user(self) -> None: if not self.agent_user.id: self.agent_user.id = f"agent-{uuid4()}" await self.edge.create_user(self.agent_user) + self._agent_user_initialized = True return None diff --git a/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py b/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py index bedee409..8f1d285a 100644 --- a/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py +++ b/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py @@ -70,9 +70,6 @@ async def start_agent(): logger.info(f"GitHub server: {github_server}") try: - # Create the agent user - await agent.create_user() - # Set up event handler for when participants join @agent.subscribe async def on_participant_joined(event: CallSessionParticipantJoinedEvent): diff --git a/examples/other_examples/09_github_mcp_demo/github_mcp_demo.py b/examples/other_examples/09_github_mcp_demo/github_mcp_demo.py index 13c488b3..e39e7eac 100644 --- a/examples/other_examples/09_github_mcp_demo/github_mcp_demo.py +++ b/examples/other_examples/09_github_mcp_demo/github_mcp_demo.py @@ -88,9 +88,6 @@ async def start_agent(): ) logger.info("MCP tools are now available to the LLM for function calling!") - # Create the agent user - await agent.create_user() - # Set up event handler for when participants join @agent.subscribe async def on_participant_joined(event: CallSessionParticipantJoinedEvent): diff --git a/examples/other_examples/09_github_mcp_demo/openai_realtime_github_mcp_demo.py b/examples/other_examples/09_github_mcp_demo/openai_realtime_github_mcp_demo.py index 92b10807..ef88c62d 100644 --- a/examples/other_examples/09_github_mcp_demo/openai_realtime_github_mcp_demo.py +++ b/examples/other_examples/09_github_mcp_demo/openai_realtime_github_mcp_demo.py @@ -72,9 +72,6 @@ async def start_agent(): logger.info(f"GitHub server: {github_server}") try: - # Create the agent user - await agent.create_user() - # Set up event handler for when participants join @agent.subscribe async def on_participant_joined(event: CallSessionParticipantJoinedEvent):