Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/app/v1/_lib/proxy/error-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
getErrorOverrideAsync,
isEmptyResponseError,
isRateLimitError,
isSSEErrorResponseError,
ProxyError,
type RateLimitError,
} from "./errors";
Expand Down Expand Up @@ -76,6 +77,11 @@ export class ProxyErrorHandler {
clientErrorMessage = error.getClientSafeMessage();
logErrorMessage = error.message; // 日志保留完整信息
statusCode = 502; // Bad Gateway
} else if (isSSEErrorResponseError(error)) {
// SSEErrorResponseError: SSE 流中首块返回错误
clientErrorMessage = error.getClientSafeMessage();
logErrorMessage = error.message; // 日志保留完整信息(包含供应商名称)
statusCode = 502; // Bad Gateway - 上游返回了有效 HTTP 200 但内容是错误
} else if (error instanceof Error) {
clientErrorMessage = error.message;
logErrorMessage = error.message;
Expand Down
52 changes: 52 additions & 0 deletions src/app/v1/_lib/proxy/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,53 @@ export function isEmptyResponseError(error: unknown): error is EmptyResponseErro
return error instanceof EmptyResponseError;
}

/**
* SSE 首块错误响应 - 用于检测上游返回 HTTP 200 但 SSE 首个 event 为 error 的情况
*
* 场景:某些上游服务在高并发时返回 HTTP 200 + text/event-stream,
* 但实际内容是 error event,需要触发重试和熔断器记录
*/
export class SSEErrorResponseError extends Error {
constructor(
public readonly providerId: number,
public readonly providerName: string,
public readonly errorCode: string | undefined,
public readonly errorMessage: string,
public readonly rawData: string
) {
super(`SSE error response from provider ${providerName}: ${errorMessage}`);
this.name = "SSEErrorResponseError";
}

/**
* 获取适合返回给客户端的安全错误信息
*/
getClientSafeMessage(): string {
return this.errorMessage || "Upstream returned error in SSE stream";
}

/**
* 获取适合记录的 JSON 元数据
*/
toJSON() {
return {
type: "sse_error_response",
provider_id: this.providerId,
provider_name: this.providerName,
error_code: this.errorCode,
error_message: this.errorMessage,
raw_data: this.rawData.slice(0, 500),
};
}
}

/**
* 类型守卫:检查是否为 SSEErrorResponseError
*/
export function isSSEErrorResponseError(error: unknown): error is SSEErrorResponseError {
return error instanceof SSEErrorResponseError;
}

/**
* 判断错误类型(异步版本)
*
Expand Down Expand Up @@ -801,6 +848,11 @@ export async function categorizeErrorAsync(error: Error): Promise<ErrorCategory>
return ErrorCategory.PROVIDER_ERROR; // 空响应视为供应商问题
}

// 优先级 3.3: SSE 首块错误响应 - 计入熔断器 + 触发故障切换
if (error instanceof SSEErrorResponseError) {
return ErrorCategory.PROVIDER_ERROR; // SSE 错误响应视为供应商问题
}

// 优先级 4: 其他所有错误都是系统错误
// 包括:
// - TypeError: fetch failed (网络层错误)
Expand Down
67 changes: 67 additions & 0 deletions src/app/v1/_lib/proxy/forwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
} from "@/lib/proxy-agent";
import { SessionManager } from "@/lib/session-manager";
import { CONTEXT_1M_BETA_HEADER, shouldApplyContext1m } from "@/lib/special-attributes";
import { detectSSEFirstBlockError } from "@/lib/utils/sse";
import {
isVendorTypeCircuitOpen,
recordVendorTypeAllEndpointsTimeout,
Expand All @@ -47,6 +48,7 @@ import {
isHttp2Error,
isSSLCertificateError,
ProxyError,
SSEErrorResponseError,
sanitizeUrl,
} from "./errors";
import { mapClientFormatToTransformer, mapProviderTypeToTransformer } from "./format-mapper";
Expand Down Expand Up @@ -407,6 +409,71 @@ export class ProxyForwarder {
});
}
}
} else {
// ========== SSE 首块错误检测(流式)==========
// 场景:HTTP 200 但首个 SSE event 是 error(如高并发时某些服务返回错误)
// 策略:预读首块(通常 < 1KB),检测后使用原始 Response(clone 保护原始流)
const clonedResponse = response.clone();
const reader = clonedResponse.body?.getReader();

if (reader) {
try {
// 读取首块(设置超时保护,避免首块读取阻塞过久)
const firstChunk = await Promise.race([
reader.read(),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error("SSE first chunk read timeout")), 5000)
),
]);
Comment on lines +422 to +427
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only reads first network chunk - if SSE error event is split across chunks, detection may fail. Consider reading until first complete SSE event (detect \n\n boundary) or setting a reasonable byte limit (e.g., 4KB) instead of just reader.read() once.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/forwarder.ts
Line: 422:427

Comment:
Only reads first network chunk - if SSE error event is split across chunks, detection may fail. Consider reading until first complete SSE event (detect `\n\n` boundary) or setting a reasonable byte limit (e.g., 4KB) instead of just `reader.read()` once.

How can I resolve this? If you propose a fix, please make it concise.


if (!firstChunk.done && firstChunk.value) {
const decoder = new TextDecoder();
const firstChunkText = decoder.decode(firstChunk.value, { stream: true });

// 检测是否为 error event
const sseError = detectSSEFirstBlockError(firstChunkText);

if (sseError) {
// 取消 reader 避免资源泄漏
await reader.cancel();

logger.warn("ProxyForwarder: SSE first block contains error", {
providerId: currentProvider.id,
providerName: currentProvider.name,
errorCode: sseError.errorCode,
errorMessage: sseError.errorMessage,
attemptNumber: attemptCount,
});

throw new SSEErrorResponseError(
currentProvider.id,
currentProvider.name,
sseError.errorCode,
sseError.errorMessage,
sseError.rawData
);
}
}

// 正常 SSE:取消克隆的 reader(原始 response.body 未被消费)
await reader.cancel();
} catch (error) {
// 确保 reader 被取消,避免资源泄漏
await reader.cancel().catch(() => {});

// 如果是 SSEErrorResponseError,直接抛出(触发重试)
if (error instanceof SSEErrorResponseError) {
throw error;
}

// 超时或其他错误:记录日志但不阻止流程(降级到原有行为)
logger.debug("ProxyForwarder: SSE first chunk detection failed, proceeding", {
providerId: currentProvider.id,
providerName: currentProvider.name,
error: error instanceof Error ? error.message : String(error),
});
}
}
}

// ========== 成功分支 ==========
Expand Down
78 changes: 78 additions & 0 deletions src/lib/utils/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,81 @@ export function parseSSEDataForDisplay(sseText: string): ParsedSSEEvent[] {
return evt.data.trim() !== "[DONE]";
});
}

/**
* SSE 首块错误检测结果
*/
export interface SSEFirstBlockError {
errorCode?: string;
errorMessage: string;
rawData: string;
}

/**
* 检测 SSE 文本首个 event 是否为 error
*
* 支持的 error 格式:
* 1. event: error + data: {...}
* 2. 首个 data block 中包含 error 对象(type: "error" 或顶层 error 字段)
*
* @param sseText - SSE 文本(首块或完整)
* @returns 如果是 error event,返回解析后的错误信息;否则返回 null
*/
export function detectSSEFirstBlockError(sseText: string): SSEFirstBlockError | null {
const events = parseSSEData(sseText);

if (events.length === 0) {
return null;
}

const firstEvent = events[0];

// 情况 1:显式的 event: error
if (firstEvent.event === "error") {
const data = firstEvent.data;
if (typeof data === "object" && data !== null) {
const errorObj = (data as Record<string, unknown>).error as
| Record<string, unknown>
| undefined;
return {
errorCode: (errorObj?.code as string | undefined) ?? (errorObj?.type as string | undefined),
errorMessage:
(errorObj?.message as string) ||
((data as Record<string, unknown>).message as string) ||
"Unknown SSE error",
rawData: sseText.slice(0, 500),
};
}
return {
errorMessage: typeof data === "string" ? data : "Unknown SSE error",
rawData: sseText.slice(0, 500),
};
}

// 情况 2:首个 data block 类型为 error(如 Claude 的 type: "error")
if (typeof firstEvent.data === "object" && firstEvent.data !== null) {
const data = firstEvent.data as Record<string, unknown>;

// 2.1: type: "error" 格式(Claude API 错误格式)
if (data.type === "error") {
const errorObj = data.error as Record<string, unknown> | undefined;
return {
errorCode: (errorObj?.type as string | undefined) ?? (data.code as string | undefined),
errorMessage: (errorObj?.message as string) || (data.message as string) || "Unknown error",
rawData: sseText.slice(0, 500),
};
}

// 2.2: 顶层 error 字段(某些服务直接返回 data: {"error": {...}})
if (data.error && typeof data.error === "object") {
const errorObj = data.error as Record<string, unknown>;
return {
errorCode: (errorObj.code as string | undefined) ?? (errorObj.type as string | undefined),
errorMessage: (errorObj.message as string) || "Unknown SSE error",
rawData: sseText.slice(0, 500),
};
}
}

return null;
}
Comment on lines +119 to +176
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

detectSSEFirstBlockError 函数负责解析多种 SSE 错误格式,这导致其逻辑较为复杂且存在重复代码(例如 rawData: sseText.slice(0, 500))。为了提高可读性、可维护性和测试性,建议将不同的错误检测逻辑提取为独立的私有辅助函数。例如,可以有 _extractFromExplicitErrorEvent(data)_extractFromClaudeStyleError(data)_extractFromTopLevelErrorField(data) 等函数,每个函数专注于处理一种特定的错误结构。

这将使主函数更简洁,并允许对每种错误解析逻辑进行更精细的单元测试。

export function detectSSEFirstBlockError(sseText: string): SSEFirstBlockError | null {
  const events = parseSSEData(sseText);

  if (events.length === 0) {
    return null;
  }

  const firstEvent = events[0];
  const rawDataSnippet = sseText.slice(0, 500);

  // Helper to extract error details from a data object
  const extractErrorDetails = (data: Record<string, unknown>): SSEFirstBlockError => {
    const errorObj = data.error as Record<string, unknown> | undefined;
    return {
      errorCode: (errorObj?.code as string | undefined) ?? (errorObj?.type as string | undefined) ?? (data.code as string | undefined),
      errorMessage:
        (errorObj?.message as string) || (data.message as string) || "Unknown SSE error",
      rawData: rawDataSnippet,
    };
  };

  // 情况 1:显式的 event: error
  if (firstEvent.event === "error") {
    const data = firstEvent.data;
    if (typeof data === "object" && data !== null) {
      return extractErrorDetails(data);
    }
    return {
      errorMessage: typeof data === "string" ? data : "Unknown SSE error",
      rawData: rawDataSnippet,
    };
  }

  // 情况 2:首个 data block 类型为 error(如 Claude 的 type: "error")
  if (typeof firstEvent.data === "object" && firstEvent.data !== null) {
    const data = firstEvent.data as Record<string, unknown>;

    // 2.1: type: "error" 格式(Claude API 错误格式)
    if (data.type === "error") {
      return extractErrorDetails(data);
    }

    // 2.2: 顶层 error 字段(某些服务直接返回 data: {"error": {...}})
    if (data.error && typeof data.error === "object") {
      return extractErrorDetails(data);
    }
  }

  return null;
}

Loading
Loading