diff --git a/src/app/v1/_lib/proxy/forwarder.ts b/src/app/v1/_lib/proxy/forwarder.ts index d7fd898b..4da8922b 100644 --- a/src/app/v1/_lib/proxy/forwarder.ts +++ b/src/app/v1/_lib/proxy/forwarder.ts @@ -2346,11 +2346,32 @@ export class ProxyForwarder { const fallbackInit = { ...init }; delete fallbackInit.dispatcher; try { - response = await fetch(proxyUrl, fallbackInit); + response = useErrorTolerantFetch + ? await ProxyForwarder.fetchWithoutAutoDecode( + proxyUrl, + fallbackInit, + provider.id, + provider.name, + session + ) + : await fetch(proxyUrl, fallbackInit); logger.info("ProxyForwarder: Direct connection succeeded after proxy failure", { providerId: provider.id, providerName: provider.name, }); + + // 重新启动响应超时计时器(如果之前有配置超时时间) + // 注意:responseTimeoutId 在 catch 块开头已被清除,这里只需检查 responseTimeoutMs + if (responseTimeoutMs > 0) { + responseTimeoutId = setTimeout(() => { + responseController.abort(); + logger.warn("ProxyForwarder: Response timeout after direct fallback", { + providerId: provider.id, + providerName: provider.name, + responseTimeoutMs, + }); + }, responseTimeoutMs); + } // 成功后跳过 throw,继续执行后续逻辑 } catch (directError) { // 直连也失败,抛出原始错误 @@ -2444,14 +2465,22 @@ export class ProxyForwarder { // 检查 HTTP 错误状态(4xx/5xx 均视为失败,触发重试) // 注意:用户要求所有 4xx 都重试,包括 401、403、429 等 if (!response.ok) { - // HTTP 错误:清除响应超时定时器 - if (responseTimeoutId) { - clearTimeout(responseTimeoutId); + // ⚠️ HTTP 错误:不要在读取响应体之前清除响应超时定时器 + // 原因:某些上游会在返回 4xx/5xx 后“卡住不结束 body”, + // 若提前 clearTimeout,会导致 ProxyError.fromUpstreamResponse() 的 response.text() 无限等待, + // 从而让整条请求链路(含客户端)悬挂,前端表现为一直“请求中”。 + // + // 正确策略:保留 response timeout 继续监控 body 读取,并在 finally 里清理定时器。 + try { + throw await ProxyError.fromUpstreamResponse(response, { + id: provider.id, + name: provider.name, + }); + } finally { + if (responseTimeoutId) { + clearTimeout(responseTimeoutId); + } } - throw await ProxyError.fromUpstreamResponse(response, { - id: provider.id, - name: provider.name, - }); } // 将响应超时清理函数和 controller 引用附加到 session,供 response-handler 使用 @@ -2822,7 +2851,7 @@ export class ProxyForwarder { // 将 Gunzip 流转换为 Web 流(容错版本) bodyStream = ProxyForwarder.nodeStreamToWebStreamSafe(gunzip, providerId, providerName); - // 移�� content-encoding 和 content-length(避免下游再解压或使用错误长度) + // 移除 content-encoding 和 content-length(避免下游再解压或使用错误长度) responseHeaders.delete("content-encoding"); responseHeaders.delete("content-length"); } else { diff --git a/src/repository/provider-endpoints.ts b/src/repository/provider-endpoints.ts index 18446615..4cd80765 100644 --- a/src/repository/provider-endpoints.ts +++ b/src/repository/provider-endpoints.ts @@ -370,11 +370,15 @@ export async function deleteProviderEndpointProbeLogsBeforeDateBatch(input: { batchSize?: number; }): Promise { const batchSize = input.batchSize ?? 10_000; + // Note: 兼容性:某些运行时/驱动组合会把 Date 参数序列化成 + // "Mon Feb ... GMT+0800 (China Standard Time)" 这类字符串,Postgres 无法解析(time zone not recognized)。 + // 统一转为 ISO-8601,并显式 cast 为 timestamptz,避免清理任务异常导致日志堆积。 + const beforeDateIso = input.beforeDate.toISOString(); const result = await db.execute(sql` WITH ids_to_delete AS ( SELECT id FROM provider_endpoint_probe_logs - WHERE created_at < ${input.beforeDate} + WHERE created_at < CAST(${beforeDateIso} AS timestamptz) ORDER BY created_at ASC LIMIT ${batchSize} FOR UPDATE SKIP LOCKED diff --git a/tests/unit/proxy/proxy-forwarder-nonok-body-hang.test.ts b/tests/unit/proxy/proxy-forwarder-nonok-body-hang.test.ts new file mode 100644 index 00000000..d1a1e3cf --- /dev/null +++ b/tests/unit/proxy/proxy-forwarder-nonok-body-hang.test.ts @@ -0,0 +1,299 @@ +import { createServer } from "node:http"; +import type { Socket } from "node:net"; +import { describe, expect, test, vi } from "vitest"; +import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder"; +import { ProxyError } from "@/app/v1/_lib/proxy/errors"; +import { ProxySession } from "@/app/v1/_lib/proxy/session"; +import type { Provider } from "@/types/provider"; + +const mocks = vi.hoisted(() => { + return { + isHttp2Enabled: vi.fn(async () => false), + }; +}); + +vi.mock("@/lib/config", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + isHttp2Enabled: mocks.isHttp2Enabled, + }; +}); + +vi.mock("@/lib/logger", () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + trace: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + }, +})); + +function createProvider(overrides: Partial = {}): Provider { + return { + id: 1, + name: "p1", + url: "http://127.0.0.1:1", + key: "k", + providerVendorId: null, + isEnabled: true, + weight: 1, + priority: 0, + groupPriorities: null, + costMultiplier: 1, + groupTag: null, + providerType: "openai-compatible", + preserveClientIp: false, + modelRedirects: null, + allowedModels: null, + mcpPassthroughType: "none", + mcpPassthroughUrl: null, + limit5hUsd: null, + limitDailyUsd: null, + dailyResetMode: "fixed", + dailyResetTime: "00:00", + limitWeeklyUsd: null, + limitMonthlyUsd: null, + limitTotalUsd: null, + totalCostResetAt: null, + limitConcurrentSessions: 0, + maxRetryAttempts: null, + circuitBreakerFailureThreshold: 5, + circuitBreakerOpenDuration: 1_800_000, + circuitBreakerHalfOpenSuccessThreshold: 2, + proxyUrl: null, + proxyFallbackToDirect: false, + firstByteTimeoutStreamingMs: 30_000, + streamingIdleTimeoutMs: 10_000, + requestTimeoutNonStreamingMs: 1_000, + websiteUrl: null, + faviconUrl: null, + cacheTtlPreference: null, + context1mPreference: null, + codexReasoningEffortPreference: null, + codexReasoningSummaryPreference: null, + codexTextVerbosityPreference: null, + codexParallelToolCallsPreference: null, + anthropicMaxTokensPreference: null, + anthropicThinkingBudgetPreference: null, + geminiGoogleSearchPreference: null, + tpm: 0, + rpm: 0, + rpd: 0, + cc: 0, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + ...overrides, + }; +} + +function createSession(params?: { clientAbortSignal?: AbortSignal | null }): ProxySession { + const headers = new Headers(); + const session = Object.create(ProxySession.prototype); + + Object.assign(session, { + startTime: Date.now(), + method: "POST", + requestUrl: new URL("https://example.com/v1/chat/completions"), + headers, + originalHeaders: new Headers(headers), + headerLog: JSON.stringify(Object.fromEntries(headers.entries())), + request: { + model: "gpt-5.2", + log: "(test)", + message: { + model: "gpt-5.2", + messages: [{ role: "user", content: "hi" }], + }, + }, + userAgent: null, + context: null, + clientAbortSignal: params?.clientAbortSignal ?? null, + userName: "test-user", + authState: { success: true, user: null, key: null, apiKey: null }, + provider: null, + messageContext: null, + sessionId: null, + requestSequence: 1, + originalFormat: "claude", + providerType: null, + originalModelName: null, + originalUrlPathname: null, + providerChain: [], + cacheTtlResolved: null, + context1mApplied: false, + specialSettings: [], + cachedPriceData: undefined, + cachedBillingModelSource: undefined, + isHeaderModified: () => false, + }); + + return session as ProxySession; +} + +async function startServer(): Promise<{ baseUrl: string; close: () => Promise }> { + const sockets = new Set(); + const server = createServer((req, res) => { + // 模拟上游异常:返回 403,但永远不结束 body(导致 response.text() 无限等待) + res.writeHead(403, { "content-type": "application/json" }); + res.write(JSON.stringify({ error: { message: "forbidden" } })); + + // 连接/请求关闭时,主动销毁响应,避免测试进程残留挂起连接(降低 flakiness) + const cleanup = () => { + try { + res.destroy(); + } catch { + // ignore + } + }; + + req.on("aborted", cleanup); + req.on("close", cleanup); + }); + + server.on("connection", (socket) => { + sockets.add(socket); + socket.on("close", () => sockets.delete(socket)); + }); + + const baseUrl = await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + const addr = server.address(); + if (!addr || typeof addr === "string") { + reject(new Error("Failed to get server address")); + return; + } + resolve(`http://127.0.0.1:${addr.port}`); + }); + }); + + const close = async () => { + // server.close 只停止接收新连接;这里显式销毁已有 socket,避免挂死/跑飞 + for (const socket of sockets) { + try { + socket.destroy(); + } catch { + // ignore + } + } + sockets.clear(); + + await new Promise((resolve) => server.close(() => resolve())); + }; + + return { baseUrl, close }; +} + +describe("ProxyForwarder - non-ok response body hang", () => { + test("HTTP 4xx/5xx 在 body 不结束时也应被超时中断,避免请求悬挂", async () => { + const { baseUrl, close } = await startServer(); + const clientAbortController = new AbortController(); + + try { + const provider = createProvider({ + url: baseUrl, + requestTimeoutNonStreamingMs: 200, + }); + + const session = createSession({ clientAbortSignal: clientAbortController.signal }); + session.setProvider(provider); + + // 直接测试 doForward 以隔离单次转发行为,避免 send() 的重试/供应商切换逻辑干扰。 + const doForward = ( + ProxyForwarder as unknown as { + doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown; + } + ).doForward; + + const forwardPromise = doForward.call( + ProxyForwarder, + session, + provider, + baseUrl + ) as Promise; + + const result = await Promise.race([ + forwardPromise.then( + () => ({ type: "resolved" as const }), + (error) => ({ type: "rejected" as const, error }) + ), + new Promise<{ type: "timeout" }>((resolve) => + setTimeout(() => resolve({ type: "timeout" as const }), 2_000) + ), + ]); + + if (result.type === "timeout") { + // 兜底:避免回归时测试套件整体挂死 + clientAbortController.abort(new Error("test_timeout")); + throw new Error("doForward 超时未返回:可能存在非 ok 响应体读取悬挂问题"); + } + + expect(result.type).toBe("rejected"); + expect(result.type === "rejected" ? result.error : null).toBeInstanceOf(ProxyError); + + const err = (result as { type: "rejected"; error: unknown }).error as ProxyError; + expect(err.statusCode).toBe(403); + } finally { + await close(); + } + }); + + test("代理失败降级到直连后也必须恢复 response timeout,避免非 ok 响应体读取悬挂", async () => { + const { baseUrl, close } = await startServer(); + const clientAbortController = new AbortController(); + + try { + const provider = createProvider({ + url: baseUrl, + proxyUrl: "http://127.0.0.1:1", // 不可用的代理,触发 fallbackToDirect + proxyFallbackToDirect: true, + requestTimeoutNonStreamingMs: 200, + }); + + const session = createSession({ clientAbortSignal: clientAbortController.signal }); + session.setProvider(provider); + + // 直接测试 doForward 以隔离单次转发行为,避免 send() 的重试/供应商切换逻辑干扰。 + const doForward = ( + ProxyForwarder as unknown as { + doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown; + } + ).doForward; + + const forwardPromise = doForward.call( + ProxyForwarder, + session, + provider, + baseUrl + ) as Promise; + + const result = await Promise.race([ + forwardPromise.then( + () => ({ type: "resolved" as const }), + (error) => ({ type: "rejected" as const, error }) + ), + new Promise<{ type: "timeout" }>((resolve) => + setTimeout(() => resolve({ type: "timeout" as const }), 2_000) + ), + ]); + + if (result.type === "timeout") { + // 兜底:避免回归时测试套件整体挂死 + clientAbortController.abort(new Error("test_timeout")); + throw new Error("doForward 超时未返回:可能存在代理降级后 response timeout 未恢复的问题"); + } + + expect(result.type).toBe("rejected"); + expect(result.type === "rejected" ? result.error : null).toBeInstanceOf(ProxyError); + + const err = (result as { type: "rejected"; error: unknown }).error as ProxyError; + expect(err.statusCode).toBe(403); + } finally { + await close(); + } + }); +});