Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 38 additions & 21 deletions litellm/litellm_core_utils/logging_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,37 @@ def start(self) -> None:
async def _worker_loop(self) -> None:
"""Main worker loop that processes log coroutines sequentially."""
try:
if self._queue is None:
_queue = self._queue
if _queue is None:
return

# Cache frequently used locals to increase loop speed
queue_get = _queue.get
queue_task_done = _queue.task_done
wait_for = asyncio.wait_for
timeout = self.timeout

while True:
# Process one coroutine at a time to keep event loop load predictable
task = await self._queue.get()
task = await queue_get()
try:
# Run the coroutine in its original context
await asyncio.wait_for(
await wait_for(
task["context"].run(asyncio.create_task, task["coroutine"]),
timeout=self.timeout,
timeout=timeout,
)
except Exception as e:
verbose_logger.exception(f"LoggingWorker error: {e}")
# Prevent expensive formatting if not logging this level
if verbose_logger.isEnabledFor(verbose_logger.level):
verbose_logger.exception(f"LoggingWorker error: {e}")
pass
finally:
self._queue.task_done()
queue_task_done()

except asyncio.CancelledError:
verbose_logger.debug("LoggingWorker cancelled during shutdown")
# Minimize logging verbosity for high-performance shutdown
if verbose_logger.isEnabledFor(verbose_logger.level):
verbose_logger.debug("LoggingWorker cancelled during shutdown")
# Attempt to clear remaining items to prevent "never awaited" warnings
await self.clear_queue()

Expand Down Expand Up @@ -123,34 +134,40 @@ async def clear_queue(self):
"""
Clear the queue with a maximum time limit.
"""
if self._queue is None:
_queue = self._queue
if _queue is None:
return

start_time = asyncio.get_event_loop().time()
loop_time = asyncio.get_event_loop().time
start_time = loop_time()

for _ in range(self.MAX_ITERATIONS_TO_CLEAR_QUEUE):
queue_get_nowait = _queue.get_nowait
queue_task_done = _queue.task_done
wait_for = asyncio.wait_for
timeout = self.timeout
max_iterations = self.MAX_ITERATIONS_TO_CLEAR_QUEUE
max_time = self.MAX_TIME_TO_CLEAR_QUEUE

for _ in range(max_iterations):
# Check if we've exceeded the maximum time
if (
asyncio.get_event_loop().time() - start_time
>= self.MAX_TIME_TO_CLEAR_QUEUE
):
verbose_logger.warning(
f"clear_queue exceeded max_time of {self.MAX_TIME_TO_CLEAR_QUEUE}s, stopping early"
)
if (loop_time() - start_time) >= max_time:
if verbose_logger.isEnabledFor(verbose_logger.level):
verbose_logger.warning(f"clear_queue exceeded max_time of {max_time}s, stopping early")
break

try:
task = self._queue.get_nowait()
task = queue_get_nowait()
# Await the coroutine to properly execute and avoid "never awaited" warnings
# Await the coroutine to properly execute and avoid "never awaited" warnings
try:
await asyncio.wait_for(
await wait_for(
task["context"].run(asyncio.create_task, task["coroutine"]),
timeout=self.timeout,
timeout=timeout,
)
except Exception:
# Suppress errors during cleanup
pass
self._queue.task_done() # If you're using join() elsewhere
queue_task_done() # If you're using join() elsewhere
except asyncio.QueueEmpty:
break

Expand Down