Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.12.8
rev: v0.14.0
hooks:
# Run the linter.
- id: ruff-check
Expand Down
296 changes: 181 additions & 115 deletions agents-core/vision_agents/_generate_sfu_events.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion agents-core/vision_agents/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

from vision_agents.core.agents import Agent

__all__ = ["Agent", "User"]
__all__ = ["Agent", "User"]
1 change: 1 addition & 0 deletions agents-core/vision_agents/core/agents/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class AgentSessionContextManager:
connection_cm: Optional provider-specific connection context manager
returned by the edge transport (kept open during the context).
"""

def __init__(self, agent: Agent, connection_cm=None):
self.agent = agent
self._connection_cm = connection_cm
Expand Down
131 changes: 75 additions & 56 deletions agents-core/vision_agents/core/agents/agents.py

Large diffs are not rendered by default.

99 changes: 63 additions & 36 deletions agents-core/vision_agents/core/agents/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

logger = logging.getLogger(__name__)


@dataclass
class Message:
"""A single utterance or assistant message within a conversation.
Expand All @@ -21,6 +22,7 @@ class Message:
user_id: Logical user identifier associated with the message.
id: Unique message identifier (auto-generated if not provided).
"""

content: str
original: Optional[Any] = None # the original openai, claude or gemini message
timestamp: Optional[datetime.datetime] = None
Expand All @@ -36,23 +38,24 @@ def __post_init__(self):
@dataclass
class StreamHandle:
"""Handle for managing a streaming message.

This lightweight object is returned when starting a streaming message
and must be passed to subsequent update operations. It encapsulates
the message ID and user ID, preventing accidental cross-contamination
between concurrent streams.

Example:
# Start a streaming message
handle = conversation.start_streaming_message(role="assistant")

# Update the message using the handle
conversation.append_to_message(handle, "Hello")
conversation.append_to_message(handle, " world!")

# Complete the message
conversation.complete_message(handle)
"""

message_id: str
user_id: str

Expand All @@ -75,66 +78,77 @@ def __init__(
@abstractmethod
def add_message(self, message: Message, completed: bool = True):
"""Add a message to the conversation.

Args:
message: The Message object to add
completed: If True, mark the message as completed (not generating).
completed: If True, mark the message as completed (not generating).
If False, mark as still generating. Defaults to True.

Returns:
The result of the add operation (implementation-specific)
"""
...

@abstractmethod
def update_message(self, message_id: str, input_text: str, user_id: str, replace_content: bool, completed: bool):
def update_message(
self,
message_id: str,
input_text: str,
user_id: str,
replace_content: bool,
completed: bool,
):
"""Update an existing message or create a new one if not found.

Args:
message_id: The ID of the message to update
input_text: The text content to set or append
user_id: The ID of the user who owns the message
replace_content: If True, replace the entire message content. If False, append to existing content.
completed: If True, mark the message as completed (not generating). If False, mark as still generating.

Returns:
The result of the update operation (implementation-specific)
"""
...

# Streaming message convenience methods
def start_streaming_message(self, role: str = "assistant", user_id: Optional[str] = None,
initial_content: str = "") -> StreamHandle:
def start_streaming_message(
self,
role: str = "assistant",
user_id: Optional[str] = None,
initial_content: str = "",
) -> StreamHandle:
"""Start a new streaming message and return a handle for subsequent operations.

This method simplifies the management of streaming messages by returning a handle
that encapsulates the message ID and user ID. Use the handle with append_to_message,
replace_message, and complete_message methods.

Args:
role: The role of the message sender (default: "assistant")
user_id: The ID of the user (default: same as role)
initial_content: Initial content for the message (default: empty string)

Returns:
StreamHandle: A handle to use for subsequent operations on this message

Example:
# Simple usage
handle = conversation.start_streaming_message()
conversation.append_to_message(handle, "Processing...")
conversation.replace_message(handle, "Here's the answer: ")
conversation.append_to_message(handle, "42")
conversation.complete_message(handle)

# Multiple concurrent streams
user_handle = conversation.start_streaming_message(role="user", user_id="user123")
assistant_handle = conversation.start_streaming_message(role="assistant")

# Update both independently
conversation.append_to_message(user_handle, "Hello")
conversation.append_to_message(assistant_handle, "Hi there!")

# Complete in any order
conversation.complete_message(user_handle)
conversation.complete_message(assistant_handle)
Expand All @@ -144,7 +158,7 @@ def start_streaming_message(self, role: str = "assistant", user_id: Optional[str
content=initial_content,
role=role,
user_id=user_id or role,
id=None # Will be assigned during add
id=None, # Will be assigned during add
)
self.add_message(message, completed=False)
# The message now has an ID assigned by the add_message flow
Expand All @@ -154,10 +168,10 @@ def start_streaming_message(self, role: str = "assistant", user_id: Optional[str
assert added_message.id is not None, "Message ID should be set by add_message"
assert added_message.user_id is not None, "User ID should be set by add_message"
return StreamHandle(message_id=added_message.id, user_id=added_message.user_id)

def append_to_message(self, handle: StreamHandle, text: str):
"""Append text to a streaming message identified by the handle.

Args:
handle: The StreamHandle returned by start_streaming_message
text: Text to append to the message
Expand All @@ -167,12 +181,12 @@ def append_to_message(self, handle: StreamHandle, text: str):
input_text=text,
user_id=handle.user_id,
replace_content=False,
completed=False
completed=False,
)

def replace_message(self, handle: StreamHandle, text: str):
"""Replace the content of a streaming message identified by the handle.

Args:
handle: The StreamHandle returned by start_streaming_message
text: Text to replace the message content with
Expand All @@ -182,26 +196,28 @@ def replace_message(self, handle: StreamHandle, text: str):
input_text=text,
user_id=handle.user_id,
replace_content=True,
completed=False
completed=False,
)

def complete_message(self, handle: StreamHandle):
"""Mark a streaming message as completed.

Args:
handle: The StreamHandle returned by start_streaming_message
"""
# We need to find the message to get its current content
# so we can set completed without changing the content
message = next((msg for msg in self.messages if msg.id == handle.message_id), None)
message = next(
(msg for msg in self.messages if msg.id == handle.message_id), None
)
if message:
# Use replace mode with the current content to avoid space issues
self.update_message(
message_id=handle.message_id,
input_text=message.content,
user_id=handle.user_id,
replace_content=True,
completed=True
completed=True,
)


Expand Down Expand Up @@ -240,7 +256,14 @@ def add_message(self, message: Message, completed: bool = True):
# In-memory conversation doesn't need to handle completed flag
return None

def update_message(self, message_id: str, input_text: str, user_id: str, replace_content: bool, completed: bool):
def update_message(
self,
message_id: str,
input_text: str,
user_id: str,
replace_content: bool,
completed: bool,
):
"""Update or create a message in-memory.

If the message is not found, a new one is created with the given id.
Expand All @@ -254,10 +277,15 @@ def update_message(self, message_id: str, input_text: str, user_id: str, replace
"""
# Find the message by id
message = self.lookup(message_id)

if message is None:
logger.info(f"message {message_id} not found, create one instead")
return self.add_message(Message(user_id=user_id, id=message_id, content=input_text, original=None), completed=completed)
return self.add_message(
Message(
user_id=user_id, id=message_id, content=input_text, original=None
),
completed=completed,
)

if replace_content:
message.content = input_text
Expand All @@ -266,4 +294,3 @@ def update_message(self, message_id: str, input_text: str, user_id: str, replace

# In-memory conversation just updates the message, no external API call
return None

12 changes: 8 additions & 4 deletions agents-core/vision_agents/core/agents/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
@dataclass
class AgentSayEvent(PluginBaseEvent):
"""Event emitted when the agent wants to say something."""
type: str = field(default='agent.say', init=False)

type: str = field(default="agent.say", init=False)
text: str = ""
user_id: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
Expand All @@ -19,7 +20,8 @@ def __post_init__(self):
@dataclass
class AgentSayStartedEvent(PluginBaseEvent):
"""Event emitted when agent speech synthesis starts."""
type: str = field(default='agent.say_started', init=False)

type: str = field(default="agent.say_started", init=False)
text: str = ""
user_id: Optional[str] = None
synthesis_id: Optional[str] = None
Expand All @@ -28,7 +30,8 @@ class AgentSayStartedEvent(PluginBaseEvent):
@dataclass
class AgentSayCompletedEvent(PluginBaseEvent):
"""Event emitted when agent speech synthesis completes."""
type: str = field(default='agent.say_completed', init=False)

type: str = field(default="agent.say_completed", init=False)
text: str = ""
user_id: Optional[str] = None
synthesis_id: Optional[str] = None
Expand All @@ -38,7 +41,8 @@ class AgentSayCompletedEvent(PluginBaseEvent):
@dataclass
class AgentSayErrorEvent(PluginBaseEvent):
"""Event emitted when agent speech synthesis encounters an error."""
type: str = field(default='agent.say_error', init=False)

type: str = field(default="agent.say_error", init=False)
text: str = ""
user_id: Optional[str] = None
error: Optional[Exception] = None
Expand Down
1 change: 0 additions & 1 deletion agents-core/vision_agents/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ async def start_dispatcher(

await agent_func()


logger.info("🔚 Stream Agents dispatcher stopped")


Expand Down
7 changes: 4 additions & 3 deletions agents-core/vision_agents/core/edge/edge_transport.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Abstraction for stream vs other services here
"""

import abc

from typing import TYPE_CHECKING, Any, Optional
Expand All @@ -11,7 +12,6 @@
from vision_agents.core.edge.types import User

if TYPE_CHECKING:

pass


Expand Down Expand Up @@ -55,6 +55,7 @@ async def create_conversation(self, call: Any, user: User, instructions):
pass

@abc.abstractmethod
def add_track_subscriber(self, track_id: str) -> Optional[aiortc.mediastreams.MediaStreamTrack]:
def add_track_subscriber(
self, track_id: str
) -> Optional[aiortc.mediastreams.MediaStreamTrack]:
pass

12 changes: 8 additions & 4 deletions agents-core/vision_agents/core/edge/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@
@dataclass
class AudioReceivedEvent(PluginBaseEvent):
"""Event emitted when audio is received from a participant."""
type: str = field(default='plugin.edge.audio_received', init=False)

type: str = field(default="plugin.edge.audio_received", init=False)
pcm_data: Optional[PcmData] = None
participant: Optional[Any] = None


@dataclass
class TrackAddedEvent(PluginBaseEvent):
"""Event emitted when a track is added to the call."""
type: str = field(default='plugin.edge.track_added', init=False)

type: str = field(default="plugin.edge.track_added", init=False)
track_id: Optional[str] = None
track_type: Optional[int] = None
user: Optional[Any] = None
Expand All @@ -25,7 +27,8 @@ class TrackAddedEvent(PluginBaseEvent):
@dataclass
class TrackRemovedEvent(PluginBaseEvent):
"""Event emitted when a track is removed from the call."""
type: str = field(default='plugin.edge.track_removed', init=False)

type: str = field(default="plugin.edge.track_removed", init=False)
track_id: Optional[str] = None
track_type: Optional[int] = None
user: Optional[Any] = None
Expand All @@ -34,6 +37,7 @@ class TrackRemovedEvent(PluginBaseEvent):
@dataclass
class CallEndedEvent(PluginBaseEvent):
"""Event emitted when a call ends."""
type: str = field(default='plugin.edge.call_ended', init=False)

type: str = field(default="plugin.edge.call_ended", init=False)
args: Optional[tuple] = None
kwargs: Optional[dict] = None
Loading