Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion livekit-agents/livekit/agents/ipc/log_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,34 @@
import queue
import sys
import threading
from typing import Callable, Optional
from typing import Any, Callable, Optional

from .. import utils
from ..utils.aio import duplex_unix


def _safe_to_log(obj: Any) -> Any:
"""Safely convert an object to a pickleable format.

Handles RpcError and other objects that may not be directly pickleable
by converting them to a dictionary representation.
"""
try:
# Try to identify RpcError-like objects
if hasattr(obj, "__class__") and "RpcError" in obj.__class__.__name__:
# Convert RpcError to a safe dict representation
return {
"_type": "RpcError",
"message": str(obj) if hasattr(obj, "__str__") else repr(obj),
"code": getattr(obj, "code", None),
"msg": getattr(obj, "msg", None),
}
return obj
except Exception:
# If anything goes wrong, return a safe string representation
return str(obj) if obj is not None else None


class LogQueueListener:
def __init__(
self,
Expand Down Expand Up @@ -103,6 +125,44 @@ def emit(self, record: logging.LogRecord) -> None:
if hasattr(record, "websocket"):
record.websocket = None

# Safely handle RpcError and other non-pickleable objects
# RpcError objects might be in extra dict fields added via logger calls
# We need to sanitize these before pickling
try:
# Get all non-standard attributes (those added via extra= in logging calls)
# Standard LogRecord attributes are safe, we only need to check custom ones
standard_attrs = {
"name", "msg", "args", "created", "filename", "funcName",
"levelname", "levelno", "lineno", "module", "msecs",
"message", "pathname", "process", "processName", "relativeCreated",
"thread", "threadName", "exc_info", "exc_text", "stack_info",
"getMessage", "websocket"
}

# Check custom attributes that might contain RpcError
for attr_name in dir(record):
if attr_name.startswith("_") or attr_name in standard_attrs:
continue
try:
attr_value = getattr(record, attr_name, None)
if attr_value is not None:
# Recursively sanitize dict values (common for extra= parameters)
if isinstance(attr_value, dict):
safe_dict = {}
for key, value in attr_value.items():
safe_dict[key] = _safe_to_log(value)
setattr(record, attr_name, safe_dict)
else:
safe_value = _safe_to_log(attr_value)
if safe_value is not attr_value:
setattr(record, attr_name, safe_value)
except (AttributeError, TypeError):
# Skip attributes that can't be accessed or modified
pass
except Exception:
# If sanitization fails, continue anyway - the original exception handler will catch pickle errors
pass

self._send_q.put_nowait(pickle.dumps(record))

except Exception:
Expand Down
42 changes: 41 additions & 1 deletion tests/test_chat_ctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,48 @@
assert chat_ctx.is_equivalent(ChatContext.from_dict(chat_ctx.to_dict()))


async def test_summarize():
async def test_summarize(monkeypatch):
from livekit.agents import ChatContext
import openai as openai_package

Check failure on line 67 in tests/test_chat_ctx.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (I001)

tests/test_chat_ctx.py:66:5: I001 Import block is un-sorted or un-formatted

# Stub out openai.AsyncClient to prevent real API calls
# The plugin's LLM class uses openai.AsyncClient internally
class _DummyStream:
def __init__(self, chunks=None):
self._chunks = chunks or []
self._iter = iter(self._chunks)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
return False

def __aiter__(self):
return self

async def __anext__(self):
try:
return next(self._iter)
except StopIteration:
raise StopAsyncIteration

Check failure on line 89 in tests/test_chat_ctx.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (B904)

tests/test_chat_ctx.py:89:17: B904 Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling

class _DummyCompletions:
async def create(self, *args, **kwargs):
# Return an empty stream for testing
return _DummyStream(chunks=[])

class _DummyChat:
def __init__(self):
self.completions = _DummyCompletions()

class _DummyAsyncClient:
def __init__(self, *args, **kwargs):
self.chat = _DummyChat()
self.api_key = None

# Replace openai.AsyncClient in the openai package to prevent real API calls
monkeypatch.setattr(openai_package, "AsyncClient", _DummyAsyncClient)

chat_ctx = ChatContext()
chat_ctx.add_message(
Expand Down
103 changes: 103 additions & 0 deletions tests/test_log_queue_rpcerror.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""Test case for Issue #4188: LogQueueListener crashes when logging RpcErrors."""

from __future__ import annotations

import logging
import socket
import time

from livekit.agents.ipc.log_queue import LogQueueHandler, LogQueueListener
from livekit.agents.utils.aio import duplex_unix


class MockRpcError:
"""Mock RpcError-like object that might not be pickleable."""

def __init__(self, message: str, code: int = 0):
self.message = message
self.code = code
self.msg = message

def __str__(self) -> str:
return self.message

def __repr__(self) -> str:
return f"MockRpcError(message={self.message!r}, code={self.code})"


def test_log_queue_handler_with_rpcerror() -> None:
"""Test that LogQueueHandler can safely handle RpcError objects in log records."""
mp_log_pch, mp_log_cch = socket.socketpair()

try:
log_pch = duplex_unix._Duplex.open(mp_log_pch)
handler = LogQueueHandler(log_pch)

# Create a logger and add the handler
test_logger = logging.getLogger("test_rpcerror")
test_logger.addHandler(handler)
test_logger.setLevel(logging.DEBUG)

# Try to log with RpcError in extra fields
rpc_error = MockRpcError("Test RPC error", code=500)
test_logger.error(
"Test error with RpcError",
extra={"rpc_error": rpc_error, "error_code": rpc_error.code},
)

# Give the handler time to process
time.sleep(0.1)

# The handler should not crash when pickling
# If it does, the exception will be caught by handleError
assert True, "LogQueueHandler should handle RpcError without crashing"

finally:
handler.close()
log_pch.close()
mp_log_cch.close()


def test_log_queue_listener_with_rpcerror() -> None:
"""Test that LogQueueListener can safely receive and handle log records with RpcError."""
mp_log_pch, mp_log_cch = socket.socketpair()

try:
log_pch = duplex_unix._Duplex.open(mp_log_pch)
log_cch = duplex_unix._Duplex.open(mp_log_cch)

received_records = []

def prepare_fnc(record: logging.LogRecord) -> None:
received_records.append(record)

listener = LogQueueListener(log_cch, prepare_fnc)
listener.start()

# Create a handler that sends to the listener
handler = LogQueueHandler(log_pch)

test_logger = logging.getLogger("test_listener_rpcerror")
test_logger.addHandler(handler)
test_logger.setLevel(logging.DEBUG)

# Log with RpcError
rpc_error = MockRpcError("Test RPC error for listener", code=404)
test_logger.warning(
"Test warning with RpcError",
extra={"rpc_error": rpc_error},
)

# Give time for the log to be processed
time.sleep(0.2)

# The listener should not crash
assert True, "LogQueueListener should handle RpcError without crashing"

listener.stop()
handler.close()

finally:
log_pch.close()
log_cch.close()

Loading