fix(proxy): detect SSE error block in HTTP 200 response and trigger retry#649
fix(proxy): detect SSE error block in HTTP 200 response and trigger retry#649
Conversation
…etry
When upstream returns HTTP 200 + text/event-stream but the first SSE
event is an error block, the system now correctly:
- Detects the error using response.clone() to preserve the original stream
- Throws SSEErrorResponseError which is classified as PROVIDER_ERROR
- Triggers retry logic and records failure in circuit breaker
- Returns proper 502 status code to client
This fixes the issue where high-concurrency errors like:
event: error
data: {"error":{"code":"1302","message":"High concurrency..."}}
were incorrectly treated as successful requests.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary of ChangesHello @ding113, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request resolves an issue where the proxy incorrectly treated upstream Server-Sent Event (SSE) responses as successful, even when the initial event in the stream indicated an error. The changes introduce a specialized error type and detection mechanism to identify these 'soft' errors. By doing so, the system can now correctly classify these situations as upstream provider failures, enabling proper retry mechanisms, circuit breaker activation, and returning an accurate 502 HTTP status to the client, thereby improving the reliability and fault tolerance of the proxy. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. 📝 Walkthrough方案演进本次变更为SSE流处理添加了首块错误检测机制。在上游返回HTTP 200但SSE流包含错误事件的场景中,系统现可识别并处理此类错误,通过新增 变更清单
代码审查工作量评估🎯 3 (Moderate) | ⏱️ ~25 分钟 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
| const firstChunk = await Promise.race([ | ||
| reader.read(), | ||
| new Promise<never>((_, reject) => | ||
| setTimeout(() => reject(new Error("SSE first chunk read timeout")), 5000) | ||
| ), | ||
| ]); |
There was a problem hiding this comment.
Only reads first network chunk - if SSE error event is split across chunks, detection may fail. Consider reading until first complete SSE event (detect \n\n boundary) or setting a reasonable byte limit (e.g., 4KB) instead of just reader.read() once.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/forwarder.ts
Line: 422:427
Comment:
Only reads first network chunk - if SSE error event is split across chunks, detection may fail. Consider reading until first complete SSE event (detect `\n\n` boundary) or setting a reasonable byte limit (e.g., 4KB) instead of just `reader.read()` once.
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
Code Review Summary
This PR implements SSE error detection for HTTP 200 responses containing error events. The implementation is well-structured with comprehensive test coverage (27 test cases). After thorough analysis through 6 specialized review perspectives, no significant issues were identified.
PR Size: M
- Lines changed: 597 (597 additions, 0 deletions)
- Files changed: 5
Issues Found
| Category | Critical | High | Medium | Low |
|---|---|---|---|---|
| Logic/Bugs | 0 | 0 | 0 | 0 |
| Security | 0 | 0 | 0 | 0 |
| Error Handling | 0 | 0 | 0 | 0 |
| Types | 0 | 0 | 0 | 0 |
| Comments/Docs | 0 | 0 | 0 | 0 |
| Tests | 0 | 0 | 0 | 0 |
| Simplification | 0 | 0 | 0 | 0 |
Review Coverage
- Logic and correctness - Clean
- Security (OWASP Top 10) - Clean
- Error handling - Clean
- Type safety - Clean
- Documentation accuracy - Clean
- Test coverage - Excellent (27 test cases covering edge cases)
- Code clarity - Good
Key Strengths
-
Comprehensive Error Detection: The
detectSSEFirstBlockError()function handles multiple error formats (explicit event:error, type:error, embedded error objects) -
Proper Resource Management: Uses
response.clone()to inspect SSE stream without consuming the original, with proper cleanup viareader.cancel() -
Excellent Test Coverage: 27 test cases covering:
- Multiple error formats (Claude, OpenAI, Gemini styles)
- Edge cases (malformed JSON, empty streams, long messages)
- Integration with error categorization system
-
Type Safety: New
SSEErrorResponseErrorclass properly integrated into error hierarchy with type guards -
Timeout Protection: 5-second timeout on first chunk read prevents indefinite blocking
Technical Implementation Notes
- The SSE detection runs only for streaming responses (content-type: text/event-stream)
- Error detection triggers retry logic and circuit breaker recording as intended
- Proper 502 status code returned to client for SSE errors
- No breaking changes to existing error handling flow
Automated review by Claude AI
| export function detectSSEFirstBlockError(sseText: string): SSEFirstBlockError | null { | ||
| const events = parseSSEData(sseText); | ||
|
|
||
| if (events.length === 0) { | ||
| return null; | ||
| } | ||
|
|
||
| const firstEvent = events[0]; | ||
|
|
||
| // 情况 1:显式的 event: error | ||
| if (firstEvent.event === "error") { | ||
| const data = firstEvent.data; | ||
| if (typeof data === "object" && data !== null) { | ||
| const errorObj = (data as Record<string, unknown>).error as | ||
| | Record<string, unknown> | ||
| | undefined; | ||
| return { | ||
| errorCode: (errorObj?.code as string | undefined) ?? (errorObj?.type as string | undefined), | ||
| errorMessage: | ||
| (errorObj?.message as string) || | ||
| ((data as Record<string, unknown>).message as string) || | ||
| "Unknown SSE error", | ||
| rawData: sseText.slice(0, 500), | ||
| }; | ||
| } | ||
| return { | ||
| errorMessage: typeof data === "string" ? data : "Unknown SSE error", | ||
| rawData: sseText.slice(0, 500), | ||
| }; | ||
| } | ||
|
|
||
| // 情况 2:首个 data block 类型为 error(如 Claude 的 type: "error") | ||
| if (typeof firstEvent.data === "object" && firstEvent.data !== null) { | ||
| const data = firstEvent.data as Record<string, unknown>; | ||
|
|
||
| // 2.1: type: "error" 格式(Claude API 错误格式) | ||
| if (data.type === "error") { | ||
| const errorObj = data.error as Record<string, unknown> | undefined; | ||
| return { | ||
| errorCode: (errorObj?.type as string | undefined) ?? (data.code as string | undefined), | ||
| errorMessage: (errorObj?.message as string) || (data.message as string) || "Unknown error", | ||
| rawData: sseText.slice(0, 500), | ||
| }; | ||
| } | ||
|
|
||
| // 2.2: 顶层 error 字段(某些服务直接返回 data: {"error": {...}}) | ||
| if (data.error && typeof data.error === "object") { | ||
| const errorObj = data.error as Record<string, unknown>; | ||
| return { | ||
| errorCode: (errorObj.code as string | undefined) ?? (errorObj.type as string | undefined), | ||
| errorMessage: (errorObj.message as string) || "Unknown SSE error", | ||
| rawData: sseText.slice(0, 500), | ||
| }; | ||
| } | ||
| } | ||
|
|
||
| return null; | ||
| } |
There was a problem hiding this comment.
detectSSEFirstBlockError 函数负责解析多种 SSE 错误格式,这导致其逻辑较为复杂且存在重复代码(例如 rawData: sseText.slice(0, 500))。为了提高可读性、可维护性和测试性,建议将不同的错误检测逻辑提取为独立的私有辅助函数。例如,可以有 _extractFromExplicitErrorEvent(data)、_extractFromClaudeStyleError(data) 和 _extractFromTopLevelErrorField(data) 等函数,每个函数专注于处理一种特定的错误结构。
这将使主函数更简洁,并允许对每种错误解析逻辑进行更精细的单元测试。
export function detectSSEFirstBlockError(sseText: string): SSEFirstBlockError | null {
const events = parseSSEData(sseText);
if (events.length === 0) {
return null;
}
const firstEvent = events[0];
const rawDataSnippet = sseText.slice(0, 500);
// Helper to extract error details from a data object
const extractErrorDetails = (data: Record<string, unknown>): SSEFirstBlockError => {
const errorObj = data.error as Record<string, unknown> | undefined;
return {
errorCode: (errorObj?.code as string | undefined) ?? (errorObj?.type as string | undefined) ?? (data.code as string | undefined),
errorMessage:
(errorObj?.message as string) || (data.message as string) || "Unknown SSE error",
rawData: rawDataSnippet,
};
};
// 情况 1:显式的 event: error
if (firstEvent.event === "error") {
const data = firstEvent.data;
if (typeof data === "object" && data !== null) {
return extractErrorDetails(data);
}
return {
errorMessage: typeof data === "string" ? data : "Unknown SSE error",
rawData: rawDataSnippet,
};
}
// 情况 2:首个 data block 类型为 error(如 Claude 的 type: "error")
if (typeof firstEvent.data === "object" && firstEvent.data !== null) {
const data = firstEvent.data as Record<string, unknown>;
// 2.1: type: "error" 格式(Claude API 错误格式)
if (data.type === "error") {
return extractErrorDetails(data);
}
// 2.2: 顶层 error 字段(某些服务直接返回 data: {"error": {...}})
if (data.error && typeof data.error === "object") {
return extractErrorDetails(data);
}
}
return null;
}
Summary
text/event-streambut first event is errorProblem
When upstream returns HTTP 200 with SSE content like:
The system incorrectly:
Related Issues:
Solution
SSEErrorResponseErrorinerrors.tsdetectSSEFirstBlockError()insse.tsresponse.clone()to check first SSE block without consuming original streamerror-handler.tsfor proper 502 responseChanges
Core Changes
src/lib/utils/sse.ts: AddeddetectSSEFirstBlockError()function to parse and detect error events in SSE streamssrc/app/v1/_lib/proxy/errors.ts: AddedSSEErrorResponseErrorclass and categorization asPROVIDER_ERRORsrc/app/v1/_lib/proxy/forwarder.ts: Integrated SSE error detection in streaming response path with 5s timeout protectionsrc/app/v1/_lib/proxy/error-handler.ts: Added handler forSSEErrorResponseErrorreturning 502 statusTesting
tests/unit/proxy/sse-error-detection.test.ts: Comprehensive test suite with 27 test cases covering various SSE error formatsTest Coverage
detectSSEFirstBlockError()- 16 test casesSSEErrorResponseErrorclass - 6 test casesbun run typecheckpassesbun run buildpassesDescription enhanced by Claude Code