Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 66 additions & 16 deletions src/app/v1/_lib/proxy/response-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1355,35 +1355,85 @@ function parseUsageFromResponseText(
// 2. 纯 data: 格式 - Gemini
if (!usageMetrics && responseText.includes("data:")) {
const events = parseSSEData(responseText);
for (const event of events) {
if (usageMetrics) {
break;
}

// Claude SSE 特殊处理:
// - message_start 包含 input tokens 和缓存创建字段(5m/1h 区分计费)
// - message_delta 包含最终的 output_tokens
// 需要分别提取并合并
let messageStartUsage: UsageMetrics | null = null;
let messageDeltaOutputTokens: number | null = null;

for (const event of events) {
if (typeof event.data !== "object" || !event.data) {
continue;
}

const data = event.data as Record<string, unknown>;

// Claude message_start format: data.message.usage (preferred)
if (data.message && typeof data.message === "object") {
// Claude message_start format: data.message.usage
// 提取 input tokens 和缓存字段
if (event.event === "message_start" && data.message && typeof data.message === "object") {
const messageObj = data.message as Record<string, unknown>;
applyUsageValue(messageObj.usage, `sse.${event.event}.message.usage`);
if (messageObj.usage && typeof messageObj.usage === "object") {
const extracted = extractUsageMetrics(messageObj.usage);
if (extracted) {
messageStartUsage = extracted;
logger.debug("[ResponseHandler] Extracted usage from message_start", {
source: "sse.message_start.message.usage",
usage: extracted,
});
}
}
}

// Claude message_delta format: data.usage.output_tokens
// 提取最终的 output_tokens(在流结束时)
if (event.event === "message_delta" && data.usage && typeof data.usage === "object") {
const deltaUsage = data.usage as Record<string, unknown>;
if (typeof deltaUsage.output_tokens === "number") {
messageDeltaOutputTokens = deltaUsage.output_tokens;
logger.debug("[ResponseHandler] Extracted output_tokens from message_delta", {
source: "sse.message_delta.usage.output_tokens",
outputTokens: messageDeltaOutputTokens,
});
}
}

// Fallback: Standard usage fields (data.usage)
applyUsageValue(data.usage, `sse.${event.event}.usage`);
// 非 Claude 格式的 SSE 处理(Gemini 等)
if (!messageStartUsage && !messageDeltaOutputTokens) {
// Standard usage fields (data.usage)
applyUsageValue(data.usage, `sse.${event.event}.usage`);

// Gemini usageMetadata
applyUsageValue(data.usageMetadata, `sse.${event.event}.usageMetadata`);

// Gemini usageMetadata
applyUsageValue(data.usageMetadata, `sse.${event.event}.usageMetadata`);
// Handle response wrapping in SSE
if (!usageMetrics && data.response && typeof data.response === "object") {
const responseObj = data.response as Record<string, unknown>;
applyUsageValue(responseObj.usage, `sse.${event.event}.response.usage`);
applyUsageValue(responseObj.usageMetadata, `sse.${event.event}.response.usageMetadata`);
}
}
}

// Handle response wrapping in SSE
if (!usageMetrics && data.response && typeof data.response === "object") {
const responseObj = data.response as Record<string, unknown>;
applyUsageValue(responseObj.usage, `sse.${event.event}.response.usage`);
applyUsageValue(responseObj.usageMetadata, `sse.${event.event}.response.usageMetadata`);
// 合并 Claude SSE 的 message_start 和 message_delta 数据
if (messageStartUsage) {
// 使用 message_delta 中的 output_tokens 覆盖 message_start 中的值
if (messageDeltaOutputTokens !== null) {
messageStartUsage.output_tokens = messageDeltaOutputTokens;
logger.debug(
"[ResponseHandler] Merged output_tokens from message_delta into message_start usage",
{
finalOutputTokens: messageDeltaOutputTokens,
}
);
}
usageMetrics = adjustUsageForProviderType(messageStartUsage, providerType);
usageRecord = messageStartUsage as unknown as Record<string, unknown>;
logger.debug("[ResponseHandler] Final merged usage from Claude SSE", {
providerType,
usage: usageMetrics,
});
}
Comment on lines 1357 to 1437
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation correctly handles the Anthropic SSE stream parsing. However, it implicitly detects a Claude stream by checking for message_start or message_delta events. This could be brittle if other providers adopt similar event names in the future.

A more robust and maintainable approach would be to explicitly check the providerType to distinguish between Claude/Anthropic streams and other providers like Gemini. This makes the separation of parsing logic clearer and safer.

I suggest refactoring this block to have a clear if/else based on providerType. This also allows us to restore the more efficient break statement for non-Claude providers.

    const events = parseSSEData(responseText);

    if (providerType === "claude" || providerType === "claude-auth") {
      // Claude SSE 特殊处理:
      // - message_start 包含 input tokens 和缓存创建字段(5m/1h 区分计费)
      // - message_delta 包含最终的 output_tokens
      // 需要分别提取并合并
      let messageStartUsage: UsageMetrics | null = null;
      let messageDeltaOutputTokens: number | null = null;

      for (const event of events) {
        if (typeof event.data !== "object" || !event.data) {
          continue;
        }

        const data = event.data as Record<string, unknown>;

        // Claude message_start format: data.message.usage
        // 提取 input tokens 和缓存字段
        if (event.event === "message_start" && data.message && typeof data.message === "object") {
          const messageObj = data.message as Record<string, unknown>;
          if (messageObj.usage && typeof messageObj.usage === "object") {
            const extracted = extractUsageMetrics(messageObj.usage);
            if (extracted) {
              messageStartUsage = extracted;
              logger.debug("[ResponseHandler] Extracted usage from message_start", {
                source: "sse.message_start.message.usage",
                usage: extracted,
              });
            }
          }
        }

        // Claude message_delta format: data.usage.output_tokens
        // 提取最终的 output_tokens(在流结束时)
        if (event.event === "message_delta" && data.usage && typeof data.usage === "object") {
          const deltaUsage = data.usage as Record<string, unknown>;
          if (typeof deltaUsage.output_tokens === "number") {
            messageDeltaOutputTokens = deltaUsage.output_tokens;
            logger.debug("[ResponseHandler] Extracted output_tokens from message_delta", {
              source: "sse.message_delta.usage.output_tokens",
              outputTokens: messageDeltaOutputTokens,
            });
          }
        }
      }

      // 合并 Claude SSE 的 message_start 和 message_delta 数据
      if (messageStartUsage) {
        // 使用 message_delta 中的 output_tokens 覆盖 message_start 中的值
        if (messageDeltaOutputTokens !== null) {
          messageStartUsage.output_tokens = messageDeltaOutputTokens;
          logger.debug(
            "[ResponseHandler] Merged output_tokens from message_delta into message_start usage",
            {
              finalOutputTokens: messageDeltaOutputTokens,
            }
          );
        }
        usageMetrics = adjustUsageForProviderType(messageStartUsage, providerType);
        usageRecord = messageStartUsage as unknown as Record<string, unknown>;
        logger.debug("[ResponseHandler] Final merged usage from Claude SSE", {
          providerType,
          usage: usageMetrics,
        });
      }
    } else {
      // 非 Claude 格式的 SSE 处理(Gemini 等)
      for (const event of events) {
        if (usageMetrics) {
          break;
        }

        if (typeof event.data !== "object" || !event.data) {
          continue;
        }

        const data = event.data as Record<string, unknown>;

        // Standard usage fields (data.usage)
        applyUsageValue(data.usage, `sse.${event.event}.usage`);

        // Gemini usageMetadata
        applyUsageValue(data.usageMetadata, `sse.${event.event}.usageMetadata`);

        // Handle response wrapping in SSE
        if (!usageMetrics && data.response && typeof data.response === "object") {
          const responseObj = data.response as Record<string, unknown>;
          applyUsageValue(responseObj.usage, `sse.${event.event}.response.usage`);
          applyUsageValue(responseObj.usageMetadata, `sse.${event.event}.response.usageMetadata`);
        }
      }
    }

}

Expand Down
Loading