-
Notifications
You must be signed in to change notification settings - Fork 101
fix for agent published tracks internally, no event spawn, added user… #91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… creation instead of none user name
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)
150-169: Fix None-participant crash in removal flow (ParticipantLeftEvent)When participant is None, the else-branch dereferences participant.published_tracks, causing an AttributeError. Compute tracks_to_remove from payload if available, otherwise derive from _track_map.
- if not participant: - user_id = event.payload.user_id - session_id = event.payload.session_id - else: - user_id = event.participant.user_id - session_id = event.participant.session_id + if not participant: + user_id = event.payload.user_id + session_id = event.payload.session_id + else: + user_id = participant.user_id + session_id = participant.session_id @@ - if hasattr(event.payload, 'type') and event.payload is not None: + if getattr(event.payload, "type", None) is not None: # TrackUnpublishedEvent - single track tracks_to_remove = [event.payload.type] event_desc = "Track unpublished" else: # ParticipantLeftEvent - all published tracks - tracks_to_remove = participant.published_tracks or [] + if participant and getattr(participant, "published_tracks", None) is not None: + tracks_to_remove = participant.published_tracks or [] + elif hasattr(event.payload, "published_tracks"): + tracks_to_remove = event.payload.published_tracks or [] + else: + # Fallback: remove any known tracks for this user/session + tracks_to_remove = [t for (uid, sid, t) in self._track_map.keys() if uid == user_id and sid == session_id] event_desc = "Participant left"
🧹 Nitpick comments (2)
agents-core/vision_agents/core/events/manager.py (1)
183-185: Warn when overwriting an existing event type to avoid silent shadowingUnconditional assignment can mask duplicate registrations. Emit a warning when replacing a different class for the same type.
- self._events[event_class.type] = event_class - logger.debug(f"Registered new event {event_class} - {event_class.type}") + existing = self._events.get(event_class.type) + self._events[event_class.type] = event_class + if existing and existing is not event_class: + logger.warning(f"Overwriting event '{event_class.type}': {existing} -> {event_class}") + else: + logger.debug(f"Registered event {event_class} - {event_class.type}")plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)
130-137: Handle missing participant on TrackPublishedEventIf event.participant is None, TrackAddedEvent gets None for user fields. Fall back to a lightweight participant from payload.
- # NOW spawn TrackAddedEvent with correct type - self.events.send(events.TrackAddedEvent( + # NOW spawn TrackAddedEvent with correct type + resolved_participant = event.participant or Participant(user_id=user_id, session_id=session_id) + self.events.send(events.TrackAddedEvent( plugin_name="getstream", track_id=track_id, track_type=track_type_name, - user=event.participant, - user_metadata=event.participant + user=resolved_participant, + user_metadata=resolved_participant ))Please confirm Participant(user_id=..., session_id=...) construction is valid for this pb2 type in your SDK version.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
agents-core/vision_agents/core/events/manager.py(1 hunks)plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py(7 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/python.mdc)
**/*.py: Never adjust sys.path (e.g., sys.path.append/insert/assignment)
Docstrings must follow the Google style guide
Files:
agents-core/vision_agents/core/events/manager.pyplugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
🔇 Additional comments (4)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (4)
60-60: agent_user_id addition looks goodTracks authored by the agent can now be filtered reliably.
Confirm agent_user_id is always set (create_user) before any SFU events can reach this instance in production flows.
203-205: Persisting agent_user_id before user creation is correctEnsures self-authored tracks can be filtered immediately.
247-249: Manual aenter call — confirm API expectations and cleanupDirectly awaiting connection.aenter() is unusual. Verify this is a supported public API and that close()/leave() pairs correctly with it on errors and shutdown.
294-296: Creating the demo user before token generation is soundPrevents “none” username cases and ensures the user exists server-side.
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)
149-169: Fix None-participant crash and ignore agent-owned removalsParticipantLeftEvent may have participant=None; current code dereferences it. Also suppress agent-owned removals to avoid spurious events.
Apply:
async def _on_track_removed(self, event: sfu_events.ParticipantLeftEvent | sfu_events.TrackUnpublishedEvent): """Handle track unpublished and participant left events.""" if not event.payload: return participant = event.participant if not participant: user_id = event.payload.user_id session_id = event.payload.session_id else: user_id = event.participant.user_id session_id = event.participant.session_id - # Determine which tracks to remove - if hasattr(event.payload, 'type') and event.payload is not None: - # TrackUnpublishedEvent - single track - tracks_to_remove = [event.payload.type] - event_desc = "Track unpublished" - else: - # ParticipantLeftEvent - all published tracks - tracks_to_remove = participant.published_tracks or [] - event_desc = "Participant left" + # Ignore agent-owned tracks; handled internally + if user_id == self.agent_user_id: + self.logger.debug(f"Ignoring agent-owned track removal: user={user_id}") + return + + # Determine which tracks to remove + if hasattr(event.payload, 'type'): + # TrackUnpublishedEvent - single track + tracks_to_remove = [event.payload.type] + event_desc = "Track unpublished" + else: + # ParticipantLeftEvent - all published tracks + if participant and getattr(participant, 'published_tracks', None): + tracks_to_remove = participant.published_tracks + else: + # Fallback: derive from track map for this user/session + tracks_to_remove = [ + tt for (uid, sid, tt), info in self._track_map.items() + if uid == user_id and sid == session_id and info.get("published") + ] + event_desc = "Participant left"
♻️ Duplicate comments (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)
235-239: Ignore agent-owned tracks in track_added to prevent pending leaksWithout guarding, agent-owned tracks linger in _pending_tracks since _on_track_published ignores them. Early-return here.
Apply:
@connection.on("track_added") async def on_track(track_id, track_type, user): - # Store track in pending map - wait for SFU to confirm type before spawning TrackAddedEvent - self._pending_tracks[track_id] = (user.user_id, user.session_id, track_type) - self.logger.info(f"Track received from WebRTC (pending SFU confirmation): {track_id}, type: {track_type}, user: {user.user_id}") + # Ignore agent-owned tracks; handled internally, no events should be spawned + if user.user_id == self.agent_user_id: + self.logger.debug(f"Ignoring agent-owned track in pending map: {track_id} ({track_type})") + return + # Store track in pending map - wait for SFU to confirm type before spawning TrackAddedEvent + self._pending_tracks[track_id] = (user.user_id, user.session_id, track_type) + self.logger.info(f"Track received from WebRTC (pending SFU confirmation): {track_id}, type: {track_type}, user: {user.user_id}")
🧹 Nitpick comments (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)
107-107: Reduce polling intensity (optional)poll_interval=10ms can be CPU-heavy. 50–100ms (or simple backoff) is usually sufficient.
Apply:
- poll_interval = 0.01 + poll_interval = 0.05
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py(7 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/python.mdc)
**/*.py: Never adjust sys.path (e.g., sys.path.append/insert/assignment)
Docstrings must follow the Google style guide
Files:
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
🧬 Code graph analysis (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (5)
agents-core/vision_agents/core/edge/sfu_events.py (11)
user_id(446-450)user_id(761-765)user_id(798-802)user_id(1047-1051)user_id(1852-1856)user_id(1890-1894)participant(1322-1327)participant(1360-1365)participant(1421-1426)participant(1859-1864)participant(1904-1909)agents-core/vision_agents/core/edge/types.py (1)
TrackType(28-33)plugins/deepgram/tests/test_realtime.py (1)
on(18-21)agents-core/vision_agents/core/events/manager.py (1)
silent(397-404)agents-core/vision_agents/core/edge/events.py (1)
AudioReceivedEvent(9-13)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: unit / Ruff & mypy
- GitHub Check: unit / Test "not integration"
- GitHub Check: unit / Test "not integration"
- GitHub Check: unit / Ruff & mypy
🔇 Additional comments (5)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (5)
85-90: Good guard for agent-owned published tracksEarly-returning for agent’s own tracks aligns with “no event spawn” requirement.
230-233: Verify rtc.join + manual aenter lifecycleCalling await rtc.join(...) then await connection.aenter() is unusual. Ensure this is the intended non-context-manager usage and won’t double-enter or leak.
Do we have a connect/start method to prefer over aenter?
Also applies to: 251-252
241-243: Confirm intent of silencing AudioReceivedEventevents.silent(AudioReceivedEvent) before sending this event will suppress processing/logging depending on EventManager behavior. Confirm this is desired.
298-299: Create user before token generation — LGTMPrevents “none” username and ensures user existence for browser join.
60-60: agent_user_id field — LGTMNeeded to filter agent-owned tracks across handlers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)
239-243: Verify agent track filtering in pending map.While Line 94-95 filters agent-owned tracks in
_on_track_published, theon_trackhandler at Line 239-243 stores ALL tracks (including agent-owned) in_pending_tracks. Since_on_track_publishedreturns early for agent tracks, these pending entries will never be removed, causing a memory leak.Apply the fix suggested in the previous review:
@connection.on("track_added") async def on_track(track_id, track_type, user): + # Ignore agent-owned tracks; handled internally, no events should be spawned + if user.user_id == self.agent_user_id: + self.logger.debug(f"Ignoring agent-owned track in pending map: {track_id} ({track_type})") + return # Store track in pending map - wait for SFU to confirm type before spawning TrackAddedEvent self._pending_tracks[track_id] = (user.user_id, user.session_id, track_type) self.logger.info(f"Track received from WebRTC (pending SFU confirmation): {track_id}, type: {track_type}, user: {user.user_id}")
🧹 Nitpick comments (2)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (2)
59-59: Document agent_user_id attribute.The new
agent_user_idattribute lacks a docstring. Add documentation explaining when it's set and its purpose in filtering agent-owned tracks.Consider adding a docstring to the class or attribute:
class StreamEdge(EdgeTransport): """ StreamEdge uses getstream.io's edge network. To support multiple vendors, this means we expose """ client: AsyncStream + # ID of the agent user, set during user creation. Used to filter agent-owned tracks. + agent_user_id: str | None = None
111-111: Poll interval is aggressive.
poll_interval = 0.01results in 100 checks per second during the 10-second timeout window. This creates up to 1000 iterations per track. Consider increasing to 0.05 (20 checks/sec) or 0.1 (10 checks/sec) to reduce CPU overhead while maintaining responsiveness.- poll_interval = 0.01 + poll_interval = 0.05 # 20 checks per second
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
agents-core/vision_agents/core/agents/agents.py(2 hunks)agents-core/vision_agents/core/edge/events.py(2 hunks)agents-core/vision_agents/core/edge/types.py(1 hunks)plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py(8 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/python.mdc)
**/*.py: Never adjust sys.path (e.g., sys.path.append/insert/assignment)
Docstrings must follow the Google style guide
Files:
agents-core/vision_agents/core/agents/agents.pyplugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.pyagents-core/vision_agents/core/edge/events.pyagents-core/vision_agents/core/edge/types.py
🧬 Code graph analysis (3)
agents-core/vision_agents/core/agents/agents.py (2)
agents-core/vision_agents/core/edge/events.py (2)
AudioReceivedEvent(9-13)TrackAddedEvent(17-22)agents-core/vision_agents/core/edge/sfu_events.py (9)
Participant(204-244)track_type(528-532)track_type(1054-1058)track_type(2023-2027)participant(1322-1327)participant(1360-1365)participant(1421-1426)participant(1859-1864)participant(1904-1909)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (5)
agents-core/vision_agents/core/edge/sfu_events.py (15)
participant(1322-1327)participant(1360-1365)participant(1421-1426)participant(1859-1864)participant(1904-1909)user_id(446-450)user_id(761-765)user_id(798-802)user_id(1047-1051)user_id(1852-1856)user_id(1890-1894)track_type(528-532)track_type(1054-1058)track_type(2023-2027)name(1935-1939)agents-core/vision_agents/core/events/manager.py (2)
send(406-449)silent(397-404)agents-core/vision_agents/core/edge/events.py (3)
TrackAddedEvent(17-22)TrackRemovedEvent(26-31)AudioReceivedEvent(9-13)agents-core/vision_agents/core/agents/agents.py (3)
join(198-269)on_track(547-556)create_user(385-394)plugins/openai/vision_agents/plugins/openai/rtc_manager.py (1)
on_track(274-275)
agents-core/vision_agents/core/edge/events.py (1)
agents-core/vision_agents/core/edge/sfu_events.py (3)
track_type(528-532)track_type(1054-1058)track_type(2023-2027)
🔇 Additional comments (7)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (4)
87-95: Agent track filtering looks good.The logic correctly derives
user_idandsession_idfromparticipantorpayloadwith appropriate fallback, then filters agent-owned tracks early to prevent event spawning.
155-164: Robust user_id/session_id derivation.The fallback logic for extracting
user_idandsession_idhandles bothTrackUnpublishedEventandParticipantLeftEventconsistently.
210-212: Agent user ID properly initialized.Setting
self.agent_user_idduring user creation ensures the filter in_on_track_publishedhas the correct value before tracks are added.
299-303: Demo user creation ordering is correct.Creating the GetStream user before generating the token ensures the user exists when the token references it.
agents-core/vision_agents/core/edge/events.py (2)
25-31: No changes required: removal event handlers already accept integertrack_type
Consumer code (e.g.,stream_edge_transport.py) usestrack_type_int, matching the updatedOptional[int].
16-22: TrackAddedEvent.track_type API change validated
Consumers already use integer track_type; document this breaking change in the release notes.agents-core/vision_agents/core/agents/agents.py (1)
585-589: Comparing int track_type to TrackType enum is valid
TrackType constants are integer values from the protobuf and are used consistently across the code; no change required.
| await connection.__aenter__() # TODO: weird API? there should be a manual version | ||
| self._connection = connection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Manual context manager usage is unconventional.
Line 255 calls await connection.__aenter__() manually instead of using async with. This bypasses the standard context manager protocol and may skip cleanup on exceptions.
Refactor to use proper context manager:
- connection = await rtc.join(
+ async with await rtc.join(
call, agent.agent_user.id, subscription_config=subscription_config
- )
+ ) as connection:
@connection.on("track_added")
async def on_track(track_id, track_type, user):
# Store track in pending map - wait for SFU to confirm type before spawning TrackAddedEvent
self._pending_tracks[track_id] = (user.user_id, user.session_id, track_type)
self.logger.info(f"Track received from WebRTC (pending SFU confirmation): {track_id}, type: {track_type}, user: {user.user_id}")
self.events.silent(events.AudioReceivedEvent)
@connection.on("audio")
async def on_audio_received(pcm: PcmData, participant: Participant):
self.events.send(events.AudioReceivedEvent(
plugin_name="getstream",
pcm_data=pcm,
participant=participant,
user_metadata=participant
))
- await connection.__aenter__() # TODO: weird API? there should be a manual version
- self._connection = connection
+ self._connection = connection
- standardize_connection = StreamConnection(connection)
- return standardize_connection
+ standardize_connection = StreamConnection(connection)
+ return standardize_connectionNote: This may require adjusting the caller to handle the context manager lifecycle differently. Verify the intended connection lifetime.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
around lines 255-256, the code calls await connection.__aenter__() manually
which bypasses normal async context manager semantics and can skip cleanup on
exceptions; replace this manual call with proper async with usage or refactor so
the caller manages the connection context: either open the connection inside an
async with block and perform needed setup within that block, or provide a
connection manager method that returns an async context manager and store the
active connection only for the intended lifetime while ensuring
connection.__aexit__() is invoked on close; update any callers to await the
async with (or call the new open/close methods) and verify the intended
connection lifetime and exception cleanup are preserved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
README.md (1)
27-29: Trim the duplicated paragraph in Sports Coaching.Lines 27-29 repeat the same explanation that already appears on Lines 43-45; keeping both copies bloats the section. Please drop one of them for readability.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
README.md(3 hunks)
… creation instead of none user name
Summary by CodeRabbit
Bug Fixes
Refactor
Chores
Documentation
Breaking Changes