# Enhance LLM Streaming Response Handling and Event System #2266

## Conversation
**Disclaimer:** This review was made by a crew of AI Agents.

### Code Review for PR #2266: Initial Stream Working

#### Overview

This pull request introduces streaming functionality for LLM responses, aiming to enhance the system's capability to handle real-time data from language models. The changes are significant and involve several files, primarily centered around the `LLM` class.

#### 1. Structural Improvements

**LLM Class Organization**

To enhance maintainability, it is suggested to create a dedicated `StreamingHandler` class:

```python
class StreamingHandler:
    def __init__(self, llm_instance):
        self.llm = llm_instance

    def handle_streaming_response(self, params, available_functions=None):
        # Move streaming logic here
        ...

    def process_chunk(self, chunk):
        # Move chunk processing logic here
        ...
```
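To show how this might plug in, here is a rough sketch of the existing `LLM` class delegating to the new handler; the attribute name `_streaming_handler` is an assumption for illustration, not code from this PR:

```python
class LLM:
    def __init__(self, model: str, **kwargs):
        self.model = model
        # Hypothetical: hand streaming details off to the dedicated handler.
        self._streaming_handler = StreamingHandler(self)

    def _handle_streaming_response(self, params, available_functions=None):
        # Keep the existing entry point, delegate the actual work.
        return self._streaming_handler.handle_streaming_response(params, available_functions)
```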
**Type Definitions**

Consolidating type definitions into a separate module (e.g., `types.py`) would improve clarity:

```python
# types.py
from typing import TypedDict, Optional


class Delta(TypedDict):
    content: Optional[str]
    role: Optional[str]


class StreamingChoices(TypedDict):
    delta: Delta
    index: int
    finish_reason: Optional[str]
```
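A brief usage sketch, assuming the `Delta` and `StreamingChoices` definitions above live in an importable module (the import path is hypothetical):

```python
from .types import Delta, StreamingChoices  # hypothetical path to the module sketched above


def to_typed_choice(raw: dict) -> StreamingChoices:
    """Normalize a raw provider dict into the typed chunk-choice shape (illustrative)."""
    delta: Delta = {
        "content": raw.get("delta", {}).get("content"),
        "role": raw.get("delta", {}).get("role"),
    }
    return {
        "delta": delta,
        "index": raw.get("index", 0),
        "finish_reason": raw.get("finish_reason"),
    }
```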
#### 2. Error Handling Improvements

**Robust Error Handling**

Implementing custom exceptions for streaming errors enhances the resilience of the streaming feature:

```python
class StreamingError(Exception):
    pass


def _handle_streaming_response(self, params, available_functions=None):
    try:
        for chunk in litellm.completion(**params):
            if not self._is_valid_chunk(chunk):
                raise StreamingError("Invalid chunk format")
            # Process chunk
    except StreamingError as e:
        logging.error(f"Streaming error: {str(e)}")
        return self._handle_fallback(params)
```
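The snippet above leans on helpers such as `_is_valid_chunk` and `_handle_fallback` that the PR would still need to define. As a minimal sketch, assuming litellm-style chunks with a `choices[0].delta` shape, the validity check could look like this:

```python
def _is_valid_chunk(self, chunk) -> bool:
    """Return True if the chunk exposes at least one choice with a delta (sketch only)."""
    choices = getattr(chunk, "choices", None) or []
    if not choices:
        return False
    return getattr(choices[0], "delta", None) is not None
```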
"""Common response processing logic"""
self._handle_emit_call_events(content, call_type)
return content 4. Testing ImprovementsEnhanced Test CoverageTesting should cover more edge cases to ensure reliability in various scenarios: @pytest.mark.parametrize("error_scenario", [
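For illustration, both response paths could then funnel through that helper. The sketch below assumes litellm-style response shapes; apart from `_process_llm_response`, the method names and `call_type` values are placeholders rather than code from this PR:

```python
import litellm


def _handle_non_streaming_response(self, params, available_functions=None):
    # Single response object: extract the message content, then share post-processing.
    response = litellm.completion(**params)
    content = response.choices[0].message.content or ""
    return self._process_llm_response(content, call_type="non_streaming")


def _handle_streaming_response(self, params, available_functions=None):
    # Accumulate streamed deltas, then share the same post-processing path.
    pieces = []
    for chunk in litellm.completion(**params):
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        if delta is not None and delta.content:
            pieces.append(delta.content)
    return self._process_llm_response("".join(pieces), call_type="streaming")
```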
"empty_chunk",
"invalid_chunk_format",
"network_error"
])
def test_streaming_error_scenarios(error_scenario):
# Implement tests for specified error scenarios 5. Documentation SuggestionsImproved Inline DocumentationAdding detailed docstrings to new methods will help with future maintenance: def _handle_streaming_response(
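To make one of these scenarios concrete, the self-contained sketch below exercises a small stand-in extraction helper against malformed chunks; `_extract_chunk_content` is defined here purely for illustration and is not a function from this PR:

```python
from types import SimpleNamespace

import pytest


def _extract_chunk_content(chunk):
    """Stand-in helper: return delta content, or None for malformed chunks."""
    choices = getattr(chunk, "choices", None) or []
    if not choices:
        return None
    delta = getattr(choices[0], "delta", None)
    return getattr(delta, "content", None) if delta else None


@pytest.mark.parametrize("chunk, expected", [
    (SimpleNamespace(choices=[]), None),                                            # empty chunk
    (SimpleNamespace(), None),                                                      # invalid chunk format
    (SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content="hi"))]), "hi"),
])
def test_chunk_content_extraction(chunk, expected):
    assert _extract_chunk_content(chunk) == expected
```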
#### 5. Documentation Suggestions

**Improved Inline Documentation**

Adding detailed docstrings to new methods will help with future maintenance:

```python
def _handle_streaming_response(
    self, params: Dict[str, Any], available_functions: Optional[Dict[str, Any]] = None
) -> str:
    """
    Handle streaming responses from the LLM.

    Args:
        params: Configuration parameters for the LLM call.
        available_functions: Dictionary of available tool functions.

    Returns:
        str: Concatenated response from streaming chunks.
    """
```
"""Stream response chunks without storing them all in memory"""
yield from (
chunk.content
for chunk in litellm.completion(**params)
if self._is_valid_chunk(chunk)
) 7. Security RecommendationsInput ValidationTo prevent misuse, robust validation for input parameters related to streaming must be implemented: def _validate_streaming_params(self, params):
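A short usage sketch for the generator, assuming an `llm` instance and prepared `params` are already in scope:

```python
# Consume lazily; join only if the complete text is actually needed at the end.
full_response = "".join(piece for piece in llm.stream_response(params) if piece)
```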
"""Validate streaming-specific parameters"""
if not isinstance(params.get("stream"), bool):
raise ValueError("Stream parameter must be boolean") Overall Recommendations
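As an illustration of where this validation could sit, here is a hypothetical dispatch method; `_prepare_completion_params` and `_handle_non_streaming_response` are assumed names, not necessarily the framework's actual API:

```python
def call(self, messages, available_functions=None):
    # Validate streaming-related parameters before choosing a response path.
    params = self._prepare_completion_params(messages)  # assumed helper
    self._validate_streaming_params(params)
    if params.get("stream"):
        return self._handle_streaming_response(params, available_functions)
    return self._handle_non_streaming_response(params, available_functions)
```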
#### Overall Recommendations

The overall quality of the code is commendable, and implementing these suggestions will significantly improve maintainability and functionality, enhancing the LLM's streaming capabilities.
Proof of work: https://www.loom.com/share/bdf457728ea04336a32867525fbdb818?sid=a3c71c25-cafa-479b-b963-0c6cf73da45c
## Description
This PR improves the handling of streaming responses from LLMs in the CrewAI framework, addressing issues with empty responses and enhancing error handling. The changes include:
### Core Improvements
- Improved the `_handle_streaming_response` method to properly extract content from various chunk formats (see the extraction sketch below)
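For context, chunk formats differ across providers (object-style deltas vs. plain dictionaries); the helper below is an illustrative sketch of such extraction, not the PR's actual implementation:

```python
def _extract_content_from_chunk(chunk) -> str:
    """Best-effort content extraction for dict-style and object-style chunks (sketch)."""
    # Dict-style chunk: {"choices": [{"delta": {"content": "..."}}]}
    if isinstance(chunk, dict):
        choices = chunk.get("choices") or []
        if choices and isinstance(choices[0], dict):
            return (choices[0].get("delta") or {}).get("content") or ""
        return ""
    # Object-style chunk: chunk.choices[0].delta.content
    choices = getattr(chunk, "choices", None) or []
    if choices:
        delta = getattr(choices[0], "delta", None)
        content = getattr(delta, "content", None) if delta else None
        return content or ""
    return ""
```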
### Event System Enhancements

- Added `LLMStreamChunkEvent` to the event system to track streaming chunks (see the event sketch below)
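As a loose illustration of what a per-chunk event can carry, here is a hypothetical shape; it is not necessarily the event class this PR actually adds:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class LLMStreamChunkEvent:
    """Hypothetical sketch: emitted once per streamed chunk so listeners can react in real time."""
    chunk: str                                    # text content of this streamed chunk
    type: str = "llm_stream_chunk"                # identifier listeners can filter on
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
```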
### Testing

These changes make the LLM streaming functionality more robust and reliable, especially when dealing with different LLM providers that may have varying response formats.