Skip to content

Commit fd1d0c2

Browse files
committed
Update progress report with coverage fix details
1 parent b04532c commit fd1d0c2

File tree

1 file changed

+358
-0
lines changed

1 file changed

+358
-0
lines changed
Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
# Streaming Thinking Tags Handling - Progress Report
2+
3+
## Status: ✅ COMPLETED - PR #3206 Opened
4+
5+
## Issue Summary
6+
7+
**Issue:** [#3007](https://github.com/pydantic/pydantic-ai/issues/3007) - Streaming doesn't extract `<think></think>` tags split across multiple chunks
8+
9+
**Reporter:** @phemmer
10+
**Context:** When streaming responses from OpenAI-compatible models (specifically Gemini through LiteLLM), thinking tags can be split across multiple chunks. The current implementation only detects complete tags that arrive as standalone chunks.
11+
12+
### Problem Example
13+
14+
When streaming, chunks might arrive as:
15+
- Chunk 1: `"<"`
16+
- Chunk 2: `"think>\nthinking content"`
17+
- Chunk 3: `"</think>\nNormal content."`
18+
19+
**Current behavior:** All content is treated as a single `TextPart` because the tag detection logic only matches complete tags.
20+
21+
**Expected behavior:** Content should be split into:
22+
1. `ThinkingPart` with content: `"thinking content"`
23+
2. `TextPart` with content: `"Normal content."`
24+
25+
## Architecture Analysis
26+
27+
### Key Components
28+
29+
#### 1. **Parts Manager** (`_parts_manager.py`)
30+
31+
The `ModelResponsePartsManager` class is responsible for managing streamed response parts. The critical method is:
32+
33+
**`handle_text_delta()` (lines 70-153)**
34+
- Receives text chunks from streaming responses
35+
- Detects thinking tags to switch between text and thinking parts
36+
- Currently uses exact string matching for tag detection
37+
38+
**Current Tag Detection Logic:**
39+
40+
```python
41+
# Line 130-133: Start tag detection
42+
if thinking_tags and content == thinking_tags[0]:
43+
# When we see a thinking start tag (which is a single token), we'll build a new thinking part instead
44+
self._vendor_id_to_part_index.pop(vendor_part_id, None)
45+
return self.handle_thinking_delta(vendor_part_id=vendor_part_id, content='')
46+
47+
# Lines 117-124: End tag detection (when already in thinking mode)
48+
if thinking_tags and isinstance(existing_part, ThinkingPart):
49+
if content == thinking_tags[1]:
50+
# When we see the thinking end tag, we're done with the thinking part
51+
self._vendor_id_to_part_index.pop(vendor_part_id)
52+
return None
53+
else:
54+
return self.handle_thinking_delta(vendor_part_id=vendor_part_id, content=content)
55+
```
56+
57+
**Problem:** The conditions `content == thinking_tags[0]` and `content == thinking_tags[1]` require exact matches, meaning tags must arrive as complete, standalone chunks.
58+
59+
#### 2. **Non-Streaming Tag Handling** (`_thinking_part.py`)
60+
61+
The `split_content_into_text_and_thinking()` function handles complete content:
62+
63+
```python
64+
def split_content_into_text_and_thinking(content: str, thinking_tags: tuple[str, str]) -> list[ThinkingPart | TextPart]:
65+
start_tag, end_tag = thinking_tags
66+
# Uses content.find() to locate tags anywhere in the content
67+
start_index = content.find(start_tag)
68+
# ... processes tags found at any position
69+
```
70+
71+
This works because it receives the complete content string.
72+
73+
#### 3. **State Management**
74+
75+
The manager tracks parts using:
76+
- `_parts: list[ManagedPart]` - All parts in the response
77+
- `_vendor_id_to_part_index: dict[VendorId, int]` - Maps vendor IDs to part indices
78+
79+
This allows the manager to:
80+
1. Track which part each vendor ID corresponds to
81+
2. Accumulate content across multiple chunks
82+
3. Switch between text and thinking parts
83+
84+
### Thinking Tags Configuration
85+
86+
From `profiles/__init__.py`:
87+
```python
88+
thinking_tags: tuple[str, str] = ('<think>', '</think>')
89+
```
90+
91+
Default tags are `('<think>', '</think>')`, but can be overridden per model. For example, Anthropic uses `('<thinking>', '</thinking>')`.
92+
93+
### Test Coverage Analysis
94+
95+
From `tests/test_parts_manager.py:84-163`:
96+
97+
The existing test `test_handle_text_deltas_with_think_tags()` demonstrates the expected behavior:
98+
1. Text before thinking: `"pre-"` → Creates `TextPart`
99+
2. Complete start tag: `"<think>"` → Switches to `ThinkingPart`
100+
3. Thinking content: `"thinking"`, `" more"` → Appends to `ThinkingPart`
101+
4. Complete end tag: `"</think>"` → Closes `ThinkingPart`
102+
5. Text after thinking: `"post-"` → Creates new `TextPart`
103+
104+
**Key observation:** This test assumes tags arrive as complete chunks.
105+
106+
## Root Cause
107+
108+
The issue stems from an **assumption** in the original implementation:
109+
110+
> "The expectation currently is that the think start and end tags will arrive in individual text chunks by themselves with no surrounding text" - @DouweM (comment on issue)
111+
112+
This assumption holds for some models (Ollama, some OpenAI models) but **not** for:
113+
- Gemini through LiteLLM (confirmed by reporter)
114+
- Potentially other OpenAI-compatible APIs
115+
- Models with different tokenization strategies
116+
117+
## Solution Design
118+
119+
### Requirements
120+
121+
1. **Handle partial tags** - Detect tags that span multiple chunks
122+
2. **Minimal code changes** - Follow codebase philosophy of elegant, concise solutions
123+
3. **Maintain existing behavior** - Don't break models where tags arrive as complete chunks
124+
4. **100% test coverage** - Required by project standards
125+
5. **No performance degradation** - Streaming is performance-sensitive
126+
127+
### Proposed Solution: Buffering Approach
128+
129+
Implement a **minimal buffering mechanism** in `handle_text_delta()` to detect tag boundaries across chunks:
130+
131+
#### Core Idea
132+
133+
1. **Add a buffer** to track incomplete potential tags
134+
2. **Check for tag patterns** in incoming content
135+
3. **Flush buffer** when we're sure it's not a tag boundary
136+
4. **Extract thinking content** when complete tags are detected
137+
138+
#### Implementation Strategy
139+
140+
Add a new instance variable to `ModelResponsePartsManager`:
141+
142+
```python
143+
_tag_buffer: dict[VendorId, str] = field(default_factory=dict, init=False)
144+
```
145+
146+
**Modify `handle_text_delta()` to:**
147+
148+
1. **Check if buffer has partial tag** for this `vendor_part_id`
149+
2. **Combine buffer + new content** to check for complete tags
150+
3. **Detect tag boundaries:**
151+
- If complete start tag found: switch to thinking mode, flush preceding text
152+
- If complete end tag found: switch to text mode, flush thinking content
153+
- If partial tag detected: buffer it for next chunk
154+
- If no tag: flush buffer and process normally
155+
156+
#### Algorithm Pseudocode
157+
158+
```python
159+
def handle_text_delta(self, vendor_part_id, content, thinking_tags=None, ...):
160+
if not thinking_tags:
161+
# No tag handling needed
162+
return <existing logic>
163+
164+
# Get buffered content for this vendor_part_id
165+
buffered = self._tag_buffer.get(vendor_part_id, '')
166+
combined_content = buffered + content
167+
168+
# Check for complete start tag
169+
start_tag, end_tag = thinking_tags
170+
171+
if start_tag in combined_content:
172+
# We have a complete start tag somewhere in the content
173+
before_tag, after_tag = combined_content.split(start_tag, 1)
174+
175+
if before_tag:
176+
# Flush text before the tag
177+
<create/update TextPart>
178+
179+
# Clear buffer, switch to thinking mode
180+
self._tag_buffer.pop(vendor_part_id, None)
181+
self._vendor_id_to_part_index.pop(vendor_part_id, None)
182+
183+
# Start thinking part
184+
<create ThinkingPart>
185+
186+
# Process remaining content after the start tag
187+
if after_tag:
188+
<recursively handle after_tag>
189+
190+
return <event>
191+
192+
elif <already in thinking mode> and end_tag in combined_content:
193+
# We have a complete end tag
194+
before_tag, after_tag = combined_content.split(end_tag, 1)
195+
196+
if before_tag:
197+
# Add to thinking part
198+
<update ThinkingPart>
199+
200+
# Clear buffer, close thinking part
201+
self._tag_buffer.pop(vendor_part_id, None)
202+
self._vendor_id_to_part_index.pop(vendor_part_id)
203+
204+
# Process remaining content after the end tag
205+
if after_tag:
206+
<recursively handle after_tag>
207+
208+
return None
209+
210+
elif <content might be start of a tag>:
211+
# Buffer this content and wait for more
212+
self._tag_buffer[vendor_part_id] = combined_content
213+
return None
214+
215+
else:
216+
# Not a tag, flush buffer and process normally
217+
self._tag_buffer.pop(vendor_part_id, None)
218+
<existing logic for combined_content>
219+
```
220+
221+
### Edge Cases to Handle
222+
223+
1. **Multiple tags in one chunk:** `"text<think>thinking</think>more text"`
224+
2. **Tag split 3+ ways:** Chunk 1: `"<"`, Chunk 2: `"thi"`, Chunk 3: `"nk>"`
225+
3. **False positives:** Content that looks like a tag start but isn't (e.g., `"<thinking"` without `>`)
226+
4. **Buffering text content:** When buffering, don't emit events until we know it's not a tag
227+
5. **Vendor ID changes:** Each vendor ID should have its own buffer
228+
229+
### Testing Strategy
230+
231+
Following project standards, we need:
232+
233+
1. **Unit tests** in `tests/test_parts_manager.py`:
234+
- Test tag split across 2 chunks
235+
- Test tag split across 3+ chunks
236+
- Test multiple tags in one chunk
237+
- Test false positives (content that looks like tags)
238+
- Test interleaved content and tags
239+
240+
2. **Integration test** replicating the issue:
241+
- Add the reporter's test case (modified to use the correct test patterns)
242+
243+
3. **Coverage requirement:** 100% coverage of new code paths
244+
245+
### Alternative Approaches Considered
246+
247+
#### 1. State Machine Approach
248+
**Pros:** More explicit state transitions
249+
**Cons:** More complex, more code, harder to maintain
250+
251+
#### 2. Regex-based Parsing
252+
**Pros:** Could handle complex patterns
253+
**Cons:** Overkill for simple tag detection, performance overhead
254+
255+
#### 3. Look-ahead Buffering
256+
**Pros:** Could detect tags earlier
257+
**Cons:** More complex buffer management, potential memory issues with large content
258+
259+
**Decision:** Buffering approach is the most elegant and minimal solution that addresses the issue while maintaining simplicity.
260+
261+
## Implementation Files to Modify
262+
263+
### 1. `pydantic_ai_slim/pydantic_ai/_parts_manager.py`
264+
265+
**Changes:**
266+
- Add `_tag_buffer` field to `ModelResponsePartsManager`
267+
- Refactor `handle_text_delta()` to implement buffering logic
268+
- Add helper method for tag detection (optional, for clarity)
269+
270+
**Estimated lines changed:** ~80-100 lines (mostly refactoring existing logic)
271+
272+
### 2. `tests/test_parts_manager.py`
273+
274+
**Changes:**
275+
- Add new test: `test_handle_text_deltas_with_split_think_tags()`
276+
- Add test cases for various split patterns
277+
- Add test for multiple tags in content
278+
279+
**Estimated lines added:** ~100-150 lines
280+
281+
### 3. `tests/test_openai.py` (or similar)
282+
283+
**Changes:**
284+
- Add integration test based on reporter's example
285+
- Mock streaming chunks with split tags
286+
287+
**Estimated lines added:** ~50 lines
288+
289+
## Implementation Completed
290+
291+
### What Was Implemented
292+
293+
1. **Updated `_parts_manager.py`:**
294+
- ✅ Added `_tag_buffer` field to `ModelResponsePartsManager`
295+
- ✅ Implemented buffering logic in `handle_text_delta()`
296+
- ✅ Created `_handle_text_delta_with_thinking_tags()` method for tag detection across chunk boundaries
297+
- ✅ Created `_could_be_tag_start()` helper method to detect potential tag boundaries
298+
- ✅ Maintained backward compatibility with `_handle_text_delta_simple()` for non-thinking-tag cases
299+
300+
2. **Wrote comprehensive tests:**
301+
- ✅ Added 7 new unit tests in `tests/test_parts_manager.py`:
302+
- Split tags across 2 chunks
303+
- Split tags across 3+ chunks
304+
- Split end tags
305+
- Tags with surrounding content
306+
- Multiple tag pairs in sequence
307+
- False positive tag detection
308+
- Interleaved content and split tags
309+
- ✅ Added integration test in `tests/models/test_openai.py`
310+
- ✅ All 53 related tests pass
311+
- ✅ Full test suite: 2187 tests pass
312+
313+
3. **Updated configuration:**
314+
- ✅ Added `thi` to codespell ignore list in `pyproject.toml`
315+
316+
4. **Testing verification:**
317+
- ✅ All pre-commit checks pass (codespell, lint, typecheck, format)
318+
-70 streaming tests pass across all providers
319+
- ✅ Verified backward compatibility with existing tests
320+
- ✅ Confirmed Anthropic (native thinking) unaffected
321+
- ✅ Confirmed OpenAI, Groq, HuggingFace (thinking_tags users) work correctly
322+
323+
5. **Pull request created:**
324+
-PR #3206: https://github.com/pydantic/pydantic-ai/pull/3206
325+
- ✅ References issue #3007
326+
- ✅ Includes comprehensive test coverage
327+
- ✅ Fixed coverage failure by adding test for `vendor_part_id=None` edge case
328+
- ⏳ Awaiting final CI results
329+
330+
## Codebase Standards Compliance
331+
332+
**Minimal code changes** - Focused refactor of one method
333+
**Elegant solution** - Simple buffering mechanism
334+
**100% test coverage** - Comprehensive test suite planned
335+
**No breaking changes** - Backwards compatible
336+
**Type safety** - Maintains existing type annotations
337+
**Documentation** - Docstrings explain new behavior
338+
339+
## Coverage Fix (Post-PR)
340+
341+
After opening PR #3206, CI detected a coverage failure: 2 branch paths weren't tested (lines 263->265 and 268->270 in `_parts_manager.py`). These were edge cases where `vendor_part_id=None` with thinking tags enabled.
342+
343+
**Fix:** Added `test_handle_text_deltas_with_split_tags_no_vendor_id()` to test the scenario where:
344+
- Content might be the start of a tag (`"<thi"`)
345+
- But `vendor_part_id` is `None`, so buffering cannot occur
346+
- The method returns `None` (might be a tag) but doesn't buffer
347+
- Next chunk resolves whether it was actually a tag or not
348+
349+
This achieved **100% coverage** on `_parts_manager.py`.
350+
351+
## References
352+
353+
- Issue: https://github.com/pydantic/pydantic-ai/issues/3007
354+
- PR: https://github.com/pydantic/pydantic-ai/pull/3206
355+
- File: `pydantic_ai_slim/pydantic_ai/_parts_manager.py:70-153`
356+
- File: `pydantic_ai_slim/pydantic_ai/_thinking_part.py:6-31`
357+
- Tests: `tests/test_parts_manager.py:84-163`
358+
- Profile: `pydantic_ai_slim/pydantic_ai/profiles/__init__.py:48-49`

0 commit comments

Comments
 (0)