The previous implementation extracted output_tokens from the message_start event, which appears at the beginning of the SSE stream. This produced incorrect output token counts (usually 1), since the main content had not been generated yet.

This fix:
- Extracts input tokens and cache fields (5m/1h) from message_start
- Extracts final output_tokens from message_delta (at stream end)
- Merges both to get accurate usage metrics

Closes #312

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Summary of Changes (Gemini Code Assist)
This pull request addresses an issue where Anthropic SSE streaming responses reported incorrect output token counts due to premature extraction.
Code Review
This pull request correctly fixes an issue with output token counting for Anthropic SSE streams by extracting usage data from message_delta events. The implementation is sound. I've added one suggestion to refactor the logic to make it more robust and maintainable. By explicitly checking the providerType instead of implicitly detecting the provider based on SSE event names, the code becomes easier to understand and less prone to future bugs if other providers adopt similar event names.
  const events = parseSSEData(responseText);
- for (const event of events) {
-   if (usageMetrics) {
-     break;
-   }

+ // Claude SSE special handling:
+ // - message_start contains input tokens and cache-creation fields (5m/1h are billed separately)
+ // - message_delta contains the final output_tokens
+ // The two must be extracted separately and then merged
+ let messageStartUsage: UsageMetrics | null = null;
+ let messageDeltaOutputTokens: number | null = null;

+ for (const event of events) {
    if (typeof event.data !== "object" || !event.data) {
      continue;
    }

    const data = event.data as Record<string, unknown>;

-   // Claude message_start format: data.message.usage (preferred)
-   if (data.message && typeof data.message === "object") {
+   // Claude message_start format: data.message.usage
+   // Extract input tokens and cache fields
+   if (event.event === "message_start" && data.message && typeof data.message === "object") {
      const messageObj = data.message as Record<string, unknown>;
-     applyUsageValue(messageObj.usage, `sse.${event.event}.message.usage`);
+     if (messageObj.usage && typeof messageObj.usage === "object") {
+       const extracted = extractUsageMetrics(messageObj.usage);
+       if (extracted) {
+         messageStartUsage = extracted;
+         logger.debug("[ResponseHandler] Extracted usage from message_start", {
+           source: "sse.message_start.message.usage",
+           usage: extracted,
+         });
+       }
+     }
    }

+   // Claude message_delta format: data.usage.output_tokens
+   // Extract the final output_tokens (at stream end)
+   if (event.event === "message_delta" && data.usage && typeof data.usage === "object") {
+     const deltaUsage = data.usage as Record<string, unknown>;
+     if (typeof deltaUsage.output_tokens === "number") {
+       messageDeltaOutputTokens = deltaUsage.output_tokens;
+       logger.debug("[ResponseHandler] Extracted output_tokens from message_delta", {
+         source: "sse.message_delta.usage.output_tokens",
+         outputTokens: messageDeltaOutputTokens,
+       });
+     }
+   }

-   // Fallback: Standard usage fields (data.usage)
-   applyUsageValue(data.usage, `sse.${event.event}.usage`);
+   // SSE handling for non-Claude formats (Gemini, etc.)
+   if (!messageStartUsage && !messageDeltaOutputTokens) {
+     // Standard usage fields (data.usage)
+     applyUsageValue(data.usage, `sse.${event.event}.usage`);

-   // Gemini usageMetadata
-   applyUsageValue(data.usageMetadata, `sse.${event.event}.usageMetadata`);

+     // Gemini usageMetadata
+     applyUsageValue(data.usageMetadata, `sse.${event.event}.usageMetadata`);
+     // Handle response wrapping in SSE
+     if (!usageMetrics && data.response && typeof data.response === "object") {
+       const responseObj = data.response as Record<string, unknown>;
+       applyUsageValue(responseObj.usage, `sse.${event.event}.response.usage`);
+       applyUsageValue(responseObj.usageMetadata, `sse.${event.event}.response.usageMetadata`);
+     }
+   }
+ }

-   // Handle response wrapping in SSE
-   if (!usageMetrics && data.response && typeof data.response === "object") {
-     const responseObj = data.response as Record<string, unknown>;
-     applyUsageValue(responseObj.usage, `sse.${event.event}.response.usage`);
-     applyUsageValue(responseObj.usageMetadata, `sse.${event.event}.response.usageMetadata`);
+ // Merge the message_start and message_delta data from the Claude SSE stream
+ if (messageStartUsage) {
+   // Overwrite the message_start value with the output_tokens from message_delta
+   if (messageDeltaOutputTokens !== null) {
+     messageStartUsage.output_tokens = messageDeltaOutputTokens;
+     logger.debug(
+       "[ResponseHandler] Merged output_tokens from message_delta into message_start usage",
+       {
+         finalOutputTokens: messageDeltaOutputTokens,
+       }
+     );
+   }
+   usageMetrics = adjustUsageForProviderType(messageStartUsage, providerType);
+   usageRecord = messageStartUsage as unknown as Record<string, unknown>;
+   logger.debug("[ResponseHandler] Final merged usage from Claude SSE", {
+     providerType,
+     usage: usageMetrics,
+   });
+ }
The current implementation correctly handles the Anthropic SSE stream parsing. However, it implicitly detects a Claude stream by checking for message_start or message_delta events. This could be brittle if other providers adopt similar event names in the future.
A more robust and maintainable approach would be to explicitly check the providerType to distinguish between Claude/Anthropic streams and other providers like Gemini. This makes the separation of parsing logic clearer and safer.
I suggest refactoring this block to have a clear if/else based on providerType. This also allows us to restore the more efficient break statement for non-Claude providers.
const events = parseSSEData(responseText);
if (providerType === "claude" || providerType === "claude-auth") {
  // Claude SSE special handling:
  // - message_start contains input tokens and cache-creation fields (5m/1h are billed separately)
  // - message_delta contains the final output_tokens
  // The two must be extracted separately and then merged
  let messageStartUsage: UsageMetrics | null = null;
  let messageDeltaOutputTokens: number | null = null;
  for (const event of events) {
    if (typeof event.data !== "object" || !event.data) {
      continue;
    }
    const data = event.data as Record<string, unknown>;
    // Claude message_start format: data.message.usage
    // Extract input tokens and cache fields
    if (event.event === "message_start" && data.message && typeof data.message === "object") {
      const messageObj = data.message as Record<string, unknown>;
      if (messageObj.usage && typeof messageObj.usage === "object") {
        const extracted = extractUsageMetrics(messageObj.usage);
        if (extracted) {
          messageStartUsage = extracted;
          logger.debug("[ResponseHandler] Extracted usage from message_start", {
            source: "sse.message_start.message.usage",
            usage: extracted,
          });
        }
      }
    }
    // Claude message_delta format: data.usage.output_tokens
    // Extract the final output_tokens (at stream end)
    if (event.event === "message_delta" && data.usage && typeof data.usage === "object") {
      const deltaUsage = data.usage as Record<string, unknown>;
      if (typeof deltaUsage.output_tokens === "number") {
        messageDeltaOutputTokens = deltaUsage.output_tokens;
        logger.debug("[ResponseHandler] Extracted output_tokens from message_delta", {
          source: "sse.message_delta.usage.output_tokens",
          outputTokens: messageDeltaOutputTokens,
        });
      }
    }
  }
  // Merge the message_start and message_delta data from the Claude SSE stream
  if (messageStartUsage) {
    // Overwrite the message_start value with the output_tokens from message_delta
    if (messageDeltaOutputTokens !== null) {
      messageStartUsage.output_tokens = messageDeltaOutputTokens;
      logger.debug(
        "[ResponseHandler] Merged output_tokens from message_delta into message_start usage",
        {
          finalOutputTokens: messageDeltaOutputTokens,
        }
      );
    }
    usageMetrics = adjustUsageForProviderType(messageStartUsage, providerType);
    usageRecord = messageStartUsage as unknown as Record<string, unknown>;
    logger.debug("[ResponseHandler] Final merged usage from Claude SSE", {
      providerType,
      usage: usageMetrics,
    });
  }
} else {
  // SSE handling for non-Claude formats (Gemini, etc.)
  for (const event of events) {
    if (usageMetrics) {
      break;
    }
    if (typeof event.data !== "object" || !event.data) {
      continue;
    }
    const data = event.data as Record<string, unknown>;
    // Standard usage fields (data.usage)
    applyUsageValue(data.usage, `sse.${event.event}.usage`);
    // Gemini usageMetadata
    applyUsageValue(data.usageMetadata, `sse.${event.event}.usageMetadata`);
    // Handle response wrapping in SSE
    if (!usageMetrics && data.response && typeof data.response === "object") {
      const responseObj = data.response as Record<string, unknown>;
      applyUsageValue(responseObj.usage, `sse.${event.event}.response.usage`);
      applyUsageValue(responseObj.usageMetadata, `sse.${event.event}.response.usageMetadata`);
    }
  }
}
Code Review Summary
No significant issues identified in this PR. The implementation correctly addresses the bug where output_tokens was incorrectly extracted from message_start (which contains the initial/low value) instead of message_delta (which contains the final accurate count).
PR Size: XS
- Lines changed: 82 (66 additions, 16 deletions)
- Files changed: 1
Analysis Notes
The refactored parseUsageFromResponseText function now:
- ✅ Correctly separates Claude SSE handling from other formats (Gemini, etc.)
- ✅ Extracts input_tokens and cache fields from message_start event
- ✅ Extracts final output_tokens from message_delta event
- ✅ Merges both metrics appropriately
- ✅ Maintains backward compatibility with non-Claude SSE formats
The logic gate if (!messageStartUsage && !messageDeltaOutputTokens) correctly ensures non-Claude SSE processing only occurs when no Claude-specific events are detected, preventing format conflicts.
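For readers following the gate, a small sketch of how it evaluates may help. It is an illustration only: `gateByTruthiness` and `gateByNullCheck` are hypothetical helper names, not functions in the PR, and the `StartUsage` type is a simplified stand-in for `UsageMetrics`. The first function mirrors the condition as written; the second is a variant with explicit null checks. The two differ only when a delta reports 0 output tokens before any message_start usage was captured, and whether that can happen in a real stream is not established here.

```typescript
// Simplified stand-in for the parser's message_start state (assumption).
type StartUsage = Record<string, unknown> | null;

// Mirrors the gate as written in the PR: relies on JavaScript truthiness.
function gateByTruthiness(startUsage: StartUsage, deltaTokens: number | null): boolean {
  return !startUsage && !deltaTokens;
}

// Hypothetical variant: treats only null as "not seen yet".
function gateByNullCheck(startUsage: StartUsage, deltaTokens: number | null): boolean {
  return startUsage === null && deltaTokens === null;
}

// No Claude events seen yet: both gates allow the generic parsing path.
console.log(gateByTruthiness(null, null), gateByNullCheck(null, null)); // true true

// message_start usage already captured: both gates block the generic path.
console.log(
  gateByTruthiness({ input_tokens: 8 }, null),
  gateByNullCheck({ input_tokens: 8 }, null)
); // false false

// Only a message_delta reporting 0 tokens was seen: the gates disagree,
// because !0 is true in JavaScript.
console.log(gateByTruthiness(null, 0), gateByNullCheck(null, 0)); // true false
```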
Review Coverage
- Logic and correctness - Clean
- Security (OWASP Top 10) - Clean
- Error handling - Clean (appropriate debug logging added)
- Type safety - Clean
- Documentation accuracy - Clean (comments match implementation)
- Test coverage - No automated tests for this function (pre-existing gap, not introduced by this PR)
- Code clarity - Good (well-named variables, clear separation of concerns)
Recommendation
✅ Approve - The implementation is sound. Manual testing with real Anthropic SSE streams is recommended before deployment as noted in the PR description.
Automated review by Claude AI
Summary
- Extracts output_tokens from the message_delta event (at stream end) instead of message_start (at stream beginning)
- Extracts input tokens and cache fields (5m/1h) from message_start

Problem
Fixes #312

The v0.3.27 update introduced differentiated cache billing (5m/1h), which changed the usage extraction logic to use message_start. However, message_start appears at the beginning of the stream, when output_tokens is typically 1 (or very low), before the actual response content is generated.

Example message_start (incorrect source for output_tokens):
{"type":"message_start","message":{"usage":{"input_tokens":8,"output_tokens":1,...}}}

Example message_delta (correct source for final output_tokens):
{"type":"message_delta","delta":{"stop_reason":"tool_use"},"usage":{"output_tokens":356}}

Solution
Modified parseUsageFromResponseText in src/app/v1/_lib/proxy/response-handler.ts to:
- Extract input tokens and cache fields from the message_start event (required for 5m/1h cache billing)
- Extract final output_tokens from the message_delta event (appears at stream end with accurate count)

This approach preserves the 5m/1h cache differentiation feature while fixing the output token counting bug.
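The extract-then-merge approach can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the code in response-handler.ts: the `SseEvent` and `Usage` types and the `mergeClaudeUsage` helper are hypothetical names, events are assumed to be already parsed from the stream, and the sample payloads reuse the two example events from the Problem section (with the elided usage fields left out).

```typescript
// Hypothetical, simplified types; the real project defines its own.
interface SseEvent {
  event: string;
  data: unknown;
}
type Usage = Record<string, unknown>;

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

// Take input/cache fields from message_start and the final output_tokens
// from the last message_delta that carries a numeric value.
function mergeClaudeUsage(events: SseEvent[]): Usage | null {
  let startUsage: Usage | null = null;
  let finalOutputTokens: number | null = null;

  for (const { event, data } of events) {
    if (!isRecord(data)) continue;

    if (event === "message_start") {
      const message = data.message;
      if (isRecord(message) && isRecord(message.usage)) {
        startUsage = { ...message.usage };
      }
    }

    if (event === "message_delta") {
      const usage = data.usage;
      if (isRecord(usage) && typeof usage.output_tokens === "number") {
        finalOutputTokens = usage.output_tokens;
      }
    }
  }

  if (startUsage === null) return null;
  // Overwrite the early placeholder count only if a final count was seen.
  return finalOutputTokens === null
    ? startUsage
    : { ...startUsage, output_tokens: finalOutputTokens };
}

const sampleEvents: SseEvent[] = [
  {
    event: "message_start",
    data: { type: "message_start", message: { usage: { input_tokens: 8, output_tokens: 1 } } },
  },
  {
    event: "message_delta",
    data: { type: "message_delta", delta: { stop_reason: "tool_use" }, usage: { output_tokens: 356 } },
  },
];

console.log(mergeClaudeUsage(sampleEvents)); // { input_tokens: 8, output_tokens: 356 }
```

Keeping the merge as a pure function over parsed events also makes it straightforward to cover with unit tests, independent of logging and provider-specific adjustments.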
Changes
- src/app/v1/_lib/proxy/response-handler.ts: Rewrote SSE parsing logic for Claude format to handle message_start and message_delta separately, then merge the results

Testing

Created by Claude AI in response to @claude mention