/**
 * Error raised when an upstream answers HTTP 200 with an SSE stream whose
 * first event is actually an error.
 *
 * The instance keeps the provider identity, the extracted error code/message
 * and the raw SSE text that was inspected, so callers can log the failure and
 * decide how to react to it.
 */
export class SSEErrorResponseError extends Error {
  /**
   * @param providerId - Numeric id of the provider that produced the stream.
   * @param providerName - Provider name; it is embedded in `message`.
   * @param errorCode - Error code or type reported by the upstream, if any.
   * @param errorMessage - Error message reported by the upstream.
   * @param rawData - Raw SSE text the error was extracted from.
   */
  constructor(
    public readonly providerId: number,
    public readonly providerName: string,
    public readonly errorCode: string | undefined,
    public readonly errorMessage: string,
    public readonly rawData: string
  ) {
    super(`SSE error response from provider ${providerName}: ${errorMessage}`);
    this.name = "SSEErrorResponseError";
  }

  /**
   * Message suitable for returning to the client.
   *
   * Unlike `message`, it does not include the provider name.
   *
   * @returns The upstream error message, or a generic fallback when that
   *   message is empty or contains only whitespace.
   */
  getClientSafeMessage(): string {
    // A whitespace-only message would reach the client as a blank error, so
    // it is treated the same way as an empty one.
    const hasText = this.errorMessage.trim().length > 0;
    return hasText ? this.errorMessage : "Upstream returned error in SSE stream";
  }

  /**
   * Structured metadata for logging.
   *
   * @returns A plain object describing the error; `raw_data` is cut to at
   *   most 500 characters to keep log entries bounded.
   */
  toJSON(): {
    type: string;
    provider_id: number;
    provider_name: string;
    error_code: string | undefined;
    error_message: string;
    raw_data: string;
  } {
    return {
      type: "sse_error_response",
      provider_id: this.providerId,
      provider_name: this.providerName,
      error_code: this.errorCode,
      error_message: this.errorMessage,
      raw_data: this.rawData.slice(0, 500),
    };
  }
}

/**
 * Type guard for {@link SSEErrorResponseError}.
 *
 * @param error - Any caught value.
 * @returns `true` when `error` is an `SSEErrorResponseError` instance.
 */
export function isSSEErrorResponseError(error: unknown): error is SSEErrorResponseError {
  return error instanceof SSEErrorResponseError;
}
/**
 * Result of inspecting the first SSE event for an upstream error.
 */
export interface SSEFirstBlockError {
  /** Error code or error type reported by the upstream, when present. */
  errorCode?: string;
  /** Error message extracted from the event, or a generic fallback. */
  errorMessage: string;
  /** Leading part of the inspected SSE text (at most 500 characters). */
  rawData: string;
}

/** Maximum number of characters of the SSE text kept in `rawData`. */
const SSE_ERROR_RAW_DATA_LIMIT = 500;

/** Returns `value` viewed as a key/value record when it is a non-null object. */
function sseErrorAsRecord(value: unknown): Record<string, unknown> | undefined {
  return typeof value === "object" && value !== null
    ? (value as Record<string, unknown>)
    : undefined;
}

/** Returns the first candidate that is a non-empty string. */
function sseErrorText(...candidates: unknown[]): string | undefined {
  return candidates.find(
    (candidate): candidate is string => typeof candidate === "string" && candidate !== ""
  );
}

/** Returns the first candidate usable as an error code; finite numbers are stringified. */
function sseErrorCode(...candidates: unknown[]): string | undefined {
  for (const candidate of candidates) {
    if (typeof candidate === "string") {
      return candidate;
    }
    if (typeof candidate === "number" && Number.isFinite(candidate)) {
      return String(candidate);
    }
  }
  return undefined;
}

/**
 * Checks whether the first event of an SSE text is an error.
 *
 * Recognised shapes:
 * 1. An explicit `event: error`, whose data is either a JSON object or plain text.
 * 2. A first data object with `type: "error"`.
 * 3. A first data object with a top-level `error` object.
 *
 * Codes and messages are only taken from values that really are strings
 * (numeric codes are converted), so the returned fields always match their
 * declared types even when the upstream payload is irregular.
 *
 * @param sseText - SSE text, either the first chunk or the whole stream.
 * @returns The extracted error details, or `null` when the first event is not
 *   an error or no event could be parsed.
 */
export function detectSSEFirstBlockError(sseText: string): SSEFirstBlockError | null {
  const [firstEvent] = parseSSEData(sseText);
  if (!firstEvent) {
    return null;
  }

  const rawData = sseText.slice(0, SSE_ERROR_RAW_DATA_LIMIT);
  const payload: unknown = firstEvent.data;
  const record = sseErrorAsRecord(payload);
  const nested = sseErrorAsRecord(record?.error);

  // Case 1: the stream explicitly labels the event as an error.
  if (firstEvent.event === "error") {
    if (!record) {
      // Non-object data: use it verbatim when it is text.
      return {
        errorMessage: sseErrorText(payload) ?? "Unknown SSE error",
        rawData,
      };
    }
    return {
      errorCode: sseErrorCode(nested?.code, nested?.type),
      // `record.error` comes last so a plain-string `error` field is still
      // reported when no dedicated message field exists.
      errorMessage:
        sseErrorText(nested?.message, record.message, record.error) ?? "Unknown SSE error",
      rawData,
    };
  }

  if (!record) {
    return null;
  }

  // Case 2: data object tagged with type "error".
  if (record.type === "error") {
    return {
      errorCode: sseErrorCode(nested?.type, record.code),
      errorMessage: sseErrorText(nested?.message, record.message) ?? "Unknown error",
      rawData,
    };
  }

  // Case 3: data object carrying a top-level error object.
  if (nested) {
    return {
      errorCode: sseErrorCode(nested.code, nested.type),
      errorMessage: sseErrorText(nested.message) ?? "Unknown SSE error",
      rawData,
    };
  }

  return null;
}
detectSSEFirstBlockError(sseText); + + expect(result).not.toBeNull(); + expect(result?.errorCode).toBe("overloaded_error"); + expect(result?.errorMessage).toBe("Server is overloaded"); + }); + + test("should detect event: error with plain text data", () => { + const sseText = `event: error +data: Connection reset by peer + +`; + + const result = detectSSEFirstBlockError(sseText); + + expect(result).not.toBeNull(); + expect(result?.errorMessage).toBe("Connection reset by peer"); + }); + + test("should detect event: error with top-level message", () => { + const sseText = buildSse([ + { + event: "error", + data: { + message: "Rate limit exceeded", + }, + }, + ]); + + const result = detectSSEFirstBlockError(sseText); + + expect(result).not.toBeNull(); + expect(result?.errorMessage).toBe("Rate limit exceeded"); + }); + }); + + describe("type: error format (Claude API style)", () => { + test("should detect data block with type: error", () => { + const sseText = buildSse([ + { + event: "error", + data: { + type: "error", + error: { + type: "rate_limit_error", + message: "Rate limit exceeded. Please retry after a moment.", + }, + }, + }, + ]); + + const result = detectSSEFirstBlockError(sseText); + + expect(result).not.toBeNull(); + expect(result?.errorCode).toBe("rate_limit_error"); + expect(result?.errorMessage).toBe("Rate limit exceeded. 
Please retry after a moment."); + }); + + test("should detect type: error without event prefix", () => { + const sseText = `data: {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}} + +`; + + const result = detectSSEFirstBlockError(sseText); + + expect(result).not.toBeNull(); + expect(result?.errorCode).toBe("overloaded_error"); + expect(result?.errorMessage).toBe("Overloaded"); + }); + }); + + describe("embedded error object format", () => { + test("should detect first data block with top-level error object", () => { + const sseText = `data: {"error":{"code":"500","message":"Internal server error"}} + +`; + + const result = detectSSEFirstBlockError(sseText); + + expect(result).not.toBeNull(); + expect(result?.errorCode).toBe("500"); + expect(result?.errorMessage).toBe("Internal server error"); + }); + + test("should detect error with type field in error object", () => { + const sseText = buildSse([ + { + data: { + error: { + type: "authentication_error", + message: "Invalid API key", + }, + }, + }, + ]); + + const result = detectSSEFirstBlockError(sseText); + + expect(result).not.toBeNull(); + expect(result?.errorCode).toBe("authentication_error"); + expect(result?.errorMessage).toBe("Invalid API key"); + }); + }); + + describe("normal SSE streams (should return null)", () => { + test("should return null for normal Claude message_start event", () => { + const sseText = buildSse([ + { + event: "message_start", + data: { + type: "message_start", + message: { + id: "msg_01XFDUDYJgAACzvnptvVoYEL", + type: "message", + role: "assistant", + content: [], + model: "claude-3-opus-20240229", + }, + }, + }, + ]); + + const result = detectSSEFirstBlockError(sseText); + + expect(result).toBeNull(); + }); + + test("should return null for normal content_block_delta event", () => { + const sseText = buildSse([ + { + event: "content_block_delta", + data: { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text: "Hello" }, + }, + }, + 
]); + + const result = detectSSEFirstBlockError(sseText); + + expect(result).toBeNull(); + }); + + test("should return null for OpenAI chat completion chunk", () => { + const sseText = buildSse([ + { + data: { + id: "chatcmpl-123", + object: "chat.completion.chunk", + created: 1677652288, + model: "gpt-4", + choices: [ + { + index: 0, + delta: { content: "Hello" }, + finish_reason: null, + }, + ], + }, + }, + ]); + + const result = detectSSEFirstBlockError(sseText); + + expect(result).toBeNull(); + }); + + test("should return null for Gemini stream response", () => { + const sseText = `data: {"candidates":[{"content":{"parts":[{"text":"Hello"}],"role":"model"}}]} + +`; + + const result = detectSSEFirstBlockError(sseText); + + expect(result).toBeNull(); + }); + + test("should return null for empty SSE text", () => { + const result = detectSSEFirstBlockError(""); + + expect(result).toBeNull(); + }); + + test("should return null for SSE with only [DONE]", () => { + const sseText = `data: [DONE] + +`; + + const result = detectSSEFirstBlockError(sseText); + + expect(result).toBeNull(); + }); + }); + + describe("edge cases", () => { + test("should handle malformed JSON gracefully", () => { + const sseText = `event: error +data: {invalid json} + +`; + + const result = detectSSEFirstBlockError(sseText); + + expect(result).not.toBeNull(); + expect(result?.errorMessage).toBe("{invalid json}"); + }); + + test("should truncate rawData to 500 characters", () => { + const longMessage = "A".repeat(1000); + const sseText = buildSse([ + { + event: "error", + data: { + error: { + code: "500", + message: longMessage, + }, + }, + }, + ]); + + const result = detectSSEFirstBlockError(sseText); + + expect(result).not.toBeNull(); + expect(result?.rawData.length).toBeLessThanOrEqual(500); + }); + }); +}); + +describe("SSEErrorResponseError", () => { + test("should create error with correct properties", () => { + const error = new SSEErrorResponseError( + 1, + "test-provider", + "1302", + 
"High concurrency", + "raw data here" + ); + + expect(error.name).toBe("SSEErrorResponseError"); + expect(error.providerId).toBe(1); + expect(error.providerName).toBe("test-provider"); + expect(error.errorCode).toBe("1302"); + expect(error.errorMessage).toBe("High concurrency"); + expect(error.rawData).toBe("raw data here"); + expect(error.message).toBe("SSE error response from provider test-provider: High concurrency"); + }); + + test("should create error without error code", () => { + const error = new SSEErrorResponseError( + 1, + "test-provider", + undefined, + "Some error", + "raw data" + ); + + expect(error.errorCode).toBeUndefined(); + expect(error.errorMessage).toBe("Some error"); + }); + + test("getClientSafeMessage should return error message", () => { + const error = new SSEErrorResponseError(1, "test-provider", "500", "Server error", "raw"); + + expect(error.getClientSafeMessage()).toBe("Server error"); + }); + + test("getClientSafeMessage should return default message when errorMessage is empty", () => { + const error = new SSEErrorResponseError(1, "test-provider", "500", "", "raw"); + + expect(error.getClientSafeMessage()).toBe("Upstream returned error in SSE stream"); + }); + + test("toJSON should return structured metadata", () => { + const error = new SSEErrorResponseError( + 1, + "test-provider", + "1302", + "High concurrency", + "raw data" + ); + + expect(error.toJSON()).toEqual({ + type: "sse_error_response", + provider_id: 1, + provider_name: "test-provider", + error_code: "1302", + error_message: "High concurrency", + raw_data: "raw data", + }); + }); + + test("toJSON should truncate raw_data to 500 characters", () => { + const longRawData = "X".repeat(1000); + const error = new SSEErrorResponseError(1, "test-provider", "500", "Error", longRawData); + + const json = error.toJSON(); + + expect(json.raw_data.length).toBe(500); + }); +}); + +describe("isSSEErrorResponseError type guard", () => { + test("should return true for 
SSEErrorResponseError", () => { + const error = new SSEErrorResponseError(1, "test", "500", "Error", "raw"); + + expect(isSSEErrorResponseError(error)).toBe(true); + }); + + test("should return false for regular Error", () => { + const error = new Error("Regular error"); + + expect(isSSEErrorResponseError(error)).toBe(false); + }); + + test("should return false for null", () => { + expect(isSSEErrorResponseError(null)).toBe(false); + }); + + test("should return false for undefined", () => { + expect(isSSEErrorResponseError(undefined)).toBe(false); + }); +}); + +describe("categorizeErrorAsync with SSEErrorResponseError", () => { + test("should categorize SSEErrorResponseError as PROVIDER_ERROR", async () => { + const error = new SSEErrorResponseError( + 1, + "test-provider", + "1302", + "High concurrency", + "raw data" + ); + + const category = await categorizeErrorAsync(error); + + expect(category).toBe(ErrorCategory.PROVIDER_ERROR); + }); +});