@d3xvn d3xvn commented Oct 7, 2025

  • Remove duplicate track publishing and audio/video listening code in join() method
  • Initialize timeout_errors and consecutive_errors before video processing loop
  • Increment timeout_errors in TimeoutError exception handler
  • Fixes potential crash when error counters are referenced but not initialized

Summary by CodeRabbit

  • Bug Fixes
    • Prevented duplicate media publishing during session join, reducing setup glitches and unexpected behavior.
  • Refactor
    • Improved video processing resilience with adaptive timeout handling and exponential backoff, leading to smoother recovery from network hiccups and fewer interruptions.
    • Enhanced internal error tracking to reset after successful processing, improving overall stream stability.

coderabbitai bot commented Oct 7, 2025

Walkthrough

Consolidates the duplicate track publishing in join() into a single invocation, after which the agent proceeds to listening. In _process_track, adds timeout and consecutive-error counters, applies exponential backoff on TimeoutError, and resets the counters after a frame is processed successfully. The processing loop now tracks and logs timeouts.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Join flow and track processing (agents-core/vision_agents/core/agents/agents.py) | Removed duplicate publish in join; added timeout_errors and consecutive_errors; implemented exponential backoff on timeouts with counter-based delay; reset timeout counter on success; updated related debug logging and control flow within the processing loop. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant Agent
  participant MediaTracks

  Client->>Agent: join()
  activate Agent
  Agent->>MediaTracks: publish_tracks()
  Note over Agent,MediaTracks: Single publish (duplicate removed)
  Agent-->>Client: start listening
  deactivate Agent
sequenceDiagram
  autonumber
  participant Processor as _process_track
  participant Source as VideoSource

  loop For each frame
    Processor->>Source: get_next_frame(timeout)
    alt Frame received
      Source-->>Processor: frame
      Note right of Processor: Process frame<br/>timeout_errors = 0
    else TimeoutError
      Source-->>Processor: timeout
      Note right of Processor: timeout_errors += 1<br/>delay = base * 2^(timeout_errors-1)
      Processor->>Processor: sleep(delay)
    end
  end
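
The second diagram can be sketched as a small loop. This is only an illustration under stated assumptions, not the code in agents.py: `ScriptedSource`, `process_track`, and `BASE_DELAY` are hypothetical names, the base delay value is invented because the PR does not state it, and a `None` entry in the script stands in for a timeout.

```python
import asyncio

BASE_DELAY = 0.001  # hypothetical base delay in seconds; the real value is not shown in this PR


class ScriptedSource:
    """Hypothetical stand-in for the frame source: None in the script means a timeout."""

    def __init__(self, script):
        self._script = list(script)

    def exhausted(self):
        return not self._script

    async def next_frame(self, timeout):
        item = self._script.pop(0)
        if item is None:
            raise asyncio.TimeoutError
        return item


async def process_track(source):
    """Consume frames, sleeping longer after each consecutive timeout."""
    timeout_errors = 0  # initialized before the loop, as the PR requires
    frames, delays = [], []
    while not source.exhausted():
        try:
            frame = await source.next_frame(timeout=2.0)
        except asyncio.TimeoutError:
            timeout_errors += 1
            # delay = base * 2^(timeout_errors - 1), as in the diagram note
            delay = BASE_DELAY * 2 ** (timeout_errors - 1)
            delays.append(delay)
            await asyncio.sleep(delay)
            continue
        # Frame received: "process" it and reset the timeout counter.
        frames.append(frame)
        timeout_errors = 0
    return frames, delays
```

Running it over a script such as `[None, None, "f1", None, "f2"]` shows the delay doubling across the two consecutive timeouts and dropping back to the base value after the first successful frame.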

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

The room holds its breath: one publish, not two—
A single bell in the skull’s cathedral.
Frames arrive like moths, or don’t; the dark refuses.
I count the silences, doubling their hush,
nursing the clock’s milk till it forgives.
Then—the image blooms, and error forgets my name.

Pre-merge checks

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title Check | ✅ Passed | The title explicitly identifies the primary change (removing duplicate track publishing code from the join() method) and is concise, clear, and directly tied to the summary of changes. |

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
agents-core/vision_agents/core/agents/agents.py (1)

669-669: Unreachable code at Line 669.

This log statement is unreachable. The while True loop at line 627 only exits via:

  • Line 663: return (on CancelledError)
  • Line 666: raise (on other exceptions)

Line 669 will never execute.

Apply this diff to remove the unreachable code:

             except Exception:
                 raise
-
-        # Cleanup and logging
-        self.logger.info(f"🎥VDP: Video processing loop ended for track {track_id} - timeouts: {timeout_errors}, consecutive_errors: {consecutive_errors}")
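
If the end-of-loop log is worth keeping rather than deleting, one option is to move it into a `finally` block, which does run on the `return` and `raise` exits. The sketch below is an alternative to the suggested diff, not what this PR did; both function names are hypothetical, and the `events` list stands in for the logger.

```python
import asyncio


async def loop_with_trailing_log(events):
    """Mirrors the reported shape: the statement after the loop never runs."""
    while True:
        try:
            await asyncio.sleep(0)
            raise asyncio.CancelledError  # simulate cancellation of the task
        except asyncio.CancelledError:
            events.append("cancelled")
            return
        except Exception:
            raise
    events.append("loop ended")  # unreachable: the loop only exits via return or raise


async def loop_with_finally(events):
    """Same loop, but the cleanup log sits in finally and runs on every exit path."""
    try:
        while True:
            try:
                await asyncio.sleep(0)
                raise asyncio.CancelledError  # simulate cancellation of the task
            except asyncio.CancelledError:
                events.append("cancelled")
                return
    finally:
        events.append("loop ended")
```

The first function records only the cancellation; the second also records the end-of-loop entry.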
🧹 Nitpick comments (1)
agents-core/vision_agents/core/agents/agents.py (1)

624-625: Consider removing unused consecutive_errors counter.

consecutive_errors is incremented on line 652 but never used for control flow or backoff logic—unlike timeout_errors. It was only logged in the now-unreachable line 669.

If there's no planned logic for consecutive_errors, remove it:

-        # Initialize error tracking counters
         timeout_errors = 0
-        consecutive_errors = 0
         
         while True:
             try:
                 # Use the shared forwarder instead of competing for track.recv()
                 video_frame = await shared_forwarder.next_frame(timeout=2.0)
 
                 if video_frame:
                     # Reset error counts on successful frame processing
                     timeout_errors = 0
-                    consecutive_errors = 0
 
                     if hasImageProcessers:
 
                         img = video_frame.to_image()
 
                         for processor in self.image_processors:
                             try:
                                 await processor.process_image(img, participant.user_id)
                             except Exception as e:
                                 self.logger.error(
                                     f"Error in image processor {type(processor).__name__}: {e}"
                                 )
 
 
                 else:
                     self.logger.warning("🎥VDP: Received empty frame")
-                    consecutive_errors += 1

Alternatively, implement logic that uses consecutive_errors (e.g., exit the loop or skip processing after N consecutive empty frames).

Also applies to: 652-652
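
A minimal sketch of the alternative mentioned above, where `consecutive_errors` actually drives control flow. The threshold `MAX_CONSECUTIVE_EMPTY` and the function name are hypothetical, and a plain list stands in for the frame stream; nothing here is taken from agents.py.

```python
MAX_CONSECUTIVE_EMPTY = 3  # hypothetical threshold, not a value from the PR


def consume_until_stalled(frames, limit=MAX_CONSECUTIVE_EMPTY):
    """Process frames and stop after `limit` empty frames in a row."""
    consecutive_errors = 0
    processed = []
    for frame in frames:
        if frame:
            # A good frame resets the streak.
            consecutive_errors = 0
            processed.append(frame)
        else:
            consecutive_errors += 1
            if consecutive_errors >= limit:
                break  # give up: the stream looks stalled
    return processed, consecutive_errors
```

With this shape the counter is no longer dead code: a single empty frame is tolerated, while a sustained run of them ends the loop.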

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2d08f1d and 4f60ab2.

📒 Files selected for processing (1)
  • agents-core/vision_agents/core/agents/agents.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/python.mdc)

**/*.py: Never adjust sys.path (e.g., sys.path.append/insert/assignment)
Docstrings must follow the Google style guide

Files:

  • agents-core/vision_agents/core/agents/agents.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: unit / Ruff & mypy
  • GitHub Check: unit / Test "not integration"
🔇 Additional comments (3)
agents-core/vision_agents/core/agents/agents.py (3)

240-252: LGTM! Clean track publishing flow.

The track publishing and listening logic is now consolidated properly. Publishing occurs once before setting up listeners, eliminating the duplication issue described in the PR objectives.


623-626: Proper initialization prevents crash.

Initializing timeout_errors and consecutive_errors before the processing loop correctly addresses the potential crash when these counters are referenced in exception handlers.
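
For illustration, this is how such a crash typically looks in Python. It assumes the failure was an `UnboundLocalError`, which the PR does not state explicitly; both functions are hypothetical reductions of the handler.

```python
def handler_without_init():
    """Increments a counter that was never assigned in this scope."""
    try:
        raise TimeoutError("no frame")
    except TimeoutError:
        timeout_errors += 1  # raises UnboundLocalError: read before assignment
    return timeout_errors


def handler_with_init():
    """Same handler with the counter initialized before use."""
    timeout_errors = 0
    try:
        raise TimeoutError("no frame")
    except TimeoutError:
        timeout_errors += 1
    return timeout_errors
```

Because the failing line sits inside an exception handler, the problem only appears on the first timeout, which makes it easy to miss in normal runs.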


656-661: Timeout handling and backoff logic is correct.

The timeout_errors increment and exponential backoff calculation work as intended, capping at 30 seconds to prevent excessive delays.
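
A sketch of a capped schedule of this kind. The 30-second cap comes from the comment above; the 1-second base and the function name `backoff_delay` are assumptions for illustration only.

```python
def backoff_delay(timeout_errors, base=1.0, cap=30.0):
    """Delay after the Nth consecutive timeout: base * 2^(N-1), capped at `cap` seconds.

    `base` is an assumed value; only the 30-second cap is mentioned in the review.
    """
    return min(cap, base * 2 ** (timeout_errors - 1))


# Delays for the first seven consecutive timeouts under these assumptions.
schedule = [backoff_delay(n) for n in range(1, 8)]
```

Under these assumptions the delay doubles from 1 second up to 16 seconds and then stays at 30 seconds.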

@d3xvn d3xvn merged commit 9b5db80 into main Oct 7, 2025
5 checks passed
@d3xvn d3xvn deleted the fix/agents-tracks branch October 7, 2025 12:40
Nash0x7E2 added a commit that referenced this pull request Oct 8, 2025
commit 4757845
Merge: 8d9a9e2 c834231
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Wed Oct 8 10:29:01 2025 +0200

    Merge branch 'main' of github.com:GetStream/agents

commit 8d9a9e2
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Wed Oct 8 10:28:54 2025 +0200

    move fal smart detection to plugin

commit c834231
Merge: b6deb4d facedf2
Author: maxkahan <max.kahan@getstream.io>
Date:   Wed Oct 8 10:17:22 2025 +0200

    Merge pull request #73 from GetStream/fix/shared_forwarder

    fix: video feed mismatch and VideoForwarder resource leaks

commit b6deb4d
Author: Neevash Ramdial (Nash) <mail@neevash.dev>
Date:   Wed Oct 8 09:38:51 2025 +0200

    Add CI secrets  (#72)

    * Add in secrets for daily integration

    * Rename to realtime instead of realtime 2

    * Add events.wait to xAI test

commit 73ddc8e
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Tue Oct 7 17:27:40 2025 +0200

    pyproject cleanup

commit facedf2
Author: Deven Joshi <deven9852@gmail.com>
Date:   Tue Oct 7 17:26:11 2025 +0200

    fix: critical video feed mismatch and VideoForwarder resource leaks

    CRITICAL FIXES:

    1. Video Feed Mismatch (LLM getting wrong video)
       - When YOLO/video processors are used, LLM was receiving empty processed track
       - Root cause: shared_forwarder was created from RAW track but LLM was given processed track
       - Fix: Create separate forwarders for raw and processed video tracks
       - Now LLM correctly receives YOLO-annotated frames when using pose detection

    2. VideoForwarder Resource Leaks
       - Consumer tasks were never removed from _tasks set (memory leak)
       - Fix: Add task.add_done_callback(self._task_done) to clean up tasks
       - Producer exceptions were silently swallowed
       - Fix: Log and re-raise exceptions for proper error handling

    3. Race Condition in VideoForwarder.stop()
       - Used list() snapshot for cancellation but original set for gather()
       - Fix: Use tasks_snapshot consistently throughout stop()

    4. Multiple start() Protection
       - No guard against calling start() multiple times
       - Fix: Add _started flag and early return with warning

    5. Missing VideoForwarder Cleanup in Agent
       - Forwarders were created but never stopped on agent.close()
       - Fix: Track all forwarders and stop them in close() method

    These fixes prevent resource leaks, ensure correct video routing, and improve
    error visibility for production debugging.
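
    A minimal sketch of points 2 to 4 above, assuming an asyncio-based design.
    `TaskTracker` is a hypothetical stand-in, not the real VideoForwarder; only the
    names `_tasks`, `_task_done`, `_started`, and `tasks_snapshot` come from the
    commit message.

```python
import asyncio


class TaskTracker:
    """Hypothetical stand-in showing task cleanup, a start guard, and snapshot-based stop."""

    def __init__(self):
        self._tasks = set()
        self._started = False

    @property
    def active_count(self):
        return len(self._tasks)

    def start(self):
        # Guard against multiple start() calls.
        if self._started:
            return False
        self._started = True
        return True

    def spawn(self, coro):
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Remove the task from the set when it finishes, so the set does not grow forever.
        task.add_done_callback(self._task_done)
        return task

    def _task_done(self, task):
        self._tasks.discard(task)

    async def stop(self):
        # Use one snapshot for both cancel and gather; the done callbacks
        # mutate self._tasks while tasks are finishing.
        tasks_snapshot = list(self._tasks)
        for task in tasks_snapshot:
            task.cancel()
        await asyncio.gather(*tasks_snapshot, return_exceptions=True)
        self._started = False
```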

commit fbc1759
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Tue Oct 7 17:19:45 2025 +0200

    wip on pyproject files

commit 3739605
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Tue Oct 7 15:55:19 2025 +0200

    pypi environment

commit 6144265
Merge: 231efc8 9b5db80
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Tue Oct 7 15:17:09 2025 +0200

    cleanup

commit 231efc8
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Tue Oct 7 15:12:31 2025 +0200

    remove duplicate publish tracks

commit 9b5db80
Merge: 2d08f1d 4f60ab2
Author: Deven Joshi <deven9852@gmail.com>
Date:   Tue Oct 7 14:40:31 2025 +0200

    Merge pull request #71 from GetStream/fix/agents-tracks

    fix: remove duplicate track publishing code

commit 2d08f1d
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Tue Oct 7 14:30:01 2025 +0200

    fix openai realtime test

commit 4f60ab2
Author: Deven Joshi <deven9852@gmail.com>
Date:   Tue Oct 7 14:25:01 2025 +0200

    fix: remove duplicate track publishing code and initialize error counters

    - Remove duplicate track publishing and audio/video listening code in join() method
    - Initialize timeout_errors and consecutive_errors before video processing loop
    - Increment timeout_errors in TimeoutError exception handler
    - Fixes potential crash when error counters are referenced but not initialized

commit ca562de
Merge: 4b8f686 b121bc6
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Tue Oct 7 14:24:02 2025 +0200

    Merge branch 'main' of github.com:GetStream/agents

commit 4b8f686
Author: Thierry Schellenbach <thierry@getstream.io>
Date:   Tue Oct 7 14:23:54 2025 +0200

    nicer tests for openai realtime

commit b121bc6
Merge: 4a178e9 1bd131b
Author: Yarik <43354956+yarikdevcom@users.noreply.github.com>
Date:   Tue Oct 7 14:22:56 2025 +0200

    Merge pull request #69 from GetStream/yarikrudenok/ai-176-migrate-branding-to-vision-agents

    Refactor project structure to replace 'stream_agents' with 'vision_ag…

commit 1bd131b
Author: Yarik <yarik.rudenok@getstream.io>
Date:   Tue Oct 7 14:16:49 2025 +0200

    feat: [AI-176] Rename to vision

commit 4a178e9
Merge: a940bd3 2eacdfb
Author: maxkahan <max.kahan@getstream.io>
Date:   Tue Oct 7 11:50:28 2025 +0100

    Merge pull request #70 from GetStream/fix/agent-example

    fix: Agent Example and TURN detection

commit 2eacdfb
Author: Deven Joshi <deven9852@gmail.com>
Date:   Tue Oct 7 12:42:58 2025 +0200

    Fix: Remove f-string prefix from log with no placeholders

    - Fixed lint error F541 on line 797
    - Changed f-string to regular string since no interpolation needed

commit 66deea5
Author: Deven Joshi <deven9852@gmail.com>
Date:   Tue Oct 7 12:41:33 2025 +0200

    Move realtime mode check to top of _on_turn_event

    - Realtime LLMs handle their own turn detection and interruption
    - Skip all turn event processing in realtime mode (not just LLM triggering)
    - Removes duplicate realtime check in TurnEndedEvent branch
    - Cleaner and more efficient

commit 8c01c31
Author: Deven Joshi <deven9852@gmail.com>
Date:   Tue Oct 7 12:20:42 2025 +0200

    Optimize: Check realtime mode early in _on_turn_event TurnEndedEvent

    - Add early return for realtime mode after logging the event
    - Skips unnecessary transcript fetching and participant metadata extraction
    - Removes redundant realtime_mode check later in the flow
    - Consistent with _on_transcript optimization

commit f4fa0a5
Author: Deven Joshi <deven9852@gmail.com>
Date:   Tue Oct 7 12:18:09 2025 +0200

    Optimize: Check realtime mode early in _on_transcript

    - Add early return if in realtime mode to skip LLM triggering logic
    - In realtime mode, the LLM handles STT, turn detection, and responses itself
    - Removes redundant check in else branch
    - Improves code clarity and efficiency

commit 12b1638
Author: Deven Joshi <deven9852@gmail.com>
Date:   Tue Oct 7 10:48:07 2025 +0200

    Fix agent LLM triggering and turn detection

    - Implement automatic LLM triggering in _on_transcript() for both modes:
      * Without turn detection: triggers immediately on transcript completion
      * With turn detection: accumulates transcripts and waits for TurnEndedEvent
    - Add _pending_user_transcripts dict to track multi-chunk transcripts per user
    - Implement turn detection LLM response in _on_turn_event()
    - Add TTS interruption when user starts speaking (barge-in)
    - Fix FAL turn detection event emission logic
    - Fix double TTS triggering in OpenAI LLM plugin (was emitting LLMResponseCompletedEvent twice)
    - Add FAL turn detection to simple agent example
    - Update example dependencies to use vision-agents naming

    Known limitation: LLM response generation is not yet cancelled when user interrupts.
    Only TTS audio playback stops, but LLM continues generating in background.
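
    The two triggering modes described above can be sketched as follows. This is
    an illustration only: `TranscriptBuffer`, its methods, and the `llm_calls`
    list are hypothetical; only `_pending_user_transcripts` is named in the
    commit message.

```python
from collections import defaultdict


class TranscriptBuffer:
    """Hypothetical sketch: trigger the LLM immediately, or on turn end when turn detection is on."""

    def __init__(self, turn_detection):
        self.turn_detection = turn_detection
        self._pending_user_transcripts = defaultdict(list)
        self.llm_calls = []  # stands in for the real LLM trigger

    def on_transcript(self, user_id, text):
        if not self.turn_detection:
            # Without turn detection: trigger as soon as the transcript completes.
            self.llm_calls.append((user_id, text))
            return
        # With turn detection: accumulate chunks per user until the turn ends.
        self._pending_user_transcripts[user_id].append(text)

    def on_turn_ended(self, user_id):
        chunks = self._pending_user_transcripts.pop(user_id, [])
        if chunks:
            self.llm_calls.append((user_id, " ".join(chunks)))
```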

3 participants