Conversation

@tschellenbach tschellenbach commented Nov 7, 2025

  • Cleanup order of functions in Agent
  • Fixed and simplified the track selection logic
  • Audio queue for the agent
  • Simplified event integration
  • New video forwarder
  • New tests for video forwarder
  • New audio queue class

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced audio buffering system with configurable duration-based retrieval and sample management.
    • Added agent configuration options for customizable model directory settings.
    • Enhanced video frame forwarding with multi-handler support and per-handler FPS throttling.
  • Refactor

    • Streamlined processor integrations with simplified frame handler registration.
    • Reorganized internal video queue implementations for improved consistency.
  • Tests

    • Added comprehensive audio queue test suite covering initialization, concurrent operations, and edge cases.

coderabbitai bot commented Nov 7, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

This PR refactors the agent infrastructure with new audio/video utilities: introduces AudioQueue for buffered audio streaming, restructures video track metadata with TrackInfo, migrates VideoForwarder from event-consumer to frame-handler API, renames LatestNQueue to VideoLatestNQueue, adds distributed tracing and logging to Agent, and updates all plugin implementations accordingly.

Changes

Cohort / File(s) Summary
Configuration & Dependencies
.cursor/rules/python.mdc, agents-core/pyproject.toml
Added Python style rule forbidding bare except Exception clauses; enabled local editable GetStream dependency.
Core Audio Utilities
agents-core/vision_agents/core/utils/audio_queue.py
New AsyncQueue-based AudioQueue class with duration/sample-based retrieval, rate mismatch detection, buffer limits, and comprehensive put/get interfaces for streaming PCM audio.
Core Agent Infrastructure
agents-core/vision_agents/core/agents/agent_options.py, agents-core/vision_agents/core/agents/agents.py, agents-core/vision_agents/core/processors/base_processor.py
Added AgentOptions dataclass with merge/update semantics; substantially refactored Agent class with TrackInfo metadata structure, AudioQueue integration, distributed tracing (_start_tracing, _end_tracing, span), new logging adapter, lifecycle hooks (start/stop), and track event handling (_on_track_added, _on_track_removed); added required name: str attribute to Processor protocol.
Core Video Queue & Forwarder
agents-core/vision_agents/core/utils/video_queue.py, agents-core/vision_agents/core/utils/video_forwarder.py
Renamed LatestNQueue to VideoLatestNQueue; refactored VideoForwarder from single-queue to handler-based architecture with add_frame_handler/remove_frame_handler methods, per-handler FPS throttling, and shared producer/consumer tasks.
Video Track Updates
agents-core/vision_agents/core/utils/video_track.py, plugins/heygen/vision_agents/plugins/heygen/heygen_video_track.py, plugins/moondream/vision_agents/plugins/moondream/detection/moondream_video_track.py, plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_local_vlm.py
Updated frame_queue type annotation and imports from LatestNQueue to VideoLatestNQueue across video track implementations.
Example Cleanup
examples/01_simple_agent_example/simple_agent_example.py
Removed vogent import and turn_detection parameter from Agent initialization.
Plugin: Frame Handler Migration
plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py, plugins/openai/vision_agents/plugins/openai/rtc_manager.py, plugins/moondream/vision_agents/plugins/moondream/detection/moondream_cloud_processor.py, plugins/moondream/vision_agents/plugins/moondream/detection/moondream_local_processor.py
Replaced VideoForwarder start_event_consumer calls with add_frame_handler; updated consumer_name parameter to name.
Plugin: Moondream VLM & Processor Updates
plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py, plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_cloud_vlm.py
Added name attribute to YOLOPoseProcessor; migrated Moondream VLM implementations from LatestNQueue to VideoLatestNQueue and from start_event_consumer to add_frame_handler; updated VideoForwarder constructor to use input_track keyword argument.
Test Suite
tests/test_audio_queue.py, tests/test_queue_and_video_forwarder.py
Added comprehensive AudioQueue test suite covering initialization, put/get operations, buffer limits, sample-rate mismatches, and concurrent scenarios; refactored VideoForwarder/queue tests to exercise multiple frame handlers, FPS constraints, and new handler lifecycle API.

Sequence Diagram(s)

sequenceDiagram
    participant Producer as Frame Producer
    participant VF as VideoForwarder
    participant NH as Frame Handler
    participant CB as Callback (app)
    
    rect rgb(200, 220, 240)
    Note over Producer,CB: New Frame-Handler Pattern (add-on, not replacement)
    end
    
    Producer->>VF: Frame arrives
    activate VF
    VF->>VF: _producer enqueues to buffer
    deactivate VF
    
    activate VF
    loop For each registered handler
        VF->>NH: Check FPS throttle & last_ts
        alt FPS allow (elapsed >= interval)
            NH->>CB: Call handler callback (sync or async)
            activate CB
            CB-->>NH: Frame processed
            deactivate CB
            NH->>VF: Update last_ts
        else FPS skip
            NH->>VF: Skip frame (throttle active)
        end
    end
    deactivate VF
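The per-handler throttling loop in the diagram above can be sketched synchronously. The real VideoForwarder runs async producer/consumer tasks; `MiniForwarder`, its method signatures, and the explicit `now` argument here are illustrative assumptions, not the actual API:

```python
import math
from typing import Any, Callable, Dict

class MiniForwarder:
    """Per-handler FPS throttling sketch: each handler records the
    timestamp of the last frame it accepted; a frame is delivered only
    when at least 1/fps seconds have elapsed for that handler."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Dict[str, Any]] = {}

    def add_frame_handler(self, fn: Callable[[Any], None], fps: float, name: str) -> None:
        self._handlers[name] = {
            "fn": fn,
            "interval": 1.0 / fps,
            "last_ts": -math.inf,  # so the first frame always passes the throttle
        }

    def remove_frame_handler(self, name: str) -> None:
        self._handlers.pop(name, None)

    def on_frame(self, frame: Any, now: float) -> None:
        # The producer enqueues once; each handler throttles independently.
        for h in self._handlers.values():
            if now - h["last_ts"] >= h["interval"]:
                h["last_ts"] = now
                h["fn"](frame)
```

Two handlers at different FPS values then see different subsets of the same frame stream, which is the point of moving throttling from the queue into the handler registration.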
sequenceDiagram
    participant App as Application
    participant AQ as AudioQueue
    participant Buffer as Internal Buffer
    
    rect rgb(220, 240, 200)
    Note over App,Buffer: AudioQueue: Buffering & Duration-based Retrieval
    end
    
    App->>AQ: put(PcmData)
    activate AQ
    AQ->>Buffer: Enqueue, track sample rate
    AQ->>AQ: _current_duration_ms check
    alt Duration exceeds limit
        AQ->>App: Warn (buffer limit approaching)
    end
    deactivate AQ
    
    App->>AQ: get_duration(100ms)
    activate AQ
    AQ->>AQ: Convert 100ms → sample count
    loop Dequeue items until samples met
        AQ->>Buffer: Dequeue & accumulate
        alt Partial chunk match
            AQ->>AQ: Split, re-enqueue remainder
        end
    end
    AQ->>App: Return merged PcmData (100ms worth)
    deactivate AQ
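The put/get_duration flow in the diagram above can be sketched as follows. `PcmChunk` and `MiniAudioQueue` are hypothetical stand-ins for PcmData/AudioQueue (the real class is async and has a richer interface), but the duration-to-sample conversion and the split-and-re-enqueue of a partially consumed chunk mirror the described behavior:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class PcmChunk:
    """Mono 16-bit PCM audio (hypothetical stand-in for PcmData)."""
    samples: bytes  # 2 bytes per sample
    sample_rate: int

class MiniAudioQueue:
    """Duration-based retrieval: convert ms to a sample count, dequeue
    chunks until it is met, re-enqueue any unconsumed remainder."""

    def __init__(self) -> None:
        self._chunks: List[PcmChunk] = []
        self._rate: Optional[int] = None

    def put(self, chunk: PcmChunk) -> None:
        if self._rate is None:
            self._rate = chunk.sample_rate
        elif chunk.sample_rate != self._rate:
            # Mirrors the rate-mismatch detection described above.
            raise ValueError("sample-rate mismatch")
        self._chunks.append(chunk)

    def duration_ms(self) -> float:
        if self._rate is None:
            return 0.0
        total_samples = sum(len(c.samples) // 2 for c in self._chunks)
        return total_samples * 1000.0 / self._rate

    def get_duration(self, ms: int) -> bytes:
        """Return `ms` worth of merged PCM (less if the buffer runs dry)."""
        if self._rate is None:
            return b""
        needed = int(self._rate * ms / 1000) * 2  # bytes
        out = bytearray()
        while needed > 0 and self._chunks:
            chunk = self._chunks.pop(0)
            take = min(needed, len(chunk.samples))
            out += chunk.samples[:take]
            if take < len(chunk.samples):
                # Split: put the unconsumed tail back at the front.
                self._chunks.insert(0, PcmChunk(chunk.samples[take:], self._rate))
            needed -= take
        return bytes(out)
```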

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • AudioQueue implementation requires careful review of async/sync interfaces, buffer management, and sample-rate/duration arithmetic.
  • Agent class refactoring involves substantial new infrastructure (TrackInfo, tracing context, logging adapter, lifecycle hooks, track event handlers) with interconnected logic.
  • VideoForwarder API redesign changes the callback registration model; multiple plugins updated consistently, but the underlying producer/consumer task orchestration is non-trivial.
  • Queue type migration (LatestNQueue → VideoLatestNQueue) is relatively homogeneous across ~7 files, reducing per-file review overhead.
  • Processor protocol change (adding name: str attribute) is straightforward but affects all implementations.

Suggested labels

core-agents, tests

Suggested reviewers

  • dangusev
  • yarikdevcom
  • maxkahan

Poem

Bell jar frames the refactor's glint,
Each track a memory forged in TrackInfo's mint—
AudioQueue hums its buffered song,
Handlers chain where frames belong,
Plath-dark code that cuts so clean,
Tracing threads through the machine.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 58.77%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
  • Description Check (✅ Passed): Check skipped; CodeRabbit's high-level summary is enabled.
  • Title check (✅ Passed): The title '[AI-246] Agents cleanup - part one' clearly summarizes the main change: cleaning up the Agent class and related components as part of a larger refactoring effort.
  • Linked Issues check (✅ Passed): The PR implements the primary objective of AI-246: cleaning up and refactoring the Agent class by reordering functions, simplifying track selection, introducing an audio queue, and simplifying event integration.
  • Out of Scope Changes check (✅ Passed): All changes are scoped to Agent class cleanup and supporting infrastructure. Updates to related classes (AudioQueue, VideoForwarder, TrackInfo) and imports across plugins are necessary supporting changes for the refactoring.

📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 0022a2e and eff257c.

📒 Files selected for processing (5)
  • plugins/moondream/vision_agents/plugins/moondream/detection/moondream_cloud_processor.py (2 hunks)
  • plugins/moondream/vision_agents/plugins/moondream/detection/moondream_local_processor.py (2 hunks)
  • plugins/moondream/vision_agents/plugins/moondream/detection/moondream_video_track.py (2 hunks)
  • plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_cloud_vlm.py (3 hunks)
  • plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_local_vlm.py (3 hunks)

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@tschellenbach tschellenbach marked this pull request as ready for review November 7, 2025 18:29

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (3)
.cursor/rules/python.mdc (1)

11-12: Enhance the exception handling rule with guidance on alternatives.

The new rule correctly discourages bare except Exception as e, but developers may benefit from guidance on what to use instead. Consider clarifying the rule with examples or approved patterns.

For instance, you might add a note like:

  • Catch specific exceptions (e.g., except ValueError as e)
  • Use except Exception as e: raise if you need to log and re-raise
  • Avoid bare except: (catches KeyboardInterrupt, SystemExit)

Here's a suggested enhancement:

-never write: except Exception as e
+never write: except Exception as e (catch specific exceptions instead, or use except Exception as e: raise if logging is needed)
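A minimal sketch of the approved patterns the suggestion lists (catch a specific exception, or catch broadly only to log and re-raise); `parse_port` and `run_handler` are invented names for illustration:

```python
import logging

logger = logging.getLogger(__name__)

def parse_port(raw: str) -> int:
    # Catch the specific exception the operation can raise.
    try:
        return int(raw)
    except ValueError:
        logger.warning("invalid port %r, falling back to 8080", raw)
        return 8080

def run_handler(handler) -> None:
    # A broad catch is acceptable only when logging and re-raising,
    # so the failure still propagates to the caller.
    try:
        handler()
    except Exception:
        logger.exception("handler failed")
        raise
```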
plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (1)

424-425: Consider adding a name parameter for consistency.

While the callback registration is functionally correct, adding an explicit name parameter (e.g., name="gemini") would improve logging consistency with the shared forwarder path and aid debugging.

Apply this diff to add the name parameter:

-            # Add frame handler (starts automatically)
-            self._video_forwarder.add_frame_handler(self._send_video_frame)
+            # Add frame handler (starts automatically)
+            self._video_forwarder.add_frame_handler(self._send_video_frame, name="gemini")
agents-core/vision_agents/core/agents/agent_options.py (1)

23-24: Consider adding return type annotation.

The function would benefit from a return type annotation for better IDE support and type checking:

-def default_agent_options():
+def default_agent_options() -> AgentOptions:
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 290849e and 0022a2e.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • .cursor/rules/python.mdc (1 hunks)
  • agents-core/pyproject.toml (1 hunks)
  • agents-core/vision_agents/core/agents/agent_options.py (1 hunks)
  • agents-core/vision_agents/core/agents/agents.py (15 hunks)
  • agents-core/vision_agents/core/processors/base_processor.py (1 hunks)
  • agents-core/vision_agents/core/utils/audio_queue.py (1 hunks)
  • agents-core/vision_agents/core/utils/video_forwarder.py (1 hunks)
  • agents-core/vision_agents/core/utils/video_queue.py (1 hunks)
  • agents-core/vision_agents/core/utils/video_track.py (2 hunks)
  • examples/01_simple_agent_example/simple_agent_example.py (2 hunks)
  • plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (2 hunks)
  • plugins/heygen/vision_agents/plugins/heygen/heygen_video_track.py (2 hunks)
  • plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py (2 hunks)
  • plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py (2 hunks)
  • plugins/moondream/vision_agents/plugins/moondream/moondream_video_track.py (2 hunks)
  • plugins/openai/vision_agents/plugins/openai/rtc_manager.py (1 hunks)
  • plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py (2 hunks)
  • tests/test_audio_queue.py (1 hunks)
  • tests/test_queue_and_video_forwarder.py (9 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/python.mdc)

**/*.py: Do not modify sys.path in Python code
Docstrings must follow the Google style guide

Files:

  • plugins/heygen/vision_agents/plugins/heygen/heygen_video_track.py
  • examples/01_simple_agent_example/simple_agent_example.py
  • agents-core/vision_agents/core/utils/video_queue.py
  • plugins/moondream/vision_agents/plugins/moondream/moondream_video_track.py
  • agents-core/vision_agents/core/processors/base_processor.py
  • agents-core/vision_agents/core/utils/audio_queue.py
  • agents-core/vision_agents/core/utils/video_forwarder.py
  • tests/test_queue_and_video_forwarder.py
  • plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py
  • plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py
  • plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py
  • tests/test_audio_queue.py
  • plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py
  • agents-core/vision_agents/core/agents/agent_options.py
  • agents-core/vision_agents/core/utils/video_track.py
  • plugins/openai/vision_agents/plugins/openai/rtc_manager.py
  • agents-core/vision_agents/core/agents/agents.py
tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/python.mdc)

tests/**/*.py: Never use mocking utilities (e.g., unittest.mock, pytest-mock) in test files
Write tests using pytest (avoid unittest.TestCase or other frameworks)
Mark integration tests with @pytest.mark.integration
Do not use @pytest.mark.asyncio; async support is automatic

Files:

  • tests/test_queue_and_video_forwarder.py
  • tests/test_audio_queue.py
🧠 Learnings (4)
📚 Learning: 2025-10-20T19:23:41.259Z
Learnt from: CR
Repo: GetStream/Vision-Agents PR: 0
File: .cursor/rules/python.mdc:0-0
Timestamp: 2025-10-20T19:23:41.259Z
Learning: Applies to tests/**/*.py : Write tests using pytest (avoid unittest.TestCase or other frameworks)

Applied to files:

  • .cursor/rules/python.mdc
📚 Learning: 2025-10-20T19:23:41.259Z
Learnt from: CR
Repo: GetStream/Vision-Agents PR: 0
File: .cursor/rules/python.mdc:0-0
Timestamp: 2025-10-20T19:23:41.259Z
Learning: Applies to tests/**/*.py : Never use mocking utilities (e.g., unittest.mock, pytest-mock) in test files

Applied to files:

  • .cursor/rules/python.mdc
📚 Learning: 2025-10-20T19:23:41.259Z
Learnt from: CR
Repo: GetStream/Vision-Agents PR: 0
File: .cursor/rules/python.mdc:0-0
Timestamp: 2025-10-20T19:23:41.259Z
Learning: Applies to **/*.py : Docstrings must follow the Google style guide

Applied to files:

  • .cursor/rules/python.mdc
📚 Learning: 2025-10-20T19:23:41.259Z
Learnt from: CR
Repo: GetStream/Vision-Agents PR: 0
File: .cursor/rules/python.mdc:0-0
Timestamp: 2025-10-20T19:23:41.259Z
Learning: Applies to **/*.py : Do not modify sys.path in Python code

Applied to files:

  • .cursor/rules/python.mdc
🧬 Code graph analysis (13)
plugins/heygen/vision_agents/plugins/heygen/heygen_video_track.py (1)
agents-core/vision_agents/core/utils/video_queue.py (1)
  • VideoLatestNQueue (6-28)
plugins/moondream/vision_agents/plugins/moondream/moondream_video_track.py (1)
agents-core/vision_agents/core/utils/video_queue.py (1)
  • VideoLatestNQueue (6-28)
agents-core/vision_agents/core/processors/base_processor.py (1)
agents-core/vision_agents/core/edge/sfu_events.py (1)
  • name (2197-2201)
agents-core/vision_agents/core/utils/video_forwarder.py (3)
agents-core/vision_agents/core/utils/video_queue.py (2)
  • VideoLatestNQueue (6-28)
  • put_latest (14-20)
tests/test_queue_and_video_forwarder.py (2)
  • handler (169-170)
  • handler (186-187)
conftest.py (2)
  • recv (315-338)
  • recv (361-366)
tests/test_queue_and_video_forwarder.py (3)
agents-core/vision_agents/core/utils/video_queue.py (1)
  • VideoLatestNQueue (6-28)
conftest.py (1)
  • bunny_video_track (300-344)
agents-core/vision_agents/core/utils/video_forwarder.py (4)
  • VideoForwarder (24-147)
  • add_frame_handler (48-74)
  • stop (102-112)
  • remove_frame_handler (76-92)
plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py (2)
agents-core/vision_agents/core/utils/video_forwarder.py (1)
  • add_frame_handler (48-74)
plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py (1)
  • _process_and_add_frame (204-233)
plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (2)
agents-core/vision_agents/core/utils/video_forwarder.py (1)
  • add_frame_handler (48-74)
plugins/openai/vision_agents/plugins/openai/rtc_manager.py (1)
  • _send_video_frame (268-274)
plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py (2)
agents-core/vision_agents/core/utils/video_forwarder.py (1)
  • add_frame_handler (48-74)
plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py (1)
  • _process_and_add_frame (315-347)
tests/test_audio_queue.py (2)
agents-core/vision_agents/core/utils/audio_queue.py (11)
  • AudioQueue (12-274)
  • empty (36-38)
  • put (50-83)
  • qsize (40-42)
  • get (119-136)
  • put_nowait (85-117)
  • get_nowait (138-152)
  • get_samples (154-237)
  • get_duration (239-258)
  • get_buffer_info (260-274)
  • _current_duration_ms (44-48)
tests/base_test.py (1)
  • BaseTest (11-111)
plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py (1)
agents-core/vision_agents/core/utils/video_forwarder.py (1)
  • add_frame_handler (48-74)
agents-core/vision_agents/core/utils/video_track.py (1)
agents-core/vision_agents/core/utils/video_queue.py (1)
  • VideoLatestNQueue (6-28)
plugins/openai/vision_agents/plugins/openai/rtc_manager.py (2)
agents-core/vision_agents/core/utils/video_forwarder.py (1)
  • add_frame_handler (48-74)
plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (1)
  • _send_video_frame (435-447)
agents-core/vision_agents/core/agents/agents.py (9)
agents-core/vision_agents/core/agents/agent_options.py (3)
  • AgentOptions (6-16)
  • default_agent_options (23-24)
  • update (9-16)
agents-core/vision_agents/core/utils/audio_queue.py (4)
  • AudioQueue (12-274)
  • put (50-83)
  • get_duration (239-258)
  • get (119-136)
agents-core/vision_agents/core/edge/sfu_events.py (22)
  • TrackInfo (392-428)
  • participant (1496-1501)
  • participant (1504-1507)
  • participant (1545-1550)
  • participant (1553-1556)
  • participant (1625-1630)
  • participant (1633-1636)
  • participant (2100-2105)
  • participant (2108-2111)
  • participant (2156-2161)
  • participant (2164-2167)
  • Participant (229-270)
  • track_type (579-583)
  • track_type (1193-1197)
  • track_type (2289-2293)
  • user_id (489-493)
  • user_id (856-860)
  • user_id (901-905)
  • user_id (1186-1190)
  • user_id (2093-2097)
  • user_id (2142-2146)
  • name (2197-2201)
agents-core/vision_agents/core/utils/video_forwarder.py (1)
  • VideoForwarder (24-147)
agents-core/vision_agents/core/events/manager.py (2)
  • send (428-472)
  • subscribe (301-370)
agents-core/vision_agents/core/edge/events.py (3)
  • TrackAddedEvent (18-24)
  • TrackRemovedEvent (28-34)
  • AudioReceivedEvent (9-14)
plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (4)
  • simple_response (119-134)
  • simple_audio_response (136-164)
  • watch_video_track (388-427)
  • Realtime (53-679)
agents-core/vision_agents/core/llm/llm.py (6)
  • simple_response (73-79)
  • simple_audio_response (415-427)
  • watch_video_track (445-458)
  • LLM (49-405)
  • VideoLLM (437-458)
  • AudioLLM (408-434)
plugins/openai/vision_agents/plugins/openai/openai_realtime.py (4)
  • simple_response (108-127)
  • simple_audio_response (129-144)
  • watch_video_track (277-293)
  • Realtime (40-487)
🔇 Additional comments (16)
plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (1)

412-414: LGTM: Correct migration to add_frame_handler API.

The shared forwarder subscription correctly uses the new add_frame_handler pattern with explicit fps and name parameters, replacing the previous start_event_consumer approach.

examples/01_simple_agent_example/simple_agent_example.py (2)

7-7: LGTM: Import cleanup is consistent.

The removal of vogent aligns with the elimination of vogent.TurnDetection() usage below.


29-29: Clarify that turn detection is optional and model-dependent.

Deepgram's Flux model includes built-in turn detection, making the turn_detection parameter unnecessary when using Flux. However, the comment should clarify:

  1. This applies only when using Deepgram's Flux model (not the default Nova or other models)
  2. The Agent class properly supports optional turn_detection with Optional[TurnDetector] = None and includes defensive None checks throughout, so this change is safe
  3. Users switching to different STT providers should verify whether they need explicit turn detection

Consider updating the comment to specify: # turn_detection=smart_turn.TurnDetection() # Optional: Deepgram Flux has built-in turn detection; other STT providers may require this

agents-core/pyproject.toml (1)

94-94: LGTM! Local development dependency enabled.

This is a standard practice for local development against an editable stream-py dependency. No concerns for production since UV sources override dependencies only when the path exists.

agents-core/vision_agents/core/agents/agent_options.py (2)

1-20: LGTM! Good async-friendly pattern.

Caching tempfile.gettempdir() at module load time avoids blocking I/O during async operations—this is a thoughtful optimization.


9-16: LGTM! Clean merge-update pattern.

The update() method correctly merges options, overriding only non-None fields from other. This pattern supports partial updates and future optional fields.
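The non-None merge semantics described above can be sketched with a small dataclass; the `Options` class and its fields are hypothetical, not the actual AgentOptions definition:

```python
from dataclasses import dataclass, fields
from typing import Optional

@dataclass
class Options:
    """Hypothetical stand-in for AgentOptions; field names are illustrative."""
    model_dir: Optional[str] = None
    log_level: Optional[str] = None

    def update(self, other: "Options") -> "Options":
        # Override only the fields explicitly set (non-None) on `other`,
        # so partial updates leave existing values intact.
        for f in fields(self):
            value = getattr(other, f.name)
            if value is not None:
                setattr(self, f.name, value)
        return self
```

Because unset fields default to None, adding a new optional field later does not break existing callers of update().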

plugins/heygen/vision_agents/plugins/heygen/heygen_video_track.py (2)

9-9: LGTM! Import updated to VideoLatestNQueue.

Consistent with the broader queue renaming across the codebase.


35-35: LGTM! Type annotation updated.

The type annotation correctly reflects the new VideoLatestNQueue class while preserving the maxlen=2 buffer for low-latency frame delivery.

agents-core/vision_agents/core/utils/video_queue.py (1)

6-28: LGTM! Clean class rename to VideoLatestNQueue.

The rename makes the purpose more explicit while preserving the complete API. The implementation remains unchanged and correct.

plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py (2)

238-248: LGTM! Migrated to add_frame_handler API.

The switch from start_event_consumer to add_frame_handler aligns with the new VideoForwarder API. The handler registration is clean, with proper FPS throttling and a descriptive name.


249-263: LGTM! Own forwarder setup is correct.

The VideoForwarder instantiation and frame handler registration follow the new pattern correctly. The handler inherits FPS from the forwarder, and automatic start is handled by add_frame_handler.

plugins/openai/vision_agents/plugins/openai/rtc_manager.py (1)

293-296: LGTM! Clean migration to add_frame_handler.

The switch from start_event_consumer to add_frame_handler is consistent with the broader VideoForwarder API update. Proper FPS conversion and handler naming.

plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py (2)

60-60: LGTM! Processor name attribute added.

The name class attribute provides clear identification for this processor, aligning with the Processor protocol update mentioned in the PR summary.


121-128: LGTM! Migrated to add_frame_handler API.

The process_video method now uses the shared forwarder with add_frame_handler, consistent with the new VideoForwarder pattern. Proper FPS throttling and handler naming.

agents-core/vision_agents/core/utils/video_track.py (2)

7-7: LGTM! Import updated to VideoLatestNQueue.

Consistent with the queue renaming across the codebase.


20-20: LGTM! Type annotation updated.

The type annotation correctly reflects the new VideoLatestNQueue class with maxlen=10 buffer, appropriate for queued video delivery.

Comment on lines +907 to +909
async def _on_track_removed(self, track_id: str, track_type: int, participant: Participant):
self._active_video_tracks.pop(track_id)
await self._on_track_change(track_id)

⚠️ Potential issue | 🔴 Critical

Handle non-video removals and stop forwarders safely

_on_track_removed blindly pops the entry and never stops the forwarder. When the SFU notifies us about non-video tracks (or if the same video track is removed twice), pop() raises a KeyError, taking down the subscriber task. Even on the happy path, the forwarder keeps running after removal, leaking the producer/consumer tasks. Guard the lookup and stop the forwarder before returning.

-        self._active_video_tracks.pop(track_id)
-        await self._on_track_change(track_id)
+        track_info = self._active_video_tracks.pop(track_id, None)
+        if track_info is None:
+            return
+
+        await track_info.forwarder.stop()
+        await self._on_track_change(track_id)
🤖 Prompt for AI Agents
In agents-core/vision_agents/core/agents/agents.py around lines 907 to 909,
_on_track_removed currently pops the track unguarded and never stops the
forwarder; change it to first check whether the removed track is present and
relevant (e.g., only handle video track_type or membership in
self._active_video_tracks), retrieve the forwarder without using pop(), if found
await stopping the forwarder (wrap in try/except to swallow/log errors) and then
remove the entry from self._active_video_tracks (pop after stopping) before
calling await self._on_track_change(track_id); ensure the code never raises
KeyError if the key is missing or the track_type is non-video.

Comment on lines +912 to 924
# shared logic between track remove and added
# Select a track. Prioritize screenshare over regular
# This is the track without processing
non_processed_tracks = [t for t in self._active_video_tracks.values() if not t.processor]
source_track = sorted(non_processed_tracks, key=lambda t: t.priority, reverse=True)[0]
# assign the tracks that we last used so we can notify of changes...
self._active_source_track_id = source_track.id

track = _EvenDimensionsTrack(track) # type: ignore[arg-type]
await self._track_to_video_processors(source_track)

# Create a SHARED VideoForwarder for the RAW incoming track
# This prevents multiple recv() calls competing on the same track
raw_forwarder = VideoForwarder(
track, # type: ignore[arg-type]
max_buffer=30,
fps=30, # Max FPS for the producer (individual consumers can throttle down)
name=f"raw_video_forwarder_{track_id}",
)
await raw_forwarder.start()
self.logger.debug("🎥 Created raw VideoForwarder for track %s", track_id)
processed_track = sorted([t for t in self._active_video_tracks.values()], key=lambda t: t.priority, reverse=True)[0]
self._active_processed_track_id = processed_track.id


⚠️ Potential issue | 🔴 Critical

Avoid IndexError when no eligible video tracks remain

After a removal, _active_video_tracks can be empty or contain only processed entries (e.g., the agent’s own published track). In those cases both sorted(...)[0] calls raise IndexError, breaking media handling right when tracks are renumbered. Guard for the empty cases and only select a source/processed track when candidates exist.

-        non_processed_tracks = [t for t in self._active_video_tracks.values() if not t.processor]
-        source_track = sorted(non_processed_tracks, key=lambda t: t.priority, reverse=True)[0]
-        self._active_source_track_id = source_track.id
-
-        await self._track_to_video_processors(source_track)
-
-        processed_track = sorted([t for t in self._active_video_tracks.values()], key=lambda t: t.priority, reverse=True)[0]
-        self._active_processed_track_id = processed_track.id
+        if not self._active_video_tracks:
+            self._active_source_track_id = None
+            self._active_processed_track_id = None
+            return
+
+        non_processed_tracks = [
+            t for t in self._active_video_tracks.values() if not t.processor
+        ]
+        source_track = (
+            max(non_processed_tracks, key=lambda t: t.priority) if non_processed_tracks else None
+        )
+        self._active_source_track_id = source_track.id if source_track else None
+        if source_track:
+            await self._track_to_video_processors(source_track)
+
+        processed_track = max(
+            self._active_video_tracks.values(), key=lambda t: t.priority
+        )
+        self._active_processed_track_id = processed_track.id
🤖 Prompt for AI Agents
In agents-core/vision_agents/core/agents/agents.py around lines 912 to 924,
selecting the first element of sorted() can raise IndexError when
_active_video_tracks has no eligible entries; guard the selections by checking
for emptiness before indexing: compute non_processed_tracks and only call
sorted(...)[0] (and await _track_to_video_processors) if that list is non-empty,
otherwise clear or set _active_source_track_id to None/appropriate sentinel and
skip the processor call; likewise, only set _active_processed_track_id if there
are processed candidates, otherwise set it to None; ensure no unguarded list[0]
access remains and add minimal logging if desired.



class Processor(Protocol):
name: str

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Implementations in this file lack the required name attribute.

Adding name: str to the Protocol establishes it as mandatory for all Processor implementations. However, AudioVideoProcessor (line 112), AudioLogger (line 144), and ImageCapture (line 164) in this file do not define this attribute, causing Protocol violations that will fail type checking.

Add the name attribute to each implementation. For example, update AudioVideoProcessor:

 class AudioVideoProcessor(Processor):
     def __init__(
         self,
+        name: str,
         interval: int = 3,
         receive_audio: bool = False,
         receive_video: bool = True,
         *args,
         **kwargs,
     ):
+        self.name = name
         self.interval = interval
         self.last_process_time = 0

And update AudioLogger:

 class AudioLogger(AudioVideoProcessor, AudioProcessorMixin):
-    def __init__(self, interval: int = 2):
-        super().__init__(interval, receive_audio=True, receive_video=False)
+    def __init__(self, name: str = "AudioLogger", interval: int = 2):
+        super().__init__(name=name, interval=interval, receive_audio=True, receive_video=False)
         self.audio_count = 0

Similarly for ImageCapture:

 class ImageCapture(AudioVideoProcessor, ImageProcessorMixin):
     def __init__(
         self,
+        name: str = "ImageCapture",
         output_dir: str = "captured_frames",
         interval: int = 3,
         *args,
         **kwargs,
     ):
-        super().__init__(interval=interval, receive_audio=False, receive_video=True)
+        super().__init__(name=name, interval=interval, receive_audio=False, receive_video=True)
         self.output_dir = Path(output_dir)

Verify all Processor implementations across the codebase have been updated:


🏁 Script executed:

#!/bin/bash
# Find all classes that might implement Processor and check for name attribute

# First, find all Python files that import or reference Processor
echo "=== Files referencing Processor ==="
rg -l "Processor" --type=py

echo -e "\n=== Classes that inherit from or implement Processor ==="
rg -n --type=py "class.*\(.*Processor.*\):" -A 10

echo -e "\n=== Checking for name attribute definitions ==="
# Look for classes that inherit Processor but may lack name attribute
rg -n --type=py -B 2 -A 15 "class.*\(.*Processor.*\):" | rg -C 5 "self\.name|name:"

Length of output: 11664


🏁 Script executed:

#!/bin/bash
# Extract the AudioVideoProcessor __init__ method and related code from base_processor.py
cat agents-core/vision_agents/core/processors/base_processor.py | sed -n '112,142p'

echo -e "\n=== AudioLogger __init__ ==="
cat agents-core/vision_agents/core/processors/base_processor.py | sed -n '144,161p'

echo -e "\n=== ImageCapture __init__ ==="
cat agents-core/vision_agents/core/processors/base_processor.py | sed -n '164,180p'

echo -e "\n=== Check external implementations for name parameter ==="
# Check AvatarPublisher
echo "AvatarPublisher __init__:"
rg -n -A 20 "class AvatarPublisher" plugins/heygen/vision_agents/plugins/heygen/heygen_avatar_publisher.py | head -30

echo -e "\nLocalDetectionProcessor __init__:"
rg -n -A 20 "class LocalDetectionProcessor" plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py | head -30

echo -e "\nCloudDetectionProcessor __init__:"
rg -n -A 20 "class CloudDetectionProcessor" plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py | head -30

Length of output: 5829


🏁 Script executed:

#!/bin/bash
# Deep dive: check if external implementations have name attribute defined anywhere

echo "=== YOLOPoseProcessor full init ==="
rg -n -A 30 "class YOLOPoseProcessor" plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py | head -50

echo -e "\n=== AvatarPublisher full init and class body ==="
rg -n -A 50 "class AvatarPublisher" plugins/heygen/vision_agents/plugins/heygen/heygen_avatar_publisher.py | grep -E "(name|def __init__|self\.name)" | head -20

echo -e "\n=== LocalDetectionProcessor full init ==="
rg -n -A 50 "class LocalDetectionProcessor" plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py | grep -E "(name|def __init__|self\.name)" | head -20

echo -e "\n=== CloudDetectionProcessor full init ==="
rg -n -A 50 "class CloudDetectionProcessor" plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py | grep -E "(name|def __init__|self\.name)" | head -20

Length of output: 1444


The three Processor implementations in this file lack the required name attribute, causing Protocol violations.

Adding name: str to the Processor Protocol at line 36 makes it mandatory for all implementations. However, AudioVideoProcessor (line 112), AudioLogger (line 144), and ImageCapture (line 164) do not define this attribute. While YOLOPoseProcessor in the ultralytics plugin has been updated with name = "yolo_pose", the base implementations remain non-compliant and will fail type checking.

Add name as a required attribute to each implementation:

 class AudioVideoProcessor(Processor):
     def __init__(
         self,
+        name: str,
         interval: int = 3,
         receive_audio: bool = False,
         receive_video: bool = True,
         *args,
         **kwargs,
     ):
+        self.name = name
         self.interval = interval
         self.last_process_time = 0
 class AudioLogger(AudioVideoProcessor, AudioProcessorMixin):
-    def __init__(self, interval: int = 2):
-        super().__init__(interval, receive_audio=True, receive_video=False)
+    def __init__(self, name: str = "AudioLogger", interval: int = 2):
+        super().__init__(name=name, interval=interval, receive_audio=True, receive_video=False)
         self.audio_count = 0
 class ImageCapture(AudioVideoProcessor, ImageProcessorMixin):
     def __init__(
         self,
+        name: str = "ImageCapture",
         output_dir: str = "captured_frames",
         interval: int = 3,
         *args,
         **kwargs,
     ):
-        super().__init__(interval=interval, receive_audio=False, receive_video=True)
+        super().__init__(name=name, interval=interval, receive_audio=False, receive_video=True)
         self.output_dir = Path(output_dir)

External implementations (AvatarPublisher, LocalDetectionProcessor, CloudDetectionProcessor) also require this attribute for full Protocol compliance.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In agents-core/vision_agents/core/processors/base_processor.py around line 36
and the implementations at lines ~112, ~144, and ~164, the Processor Protocol
now requires a name: str attribute but the three base implementations
(AudioVideoProcessor, AudioLogger, ImageCapture) and external implementations
(AvatarPublisher, LocalDetectionProcessor, CloudDetectionProcessor) do not
define it; add a class-level attribute name: str to each processor
implementation with an appropriate literal value (e.g., name = "audio_video",
name = "audio_logger", name = "image_capture" or other project-consistent
identifiers) so they satisfy the Protocol and pass type checking.
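
A minimal sketch of the contract under discussion: a `Protocol` with a `name: str` data member is satisfied by either a class-level attribute or an instance attribute set in `__init__`. The class names below are illustrative stand-ins, not the real implementations:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Processor(Protocol):
    name: str

class ImageCapture:
    name = "image_capture"  # class-level attribute satisfies the Protocol

class AudioLogger:
    def __init__(self, name: str = "audio_logger"):
        self.name = name  # an instance attribute works just as well

# runtime_checkable lets isinstance() verify the member is present
print(isinstance(ImageCapture(), Processor))  # True
print(isinstance(AudioLogger(), Processor))   # True
```

Static checkers apply the same rule without `runtime_checkable`; any implementation missing `name` is flagged at type-check time.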

Comment on lines +123 to 127
self._video_forwarder.add_frame_handler(
self._process_and_add_frame,
fps=float(self.fps),
consumer_name="moondream"
name="moondream"
)
⚠️ Potential issue | 🟠 Major

Don't cast fps to float when registering the handler.

float(self.fps) raises TypeError as soon as self.fps is None, breaking the shared-forwarder path that previously allowed unlimited FPS; passing the original value through unchanged preserves the VideoForwarder signature of Optional[float].

Apply this diff:

-            self._video_forwarder.add_frame_handler(
-                self._process_and_add_frame,
-                fps=float(self.fps),
-                name="moondream"
-            )
+            self._video_forwarder.add_frame_handler(
+                self._process_and_add_frame,
+                fps=self.fps,
+                name="moondream",
+            )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self._video_forwarder.add_frame_handler(
self._process_and_add_frame,
fps=float(self.fps),
consumer_name="moondream"
name="moondream"
)
self._video_forwarder.add_frame_handler(
self._process_and_add_frame,
fps=self.fps,
name="moondream",
)
🤖 Prompt for AI Agents
In
plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py
around lines 123 to 127, the handler registration casts fps via float(self.fps)
which raises TypeError when self.fps is None; change the call to pass self.fps
directly (i.e., fps=self.fps) so the optional None is preserved and matches the
VideoForwarder signature Optional[float].
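
The failure mode is easy to reproduce in isolation. `add_frame_handler` below is a hypothetical stand-in mirroring the `Optional[float]` signature, where `fps=None` means "no throttling":

```python
from typing import Any, Callable, Optional

def add_frame_handler(
    handler: Callable[[Any], Any],
    fps: Optional[float] = None,
    name: str = "",
) -> Optional[float]:
    # fps=None must survive the call unchanged: it signals unlimited FPS.
    return fps

# Casting first blows up on the unlimited-FPS case:
try:
    float(None)  # type: ignore[arg-type]
    raised = False
except TypeError:
    raised = True

print(raised)  # True: float(None) raises TypeError
print(add_frame_handler(lambda f: None, fps=None, name="moondream"))  # None
print(add_frame_handler(lambda f: None, fps=10.0, name="moondream"))  # 10.0
```

Passing `fps=self.fps` through untouched keeps both the throttled and unlimited paths working.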

Comment on lines 10 to 11
@pytest.mark.asyncio
async def test_basic_put_get(self):
⚠️ Potential issue | 🟠 Major

Drop @pytest.mark.asyncio decorators.

The testing guidelines here explicitly forbid @pytest.mark.asyncio; async support is provided automatically, so these new decorators violate policy. Please remove them across this file. As per coding guidelines.

Apply this diff:

-    @pytest.mark.asyncio
     async def test_basic_put_get(self):
@@
-    @pytest.mark.asyncio
     async def test_put_latest_discards_oldest(self):
@@
-    @pytest.mark.asyncio
     async def test_put_latest_nowait(self):
@@
-    @pytest.mark.asyncio
     async def test_put_latest_nowait_discards_oldest(self):
@@
-    @pytest.mark.asyncio
     async def test_queue_size_limits(self):
@@
-    @pytest.mark.asyncio
     async def test_generic_type_support(self):
@@
-    @pytest.mark.asyncio
     async def test_multiple_handlers_with_different_fps(self, bunny_video_track):
@@
-    @pytest.mark.asyncio
     async def test_handler_fps_exceeds_forwarder_fps_raises_error(self, bunny_video_track):
@@
-    @pytest.mark.asyncio
     async def test_stop_stops_handlers(self, bunny_video_track):
@@
-    @pytest.mark.asyncio
     async def test_add_and_remove_handlers(self, bunny_video_track):
📝 Committable suggestion


Suggested change
@pytest.mark.asyncio
async def test_basic_put_get(self):
async def test_basic_put_get(self):
🤖 Prompt for AI Agents
In tests/test_queue_and_video_forwarder.py around lines 10 to 11, the test
function is annotated with @pytest.mark.asyncio which violates the repository
testing guidelines because async support is provided automatically; remove the
@pytest.mark.asyncio decorator from this function and any other occurrences in
this file so the tests rely on the built-in async support instead of the
explicit marker.
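
Automatic async support of this kind is typically provided by pytest-asyncio's auto mode, enabled once in the project configuration rather than per-test. A sketch of the relevant `pyproject.toml` fragment (the repository may configure this elsewhere, e.g. `pytest.ini` or `setup.cfg`):

```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
```

With `asyncio_mode = "auto"`, every `async def test_*` is collected and run on an event loop without any `@pytest.mark.asyncio` decorator.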

@tschellenbach tschellenbach merged commit e6c322f into main Nov 7, 2025
1 of 6 checks passed
@tschellenbach tschellenbach deleted the agents_clean branch November 7, 2025 18:43
dangusev pushed a commit that referenced this pull request Nov 10, 2025
* first bit of cleanup

* audio queue cleanup

* more cleanup

* more event cleanup

* cleanup

* fixes

* closer

* fix audio

* wip on video forwarder cleanup

* clean tests for forwarder

* cleanup integrations of forwarder

* remove logging

* typing cleanup
