47 changes: 38 additions & 9 deletions src/app/v1/_lib/proxy/forwarder.ts
@@ -2346,11 +2346,32 @@ export class ProxyForwarder {
const fallbackInit = { ...init };
delete fallbackInit.dispatcher;
try {
Comment on lines 2346 to 2348
Timeout not cleared on retry
When re-starting responseTimeoutId after direct fallback, the previous timer (created before the proxy attempt) may still be active. If responseTimeoutId wasn’t cleared in all proxy-failure paths, this introduces multiple timers sharing the same responseController, so a stale timer can abort a later attempt unexpectedly. Ensure you always clear any existing responseTimeoutId before assigning a new one (or ensure it’s definitively cleared on every path that reaches this block).
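
One way to make the "always clear before assigning" rule hard to violate is to route every timer through a small wrapper that owns at most one handle. This is a minimal sketch under that assumption; `SingleTimer` is a hypothetical helper and does not exist in `forwarder.ts`.

```typescript
// Hypothetical helper (not part of forwarder.ts): owns at most one pending timer.
class SingleTimer {
  private id: ReturnType<typeof setTimeout> | null = null;

  /** Cancel any pending timer, then arm a new one. */
  restart(ms: number, onFire: () => void): void {
    this.clear();
    this.id = setTimeout(() => {
      this.id = null; // the timer has fired, so drop the stale handle
      onFire();
    }, ms);
  }

  /** Cancel the pending timer, if any. Safe to call repeatedly. */
  clear(): void {
    if (this.id !== null) {
      clearTimeout(this.id);
      this.id = null;
    }
  }

  /** True while a timer is armed and has not fired yet. */
  get pending(): boolean {
    return this.id !== null;
  }
}
```

With a wrapper like this, the fallback path could call something like `timer.restart(responseTimeoutMs, () => responseController.abort())` without checking whether an earlier timer is still live.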


response = await fetch(proxyUrl, fallbackInit);
response = useErrorTolerantFetch
? await ProxyForwarder.fetchWithoutAutoDecode(
proxyUrl,
fallbackInit,
provider.id,
provider.name,
session
)
: await fetch(proxyUrl, fallbackInit);
logger.info("ProxyForwarder: Direct connection succeeded after proxy failure", {
providerId: provider.id,
providerName: provider.name,
});

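// Restart the response timeout timer (if a timeout was configured)
// Note: responseTimeoutId was already cleared at the start of the catch block, so only responseTimeoutMs needs checking here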
if (responseTimeoutMs > 0) {
responseTimeoutId = setTimeout(() => {
responseController.abort();
logger.warn("ProxyForwarder: Response timeout after direct fallback", {
providerId: provider.id,
providerName: provider.name,
responseTimeoutMs,
});
}, responseTimeoutMs);
}
// On success, skip the throw and continue with the logic that follows
} catch (directError) {
// The direct connection failed too; throw the original error
@@ -2444,14 +2465,22 @@ export class ProxyForwarder {
// Check for HTTP error status (all 4xx/5xx count as failures and trigger a retry)
// Note: per user requirement, every 4xx is retried, including 401, 403, 429, etc.
if (!response.ok) {
// HTTP error: clear the response timeout timer
if (responseTimeoutId) {
clearTimeout(responseTimeoutId);
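// ⚠️ HTTP error: do not clear the response timeout timer before the response body has been read
// Reason: some upstreams return 4xx/5xx and then stall without ever ending the body.
// Calling clearTimeout early makes response.text() inside ProxyError.fromUpstreamResponse() wait forever,
// which hangs the whole request chain (client included); the frontend shows the request as permanently pending.
//
// Correct strategy: keep the response timeout watching the body read, and clear the timer in finally.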
try {
throw await ProxyError.fromUpstreamResponse(response, {
id: provider.id,
name: provider.name,
});
} finally {
if (responseTimeoutId) {
clearTimeout(responseTimeoutId);
}
}
Comment on lines +2474 to 2483
Timeout timer not cleared

In the !response.ok branch, clearTimeout(responseTimeoutId) only runs if ProxyError.fromUpstreamResponse() completes. If that call hangs (the exact scenario this PR fixes), the finally never executes, so the timer is left running and may fire later during subsequent retries/requests, aborting via the shared responseController and producing confusing logs/behavior. This should clear the timer once the timeout fires (e.g., inside the setTimeout callback) so it can’t linger past the request lifecycle.


Comment on lines +2474 to 2483
Leaked timeout after abort

In the !response.ok path you await ProxyError.fromUpstreamResponse(response, ...) inside a try/finally and only clear responseTimeoutId in finally. If the timeout fires while response.text() is pending, the abort will happen but response.text() may still not resolve promptly, so the finally might not run until much later (or ever if undici never settles), leaving responseTimeoutId live past the request lifecycle. That can cause the timer to fire later and abort via the same responseController, affecting subsequent retries/requests and producing misleading logs. Clearing the timer when it fires (inside the setTimeout callback) avoids this lingering timer.


Comment on lines 2467 to 2483
Timeout cleanup never runs

In the !response.ok path you rely on the finally to clearTimeout(responseTimeoutId), but the whole point of this change is that ProxyError.fromUpstreamResponse() can hang on response.text(). If that promise never settles (even after responseController.abort()), the finally won’t execute and the timer remains live past the request lifecycle, potentially firing later and aborting via the same responseController. Consider clearing the timeout when it fires (inside the setTimeout callback) so it can’t linger if the awaited body read never resolves.
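
If the concern is that the awaited body read may never settle even after an abort, one option is to race the read against the abort signal so the surrounding `finally` always gets a chance to run. The sketch below is only an illustration: `raceWithAbort` and `readErrorBody` are hypothetical names, and it assumes a runtime where `AbortSignal.reason` and the global `Response` are available (for example a recent Node.js).

```typescript
// Hypothetical helper (not part of forwarder.ts): settles as soon as either
// `work` settles or `signal` aborts, whichever happens first.
function raceWithAbort<T>(work: Promise<T>, signal: AbortSignal): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(signal.reason ?? new Error("aborted"));
    if (signal.aborted) {
      onAbort();
    } else {
      signal.addEventListener("abort", onAbort, { once: true });
    }
    // Forward the outcome of `work`, and drop the listener once it settles.
    work.then(resolve, reject).finally(() => {
      signal.removeEventListener("abort", onAbort);
    });
  });
}

// Hypothetical usage: read an error body under a timeout. The `finally` runs
// even if `response.text()` itself never settles, because the race rejects on abort.
async function readErrorBody(
  response: Response,
  controller: AbortController,
  timeoutMs: number
): Promise<string> {
  const timer = setTimeout(() => controller.abort(new Error("response_timeout")), timeoutMs);
  try {
    return await raceWithAbort(response.text(), controller.signal);
  } finally {
    clearTimeout(timer);
  }
}
```

The trade-off is that the underlying read is abandoned rather than awaited, so the caller still needs to abort or destroy the upstream connection to release the socket.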


throw await ProxyError.fromUpstreamResponse(response, {
id: provider.id,
name: provider.name,
});
}

// Attach the response timeout cleanup function and the controller reference to the session for response-handler to use
@@ -2822,7 +2851,7 @@ export class ProxyForwarder {
// Convert the Gunzip stream to a Web stream (error-tolerant version)
bodyStream = ProxyForwarder.nodeStreamToWebStreamSafe(gunzip, providerId, providerName);

// 移�� content-encoding 和 content-length(避免下游再解压或使用错误长度)
// Remove content-encoding and content-length (so downstream does not decompress again or use a wrong length)
responseHeaders.delete("content-encoding");
responseHeaders.delete("content-length");
} else {
Comment on lines 2853 to 2857
Garbled comment encoding

This comment appears to contain mojibake (// 移�� content-encoding ...). It’s likely an encoding/copy artifact introduced in this PR and should be fixed to readable text to avoid confusing future maintainers.

6 changes: 5 additions & 1 deletion src/repository/provider-endpoints.ts
@@ -370,11 +370,15 @@ export async function deleteProviderEndpointProbeLogsBeforeDateBatch(input: {
batchSize?: number;
}): Promise<number> {
const batchSize = input.batchSize ?? 10_000;
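// Note: compatibility. Some runtime/driver combinations serialize a Date parameter as a string like
// "Mon Feb ... GMT+0800 (China Standard Time)", which Postgres cannot parse (time zone not recognized).
// Always convert to ISO-8601 and explicitly cast to timestamptz, so a failing cleanup job does not let logs pile up.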
const beforeDateIso = input.beforeDate.toISOString();

const result = await db.execute(sql`
WITH ids_to_delete AS (
SELECT id FROM provider_endpoint_probe_logs
WHERE created_at < ${input.beforeDate}
WHERE created_at < CAST(${beforeDateIso} AS timestamptz)
ORDER BY created_at ASC
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
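
To illustrate why the hunk above binds an ISO-8601 string instead of the `Date` object, the following sketch contrasts the two string forms. `toIsoParam` is a hypothetical helper introduced only for this example; the repository code calls `toISOString()` inline.

```typescript
// Hypothetical helper: turn a Date into a string that is safe to bind as a query parameter.
function toIsoParam(date: Date): string {
  // toISOString() throws a RangeError on an invalid Date; fail with a clearer message instead.
  if (Number.isNaN(date.getTime())) {
    throw new TypeError("expected a valid Date");
  }
  return date.toISOString();
}

const beforeDate = new Date(Date.UTC(2025, 1, 3, 4, 5, 6));

// Depends on the process locale and time zone; this is the kind of string Postgres rejected.
const localeDependent = String(beforeDate);

// Always UTC and ISO-8601, whatever the process time zone is.
const iso = toIsoParam(beforeDate); // "2025-02-03T04:05:06.000Z"

console.log(localeDependent);
console.log(iso);
```

The explicit `CAST(... AS timestamptz)` in the query then tells Postgres how to interpret the text parameter, rather than leaving the type to be inferred.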
299 changes: 299 additions & 0 deletions tests/unit/proxy/proxy-forwarder-nonok-body-hang.test.ts
@@ -0,0 +1,299 @@
import { createServer } from "node:http";
import type { Socket } from "node:net";
import { describe, expect, test, vi } from "vitest";
import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder";
import { ProxyError } from "@/app/v1/_lib/proxy/errors";
import { ProxySession } from "@/app/v1/_lib/proxy/session";
import type { Provider } from "@/types/provider";

const mocks = vi.hoisted(() => {
return {
isHttp2Enabled: vi.fn(async () => false),
};
});

vi.mock("@/lib/config", async (importOriginal) => {
const actual = await importOriginal<typeof import("@/lib/config")>();
return {
...actual,
isHttp2Enabled: mocks.isHttp2Enabled,
};
});

vi.mock("@/lib/logger", () => ({
logger: {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
trace: vi.fn(),
error: vi.fn(),
fatal: vi.fn(),
},
}));

function createProvider(overrides: Partial<Provider> = {}): Provider {
return {
id: 1,
name: "p1",
url: "http://127.0.0.1:1",
key: "k",
providerVendorId: null,
isEnabled: true,
weight: 1,
priority: 0,
groupPriorities: null,
costMultiplier: 1,
groupTag: null,
providerType: "openai-compatible",
preserveClientIp: false,
modelRedirects: null,
allowedModels: null,
mcpPassthroughType: "none",
mcpPassthroughUrl: null,
limit5hUsd: null,
limitDailyUsd: null,
dailyResetMode: "fixed",
dailyResetTime: "00:00",
limitWeeklyUsd: null,
limitMonthlyUsd: null,
limitTotalUsd: null,
totalCostResetAt: null,
limitConcurrentSessions: 0,
maxRetryAttempts: null,
circuitBreakerFailureThreshold: 5,
circuitBreakerOpenDuration: 1_800_000,
circuitBreakerHalfOpenSuccessThreshold: 2,
proxyUrl: null,
proxyFallbackToDirect: false,
firstByteTimeoutStreamingMs: 30_000,
streamingIdleTimeoutMs: 10_000,
requestTimeoutNonStreamingMs: 1_000,
websiteUrl: null,
faviconUrl: null,
cacheTtlPreference: null,
context1mPreference: null,
codexReasoningEffortPreference: null,
codexReasoningSummaryPreference: null,
codexTextVerbosityPreference: null,
codexParallelToolCallsPreference: null,
anthropicMaxTokensPreference: null,
anthropicThinkingBudgetPreference: null,
geminiGoogleSearchPreference: null,
tpm: 0,
rpm: 0,
rpd: 0,
cc: 0,
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: null,
...overrides,
};
}

function createSession(params?: { clientAbortSignal?: AbortSignal | null }): ProxySession {
const headers = new Headers();
const session = Object.create(ProxySession.prototype);

Object.assign(session, {
startTime: Date.now(),
method: "POST",
requestUrl: new URL("https://example.com/v1/chat/completions"),
headers,
originalHeaders: new Headers(headers),
headerLog: JSON.stringify(Object.fromEntries(headers.entries())),
request: {
model: "gpt-5.2",
log: "(test)",
message: {
model: "gpt-5.2",
messages: [{ role: "user", content: "hi" }],
},
},
userAgent: null,
context: null,
clientAbortSignal: params?.clientAbortSignal ?? null,
userName: "test-user",
authState: { success: true, user: null, key: null, apiKey: null },
provider: null,
messageContext: null,
sessionId: null,
requestSequence: 1,
originalFormat: "claude",
providerType: null,
originalModelName: null,
originalUrlPathname: null,
providerChain: [],
cacheTtlResolved: null,
context1mApplied: false,
specialSettings: [],
cachedPriceData: undefined,
cachedBillingModelSource: undefined,
isHeaderModified: () => false,
});

return session as ProxySession;
}

async function startServer(): Promise<{ baseUrl: string; close: () => Promise<void> }> {
const sockets = new Set<Socket>();
const server = createServer((req, res) => {
// Simulate a misbehaving upstream: return 403 but never end the body (so response.text() waits forever)
res.writeHead(403, { "content-type": "application/json" });
res.write(JSON.stringify({ error: { message: "forbidden" } }));

// When the connection/request closes, destroy the response so the test process keeps no hanging connections (reduces flakiness)
const cleanup = () => {
try {
res.destroy();
} catch {
// ignore
}
};

req.on("aborted", cleanup);
req.on("close", cleanup);
});

server.on("connection", (socket) => {
sockets.add(socket);
socket.on("close", () => sockets.delete(socket));
});

const baseUrl = await new Promise<string>((resolve, reject) => {
server.once("error", reject);
server.listen(0, "127.0.0.1", () => {
const addr = server.address();
if (!addr || typeof addr === "string") {
reject(new Error("Failed to get server address"));
return;
}
resolve(`http://127.0.0.1:${addr.port}`);
});
});

const close = async () => {
// server.close only stops accepting new connections; explicitly destroy existing sockets here to avoid hangs or runaway handles
for (const socket of sockets) {
try {
socket.destroy();
} catch {
// ignore
}
}
sockets.clear();

await new Promise<void>((resolve) => server.close(() => resolve()));
};

return { baseUrl, close };
}

describe("ProxyForwarder - non-ok response body hang", () => {
test("HTTP 4xx/5xx with a body that never ends must still be cut off by the timeout, so the request does not hang", async () => {
const { baseUrl, close } = await startServer();
const clientAbortController = new AbortController();

try {
const provider = createProvider({
url: baseUrl,
requestTimeoutNonStreamingMs: 200,
});

const session = createSession({ clientAbortSignal: clientAbortController.signal });
session.setProvider(provider);

// Test doForward directly to isolate a single forward attempt, without interference from send()'s retry/provider-switching logic.
const doForward = (
ProxyForwarder as unknown as {
doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
}
).doForward;

const forwardPromise = doForward.call(
ProxyForwarder,
session,
provider,
baseUrl
) as Promise<Response>;

const result = await Promise.race([
forwardPromise.then(
() => ({ type: "resolved" as const }),
(error) => ({ type: "rejected" as const, error })
),
new Promise<{ type: "timeout" }>((resolve) =>
setTimeout(() => resolve({ type: "timeout" as const }), 2_000)
),
]);

if (result.type === "timeout") {
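// Safety net: keep a regression from hanging the whole test suite
clientAbortController.abort(new Error("test_timeout"));
throw new Error("doForward did not return before the timeout: reading a non-ok response body may be hanging");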
}

expect(result.type).toBe("rejected");
expect(result.type === "rejected" ? result.error : null).toBeInstanceOf(ProxyError);

const err = (result as { type: "rejected"; error: unknown }).error as ProxyError;
expect(err.statusCode).toBe(403);
} finally {
await close();
}
});

test("after a failed proxy falls back to direct, the response timeout must be restored, so a non-ok response body read does not hang", async () => {
const { baseUrl, close } = await startServer();
const clientAbortController = new AbortController();

try {
const provider = createProvider({
url: baseUrl,
proxyUrl: "http://127.0.0.1:1", // unusable proxy, triggers fallbackToDirect
proxyFallbackToDirect: true,
requestTimeoutNonStreamingMs: 200,
});

const session = createSession({ clientAbortSignal: clientAbortController.signal });
session.setProvider(provider);

// Test doForward directly to isolate a single forward attempt, without interference from send()'s retry/provider-switching logic.
const doForward = (
ProxyForwarder as unknown as {
doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
}
).doForward;

const forwardPromise = doForward.call(
ProxyForwarder,
session,
provider,
baseUrl
) as Promise<Response>;

const result = await Promise.race([
forwardPromise.then(
() => ({ type: "resolved" as const }),
(error) => ({ type: "rejected" as const, error })
),
new Promise<{ type: "timeout" }>((resolve) =>
setTimeout(() => resolve({ type: "timeout" as const }), 2_000)
),
]);

if (result.type === "timeout") {
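// Safety net: keep a regression from hanging the whole test suite
clientAbortController.abort(new Error("test_timeout"));
throw new Error("doForward did not return before the timeout: the response timeout may not have been restored after proxy fallback");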
}

expect(result.type).toBe("rejected");
expect(result.type === "rejected" ? result.error : null).toBeInstanceOf(ProxyError);

const err = (result as { type: "rejected"; error: unknown }).error as ProxyError;
expect(err.statusCode).toBe(403);
} finally {
await close();
}
});
});
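
Both tests above repeat the same `Promise.race` guard. As a possible follow-up, that pattern could be factored into a small helper that also clears its own guard timer once the race settles. This is a sketch only; `settleWithin` and `Outcome` are hypothetical names and not part of the test file.

```typescript
// Hypothetical test helper: report how a promise settles, or "timeout" if it
// has not settled within `ms` milliseconds.
type Outcome<T> =
  | { type: "resolved"; value: T }
  | { type: "rejected"; error: unknown }
  | { type: "timeout" };

function settleWithin<T>(promise: Promise<T>, ms: number): Promise<Outcome<T>> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const guard = new Promise<Outcome<T>>((resolve) => {
    timer = setTimeout(() => resolve({ type: "timeout" }), ms);
  });

  // Map both outcomes to values so the race itself never rejects.
  const settled = promise.then(
    (value): Outcome<T> => ({ type: "resolved", value }),
    (error: unknown): Outcome<T> => ({ type: "rejected", error })
  );

  // Clear the guard timer either way so it cannot keep the process alive.
  return Promise.race([settled, guard]).finally(() => clearTimeout(timer));
}
```

A test could then write `const result = await settleWithin(forwardPromise, 2_000)` and branch on `result.type` exactly as the existing tests do.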