From f57f54f35c06b6a0909962d3402eac7095ace371 Mon Sep 17 00:00:00 2001 From: daniel-lxs Date: Fri, 26 Dec 2025 14:33:44 -0500 Subject: [PATCH] fix: capture extended thinking signatures for tool use continuations Fixes ROO-312 When extended thinking is enabled, Anthropic's API returns signature_delta events that contain cryptographic signatures for thinking blocks. These signatures are required when passing thinking blocks back to the API for tool use continuations. Without the signature, thinking blocks were stored as generic 'reasoning' blocks which get filtered out by anthropic-filter.ts, causing the API validation error: 'Expected thinking or redacted_thinking, but found tool_use' Changes: - Add lastThinkingSignature property to store captured signature - Add getThoughtSignature() method to expose signature to Task.ts - Handle signature_delta events in content_block_delta - Emit thinking_complete chunk when thinking block finishes with signature - Add comprehensive tests for signature capture functionality --- src/api/providers/__tests__/anthropic.spec.ts | 298 ++++++++++++++++++ src/api/providers/anthropic.ts | 79 ++++- 2 files changed, 367 insertions(+), 10 deletions(-) diff --git a/src/api/providers/__tests__/anthropic.spec.ts b/src/api/providers/__tests__/anthropic.spec.ts index 5c0c1632b4c..464eafe7de0 100644 --- a/src/api/providers/__tests__/anthropic.spec.ts +++ b/src/api/providers/__tests__/anthropic.spec.ts @@ -741,4 +741,302 @@ describe("AnthropicHandler", () => { }) }) }) + + describe("extended thinking with signature capture", () => { + const systemPrompt = "You are a helpful assistant." + const messages: Anthropic.Messages.MessageParam[] = [ + { + role: "user", + content: [{ type: "text" as const, text: "Think about this carefully" }], + }, + ] + + it("should capture signature_delta and make it available via getThoughtSignature()", async () => { + mockCreate.mockImplementationOnce(async () => ({ + async *[Symbol.asyncIterator]() { + yield { + type: "message_start", + message: { + usage: { + input_tokens: 100, + output_tokens: 50, + }, + }, + } + yield { + type: "content_block_start", + index: 0, + content_block: { + type: "thinking", + thinking: "Let me think...", + }, + } + yield { + type: "content_block_delta", + index: 0, + delta: { + type: "thinking_delta", + thinking: " about this carefully", + }, + } + yield { + type: "content_block_delta", + index: 0, + delta: { + type: "signature_delta", + signature: "test_signature_123", + }, + } + yield { + type: "content_block_stop", + index: 0, + } + yield { + type: "content_block_start", + index: 1, + content_block: { + type: "text", + text: "Here is my response", + }, + } + yield { + type: "content_block_stop", + index: 1, + } + }, + })) + + const stream = handler.createMessage(systemPrompt, messages, { + taskId: "test-task", + }) + + const chunks: any[] = [] + for await (const chunk of stream) { + chunks.push(chunk) + } + + // Verify reasoning chunks were emitted + const reasoningChunks = chunks.filter((chunk) => chunk.type === "reasoning") + expect(reasoningChunks).toHaveLength(2) + expect(reasoningChunks[0].text).toBe("Let me think...") + expect(reasoningChunks[1].text).toBe(" about this carefully") + + // Verify thinking_complete chunk was emitted with signature + const thinkingCompleteChunk = chunks.find((chunk) => chunk.type === "thinking_complete") + expect(thinkingCompleteChunk).toBeDefined() + expect(thinkingCompleteChunk.signature).toBe("test_signature_123") + + // Verify getThoughtSignature() returns the captured signature + expect(handler.getThoughtSignature()).toBe("test_signature_123") + }) + + it("should reset signature for each new request", async () => { + // First request with signature + mockCreate.mockImplementationOnce(async () => ({ + async *[Symbol.asyncIterator]() { + yield { + type: "message_start", + message: { usage: { input_tokens: 100, output_tokens: 50 } }, + } + yield { + type: "content_block_start", + index: 0, + content_block: { type: "thinking", thinking: "First thinking" }, + } + yield { + type: "content_block_delta", + index: 0, + delta: { type: "signature_delta", signature: "first_signature" }, + } + yield { type: "content_block_stop", index: 0 } + }, + })) + + const stream1 = handler.createMessage(systemPrompt, messages, { taskId: "test-task-1" }) + for await (const _chunk of stream1) { + // consume + } + expect(handler.getThoughtSignature()).toBe("first_signature") + + // Second request without signature + mockCreate.mockImplementationOnce(async () => ({ + async *[Symbol.asyncIterator]() { + yield { + type: "message_start", + message: { usage: { input_tokens: 100, output_tokens: 50 } }, + } + yield { + type: "content_block_start", + index: 0, + content_block: { type: "text", text: "Just text, no thinking" }, + } + yield { type: "content_block_stop", index: 0 } + }, + })) + + const stream2 = handler.createMessage(systemPrompt, messages, { taskId: "test-task-2" }) + for await (const _chunk of stream2) { + // consume + } + + // Signature should be reset (undefined) for the new request + expect(handler.getThoughtSignature()).toBeUndefined() + }) + + it("should accumulate signature_delta chunks (incremental signature)", async () => { + mockCreate.mockImplementationOnce(async () => ({ + async *[Symbol.asyncIterator]() { + yield { + type: "message_start", + message: { usage: { input_tokens: 100, output_tokens: 50 } }, + } + yield { + type: "content_block_start", + index: 0, + content_block: { type: "thinking", thinking: "Thinking..." }, + } + yield { + type: "content_block_delta", + index: 0, + delta: { type: "signature_delta", signature: "sig_part1" }, + } + yield { + type: "content_block_delta", + index: 0, + delta: { type: "signature_delta", signature: "_part2" }, + } + yield { + type: "content_block_delta", + index: 0, + delta: { type: "signature_delta", signature: "_part3" }, + } + yield { type: "content_block_stop", index: 0 } + }, + })) + + const stream = handler.createMessage(systemPrompt, messages, { taskId: "test-task" }) + + const chunks: any[] = [] + for await (const chunk of stream) { + chunks.push(chunk) + } + + // Verify the accumulated signature + expect(handler.getThoughtSignature()).toBe("sig_part1_part2_part3") + + // Verify thinking_complete has the accumulated signature + const thinkingCompleteChunk = chunks.find((chunk) => chunk.type === "thinking_complete") + expect(thinkingCompleteChunk?.signature).toBe("sig_part1_part2_part3") + }) + + it("should not emit thinking_complete if no signature is captured", async () => { + mockCreate.mockImplementationOnce(async () => ({ + async *[Symbol.asyncIterator]() { + yield { + type: "message_start", + message: { usage: { input_tokens: 100, output_tokens: 50 } }, + } + yield { + type: "content_block_start", + index: 0, + content_block: { type: "thinking", thinking: "Thinking without signature" }, + } + yield { + type: "content_block_delta", + index: 0, + delta: { type: "thinking_delta", thinking: "More thinking" }, + } + // No signature_delta event + yield { type: "content_block_stop", index: 0 } + }, + })) + + const stream = handler.createMessage(systemPrompt, messages, { taskId: "test-task" }) + + const chunks: any[] = [] + for await (const chunk of stream) { + chunks.push(chunk) + } + + // Verify thinking_complete was NOT emitted (no signature) + const thinkingCompleteChunk = chunks.find((chunk) => chunk.type === "thinking_complete") + expect(thinkingCompleteChunk).toBeUndefined() + + // Verify getThoughtSignature() returns undefined + expect(handler.getThoughtSignature()).toBeUndefined() + }) + + it("should handle interleaved thinking with tool use", async () => { + mockCreate.mockImplementationOnce(async () => ({ + async *[Symbol.asyncIterator]() { + yield { + type: "message_start", + message: { usage: { input_tokens: 100, output_tokens: 50 } }, + } + // First: thinking block + yield { + type: "content_block_start", + index: 0, + content_block: { type: "thinking", thinking: "Let me think about what tool to use" }, + } + yield { + type: "content_block_delta", + index: 0, + delta: { type: "signature_delta", signature: "thinking_signature_abc" }, + } + yield { type: "content_block_stop", index: 0 } + // Second: tool use block + yield { + type: "content_block_start", + index: 1, + content_block: { + type: "tool_use", + id: "toolu_456", + name: "get_weather", + }, + } + yield { + type: "content_block_delta", + index: 1, + delta: { + type: "input_json_delta", + partial_json: '{"location":"Paris"}', + }, + } + yield { type: "content_block_stop", index: 1 } + }, + })) + + const stream = handler.createMessage(systemPrompt, messages, { + taskId: "test-task", + tools: [ + { + type: "function" as const, + function: { + name: "get_weather", + description: "Get weather", + parameters: { type: "object", properties: { location: { type: "string" } } }, + }, + }, + ], + }) + + const chunks: any[] = [] + for await (const chunk of stream) { + chunks.push(chunk) + } + + // Verify thinking_complete was emitted for the thinking block + const thinkingCompleteChunk = chunks.find((chunk) => chunk.type === "thinking_complete") + expect(thinkingCompleteChunk).toBeDefined() + expect(thinkingCompleteChunk.signature).toBe("thinking_signature_abc") + + // Verify signature is available for tool use continuation + expect(handler.getThoughtSignature()).toBe("thinking_signature_abc") + + // Verify tool_call_partial was also emitted + const toolChunks = chunks.filter((chunk) => chunk.type === "tool_call_partial") + expect(toolChunks.length).toBeGreaterThan(0) + }) + }) }) diff --git a/src/api/providers/anthropic.ts b/src/api/providers/anthropic.ts index 4faf341d28f..16cfedd5889 100644 --- a/src/api/providers/anthropic.ts +++ b/src/api/providers/anthropic.ts @@ -34,6 +34,12 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa private options: ApiHandlerOptions private client: Anthropic private readonly providerName = "Anthropic" + /** + * Store the last thinking block signature for interleaved thinking with tool use. + * This is captured from signature_delta events during streaming and + * must be passed back to the API when providing tool results. + */ + private lastThinkingSignature?: string constructor(options: ApiHandlerOptions) { super() @@ -48,11 +54,23 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa }) } + /** + * Get the thinking signature from the last response. + * Used by Task.addToApiConversationHistory to persist the signature + * so it can be passed back to the API for tool use continuations. + */ + public getThoughtSignature(): string | undefined { + return this.lastThinkingSignature + } + async *createMessage( systemPrompt: string, messages: Anthropic.Messages.MessageParam[], metadata?: ApiHandlerCreateMessageMetadata, ): ApiStream { + // Reset per-request state + this.lastThinkingSignature = undefined + let stream: AnthropicStream const cacheControl: CacheControlEphemeral = { type: "ephemeral" } let { @@ -220,6 +238,10 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa let cacheWriteTokens = 0 let cacheReadTokens = 0 + // Track thinking blocks by index to capture text and signature for interleaved thinking + // This is critical for tool use continuations where thinking blocks must be passed back + const thinkingBlocks: Map = new Map() + for await (const chunk of stream) { switch (chunk.type) { case "message_start": { @@ -262,6 +284,11 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa case "content_block_start": switch (chunk.content_block.type) { case "thinking": + // Initialize thinking block tracking for interleaved thinking + thinkingBlocks.set(chunk.index, { + text: chunk.content_block.thinking || "", + }) + // We may receive multiple text blocks, in which // case just insert a line break between them. if (chunk.index > 0) { @@ -292,13 +319,37 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa } } break - case "content_block_delta": - switch (chunk.delta.type) { - case "thinking_delta": - yield { type: "reasoning", text: chunk.delta.thinking } + case "content_block_delta": { + const delta = chunk.delta as { + type: string + thinking?: string + signature?: string + text?: string + partial_json?: string + } + switch (delta.type) { + case "thinking_delta": { + // Accumulate thinking text for the block + const block = thinkingBlocks.get(chunk.index) + if (block && delta.thinking) { + block.text += delta.thinking + } + yield { type: "reasoning", text: delta.thinking || "" } break + } + case "signature_delta": { + // Capture signature for interleaved thinking with tool use + // This signature must be passed back to the API for tool use continuations + const block = thinkingBlocks.get(chunk.index) + if (block && delta.signature) { + block.signature = (block.signature || "") + delta.signature + // Store the last signature for retrieval via getThoughtSignature() + this.lastThinkingSignature = block.signature + } + break + } case "text_delta": - yield { type: "text", text: chunk.delta.text } + yield { type: "text", text: delta.text || "" } break case "input_json_delta": { // Emit tool call partial chunks as arguments stream in @@ -307,19 +358,27 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa index: chunk.index, id: undefined, name: undefined, - arguments: chunk.delta.partial_json, + arguments: delta.partial_json, } break } } break - case "content_block_stop": - // Block complete - no action needed for now. + } + case "content_block_stop": { + // Emit thinking_complete when a thinking block finishes with its signature + // This is critical for tool use continuations in interleaved thinking + const completedBlock = thinkingBlocks.get(chunk.index) + if (completedBlock?.signature) { + yield { + type: "thinking_complete", + signature: completedBlock.signature, + } + } // NativeToolCallParser handles tool call completion - // Note: Signature for multi-turn thinking would require using stream.finalMessage() - // after iteration completes, which requires restructuring the streaming approach. break + } } }