-
Notifications
You must be signed in to change notification settings - Fork 570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Debounce transcript processing 300ms for controlling event loop better #1211
Conversation
WalkthroughThe changes in this pull request introduce a new WebSocket utility function, Changes
Possibly related PRs
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
04ee1ed
to
5709bbb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
🧹 Outside diff range and nitpick comments (6)
backend/utils/pusher.py (1)
Line range hint 1-11
: Add missing websockets import
The websockets
module is used but not explicitly imported. Add the import to prevent potential runtime errors.
Add this import at the top of the file:
import uuid
import os
from datetime import datetime, timezone, timedelta
from enum import Enum
+import websockets
import opuslib
🧰 Tools
🪛 Ruff
53-53: websockets
may be undefined, or defined from star imports
(F405)
backend/utils/webhooks.py (3)
Line range hint 62-78
: Critical: Implement debouncing and use async HTTP client.
Several issues need to be addressed:
- The PR objective mentions debouncing transcript processing (300ms), but no debouncing logic is implemented.
- Using synchronous
requests.post
in an async function can block the event loop.
Consider this implementation:
import aiohttp
from functools import partial
from async_timeout import timeout
from typing import List, Dict, Any
def debounce(wait_time: float):
"""Decorator to debounce a function"""
def decorator(fn):
task = None
async def debounced(*args, **kwargs):
nonlocal task
if task is not None:
task.cancel()
task = asyncio.create_task(
_delayed_call(wait_time, fn, *args, **kwargs)
)
return debounced
return decorator
async def _delayed_call(wait_time: float, fn, *args, **kwargs):
await asyncio.sleep(wait_time)
return await fn(*args, **kwargs)
@debounce(wait_time=0.3) # 300ms debounce
async def realtime_transcript_webhook(uid: str, segments: List[dict]):
if not (toggled := user_webhook_status_db(uid, WebhookType.realtime_transcript)):
return
if not (webhook_url := get_user_webhook_db(uid, WebhookType.realtime_transcript)):
return
webhook_url = f'{webhook_url}?uid={uid}'
async with aiohttp.ClientSession() as session:
try:
async with timeout(15):
async with session.post(
webhook_url,
json={'segments': segments, 'session_id': uid},
headers={'Content-Type': 'application/json'}
) as response:
logging.info(
'Realtime transcript webhook response [url=%s]: %d',
webhook_url,
response.status
)
except Exception as e:
logging.error(
'Realtime transcript webhook failed [url=%s]: %s',
webhook_url,
str(e)
)
Line range hint 1-150
: Consider implementing robust webhook management system.
The current webhook implementation could benefit from several architectural improvements:
- Webhook Manager class to handle different webhook types consistently
- Circuit breaker pattern to handle failing webhooks
- Rate limiting to prevent overwhelming webhook endpoints
- Retry mechanism with exponential backoff
- Webhook health monitoring and automatic disable/enable
This would improve reliability and maintainability of the webhook system.
Example architecture:
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Callable, Any
import asyncio
from datetime import datetime, timedelta
@dataclass
class WebhookConfig:
timeout: int
max_retries: int
backoff_factor: float
circuit_breaker_threshold: int
rate_limit_per_minute: int
class WebhookManager:
def __init__(self):
self._configs: Dict[WebhookType, WebhookConfig] = {}
self._circuit_breakers: Dict[str, CircuitBreaker] = {}
self._rate_limiters: Dict[str, RateLimiter] = {}
async def send_webhook(
self,
webhook_type: WebhookType,
url: str,
payload: Any,
retry_strategy: Optional[RetryStrategy] = None
) -> None:
# Implementation details...
Would you like me to create a detailed proposal for this architecture improvement?
Async HTTP client and thread safety improvements needed
The codebase scan reveals several issues that confirm the review comment's concerns:
-
The function uses synchronous
requests
library despite being marked asasync
, which can block the event loop. All webhook-related code usesrequests
instead of an async HTTP client likeaiohttp
. -
The TODO comment's concerns are valid:
- No locking mechanism exists for thread safety when handling audio bytes
- No segmentation of large audio payloads
- No URL validation for webhook endpoints
-
The function is called from multiple places in
backend/routers/pusher.py
using bothasyncio.create_task()
andasyncio.run_coroutine_threadsafe()
, making thread safety even more critical.
🔗 Analysis chain
Line range hint 98-113
: Address TODO comment and async implementation.
The TODO comment mentions several important issues that should be addressed:
- Adding a lock for thread safety
- Sending shorter segments
- Validating regex
Additionally, like the transcript webhook, this function should use an async HTTP client.
Would you like me to help implement these improvements and create a GitHub issue to track them?
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if there are any other TODOs in the codebase related to webhooks
rg -i "todo.*webhook" --type py
Length of output: 33
Script:
#!/bin/bash
# Let's check for webhook-related code and async patterns
rg -A 5 "async.*webhook" --type py
# Also check for any webhook-related functions
rg -A 5 "def.*webhook" --type py
# Check for any webhook imports and HTTP client usage
rg "import.*requests|import.*aiohttp" --type py
Length of output: 11697
backend/routers/transcribe_v2.py (2)
Line range hint 2-24
: Add missing imports for asyncio
, time
, os
, json
, and websockets
modules
Several modules used in the code are not imported, which could lead to NameError
exceptions at runtime. Please ensure all used modules are properly imported.
Apply this diff to add the missing imports:
import uuid
import struct
+import asyncio
+import time
+import os
+import json
+import websockets
from datetime import datetime, timezone, timedelta
from enum import Enum
Line range hint 417-467
: Consider refactoring receive_audio
function for better readability
The receive_audio
function is quite lengthy and contains multiple nested conditional statements. Refactoring it into smaller helper functions or restructuring the logic can improve readability and maintainability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- backend/routers/pusher.py (4 hunks)
- backend/routers/transcribe_v2.py (18 hunks)
- backend/utils/pusher.py (1 hunks)
- backend/utils/webhooks.py (4 hunks)
🧰 Additional context used
🪛 Ruff
backend/routers/pusher.py
211-211: asyncio
may be undefined, or defined from star imports
(F405)
230-230: json
may be undefined, or defined from star imports
(F405)
231-231: asyncio
may be undefined, or defined from star imports
(F405)
232-232: asyncio
may be undefined, or defined from star imports
(F405)
240-240: asyncio
may be undefined, or defined from star imports
(F405)
258-258: asyncio
may be undefined, or defined from star imports
(F405)
272-272: asyncio
may be undefined, or defined from star imports
(F405)
275-275: asyncio
may be undefined, or defined from star imports
(F405)
276-276: asyncio
may be undefined, or defined from star imports
(F405)
backend/routers/transcribe_v2.py
24-24: utils.pusher.connect_to_transcript_pusher
imported but unused
Remove unused import
(F401)
24-24: utils.pusher.connect_to_audio_bytes_pusher
imported but unused
Remove unused import
(F401)
115-115: asyncio
may be undefined, or defined from star imports
(F405)
120-120: f-string without any placeholders
Remove extraneous f
prefix
(F541)
160-160: asyncio
may be undefined, or defined from star imports
(F405)
237-237: asyncio
may be undefined, or defined from star imports
(F405)
256-256: get_profile_audio_if_exists
may be undefined, or defined from star imports
(F405)
261-261: process_audio_dg
may be undefined, or defined from star imports
(F405)
265-265: process_audio_dg
may be undefined, or defined from star imports
(F405)
270-270: send_initial_file_path
may be undefined, or defined from star imports
(F405)
273-273: process_audio_soniox
may be undefined, or defined from star imports
(F405)
279-279: process_audio_speechmatics
may be undefined, or defined from star imports
(F405)
283-283: send_initial_file_path
may be undefined, or defined from star imports
(F405)
313-313: asyncio
may be undefined, or defined from star imports
(F405)
319-319: json
may be undefined, or defined from star imports
(F405)
322-322: websockets
may be undefined, or defined from star imports
(F405)
342-342: asyncio
may be undefined, or defined from star imports
(F405)
351-351: websockets
may be undefined, or defined from star imports
(F405)
372-372: asyncio
may be undefined, or defined from star imports
(F405)
514-514: asyncio
may be undefined, or defined from star imports
(F405)
515-515: asyncio
may be undefined, or defined from star imports
(F405)
518-518: asyncio
may be undefined, or defined from star imports
(F405)
520-520: asyncio
may be undefined, or defined from star imports
(F405)
523-523: asyncio
may be undefined, or defined from star imports
(F405)
backend/utils/pusher.py
53-53: websockets
may be undefined, or defined from star imports
(F405)
🔇 Additional comments (3)
backend/utils/pusher.py (1)
49-58
: Verify integration with debounce mechanism
The PR's objective is to implement a 300ms debounce for transcript processing, but this implementation doesn't show how it integrates with that mechanism. Please ensure this new trigger connection supports the debounce functionality.
Let's check for debounce-related code in the codebase:
🧰 Tools
🪛 Ruff
53-53: websockets
may be undefined, or defined from star imports
(F405)
backend/routers/pusher.py (1)
196-293
: Ensure consistent exception handling and resource cleanup in WebSocket utility
While the code handles exceptions and attempts to close the WebSocket connection properly, ensure that all exceptions are caught and resources are cleaned up to prevent potential memory leaks or dangling connections.
[approve]
To verify, you can review the exception handling blocks to make sure all possible exceptions are accounted for and the websocket.close()
method is called in all exit paths.
🧰 Tools
🪛 Ruff
211-211: asyncio
may be undefined, or defined from star imports
(F405)
230-230: json
may be undefined, or defined from star imports
(F405)
231-231: asyncio
may be undefined, or defined from star imports
(F405)
232-232: asyncio
may be undefined, or defined from star imports
(F405)
240-240: asyncio
may be undefined, or defined from star imports
(F405)
258-258: asyncio
may be undefined, or defined from star imports
(F405)
272-272: asyncio
may be undefined, or defined from star imports
(F405)
275-275: asyncio
may be undefined, or defined from star imports
(F405)
276-276: asyncio
may be undefined, or defined from star imports
(F405)
backend/routers/transcribe_v2.py (1)
23-23
: 🛠️ Refactor suggestion
Replace wildcard import with explicit imports from utils.stt.streaming
Using wildcard imports (from module import *
) is discouraged as it can lead to namespace pollution and reduce code readability. Import specific functions or classes instead.
Apply this diff to import specific functions:
-from utils.stt.streaming import *
+from utils.stt.streaming import (
+ get_profile_audio_if_exists,
+ process_audio_dg,
+ process_audio_soniox,
+ process_audio_speechmatics,
+ send_initial_file_path,
+)
Likely invalid or redundant comment.
async def connect_to_trigger_pusher(uid: str, sample_rate: int = 8000): | ||
try: | ||
print("Connecting to Pusher transcripts trigger WebSocket...") | ||
ws_host = PusherAPI.replace("http", "ws") | ||
socket = await websockets.connect(f"{ws_host}/v1/trigger/listen?uid={uid}&sample_rate={sample_rate}") | ||
print("Connected to Pusher transcripts trigger WebSocket.") | ||
return socket | ||
except Exception as e: | ||
print(f"Exception in connect_to_transcript_pusher: {e}") | ||
raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix incorrect log messages
The log messages and error handling reference "transcripts trigger" instead of just "trigger", and the error message references the wrong function name.
Apply these corrections:
async def connect_to_trigger_pusher(uid: str, sample_rate: int = 8000):
try:
- print("Connecting to Pusher transcripts trigger WebSocket...")
+ print("Connecting to Pusher trigger WebSocket...")
ws_host = PusherAPI.replace("http", "ws")
socket = await websockets.connect(f"{ws_host}/v1/trigger/listen?uid={uid}&sample_rate={sample_rate}")
- print("Connected to Pusher transcripts trigger WebSocket.")
+ print("Connected to Pusher trigger WebSocket.")
return socket
except Exception as e:
- print(f"Exception in connect_to_transcript_pusher: {e}")
+ print(f"Exception in connect_to_trigger_pusher: {e}")
raise
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async def connect_to_trigger_pusher(uid: str, sample_rate: int = 8000): | |
try: | |
print("Connecting to Pusher transcripts trigger WebSocket...") | |
ws_host = PusherAPI.replace("http", "ws") | |
socket = await websockets.connect(f"{ws_host}/v1/trigger/listen?uid={uid}&sample_rate={sample_rate}") | |
print("Connected to Pusher transcripts trigger WebSocket.") | |
return socket | |
except Exception as e: | |
print(f"Exception in connect_to_transcript_pusher: {e}") | |
raise | |
async def connect_to_trigger_pusher(uid: str, sample_rate: int = 8000): | |
try: | |
print("Connecting to Pusher trigger WebSocket...") | |
ws_host = PusherAPI.replace("http", "ws") | |
socket = await websockets.connect(f"{ws_host}/v1/trigger/listen?uid={uid}&sample_rate={sample_rate}") | |
print("Connected to Pusher trigger WebSocket.") | |
return socket | |
except Exception as e: | |
print(f"Exception in connect_to_trigger_pusher: {e}") | |
raise |
🧰 Tools
🪛 Ruff
53-53: websockets
may be undefined, or defined from star imports
(F405)
try: | ||
response = requests.post( | ||
webhook_url, | ||
json={ | ||
'summary': summary, | ||
'uid': uid, | ||
'created_at': datetime.now().isoformat() | ||
}, | ||
headers={'Content-Type': 'application/json'}, | ||
timeout=30, | ||
) | ||
print('day_summary_webhook:', webhook_url, response.status_code) | ||
except Exception as e: | ||
print(f"Error sending day summary to developer webhook: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider extracting common webhook logic to reduce duplication.
The webhook implementations share similar patterns for request handling, timeouts, and error logging. Consider extracting this to a common utility function.
async def _send_webhook(webhook_url: str, payload: dict, timeout: int = 10) -> None:
try:
response = requests.post(
webhook_url,
json=payload,
headers={'Content-Type': 'application/json'},
timeout=timeout,
)
logging.info('Webhook response [url=%s]: %d', webhook_url, response.status_code)
except Exception as e:
logging.error('Webhook failed [url=%s]: %s', webhook_url, str(e))
# Usage example:
def day_summary_webhook(uid, summary: str):
if not (toggled := user_webhook_status_db(uid, WebhookType.day_summary)):
return
if not (webhook_url := get_user_webhook_db(uid, WebhookType.day_summary)):
return
await _send_webhook(
f'{webhook_url}?uid={uid}',
{
'summary': summary,
'uid': uid,
'created_at': datetime.now().isoformat()
}
)
try: | ||
response = requests.post( | ||
webhook_url, | ||
json=memory.as_dict_cleaned_dates(), | ||
headers={'Content-Type': 'application/json'}, | ||
timeout=30, | ||
) | ||
print('memory_created_webhook:', webhook_url, response.status_code) | ||
except Exception as e: | ||
print(f"Error sending memory created to developer webhook: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider reducing timeout and improving error logging.
While the error handling is a good addition, there are a few improvements to consider:
- 30 seconds is a long timeout for a webhook. Consider reducing it to 5-10 seconds to fail fast.
- Replace print statements with proper logging.
+import logging
+
def memory_created_webhook(uid, memory: Memory):
# ...
try:
response = requests.post(
webhook_url,
json=memory.as_dict_cleaned_dates(),
headers={'Content-Type': 'application/json'},
- timeout=30,
+ timeout=10,
)
- print('memory_created_webhook:', webhook_url, response.status_code)
+ logging.info('Memory webhook response [url=%s]: %d', webhook_url, response.status_code)
except Exception as e:
- print(f"Error sending memory created to developer webhook: {e}")
+ logging.error('Memory webhook failed [url=%s]: %s', webhook_url, str(e))
Committable suggestion was skipped due to low confidence.
data = await websocket.receive_bytes() | ||
header_type = struct.unpack('<I', data[:4])[0] | ||
|
||
# Transcript |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add data length check before unpacking to prevent exceptions
When receiving data from the WebSocket, there is a potential for struct.unpack
to raise an exception if data
is less than 4 bytes in length. This can happen if the client sends insufficient data or disconnects unexpectedly.
Apply this diff to add a check before unpacking:
while websocket_active:
data = await websocket.receive_bytes()
+ if len(data) < 4:
+ print("Received data too short to contain header.")
+ continue
header_type = struct.unpack('<I', data[:4])[0]
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
data = await websocket.receive_bytes() | |
header_type = struct.unpack('<I', data[:4])[0] | |
# Transcript | |
data = await websocket.receive_bytes() | |
if len(data) < 4: | |
print("Received data too short to contain header.") | |
continue | |
header_type = struct.unpack('<I', data[:4])[0] | |
# Transcript |
@@ -1,4 +1,5 @@ | |||
import uuid | |||
import struct |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import 'asyncio' and 'json' modules to prevent NameError
The modules asyncio
and json
are used in the code but are not imported. This will result in a NameError
when the code is executed.
Apply this diff to fix the missing imports:
import uuid
import struct
+import asyncio
+import json
from datetime import datetime, timezone, timedelta
from enum import Enum
Committable suggestion was skipped due to low confidence.
async def send_heartbeat(): | ||
nonlocal websocket_active | ||
nonlocal websocket_close_code | ||
try: | ||
while websocket_active: | ||
await asyncio.sleep(20) | ||
if websocket.client_state == WebSocketState.CONNECTED: | ||
await websocket.send_json({"type": "ping"}) | ||
else: | ||
break | ||
except WebSocketDisconnect: | ||
print("WebSocket disconnected") | ||
except Exception as e: | ||
print(f'Heartbeat error: {e}') | ||
websocket_close_code = 1011 | ||
finally: | ||
websocket_active = False | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor duplicated send_heartbeat
functions to improve maintainability
The send_heartbeat
function is duplicated across multiple WebSocket utility functions (_websocket_util_transcript
, _websocket_util_audio_bytes
, and _websocket_util_trigger
). Refactoring this into a single function will reduce code duplication and improve maintainability.
Consider defining a shared send_heartbeat
function outside the utility functions:
+async def send_heartbeat(websocket, websocket_active, websocket_close_code):
+ try:
+ while websocket_active:
+ await asyncio.sleep(20)
+ if websocket.client_state == WebSocketState.CONNECTED:
+ await websocket.send_json({"type": "ping"})
+ else:
+ break
+ except WebSocketDisconnect:
+ print("WebSocket disconnected")
+ except Exception as e:
+ print(f'Heartbeat error: {e}')
+ websocket_close_code = 1011
+ finally:
+ websocket_active = False
And then, update your utility functions to use this shared function:
heartbeat_task = asyncio.create_task(
- send_heartbeat()
+ send_heartbeat(websocket, websocket_active, websocket_close_code)
)
Committable suggestion was skipped due to low confidence.
🧰 Tools
🪛 Ruff
258-258: asyncio
may be undefined, or defined from star imports
(F405)
try: | ||
await asyncio.sleep(delay_seconds) | ||
|
||
# recheck session | ||
memory = retrieve_in_progress_memory(uid) | ||
if not memory or memory['finished_at'] > finished_at: | ||
print(f"_trigger_create_memory_with_delay not memory or not last session") | ||
print(f"_trigger_create_memory_with_delay not memory or not last session", uid) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove unnecessary f
prefix from the print statement
The print statement is an f-string but does not contain any placeholders. Remove the f
prefix to avoid confusion.
Apply this diff:
- print(f"_trigger_create_memory_with_delay not memory or not last session", uid)
+ print("_trigger_create_memory_with_delay not memory or not last session", uid)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
print(f"_trigger_create_memory_with_delay not memory or not last session", uid) | |
print("_trigger_create_memory_with_delay not memory or not last session", uid) |
🧰 Tools
🪛 Ruff
120-120: f-string without any placeholders
Remove extraneous f
prefix
(F541)
@@ -20,7 +21,7 @@ | |||
from utils.stt.streaming import * | |||
from utils.webhooks import send_audio_bytes_developer_webhook, realtime_transcript_webhook, \ | |||
get_audio_bytes_webhook_seconds | |||
from utils.pusher import connect_to_transcript_pusher, connect_to_audio_bytes_pusher | |||
from utils.pusher import connect_to_transcript_pusher, connect_to_audio_bytes_pusher, connect_to_trigger_pusher |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove unused imports connect_to_transcript_pusher
and connect_to_audio_bytes_pusher
These imported functions are not used in the code and can be removed to clean up the import statements.
Apply this diff:
-from utils.pusher import connect_to_transcript_pusher, connect_to_audio_bytes_pusher, connect_to_trigger_pusher
+from utils.pusher import connect_to_trigger_pusher
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
from utils.pusher import connect_to_transcript_pusher, connect_to_audio_bytes_pusher, connect_to_trigger_pusher | |
from utils.pusher import connect_to_trigger_pusher |
🧰 Tools
🪛 Ruff
24-24: utils.pusher.connect_to_transcript_pusher
imported but unused
Remove unused import
(F401)
24-24: utils.pusher.connect_to_audio_bytes_pusher
imported but unused
Remove unused import
(F401)
Issue: #1194
Summary by CodeRabbit
New Features
Bug Fixes
Refactor