diff --git a/agents-core/vision_agents/core/agents/agents.py b/agents-core/vision_agents/core/agents/agents.py index ef341234..e5c581cd 100644 --- a/agents-core/vision_agents/core/agents/agents.py +++ b/agents-core/vision_agents/core/agents/agents.py @@ -105,6 +105,7 @@ def __init__( self.instructions = instructions self.edge = edge self.agent_user = agent_user + self._agent_user_initialized = False # only needed in case we spin threads self._root_span = trace.get_current_span() @@ -329,6 +330,8 @@ async def on_stt_transcript_event_create_response(event: STTTranscriptEvent): await self.simple_response(event.text, event.user_metadata) async def join(self, call: Call) -> "AgentSessionContextManager": + await self.create_user() + # TODO: validation. join can only be called once with self.tracer.start_as_current_span("join"): if self._is_running: @@ -516,16 +519,19 @@ def clear_call_logging_context(self) -> None: clear_call_context(self._call_context_token) self._call_context_token = None - async def create_user(self): - """Create the agent user in the edge provider, if required. + async def create_user(self) -> None: + """Create the agent user in the edge provider, if required.""" + + if self._agent_user_initialized: + return None - Returns: - Provider-specific user creation response. - """ with self.tracer.start_as_current_span("edge.create_user"): - if self.agent_user.id == "": - self.agent_user.id = str(uuid4()) - return await self.edge.create_user(self.agent_user) + if not self.agent_user.id: + self.agent_user.id = f"agent-{uuid4()}" + await self.edge.create_user(self.agent_user) + self._agent_user_initialized = True + + return None def _on_vad_audio(self, event: VADAudioEvent): self.logger.info(f"Vad audio event {self._truncate_for_logging(event)}") @@ -614,7 +620,7 @@ async def say( metadata=metadata, ) ) - # Unified API: simple non-streaming message + if self.conversation is not None: await self.conversation.upsert_message( role="assistant", diff --git a/agents-core/vision_agents/core/events/manager.py b/agents-core/vision_agents/core/events/manager.py index 8e9b3669..8aa0e621 100644 --- a/agents-core/vision_agents/core/events/manager.py +++ b/agents-core/vision_agents/core/events/manager.py @@ -21,47 +21,46 @@ def _truncate_event_for_logging(event, max_length=200): """ Truncate event data for logging to prevent log spam. - + Args: event: The event object to truncate max_length: Maximum length of the string representation - + Returns: Truncated string representation of the event """ event_str = str(event) - + # Special handling for audio data arrays - if hasattr(event, 'pcm_data') and hasattr(event.pcm_data, 'samples'): + if hasattr(event, "pcm_data") and hasattr(event.pcm_data, "samples"): # Replace the full array with a summary samples = event.pcm_data.samples array_summary = f"array([{samples[0]}, {samples[1]}, ..., {samples[-1]}], dtype={samples.dtype}, size={len(samples)})" event_str = event_str.replace(str(samples), array_summary) - + # If the event is still too long, truncate it if len(event_str) > max_length: # Find a good truncation point (end of a field) truncate_at = max_length - 20 # Leave room for "... (truncated)" - while truncate_at > 0 and event_str[truncate_at] not in [',', ')', '}']: + while truncate_at > 0 and event_str[truncate_at] not in [",", ")", "}"]: truncate_at -= 1 - + if truncate_at > 0: event_str = event_str[:truncate_at] + "... (truncated)" else: - event_str = event_str[:max_length-20] + "... (truncated)" - - return event_str + event_str = event_str[: max_length - 20] + "... (truncated)" + return event_str class EventManager: """ A comprehensive event management system for handling asynchronous event-driven communication. - + The EventManager provides a centralized way to register events, subscribe handlers, and process events asynchronously. It supports event queuing, error handling, and automatic exception event generation. - + Features: - Event registration and validation - Handler subscription with type hints @@ -69,42 +68,42 @@ class EventManager: - Error handling with automatic exception events - Support for Union types in handlers - Event queuing and batch processing - + Example: ```python from vision_agents.core.events.manager import EventManager from vision_agents.core.vad.events import VADSpeechStartEvent, VADSpeechEndEvent from vision_agents.core.stt.events import STTTranscriptEvent from vision_agents.core.tts.events import TTSAudioEvent - + # Create event manager manager = EventManager() - + # Register events manager.register(VADSpeechStartEvent) manager.register(VADSpeechEndEvent) manager.register(STTTranscriptEvent) manager.register(TTSAudioEvent) - + # Subscribe to VAD events @manager.subscribe async def handle_speech_start(event: VADSpeechStartEvent): print(f"Speech started with probability {event.speech_probability}") - + @manager.subscribe async def handle_speech_end(event: VADSpeechEndEvent): print(f"Speech ended after {event.total_speech_duration_ms}ms") - + # Subscribe to STT events @manager.subscribe async def handle_transcript(event: STTTranscriptEvent): print(f"Transcript: {event.text} (confidence: {event.confidence})") - + # Subscribe to multiple event types using Union @manager.subscribe async def handle_audio_events(event: VADSpeechStartEvent | VADSpeechEndEvent): print(f"VAD event: {event.type}") - + # Send events manager.send(VADSpeechStartEvent( plugin_name="silero", @@ -116,20 +115,20 @@ async def handle_audio_events(event: VADSpeechStartEvent | VADSpeechEndEvent): text="Hello world", confidence=0.98 )) - + # Before shutdown, ensure all events are processed await manager.shutdown() ``` - + Args: ignore_unknown_events (bool): If True, unknown events are ignored rather than raising errors. Defaults to True. """ - + def __init__(self, ignore_unknown_events: bool = True): """ Initialize the EventManager. - + Args: ignore_unknown_events (bool): If True, unknown events are ignored rather than raising errors. Defaults to True. @@ -149,51 +148,55 @@ def __init__(self, ignore_unknown_events: bool = True): self.register(ConnectionOkEvent) self.register(ConnectionErrorEvent) self.register(ConnectionClosedEvent) - + # Start background processing task self._start_processing_task() def register(self, event_class, ignore_not_compatible=False): """ Register an event class for use with the event manager. - + Event classes must: - Have a name ending with 'Event' - Have a 'type' attribute (string) - + Example: ```python from vision_agents.core.vad.events import VADSpeechStartEvent from vision_agents.core.stt.events import STTTranscriptEvent - + manager = EventManager() manager.register(VADSpeechStartEvent) manager.register(STTTranscriptEvent) ``` - + Args: event_class: The event class to register ignore_not_compatible (bool): If True, log warning instead of raising error for incompatible classes. Defaults to False. - + Raises: ValueError: If event_class doesn't meet requirements and ignore_not_compatible is False """ - if event_class.__name__.endswith('Event') and hasattr(event_class, 'type'): + if event_class.__name__.endswith("Event") and hasattr(event_class, "type"): self._events[event_class.type] = event_class logger.debug(f"Registered new event {event_class} - {event_class.type}") - elif event_class.__name__.endswith('BaseEvent'): + elif event_class.__name__.endswith("BaseEvent"): return elif not ignore_not_compatible: - raise ValueError(f"Provide valid class that ends on '*Event' and 'type' attribute: {event_class}") + raise ValueError( + f"Provide valid class that ends on '*Event' and 'type' attribute: {event_class}" + ) else: - logger.warning(f"Provide valid class that ends on '*Event' and 'type' attribute: {event_class}") + logger.warning( + f"Provide valid class that ends on '*Event' and 'type' attribute: {event_class}" + ) - def merge(self, em: 'EventManager'): + def merge(self, em: "EventManager"): # Stop the processing task in the merged manager if em._processing_task and not em._processing_task.done(): em._processing_task.cancel() - + # Merge all data from the other manager self._events.update(em._events) self._modules.update(em._modules) @@ -211,29 +214,31 @@ def merge(self, em: 'EventManager'): em._silent_events = self._silent_events em._processing_task = None # Clear the stopped task reference - def register_events_from_module(self, module, prefix='', ignore_not_compatible=True): + def register_events_from_module( + self, module, prefix="", ignore_not_compatible=True + ): """ Register all event classes from a module. - + Automatically discovers and registers all classes in a module that: - Have names ending with 'Event' - Have a 'type' attribute (optionally matching the prefix) - + Example: ```python # Register all VAD events from the core module from vision_agents.core import vad manager.register_events_from_module(vad.events, prefix="plugin.vad") - + # Register all TTS events from the core module from vision_agents.core import tts manager.register_events_from_module(tts.events, prefix="plugin.tts") - + # Register all events from a plugin module from vision_agents.plugins.silero import events as silero_events manager.register_events_from_module(silero_events, prefix="plugin.silero") ``` - + Args: module: The Python module to scan for event classes prefix (str): Optional prefix to filter event types. Only events with @@ -242,7 +247,9 @@ def register_events_from_module(self, module, prefix='', ignore_not_compatible=T for incompatible classes. Defaults to True. """ for name, class_ in module.__dict__.items(): - if name.endswith('Event') and (not prefix or getattr(class_, 'type', '').startswith(prefix)): + if name.endswith("Event") and ( + not prefix or getattr(class_, "type", "").startswith(prefix) + ): self.register(class_, ignore_not_compatible=ignore_not_compatible) self._modules.setdefault(module.__name__, []).append(class_) @@ -265,20 +272,20 @@ def _generate_import_file(self): def unsubscribe(self, function): """ Unsubscribe a function from all event types. - + Removes the specified function from all event handler lists. This is useful for cleaning up handlers that are no longer needed. - + Example: ```python @manager.subscribe async def speech_handler(event: VADSpeechStartEvent): print("Speech started") - + # Later, unsubscribe the handler manager.unsubscribe(speech_handler) ``` - + Args: function: The function to unsubscribe from all event types. """ @@ -292,30 +299,30 @@ async def speech_handler(event: VADSpeechStartEvent): def subscribe(self, function): """ Subscribe a function to handle specific event types. - + The function must have type hints indicating which event types it handles. Supports both single event types and Union types for handling multiple event types. - + Example: ```python # Single event type @manager.subscribe async def handle_speech_start(event: VADSpeechStartEvent): print(f"Speech started with probability {event.speech_probability}") - + # Multiple event types using Union @manager.subscribe async def handle_audio_events(event: VADSpeechStartEvent | VADSpeechEndEvent): print(f"VAD event: {event.type}") ``` - + Args: function: The async function to subscribe as an event handler. Must have type hints for event parameters. - + Returns: The decorated function (for use as decorator). - + Raises: RuntimeError: If handler has multiple separate event parameters (use Union instead) KeyError: If event type is not registered and ignore_unknown_events is False @@ -338,44 +345,57 @@ async def handle_audio_events(event: VADSpeechStartEvent | VADSpeechEndEvent): event_type = getattr(sub_event, "type", None) if subscribed and not is_union: - raise RuntimeError("Multiple seperated events per handler are not supported, use Union instead") + raise RuntimeError( + "Multiple seperated events per handler are not supported, use Union instead" + ) if event_type in self._events: subscribed = True self._handlers.setdefault(event_type, []).append(function) - module_name = getattr(function, '__module__', 'unknown') - logger.info(f"Handler {function.__name__} from {module_name} registered for event {event_type}") + module_name = getattr(function, "__module__", "unknown") + logger.info( + f"Handler {function.__name__} from {module_name} registered for event {event_type}" + ) elif not self._ignore_unknown_events: - raise KeyError(f"Event {sub_event} - {event_type} is not registered.") + raise KeyError( + f"Event {sub_event} - {event_type} is not registered." + ) else: - module_name = getattr(function, '__module__', 'unknown') - logger.info(f"Event {sub_event} - {event_type} is not registered – skipping handler {function.__name__} from {module_name}.") + module_name = getattr(function, "__module__", "unknown") + logger.info( + f"Event {sub_event} - {event_type} is not registered – skipping handler {function.__name__} from {module_name}." + ) return function def _prepare_event(self, event): # Handle dict events - convert to event class if isinstance(event, dict): - event_type = event.get('type', '') + event_type = event.get("type", "") try: event_class = self._events[event_type] event = event_class.from_dict(event, infer_missing=True) # type: ignore[attr-defined] except Exception: logger.exception(f"Can't convert dict {event} to event class, skipping") return - + # Handle raw protobuf messages - wrap in BaseEvent subclass # Check for protobuf DESCRIPTOR but exclude already-wrapped BaseEvent subclasses - elif (hasattr(event, 'DESCRIPTOR') and hasattr(event.DESCRIPTOR, 'full_name') - and not hasattr(event, 'event_id')): # event_id is unique to BaseEvent + elif ( + hasattr(event, "DESCRIPTOR") + and hasattr(event.DESCRIPTOR, "full_name") + and not hasattr(event, "event_id") + ): # event_id is unique to BaseEvent proto_type = event.DESCRIPTOR.full_name - + # Look up the registered event class by protobuf type proto_event_class = self._events.get(proto_type) - if proto_event_class and hasattr(proto_event_class, 'from_proto'): + if proto_event_class and hasattr(proto_event_class, "from_proto"): try: event = proto_event_class.from_proto(event) except Exception: - logger.exception(f"Failed to convert protobuf {proto_type} to event class {proto_event_class}") + logger.exception( + f"Failed to convert protobuf {proto_type} to event class {proto_event_class}" + ) return else: # No matching event class found @@ -384,10 +404,10 @@ def _prepare_event(self, event): return else: raise RuntimeError(f"Protobuf event not registered: {proto_type}") - + # Validate event is registered (handles both BaseEvent and generated protobuf events) - if hasattr(event, 'type') and event.type in self._events: - #logger.info(f"Received event {_truncate_event_for_logging(event)}") + if hasattr(event, "type") and event.type in self._events: + # logger.info(f"Received event {_truncate_event_for_logging(event)}") return event elif self._ignore_unknown_events: logger.info(f"Event not registered {_truncate_event_for_logging(event)}") @@ -397,7 +417,7 @@ def _prepare_event(self, event): def silent(self, event_class): """ Silence logging for an event class from being processed. - + Args: event_class: The event class to silence """ @@ -406,11 +426,11 @@ def silent(self, event_class): def send(self, *events): """ Send one or more events for processing. - + Events are added to the queue and will be processed by the background - processing task. If an event handler raises an exception, an ExceptionEvent + processing task. If an event handler raises an exception, an ExceptionEvent is automatically created and queued for processing. - + Example: ```python # Send single event @@ -419,13 +439,13 @@ def send(self, *events): speech_probability=0.95, activation_threshold=0.5 )) - + # Send multiple events manager.send( VADSpeechStartEvent(plugin_name="silero", speech_probability=0.95), STTTranscriptEvent(plugin_name="deepgram", text="Hello world") ) - + # Send event from dictionary manager.send({ "type": "plugin.vad_speech_start", @@ -433,27 +453,26 @@ def send(self, *events): "speech_probability": 0.95 }) ``` - + Args: *events: One or more event objects or dictionaries to send. Events can be instances of registered event classes or dictionaries with a 'type' field that matches a registered event type. - + Raises: RuntimeError: If event type is not registered and ignore_unknown_events is False """ for event in events: event = self._prepare_event(event) if event: - #logger.info(f"🎯 EventManager.send: {event.__class__.__name__} - {event.type}") self._queue.append(event) - + async def wait(self, timeout: float = 10.0): """ Wait for all queued events to be processed. - + This is useful in tests to ensure events are processed before assertions. - + Args: timeout: Maximum time to wait for processing to complete """ @@ -463,19 +482,19 @@ async def wait(self, timeout: float = 10.0): if self._handler_tasks: await asyncio.wait(list(self._handler_tasks.values())) - + def _start_processing_task(self): """Start the background event processing task.""" if self._processing_task and not self._processing_task.done(): return # Already running - + loop = asyncio.get_running_loop() self._processing_task = loop.create_task(self._process_events_loop()) async def _process_events_loop(self): """ Background task that continuously processes events from the queue. - + This task runs until shutdown is requested and processes all events in the queue. It's shielded from cancellation to ensure all events are processed before shutdown. @@ -488,12 +507,18 @@ async def _process_events_loop(self): await self._process_single_event(event) except asyncio.CancelledError as exc: cancelled_exc = exc - logger.info(f"Event processing task was cancelled, processing remaining events, {len(self._queue)}") + logger.info( + f"Event processing task was cancelled, processing remaining events, {len(self._queue)}" + ) await self._process_single_event(event) elif cancelled_exc: raise cancelled_exc else: - cleanup_ids = set(task_id for task_id, task in self._handler_tasks.items() if task.done()) + cleanup_ids = set( + task_id + for task_id, task in self._handler_tasks.items() + if task.done() + ) for task_id in cleanup_ids: self._handler_tasks.pop(task_id) await asyncio.sleep(0.0001) @@ -503,17 +528,20 @@ async def _run_handler(self, handler, event): return await handler(event) except Exception as exc: self._queue.appendleft(ExceptionEvent(exc, handler)) # type: ignore[arg-type] - module_name = getattr(handler, '__module__', 'unknown') - logger.exception(f"Error calling handler {handler.__name__} from {module_name} for event {event.type}") + module_name = getattr(handler, "__module__", "unknown") + logger.exception( + f"Error calling handler {handler.__name__} from {module_name} for event {event.type}" + ) async def _process_single_event(self, event): """Process a single event.""" for handler in self._handlers.get(event.type, []): - module_name = getattr(handler, '__module__', 'unknown') + module_name = getattr(handler, "__module__", "unknown") if event.type not in self._silent_events: - logger.info(f"Called handler {handler.__name__} from {module_name} for event {event.type}") + logger.info( + f"Called handler {handler.__name__} from {module_name} for event {event.type}" + ) loop = asyncio.get_running_loop() handler_task = loop.create_task(self._run_handler(handler, event)) self._handler_tasks[uuid.uuid4()] = handler_task - diff --git a/examples/01_simple_agent_example/simple_agent_example.py b/examples/01_simple_agent_example/simple_agent_example.py index 1d4abd1b..6e65e382 100644 --- a/examples/01_simple_agent_example/simple_agent_example.py +++ b/examples/01_simple_agent_example/simple_agent_example.py @@ -29,7 +29,6 @@ async def start_agent() -> None: # realtime version (vad, tts and stt not needed) # llm=openai.Realtime() ) - await agent.create_user() # Create a call call = agent.edge.client.video.call("default", str(uuid4())) diff --git a/examples/02_golf_coach_example/golf_coach_example.py b/examples/02_golf_coach_example/golf_coach_example.py index 2a11538b..e9ce7b2b 100644 --- a/examples/02_golf_coach_example/golf_coach_example.py +++ b/examples/02_golf_coach_example/golf_coach_example.py @@ -21,8 +21,6 @@ async def start_agent() -> None: ], # realtime pose detection with yolo ) - await agent.create_user() - # create a call, some other video networks call this a room call = agent.edge.client.video.call("default", str(uuid4())) diff --git a/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py b/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py index da8315d6..8f1d285a 100644 --- a/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py +++ b/examples/other_examples/09_github_mcp_demo/gemini_realtime_github_mcp_demo.py @@ -56,15 +56,11 @@ async def start_agent(): model="gemini-2.5-flash-native-audio-preview-09-2025", api_key=google_api_key ) - # Create real edge transport and agent user - edge = getstream.Edge() - agent_user = User(name="GitHub AI Assistant", id="github-agent") - # Create agent with GitHub MCP server and Gemini Realtime LLM agent = Agent( - edge=edge, + edge=getstream.Edge(), llm=llm, - agent_user=agent_user, + agent_user=User(name="GitHub AI Assistant", id="github-agent"), instructions="You are a helpful AI assistant with access to GitHub via MCP server. You can help with GitHub operations like creating issues, managing pull requests, searching repositories, and more. Keep responses conversational and helpful. When you need to perform GitHub operations, use the available MCP tools.", processors=[], mcp_servers=[github_server], @@ -74,9 +70,6 @@ async def start_agent(): logger.info(f"GitHub server: {github_server}") try: - # Create the agent user - await agent.create_user() - # Set up event handler for when participants join @agent.subscribe async def on_participant_joined(event: CallSessionParticipantJoinedEvent): diff --git a/examples/other_examples/09_github_mcp_demo/github_mcp_demo.py b/examples/other_examples/09_github_mcp_demo/github_mcp_demo.py index 13c488b3..e39e7eac 100644 --- a/examples/other_examples/09_github_mcp_demo/github_mcp_demo.py +++ b/examples/other_examples/09_github_mcp_demo/github_mcp_demo.py @@ -88,9 +88,6 @@ async def start_agent(): ) logger.info("MCP tools are now available to the LLM for function calling!") - # Create the agent user - await agent.create_user() - # Set up event handler for when participants join @agent.subscribe async def on_participant_joined(event: CallSessionParticipantJoinedEvent): diff --git a/examples/other_examples/09_github_mcp_demo/openai_realtime_github_mcp_demo.py b/examples/other_examples/09_github_mcp_demo/openai_realtime_github_mcp_demo.py index 92b10807..ef88c62d 100644 --- a/examples/other_examples/09_github_mcp_demo/openai_realtime_github_mcp_demo.py +++ b/examples/other_examples/09_github_mcp_demo/openai_realtime_github_mcp_demo.py @@ -72,9 +72,6 @@ async def start_agent(): logger.info(f"GitHub server: {github_server}") try: - # Create the agent user - await agent.create_user() - # Set up event handler for when participants join @agent.subscribe async def on_participant_joined(event: CallSessionParticipantJoinedEvent): diff --git a/examples/other_examples/gemini_live_realtime/gemini_live_example.py b/examples/other_examples/gemini_live_realtime/gemini_live_example.py index cf328221..e4084e11 100644 --- a/examples/other_examples/gemini_live_realtime/gemini_live_example.py +++ b/examples/other_examples/gemini_live_realtime/gemini_live_example.py @@ -4,23 +4,28 @@ from dotenv import load_dotenv from getstream import AsyncStream + +from vision_agents.core.edge.types import User from vision_agents.core.agents import Agent from vision_agents.plugins import gemini, getstream load_dotenv() -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s [call_id=%(call_id)s] %(name)s: %(message)s") +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s [call_id=%(call_id)s] %(name)s: %(message)s", +) logger = logging.getLogger(__name__) async def start_agent() -> None: client = AsyncStream() - agent_user = await client.create_user(name="My happy AI friend") - agent = Agent( edge=getstream.Edge(), - agent_user=agent_user, # the user object for the agent (name, image etc) + agent_user=User( + name="My happy AI friend" + ), # the user object for the agent (name, image etc) instructions="Read @voice-agent.md", llm=gemini.Realtime(), processors=[], # processors can fetch extra data, check images/audio data or transform video