feat(observability): integrate Langfuse for LLM request tracing #791
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough

This change adds a Langfuse observability integration (OTel initialization, trace emission, and lifecycle management), request cost breakdown and reporting, capture of the session forward time and forwarded request body, the related environment variables and runtime dependencies, plus a large set of unit tests. No public API signatures or major control flow are modified.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: 2 passed, 2 failed (2 warnings)
Summary of Changes

Hello @ding113, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a robust observability solution by integrating Langfuse for comprehensive tracing of all LLM proxy requests. The integration provides deep insights into the lifecycle of each request, from initial processing through the guard pipeline to upstream provider interactions and final response generation. This enhancement allows for better monitoring, debugging, and performance analysis of the LLM proxy service, offering detailed timing, usage, and cost breakdowns without impacting performance due to its asynchronous and optional nature.
🧪 Test Results

Overall result: ✅ All tests passed
```typescript
...(ctx.usageMetrics.cache_read_input_tokens != null
  ? { cache_read_input_tokens: ctx.usageMetrics.cache_read_input_tokens }
  : {}),
...(ctx.usageMetrics.cache_creation_input_tokens != null
  ? { cache_creation_input_tokens: ctx.usageMetrics.cache_creation_input_tokens }
  : {}),
```
Usage keys use snake_case (cache_read_input_tokens) while base keys use lowercase (input, output). Consider standardizing to camelCase for consistency with Langfuse conventions.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/lib/langfuse/trace-proxy-request.ts
Line: 210:215
Comment:
Usage keys use snake_case (`cache_read_input_tokens`) while base keys use lowercase (`input`, `output`). Consider standardizing to camelCase for consistency with Langfuse conventions.
<sub>Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!</sub>
How can I resolve this? If you propose a fix, please make it concise.
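If the team did want camelCase keys, a minimal sketch of the normalization (the helper name and shape are illustrative, not from the PR):

```typescript
// Hypothetical helper: build Langfuse usageDetails with camelCase keys
// from the proxy's snake_case usage metrics.
interface UsageMetrics {
  input_tokens?: number | null;
  output_tokens?: number | null;
  cache_read_input_tokens?: number | null;
  cache_creation_input_tokens?: number | null;
}

function toUsageDetails(m: UsageMetrics): Record<string, number> {
  const details: Record<string, number> = {};
  if (m.input_tokens != null) details.input = m.input_tokens;
  if (m.output_tokens != null) details.output = m.output_tokens;
  if (m.cache_read_input_tokens != null) details.cacheRead = m.cache_read_input_tokens;
  if (m.cache_creation_input_tokens != null) details.cacheCreation = m.cache_creation_input_tokens;
  return details;
}
```

Worth noting: a later commit in this PR goes the opposite way, moving usageDetails keys to snake_case so the Langfuse UI categorizes them as input usage types.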
Code Review
This pull request introduces a comprehensive integration with Langfuse for observability, covering tracing for the entire proxy request lifecycle, including a multi-span hierarchy, timing breakdowns, and cost propagation. However, a medium-severity security vulnerability was identified due to incomplete redaction of sensitive headers, which could lead to API key leakage for certain providers. To improve maintainability, consider centralizing configuration loading in the Langfuse initialization logic and addressing some code duplication in the response handler. The implementation is robust, with non-blocking trace emission and data truncation, and includes an extensive test suite, ensuring reliability.
```typescript
const REDACTED_HEADERS = new Set([
  "x-api-key",
  "authorization",
  "x-goog-api-key",
  "anthropic-api-key",
  "cookie",
  "set-cookie",
]);
```
The REDACTED_HEADERS set is missing the api-key header, which is commonly used by providers like Azure OpenAI. This could lead to sensitive API keys being leaked to the Langfuse observability backend in plain text. It is recommended to add api-key and other common authentication headers to the redaction list.
```typescript
const REDACTED_HEADERS = new Set([
  "x-api-key",
  "authorization",
  "x-goog-api-key",
  "anthropic-api-key",
  "api-key",
  "x-api-token",
  "x-auth-token",
  "cookie",
  "set-cookie",
]);
```

```typescript
// Raw cost without multiplier for Langfuse
if (provider.costMultiplier !== 1) {
  const rawCost = calculateRequestCost(
    usageMetrics,
    priceData,
    1.0,
    session.getContext1mApplied()
  );
  if (rawCost.gt(0)) {
    rawCostUsdStr = rawCost.toString();
  }
} else {
  rawCostUsdStr = costUsdStr;
}
```
```typescript
// Raw cost without multiplier for Langfuse
if (provider.costMultiplier !== 1) {
  const rawCost = calculateRequestCost(
    usageForCost,
    priceData,
    1.0,
    session.getContext1mApplied()
  );
  if (rawCost.gt(0)) {
    rawCostUsdStr = rawCost.toString();
  }
} else {
  rawCostUsdStr = costUsdStr;
}
```
```typescript
export async function initLangfuse(): Promise<void> {
  if (initialized || !isLangfuseEnabled()) {
    return;
  }

  try {
    const { NodeSDK: OtelNodeSDK } = await import("@opentelemetry/sdk-node");
    const { LangfuseSpanProcessor: LfSpanProcessor } = await import("@langfuse/otel");

    const sampleRate = Number.parseFloat(process.env.LANGFUSE_SAMPLE_RATE || "1.0");

    spanProcessor = new LfSpanProcessor({
      publicKey: process.env.LANGFUSE_PUBLIC_KEY,
      secretKey: process.env.LANGFUSE_SECRET_KEY,
      baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com",
      // Only export spans from langfuse-sdk scope (avoid noise from other OTel instrumentations)
      shouldExportSpan: ({ otelSpan }) => otelSpan.instrumentationScope.name === "langfuse-sdk",
    });

    const samplerConfig =
      sampleRate < 1.0
        ? await (async () => {
            const { TraceIdRatioBasedSampler } = await import("@opentelemetry/sdk-trace-base");
            return { sampler: new TraceIdRatioBasedSampler(sampleRate) };
          })()
        : {};

    sdk = new OtelNodeSDK({
      spanProcessors: [spanProcessor],
      ...samplerConfig,
    });

    sdk.start();
    initialized = true;

    logger.info("[Langfuse] Observability initialized", {
      baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com",
      sampleRate,
      debug: process.env.LANGFUSE_DEBUG === "true",
    });

    if (process.env.LANGFUSE_DEBUG === "true") {
      const { configureGlobalLogger, LogLevel } = await import("@langfuse/core");
      configureGlobalLogger({ level: LogLevel.DEBUG });
    }
  } catch (error) {
    logger.error("[Langfuse] Failed to initialize", {
      error: error instanceof Error ? error.message : String(error),
    });
  }
}
```
To improve maintainability and ensure consistency with configuration validation, it's better to use the getEnvConfig() function to retrieve all Langfuse-related environment variables. This avoids duplicating the parsing and default value logic that is already defined in src/lib/config/env.schema.ts.
```typescript
export async function initLangfuse(): Promise<void> {
  if (initialized || !isLangfuseEnabled()) {
    return;
  }
  try {
    const { NodeSDK: OtelNodeSDK } = await import("@opentelemetry/sdk-node");
    const { LangfuseSpanProcessor: LfSpanProcessor } = await import("@langfuse/otel");
    const { getEnvConfig } = await import("@/lib/config/env.schema");
    const {
      LANGFUSE_PUBLIC_KEY,
      LANGFUSE_SECRET_KEY,
      LANGFUSE_BASE_URL,
      LANGFUSE_SAMPLE_RATE,
      LANGFUSE_DEBUG,
    } = getEnvConfig();
    spanProcessor = new LfSpanProcessor({
      publicKey: LANGFUSE_PUBLIC_KEY,
      secretKey: LANGFUSE_SECRET_KEY,
      baseUrl: LANGFUSE_BASE_URL,
      // Only export spans from langfuse-sdk scope (avoid noise from other OTel instrumentations)
      shouldExportSpan: ({ otelSpan }) => otelSpan.instrumentationScope.name === "langfuse-sdk",
    });
    const samplerConfig =
      LANGFUSE_SAMPLE_RATE < 1.0
        ? await (async () => {
            const { TraceIdRatioBasedSampler } = await import("@opentelemetry/sdk-trace-base");
            return { sampler: new TraceIdRatioBasedSampler(LANGFUSE_SAMPLE_RATE) };
          })()
        : {};
    sdk = new OtelNodeSDK({
      spanProcessors: [spanProcessor],
      ...samplerConfig,
    });
    sdk.start();
    initialized = true;
    logger.info("[Langfuse] Observability initialized", {
      baseUrl: LANGFUSE_BASE_URL,
      sampleRate: LANGFUSE_SAMPLE_RATE,
      debug: LANGFUSE_DEBUG,
    });
    if (LANGFUSE_DEBUG) {
      const { configureGlobalLogger, LogLevel } = await import("@langfuse/core");
      configureGlobalLogger({ level: LogLevel.DEBUG });
    }
  } catch (error) {
    logger.error("[Langfuse] Failed to initialize", {
      error: error instanceof Error ? error.message : String(error),
    });
  }
}
```
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @.env.example:
- Around line 131-139: Add the missing LANGFUSE_MAX_IO_SIZE environment variable
documentation to .env.example: describe the variable name LANGFUSE_MAX_IO_SIZE,
its default value (100000), allowed range (1-10000000), and its purpose (limits
Langfuse I/O payload size) along with an example value and note that it is
optional; place it near the other LANGFUSE_* entries (alongside
LANGFUSE_SAMPLE_RATE and LANGFUSE_DEBUG) so operators can discover and tune the
setting.
In `@package.json`:
- Around line 47-51: Fix the unavailable dependency versions in package.json: change "@opentelemetry/sdk-node" from "0.212.0" to "^0.211.0", and update the three Langfuse packages "@langfuse/client", "@langfuse/otel", "@langfuse/tracing" from "4.6.1" to a version that exists on npm (e.g. "^4.5.1"), or confirm and substitute the published version you actually need; after saving, run npm/yarn install to verify there are no version resolution errors and update the lockfile (package-lock.json or yarn.lock).
In `@src/app/v1/_lib/proxy/response-handler.ts`:
- Around line 1763-1774: The field sseEventCount is incorrectly populated with
chunks.length (raw read() byte-chunk count) in emitLangfuseTrace; update the
call in response-handler.ts to either rename the field to chunkCount to
accurately reflect chunks.length, or compute the actual SSE event count from the
parsed/decoded SSE stream (e.g., count complete SSE events after your SSE
parsing logic) and pass that value as sseEventCount; adjust the
TraceContext/interface usage accordingly (or update Langfuse payload field name)
and ensure the symbol emitLangfuseTrace and the variable chunks are the points
you modify.
🧹 Nitpick comments (10)
src/lib/config/env.schema.ts (1)

131-137: Consider adding .url() validation for LANGFUSE_BASE_URL. Other URL-typed environment variables (such as DSN, line 34) use .url() validation. Adding the same URL-format check to LANGFUSE_BASE_URL would surface configuration errors at startup instead of at runtime when the connection fails.

Suggested change:

```diff
- LANGFUSE_BASE_URL: z.string().default("https://cloud.langfuse.com"),
+ LANGFUSE_BASE_URL: z.string().url().default("https://cloud.langfuse.com"),
```

src/lib/langfuse/index.ts (2)
28-33: initLangfuse() reads process.env directly, bypassing Zod validation.

LANGFUSE_SAMPLE_RATE and LANGFUSE_BASE_URL have validation rules defined in env.schema.ts (min/max bounds and defaults), but here they are read straight from process.env with hardcoded defaults. Consequences:

- If a user sets an invalid value (e.g. LANGFUSE_SAMPLE_RATE=abc), Number.parseFloat returns NaN and Zod's range check never runs.
- The defaults are maintained in two places and can easily drift apart.

Understood that this code must run before the env schema is initialized, but at minimum sampleRate should be guarded against NaN.

Suggested NaN guard for sampleRate:

```diff
  const sampleRate = Number.parseFloat(process.env.LANGFUSE_SAMPLE_RATE || "1.0");
+ // Fallback to 1.0 (no sampling) if value is invalid
+ const effectiveSampleRate = Number.isFinite(sampleRate) ? Math.max(0, Math.min(1, sampleRate)) : 1.0;
```

Then use effectiveSampleRate instead of sampleRate in the code that follows.
75-91: The sdk and spanProcessor references are not reset on shutdown.

shutdownLangfuse() resets initialized to false but does not null out sdk and spanProcessor. Calling initLangfuse() again after shutdown is unlikely today, but for defensive programming it is worth cleaning these up as well.

Suggested change:

```diff
  initialized = false;
+ sdk = null;
+ spanProcessor = null;
  logger.info("[Langfuse] Shutdown complete");
```

src/lib/langfuse/trace-proxy-request.ts (3)
130-141: The failedAttempts count may be imprecise. Line 139 counts every chain item whose reason is not in SUCCESS_REASONS (which contains "initial_selection" and "session_reuse") as a failure. But "initial_selection" is a selection event, not a final outcome; only a subsequent chain item carrying a result reason indicates whether that attempt succeeded or failed. Some intermediate-state chain items may therefore be wrongly counted as failures. This does not affect functionality, only the accuracy of the metadata in the trace. Consider filtering for explicit failure reasons ("retry_failed", "system_error", etc.) rather than taking the complement of the success set; a sketch follows.
73-89: JSON.stringify in truncateForLangfuse can throw on circular references. Line 78 calls JSON.stringify(data) on an object, which can throw a TypeError for circular structures. The outer traceProxyRequest has a try/catch, but hitting it would drop the entire trace. Add a local try/catch that degrades gracefully for the offending object without aborting the whole trace flow.

Suggested defensive change:

```diff
  if (data != null && typeof data === "object") {
-   const str = JSON.stringify(data);
-   if (str.length > maxChars) {
-     return {
-       _truncated: true,
-       _length: str.length,
-       _preview: str.substring(0, Math.min(maxChars, 2000)),
-     };
+   try {
+     const str = JSON.stringify(data);
+     if (str.length > maxChars) {
+       return {
+         _truncated: true,
+         _length: str.length,
+         _preview: str.substring(0, Math.min(maxChars, 2000)),
+       };
+     }
+   } catch {
+     return { _truncated: true, _error: "unserializable" };
    }
    return data;
  }
```
339-355: The root span's input/output are set twice, at lines 229-243 and 341-354. startObservation already receives input and output (lines 229-243), and updateTrace sets nearly identical content again (lines 341-354). The comment explains that propagateAttributes does not support input/output, but the first startObservation call already set them. If the two calls target different levels (span vs. trace), the comment should state the distinction more explicitly.

src/app/v1/_lib/proxy/response-handler.ts (3)
42-81: The enablement check in emitLangfuseTrace duplicates isLangfuseEnabled(). Line 59 checks process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY directly, duplicating the logic of isLangfuseEnabled() in src/lib/langfuse/index.ts. Understood that this avoids loading the module when tracing is disabled, which is a reasonable design. But if the enablement condition ever changes (e.g. a feature flag is added), both places must be updated in sync. At minimum, add a comment noting that this check must stay consistent with isLangfuseEnabled().
760-773: The raw-cost calculation is duplicated verbatim between the non-streaming and streaming paths. The rawCostUsdStr logic in the non-streaming path (lines 760-773) and the streaming path (lines 1700-1713) is identical: when costMultiplier !== 1, recompute with a multiplier of 1.0. Consider extracting a shared helper to remove the duplication.

Suggested shared helper:

```typescript
function computeRawCost(
  usageMetrics: UsageMetrics,
  priceData: ModelPriceData,
  costMultiplier: number,
  context1mApplied: boolean,
  costUsdStr: string | undefined,
): string | undefined {
  if (costMultiplier !== 1) {
    const rawCost = calculateRequestCost(usageMetrics, priceData, 1.0, context1mApplied);
    return rawCost.gt(0) ? rawCost.toString() : undefined;
  }
  return costUsdStr;
}
```

Also applies to: 1700-1713
564-575: costUsd is always undefined on the Gemini passthrough path. Line 570 passes costUsd: undefined; on the Gemini passthrough non-streaming path, statistics go through finalizeRequestStats, so the Langfuse trace carries no cost information. The same holds on the Gemini passthrough streaming path (line 1309). If Gemini request costs should be visible in Langfuse, an extra calculation is needed here. If this is intentional (because the Gemini passthrough cost is computed inside finalizeRequestStats), add a comment saying so.

tests/unit/langfuse/langfuse-trace.test.ts (1)
793-827: The isLangfuseEnabled tests do not exercise the real implementation. Both tests substitute the inline expression !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY) for a call to the real isLangfuseEnabled function. If the actual implementation changes (e.g. adds a LANGFUSE_BASE_URL check or a sampling-rate condition), these tests will not catch the regression. Import and call the real function instead (which requires bypassing the mock above, or testing in a separate file).

Suggested approach: use a separate file or vi.importActual to test the real implementation.

```diff
- test("should return false when env vars are not set", () => {
-   delete process.env.LANGFUSE_PUBLIC_KEY;
-   delete process.env.LANGFUSE_SECRET_KEY;
-
-   // Direct function test (not using the mock)
-   const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY);
-   expect(isEnabled).toBe(false);
- });
-
- test("should return true when both keys are set", () => {
-   process.env.LANGFUSE_PUBLIC_KEY = "pk-lf-test-mock";
-   process.env.LANGFUSE_SECRET_KEY = "test-mock-not-real";
-
-   const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY);
-   expect(isEnabled).toBe(true);
- });
+ // Move these tests to a separate file (e.g. langfuse-enabled.test.ts) that does
+ // not mock `@/lib/langfuse/index`, and import and test isLangfuseEnabled directly.
```
```
# Langfuse Observability (optional, auto-enabled when keys are set)
# Enterprise-grade LLM observability integration; automatically traces the full lifecycle of every proxied request
# - Enabled automatically once PUBLIC_KEY and SECRET_KEY are configured
# - Supports Langfuse Cloud and self-hosted instances
LANGFUSE_PUBLIC_KEY=                          # Langfuse project public key (pk-lf-...)
LANGFUSE_SECRET_KEY=                          # Langfuse project secret key (sk-lf-...)
LANGFUSE_BASE_URL=https://cloud.langfuse.com  # Langfuse server URL (self-hosted or cloud)
LANGFUSE_SAMPLE_RATE=1.0                      # Trace sampling rate (0.0-1.0, default: 1.0 = 100%)
LANGFUSE_DEBUG=false                          # Enable Langfuse debug logging
```
Missing documentation for the LANGFUSE_MAX_IO_SIZE setting.

The env schema defines LANGFUSE_MAX_IO_SIZE (default 100,000, range 1-10,000,000), but .env.example does not include it. Add it so operators know this tunable exists.
Suggested addition:

```diff
 LANGFUSE_SAMPLE_RATE=1.0 # Trace sampling rate (0.0-1.0, default: 1.0 = 100%)
 LANGFUSE_DEBUG=false # Enable Langfuse debug logging
+LANGFUSE_MAX_IO_SIZE=100000 # Max I/O size per trace (chars, default: 100000, max: 10000000)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```
# Langfuse Observability (optional, auto-enabled when keys are set)
# Enterprise-grade LLM observability integration; automatically traces the full lifecycle of every proxied request
# - Enabled automatically once PUBLIC_KEY and SECRET_KEY are configured
# - Supports Langfuse Cloud and self-hosted instances
LANGFUSE_PUBLIC_KEY=                          # Langfuse project public key (pk-lf-...)
LANGFUSE_SECRET_KEY=                          # Langfuse project secret key (sk-lf-...)
LANGFUSE_BASE_URL=https://cloud.langfuse.com  # Langfuse server URL (self-hosted or cloud)
LANGFUSE_SAMPLE_RATE=1.0                      # Trace sampling rate (0.0-1.0, default: 1.0 = 100%)
LANGFUSE_DEBUG=false                          # Enable Langfuse debug logging
LANGFUSE_MAX_IO_SIZE=100000                   # Max I/O size per trace (chars, default: 100000, max: 10000000)
```
🧰 Tools
🪛 dotenv-linter (4.0.0)

- [warning] 135: [SpaceCharacter] The line has spaces around equal sign
- [warning] 135: [ValueWithoutQuotes] This value needs to be surrounded in quotes
- [warning] 136: [SpaceCharacter] The line has spaces around equal sign
- [warning] 136: [ValueWithoutQuotes] This value needs to be surrounded in quotes
- [warning] 137: [UnorderedKey] The LANGFUSE_BASE_URL key should go before the LANGFUSE_PUBLIC_KEY key
- [warning] 137: [ValueWithoutQuotes] This value needs to be surrounded in quotes
- [warning] 138: [UnorderedKey] The LANGFUSE_SAMPLE_RATE key should go before the LANGFUSE_SECRET_KEY key
- [warning] 138: [ValueWithoutQuotes] This value needs to be surrounded in quotes
- [warning] 139: [UnorderedKey] The LANGFUSE_DEBUG key should go before the LANGFUSE_PUBLIC_KEY key
- [warning] 139: [ValueWithoutQuotes] This value needs to be surrounded in quotes
🤖 Prompt for AI Agents
In @.env.example around lines 131-139: add the missing LANGFUSE_MAX_IO_SIZE
environment variable documentation to .env.example: describe the variable name
LANGFUSE_MAX_IO_SIZE, its default value (100000), allowed range (1-10000000),
and its purpose (limits Langfuse I/O payload size) along with an example value
and note that it is optional; place it near the other LANGFUSE_* entries
(alongside LANGFUSE_SAMPLE_RATE and LANGFUSE_DEBUG) so operators can discover
and tune the setting.
| "@langfuse/client": "^4.6.1", | ||
| "@langfuse/otel": "^4.6.1", | ||
| "@langfuse/tracing": "^4.6.1", | ||
| "@lobehub/icons": "^2", | ||
| "@opentelemetry/sdk-node": "^0.212.0", |
🧩 Analysis chain
🌐 Web query:
@opentelemetry/sdk-node npm registry version 0.212.0
💡 Result:
@opentelemetry/sdk-node@0.212.0 does not appear to be published on the npm registry. The latest published npm version shown by multiple package indexes is 0.211.0. [1][2][3]
There is an upstream GitHub release tag experimental/v0.212.0 in the open-telemetry/opentelemetry-js repo, but that doesn’t necessarily mean the @opentelemetry/sdk-node npm package was published at 0.212.0. [4]
What to do
- Install the latest available: `npm i @opentelemetry/sdk-node@0.211.0` [1][2][3]
- Or verify from your environment: `npm view @opentelemetry/sdk-node versions --json | jq 'map(select(. == "0.212.0"))'` (should return nothing if it's unpublished)
Sources: [1][2][3][4]
🌐 Web query:
@langfuse/client @langfuse/otel @langfuse/tracing npm registry version 4.6.1
💡 Result:
I couldn’t find any evidence that version 4.6.1 exists on the npm registry for @langfuse/client, @langfuse/tracing, or @langfuse/otel based on publicly indexable sources.
What I could verify:
- npm package pages (as indexed) show 4.0.0 for @langfuse/client, @langfuse/tracing, and @langfuse/otel. [1][2][3]
- A registry mirror (Tessl) shows at least 4.2.0 for @langfuse/otel and @langfuse/tracing. [4][5]
- A recent community post shows usage of ^4.5.1 for @langfuse/otel and @langfuse/tracing. [6]

To confirm from your environment (authoritative), run:

```bash
npm view @langfuse/client version
npm view @langfuse/tracing version
npm view @langfuse/otel version
npm view @langfuse/client@4.6.1 version
npm view @langfuse/tracing@4.6.1 version
npm view @langfuse/otel@4.6.1 version
```

Sources: [1] [2] [3] [4] [5] [6]
Fix the dependency versions: the specified versions do not exist on the npm registry.

- @opentelemetry/sdk-node@0.212.0 is not published on npm. The latest available version is 0.211.0; update to ^0.211.0.
- @langfuse/client@4.6.1, @langfuse/otel@4.6.1, @langfuse/tracing@4.6.1: version 4.6.1 cannot be verified from public sources. Confirmed versions are 4.0.0, 4.2.0, and 4.5.1. Confirm the actually available version and update (consider verifying whether 4.5.1 meets your needs).
🤖 Prompt for AI Agents
In `@package.json` around lines 47-51: fix the unavailable dependency versions by changing "@opentelemetry/sdk-node" from "0.212.0" to "^0.211.0" and updating the three Langfuse packages "@langfuse/client", "@langfuse/otel", "@langfuse/tracing" from "4.6.1" to a version published on npm (e.g. "^4.5.1") or to the released version you actually need; then run npm/yarn install to verify there are no resolution errors and update package-lock.json or yarn.lock.
```typescript
emitLangfuseTrace(session, {
  responseHeaders: response.headers,
  responseText: allContent,
  usageMetrics: usageForCost,
  costUsd: rawCostUsdStr,
  statusCode: effectiveStatusCode,
  durationMs: duration,
  isStreaming: true,
  sseEventCount: chunks.length,
  errorMessage: streamErrorMessage ?? undefined,
});
```
sseEventCount is actually passed chunks.length (the raw byte-chunk count), not the number of SSE events.

Line 1772 passes sseEventCount: chunks.length, which is the number of raw byte chunks returned by reader.read(), not the number of parsed SSE events. A single read() may contain several SSE events, or one event may be split across multiple read() calls. The field is named sseEventCount in the TraceContext interface and displayed under that name in Langfuse, which is misleading.

Either rename the field to chunkCount to reflect what it measures, or pass an actual SSE event count (a sketch follows the AI prompt below).
🤖 Prompt for AI Agents
In `@src/app/v1/_lib/proxy/response-handler.ts` around lines 1763 - 1774, The
field sseEventCount is incorrectly populated with chunks.length (raw read()
byte-chunk count) in emitLangfuseTrace; update the call in response-handler.ts
to either rename the field to chunkCount to accurately reflect chunks.length, or
compute the actual SSE event count from the parsed/decoded SSE stream (e.g.,
count complete SSE events after your SSE parsing logic) and pass that value as
sseEventCount; adjust the TraceContext/interface usage accordingly (or update
Langfuse payload field name) and ensure the symbol emitLangfuseTrace and the
variable chunks are the points you modify.
```typescript
try {
  return JSON.parse(text);
} catch {
  return text;
}
```
[CRITICAL] [ERROR-SILENT] Silent catch in tryParseJsonSafe

Location: src/lib/langfuse/trace-proxy-request.ts:368

Evidence:

```typescript
} catch {
  return text;
}
```

Why this is a problem: When responseText looks like JSON but is malformed/truncated, this suppresses the parse failure and makes tracing output harder to debug.
Suggested fix:

```typescript
function tryParseJsonSafe(text: string): unknown {
  const trimmed = text.trimStart();
  if (!trimmed.startsWith("{") && !trimmed.startsWith("[")) {
    return text;
  }
  try {
    return JSON.parse(text);
  } catch (error) {
    logger.trace("[Langfuse] Failed to parse JSON output for Langfuse, sending raw text", {
      error: error instanceof Error ? error.message : String(error),
    });
    return text;
  }
}
```

```
LANGFUSE_BASE_URL=https://cloud.langfuse.com  # Langfuse server URL (self-hosted or cloud)
LANGFUSE_SAMPLE_RATE=1.0                      # Trace sampling rate (0.0-1.0, default: 1.0 = 100%)
LANGFUSE_DEBUG=false                          # Enable Langfuse debug logging
```
[MEDIUM] [COMMENT-INCOMPLETE] Missing LANGFUSE_MAX_IO_SIZE in .env.example

Location: .env.example:140

Why this is a problem: Tracing truncation uses LANGFUSE_MAX_IO_SIZE (src/lib/config/env.schema.ts:137, src/lib/langfuse/trace-proxy-request.ts:46), but operators will not discover/tune it via .env.example.

Suggested fix (add to the Langfuse block):

```
LANGFUSE_MAX_IO_SIZE=100000  # Max chars of traced input/output payload (default: 100000)
```
Code Review Summary
This PR adds a large new observability surface area (Langfuse + OpenTelemetry) across the proxy pipeline. One error-handling pattern in the new tracing code silently swallows parse failures, which will hide malformed payloads when tracing is enabled. The environment docs also omit a newly-supported tuning knob.
PR Size: XL

- Lines changed: 1500
- Files changed: 11

Split suggestions (recommended for follow-ups):

- PR 1: dependency + env schema changes (package.json, src/lib/config/env.schema.ts, .env.example)
- PR 2: Langfuse SDK lifecycle (src/lib/langfuse/index.ts, src/instrumentation.ts)
- PR 3: tracing payload/content model (src/lib/langfuse/trace-proxy-request.ts)
- PR 4: proxy integration points (src/app/v1/_lib/proxy-handler.ts, src/app/v1/_lib/proxy/session.ts, src/app/v1/_lib/proxy/response-handler.ts)
- PR 5: unit tests (tests/unit/langfuse/langfuse-trace.test.ts)
Issues Found
| Category | Critical | High | Medium | Low |
|---|---|---|---|---|
| Logic/Bugs | 0 | 0 | 0 | 0 |
| Security | 0 | 0 | 0 | 0 |
| Error Handling | 1 | 0 | 0 | 0 |
| Types | 0 | 0 | 0 | 0 |
| Comments/Docs | 0 | 0 | 1 | 0 |
| Tests | 0 | 0 | 0 | 0 |
| Simplification | 0 | 0 | 0 | 0 |
Critical Issues (Must Fix)

- src/lib/langfuse/trace-proxy-request.ts:368 [CRITICAL] [ERROR-SILENT] tryParseJsonSafe swallows JSON parse errors without logging (Confidence: 97/100)

High Priority Issues (Should Fix)

- .env.example:140 [MEDIUM] [COMMENT-INCOMPLETE] missing LANGFUSE_MAX_IO_SIZE documentation (Confidence: 88/100)
Review Coverage
- Logic and correctness
- Security (OWASP Top 10)
- Error handling
- Type safety
- Documentation accuracy
- Test coverage
- Code clarity
Automated review by Codex AI
Code Review Summary
This PR implements Langfuse observability integration with a well-designed multi-span trace hierarchy. The implementation is solid with comprehensive test coverage (827 lines), proper error handling throughout, and appropriate security considerations (header redaction).
PR Size: XL
- Lines changed: 1500 (1491 additions, 9 deletions)
- Files changed: 11
Issues Found
| Category | Critical | High | Medium | Low |
|---|---|---|---|---|
| Logic/Bugs | 0 | 0 | 0 | 0 |
| Security | 0 | 0 | 0 | 0 |
| Error Handling | 0 | 0 | 0 | 0 |
| Types | 0 | 0 | 0 | 0 |
| Comments/Docs | 0 | 0 | 0 | 0 |
| Tests | 0 | 0 | 0 | 0 |
| Simplification | 0 | 0 | 0 | 0 |
Review Coverage
- Logic and correctness - Clean
- Security (OWASP Top 10) - Clean (header redaction implemented)
- Error handling - Clean (all catch blocks log errors appropriately)
- Type safety - Clean (type assertions documented with SDK limitations)
- Documentation accuracy - Clean (comments match implementation)
- Test coverage - Comprehensive (827-line test suite)
- Code clarity - Good (well-structured with clear separation of concerns)
Implementation Quality Notes
Strengths:
- Multi-span trace hierarchy (root -> guard-pipeline + provider-attempt events + llm-call generation)
- Non-blocking fire-and-forget design with error tolerance
- Configurable sampling via LANGFUSE_SAMPLE_RATE with Zod validation
- I/O truncation to prevent excessive payloads
- Auth-sensitive header redaction before tracing
- Proper lifecycle management with graceful shutdown
Minor observations (below reporting threshold):
- initLangfuse reads LANGFUSE_SAMPLE_RATE directly from process.env while other config values use getEnvConfig(); this is safe due to Zod validation but slightly inconsistent
- Type assertions used to work around SDK type limitations are documented inline
Automated review by Claude AI
- Identified the active PR as #791 and applied the label size/XL (1500 LOC changed across 11 files).
- Posted 2 inline review comments (diff-only):
  - src/lib/langfuse/trace-proxy-request.ts:368: [CRITICAL][ERROR-SILENT] empty catch {} in tryParseJsonSafe; suggested a guard + logged fallback.
  - .env.example:140: [MEDIUM][COMMENT-INCOMPLETE] missing LANGFUSE_MAX_IO_SIZE documentation, despite being used by truncation.
- Submitted the required "Code Review Summary" review comment on #791, including XL split suggestions and confidence scores.
Force-pushed from aaa48a9 to c7fc66c
🧪 Test Results

Overall result: ✅ All tests passed
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@src/app/v1/_lib/proxy/response-handler.ts`:
- Around line 567-578: The Langfuse trace for the Gemini passthrough path is
missing cost data: when calling emitLangfuseTrace in response-handler.ts (the
block that already passes responseHeaders, responseText, usageMetrics:
parseUsageFromResponseText(responseText, provider.providerType).usageMetrics,
etc.) compute the costUsd and costBreakdown from the parsed usage before
emitting; i.e., call the same cost-calculation helper used in the
non-passthrough paths (the function that converts usage metrics to cost — search
for usages where costUsd and costBreakdown are set for non-stream and stream
flows) and supply those values to emitLangfuseTrace instead of leaving costUsd
undefined so Gemini passthrough traces include cost and costBreakdown.
- Around line 46-84: The emitLangfuseTrace function currently checks process.env
directly which diverges from trace-proxy-request's isLangfuseEnabled; replace
the direct env check with a call to isLangfuseEnabled() (imported from
"@/lib/langfuse" or "@/lib/langfuse/index") so both use the same enablement
logic, i.e., call isLangfuseEnabled() at the top of emitLangfuseTrace and only
perform the dynamic import/traceProxyRequest when it returns true, preserving
the existing error logging behavior for the import/trace call.
In `@src/lib/langfuse/trace-proxy-request.ts`:
- Around line 26-37: The hasSystemPrompt check in buildRequestBodySummary
currently only treats msg.system as an array; update the logic in
buildRequestBodySummary (which takes a ProxySession and reads
session.request.message as msg) to consider both non-empty strings and non-empty
arrays as a system prompt (e.g., treat typeof msg.system === "string" &&
msg.system.trim().length > 0 OR Array.isArray(msg.system) && msg.system.length >
0 as true); keep existing handling for toolsCount, stream, maxTokens and
temperature unchanged.
In `@tests/unit/langfuse/langfuse-trace.test.ts`:
- Around line 962-995: The tests in the "isLangfuseEnabled" describe block are
checking a local inline expression instead of calling the real isLangfuseEnabled
implementation (the module "@/lib/langfuse/index" is mocked earlier), so update
the block to exercise the real function: either move this describe to a separate
test file that does not mock "@/lib/langfuse/index" or restore the real
implementation inside the block using vi.importActual to import and call
isLangfuseEnabled from "@/lib/langfuse/index"; ensure you remove the inline
!!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY) checks
and instead call isLangfuseEnabled() so changes to the function (e.g., sampling
logic) are tested.
🧹 Nitpick comments (7)
src/lib/utils/cost-calculation.ts (1)

101-306: Consider extracting a shared calculation core to avoid drift with calculateRequestCost. The new function duplicates the existing billing logic almost entirely, so future changes can easily make the two diverge. Consider extracting the common bucket/tier computation, or reusing a single core, as sketched below.
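A sketch of the suggested extraction. All names here are illustrative, not the PR's actual signatures, and the real tier/bucket handling is omitted: the point is that both the billed total and the Langfuse breakdown can derive from one per-category core.

```typescript
import Decimal from "decimal.js";

interface Usage { input: number; output: number; cacheCreation: number; cacheRead: number }
interface Prices { input: Decimal; output: Decimal; cacheCreation: Decimal; cacheRead: Decimal }

// Shared core: per-category costs computed in one place.
function computeCategoryCosts(u: Usage, p: Prices): Record<keyof Usage, Decimal> {
  return {
    input: p.input.mul(u.input),
    output: p.output.mul(u.output),
    cacheCreation: p.cacheCreation.mul(u.cacheCreation),
    cacheRead: p.cacheRead.mul(u.cacheRead),
  };
}

// calculateRequestCost would reduce the categories and apply the multiplier;
// calculateRequestCostBreakdown would return them as-is (multiplier 1.0).
function totalCost(u: Usage, p: Prices, multiplier: number): Decimal {
  const c = computeCategoryCosts(u, p);
  return c.input.plus(c.output).plus(c.cacheCreation).plus(c.cacheRead).mul(multiplier);
}
```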
src/app/v1/_lib/proxy/response-handler.ts (2)

746-795: The raw cost / cost breakdown calculation is fully duplicated between the non-stream and stream paths. Lines 746-795 (non-stream) and lines 1698-1747 (stream finalizeStream) compute rawCostUsdStr / costBreakdown with nearly line-for-line identical logic. Extract a shared helper (e.g. computeLangfuseCost) to reduce the risk of the two copies drifting apart.
3071-3083: Langfuse trace fallback in persistRequestFailure: confirm it cannot duplicate the main-path trace. This is a fallback path that emits a trace when the main path's emitLangfuseTrace did not run. As written, the main-path trace fires at the end of the try block in finalizeStream / handleNonStream, so if an exception was thrown earlier it is never reached and persistRequestFailure normally will not duplicate it. But a future refactor that reorders execution could produce duplicate traces. Consider an idempotency flag such as langfuseTraceEmitted on the session, as sketched below.
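A sketch of the flag-based guard. The field name langfuseTraceEmitted is the one proposed above and does not exist in the PR:

```typescript
// Emit at most one trace per session, regardless of which path runs last.
function emitLangfuseTraceOnce(
  session: { langfuseTraceEmitted?: boolean },
  emit: () => void
): void {
  if (session.langfuseTraceEmitted) return; // main path already traced
  session.langfuseTraceEmitted = true;
  emit(); // fire-and-forget
}
```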
src/lib/langfuse/trace-proxy-request.ts (3)

112-145: session.getProviderChain() is called repeatedly; consider caching it. getProviderChain() is invoked at lines 143, 144, 152-154, and 304. The method should be idempotent, but each call may allocate a new array or run internal logic. Cache it once at function entry.

Suggested caching of providerChain:

```diff
  const requestStartTime = new Date(session.startTime);
  const requestEndTime = new Date(session.startTime + durationMs);
+ const providerChain = session.getProviderChain();
  // ...
  const timingBreakdown = {
    // ...
-   failedAttempts: session.getProviderChain().filter((i) => !isSuccessReason(i.reason)).length,
-   providersAttempted: new Set(session.getProviderChain().map((i) => i.id)).size,
+   failedAttempts: providerChain.filter((i) => !isSuccessReason(i.reason)).length,
+   providersAttempted: new Set(providerChain.map((i) => i.id)).size,
  };
```
74-90: truncateForLangfuse fully serializes large objects just to measure them. Line 79 calls JSON.stringify(data) to check the size, and when over the limit line 84 substrings the first 2000 characters as a preview. For very large objects (e.g. messages containing base64 images), the full serialization temporarily consumes significant memory. Since tracing is async and non-blocking it will not add request latency, but under high concurrency it can cause memory spikes. One option is to reuse the serialization result for the preview:

```diff
  if (data != null && typeof data === "object") {
    const str = JSON.stringify(data);
    if (str.length > maxChars) {
      return {
        _truncated: true,
        _length: str.length,
-       _preview: str.substring(0, Math.min(maxChars, 2000)),
+       _preview: str.substring(0, 2000),
      };
    }
-   return data;
+   return data; // Under limit, original object is fine (Langfuse SDK will serialize it)
  }
```

In fact str is already cached, so the current code already reuses it for the preview. But when the payload is under the limit, str is discarded and the original data is returned, so the Langfuse SDK serializes it a second time. Consider returning the already-serialized str directly (if the SDK accepts a string).
184-190: The tags array can contain duplicates. When provider.providerType equals session.originalFormat (e.g. both "claude"), the tag appears twice. Langfuse tolerates duplicate tags, but consider deduplicating with a Set before converting back to an array (e.g. Array.from(new Set(tags))).

tests/unit/langfuse/langfuse-trace.test.ts (1)
470-492: Assert the truncation size limit precisely. The current assertions, output.length < 200_000 plus a check that the string contains ...[truncated], are too loose. LANGFUSE_MAX_IO_SIZE defaults to 100_000, and the truncated string is `${data.substring(0, 100_000)}...[truncated]`, about 100_014 characters total. Tighten to something like expect(output.length).toBeLessThanOrEqual(100_000 + 20) to verify the threshold precisely and guard against off-by-one errors in the truncation logic; see the sketch below.
```typescript
function emitLangfuseTrace(
  session: ProxySession,
  data: {
    responseHeaders: Headers;
    responseText: string;
    usageMetrics: UsageMetrics | null;
    costUsd: string | undefined;
    costBreakdown?: CostBreakdown;
    statusCode: number;
    durationMs: number;
    isStreaming: boolean;
    sseEventCount?: number;
    errorMessage?: string;
  }
): void {
  if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;

  void import("@/lib/langfuse/trace-proxy-request")
    .then(({ traceProxyRequest }) => {
      void traceProxyRequest({
        session,
        responseHeaders: data.responseHeaders,
        durationMs: data.durationMs,
        statusCode: data.statusCode,
        isStreaming: data.isStreaming,
        responseText: data.responseText,
        usageMetrics: data.usageMetrics,
        costUsd: data.costUsd,
        costBreakdown: data.costBreakdown,
        sseEventCount: data.sseEventCount,
        errorMessage: data.errorMessage,
      });
    })
    .catch((err) => {
      logger.warn("[ResponseHandler] Langfuse trace failed", {
        error: err instanceof Error ? err.message : String(err),
      });
    });
}
```
🛠️ Refactor suggestion | 🟠 Major

emitLangfuseTrace checks process.env directly, inconsistent with isLangfuseEnabled().

Line 61 reads process.env.LANGFUSE_PUBLIC_KEY / process.env.LANGFUSE_SECRET_KEY directly, while trace-proxy-request.ts internally uses isLangfuseEnabled() (from @/lib/langfuse/index). If the two checks ever diverge (e.g. isLangfuseEnabled gains a sampling-rate condition), the result is either an unnecessary dynamic import or a skipped trace. Standardize on isLangfuseEnabled().
Suggested unification on isLangfuseEnabled():

```diff
+import { isLangfuseEnabled } from "@/lib/langfuse/index";
+
 function emitLangfuseTrace(
   session: ProxySession,
   data: { ... }
 ): void {
-  if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;
+  if (!isLangfuseEnabled()) return;

   void import("@/lib/langfuse/trace-proxy-request")
```
In `@src/app/v1/_lib/proxy/response-handler.ts` around lines 46 - 84, The
emitLangfuseTrace function currently checks process.env directly which diverges
from trace-proxy-request's isLangfuseEnabled; replace the direct env check with
a call to isLangfuseEnabled() (imported from "@/lib/langfuse" or
"@/lib/langfuse/index") so both use the same enablement logic, i.e., call
isLangfuseEnabled() at the top of emitLangfuseTrace and only perform the dynamic
import/traceProxyRequest when it returns true, preserving the existing error
logging behavior for the import/trace call.
```typescript
function buildRequestBodySummary(session: ProxySession): Record<string, unknown> {
  const msg = session.request.message as Record<string, unknown>;
  return {
    model: session.request.model,
    messageCount: session.getMessagesLength(),
    hasSystemPrompt: Array.isArray(msg.system) && msg.system.length > 0,
    toolsCount: Array.isArray(msg.tools) ? msg.tools.length : 0,
    stream: msg.stream === true,
    maxTokens: typeof msg.max_tokens === "number" ? msg.max_tokens : undefined,
    temperature: typeof msg.temperature === "number" ? msg.temperature : undefined,
  };
}
```
hasSystemPrompt only detects the array form of the system prompt.

Line 31 uses Array.isArray(msg.system) to decide whether a system prompt is present. But the Claude API's system field can also be a plain string. String-form system prompts are therefore reported as hasSystemPrompt: false in requestSummary.
Suggested check for the string form as well:

```diff
- hasSystemPrompt: Array.isArray(msg.system) && msg.system.length > 0,
+ hasSystemPrompt: Array.isArray(msg.system) ? msg.system.length > 0 : typeof msg.system === "string" && msg.system.length > 0,
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```typescript
function buildRequestBodySummary(session: ProxySession): Record<string, unknown> {
  const msg = session.request.message as Record<string, unknown>;
  return {
    model: session.request.model,
    messageCount: session.getMessagesLength(),
    hasSystemPrompt: Array.isArray(msg.system)
      ? msg.system.length > 0
      : typeof msg.system === "string" && msg.system.length > 0,
    toolsCount: Array.isArray(msg.tools) ? msg.tools.length : 0,
    stream: msg.stream === true,
    maxTokens: typeof msg.max_tokens === "number" ? msg.max_tokens : undefined,
    temperature: typeof msg.temperature === "number" ? msg.temperature : undefined,
  };
}
```
🤖 Prompt for AI Agents
In `@src/lib/langfuse/trace-proxy-request.ts` around lines 26 - 37, The
hasSystemPrompt check in buildRequestBodySummary currently only treats
msg.system as an array; update the logic in buildRequestBodySummary (which takes
a ProxySession and reads session.request.message as msg) to consider both
non-empty strings and non-empty arrays as a system prompt (e.g., treat typeof
msg.system === "string" && msg.system.trim().length > 0 OR
Array.isArray(msg.system) && msg.system.length > 0 as true); keep existing
handling for toolsCount, stream, maxTokens and temperature unchanged.
| describe("isLangfuseEnabled", () => { | ||
| const originalPublicKey = process.env.LANGFUSE_PUBLIC_KEY; | ||
| const originalSecretKey = process.env.LANGFUSE_SECRET_KEY; | ||
|
|
||
| afterEach(() => { | ||
| // Restore env | ||
| if (originalPublicKey !== undefined) { | ||
| process.env.LANGFUSE_PUBLIC_KEY = originalPublicKey; | ||
| } else { | ||
| delete process.env.LANGFUSE_PUBLIC_KEY; | ||
| } | ||
| if (originalSecretKey !== undefined) { | ||
| process.env.LANGFUSE_SECRET_KEY = originalSecretKey; | ||
| } else { | ||
| delete process.env.LANGFUSE_SECRET_KEY; | ||
| } | ||
| }); | ||
|
|
||
| test("should return false when env vars are not set", () => { | ||
| delete process.env.LANGFUSE_PUBLIC_KEY; | ||
| delete process.env.LANGFUSE_SECRET_KEY; | ||
|
|
||
| // Direct function test (not using the mock) | ||
| const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY); | ||
| expect(isEnabled).toBe(false); | ||
| }); | ||
|
|
||
| test("should return true when both keys are set", () => { | ||
| process.env.LANGFUSE_PUBLIC_KEY = "pk-lf-test-mock"; | ||
| process.env.LANGFUSE_SECRET_KEY = "test-mock-not-real"; | ||
|
|
||
| const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY); | ||
| expect(isEnabled).toBe(true); | ||
| }); |
The isLangfuseEnabled describe block does not test the real isLangfuseEnabled function.

Because lines 66-68 mock @/lib/langfuse/index as () => langfuseEnabled, lines 980-995 actually test the inline expression !!(process.env.X && process.env.Y) rather than the real isLangfuseEnabled() implementation. If the real function's logic changes (e.g. a sampling-rate check is added), this test will not catch the regression.

Move this describe block to a separate test file (without mocking @/lib/langfuse/index), or restore the real implementation inside the block via vi.importActual (sketched after the prompt below).
🤖 Prompt for AI Agents
In `@tests/unit/langfuse/langfuse-trace.test.ts` around lines 962 - 995, The tests
in the "isLangfuseEnabled" describe block are checking a local inline expression
instead of calling the real isLangfuseEnabled implementation (the module
"@/lib/langfuse/index" is mocked earlier), so update the block to exercise the
real function: either move this describe to a separate test file that does not
mock "@/lib/langfuse/index" or restore the real implementation inside the block
using vi.importActual to import and call isLangfuseEnabled from
"@/lib/langfuse/index"; ensure you remove the inline
!!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY) checks
and instead call isLangfuseEnabled() so changes to the function (e.g., sampling
logic) are tested.
🧪 Test Results

Overall result: ✅ All tests passed
…est tracing Add optional Langfuse observability integration that auto-enables when LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY env vars are set. Traces every proxy request with full context: provider chain, model, usage metrics, cost, TTFB, headers (auth redacted), special settings, and error details. Built on OpenTelemetry via @langfuse/otel with async non-blocking fire-and-forget semantics and zero overhead when disabled. Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
…e-handler for complete data The previous Langfuse integration fired traces in proxy-handler.ts immediately after dispatch() returned. For streaming responses, the response body hadn't been consumed yet, so responseText, usageMetrics, costUsd, sseEventCount, and errorMessage were all undefined. Move trace invocation into response-handler.ts at the exact points where ALL data has been collected: - Non-streaming standard path (after response body parsed + cost calculated) - Non-streaming Gemini passthrough (after finalizeRequestStats) - Streaming standard path (end of finalizeStream) - Streaming Gemini passthrough (after finalizeRequestStats) - Error/abort paths (inside persistRequestFailure) Also enhance the trace quality: - Generation input: actual request messages instead of summary object - Generation output: actual parsed response body instead of empty/status - Tags: add provider.name and model alongside providerType - Metadata: add model, originalModel, endpoint, sessionId, keyName, requestSequence, sseEventCount, and requestSummary - Root span output: include costUsd - Add truncateForLangfuse() helper (configurable via LANGFUSE_MAX_IO_SIZE) - TraceContext: replace response:Response with responseHeaders:Headers - Import UsageMetrics type from response-handler
propagateAttributes() does not support input/output fields, and the root span's input/output do not auto-inherit to the trace level. This caused Langfuse UI to show Input: undefined and Output: undefined. Add explicit rootSpan.updateTrace() call to set trace-level input/output per Langfuse SDK documentation (Solution B: set input/output directly on the trace).
… names - Pass actual startTime/endTime to startObservation and end() so root span and generation reflect real request duration instead of ~0ms - Change costDetails key from 'totalUsd' to 'total' per Langfuse SDK convention (values are already in USD) - Change usageDetails keys from camelCase (cacheRead, cacheCreation) to snake_case (cache_read_input_tokens, cache_creation_input_tokens) so Langfuse UI can categorize them as input usage types
…serId - Calculate raw cost (multiplier=1.0) separately for Langfuse while keeping multiplied cost for session billing and rate limiting - Change userId from numeric ID to user.name for readable Langfuse traces - Key name already available in trace metadata as keyName
…r-attempt events, and timing breakdown Enrich Langfuse traces from a flat 2-observation structure to a proper span hierarchy that reveals where request time is spent: - Add forwardStartTime to ProxySession to mark when guard pipeline ends - Create guard-pipeline child span (startTime -> forwardStartTime) - Emit provider-attempt events for each failed chain item with WARNING/ERROR level classification - Compute timingBreakdown (guardPipelineMs, upstreamTotalMs, ttfbFromForwardMs, tokenGenerationMs, failedAttempts) with Math.max(0) defensive guards against clock jitter - Set LLM generation startTime to forwardStartTime for accurate upstream call duration - Update test mocks to route by observation name, add 7 new tests
…est mock - Add LANGFUSE_MAX_IO_SIZE to Zod env schema with int/min/max/default validation, replacing raw Number() parse that risked NaN on invalid input - Use lazy getEnvConfig() accessor instead of module-level constant - Add missing recordForwardStart() to proxy-handler test mock
…d cost breakdown - Root span input/output now shows actual forwarded request body and response body instead of metadata summaries (summaries moved to root span metadata) - Add observation level on root span: DEFAULT (200), WARNING (retried), ERROR (non-200/499) - Add CostBreakdown type with input/output/cache_creation/cache_read/total categories, passed through to Langfuse costDetails - Capture forwardedRequestBody in ProxySession from forwarder (both Gemini and standard paths) - updateTrace uses actual bodies for trace-level input/output
…servability - Remove truncateForLangfuse, sanitizeHeaders, REDACTED_HEADERS, and LANGFUSE_MAX_IO_SIZE env config -- observability platforms should store raw data without lossy transformations - Replace sanitizeHeaders with headersToRecord (pass-through) - Unify generation input to use actualRequestBody (forwardedRequestBody when available) instead of session.request.message, ensuring the complete forwarded request body is recorded everywhere
Force-pushed from 0d84d3b to a04356e
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@src/lib/langfuse/trace-proxy-request.ts`:
- Around line 246-253: messageContext?.user?.name is currently passed directly as userId to propagateAttributes, which can persist PII such as real names or emails into Langfuse. Use a non-sensitive identifier instead (e.g. an internal user id or username slug: messageContext?.user?.id or messageContext?.user?.username), or irreversibly redact/hash the name before passing it (call an in-project hashing helper, or add one such as hashUserIdentifier(name)); keep the propagateAttributes call and the related names (userId / messageContext?.user?.name / propagateAttributes) consistent. A sketch follows after this list.
- Around line 166-204: The generationMetadata object currently includes
sensitive/internal fields that should not be sent to external observability
services; update the code that builds generationMetadata (symbol:
generationMetadata in trace-proxy-request.ts) to separate or redact
internal-only fields (e.g., session.getProviderChain(), session.sessionId,
messageContext?.key?.name, headersToRecord(session.headers),
headersToRecord(ctx.responseHeaders), and any raw body summary from
buildRequestBodySummary(session)) from the external payload: create two objects
(externalMetadata and internalMetadata) or apply a whitelist/redaction based on
a config flag (e.g., sendInternalMetadata=false) and ensure only safe fields are
included in the externalMetadata sent out while keeping internalMetadata
logged/stored locally with appropriate access controls; also add a code comment
documenting this design decision and the config flag controlling exposure (see the sketch after this list).
- Around line 27-33: headersToRecord currently returns all headers verbatim
which leaks credentials; update it to redact known auth-sensitive headers (e.g.,
Authorization, X-Api-Key, x-api-key, Anthropics-Api-Key, anthropic-api-key,
Api-Key, api-key, etc.) in a case-insensitive way by replacing their values with
a constant like "[REDACTED]" while preserving other headers, and ensure any
other call sites that serialize headers (the other occurrences referenced in the
review) use this redacting logic or call headersToRecord so credentials are
never sent to Langfuse; keep the header keys unchanged but normalize matching to
lower-case to detect sensitive names (see the sketch after this list).
- Around line 232-243: The TypeScript assertions around startObservation (used
to create rootSpan) are compensating for incomplete type declarations in
`@langfuse/tracing` (you pass startTime and asType via `as`), so first try
updating `@langfuse/tracing` to a version that exposes these parameters; if no
updated typing exists, open an SDK typing issue/PR requesting startTime/asType
in the public types, and meanwhile keep the runtime assertions but add a concise
TODO comment near startObservation/rootSpan documenting the typing mismatch
(reference startObservation, rootSpan, startTime, asType) so future maintainers
know to remove the `as` once the SDK types are fixed.
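For the first item above (PII in userId), a minimal hashing sketch using Node's crypto module; hashUserIdentifier is the hypothetical helper named in the prompt:

```typescript
import { createHash } from "node:crypto";

// Derive a stable, non-reversible identifier from a user name so the
// Langfuse userId carries no PII.
function hashUserIdentifier(name: string): string {
  return createHash("sha256").update(name).digest("hex").slice(0, 16);
}

// e.g. propagateAttributes({ userId: hashUserIdentifier(user.name), ... })
```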
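For the metadata item above, a sketch of the external/internal split; the key list and the sendInternal flag are assumptions, not the PR's actual config:

```typescript
// Partition metadata so only non-internal fields leave the process.
const INTERNAL_KEYS = new Set([
  "providerChain",
  "sessionId",
  "keyName",
  "requestHeaders",
  "responseHeaders",
]);

function splitMetadata(
  all: Record<string, unknown>,
  sendInternal: boolean
): Record<string, unknown> {
  const external: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(all)) {
    if (sendInternal || !INTERNAL_KEYS.has(key)) {
      external[key] = value; // internal fields stay local unless opted in
    }
  }
  return external;
}
```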
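For the header-redaction item above, a sketch of a case-insensitive redacting headersToRecord; the exact header list is an assumption extending the names in the prompt:

```typescript
const SENSITIVE_HEADERS = new Set([
  "authorization",
  "x-api-key",
  "api-key",
  "anthropic-api-key",
  "x-goog-api-key",
  "cookie",
  "set-cookie",
]);

// Keep header keys as-is, but replace credential values before tracing.
function headersToRecord(headers: Headers): Record<string, string> {
  const out: Record<string, string> = {};
  headers.forEach((value, key) => {
    out[key] = SENSITIVE_HEADERS.has(key.toLowerCase()) ? "[REDACTED]" : value;
  });
  return out;
}
```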
🧹 Nitpick comments (3)
src/lib/langfuse/trace-proxy-request.ts (3)
98-119: failedAttempts is computed twice. timingBreakdown.failedAttempts (line 107) and the failedAttempts used for the rootSpanLevel decision (lines 116-118) run exactly the same logic. Reuse timingBreakdown.failedAttempts.

Suggested fix:

```diff
  // Compute observation level for root span
  let rootSpanLevel: ObservationLevel = "DEFAULT";
  if (statusCode < 200 || statusCode >= 300) {
    rootSpanLevel = "ERROR";
  } else {
-   const failedAttempts = session
-     .getProviderChain()
-     .filter((i) => !isSuccessReason(i.reason)).length;
-   if (failedAttempts >= 1) rootSpanLevel = "WARNING";
+   if (timingBreakdown.failedAttempts >= 1) rootSpanLevel = "WARNING";
  }
```
298-304: generationOutput duplicates the logic of actualResponseBody (lines 128-132) exactly. Reuse actualResponseBody instead of rebuilding it.

Suggested fix:

```diff
- // Generation input/output = raw payload, no truncation
- const generationInput = actualRequestBody;
- const generationOutput = ctx.responseText
-   ? tryParseJsonSafe(ctx.responseText)
-   : isStreaming
-     ? { streaming: true, sseEventCount: ctx.sseEventCount }
-     : { statusCode };
+ const generationInput = actualRequestBody;
+ const generationOutput = actualResponseBody;
```
346-352: tryParseJsonSafe has no guard against very large response bodies. ctx.responseText can be huge (long conversations, bulky tool output). Calling JSON.parse on it materializes the full JS object graph in memory, and the parse runs at both lines 129 and 301 (at least once even after the duplication above is fixed). For an observability path, consider truncating oversized payloads or keeping only a summary; a size-guard sketch follows.
🧪 Test Results

Overall result: ✅ All tests passed
Additional Comments (1)

early return prevents trace emission on missing keys

```suggestion
if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;
```

Use isLangfuseEnabled() for consistency with other files.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 20:20
Comment: early return prevents trace emission on missing keys; use isLangfuseEnabled() for consistency with other files.
How can I resolve this? If you propose a fix, please make it concise.
session.headers is the client -> CCH hop (the user's own key, safe to log). The upstream provider API key (outboundKey) is injected by ProxyForwarder into a separate Headers object and is never present in the traced headers.
🧪 Test Results

Overall result: ✅ All tests passed
```typescript
const { NodeSDK: OtelNodeSDK } = await import("@opentelemetry/sdk-node");
const { LangfuseSpanProcessor: LfSpanProcessor } = await import("@langfuse/otel");

const sampleRate = Number.parseFloat(process.env.LANGFUSE_SAMPLE_RATE || "1.0");
```
Bypasses env schema validation
The env schema (env.schema.ts) defines LANGFUSE_SAMPLE_RATE with z.coerce.number().min(0).max(1).default(1.0), but initLangfuse re-parses the raw process.env string directly. This means the 0–1 bounds check is bypassed at init time — a value like LANGFUSE_SAMPLE_RATE=5 would pass through to TraceIdRatioBasedSampler without validation. The same applies to LANGFUSE_BASE_URL and LANGFUSE_DEBUG on lines 33, 55, 57, 60.
Consider using getEnvConfig() instead:
```typescript
import { getEnvConfig } from "@/lib/config/env.schema";
// ...
const env = getEnvConfig();
const sampleRate = env.LANGFUSE_SAMPLE_RATE;
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/lib/langfuse/index.ts
Line: 28:28
Comment:
**Bypasses env schema validation**
The env schema (`env.schema.ts`) defines `LANGFUSE_SAMPLE_RATE` with `z.coerce.number().min(0).max(1).default(1.0)`, but `initLangfuse` re-parses the raw `process.env` string directly. This means the 0–1 bounds check is bypassed at init time — a value like `LANGFUSE_SAMPLE_RATE=5` would pass through to `TraceIdRatioBasedSampler` without validation. The same applies to `LANGFUSE_BASE_URL` and `LANGFUSE_DEBUG` on lines 33, 55, 57, 60.
Consider using `getEnvConfig()` instead:
```
import { getEnvConfig } from "@/lib/config/env.schema";
// ...
const env = getEnvConfig();
const sampleRate = env.LANGFUSE_SAMPLE_RATE;
```
How can I resolve this? If you propose a fix, please make it concise.| errorMessage?: string; | ||
| } | ||
| ): void { | ||
| if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return; |
Redundant enabled check
emitLangfuseTrace checks process.env.LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY here, and then traceProxyRequest immediately checks isLangfuseEnabled() which performs the exact same check. This is not a bug, but the outer check could use isLangfuseEnabled() for consistency, or be removed entirely since traceProxyRequest already guards against it.
```suggestion
if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;
```
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 61:61
Comment:
**Redundant enabled check**
`emitLangfuseTrace` checks `process.env.LANGFUSE_PUBLIC_KEY` and `LANGFUSE_SECRET_KEY` here, and then `traceProxyRequest` immediately checks `isLangfuseEnabled()` which performs the exact same check. This is not a bug, but the outer check could use `isLangfuseEnabled()` for consistency, or be removed entirely since `traceProxyRequest` already guards against it.
```suggestion
if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;
```
<sub>Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!</sub>
How can I resolve this? If you propose a fix, please make it concise.
Summary
- Multi-span trace hierarchy: proxy-request span -> guard-pipeline span + provider-attempt events + llm-call generation
- usageDetails and costDetails on the LLM generation observation
- LANGFUSE_SAMPLE_RATE (0-1); disabled by default when keys are not set
- LANGFUSE_MAX_IO_SIZE (default 100K chars) to avoid excessive payloads

Changes
- src/lib/langfuse/index.ts
- src/lib/langfuse/trace-proxy-request.ts
- src/app/v1/_lib/proxy/response-handler.ts
- src/app/v1/_lib/proxy/session.ts: forwardStartTime + recordForwardStart()
- src/app/v1/_lib/proxy-handler.ts: recordForwardStart() before forwarding
- src/instrumentation.ts
- src/lib/config/env.schema.ts
- .env.example
- tests/unit/langfuse/langfuse-trace.test.ts

Test plan
- bun run build passes
- bun run lint passes
- bun run typecheck passes
- bun run test passes (2326 tests, 0 failures)
- With LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY set, send proxy requests and verify traces appear with correct hierarchy
- With LANGFUSE_SAMPLE_RATE=0.5, confirm ~50% of requests are traced

Greptile Summary
Integrates Langfuse as an optional observability backend for tracing all proxy LLM requests through the existing OpenTelemetry pipeline. The integration is cleanly gated behind the LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY env vars; when unset, all code paths are no-ops.

- Trace hierarchy: proxy-request span → guard-pipeline child span + provider-attempt events (for failed retries) + llm-call generation observation with usage/cost details
- New calculateRequestCostBreakdown in cost-calculation.ts provides per-category cost breakdowns (input, output, cache creation, cache read) for Langfuse, separate from the existing multiplier-applied cost used for billing
- emitLangfuseTrace is called at all 5 response finalization paths (non-stream success, non-stream error/Gemini, SSE Gemini, SSE Claude, and persistRequestFailure for error/abort)
- Sampling via LANGFUSE_SAMPLE_RATE (0-1) using OpenTelemetry's TraceIdRatioBasedSampler
- The PR description mentions "LANGFUSE_MAX_IO_SIZE" and "auth-sensitive headers redacted", but the implementation sends raw bodies without truncation and raw client headers without redaction (the code comments explain provider keys are never in session.headers, which is correct). The description appears outdated relative to the final implementation.
- src/lib/langfuse/index.ts reads env vars directly from process.env instead of using the validated getEnvConfig(), bypassing the Zod schema constraints defined in env.schema.ts

Confidence Score: 4/5
LANGFUSE_PUBLIC_KEYandLANGFUSE_SECRET_KEYenv vars — when unset, all code paths are no-ops.proxy-requestspan →guard-pipelinechild span +provider-attemptevents (for failed retries) +llm-callgeneration observation with usage/cost detailscalculateRequestCostBreakdownincost-calculation.tsto provide per-category cost breakdowns (input, output, cache creation, cache read) for Langfuse, separate from the existing multiplier-applied cost used for billingemitLangfuseTraceis called at all 5 response finalization paths (non-stream success, non-stream error/Gemini, SSE Gemini, SSE Claude, andpersistRequestFailurefor error/abort)LANGFUSE_SAMPLE_RATE(0-1) using OpenTelemetry'sTraceIdRatioBasedSamplerLANGFUSE_MAX_IO_SIZE" and "auth-sensitive headers redacted", but the implementation sends raw bodies without truncation and raw client headers without redaction (the code comments explain provider keys are never insession.headers, which is correct). The description appears outdated relative to the final implementation.src/lib/langfuse/index.tsreads env vars directly fromprocess.envinstead of using the validatedgetEnvConfig(), bypassing the Zod schema constraints defined inenv.schema.tsConfidence Score: 4/5
src/lib/langfuse/index.ts(bypasses env schema validation),src/app/v1/_lib/proxy/response-handler.ts(duplicated cost calculation blocks)Important Files Changed
emitLangfuseTrace()calls at all 5 response finalization paths (non-stream, non-stream error, SSE Gemini, SSE Claude, and persistRequestFailure). Raw cost calculation block is duplicated between non-stream and stream paths. Minor redundancy in enabled check.forwardStartTime,forwardedRequestBodyfields andrecordForwardStart()method. Idempotent, clean implementation.calculateRequestCostBreakdownthat mirrors existingcalculateRequestCostlogic but returns per-category costs. Correctly handles all tiers (context1m, 200k, cache). Good test coverage.Sequence Diagram
sequenceDiagram participant Client participant ProxyHandler participant GuardPipeline participant ProxyForwarder participant Upstream as Upstream LLM participant ResponseHandler participant Langfuse Client->>ProxyHandler: LLM Request ProxyHandler->>GuardPipeline: Run guards (auth, rate limit, etc.) GuardPipeline-->>ProxyHandler: Pass Note over ProxyHandler: recordForwardStart() ProxyHandler->>ProxyForwarder: Forward request Note over ProxyForwarder: Set session.forwardedRequestBody ProxyForwarder->>Upstream: HTTP Request Upstream-->>ResponseHandler: Response (stream/non-stream) Note over ResponseHandler: Calculate cost breakdown ResponseHandler-->>Client: Proxied Response ResponseHandler-)Langfuse: emitLangfuseTrace (async, fire-and-forget) Note over Langfuse: Root span: proxy-request Note over Langfuse: Child: guard-pipeline Note over Langfuse: Events: provider-attempt (failures) Note over Langfuse: Generation: llm-call (usage + cost)Last reviewed commit: db043d8