fix(proxy): Fix stuck requests (AgentPool eviction blocking) #759
Conversation
- Gemini SSE passthrough no longer clears the first-byte timeout when only headers have been received
- The first-byte timeout is cleared once the first data chunk arrives, and streamingIdleTimeoutMs is supported to abort on mid-stream silence
- When the stats task fails, best-effort persistence and tracking finalization prevent requests from staying in `requesting` indefinitely
- Added regression tests covering three scenarios: no first chunk, delay after the first chunk, and mid-stream silence
Summary of Changes (Gemini Code Assist)
This pull request addresses an issue where Gemini streaming passthrough requests could hang for a long time under certain conditions. By adjusting when the first-byte timeout is cleared, introducing a streaming idle-timeout mechanism, and improving the error-handling flow, it makes streaming connections more robust, avoids client hangs and resource exhaustion caused by abnormal upstream response behavior, and provides comprehensive regression tests to validate these improvements.
📝 Walkthrough
The Gemini proxy response path gains streaming enhancements: a 10MB tail-window in-memory buffer, first-byte and idle-timeout watchdogs, per-chunk decoding/buffering/compression and usage billing, abortReason propagation, and robust finalization/fallback. Unit tests targeting stream timeouts are added, and the agent pool now calls destroy() first to avoid close() hanging.
Estimated code review effort: 4 (Complex), ~45 minutes.
Pre-merge checks: 2 passed, 1 failed (warning).
Code Review
This pull request effectively addresses the critical issue of Gemini streaming passthrough requests getting stuck by optimizing first-byte timeout handling and introducing a streaming idle timeout watchdog. The changes correctly defer clearing the first-byte timeout until the first data block is received, and the streamingIdleTimeoutMs mechanism ensures connections terminate correctly if the upstream provider goes silent mid-stream. Error handling in statsPromise is also enhanced to ensure request logging and tracking even in case of streaming failures, preventing "orphaned" records. However, it introduces a significant Denial of Service (DoS) vulnerability due to unbounded memory buffering of the upstream response body in the background statistics task, which could lead to Out-Of-Memory (OOM) errors and crash the proxy service. It is highly recommended to implement response size limits before deploying these changes to production.
```ts
while (true) {
  if (session.clientAbortSignal?.aborted) break;

  const { done, value } = await reader.read();
  if (done) {
    streamEndedNormally = true;
    const wasResponseControllerAborted =
      sessionWithController.responseController?.signal.aborted ?? false;
    const clientAborted = session.clientAbortSignal?.aborted ?? false;

    // abort -> nodeStreamToWebStreamSafe 可能会把错误吞掉并 close(),导致 done=true;
    // 这里必须结合 abort signal 判断是否为“自然结束”。
    if (wasResponseControllerAborted || clientAborted) {
      streamEndedNormally = false;
      abortReason = abortReason ?? "STREAM_RESPONSE_TIMEOUT";
    } else {
      streamEndedNormally = true;
    }
    break;
  }

  if (value) {
    if (isFirstChunk) {
      isFirstChunk = false;
      session.recordTtfb();
      clearResponseTimeoutOnce(value.length);
    }
    chunks.push(decoder.decode(value, { stream: true }));

    // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠)
    if (!isFirstChunk) {
      startIdleTimer();
    }
  }
}

// Previous version (superseded in this diff):
//   const flushed = decoder.decode();
//   if (flushed) chunks.push(flushed);
//   const allContent = chunks.join("");
clearIdleTimer();
const allContent = flushAndJoin();
```
The ProxyResponseHandler implementation for Gemini stream passthrough buffers the entire upstream response body in memory. This occurs in the background statsPromise task to facilitate usage tracking (token counting) and "fake 200" error detection.
Specifically, the code collects all received chunks into a chunks array (Line 970) and then joins them into a single large string allContent (Line 980 and 1045). There is no enforced limit on the size of the response being buffered.
An attacker or a malicious/compromised upstream provider can send an extremely large response (e.g., several gigabytes). This will cause the Node.js process to exhaust its heap memory and crash with an Out-Of-Memory (OOM) error. Since this is a gateway/proxy service, a crash of the process results in a denial of service for all concurrent requests and users.
Furthermore, the buffered allContent is passed to JSON.parse (via finalizeRequestStats -> parseUsageFromResponseText), which can block the Node.js event loop for a significant duration if the input is large, further contributing to DoS. It is also stored in Redis (Line 985) without size validation, which could lead to Redis memory exhaustion.
Recommendation:
Implement a strict limit on the maximum response size that the proxy is willing to buffer for statistics and logging (a minimal sketch follows this list):
- Track the total size of chunks received in the `while` loop.
- If the total size exceeds a safe threshold (e.g., 10MB or 50MB), abort the stream, log a warning, and skip the full buffering/parsing logic.
- Ensure that the client still receives the data (if possible) or is disconnected gracefully, but the proxy must protect its own memory resources.
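A minimal sketch of such a cap, assuming a tail-window policy; the names `MAX_STATS_BUFFER_BYTES`, `pushChunk`, and `flushAndJoin` mirror identifiers that show up in later diffs on this PR, but the actual implementation may differ:

```ts
// Sketch only: bound the stats buffer to a trailing window so a huge upstream
// response cannot exhaust the proxy's heap. Chunk sizes are tracked in bytes.
const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // keep at most ~10MB of tail data

const chunks: string[] = [];
const chunkSizes: number[] = [];
let bufferedBytes = 0;

function pushChunk(text: string, byteLength: number): void {
  chunks.push(text);
  chunkSizes.push(byteLength);
  bufferedBytes += byteLength;
  // Evict from the head until the buffer is back under the cap (tail window).
  while (bufferedBytes > MAX_STATS_BUFFER_BYTES && chunks.length > 1) {
    chunks.shift();
    bufferedBytes -= chunkSizes.shift() ?? 0;
  }
}

function flushAndJoin(decoder: TextDecoder): string {
  const flushed = decoder.decode(); // flush any pending multi-byte sequence
  if (flushed) pushChunk(flushed, Buffer.byteLength(flushed));
  return chunks.join("");
}
```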
```ts
} catch {
  // ignore
}
```
In the `finally` block, the `try...catch` around `clearResponseTimeoutOnce()` uses an empty catch block. While this prevents crashes during cleanup, it can hide underlying problems; if these cleanup operations fail persistently, the failures will be hard to debug. Consider adding a warning log in this catch block to record cleanup errors, which is valuable for debugging.
```ts
} catch (e) {
  logger.warn("[ResponseHandler] Gemini passthrough: Error during clearResponseTimeoutOnce cleanup", {
    taskId,
    error: e instanceof Error ? e.message : String(e),
  });
}
```

```ts
} catch {
  // ignore
}
```
In the `finally` block, the `try...catch` around `reader?.cancel()` uses an empty catch block. While this prevents crashes during cleanup, it can hide underlying problems; if these cleanup operations fail persistently, the failures will be hard to debug. Consider adding a warning log in this catch block to record cleanup errors, which is valuable for debugging.
```ts
} catch (e) {
  logger.warn("[ResponseHandler] Gemini passthrough: Error during reader.cancel cleanup", {
    taskId,
    error: e instanceof Error ? e.message : String(e),
  });
}
```

```ts
} catch {
  // ignore
}
```
In the `finally` block, the `try...catch` around `reader?.releaseLock()` uses an empty catch block. While this prevents crashes during cleanup, it can hide underlying problems; if these cleanup operations fail persistently, the failures will be hard to debug. Consider adding a warning log in this catch block to record cleanup errors, which is valuable for debugging.
```ts
} catch (e) {
  logger.warn("[ResponseHandler] Gemini passthrough: Error during reader.releaseLock cleanup", {
    taskId,
    error: e instanceof Error ? e.message : String(e),
  });
}
```
Code Review Summary
No significant issues identified in this PR. The changes correctly fix a real hang/stuck bug in the Gemini stream passthrough path by deferring the first-byte timeout clearing from the response-headers stage to the first actual body chunk, and by adding an idle timeout watchdog for the passthrough path (consistent with the non-passthrough path). The error handling in the catch block is substantially improved with proper finalization and fallback to persistRequestFailure, preventing orphan "requesting" records. The test file provides good behavioral coverage of the three core scenarios.
PR Size: M
- Lines changed: 631 (601 additions, 30 deletions)
- Files changed: 2
Review Coverage
- Logic and correctness - Clean
- Security (OWASP Top 10) - Clean
- Error handling - Clean
- Type safety - Clean
- Documentation accuracy - Clean
- Test coverage - Adequate (3 behavioral tests covering first-byte timeout, timeout clearing after first chunk, and idle timeout)
- Code clarity - Good
Automated review by Claude AI
```ts
if (wasResponseControllerAborted || clientAborted) {
  streamEndedNormally = false;
  abortReason = abortReason ?? "STREAM_RESPONSE_TIMEOUT";
} else {
```
Client abort misclassified
In the passthrough stats reader, when done === true and clientAborted is true, the code sets abortReason = abortReason ?? "STREAM_RESPONSE_TIMEOUT" (see the if (wasResponseControllerAborted || clientAborted) branch). This will record client-driven disconnects as a timeout, which can skew stats/circuit attribution and contradicts the later catch block that maps client aborts to CLIENT_ABORTED. Consider setting abortReason based on which signal aborted (e.g. client -> CLIENT_ABORTED, controller -> STREAM_RESPONSE_TIMEOUT).
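A minimal sketch of the classification the comment asks for; the constant names (`CLIENT_ABORTED`, `STREAM_RESPONSE_TIMEOUT`) follow the review text and may not match the codebase exactly:

```ts
if (wasResponseControllerAborted || clientAborted) {
  streamEndedNormally = false;
  // Attribute the abort to the signal that actually fired.
  abortReason =
    abortReason ?? (clientAborted ? "CLIENT_ABORTED" : "STREAM_RESPONSE_TIMEOUT");
} else {
  streamEndedNormally = true;
}
```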
```ts
  isFirstChunk = false;
  session.recordTtfb();
  clearResponseTimeoutOnce(value.length);
}
chunks.push(decoder.decode(value, { stream: true }));

// 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠)
if (!isFirstChunk) {
  startIdleTimer();
}
```
Idle timer never resets
startIdleTimer() is only invoked once (on the first received chunk) because it’s guarded by if (!isFirstChunk) inside the if (value) block, but isFirstChunk is set to false immediately before. That means subsequent chunks won’t restart the watchdog, so streamingIdleTimeoutMs won’t enforce “no data for N ms” during an ongoing stream. The idle timer should be restarted on every chunk after the first (or at least on every chunk) to correctly detect mid-stream silence.
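A minimal sketch of the suggested fix, re-arming the watchdog on every chunk; it assumes `startIdleTimer()` clears any previously armed timer before setting a new one:

```ts
if (value) {
  if (isFirstChunk) {
    isFirstChunk = false;
    session.recordTtfb();
    clearResponseTimeoutOnce(value.length);
  }
  chunks.push(decoder.decode(value, { stream: true }));
  // Restart the idle watchdog after every chunk so mid-stream silence is detected.
  startIdleTimer();
}
```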
```ts
} catch (error) {
  if (!isClientAbortError(error as Error)) {
    logger.error("[ResponseHandler] Gemini passthrough stats task failed:", error);
  const err = error as Error;
  const clientAborted = session.clientAbortSignal?.aborted ?? false;
  const isResponseControllerAborted =
    sessionWithController.responseController?.signal.aborted ?? false;
  const isIdleTimeout = err.message?.includes("streaming_idle");
```
Catch may throw ReferenceError
In the passthrough statsPromise catch block, isIdleTimeout is computed via err.message?.includes("streaming_idle"). If err.message is undefined (allowed by the Error type), this expression evaluates to undefined, and later isIdleTimeout ? ... will throw (undefined is not a boolean). Use a boolean coercion (e.g. !!err.message?.includes(...)) or default to false to keep the catch block from crashing while handling an error.
```ts
test("不应在仅收到 headers 时清除首字节超时:无首块数据时应在窗口内中断避免悬挂", async () => {
  asyncTasks.length = 0;
  const { baseUrl, close } = await startSseServer((_req, res) => {
    res.writeHead(200, {
      "content-type": "text/event-stream",
      "cache-control": "no-cache",
      connection: "keep-alive",
    });
    res.flushHeaders();
    // 不发送任何 body,保持连接不结束
  });

  const clientAbortController = new AbortController();
  try {
    const provider = createProvider({
      url: baseUrl,
      firstByteTimeoutStreamingMs: 200,
    });
    const session = createSession({
      clientAbortSignal: clientAbortController.signal,
      messageId: 1,
      userId: 1,
    });
    session.setProvider(provider);

    const doForward = (
      ProxyForwarder as unknown as {
        doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
      }
    ).doForward;

    const upstreamResponse = (await doForward.call(
      ProxyForwarder,
      session,
      provider,
      baseUrl
    )) as Response;

    const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse);
    const reader = clientResponse.body?.getReader();
    expect(reader).toBeTruthy();
    if (!reader) throw new Error("Missing body reader");

    const firstRead = await readWithTimeout(reader, 1500);
    if (!firstRead.ok) {
      clientAbortController.abort(new Error("test_timeout"));
      throw new Error("首字节超时未生效:读首块数据在 1.5s 内仍未返回(可能仍会卡死)");
    }
  } finally {
```
Test doesn’t assert timeout
In the “no first chunk” case, the test only checks that reader.read() returns within 1.5s, but it doesn’t assert why it returned (e.g., done === true due to abort vs a chunk arriving). As written, the test could pass even if the proxy returns an immediate empty body (or any early completion) unrelated to firstByteTimeoutStreamingMs. To make this a regression test for “headers-only then hang”, assert that the read completes with done === true (or that the session/controller was aborted) and ideally that it happens after ~firstByteTimeoutStreamingMs rather than just “< 1.5s”.
- AgentPool eviction now prefers destroy, so close() waiting on in-flight requests can no longer stall getAgent/cleanup
- Gemini SSE passthrough stats reads gain a memory cap, along with improved abort-reason classification and cleanup logging
- Added/strengthened regression tests
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/app/v1/_lib/proxy/response-handler.ts`:
- Around line 1149-1152: The call to reader?.cancel() currently uses void
reader?.cancel() which discards the returned Promise and can cause unhandled
rejection warnings; change this so the Promise rejection is handled—either await
reader?.cancel() inside an async try/catch or append .catch(...) to the Promise
(e.g., reader?.cancel().catch(err => { /* handle or log via responseHandler
logger */ })); update the code paths around reader?.cancel() in
response-handler.ts to ensure any error is caught and logged/ignored
appropriately.
- Around line 856-860: Remove all emoji characters from the modified comment
blocks in response-handler.ts: delete the "⚠️" and "⭐" emoji found in the
comment that begins "注意:不要在“仅收到响应头”时清除首字节超时。" and the other nearby explanatory
comments so they contain plain text only; ensure the revised comments still
convey the same guidance and comply with the rule forbidding emoji in code,
comments, or string literals.
In `@src/lib/proxy-agent/agent-pool.ts`:
- Around line 358-370: The comment inside closeAgent contains an emoji (⚠️)
which violates the no-emoji rule; remove the emoji and rephrase the warning as
plain text (e.g., "Warning:" or "Note:") while preserving the existing
explanation about preferring destroy() over close() and the rationale about
in-flight requests; update the comment near the checks for agent.destroy and
agent.close accordingly and run the linter to ensure the file no longer contains
emoji in comments or string literals.
🧹 Nitpick comments (2)
tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts (2)
52-71: Some dependencies are not mocked, so DB calls made by the background stats task may throw.
`@/repository/system-config` (`getSystemSettings`) and `@/repository/model-price` (`findLatestPriceByModel`) are not mocked, and both are called on the `finalizeRequestStats` → `updateRequestCostFromUsage` path. These exceptions are caught by the response handler's internal try-catch and will not fail the tests (which focus on timeout behavior rather than stats accuracy), but they can produce spurious error logs in the test output. If you want cleaner output, consider adding these mocks:

```ts
vi.mock("@/repository/system-config", () => ({
  getSystemSettings: vi.fn(async () => ({ billingModelSource: "redirected" })),
}));
vi.mock("@/repository/model-price", () => ({
  findLatestPriceByModel: vi.fn(async () => null),
}));
```

Not a merge blocker; can be added later.
270-281: `ProxyForwarder.doForward` is accessed via an unsafe cast; verify the method signature stays stable.
`doForward` is accessed through `as unknown as { doForward: ... }`, which bypasses access control. If the signature of `doForward` changes in a later refactor, TypeScript will not catch the mismatch here at compile time. This is a common pattern in tests and not a merge blocker, but remember to update this test whenever the method signature changes.
```ts
let timeoutId: ReturnType<typeof setTimeout> | null = null;
try {
  return await Promise.race([
    promise,
```
Fake timers left disabled
This test calls vi.useRealTimers() but never restores fake timers afterwards, so later tests in this file that rely on vi.advanceTimersByTime(...) will stop working (or behave nondeterministically). Wrap the real-timer section with a try/finally and call vi.useFakeTimers() in the finally (or explicitly restore the previous timer mode) before the test returns.
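A small sketch of the suggested pattern with Vitest's timer helpers, keeping the timer-mode switch scoped to the section that needs it:

```ts
import { vi } from "vitest";

vi.useRealTimers();
try {
  // ...the part of the test that needs real timers (e.g. waiting on a real local server)...
} finally {
  // Restore fake timers so later tests that rely on vi.advanceTimersByTime() keep working.
  vi.useFakeTimers();
}
```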
```ts
// 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠)
if (!isFirstChunk) {
  startIdleTimer();
```
Idle timeout never triggers
startIdleTimer() is only called when value is truthy. If the stream yields { done: false, value: undefined } (allowed by the Streams API), the watchdog won’t be (re)armed and streamingIdleTimeoutMs can fail to abort a stuck connection. Consider starting/resetting the idle timer on every successful reader.read() iteration where done === false (or at least when done===false regardless of value) so “mid-stream silence” can’t bypass the timer.
```ts
const result1 = await realPool.getAgent(params);
const agent1 = result1.agent as unknown as {
  close?: { mockImplementation: (fn: () => Promise<void>) => void };
  destroy?: unknown;
};

// 模拟:close 可能因等待 in-flight 请求结束而长期不返回
agent1.close?.mockImplementation(() => new Promise<void>(() => {}));
```
Mocking close likely ineffective
These tests assume result1.agent.close is a Vitest mock (mockImplementation), but AgentPoolImpl.createAgent() constructs a real undici Agent, whose .close() won’t have mockImplementation. As a result, the “close() never resolves” simulation may be a no-op and the test can pass without actually exercising the regression. To make this deterministic, inject a custom Dispatcher into the pool (or mock the undici Agent constructor) so you can provide a close()/destroy() stub that behaves as intended.
```ts
vi.mock("@/lib/config", async (importOriginal) => {
  const actual = await importOriginal<typeof import("@/lib/config")>();
  return {
    ...actual,
    isHttp2Enabled: mocks.isHttp2Enabled,
  };
});
```
Mock isHttp2Enabled not reset
mocks.isHttp2Enabled is hoisted and shared across tests, but none of the tests reset its call history/implementation. If other tests in this file (or later suites) change its behavior, assertions can become order-dependent. Add a beforeEach(() => mocks.isHttp2Enabled.mockReset()) (or vi.clearAllMocks()) to keep tests isolated.
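A short sketch of the suggested reset; the hoisted `mocks` object mirrors the pattern in the excerpt above:

```ts
import { beforeEach, vi } from "vitest";

// Hoisted mock object, following the structure shown in the excerpt.
const mocks = vi.hoisted(() => ({ isHttp2Enabled: vi.fn(() => false) }));

beforeEach(() => {
  // Reset call history and any per-test implementation so tests stay order-independent.
  mocks.isHttp2Enabled.mockReset();
});
```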
Additional Comments (1)
src/lib/proxy-agent/agent-pool.ts, lines 332-338
**Shutdown can still hang**
`shutdown()` awaits `Promise.all(closePromises)` (now calling `destroy()` first), but `Dispatcher.destroy()` isn't guaranteed to resolve quickly in all implementations. Since the PR goal is "eviction/cleanup/shutdown never blocks new requests", consider making shutdown/eviction non-blocking (fire-and-forget close/destroy, or add a bounded timeout) so a misbehaving dispatcher can't stall shutdown/cleanup the same way `close()` did.
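A minimal sketch of the bounded-timeout option, assuming the destroy promise can simply be raced against a timer; the helper name `destroyWithTimeout` is illustrative, not from the codebase:

```ts
// Never let a misbehaving dispatcher stall eviction/shutdown for long: wait up to
// `ms`, then move on while destroy() keeps running in the background.
async function destroyWithTimeout(agent: { destroy(): Promise<void> }, ms = 5000): Promise<void> {
  let timer: NodeJS.Timeout | undefined;
  const timeout = new Promise<void>((resolve) => {
    timer = setTimeout(resolve, ms);
  });
  try {
    await Promise.race([agent.destroy(), timeout]);
  } finally {
    if (timer) clearTimeout(timer);
  }
}
```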
- closeAgent no longer awaits after triggering destroy/close, so the eviction path cannot get stuck
- Gemini passthrough stats: clear the first-byte timeout only after a non-empty chunk is received
- Regression test: covers the scenario where destroy is unavailable and close never returns
- Passthrough stats: the idle timer is now also reset when done=false (so an abnormal value cannot disable the watchdog)
- Regression tests: reset the hoisted isHttp2Enabled mock to avoid cross-test pollution
```ts
it("should prefer destroy over close to avoid hanging on in-flight streaming requests", async () => {
  const result = await pool.getAgent({
    endpointUrl: "https://api.anthropic.com/v1/messages",
    proxyUrl: null,
    enableHttp2: true,
  });

  const agent = result.agent as unknown as {
    close?: () => Promise<void>;
    destroy?: () => Promise<void>;
  };

  // 模拟:close 可能因等待 in-flight 请求结束而长期不返回
  if (typeof agent.close === "function") {
    (
      agent.close as unknown as { mockImplementation: (fn: () => Promise<void>) => void }
    ).mockImplementation(() => new Promise<void>(() => {}));
  }
```
Mocking close is ineffective
In should prefer destroy over close..., agent.close comes from the mocked undici.Agent (a vi.fn()), but it’s being cast to { mockImplementation } and invoked that way. This will throw at runtime (mockImplementation is not a function) or silently fail depending on how the cast is compiled, making the test non-deterministic and not actually simulating a never-resolving close(). Use vi.mocked(agent.close).mockImplementation(...) (or agent.close = vi.fn(() => new Promise(() => {}))) and likewise stub destroy to assert it was called.
```diff
 if (value) {
-  if (isFirstChunk) {
-    isFirstChunk = false;
-    session.recordTtfb();
+  const chunkSize = value.byteLength;
+  if (chunkSize > 0) {
+    if (isFirstChunk) {
+      isFirstChunk = false;
+      session.recordTtfb();
+      clearResponseTimeoutOnce(chunkSize);
+    }
+    pushChunk(decoder.decode(value, { stream: true }), chunkSize);
+  }

   // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠)
   if (!isFirstChunk) {
     startIdleTimer();
   }
-  chunks.push(decoder.decode(value, { stream: true }));
 }
```
Idle watchdog can be bypassed
In the passthrough stats reader loop, startIdleTimer() is only called inside if (value) { ... }. If the stream produces { done: false, value: undefined } (allowed by the Streams API) or yields zero-length chunks, the watchdog won’t be (re)armed and streamingIdleTimeoutMs won’t abort a stuck connection mid-stream. To ensure the “no data for N ms” guarantee, arm/reset the idle timer on every successful reader.read() when done === false (regardless of value), and consider treating byteLength === 0 as activity.
Additional Comments (1)
src/lib/proxy-agent/agent-pool.ts, lines 326-343
**Shutdown still waits on evictions**
`shutdown()` builds `closePromises` and `await Promise.all(closePromises)`, but `closeAgent()` no longer awaits `destroy()/close()` (it just attaches `.catch`). As a result, `shutdown()` resolves immediately while agents may still be tearing down in the background. If other code assumes `shutdown()` means "all sockets closed" (e.g., tests or reset logic), this breaks that guarantee and can cause flaky behavior/leaked connections. Either make `shutdown()` fire-and-forget explicitly (and rename/adjust callers), or return/await the actual `destroy()/close()` promise(s) in `closeAgent()` for shutdown-only paths.
Additional Comments (1)
tests/unit/lib/proxy-agent/agent-pool.test.ts, lines 497-509
**Broken mockImplementation cast**
`agent.close` is a plain function in this test (from the `undici` mock), but the code casts it to `{ mockImplementation(...) }` and calls that method. At runtime this will throw (`mockImplementation is not a function`) or silently not do what the test intends, so the test can pass/fail nondeterministically and won't actually simulate a never-resolving `close()`. Prefer stubbing the function directly (e.g. `agent.close = vi.fn(() => new Promise(() => {}))`) or `vi.mocked(agent.close).mockImplementation(...)` so the hang simulation is real.
Additional Comments (1)
src/lib/proxy-agent/agent-pool.ts, lines 392-414
**Shutdown resolves before close**
`closeAgent()` no longer awaits `destroy()`/`close()`, but `shutdown()` still does `await Promise.all(closePromises)` expecting those promises to represent completion. As a result, `shutdown()` (and `resetGlobalAgentPool()`) can resolve while agents are still in the process of closing, which breaks the implied contract for callers/tests and can leave sockets/timers alive past shutdown. Consider awaiting `destroy()` in `shutdown()` (or adding a `closeAgentFast()` for eviction while keeping `shutdown()` blocking).
```ts
if (operation === "destroy") {
  agent.destroy().catch((error) => {
    logger.warn("AgentPool: Error closing agent", {
      key,
      operation,
      error: error instanceof Error ? error.message : String(error),
    });
  });
```
Fire-and-forget destroy() can leak resources if it rejects.
The unhandled rejection is logged but the agent may remain partially open. Consider tracking pending cleanup promises and awaiting them in shutdown() to ensure graceful teardown, or document that eviction is best-effort and resources may leak on error.
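A minimal sketch of the tracking approach the comment suggests; the class and field names are illustrative, not taken from the codebase:

```ts
class AgentPoolSketch {
  private readonly pendingCleanups = new Set<Promise<void>>();

  // Eviction stays fire-and-forget, but the teardown promise is tracked for shutdown().
  evict(agent: { destroy(): Promise<void> }, key: string): void {
    const cleanup: Promise<void> = agent.destroy().catch((error) => {
      console.warn("AgentPool: error destroying agent", key, error);
    });
    this.pendingCleanups.add(cleanup);
    void cleanup.finally(() => this.pendingCleanups.delete(cleanup));
  }

  async shutdown(): Promise<void> {
    // Wait for any teardown still in flight so shutdown keeps its "all closed" contract.
    await Promise.allSettled([...this.pendingCleanups]);
  }
}
```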
```diff
 const chunkSize = value?.byteLength ?? 0;
 if (value && chunkSize > 0) {
   if (isFirstChunk) {
     isFirstChunk = false;
     session.recordTtfb();
     clearResponseTimeoutOnce(chunkSize);
   }
-  chunks.push(decoder.decode(value, { stream: true }));
+  pushChunk(decoder.decode(value, { stream: true }), chunkSize);
 }
```
Zero-byte chunks won't arm idle watchdog.
When value exists but chunkSize === 0, the code skips pushChunk() and won't enter the if (value && chunkSize > 0) block. Subsequent if (!isFirstChunk) will be false (since isFirstChunk is still true), so startIdleTimer() never runs. A stream yielding only zero-length chunks will bypass idle timeout entirely.
Consider moving startIdleTimer() outside the chunkSize > 0 check, or treating zero-byte reads as activity.
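A minimal sketch of the suggested adjustment, treating every successful read as activity regardless of chunk size; names mirror the excerpt above:

```ts
const { done, value } = await reader.read();
if (!done) {
  // Any successful read counts as upstream activity: re-arm the watchdog even for
  // zero-byte or missing chunks.
  startIdleTimer();

  const chunkSize = value?.byteLength ?? 0;
  if (value && chunkSize > 0) {
    if (isFirstChunk) {
      isFirstChunk = false;
      session.recordTtfb();
      clearResponseTimeoutOnce(chunkSize);
    }
    pushChunk(decoder.decode(value, { stream: true }), chunkSize);
  }
}
```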
```ts
let abortReason: string | undefined;

// 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送)
const idleTimeoutMs =
  provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity;
let idleTimeoutId: NodeJS.Timeout | null = null;
const clearIdleTimer = () => {
  if (idleTimeoutId) {
    clearTimeout(idleTimeoutId);
    idleTimeoutId = null;
```
Tail-window chunking can lose data for usage parsing.
When response exceeds MAX_STATS_BUFFER_BYTES, only the trailing 10MB is kept. If usage/cost metadata appears early in the stream (e.g., first SSE event), it will be discarded and stats finalization may fail to extract billing info.
Check if upstream Gemini responses front-load metadata, or document that stats are best-effort for >10MB responses.
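One possible mitigation, sketched purely as an illustration (not what this PR implements): keep a small head buffer alongside the tail window so front-loaded usage metadata survives eviction. The constant and helper names are hypothetical:

```ts
// Retain roughly the first 64KB separately; early SSE events often carry usage metadata.
const MAX_HEAD_BYTES = 64 * 1024;
const headChunks: string[] = [];
let headBytes = 0;

function recordHead(text: string, byteLength: number): void {
  if (headBytes < MAX_HEAD_BYTES) {
    headChunks.push(text);
    headBytes += byteLength;
  }
}
```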
```ts
it("should prefer destroy over close to avoid hanging on in-flight streaming requests", async () => {
  const result = await pool.getAgent({
    endpointUrl: "https://api.anthropic.com/v1/messages",
    proxyUrl: null,
    enableHttp2: true,
  });

  const agent = result.agent as unknown as {
    close?: () => Promise<void>;
    destroy?: () => Promise<void>;
  };

  // 模拟:close 可能因等待 in-flight 请求结束而长期不返回
  if (typeof agent.close === "function") {
    vi.mocked(agent.close).mockImplementation(() => new Promise<void>(() => {}));
  }

  await pool.shutdown();

  // destroy 应被优先调用(避免 close 挂死导致 shutdown/evict 卡住)
  if (typeof agent.destroy === "function") {
    expect(agent.destroy).toHaveBeenCalled();
  }
  if (typeof agent.close === "function") {
    expect(agent.close).not.toHaveBeenCalled();
  }
});
```
Test likely passes without exercising the fix.
AgentPoolImpl.createAgent() constructs a real undici Agent, so result.agent.destroy is a native method, not a Vitest mock. vi.mocked(agent.destroy) will fail or be a no-op, meaning the test can't verify destroy() was called over close().
Mock the undici Agent constructor (e.g., via vi.mock("undici")) or inject a custom dispatcher to make this regression test deterministic.
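A minimal sketch of the deterministic setup the comment suggests, mocking the undici constructors so `destroy()`/`close()` are observable Vitest mocks. It assumes the pool only constructs `Agent`/`ProxyAgent` from `undici`; adjust to whatever the pool actually imports:

```ts
import { vi } from "vitest";

// Hoisted stubs so every constructed agent exposes controllable destroy/close mocks.
const { destroyMock, closeMock } = vi.hoisted(() => ({
  destroyMock: vi.fn(async () => {}),
  closeMock: vi.fn(() => new Promise<void>(() => {})), // simulate a close() that never resolves
}));

vi.mock("undici", () => ({
  Agent: vi.fn(() => ({ destroy: destroyMock, close: closeMock })),
  ProxyAgent: vi.fn(() => ({ destroy: destroyMock, close: closeMock })),
}));

// In the test body: await pool.shutdown(); then
//   expect(destroyMock).toHaveBeenCalled();
//   expect(closeMock).not.toHaveBeenCalled();
```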
Summary
Fixes an issue where, under certain conditions, the server would deadlock globally and all requests would stay in `requesting` for a long time: AgentPool eviction could be blocked by `undici Dispatcher.close()` waiting on in-flight requests, which stalled subsequent requests before the upstream fetch was even issued (hence unrelated to response codes such as 200/403). It also hardens the Gemini SSE passthrough timeout/abort chain to reduce the chance of creating never-ending in-flight streaming connections, and adds a response-body buffering cap to the passthrough stats task to avoid OOM/DoS.
Problem and root cause (chain)
Symptom: once the abnormal state is entered, requests for any provider and any response code (200/403/...) get stuck in `requesting`; restarting the container temporarily recovers.
Key chain:
1. `ProxyForwarder` obtains a dispatcher for the outbound request: `getGlobalAgentPool().getAgent(...)`
2. `AgentPoolImpl.getAgent()` waits on `evictByKey()` -> `closeAgent()`
3. `close()` waits for in-flight requests to finish naturally; if a stuck streaming connection is in flight, `close()` may never return
4. `getAgent()` is blocked, so new requests never actually issue the upstream fetch -> clients stay in `requesting` and the corresponding request records never reach a terminal state
Solution
1) Root fix: AgentPool eviction no longer blocks
- `src/lib/proxy-agent/agent-pool.ts`: prefer `destroy()`, so that `close()` waiting on in-flight requests can no longer block the whole pool (a sketch of the idea follows below)
- `tests/unit/lib/proxy-agent/agent-pool.test.ts`: simulate a `close()` that never resolves and verify that `shutdown()` / unhealthy eviction do not hang and that `destroy()` is called first
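A minimal sketch of the eviction change against undici's `Dispatcher` API (`destroy()` exists alongside `close()` on dispatchers); the surrounding pool code is simplified and the function name is illustrative:

```ts
import type { Dispatcher } from "undici";

// Prefer destroy(): it tears sockets down immediately, whereas close() waits for
// in-flight requests. Never await teardown on the eviction path, so getAgent()/cleanup
// cannot be blocked by a stuck stream.
function closeAgentNonBlocking(agent: Dispatcher, key: string): void {
  agent.destroy().catch((error) => {
    console.warn("AgentPool: error tearing down agent", key, error);
  });
}
```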
2) Gemini SSE passthrough hardening (narrower trigger conditions, safer behavior)
- `src/app/v1/_lib/proxy/response-handler.ts`: `streamingIdleTimeoutMs` resets the watchdog on every chunk and proactively aborts the upstream on mid-stream silence
- `tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts`: regression tests
Compatibility / Breaking Changes
None. Only internal resource management and timeout behavior become more robust.
Local verification
- npm test
- npm run typecheck
- npm run build
Greptile Overview
Greptile Summary
This PR fixes a critical production issue where the proxy service would globally deadlock, leaving all requests stuck in "requesting" state indefinitely. The root cause was `AgentPool` eviction blocking on undici `Agent.close()`, which waits for in-flight streaming requests to complete; if a stream hung, eviction would block, preventing `getAgent()` from returning and causing all subsequent requests to hang before even reaching upstream.

Key Changes

AgentPool blocking fix (agent-pool.ts:358-390):
- Refactored `closeAgent()` to prefer `destroy()` over `close()`, since `destroy()` forcefully terminates connections without waiting
- Removed the `await`, preventing `getAgent()` / `markUnhealthy()` / `shutdown()` from hanging

Gemini SSE passthrough hardening (response-handler.ts:853-1200):
- Idle-timeout watchdog (`streamingIdleTimeoutMs`) that resets on each chunk to detect mid-stream silence
- Abort-reason classification (`CLIENT_ABORTED` vs `STREAM_IDLE_TIMEOUT` vs `STREAM_RESPONSE_TIMEOUT`)

Test coverage:
- agent-pool.test.ts:238-294, 505-531
- response-handler-gemini-stream-passthrough-timeouts.test.ts

Issues Found

The core fix is sound and addresses the deadlock root cause, but there are concerns:
- The `destroy()` preference test at agent-pool.test.ts:505-531 attempts to mock methods on real undici Agent instances, which likely won't work as intended; the regression may not be properly validated
- In response-handler.ts:1016-1029, streams yielding only zero-length chunks won't arm the idle watchdog, allowing silent hangs
- `closeAgent()` logs errors but doesn't track pending promises for graceful shutdown

The approach is well-reasoned with detailed comments explaining the trade-offs. The fix should prevent the global deadlock, though the test coverage may give false confidence and edge cases around zero-byte chunks and large responses need consideration.

Confidence Score: 3/5. The score reflects the remaining concerns in src/app/v1/_lib/proxy/response-handler.ts (idle timer logic with zero-byte chunks) and tests/unit/lib/proxy-agent/agent-pool.test.ts (mock setup may not validate the fix).

Important Files Changed
- src/lib/proxy-agent/agent-pool.ts: refactored `closeAgent()` to prefer `destroy()` over `close()` and made eviction non-blocking by not awaiting cleanup, to prevent pool-wide request hangs
- tests/unit/lib/proxy-agent/agent-pool.test.ts: adds a regression test for the `destroy()` preference, but the mock setup may not actually exercise undici Agent methods

Sequence Diagram
```mermaid
sequenceDiagram
    participant Client
    participant ProxyForwarder
    participant AgentPool
    participant Agent as undici Agent
    participant Upstream as Gemini API
    participant ResponseHandler
    participant StatsTask as Background Stats Task

    Client->>ProxyForwarder: Request with streaming
    ProxyForwarder->>AgentPool: getAgent(params)
    alt Agent exists & healthy
        AgentPool-->>ProxyForwarder: Return cached agent
    else Agent unhealthy/expired
        AgentPool->>Agent: destroy() (fire-and-forget)
        Note over AgentPool,Agent: Non-blocking eviction<br/>prevents pool-wide hang
        AgentPool->>Agent: Create new agent
        AgentPool-->>ProxyForwarder: Return new agent
    end
    ProxyForwarder->>Upstream: Forward request via agent
    Upstream-->>ProxyForwarder: 200 + SSE headers
    ProxyForwarder->>ResponseHandler: dispatch(session, response)
    ResponseHandler->>Client: Stream headers (passthrough)
    ResponseHandler->>StatsTask: Clone response & start background stats
    Note over StatsTask: First-byte timeout NOT cleared<br/>until first chunk arrives
    Upstream-->>ResponseHandler: First data chunk
    ResponseHandler->>Client: Forward chunk
    StatsTask->>StatsTask: recordTtfb()<br/>clearResponseTimeout()<br/>startIdleTimer()
    loop Streaming
        Upstream-->>ResponseHandler: Data chunk
        ResponseHandler->>Client: Forward chunk
        StatsTask->>StatsTask: pushChunk()<br/>reset idle watchdog
        alt Buffer exceeds 10MB
            StatsTask->>StatsTask: Evict old chunks<br/>(tail-window only)
        end
    end
    alt Stream completes normally
        Upstream-->>ResponseHandler: Stream end
        ResponseHandler->>Client: End stream
        StatsTask->>StatsTask: finalize stats & persist
    else Idle timeout
        StatsTask->>Upstream: abort() connection
        Note over StatsTask: Idle watchdog triggered<br/>no data for N ms
        StatsTask->>StatsTask: finalize with STREAM_IDLE_TIMEOUT
    else Client disconnect
        Client->>ResponseHandler: Abort
        StatsTask->>StatsTask: finalize with CLIENT_ABORTED
    end
```