⚡️ Speed up method LoggingWorker._worker_loop by 5%
#425
📄 5% (0.05x) speedup for `LoggingWorker._worker_loop` in `litellm/litellm_core_utils/logging_worker.py`
⏱️ Runtime: 1.10 seconds → 1.05 seconds (best of 15 runs)
📝 Explanation and details
The optimized code achieves a 5% runtime improvement and 7.1% throughput improvement through two key micro-optimizations in the high-frequency async worker loop:
Key Optimizations
1. Local Variable Caching
The most impactful change is caching frequently accessed methods and attributes as local variables before entering tight loops:
- `queue_get = _queue.get` and `queue_task_done = _queue.task_done` eliminate repeated attribute lookups
- `wait_for = asyncio.wait_for` and `timeout = self.timeout` cache commonly used values
- `clear_queue()` gets similar caching for `queue_get_nowait`, `loop_time()`, and constants

This optimization is particularly effective in Python because attribute access (`self.attribute` or `module.function`) involves dictionary lookups that add overhead in tight loops. The line profiler shows the main `while True` loop executing 1,669 times, so these micro-optimizations compound significantly.
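As a minimal sketch of the pattern (the class, queue contents, and names below are illustrative, not the actual LiteLLM implementation):

```python
import asyncio
from typing import Optional


class QueueWorker:
    """Toy worker illustrating local-variable caching; not the real LoggingWorker."""

    def __init__(self, timeout: float = 0.1):
        self.timeout = timeout
        self._queue: Optional[asyncio.Queue] = None

    async def _worker_loop(self) -> None:
        _queue = self._queue
        if _queue is None:
            return

        # Hoist attribute lookups out of the hot loop: every `self.x` or
        # `module.func` access is a dictionary lookup whose cost compounds
        # over thousands of iterations.
        queue_get = _queue.get
        queue_task_done = _queue.task_done
        wait_for = asyncio.wait_for
        timeout = self.timeout

        while True:
            try:
                coroutine = await wait_for(queue_get(), timeout=timeout)
                await coroutine  # run the queued logging coroutine
                queue_task_done()
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break
```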
2. Conditional Logging Guards
Added `verbose_logger.isEnabledFor()` checks before expensive logging operations:
- skips formatting the error message (`f"LoggingWorker error: {e}"`) when the logging level wouldn't emit the message
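A sketch of that guard pattern (the DEBUG level and the helper function name here are illustrative assumptions, not taken from the diff):

```python
import logging

from litellm._logging import verbose_logger


def handle_worker_error(e: Exception) -> None:
    # Skip building the f-string entirely when the current log level would
    # drop the record anyway; the string formatting is the expensive part.
    if verbose_logger.isEnabledFor(logging.DEBUG):
        verbose_logger.debug(f"LoggingWorker error: {e}")
```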
Performance Impact
The optimization targets the hottest code paths shown in the profiler:
- the `await queue_get()` line (12.6% of total time) benefits from the cached method reference
- the `queue_task_done()` call (4.1% of total time) is similarly optimized
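As a rough, self-contained illustration of why hoisting lookups off a hot line matters, here is a synthetic micro-benchmark sketch (not the PR's profiler run; absolute numbers will vary by machine):

```python
import timeit
from collections import deque


class Holder:
    def __init__(self):
        self._queue = deque([1])

    def lookup_each_time(self, n: int) -> None:
        # Re-resolve self._queue.append / .popleft on every iteration.
        for _ in range(n):
            self._queue.append(self._queue.popleft())

    def cached_locals(self, n: int) -> None:
        # Resolve the bound methods once, outside the loop.
        q = self._queue
        append, popleft = q.append, q.popleft
        for _ in range(n):
            append(popleft())


h = Holder()
print("per-iteration lookups:", timeit.timeit(lambda: h.lookup_each_time(100_000), number=20))
print("cached locals:        ", timeit.timeit(lambda: h.cached_locals(100_000), number=20))
```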
Workload Suitability
This LoggingWorker appears designed for high-throughput background processing (the comments mention a "+200 RPS improvement"), which is exactly the kind of workload where these micro-optimizations pay off.
The optimizations preserve all behavioral guarantees while delivering measurable performance gains in the target use case of processing many logging tasks asynchronously.
✅ Correctness verification report:
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions
from types import SimpleNamespace
from typing import Optional

import pytest  # used for our unit tests

from litellm.litellm_core_utils.logging_worker import LoggingWorker

# ---- Helper classes and functions for testing ----

class DummyContext:
    """A dummy context that mimics the interface used in LoggingWorker._worker_loop."""

    def __init__(self):
        self.ran = False
        self.last_args = None
        self.last_kwargs = None


class DummyContextWithException(DummyContext):
    """Context that raises an exception when run is called."""

    async def run(self, func, coroutine):
        raise RuntimeError("DummyContextWithException error")


async def dummy_coro(val=None):
    """A simple dummy coroutine for testing."""
    return val


async def dummy_coro_exception():
    """A coroutine that raises an exception."""
    raise ValueError("Intentional error")


# ---- Basic Test Cases ----

@pytest.mark.asyncio
async def test_worker_loop_returns_none_when_queue_is_none():
    """
    Test that _worker_loop returns immediately (None) if the queue is None.
    """
    worker = LoggingWorker()
    # _queue is None by default
    result = await worker._worker_loop()
@pytest.mark.asyncio
#------------------------------------------------
import asyncio  # used to run async functions
from typing import Optional
from unittest.mock import MagicMock

import pytest  # used for our unit tests

from litellm._logging import verbose_logger
from litellm.litellm_core_utils.logging_worker import LoggingWorker

# --- Helper classes and functions for testing ---

class DummyContext:
    """A dummy context object with a .run method to simulate context.run."""

    def __init__(self):
        self.ran = False
        self.args = None
        self.kwargs = None


def make_logging_task(coro, context=None):
    """Helper to construct a LoggingTask dict for LoggingWorker."""
    if context is None:
        context = DummyContext()
    return {"coroutine": coro, "context": context}


# --- Basic Test Cases ---

@pytest.mark.asyncio
async def test_worker_loop_returns_none_when_queue_is_none():
    """
    Test that _worker_loop returns immediately if the queue is None.
    """
    worker = LoggingWorker()
    # _queue is None by default
    result = await worker._worker_loop()
@pytest.mark.asyncio
async def test_worker_loop_throughput_high_load():
    """
    Throughput test: high load (200 tasks).
    """
    worker = LoggingWorker()
    worker._queue = asyncio.Queue()
    num_tasks = 200
    called = [False] * num_tasks
    contexts = [DummyContext() for _ in range(num_tasks)]
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
To edit these changes, run `git checkout codeflash/optimize-LoggingWorker._worker_loop-mhtuqr4z` and push.