-
-
Notifications
You must be signed in to change notification settings - Fork 181
feat(observability): integrate Langfuse for LLM request tracing #791
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
084be30
3251c2f
69fc61b
5e93260
debdd69
fa180e5
a910f32
cd15600
a04356e
db043d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -128,6 +128,16 @@ FETCH_HEADERS_TIMEOUT=600000 | |
| FETCH_BODY_TIMEOUT=600000 | ||
| MAX_RETRY_ATTEMPTS_DEFAULT=2 # 单供应商最大尝试次数(含首次调用),范围 1-10,留空使用默认值 2 | ||
|
|
||
| # Langfuse Observability (optional, auto-enabled when keys are set) | ||
| # 功能说明:企业级 LLM 可观测性集成,自动追踪所有代理请求的完整生命周期 | ||
| # - 配置 PUBLIC_KEY 和 SECRET_KEY 后自动启用 | ||
| # - 支持 Langfuse Cloud 和自托管实例 | ||
| LANGFUSE_PUBLIC_KEY= # Langfuse project public key (pk-lf-...) | ||
| LANGFUSE_SECRET_KEY= # Langfuse project secret key (sk-lf-...) | ||
| LANGFUSE_BASE_URL=https://cloud.langfuse.com # Langfuse server URL (self-hosted or cloud) | ||
| LANGFUSE_SAMPLE_RATE=1.0 # Trace sampling rate (0.0-1.0, default: 1.0 = 100%) | ||
| LANGFUSE_DEBUG=false # Enable Langfuse debug logging | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [MEDIUM] [COMMENT-INCOMPLETE] Missing Location: Why this is a problem: Tracing truncation uses Suggested fix (add to the Langfuse block): LANGFUSE_MAX_IO_SIZE=100000 # Max chars of traced input/output payload (default: 100000) |
||
| # 智能探测配置 | ||
| # 功能说明:当熔断器处于 OPEN 状态时,定期探测供应商以实现更快恢复 | ||
| # - ENABLE_SMART_PROBING:是否启用智能探测(默认:false) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,7 +44,11 @@ | |
| "@hono/zod-openapi": "^1", | ||
| "@hookform/resolvers": "^5", | ||
| "@iarna/toml": "^2.2.5", | ||
| "@langfuse/client": "^4.6.1", | ||
| "@langfuse/otel": "^4.6.1", | ||
| "@langfuse/tracing": "^4.6.1", | ||
| "@lobehub/icons": "^2", | ||
| "@opentelemetry/sdk-node": "^0.212.0", | ||
|
Comment on lines
+47
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result:
There is an upstream GitHub release tag experimental/v0.212.0 in the What to do
Sources: [1][2][3][4] 🌐 Web query:
💡 Result: I couldn’t find any evidence that version What I could verify:
To confirm from your environment (authoritative), run: npm view `@langfuse/client` version
npm view `@langfuse/tracing` version
npm view `@langfuse/otel` version
npm view `@langfuse/client`@4.6.1 version
npm view `@langfuse/tracing`@4.6.1 version
npm view `@langfuse/otel`@4.6.1 versionSources: [1] [2] [3] [4] [5] [6] 修正依赖包版本号:所指定版本在 npm 注册表中不存在。
🤖 Prompt for AI Agents |
||
| "@radix-ui/react-alert-dialog": "^1", | ||
| "@radix-ui/react-avatar": "^1", | ||
| "@radix-ui/react-checkbox": "^1", | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -8,7 +8,8 @@ import { RateLimitService } from "@/lib/rate-limit"; | |||||
| import type { LeaseWindowType } from "@/lib/rate-limit/lease"; | ||||||
| import { SessionManager } from "@/lib/session-manager"; | ||||||
| import { SessionTracker } from "@/lib/session-tracker"; | ||||||
| import { calculateRequestCost } from "@/lib/utils/cost-calculation"; | ||||||
| import type { CostBreakdown } from "@/lib/utils/cost-calculation"; | ||||||
| import { calculateRequestCost, calculateRequestCostBreakdown } from "@/lib/utils/cost-calculation"; | ||||||
| import { hasValidPriceData } from "@/lib/utils/price-data"; | ||||||
| import { isSSEText, parseSSEData } from "@/lib/utils/sse"; | ||||||
| import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection"; | ||||||
|
|
@@ -39,6 +40,49 @@ export type UsageMetrics = { | |||||
| output_image_tokens?: number; | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * Fire Langfuse trace asynchronously. Non-blocking, error-tolerant. | ||||||
| */ | ||||||
| function emitLangfuseTrace( | ||||||
| session: ProxySession, | ||||||
| data: { | ||||||
| responseHeaders: Headers; | ||||||
| responseText: string; | ||||||
| usageMetrics: UsageMetrics | null; | ||||||
| costUsd: string | undefined; | ||||||
| costBreakdown?: CostBreakdown; | ||||||
| statusCode: number; | ||||||
| durationMs: number; | ||||||
| isStreaming: boolean; | ||||||
| sseEventCount?: number; | ||||||
| errorMessage?: string; | ||||||
| } | ||||||
| ): void { | ||||||
| if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return; | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Redundant enabled check
Suggested change
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! Prompt To Fix With AIThis is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 61:61
Comment:
**Redundant enabled check**
`emitLangfuseTrace` checks `process.env.LANGFUSE_PUBLIC_KEY` and `LANGFUSE_SECRET_KEY` here, and then `traceProxyRequest` immediately checks `isLangfuseEnabled()` which performs the exact same check. This is not a bug, but the outer check could use `isLangfuseEnabled()` for consistency, or be removed entirely since `traceProxyRequest` already guards against it.
```suggestion
if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;
```
<sub>Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!</sub>
How can I resolve this? If you propose a fix, please make it concise. |
||||||
|
|
||||||
| void import("@/lib/langfuse/trace-proxy-request") | ||||||
| .then(({ traceProxyRequest }) => { | ||||||
| void traceProxyRequest({ | ||||||
| session, | ||||||
| responseHeaders: data.responseHeaders, | ||||||
| durationMs: data.durationMs, | ||||||
| statusCode: data.statusCode, | ||||||
| isStreaming: data.isStreaming, | ||||||
| responseText: data.responseText, | ||||||
| usageMetrics: data.usageMetrics, | ||||||
| costUsd: data.costUsd, | ||||||
| costBreakdown: data.costBreakdown, | ||||||
| sseEventCount: data.sseEventCount, | ||||||
| errorMessage: data.errorMessage, | ||||||
| }); | ||||||
| }) | ||||||
| .catch((err) => { | ||||||
| logger.warn("[ResponseHandler] Langfuse trace failed", { | ||||||
| error: err instanceof Error ? err.message : String(err), | ||||||
| }); | ||||||
| }); | ||||||
| } | ||||||
|
Comment on lines
+46
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major
Line 61 直接读取 建议统一为 isLangfuseEnabled()+import { isLangfuseEnabled } from "@/lib/langfuse/index";
+
function emitLangfuseTrace(
session: ProxySession,
data: { ... }
): void {
- if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;
+ if (!isLangfuseEnabled()) return;
void import("@/lib/langfuse/trace-proxy-request")🤖 Prompt for AI Agents |
||||||
|
|
||||||
| /** | ||||||
| * 清理 Response headers 中的传输相关 header | ||||||
| * | ||||||
|
|
@@ -520,6 +564,18 @@ export class ProxyResponseHandler { | |||||
| duration, | ||||||
| errorMessageForFinalize | ||||||
| ); | ||||||
|
|
||||||
| emitLangfuseTrace(session, { | ||||||
| responseHeaders: response.headers, | ||||||
| responseText, | ||||||
| usageMetrics: parseUsageFromResponseText(responseText, provider.providerType) | ||||||
| .usageMetrics, | ||||||
| costUsd: undefined, | ||||||
| statusCode, | ||||||
| durationMs: duration, | ||||||
| isStreaming: false, | ||||||
| errorMessage: errorMessageForFinalize, | ||||||
| }); | ||||||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| } catch (error) { | ||||||
| if (!isClientAbortError(error as Error)) { | ||||||
| logger.error( | ||||||
|
|
@@ -687,10 +743,11 @@ export class ProxyResponseHandler { | |||||
| await trackCostToRedis(session, usageMetrics); | ||||||
| } | ||||||
|
|
||||||
| // 更新 session 使用量到 Redis(用于实时监控) | ||||||
| if (session.sessionId && usageMetrics) { | ||||||
| // 计算成本(复用相同逻辑) | ||||||
| let costUsdStr: string | undefined; | ||||||
| // Calculate cost for session tracking (with multiplier) and Langfuse (raw) | ||||||
| let costUsdStr: string | undefined; | ||||||
| let rawCostUsdStr: string | undefined; | ||||||
| let costBreakdown: CostBreakdown | undefined; | ||||||
| if (usageMetrics) { | ||||||
| try { | ||||||
| if (session.request.model) { | ||||||
| const priceData = await session.getCachedPriceDataByBillingSource(); | ||||||
|
|
@@ -704,14 +761,41 @@ export class ProxyResponseHandler { | |||||
| if (cost.gt(0)) { | ||||||
| costUsdStr = cost.toString(); | ||||||
| } | ||||||
| // Raw cost without multiplier for Langfuse | ||||||
| if (provider.costMultiplier !== 1) { | ||||||
| const rawCost = calculateRequestCost( | ||||||
| usageMetrics, | ||||||
| priceData, | ||||||
| 1.0, | ||||||
| session.getContext1mApplied() | ||||||
| ); | ||||||
| if (rawCost.gt(0)) { | ||||||
| rawCostUsdStr = rawCost.toString(); | ||||||
| } | ||||||
| } else { | ||||||
| rawCostUsdStr = costUsdStr; | ||||||
| } | ||||||
|
Comment on lines
+764
to
+777
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
| // Cost breakdown for Langfuse (raw, no multiplier) | ||||||
| try { | ||||||
| costBreakdown = calculateRequestCostBreakdown( | ||||||
| usageMetrics, | ||||||
| priceData, | ||||||
| session.getContext1mApplied() | ||||||
| ); | ||||||
| } catch { | ||||||
| /* non-critical */ | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } catch (error) { | ||||||
| logger.error("[ResponseHandler] Failed to calculate session cost, skipping", { | ||||||
| error: error instanceof Error ? error.message : String(error), | ||||||
| }); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // 更新 session 使用量到 Redis(用于实时监控) | ||||||
| if (session.sessionId && usageMetrics) { | ||||||
| void SessionManager.updateSessionUsage(session.sessionId, { | ||||||
| inputTokens: usageMetrics.input_tokens, | ||||||
| outputTokens: usageMetrics.output_tokens, | ||||||
|
|
@@ -782,6 +866,17 @@ export class ProxyResponseHandler { | |||||
| providerName: provider.name, | ||||||
| statusCode, | ||||||
| }); | ||||||
|
|
||||||
| emitLangfuseTrace(session, { | ||||||
| responseHeaders: response.headers, | ||||||
| responseText, | ||||||
| usageMetrics, | ||||||
| costUsd: rawCostUsdStr, | ||||||
| costBreakdown, | ||||||
| statusCode, | ||||||
| durationMs: Date.now() - session.startTime, | ||||||
| isStreaming: false, | ||||||
| }); | ||||||
| } catch (error) { | ||||||
| // 检测 AbortError 的来源:响应超时 vs 客户端中断 | ||||||
| const err = error as Error; | ||||||
|
|
@@ -1220,6 +1315,18 @@ export class ProxyResponseHandler { | |||||
| finalized.errorMessage ?? undefined, | ||||||
| finalized.providerIdForPersistence ?? undefined | ||||||
| ); | ||||||
|
|
||||||
| emitLangfuseTrace(session, { | ||||||
| responseHeaders: response.headers, | ||||||
| responseText: allContent, | ||||||
| usageMetrics: parseUsageFromResponseText(allContent, provider.providerType) | ||||||
| .usageMetrics, | ||||||
| costUsd: undefined, | ||||||
| statusCode: finalized.effectiveStatusCode, | ||||||
| durationMs: duration, | ||||||
| isStreaming: true, | ||||||
| errorMessage: finalized.errorMessage ?? undefined, | ||||||
| }); | ||||||
| } catch (error) { | ||||||
| const err = error instanceof Error ? error : new Error(String(error)); | ||||||
| const clientAborted = session.clientAbortSignal?.aborted ?? false; | ||||||
|
|
@@ -1588,11 +1695,13 @@ export class ProxyResponseHandler { | |||||
| // 追踪消费到 Redis(用于限流) | ||||||
| await trackCostToRedis(session, usageForCost); | ||||||
|
|
||||||
| // 更新 session 使用量到 Redis(用于实时监控) | ||||||
| if (session.sessionId) { | ||||||
| let costUsdStr: string | undefined; | ||||||
| // Calculate cost for session tracking (with multiplier) and Langfuse (raw) | ||||||
| let costUsdStr: string | undefined; | ||||||
| let rawCostUsdStr: string | undefined; | ||||||
| let costBreakdown: CostBreakdown | undefined; | ||||||
| if (usageForCost) { | ||||||
| try { | ||||||
| if (usageForCost && session.request.model) { | ||||||
| if (session.request.model) { | ||||||
| const priceData = await session.getCachedPriceDataByBillingSource(); | ||||||
| if (priceData) { | ||||||
| const cost = calculateRequestCost( | ||||||
|
|
@@ -1604,14 +1713,41 @@ export class ProxyResponseHandler { | |||||
| if (cost.gt(0)) { | ||||||
| costUsdStr = cost.toString(); | ||||||
| } | ||||||
| // Raw cost without multiplier for Langfuse | ||||||
| if (provider.costMultiplier !== 1) { | ||||||
| const rawCost = calculateRequestCost( | ||||||
| usageForCost, | ||||||
| priceData, | ||||||
| 1.0, | ||||||
| session.getContext1mApplied() | ||||||
| ); | ||||||
| if (rawCost.gt(0)) { | ||||||
| rawCostUsdStr = rawCost.toString(); | ||||||
| } | ||||||
| } else { | ||||||
| rawCostUsdStr = costUsdStr; | ||||||
| } | ||||||
|
Comment on lines
+1716
to
+1729
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
| // Cost breakdown for Langfuse (raw, no multiplier) | ||||||
| try { | ||||||
| costBreakdown = calculateRequestCostBreakdown( | ||||||
| usageForCost, | ||||||
| priceData, | ||||||
| session.getContext1mApplied() | ||||||
| ); | ||||||
| } catch { | ||||||
| /* non-critical */ | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } catch (error) { | ||||||
| logger.error("[ResponseHandler] Failed to calculate session cost (stream), skipping", { | ||||||
| error: error instanceof Error ? error.message : String(error), | ||||||
| }); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // 更新 session 使用量到 Redis(用于实时监控) | ||||||
| if (session.sessionId) { | ||||||
| const payload: SessionUsageUpdate = { | ||||||
| status: effectiveStatusCode >= 200 && effectiveStatusCode < 300 ? "completed" : "error", | ||||||
| statusCode: effectiveStatusCode, | ||||||
|
|
@@ -1650,6 +1786,19 @@ export class ProxyResponseHandler { | |||||
| providerId: providerIdForPersistence ?? session.provider?.id, // 更新最终供应商ID(重试切换后) | ||||||
| context1mApplied: session.getContext1mApplied(), | ||||||
| }); | ||||||
|
|
||||||
| emitLangfuseTrace(session, { | ||||||
| responseHeaders: response.headers, | ||||||
| responseText: allContent, | ||||||
| usageMetrics: usageForCost, | ||||||
| costUsd: rawCostUsdStr, | ||||||
| costBreakdown, | ||||||
| statusCode: effectiveStatusCode, | ||||||
| durationMs: duration, | ||||||
| isStreaming: true, | ||||||
| sseEventCount: chunks.length, | ||||||
| errorMessage: streamErrorMessage ?? undefined, | ||||||
| }); | ||||||
| }; | ||||||
|
|
||||||
| try { | ||||||
|
|
@@ -2919,6 +3068,18 @@ async function persistRequestFailure(options: { | |||||
| }); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // Emit Langfuse trace for error/abort paths | ||||||
| emitLangfuseTrace(session, { | ||||||
| responseHeaders: new Headers(), | ||||||
| responseText: "", | ||||||
| usageMetrics: null, | ||||||
| costUsd: undefined, | ||||||
| statusCode, | ||||||
| durationMs: duration, | ||||||
| isStreaming: phase === "stream", | ||||||
| errorMessage, | ||||||
| }); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
缺少
LANGFUSE_MAX_IO_SIZE配置项说明。env schema 中定义了
LANGFUSE_MAX_IO_SIZE(默认 100,000,范围 1-10,000,000),但.env.example中未包含该配置项。建议补充以便运维人员了解此可调参数。建议补充
LANGFUSE_SAMPLE_RATE=1.0 # Trace sampling rate (0.0-1.0, default: 1.0 = 100%) LANGFUSE_DEBUG=false # Enable Langfuse debug logging +LANGFUSE_MAX_IO_SIZE=100000 # Max I/O size per trace (chars, default: 100000, max: 10000000)📝 Committable suggestion
🧰 Tools
🪛 dotenv-linter (4.0.0)
[warning] 135-135: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
[warning] 135-135: [ValueWithoutQuotes] This value needs to be surrounded in quotes
(ValueWithoutQuotes)
[warning] 136-136: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
[warning] 136-136: [ValueWithoutQuotes] This value needs to be surrounded in quotes
(ValueWithoutQuotes)
[warning] 137-137: [UnorderedKey] The LANGFUSE_BASE_URL key should go before the LANGFUSE_PUBLIC_KEY key
(UnorderedKey)
[warning] 137-137: [ValueWithoutQuotes] This value needs to be surrounded in quotes
(ValueWithoutQuotes)
[warning] 138-138: [UnorderedKey] The LANGFUSE_SAMPLE_RATE key should go before the LANGFUSE_SECRET_KEY key
(UnorderedKey)
[warning] 138-138: [ValueWithoutQuotes] This value needs to be surrounded in quotes
(ValueWithoutQuotes)
[warning] 139-139: [UnorderedKey] The LANGFUSE_DEBUG key should go before the LANGFUSE_PUBLIC_KEY key
(UnorderedKey)
[warning] 139-139: [ValueWithoutQuotes] This value needs to be surrounded in quotes
(ValueWithoutQuotes)
🤖 Prompt for AI Agents