diff --git a/.env.example b/.env.example index 5e2c5031e..72ea5933c 100644 --- a/.env.example +++ b/.env.example @@ -128,6 +128,16 @@ FETCH_HEADERS_TIMEOUT=600000 FETCH_BODY_TIMEOUT=600000 MAX_RETRY_ATTEMPTS_DEFAULT=2 # 单供应商最大尝试次数(含首次调用),范围 1-10,留空使用默认值 2 +# Langfuse Observability (optional, auto-enabled when keys are set) +# 功能说明:企业级 LLM 可观测性集成,自动追踪所有代理请求的完整生命周期 +# - 配置 PUBLIC_KEY 和 SECRET_KEY 后自动启用 +# - 支持 Langfuse Cloud 和自托管实例 +LANGFUSE_PUBLIC_KEY= # Langfuse project public key (pk-lf-...) +LANGFUSE_SECRET_KEY= # Langfuse project secret key (sk-lf-...) +LANGFUSE_BASE_URL=https://cloud.langfuse.com # Langfuse server URL (self-hosted or cloud) +LANGFUSE_SAMPLE_RATE=1.0 # Trace sampling rate (0.0-1.0, default: 1.0 = 100%) +LANGFUSE_DEBUG=false # Enable Langfuse debug logging + # 智能探测配置 # 功能说明:当熔断器处于 OPEN 状态时,定期探测供应商以实现更快恢复 # - ENABLE_SMART_PROBING:是否启用智能探测(默认:false) diff --git a/package.json b/package.json index 6411d827c..fb7d0914c 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,11 @@ "@hono/zod-openapi": "^1", "@hookform/resolvers": "^5", "@iarna/toml": "^2.2.5", + "@langfuse/client": "^4.6.1", + "@langfuse/otel": "^4.6.1", + "@langfuse/tracing": "^4.6.1", "@lobehub/icons": "^2", + "@opentelemetry/sdk-node": "^0.212.0", "@radix-ui/react-alert-dialog": "^1", "@radix-ui/react-avatar": "^1", "@radix-ui/react-checkbox": "^1", diff --git a/src/app/v1/_lib/proxy-handler.ts b/src/app/v1/_lib/proxy-handler.ts index 5257ef670..5f2b90b4e 100644 --- a/src/app/v1/_lib/proxy-handler.ts +++ b/src/app/v1/_lib/proxy-handler.ts @@ -78,9 +78,12 @@ export async function handleProxyRequest(c: Context): Promise { }); } + session.recordForwardStart(); const response = await ProxyForwarder.send(session); const handled = await ProxyResponseHandler.dispatch(session, response); - return await attachSessionIdToErrorResponse(session.sessionId, handled); + const finalResponse = await attachSessionIdToErrorResponse(session.sessionId, handled); + + return finalResponse; } catch (error) { logger.error("Proxy handler error:", error); if (session) { diff --git a/src/app/v1/_lib/proxy/forwarder.ts b/src/app/v1/_lib/proxy/forwarder.ts index 3759a8570..1abf4019c 100644 --- a/src/app/v1/_lib/proxy/forwarder.ts +++ b/src/app/v1/_lib/proxy/forwarder.ts @@ -1648,6 +1648,7 @@ export class ProxyForwarder { const bodyString = JSON.stringify(bodyToSerialize); requestBody = bodyString; + session.forwardedRequestBody = bodyString; } // 检测流式请求:Gemini 支持两种方式 @@ -1974,6 +1975,7 @@ export class ProxyForwarder { const bodyString = JSON.stringify(messageToSend); requestBody = bodyString; + session.forwardedRequestBody = bodyString; try { const parsed = JSON.parse(bodyString); diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index c2afe9d90..3dc16dc1b 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -8,7 +8,8 @@ import { RateLimitService } from "@/lib/rate-limit"; import type { LeaseWindowType } from "@/lib/rate-limit/lease"; import { SessionManager } from "@/lib/session-manager"; import { SessionTracker } from "@/lib/session-tracker"; -import { calculateRequestCost } from "@/lib/utils/cost-calculation"; +import type { CostBreakdown } from "@/lib/utils/cost-calculation"; +import { calculateRequestCost, calculateRequestCostBreakdown } from "@/lib/utils/cost-calculation"; import { hasValidPriceData } from "@/lib/utils/price-data"; import { isSSEText, parseSSEData } from "@/lib/utils/sse"; import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection"; @@ -39,6 +40,49 @@ export type UsageMetrics = { output_image_tokens?: number; }; +/** + * Fire Langfuse trace asynchronously. Non-blocking, error-tolerant. + */ +function emitLangfuseTrace( + session: ProxySession, + data: { + responseHeaders: Headers; + responseText: string; + usageMetrics: UsageMetrics | null; + costUsd: string | undefined; + costBreakdown?: CostBreakdown; + statusCode: number; + durationMs: number; + isStreaming: boolean; + sseEventCount?: number; + errorMessage?: string; + } +): void { + if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return; + + void import("@/lib/langfuse/trace-proxy-request") + .then(({ traceProxyRequest }) => { + void traceProxyRequest({ + session, + responseHeaders: data.responseHeaders, + durationMs: data.durationMs, + statusCode: data.statusCode, + isStreaming: data.isStreaming, + responseText: data.responseText, + usageMetrics: data.usageMetrics, + costUsd: data.costUsd, + costBreakdown: data.costBreakdown, + sseEventCount: data.sseEventCount, + errorMessage: data.errorMessage, + }); + }) + .catch((err) => { + logger.warn("[ResponseHandler] Langfuse trace failed", { + error: err instanceof Error ? err.message : String(err), + }); + }); +} + /** * 清理 Response headers 中的传输相关 header * @@ -520,6 +564,18 @@ export class ProxyResponseHandler { duration, errorMessageForFinalize ); + + emitLangfuseTrace(session, { + responseHeaders: response.headers, + responseText, + usageMetrics: parseUsageFromResponseText(responseText, provider.providerType) + .usageMetrics, + costUsd: undefined, + statusCode, + durationMs: duration, + isStreaming: false, + errorMessage: errorMessageForFinalize, + }); } catch (error) { if (!isClientAbortError(error as Error)) { logger.error( @@ -687,10 +743,11 @@ export class ProxyResponseHandler { await trackCostToRedis(session, usageMetrics); } - // 更新 session 使用量到 Redis(用于实时监控) - if (session.sessionId && usageMetrics) { - // 计算成本(复用相同逻辑) - let costUsdStr: string | undefined; + // Calculate cost for session tracking (with multiplier) and Langfuse (raw) + let costUsdStr: string | undefined; + let rawCostUsdStr: string | undefined; + let costBreakdown: CostBreakdown | undefined; + if (usageMetrics) { try { if (session.request.model) { const priceData = await session.getCachedPriceDataByBillingSource(); @@ -704,6 +761,30 @@ export class ProxyResponseHandler { if (cost.gt(0)) { costUsdStr = cost.toString(); } + // Raw cost without multiplier for Langfuse + if (provider.costMultiplier !== 1) { + const rawCost = calculateRequestCost( + usageMetrics, + priceData, + 1.0, + session.getContext1mApplied() + ); + if (rawCost.gt(0)) { + rawCostUsdStr = rawCost.toString(); + } + } else { + rawCostUsdStr = costUsdStr; + } + // Cost breakdown for Langfuse (raw, no multiplier) + try { + costBreakdown = calculateRequestCostBreakdown( + usageMetrics, + priceData, + session.getContext1mApplied() + ); + } catch { + /* non-critical */ + } } } } catch (error) { @@ -711,7 +792,10 @@ export class ProxyResponseHandler { error: error instanceof Error ? error.message : String(error), }); } + } + // 更新 session 使用量到 Redis(用于实时监控) + if (session.sessionId && usageMetrics) { void SessionManager.updateSessionUsage(session.sessionId, { inputTokens: usageMetrics.input_tokens, outputTokens: usageMetrics.output_tokens, @@ -782,6 +866,17 @@ export class ProxyResponseHandler { providerName: provider.name, statusCode, }); + + emitLangfuseTrace(session, { + responseHeaders: response.headers, + responseText, + usageMetrics, + costUsd: rawCostUsdStr, + costBreakdown, + statusCode, + durationMs: Date.now() - session.startTime, + isStreaming: false, + }); } catch (error) { // 检测 AbortError 的来源:响应超时 vs 客户端中断 const err = error as Error; @@ -1220,6 +1315,18 @@ export class ProxyResponseHandler { finalized.errorMessage ?? undefined, finalized.providerIdForPersistence ?? undefined ); + + emitLangfuseTrace(session, { + responseHeaders: response.headers, + responseText: allContent, + usageMetrics: parseUsageFromResponseText(allContent, provider.providerType) + .usageMetrics, + costUsd: undefined, + statusCode: finalized.effectiveStatusCode, + durationMs: duration, + isStreaming: true, + errorMessage: finalized.errorMessage ?? undefined, + }); } catch (error) { const err = error instanceof Error ? error : new Error(String(error)); const clientAborted = session.clientAbortSignal?.aborted ?? false; @@ -1588,11 +1695,13 @@ export class ProxyResponseHandler { // 追踪消费到 Redis(用于限流) await trackCostToRedis(session, usageForCost); - // 更新 session 使用量到 Redis(用于实时监控) - if (session.sessionId) { - let costUsdStr: string | undefined; + // Calculate cost for session tracking (with multiplier) and Langfuse (raw) + let costUsdStr: string | undefined; + let rawCostUsdStr: string | undefined; + let costBreakdown: CostBreakdown | undefined; + if (usageForCost) { try { - if (usageForCost && session.request.model) { + if (session.request.model) { const priceData = await session.getCachedPriceDataByBillingSource(); if (priceData) { const cost = calculateRequestCost( @@ -1604,6 +1713,30 @@ export class ProxyResponseHandler { if (cost.gt(0)) { costUsdStr = cost.toString(); } + // Raw cost without multiplier for Langfuse + if (provider.costMultiplier !== 1) { + const rawCost = calculateRequestCost( + usageForCost, + priceData, + 1.0, + session.getContext1mApplied() + ); + if (rawCost.gt(0)) { + rawCostUsdStr = rawCost.toString(); + } + } else { + rawCostUsdStr = costUsdStr; + } + // Cost breakdown for Langfuse (raw, no multiplier) + try { + costBreakdown = calculateRequestCostBreakdown( + usageForCost, + priceData, + session.getContext1mApplied() + ); + } catch { + /* non-critical */ + } } } } catch (error) { @@ -1611,7 +1744,10 @@ export class ProxyResponseHandler { error: error instanceof Error ? error.message : String(error), }); } + } + // 更新 session 使用量到 Redis(用于实时监控) + if (session.sessionId) { const payload: SessionUsageUpdate = { status: effectiveStatusCode >= 200 && effectiveStatusCode < 300 ? "completed" : "error", statusCode: effectiveStatusCode, @@ -1650,6 +1786,19 @@ export class ProxyResponseHandler { providerId: providerIdForPersistence ?? session.provider?.id, // 更新最终供应商ID(重试切换后) context1mApplied: session.getContext1mApplied(), }); + + emitLangfuseTrace(session, { + responseHeaders: response.headers, + responseText: allContent, + usageMetrics: usageForCost, + costUsd: rawCostUsdStr, + costBreakdown, + statusCode: effectiveStatusCode, + durationMs: duration, + isStreaming: true, + sseEventCount: chunks.length, + errorMessage: streamErrorMessage ?? undefined, + }); }; try { @@ -2919,6 +3068,18 @@ async function persistRequestFailure(options: { }); } } + + // Emit Langfuse trace for error/abort paths + emitLangfuseTrace(session, { + responseHeaders: new Headers(), + responseText: "", + usageMetrics: null, + costUsd: undefined, + statusCode, + durationMs: duration, + isStreaming: phase === "stream", + errorMessage, + }); } /** diff --git a/src/app/v1/_lib/proxy/session.ts b/src/app/v1/_lib/proxy/session.ts index a163c772d..abae33872 100644 --- a/src/app/v1/_lib/proxy/session.ts +++ b/src/app/v1/_lib/proxy/session.ts @@ -67,6 +67,12 @@ export class ProxySession { // Time To First Byte (ms). Streaming: first chunk. Non-stream: equals durationMs. ttfbMs: number | null = null; + // Timestamp when guard pipeline finished and forwarding started (epoch ms). + forwardStartTime: number | null = null; + + // Actual serialized request body sent to upstream (after all preprocessing). + forwardedRequestBody: string | null = null; + // Session ID(用于会话粘性和并发限流) sessionId: string | null; @@ -313,6 +319,16 @@ export class ProxySession { return value; } + /** + * Record the timestamp when guard pipeline finished and upstream forwarding begins. + * Called once; subsequent calls are no-ops. + */ + recordForwardStart(): void { + if (this.forwardStartTime === null) { + this.forwardStartTime = Date.now(); + } + } + /** * 设置 session ID */ diff --git a/src/instrumentation.ts b/src/instrumentation.ts index a3303e51d..85ddfd3ba 100644 --- a/src/instrumentation.ts +++ b/src/instrumentation.ts @@ -140,6 +140,15 @@ function warmupApiKeyVacuumFilter(): void { export async function register() { // 仅在服务器端执行 if (process.env.NEXT_RUNTIME === "nodejs") { + // Initialize Langfuse observability (no-op if env vars not set) + try { + const { initLangfuse } = await import("@/lib/langfuse"); + await initLangfuse(); + } catch (error) { + logger.warn("[Instrumentation] Langfuse initialization failed (non-critical)", { + error: error instanceof Error ? error.message : String(error), + }); + } // Skip initialization in CI environment (no DB connection needed) if (process.env.CI === "true") { logger.warn( @@ -216,6 +225,16 @@ export async function register() { }); } + // Flush Langfuse pending spans + try { + const { shutdownLangfuse } = await import("@/lib/langfuse"); + await shutdownLangfuse(); + } catch (error) { + logger.warn("[Instrumentation] Failed to shutdown Langfuse", { + error: error instanceof Error ? error.message : String(error), + }); + } + // 尽力将 message_request 的异步批量更新刷入数据库(避免终止时丢失尾部日志) try { const { stopMessageRequestWriteBuffer } = await import( diff --git a/src/lib/config/env.schema.ts b/src/lib/config/env.schema.ts index a845a0db5..b7dacd738 100644 --- a/src/lib/config/env.schema.ts +++ b/src/lib/config/env.schema.ts @@ -127,6 +127,13 @@ export const EnvSchema = z.object({ FETCH_BODY_TIMEOUT: z.coerce.number().default(600_000), // 请求/响应体传输超时(默认 600 秒) FETCH_HEADERS_TIMEOUT: z.coerce.number().default(600_000), // 响应头接收超时(默认 600 秒) FETCH_CONNECT_TIMEOUT: z.coerce.number().default(30000), // TCP 连接建立超时(默认 30 秒) + + // Langfuse Observability (optional, auto-enabled when keys are set) + LANGFUSE_PUBLIC_KEY: z.string().optional(), + LANGFUSE_SECRET_KEY: z.string().optional(), + LANGFUSE_BASE_URL: z.string().default("https://cloud.langfuse.com"), + LANGFUSE_SAMPLE_RATE: z.coerce.number().min(0).max(1).default(1.0), + LANGFUSE_DEBUG: z.string().default("false").transform(booleanTransform), }); /** diff --git a/src/lib/langfuse/index.ts b/src/lib/langfuse/index.ts new file mode 100644 index 000000000..56889ed36 --- /dev/null +++ b/src/lib/langfuse/index.ts @@ -0,0 +1,92 @@ +import type { LangfuseSpanProcessor } from "@langfuse/otel"; + +import type { NodeSDK } from "@opentelemetry/sdk-node"; +import { logger } from "@/lib/logger"; + +let sdk: NodeSDK | null = null; +let spanProcessor: LangfuseSpanProcessor | null = null; +let initialized = false; + +export function isLangfuseEnabled(): boolean { + return !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY); +} + +/** + * Initialize Langfuse OpenTelemetry SDK. + * Must be called early in the process (instrumentation.ts register()). + * No-op if LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY are not set. + */ +export async function initLangfuse(): Promise { + if (initialized || !isLangfuseEnabled()) { + return; + } + + try { + const { NodeSDK: OtelNodeSDK } = await import("@opentelemetry/sdk-node"); + const { LangfuseSpanProcessor: LfSpanProcessor } = await import("@langfuse/otel"); + + const sampleRate = Number.parseFloat(process.env.LANGFUSE_SAMPLE_RATE || "1.0"); + + spanProcessor = new LfSpanProcessor({ + publicKey: process.env.LANGFUSE_PUBLIC_KEY, + secretKey: process.env.LANGFUSE_SECRET_KEY, + baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com", + // Only export spans from langfuse-sdk scope (avoid noise from other OTel instrumentations) + shouldExportSpan: ({ otelSpan }) => otelSpan.instrumentationScope.name === "langfuse-sdk", + }); + + const samplerConfig = + sampleRate < 1.0 + ? await (async () => { + const { TraceIdRatioBasedSampler } = await import("@opentelemetry/sdk-trace-base"); + return { sampler: new TraceIdRatioBasedSampler(sampleRate) }; + })() + : {}; + + sdk = new OtelNodeSDK({ + spanProcessors: [spanProcessor], + ...samplerConfig, + }); + + sdk.start(); + initialized = true; + + logger.info("[Langfuse] Observability initialized", { + baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com", + sampleRate, + debug: process.env.LANGFUSE_DEBUG === "true", + }); + + if (process.env.LANGFUSE_DEBUG === "true") { + const { configureGlobalLogger, LogLevel } = await import("@langfuse/core"); + configureGlobalLogger({ level: LogLevel.DEBUG }); + } + } catch (error) { + logger.error("[Langfuse] Failed to initialize", { + error: error instanceof Error ? error.message : String(error), + }); + } +} + +/** + * Flush pending spans and shut down the SDK. + * Called during graceful shutdown (SIGTERM/SIGINT). + */ +export async function shutdownLangfuse(): Promise { + if (!initialized || !spanProcessor) { + return; + } + + try { + await spanProcessor.forceFlush(); + if (sdk) { + await sdk.shutdown(); + } + initialized = false; + logger.info("[Langfuse] Shutdown complete"); + } catch (error) { + logger.warn("[Langfuse] Shutdown error", { + error: error instanceof Error ? error.message : String(error), + }); + } +} diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts new file mode 100644 index 000000000..cc940b394 --- /dev/null +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -0,0 +1,363 @@ +import type { UsageMetrics } from "@/app/v1/_lib/proxy/response-handler"; +import type { ProxySession } from "@/app/v1/_lib/proxy/session"; +import { isLangfuseEnabled } from "@/lib/langfuse/index"; +import { logger } from "@/lib/logger"; +import type { CostBreakdown } from "@/lib/utils/cost-calculation"; + +function buildRequestBodySummary(session: ProxySession): Record { + const msg = session.request.message as Record; + return { + model: session.request.model, + messageCount: session.getMessagesLength(), + hasSystemPrompt: Array.isArray(msg.system) && msg.system.length > 0, + toolsCount: Array.isArray(msg.tools) ? msg.tools.length : 0, + stream: msg.stream === true, + maxTokens: typeof msg.max_tokens === "number" ? msg.max_tokens : undefined, + temperature: typeof msg.temperature === "number" ? msg.temperature : undefined, + }; +} + +function getStatusCategory(statusCode: number): string { + if (statusCode >= 200 && statusCode < 300) return "2xx"; + if (statusCode >= 400 && statusCode < 500) return "4xx"; + if (statusCode >= 500) return "5xx"; + return `${Math.floor(statusCode / 100)}xx`; +} + +/** + * Convert Headers to a plain record. + * + * Security note: session.headers are the CLIENT's original request headers + * (user -> CCH), which may include the user's own CCH auth key. These are + * safe to log -- the user already knows their own credentials. + * + * The upstream PROVIDER API key (outboundKey) is injected by ProxyForwarder + * into a separate Headers object and is NEVER present in session.headers or + * ctx.responseHeaders, so no redaction is needed here. + */ +function headersToRecord(headers: Headers): Record { + const result: Record = {}; + headers.forEach((value, key) => { + result[key] = value; + }); + return result; +} + +const SUCCESS_REASONS = new Set([ + "request_success", + "retry_success", + "initial_selection", + "session_reuse", +]); + +function isSuccessReason(reason: string | undefined): boolean { + return !!reason && SUCCESS_REASONS.has(reason); +} + +const ERROR_REASONS = new Set([ + "system_error", + "vendor_type_all_timeout", + "endpoint_pool_exhausted", +]); + +function isErrorReason(reason: string | undefined): boolean { + return !!reason && ERROR_REASONS.has(reason); +} + +type ObservationLevel = "DEBUG" | "DEFAULT" | "WARNING" | "ERROR"; + +export interface TraceContext { + session: ProxySession; + responseHeaders: Headers; + durationMs: number; + statusCode: number; + responseText?: string; + isStreaming: boolean; + sseEventCount?: number; + errorMessage?: string; + usageMetrics?: UsageMetrics | null; + costUsd?: string; + costBreakdown?: CostBreakdown; +} + +/** + * Send a trace to Langfuse for a completed proxy request. + * Fully async and non-blocking. Errors are caught and logged. + */ +export async function traceProxyRequest(ctx: TraceContext): Promise { + if (!isLangfuseEnabled()) { + return; + } + + try { + const { startObservation, propagateAttributes } = await import("@langfuse/tracing"); + + const { session, durationMs, statusCode, isStreaming } = ctx; + const provider = session.provider; + const messageContext = session.messageContext; + + // Compute actual request timing from session data + const requestStartTime = new Date(session.startTime); + const requestEndTime = new Date(session.startTime + durationMs); + + // Compute timing breakdown from forwardStartTime + const forwardStartDate = session.forwardStartTime ? new Date(session.forwardStartTime) : null; + const guardPipelineMs = session.forwardStartTime + ? session.forwardStartTime - session.startTime + : null; + + const timingBreakdown = { + guardPipelineMs, + upstreamTotalMs: + guardPipelineMs != null ? Math.max(0, durationMs - guardPipelineMs) : durationMs, + ttfbFromForwardMs: + guardPipelineMs != null && session.ttfbMs != null + ? Math.max(0, session.ttfbMs - guardPipelineMs) + : null, + tokenGenerationMs: session.ttfbMs != null ? Math.max(0, durationMs - session.ttfbMs) : null, + failedAttempts: session.getProviderChain().filter((i) => !isSuccessReason(i.reason)).length, + providersAttempted: new Set(session.getProviderChain().map((i) => i.id)).size, + }; + + // Compute observation level for root span + let rootSpanLevel: ObservationLevel = "DEFAULT"; + if (statusCode < 200 || statusCode >= 300) { + rootSpanLevel = "ERROR"; + } else { + const failedAttempts = session + .getProviderChain() + .filter((i) => !isSuccessReason(i.reason)).length; + if (failedAttempts >= 1) rootSpanLevel = "WARNING"; + } + + // Actual request body (forwarded to upstream after all preprocessing) - no truncation + const actualRequestBody = session.forwardedRequestBody + ? tryParseJsonSafe(session.forwardedRequestBody) + : session.request.message; + + // Actual response body - no truncation + const actualResponseBody = ctx.responseText + ? tryParseJsonSafe(ctx.responseText) + : isStreaming + ? { streaming: true, sseEventCount: ctx.sseEventCount } + : { statusCode }; + + // Root span metadata (former input/output summaries moved here) + const rootSpanMetadata: Record = { + endpoint: session.getEndpoint(), + method: session.method, + model: session.getCurrentModel(), + clientFormat: session.originalFormat, + providerName: provider?.name, + statusCode, + durationMs, + hasUsage: !!ctx.usageMetrics, + costUsd: ctx.costUsd, + timingBreakdown, + }; + + // Build tags - include provider name and model + const tags: string[] = []; + if (provider?.providerType) tags.push(provider.providerType); + if (provider?.name) tags.push(provider.name); + if (session.originalFormat) tags.push(session.originalFormat); + if (session.getCurrentModel()) tags.push(session.getCurrentModel()!); + tags.push(getStatusCategory(statusCode)); + + // Build trace-level metadata (propagateAttributes requires all values to be strings) + const traceMetadata: Record = { + keyName: messageContext?.key?.name ?? "", + endpoint: session.getEndpoint() ?? "", + method: session.method, + clientFormat: session.originalFormat, + userAgent: session.userAgent ?? "", + requestSequence: String(session.getRequestSequence()), + }; + + // Build generation metadata - all request detail fields, raw headers (no redaction) + const generationMetadata: Record = { + // Provider + providerId: provider?.id, + providerName: provider?.name, + providerType: provider?.providerType, + providerChain: session.getProviderChain(), + // Model + model: session.getCurrentModel(), + originalModel: session.getOriginalModel(), + modelRedirected: session.isModelRedirected(), + // Special settings + specialSettings: session.getSpecialSettings(), + // Request context + endpoint: session.getEndpoint(), + method: session.method, + clientFormat: session.originalFormat, + userAgent: session.userAgent, + requestSequence: session.getRequestSequence(), + sessionId: session.sessionId, + keyName: messageContext?.key?.name, + // Timing + durationMs, + ttfbMs: session.ttfbMs, + timingBreakdown, + // Flags + isStreaming, + cacheTtlApplied: session.getCacheTtlResolved(), + context1mApplied: session.getContext1mApplied(), + // Error + errorMessage: ctx.errorMessage, + // Request summary (quick overview) + requestSummary: buildRequestBodySummary(session), + // SSE + sseEventCount: ctx.sseEventCount, + // Headers (raw, no redaction) + requestHeaders: headersToRecord(session.headers), + responseHeaders: headersToRecord(ctx.responseHeaders), + }; + + // Build usage details for Langfuse generation + const usageDetails: Record | undefined = ctx.usageMetrics + ? { + ...(ctx.usageMetrics.input_tokens != null + ? { input: ctx.usageMetrics.input_tokens } + : {}), + ...(ctx.usageMetrics.output_tokens != null + ? { output: ctx.usageMetrics.output_tokens } + : {}), + ...(ctx.usageMetrics.cache_read_input_tokens != null + ? { cache_read_input_tokens: ctx.usageMetrics.cache_read_input_tokens } + : {}), + ...(ctx.usageMetrics.cache_creation_input_tokens != null + ? { cache_creation_input_tokens: ctx.usageMetrics.cache_creation_input_tokens } + : {}), + } + : undefined; + + // Build cost details (prefer breakdown, fallback to total-only) + const costDetails: Record | undefined = ctx.costBreakdown + ? { ...ctx.costBreakdown } + : ctx.costUsd && Number.parseFloat(ctx.costUsd) > 0 + ? { total: Number.parseFloat(ctx.costUsd) } + : undefined; + + // Create the root trace span with actual bodies, level, and metadata + const rootSpan = startObservation( + "proxy-request", + { + input: actualRequestBody, + output: actualResponseBody, + level: rootSpanLevel, + metadata: rootSpanMetadata, + }, + { + startTime: requestStartTime, + } + ); + + // Propagate trace attributes + await propagateAttributes( + { + userId: messageContext?.user?.name ?? undefined, + sessionId: session.sessionId ?? undefined, + tags, + metadata: traceMetadata, + traceName: `${session.method} ${session.getEndpoint() ?? "/"}`, + }, + async () => { + // 1. Guard pipeline span (if forwardStartTime was recorded) + if (forwardStartDate) { + const guardSpan = rootSpan.startObservation( + "guard-pipeline", + { + output: { durationMs: guardPipelineMs, passed: true }, + }, + { startTime: requestStartTime } as Record + ); + guardSpan.end(forwardStartDate); + } + + // 2. Provider attempt events (one per failed chain item) + for (const item of session.getProviderChain()) { + if (!isSuccessReason(item.reason)) { + const eventObs = rootSpan.startObservation( + "provider-attempt", + { + level: isErrorReason(item.reason) ? "ERROR" : "WARNING", + input: { + providerId: item.id, + providerName: item.name, + attempt: item.attemptNumber, + }, + output: { + reason: item.reason, + errorMessage: item.errorMessage, + statusCode: item.statusCode, + }, + metadata: { ...item }, + }, + { + asType: "event", + startTime: new Date(item.timestamp ?? session.startTime), + } as { asType: "event" } + ); + eventObs.end(); + } + } + + // 3. LLM generation (startTime = forwardStartTime when available) + const generationStartTime = forwardStartDate ?? requestStartTime; + + // Generation input/output = raw payload, no truncation + const generationInput = actualRequestBody; + const generationOutput = ctx.responseText + ? tryParseJsonSafe(ctx.responseText) + : isStreaming + ? { streaming: true, sseEventCount: ctx.sseEventCount } + : { statusCode }; + + // Create the LLM generation observation + const generation = rootSpan.startObservation( + "llm-call", + { + model: session.getCurrentModel() ?? undefined, + input: generationInput, + output: generationOutput, + ...(usageDetails && Object.keys(usageDetails).length > 0 ? { usageDetails } : {}), + ...(costDetails ? { costDetails } : {}), + metadata: generationMetadata, + }, + // SDK runtime supports startTime on child observations but types don't expose it + { asType: "generation", startTime: generationStartTime } as { asType: "generation" } + ); + + // Set TTFB as completionStartTime + if (session.ttfbMs != null) { + generation.update({ + completionStartTime: new Date(session.startTime + session.ttfbMs), + }); + } + + generation.end(requestEndTime); + } + ); + + // Explicitly set trace-level input/output (propagateAttributes does not support these) + rootSpan.updateTrace({ + input: actualRequestBody, + output: actualResponseBody, + }); + + rootSpan.end(requestEndTime); + } catch (error) { + logger.warn("[Langfuse] Failed to trace proxy request", { + error: error instanceof Error ? error.message : String(error), + }); + } +} + +function tryParseJsonSafe(text: string): unknown { + try { + return JSON.parse(text); + } catch { + return text; + } +} diff --git a/src/lib/utils/cost-calculation.ts b/src/lib/utils/cost-calculation.ts index 1212a1f99..0be83453d 100644 --- a/src/lib/utils/cost-calculation.ts +++ b/src/lib/utils/cost-calculation.ts @@ -98,6 +98,214 @@ function calculateTieredCostWithSeparatePrices( return baseCost.add(premiumCost); } +export interface CostBreakdown { + input: number; + output: number; + cache_creation: number; + cache_read: number; + total: number; +} + +/** + * Calculate cost breakdown by category (always raw cost, multiplier=1.0). + * Returns per-category costs as plain numbers. + */ +export function calculateRequestCostBreakdown( + usage: UsageMetrics, + priceData: ModelPriceData, + context1mApplied: boolean = false +): CostBreakdown { + let inputBucket = new Decimal(0); + let outputBucket = new Decimal(0); + let cacheCreationBucket = new Decimal(0); + let cacheReadBucket = new Decimal(0); + + const inputCostPerToken = priceData.input_cost_per_token; + const outputCostPerToken = priceData.output_cost_per_token; + const inputCostPerRequest = priceData.input_cost_per_request; + + // Per-request cost -> input bucket + if ( + typeof inputCostPerRequest === "number" && + Number.isFinite(inputCostPerRequest) && + inputCostPerRequest >= 0 + ) { + const requestCost = toDecimal(inputCostPerRequest); + if (requestCost) { + inputBucket = inputBucket.add(requestCost); + } + } + + const cacheCreation5mCost = + priceData.cache_creation_input_token_cost ?? + (inputCostPerToken != null ? inputCostPerToken * 1.25 : undefined); + + const cacheCreation1hCost = + priceData.cache_creation_input_token_cost_above_1hr ?? + (inputCostPerToken != null ? inputCostPerToken * 2 : undefined) ?? + cacheCreation5mCost; + + const cacheReadCost = + priceData.cache_read_input_token_cost ?? + (inputCostPerToken != null + ? inputCostPerToken * 0.1 + : outputCostPerToken != null + ? outputCostPerToken * 0.1 + : undefined); + + // Derive cache creation tokens by TTL + let cache5mTokens = usage.cache_creation_5m_input_tokens; + let cache1hTokens = usage.cache_creation_1h_input_tokens; + + if (typeof usage.cache_creation_input_tokens === "number") { + const remaining = + usage.cache_creation_input_tokens - (cache5mTokens ?? 0) - (cache1hTokens ?? 0); + + if (remaining > 0) { + const target = usage.cache_ttl === "1h" ? "1h" : "5m"; + if (target === "1h") { + cache1hTokens = (cache1hTokens ?? 0) + remaining; + } else { + cache5mTokens = (cache5mTokens ?? 0) + remaining; + } + } + } + + const inputAbove200k = priceData.input_cost_per_token_above_200k_tokens; + const outputAbove200k = priceData.output_cost_per_token_above_200k_tokens; + + // Input tokens -> input bucket + if (context1mApplied && inputCostPerToken != null && usage.input_tokens != null) { + inputBucket = inputBucket.add( + calculateTieredCost( + usage.input_tokens, + inputCostPerToken, + CONTEXT_1M_INPUT_PREMIUM_MULTIPLIER + ) + ); + } else if (inputAbove200k != null && inputCostPerToken != null && usage.input_tokens != null) { + inputBucket = inputBucket.add( + calculateTieredCostWithSeparatePrices(usage.input_tokens, inputCostPerToken, inputAbove200k) + ); + } else { + inputBucket = inputBucket.add(multiplyCost(usage.input_tokens, inputCostPerToken)); + } + + // Output tokens -> output bucket + if (context1mApplied && outputCostPerToken != null && usage.output_tokens != null) { + outputBucket = outputBucket.add( + calculateTieredCost( + usage.output_tokens, + outputCostPerToken, + CONTEXT_1M_OUTPUT_PREMIUM_MULTIPLIER + ) + ); + } else if (outputAbove200k != null && outputCostPerToken != null && usage.output_tokens != null) { + outputBucket = outputBucket.add( + calculateTieredCostWithSeparatePrices( + usage.output_tokens, + outputCostPerToken, + outputAbove200k + ) + ); + } else { + outputBucket = outputBucket.add(multiplyCost(usage.output_tokens, outputCostPerToken)); + } + + // Cache costs + const cacheCreationAbove200k = priceData.cache_creation_input_token_cost_above_200k_tokens; + const cacheReadAbove200k = priceData.cache_read_input_token_cost_above_200k_tokens; + const hasRealCacheCreationBase = priceData.cache_creation_input_token_cost != null; + const hasRealCacheReadBase = priceData.cache_read_input_token_cost != null; + + // Cache creation 5m -> cache_creation bucket + if (context1mApplied && cacheCreation5mCost != null && cache5mTokens != null) { + cacheCreationBucket = cacheCreationBucket.add( + calculateTieredCost(cache5mTokens, cacheCreation5mCost, CONTEXT_1M_INPUT_PREMIUM_MULTIPLIER) + ); + } else if ( + hasRealCacheCreationBase && + cacheCreationAbove200k != null && + cacheCreation5mCost != null && + cache5mTokens != null + ) { + cacheCreationBucket = cacheCreationBucket.add( + calculateTieredCostWithSeparatePrices( + cache5mTokens, + cacheCreation5mCost, + cacheCreationAbove200k + ) + ); + } else { + cacheCreationBucket = cacheCreationBucket.add(multiplyCost(cache5mTokens, cacheCreation5mCost)); + } + + // Cache creation 1h -> cache_creation bucket + if (context1mApplied && cacheCreation1hCost != null && cache1hTokens != null) { + cacheCreationBucket = cacheCreationBucket.add( + calculateTieredCost(cache1hTokens, cacheCreation1hCost, CONTEXT_1M_INPUT_PREMIUM_MULTIPLIER) + ); + } else if ( + hasRealCacheCreationBase && + cacheCreationAbove200k != null && + cacheCreation1hCost != null && + cache1hTokens != null + ) { + cacheCreationBucket = cacheCreationBucket.add( + calculateTieredCostWithSeparatePrices( + cache1hTokens, + cacheCreation1hCost, + cacheCreationAbove200k + ) + ); + } else { + cacheCreationBucket = cacheCreationBucket.add(multiplyCost(cache1hTokens, cacheCreation1hCost)); + } + + // Cache read -> cache_read bucket + if ( + hasRealCacheReadBase && + cacheReadAbove200k != null && + cacheReadCost != null && + usage.cache_read_input_tokens != null + ) { + cacheReadBucket = cacheReadBucket.add( + calculateTieredCostWithSeparatePrices( + usage.cache_read_input_tokens, + cacheReadCost, + cacheReadAbove200k + ) + ); + } else { + cacheReadBucket = cacheReadBucket.add( + multiplyCost(usage.cache_read_input_tokens, cacheReadCost) + ); + } + + // Image tokens -> respective buckets + if (usage.output_image_tokens != null && usage.output_image_tokens > 0) { + const imageCostPerToken = + priceData.output_cost_per_image_token ?? priceData.output_cost_per_token; + outputBucket = outputBucket.add(multiplyCost(usage.output_image_tokens, imageCostPerToken)); + } + + if (usage.input_image_tokens != null && usage.input_image_tokens > 0) { + const imageCostPerToken = + priceData.input_cost_per_image_token ?? priceData.input_cost_per_token; + inputBucket = inputBucket.add(multiplyCost(usage.input_image_tokens, imageCostPerToken)); + } + + const total = inputBucket.add(outputBucket).add(cacheCreationBucket).add(cacheReadBucket); + + return { + input: inputBucket.toDecimalPlaces(COST_SCALE).toNumber(), + output: outputBucket.toDecimalPlaces(COST_SCALE).toNumber(), + cache_creation: cacheCreationBucket.toDecimalPlaces(COST_SCALE).toNumber(), + cache_read: cacheReadBucket.toDecimalPlaces(COST_SCALE).toNumber(), + total: total.toDecimalPlaces(COST_SCALE).toNumber(), + }; +} + /** * 计算单次请求的费用 * @param usage - token使用量 diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts new file mode 100644 index 000000000..c1760bb7d --- /dev/null +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -0,0 +1,996 @@ +import { describe, expect, test, vi, beforeEach, afterEach } from "vitest"; + +// Mock the langfuse modules at the top level +const mockStartObservation = vi.fn(); +const mockPropagateAttributes = vi.fn(); +const mockSpanEnd = vi.fn(); +const mockGenerationEnd = vi.fn(); +const mockGenerationUpdate = vi.fn(); +const mockGuardSpanEnd = vi.fn(); +const mockEventEnd = vi.fn(); + +const mockGeneration: any = { + update: (...args: unknown[]) => { + mockGenerationUpdate(...args); + return mockGeneration; + }, + end: mockGenerationEnd, +}; + +const mockGuardSpan: any = { + end: mockGuardSpanEnd, +}; + +const mockEventObs: any = { + end: mockEventEnd, +}; + +const mockUpdateTrace = vi.fn(); + +const mockRootSpan = { + startObservation: vi.fn(), + updateTrace: mockUpdateTrace, + end: mockSpanEnd, +}; + +// Default: route by observation name +function setupDefaultStartObservation() { + mockRootSpan.startObservation.mockImplementation((name: string) => { + if (name === "guard-pipeline") return mockGuardSpan; + if (name === "provider-attempt") return mockEventObs; + return mockGeneration; // "llm-call" + }); +} + +vi.mock("@langfuse/tracing", () => ({ + startObservation: (...args: unknown[]) => { + mockStartObservation(...args); + return mockRootSpan; + }, + propagateAttributes: async (attrs: unknown, fn: () => Promise) => { + mockPropagateAttributes(attrs); + await fn(); + }, +})); + +vi.mock("@/lib/logger", () => ({ + logger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, +})); + +let langfuseEnabled = true; +vi.mock("@/lib/langfuse/index", () => ({ + isLangfuseEnabled: () => langfuseEnabled, +})); + +function createMockSession(overrides: Record = {}) { + const startTime = (overrides.startTime as number) ?? Date.now() - 500; + return { + startTime, + method: "POST", + headers: new Headers({ + "content-type": "application/json", + "x-api-key": "test-mock-key-not-real", + "user-agent": "claude-code/1.0", + }), + request: { + message: { + model: "claude-sonnet-4-20250514", + messages: [{ role: "user", content: "Hello" }], + stream: true, + max_tokens: 4096, + tools: [{ name: "tool1" }], + }, + model: "claude-sonnet-4-20250514", + }, + originalFormat: "claude", + userAgent: "claude-code/1.0", + sessionId: "sess_abc12345_def67890", + provider: { + id: 1, + name: "anthropic-main", + providerType: "claude", + }, + messageContext: { + id: 42, + user: { id: 7, name: "testuser" }, + key: { name: "default-key" }, + }, + ttfbMs: 200, + forwardStartTime: startTime + 5, + forwardedRequestBody: null, + getEndpoint: () => "/v1/messages", + getRequestSequence: () => 3, + getMessagesLength: () => 1, + getCurrentModel: () => "claude-sonnet-4-20250514", + getOriginalModel: () => "claude-sonnet-4-20250514", + isModelRedirected: () => false, + getProviderChain: () => [ + { + id: 1, + name: "anthropic-main", + providerType: "claude", + reason: "initial_selection", + timestamp: startTime + 2, + }, + ], + getSpecialSettings: () => null, + getCacheTtlResolved: () => null, + getContext1mApplied: () => false, + ...overrides, + } as any; +} + +describe("traceProxyRequest", () => { + beforeEach(() => { + vi.clearAllMocks(); + langfuseEnabled = true; + setupDefaultStartObservation(); + }); + + test("should not trace when Langfuse is disabled", async () => { + langfuseEnabled = false; + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + expect(mockStartObservation).not.toHaveBeenCalled(); + }); + + test("should trace when Langfuse is enabled with actual bodies", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + const responseBody = { content: "Hi there" }; + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers({ "content-type": "application/json" }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: JSON.stringify(responseBody), + }); + + // Root span should have actual request body as input (not summary) + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[0]).toBe("proxy-request"); + // Input should be the actual request message (since forwardedRequestBody is null) + expect(rootCall[1].input).toEqual( + expect.objectContaining({ + model: "claude-sonnet-4-20250514", + messages: expect.any(Array), + }) + ); + // Output should be actual response body + expect(rootCall[1].output).toEqual(responseBody); + // Should have level + expect(rootCall[1].level).toBe("DEFAULT"); + // Should have metadata with former summaries + expect(rootCall[1].metadata).toEqual( + expect.objectContaining({ + endpoint: "/v1/messages", + method: "POST", + statusCode: 200, + durationMs: 500, + }) + ); + + // Should have child observations + const callNames = mockRootSpan.startObservation.mock.calls.map((c: unknown[]) => c[0]); + expect(callNames).toContain("guard-pipeline"); + expect(callNames).toContain("llm-call"); + + expect(mockSpanEnd).toHaveBeenCalledWith(expect.any(Date)); + expect(mockGenerationEnd).toHaveBeenCalledWith(expect.any(Date)); + }); + + test("should use actual request messages as generation input", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + const session = createMockSession(); + + await traceProxyRequest({ + session, + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: '{"content": "response"}', + }); + + // Find the llm-call invocation + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall).toBeDefined(); + expect(llmCall[1].input).toEqual(session.request.message); + }); + + test("should use actual response body as generation output", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + const responseBody = { content: [{ type: "text", text: "Hello!" }] }; + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: JSON.stringify(responseBody), + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].output).toEqual(responseBody); + }); + + test("should pass raw headers without redaction", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers({ "x-api-key": "secret-mock" }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + const metadata = llmCall[1].metadata; + expect(metadata.requestHeaders["x-api-key"]).toBe("test-mock-key-not-real"); + expect(metadata.requestHeaders["content-type"]).toBe("application/json"); + expect(metadata.responseHeaders["x-api-key"]).toBe("secret-mock"); + }); + + test("should include provider name and model in tags", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + expect(mockPropagateAttributes).toHaveBeenCalledWith( + expect.objectContaining({ + userId: "testuser", + sessionId: "sess_abc12345_def67890", + tags: expect.arrayContaining([ + "claude", + "anthropic-main", + "claude-sonnet-4-20250514", + "2xx", + ]), + }) + ); + }); + + test("should include usage details when provided", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + usageMetrics: { + input_tokens: 100, + output_tokens: 50, + cache_read_input_tokens: 20, + }, + costUsd: "0.0015", + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].usageDetails).toEqual({ + input: 100, + output: 50, + cache_read_input_tokens: 20, + }); + expect(llmCall[1].costDetails).toEqual({ + total: 0.0015, + }); + }); + + test("should include providerChain, specialSettings, and model in metadata", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const providerChain = [ + { + id: 1, + name: "anthropic-main", + providerType: "claude", + reason: "initial_selection", + timestamp: Date.now(), + }, + ]; + + await traceProxyRequest({ + session: createMockSession({ + getSpecialSettings: () => ({ maxThinking: 8192 }), + getProviderChain: () => providerChain, + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + const metadata = llmCall[1].metadata; + expect(metadata.providerChain).toEqual(providerChain); + expect(metadata.specialSettings).toEqual({ maxThinking: 8192 }); + expect(metadata.model).toBe("claude-sonnet-4-20250514"); + expect(metadata.originalModel).toBe("claude-sonnet-4-20250514"); + expect(metadata.providerName).toBe("anthropic-main"); + expect(metadata.requestSummary).toEqual( + expect.objectContaining({ + model: "claude-sonnet-4-20250514", + messageCount: 1, + }) + ); + }); + + test("should handle model redirect metadata", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession({ + isModelRedirected: () => true, + getOriginalModel: () => "claude-sonnet-4-20250514", + getCurrentModel: () => "glm-4", + request: { + message: { model: "glm-4", messages: [] }, + model: "glm-4", + }, + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].metadata.modelRedirected).toBe(true); + expect(llmCall[1].metadata.originalModel).toBe("claude-sonnet-4-20250514"); + }); + + test("should set completionStartTime from ttfbMs", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = Date.now() - 500; + await traceProxyRequest({ + session: createMockSession({ startTime, ttfbMs: 200 }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + expect(mockGenerationUpdate).toHaveBeenCalledWith({ + completionStartTime: new Date(startTime + 200), + }); + }); + + test("should pass correct startTime and endTime to observations", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const durationMs = 5000; + + await traceProxyRequest({ + session: createMockSession({ startTime, forwardStartTime: startTime + 5 }), + responseHeaders: new Headers(), + durationMs, + statusCode: 200, + isStreaming: false, + }); + + const expectedStart = new Date(startTime); + const expectedEnd = new Date(startTime + durationMs); + const expectedForwardStart = new Date(startTime + 5); + + // Root span gets startTime in options (3rd arg) + expect(mockStartObservation).toHaveBeenCalledWith("proxy-request", expect.any(Object), { + startTime: expectedStart, + }); + + // Generation gets forwardStartTime in options (3rd arg) + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[2]).toEqual({ + asType: "generation", + startTime: expectedForwardStart, + }); + + // Both end() calls receive the computed endTime + expect(mockGenerationEnd).toHaveBeenCalledWith(expectedEnd); + expect(mockSpanEnd).toHaveBeenCalledWith(expectedEnd); + }); + + test("should handle errors gracefully without throwing", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + // Make startObservation throw + mockStartObservation.mockImplementationOnce(() => { + throw new Error("SDK error"); + }); + + await expect( + traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }) + ).resolves.toBeUndefined(); + }); + + test("should include correct tags for error responses", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 502, + isStreaming: false, + errorMessage: "upstream error", + }); + + expect(mockPropagateAttributes).toHaveBeenCalledWith( + expect.objectContaining({ + tags: expect.arrayContaining(["5xx"]), + }) + ); + }); + + test("should pass large input/output without truncation", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + // Generate a large response text + const largeContent = "x".repeat(200_000); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: largeContent, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + const output = llmCall[1].output as string; + // Should be the full content, no truncation + expect(output).toBe(largeContent); + expect(output).not.toContain("...[truncated]"); + }); + + test("should show streaming output with sseEventCount when no responseText", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: true, + sseEventCount: 42, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].output).toEqual({ + streaming: true, + sseEventCount: 42, + }); + }); + + test("should include costUsd in root span metadata", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + costUsd: "0.05", + }); + + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].metadata).toEqual( + expect.objectContaining({ + costUsd: "0.05", + }) + ); + }); + + test("should set trace-level input/output via updateTrace with actual bodies", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + const responseBody = { result: "ok" }; + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: JSON.stringify(responseBody), + costUsd: "0.05", + }); + + expect(mockUpdateTrace).toHaveBeenCalledWith({ + input: expect.objectContaining({ + model: "claude-sonnet-4-20250514", + messages: expect.any(Array), + }), + output: responseBody, + }); + }); + + // --- New tests for multi-span hierarchy --- + + test("should create guard-pipeline span with correct timing", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const forwardStartTime = startTime + 8; // 8ms guard pipeline + + await traceProxyRequest({ + session: createMockSession({ startTime, forwardStartTime }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const guardCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "guard-pipeline" + ); + expect(guardCall).toBeDefined(); + expect(guardCall[1]).toEqual({ + output: { durationMs: 8, passed: true }, + }); + expect(guardCall[2]).toEqual({ startTime: new Date(startTime) }); + + // Guard span should end at forwardStartTime + expect(mockGuardSpanEnd).toHaveBeenCalledWith(new Date(forwardStartTime)); + }); + + test("should skip guard-pipeline span when forwardStartTime is null", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession({ forwardStartTime: null }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const guardCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "guard-pipeline" + ); + expect(guardCall).toBeUndefined(); + expect(mockGuardSpanEnd).not.toHaveBeenCalled(); + }); + + test("should create provider-attempt events for failed chain items", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const failTimestamp = startTime + 100; + + await traceProxyRequest({ + session: createMockSession({ + startTime, + getProviderChain: () => [ + { + id: 1, + name: "provider-a", + providerType: "claude", + reason: "retry_failed", + errorMessage: "502 Bad Gateway", + statusCode: 502, + attemptNumber: 1, + timestamp: failTimestamp, + }, + { + id: 2, + name: "provider-b", + providerType: "claude", + reason: "system_error", + errorMessage: "ECONNREFUSED", + timestamp: failTimestamp + 50, + }, + { + id: 3, + name: "provider-c", + providerType: "claude", + reason: "request_success", + timestamp: failTimestamp + 200, + }, + ], + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const eventCalls = mockRootSpan.startObservation.mock.calls.filter( + (c: unknown[]) => c[0] === "provider-attempt" + ); + // 2 failed items (retry_failed + system_error), success is skipped + expect(eventCalls).toHaveLength(2); + + // First event: retry_failed -> WARNING level + expect(eventCalls[0][1]).toEqual( + expect.objectContaining({ + level: "WARNING", + input: expect.objectContaining({ + providerId: 1, + providerName: "provider-a", + attempt: 1, + }), + output: expect.objectContaining({ + reason: "retry_failed", + errorMessage: "502 Bad Gateway", + statusCode: 502, + }), + }) + ); + expect(eventCalls[0][2]).toEqual({ + asType: "event", + startTime: new Date(failTimestamp), + }); + + // Second event: system_error -> ERROR level + expect(eventCalls[1][1].level).toBe("ERROR"); + expect(eventCalls[1][1].output.reason).toBe("system_error"); + }); + + test("should set generation startTime to forwardStartTime", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const forwardStartTime = startTime + 10; + + await traceProxyRequest({ + session: createMockSession({ startTime, forwardStartTime }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[2]).toEqual({ + asType: "generation", + startTime: new Date(forwardStartTime), + }); + }); + + test("should fall back to requestStartTime when forwardStartTime is null", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + + await traceProxyRequest({ + session: createMockSession({ startTime, forwardStartTime: null }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[2]).toEqual({ + asType: "generation", + startTime: new Date(startTime), + }); + }); + + test("should include timingBreakdown in root span metadata and generation metadata", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const forwardStartTime = startTime + 5; + + await traceProxyRequest({ + session: createMockSession({ + startTime, + forwardStartTime, + ttfbMs: 105, + getProviderChain: () => [ + { id: 1, name: "p1", reason: "retry_failed", timestamp: startTime + 50 }, + { id: 2, name: "p2", reason: "request_success", timestamp: startTime + 100 }, + ], + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const expectedTimingBreakdown = { + guardPipelineMs: 5, + upstreamTotalMs: 495, + ttfbFromForwardMs: 100, // ttfbMs(105) - guardPipelineMs(5) + tokenGenerationMs: 395, // durationMs(500) - ttfbMs(105) + failedAttempts: 1, // only retry_failed is non-success + providersAttempted: 2, // 2 unique provider ids + }; + + // Root span metadata should have timingBreakdown + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].metadata.timingBreakdown).toEqual(expectedTimingBreakdown); + + // Generation metadata should also have timingBreakdown + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].metadata.timingBreakdown).toEqual(expectedTimingBreakdown); + }); + + test("should not create provider-attempt events when all providers succeeded", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession({ + getProviderChain: () => [ + { id: 1, name: "p1", reason: "initial_selection", timestamp: Date.now() }, + { id: 1, name: "p1", reason: "request_success", timestamp: Date.now() }, + ], + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const eventCalls = mockRootSpan.startObservation.mock.calls.filter( + (c: unknown[]) => c[0] === "provider-attempt" + ); + expect(eventCalls).toHaveLength(0); + }); + + // --- New tests for input/output, level, and cost breakdown --- + + test("should use forwardedRequestBody as trace input when available", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const forwardedBody = JSON.stringify({ + model: "claude-sonnet-4-20250514", + messages: [{ role: "user", content: "Preprocessed Hello" }], + stream: true, + }); + + await traceProxyRequest({ + session: createMockSession({ + forwardedRequestBody: forwardedBody, + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: '{"ok": true}', + }); + + // Root span input should be the forwarded body (parsed JSON) + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].input).toEqual(JSON.parse(forwardedBody)); + + // updateTrace should also use forwarded body + expect(mockUpdateTrace).toHaveBeenCalledWith({ + input: JSON.parse(forwardedBody), + output: { ok: true }, + }); + }); + + test("should set root span level to DEFAULT for successful request", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].level).toBe("DEFAULT"); + }); + + test("should set root span level to WARNING when retries occurred", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = Date.now() - 500; + await traceProxyRequest({ + session: createMockSession({ + startTime, + getProviderChain: () => [ + { id: 1, name: "p1", reason: "retry_failed", timestamp: startTime + 50 }, + { id: 2, name: "p2", reason: "request_success", timestamp: startTime + 200 }, + ], + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].level).toBe("WARNING"); + }); + + test("should set root span level to ERROR for non-200 status", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 502, + isStreaming: false, + }); + + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].level).toBe("ERROR"); + }); + + test("should set root span level to ERROR for 499 client abort", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 499, + isStreaming: false, + }); + + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].level).toBe("ERROR"); + }); + + test("should include cost breakdown in costDetails when provided", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const costBreakdown = { + input: 0.001, + output: 0.002, + cache_creation: 0.0005, + cache_read: 0.0001, + total: 0.0036, + }; + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + costUsd: "0.0036", + costBreakdown, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].costDetails).toEqual(costBreakdown); + }); + + test("should fallback to total-only costDetails when no breakdown", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + costUsd: "0.05", + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].costDetails).toEqual({ total: 0.05 }); + }); + + test("should include former summaries in root span metadata", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + costUsd: "0.05", + }); + + const rootCall = mockStartObservation.mock.calls[0]; + const metadata = rootCall[1].metadata; + // Former input summary fields + expect(metadata.endpoint).toBe("/v1/messages"); + expect(metadata.method).toBe("POST"); + expect(metadata.model).toBe("claude-sonnet-4-20250514"); + expect(metadata.clientFormat).toBe("claude"); + expect(metadata.providerName).toBe("anthropic-main"); + // Former output summary fields + expect(metadata.statusCode).toBe(200); + expect(metadata.durationMs).toBe(500); + expect(metadata.costUsd).toBe("0.05"); + expect(metadata.timingBreakdown).toBeDefined(); + }); +}); + +describe("isLangfuseEnabled", () => { + const originalPublicKey = process.env.LANGFUSE_PUBLIC_KEY; + const originalSecretKey = process.env.LANGFUSE_SECRET_KEY; + + afterEach(() => { + // Restore env + if (originalPublicKey !== undefined) { + process.env.LANGFUSE_PUBLIC_KEY = originalPublicKey; + } else { + delete process.env.LANGFUSE_PUBLIC_KEY; + } + if (originalSecretKey !== undefined) { + process.env.LANGFUSE_SECRET_KEY = originalSecretKey; + } else { + delete process.env.LANGFUSE_SECRET_KEY; + } + }); + + test("should return false when env vars are not set", () => { + delete process.env.LANGFUSE_PUBLIC_KEY; + delete process.env.LANGFUSE_SECRET_KEY; + + // Direct function test (not using the mock) + const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY); + expect(isEnabled).toBe(false); + }); + + test("should return true when both keys are set", () => { + process.env.LANGFUSE_PUBLIC_KEY = "pk-lf-test-mock"; + process.env.LANGFUSE_SECRET_KEY = "test-mock-not-real"; + + const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY); + expect(isEnabled).toBe(true); + }); +}); diff --git a/tests/unit/lib/cost-calculation-breakdown.test.ts b/tests/unit/lib/cost-calculation-breakdown.test.ts new file mode 100644 index 000000000..b8589ffb9 --- /dev/null +++ b/tests/unit/lib/cost-calculation-breakdown.test.ts @@ -0,0 +1,159 @@ +import { describe, expect, test } from "vitest"; +import { calculateRequestCostBreakdown, type CostBreakdown } from "@/lib/utils/cost-calculation"; +import type { ModelPriceData } from "@/types/model-price"; + +function makePriceData(overrides: Partial = {}): ModelPriceData { + return { + input_cost_per_token: 0.000003, // $3/MTok + output_cost_per_token: 0.000015, // $15/MTok + cache_creation_input_token_cost: 0.00000375, // 1.25x input + cache_read_input_token_cost: 0.0000003, // 0.1x input + ...overrides, + }; +} + +describe("calculateRequestCostBreakdown", () => { + test("basic input + output tokens", () => { + const result = calculateRequestCostBreakdown( + { input_tokens: 1000, output_tokens: 500 }, + makePriceData() + ); + + expect(result.input).toBeCloseTo(0.003, 6); // 1000 * 0.000003 + expect(result.output).toBeCloseTo(0.0075, 6); // 500 * 0.000015 + expect(result.cache_creation).toBe(0); + expect(result.cache_read).toBe(0); + expect(result.total).toBeCloseTo(0.0105, 6); + }); + + test("cache creation (5m + 1h) + cache read", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 100, + output_tokens: 50, + cache_creation_5m_input_tokens: 200, + cache_creation_1h_input_tokens: 300, + cache_read_input_tokens: 1000, + }, + makePriceData({ + cache_creation_input_token_cost_above_1hr: 0.000006, // 2x input + }) + ); + + // cache_creation = 200 * 0.00000375 + 300 * 0.000006 + expect(result.cache_creation).toBeCloseTo(0.00255, 6); + // cache_read = 1000 * 0.0000003 + expect(result.cache_read).toBeCloseTo(0.0003, 6); + expect(result.total).toBeCloseTo( + result.input + result.output + result.cache_creation + result.cache_read, + 10 + ); + }); + + test("image tokens go to input/output buckets", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 100, + output_tokens: 50, + input_image_tokens: 500, + output_image_tokens: 200, + }, + makePriceData({ + input_cost_per_image_token: 0.00001, + output_cost_per_image_token: 0.00005, + }) + ); + + // input = 100 * 0.000003 + 500 * 0.00001 + expect(result.input).toBeCloseTo(0.0053, 6); + // output = 50 * 0.000015 + 200 * 0.00005 + expect(result.output).toBeCloseTo(0.01075, 6); + }); + + test("tiered pricing with context1mApplied", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 300000, // crosses 200k threshold + output_tokens: 100, + }, + makePriceData(), + true // context1mApplied + ); + + // input: 200000 * 0.000003 + 100000 * 0.000003 * 2.0 = 0.6 + 0.6 = 1.2 + expect(result.input).toBeCloseTo(1.2, 4); + // output: 100 tokens, below 200k threshold + expect(result.output).toBeCloseTo(0.0015, 6); + }); + + test("200k tier pricing (Gemini style)", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 300000, // crosses 200k threshold + output_tokens: 100, + }, + makePriceData({ + input_cost_per_token_above_200k_tokens: 0.000006, // 2x base for >200k + }) + ); + + // input: 200000 * 0.000003 + 100000 * 0.000006 = 0.6 + 0.6 = 1.2 + expect(result.input).toBeCloseTo(1.2, 4); + }); + + test("categories sum to total", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 5000, + output_tokens: 2000, + cache_creation_input_tokens: 1000, + cache_read_input_tokens: 3000, + }, + makePriceData() + ); + + const sum = result.input + result.output + result.cache_creation + result.cache_read; + expect(result.total).toBeCloseTo(sum, 10); + }); + + test("zero usage returns all zeros", () => { + const result = calculateRequestCostBreakdown({}, makePriceData()); + + expect(result).toEqual({ + input: 0, + output: 0, + cache_creation: 0, + cache_read: 0, + total: 0, + }); + }); + + test("per-request cost goes to input bucket", () => { + const result = calculateRequestCostBreakdown( + { input_tokens: 0 }, + makePriceData({ input_cost_per_request: 0.01 }) + ); + + expect(result.input).toBeCloseTo(0.01, 6); + expect(result.total).toBeCloseTo(0.01, 6); + }); + + test("cache_creation_input_tokens distributed by cache_ttl", () => { + // When only cache_creation_input_tokens is set (no 5m/1h split), + // it should be assigned based on cache_ttl + const result = calculateRequestCostBreakdown( + { + input_tokens: 0, + output_tokens: 0, + cache_creation_input_tokens: 1000, + cache_ttl: "1h", + }, + makePriceData({ + cache_creation_input_token_cost_above_1hr: 0.000006, + }) + ); + + // 1000 tokens should go to 1h tier at 0.000006 + expect(result.cache_creation).toBeCloseTo(0.006, 6); + }); +}); diff --git a/tests/unit/proxy/proxy-handler-session-id-error.test.ts b/tests/unit/proxy/proxy-handler-session-id-error.test.ts index d336e0dc4..18062cc79 100644 --- a/tests/unit/proxy/proxy-handler-session-id-error.test.ts +++ b/tests/unit/proxy/proxy-handler-session-id-error.test.ts @@ -13,6 +13,7 @@ const h = vi.hoisted(() => ({ }, isCountTokensRequest: () => false, setOriginalFormat: () => {}, + recordForwardStart: () => {}, messageContext: null, provider: null, } as any,