Commit a5ac1d5

xuanyang15 authored and copybara-github committed
feat: Add progressive SSE streaming feature
Co-authored-by: Xuan Yang <xygoogle@google.com>
PiperOrigin-RevId: 833483804
1 parent 0ec0195 commit a5ac1d5

File tree

4 files changed, +510 -3 lines changed

src/google/adk/features/_feature_registry.py

Lines changed: 7 additions & 3 deletions
@@ -24,8 +24,9 @@
 class FeatureName(str, Enum):
   """Feature names."""

-  JSON_SCHEMA_FOR_FUNC_DECL = "JSON_SCHEMA_FOR_FUNC_DECL"
   COMPUTER_USE = "COMPUTER_USE"
+  JSON_SCHEMA_FOR_FUNC_DECL = "JSON_SCHEMA_FOR_FUNC_DECL"
+  PROGRESSIVE_SSE_STREAMING = "PROGRESSIVE_SSE_STREAMING"


 class FeatureStage(Enum):
@@ -58,11 +59,14 @@ class FeatureConfig:

 # Central registry: FeatureName -> FeatureConfig
 _FEATURE_REGISTRY: dict[FeatureName, FeatureConfig] = {
+    FeatureName.COMPUTER_USE: FeatureConfig(
+        FeatureStage.EXPERIMENTAL, default_on=True
+    ),
     FeatureName.JSON_SCHEMA_FOR_FUNC_DECL: FeatureConfig(
         FeatureStage.WIP, default_on=False
     ),
-    FeatureName.COMPUTER_USE: FeatureConfig(
-        FeatureStage.EXPERIMENTAL, default_on=True
+    FeatureName.PROGRESSIVE_SSE_STREAMING: FeatureConfig(
+        FeatureStage.WIP, default_on=False
     ),
 }
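Note: the two files below import is_feature_enabled from ..features, so the registry above is the lookup table behind that check. A minimal sketch of the lookup (the sketch function name is hypothetical, and the real helper may also consult runtime or environment overrides not shown in this commit):

# Hypothetical sketch of the lookup backing is_feature_enabled();
# the real ADK helper may also honor per-run or environment overrides.
def is_feature_enabled_sketch(name: FeatureName) -> bool:
  config = _FEATURE_REGISTRY.get(name)
  # WIP features such as PROGRESSIVE_SSE_STREAMING ship with
  # default_on=False, so they stay dark until explicitly enabled;
  # unknown names are treated as disabled.
  return config.default_on if config is not None else False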

src/google/adk/flows/llm_flows/base_llm_flow.py

Lines changed: 12 additions & 0 deletions
@@ -38,6 +38,8 @@
 from ...agents.run_config import StreamingMode
 from ...agents.transcription_entry import TranscriptionEntry
 from ...events.event import Event
+from ...features import FeatureName
+from ...features import is_feature_enabled
 from ...models.base_llm_connection import BaseLlmConnection
 from ...models.llm_request import LlmRequest
 from ...models.llm_response import LlmResponse
@@ -525,6 +527,16 @@ async def _postprocess_async(

     # Handles function calls.
     if model_response_event.get_function_calls():
+
+      if is_feature_enabled(FeatureName.PROGRESSIVE_SSE_STREAMING):
+        # In progressive SSE streaming mode stage 1, we skip partial FC events
+        # Only execute FCs in the final aggregated event (partial=False)
+        if (
+            invocation_context.run_config.streaming_mode == StreamingMode.SSE
+            and model_response_event.partial
+        ):
+          return
+
       async with Aclosing(
           self._postprocess_handle_function_calls_async(
               invocation_context, model_response_event, llm_request
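With this guard, partial SSE chunks that carry function calls are dropped while progressive streaming is on; tools only run on the final aggregated event (partial=False). A standalone sketch of that decision, using a simplified StreamingMode and a hypothetical helper name:

from enum import Enum

class StreamingMode(Enum):
  # Simplified stand-in for ...agents.run_config.StreamingMode
  NONE = 'none'
  SSE = 'sse'

def should_execute_function_calls(
    streaming_mode: StreamingMode, partial: bool
) -> bool:
  # Mirrors the new guard: skip partial SSE chunks, execute otherwise.
  return not (streaming_mode == StreamingMode.SSE and partial)

assert not should_execute_function_calls(StreamingMode.SSE, partial=True)
assert should_execute_function_calls(StreamingMode.SSE, partial=False)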

src/google/adk/utils/streaming_utils.py

Lines changed: 92 additions & 0 deletions
@@ -19,6 +19,8 @@

 from google.genai import types

+from ..features import FeatureName
+from ..features import is_feature_enabled
 from ..models.llm_response import LlmResponse


@@ -35,6 +37,30 @@ def __init__(self):
     self._usage_metadata = None
     self._response = None

+    # For progressive SSE streaming mode: accumulate parts in order
+    self._parts_sequence: list[types.Part] = []
+    self._current_text_buffer: str = ''
+    self._current_text_is_thought: Optional[bool] = None
+    self._finish_reason: Optional[types.FinishReason] = None
+
+  def _flush_text_buffer_to_sequence(self):
+    """Flush current text buffer to parts sequence.
+
+    This helper is used in progressive SSE mode to maintain part ordering.
+    It only merges consecutive text parts of the same type (thought or regular).
+    """
+    if self._current_text_buffer:
+      if self._current_text_is_thought:
+        self._parts_sequence.append(
+            types.Part(text=self._current_text_buffer, thought=True)
+        )
+      else:
+        self._parts_sequence.append(
+            types.Part.from_text(text=self._current_text_buffer)
+        )
+      self._current_text_buffer = ''
+      self._current_text_is_thought = None
+
   async def process_response(
       self, response: types.GenerateContentResponse
   ) -> AsyncGenerator[LlmResponse, None]:
@@ -51,6 +77,42 @@ async def process_response(
     self._response = response
     llm_response = LlmResponse.create(response)
     self._usage_metadata = llm_response.usage_metadata
+
+    # ========== Progressive SSE Streaming (new feature) ==========
+    # Save finish_reason for final aggregation
+    if llm_response.finish_reason:
+      self._finish_reason = llm_response.finish_reason
+
+    if is_feature_enabled(FeatureName.PROGRESSIVE_SSE_STREAMING):
+      # Accumulate parts while preserving their order
+      # Only merge consecutive text parts of the same type (thought or regular)
+      if llm_response.content and llm_response.content.parts:
+        for part in llm_response.content.parts:
+          if part.text:
+            # Check if we need to flush the current buffer first
+            # (when text type changes from thought to regular or vice versa)
+            if (
+                self._current_text_buffer
+                and part.thought != self._current_text_is_thought
+            ):
+              self._flush_text_buffer_to_sequence()
+
+            # Accumulate text to buffer
+            if not self._current_text_buffer:
+              self._current_text_is_thought = part.thought
+            self._current_text_buffer += part.text
+          else:
+            # Non-text part (function_call, bytes, etc.)
+            # Flush any buffered text first, then add the non-text part
+            self._flush_text_buffer_to_sequence()
+            self._parts_sequence.append(part)

+      # Mark ALL intermediate chunks as partial
+      llm_response.partial = True
+      yield llm_response
+      return
+
+    # ========== Non-Progressive SSE Streaming (old behavior) ==========
     if (
         llm_response.content
         and llm_response.content.parts
@@ -89,6 +151,36 @@ def close(self) -> Optional[LlmResponse]:
     Returns:
       The aggregated LlmResponse.
     """
+    # ========== Progressive SSE Streaming (new feature) ==========
+    if is_feature_enabled(FeatureName.PROGRESSIVE_SSE_STREAMING):
+      # Always generate final aggregated response in progressive mode
+      if self._response and self._response.candidates:
+        # Flush any remaining text buffer to complete the sequence
+        self._flush_text_buffer_to_sequence()
+
+        # Use the parts sequence which preserves original ordering
+        final_parts = self._parts_sequence
+
+        if final_parts:
+          candidate = self._response.candidates[0]
+          finish_reason = self._finish_reason or candidate.finish_reason
+
+          return LlmResponse(
+              content=types.ModelContent(parts=final_parts),
+              error_code=None
+              if finish_reason == types.FinishReason.STOP
+              else finish_reason,
+              error_message=None
+              if finish_reason == types.FinishReason.STOP
+              else candidate.finish_message,
+              usage_metadata=self._usage_metadata,
+              finish_reason=finish_reason,
+              partial=False,
+          )
+
+      return None
+
+    # ========== Non-Progressive SSE Streaming (old behavior) ==========
     if (
         (self._text or self._thought_text)
         and self._response
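The heart of the new path is the merge policy: consecutive text parts with the same thought flag are concatenated, while any non-text part (function call, inline bytes) flushes the buffer so the original part ordering survives aggregation. A dependency-free sketch of that policy on simplified (kind, value, thought) tuples (names hypothetical; the real code operates on google.genai types.Part):

def merge_parts(chunks):
  # chunks: list of ('text', str, thought_flag) or (other_kind, payload, None)
  merged, buf, buf_thought = [], '', None
  for kind, value, thought in chunks:
    if kind == 'text':
      if buf and thought != buf_thought:
        merged.append(('text', buf, buf_thought))  # thought flag changed
        buf = ''
      if not buf:
        buf_thought = thought
      buf += value
    else:
      if buf:  # flush buffered text before a non-text part
        merged.append(('text', buf, buf_thought))
        buf, buf_thought = '', None
      merged.append((kind, value, None))
  if buf:  # final flush, as close() does via _flush_text_buffer_to_sequence()
    merged.append(('text', buf, buf_thought))
  return merged

# Two thought chunks merge; the function call keeps its position:
assert merge_parts([
    ('text', 'plan ', True),
    ('text', 'steps', True),
    ('function_call', 'get_weather', None),
    ('text', 'answer', False),
]) == [
    ('text', 'plan steps', True),
    ('function_call', 'get_weather', None),
    ('text', 'answer', False),
]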
