diff --git a/src/api/providers/__tests__/anthropic.spec.ts b/src/api/providers/__tests__/anthropic.spec.ts
index 5c0c1632b4..464eafe7de 100644
--- a/src/api/providers/__tests__/anthropic.spec.ts
+++ b/src/api/providers/__tests__/anthropic.spec.ts
@@ -741,4 +741,302 @@ describe("AnthropicHandler", () => {
 			})
 		})
 	})
+
+	describe("extended thinking with signature capture", () => {
+		const systemPrompt = "You are a helpful assistant."
+		const messages: Anthropic.Messages.MessageParam[] = [
+			{
+				role: "user",
+				content: [{ type: "text" as const, text: "Think about this carefully" }],
+			},
+		]
+
+		it("should capture signature_delta and make it available via getThoughtSignature()", async () => {
+			mockCreate.mockImplementationOnce(async () => ({
+				async *[Symbol.asyncIterator]() {
+					yield {
+						type: "message_start",
+						message: {
+							usage: {
+								input_tokens: 100,
+								output_tokens: 50,
+							},
+						},
+					}
+					yield {
+						type: "content_block_start",
+						index: 0,
+						content_block: {
+							type: "thinking",
+							thinking: "Let me think...",
+						},
+					}
+					yield {
+						type: "content_block_delta",
+						index: 0,
+						delta: {
+							type: "thinking_delta",
+							thinking: " about this carefully",
+						},
+					}
+					yield {
+						type: "content_block_delta",
+						index: 0,
+						delta: {
+							type: "signature_delta",
+							signature: "test_signature_123",
+						},
+					}
+					yield {
+						type: "content_block_stop",
+						index: 0,
+					}
+					yield {
+						type: "content_block_start",
+						index: 1,
+						content_block: {
+							type: "text",
+							text: "Here is my response",
+						},
+					}
+					yield {
+						type: "content_block_stop",
+						index: 1,
+					}
+				},
+			}))
+
+			const stream = handler.createMessage(systemPrompt, messages, {
+				taskId: "test-task",
+			})
+
+			const chunks: any[] = []
+			for await (const chunk of stream) {
+				chunks.push(chunk)
+			}
+
+			// Verify reasoning chunks were emitted
+			const reasoningChunks = chunks.filter((chunk) => chunk.type === "reasoning")
+			expect(reasoningChunks).toHaveLength(2)
+			expect(reasoningChunks[0].text).toBe("Let me think...")
+			expect(reasoningChunks[1].text).toBe(" about this carefully")
+
+			// Verify thinking_complete chunk was emitted with signature
+			const thinkingCompleteChunk = chunks.find((chunk) => chunk.type === "thinking_complete")
+			expect(thinkingCompleteChunk).toBeDefined()
+			expect(thinkingCompleteChunk.signature).toBe("test_signature_123")
+
+			// Verify getThoughtSignature() returns the captured signature
+			expect(handler.getThoughtSignature()).toBe("test_signature_123")
+		})
+
+		it("should reset signature for each new request", async () => {
+			// First request with signature
+			mockCreate.mockImplementationOnce(async () => ({
+				async *[Symbol.asyncIterator]() {
+					yield {
+						type: "message_start",
+						message: { usage: { input_tokens: 100, output_tokens: 50 } },
+					}
+					yield {
+						type: "content_block_start",
+						index: 0,
+						content_block: { type: "thinking", thinking: "First thinking" },
+					}
+					yield {
+						type: "content_block_delta",
+						index: 0,
+						delta: { type: "signature_delta", signature: "first_signature" },
+					}
+					yield { type: "content_block_stop", index: 0 }
+				},
+			}))
+
+			const stream1 = handler.createMessage(systemPrompt, messages, { taskId: "test-task-1" })
+			for await (const _chunk of stream1) {
+				// consume
+			}
+			expect(handler.getThoughtSignature()).toBe("first_signature")
+
+			// Second request without signature
+			mockCreate.mockImplementationOnce(async () => ({
+				async *[Symbol.asyncIterator]() {
+					yield {
+						type: "message_start",
+						message: { usage: { input_tokens: 100, output_tokens: 50 } },
+					}
+					yield {
+						type: "content_block_start",
+						index: 0,
+						content_block: { type: "text", text: "Just text, no thinking" },
+					}
+					yield { type: "content_block_stop", index: 0 }
+				},
+			}))
+
+			const stream2 = handler.createMessage(systemPrompt, messages, { taskId: "test-task-2" })
+			for await (const _chunk of stream2) {
+				// consume
+			}
+
+			// Signature should be reset (undefined) for the new request
+			expect(handler.getThoughtSignature()).toBeUndefined()
+		})
+
+		it("should accumulate signature_delta chunks (incremental signature)", async () => {
+			mockCreate.mockImplementationOnce(async () => ({
+				async *[Symbol.asyncIterator]() {
+					yield {
+						type: "message_start",
+						message: { usage: { input_tokens: 100, output_tokens: 50 } },
+					}
+					yield {
+						type: "content_block_start",
+						index: 0,
+						content_block: { type: "thinking", thinking: "Thinking..." },
+					}
+					yield {
+						type: "content_block_delta",
+						index: 0,
+						delta: { type: "signature_delta", signature: "sig_part1" },
+					}
+					yield {
+						type: "content_block_delta",
+						index: 0,
+						delta: { type: "signature_delta", signature: "_part2" },
+					}
+					yield {
+						type: "content_block_delta",
+						index: 0,
+						delta: { type: "signature_delta", signature: "_part3" },
+					}
+					yield { type: "content_block_stop", index: 0 }
+				},
+			}))
+
+			const stream = handler.createMessage(systemPrompt, messages, { taskId: "test-task" })
+
+			const chunks: any[] = []
+			for await (const chunk of stream) {
+				chunks.push(chunk)
+			}
+
+			// Verify the accumulated signature
+			expect(handler.getThoughtSignature()).toBe("sig_part1_part2_part3")
+
+			// Verify thinking_complete has the accumulated signature
+			const thinkingCompleteChunk = chunks.find((chunk) => chunk.type === "thinking_complete")
+			expect(thinkingCompleteChunk?.signature).toBe("sig_part1_part2_part3")
+		})
+
+		it("should not emit thinking_complete if no signature is captured", async () => {
+			mockCreate.mockImplementationOnce(async () => ({
+				async *[Symbol.asyncIterator]() {
+					yield {
+						type: "message_start",
+						message: { usage: { input_tokens: 100, output_tokens: 50 } },
+					}
+					yield {
+						type: "content_block_start",
+						index: 0,
+						content_block: { type: "thinking", thinking: "Thinking without signature" },
+					}
+					yield {
+						type: "content_block_delta",
+						index: 0,
+						delta: { type: "thinking_delta", thinking: "More thinking" },
+					}
+					// No signature_delta event
+					yield { type: "content_block_stop", index: 0 }
+				},
+			}))
+
+			const stream = handler.createMessage(systemPrompt, messages, { taskId: "test-task" })
+
+			const chunks: any[] = []
+			for await (const chunk of stream) {
+				chunks.push(chunk)
+			}
+
+			// Verify thinking_complete was NOT emitted (no signature)
+			const thinkingCompleteChunk = chunks.find((chunk) => chunk.type === "thinking_complete")
+			expect(thinkingCompleteChunk).toBeUndefined()
+
+			// Verify getThoughtSignature() returns undefined
+			expect(handler.getThoughtSignature()).toBeUndefined()
+		})
+
+		it("should handle interleaved thinking with tool use", async () => {
+			mockCreate.mockImplementationOnce(async () => ({
+				async *[Symbol.asyncIterator]() {
+					yield {
+						type: "message_start",
+						message: { usage: { input_tokens: 100, output_tokens: 50 } },
+					}
+					// First: thinking block
+					yield {
+						type: "content_block_start",
+						index: 0,
+						content_block: { type: "thinking", thinking: "Let me think about what tool to use" },
+					}
+					yield {
+						type: "content_block_delta",
+						index: 0,
+						delta: { type: "signature_delta", signature: "thinking_signature_abc" },
+					}
+					yield { type: "content_block_stop", index: 0 }
+					// Second: tool use block
+					yield {
+						type: "content_block_start",
+						index: 1,
+						content_block: {
+							type: "tool_use",
+							id: "toolu_456",
+							name: "get_weather",
+						},
+					}
+					yield {
+						type: "content_block_delta",
+						index: 1,
+						delta: {
+							type: "input_json_delta",
+							partial_json: '{"location":"Paris"}',
+						},
+					}
+					yield { type: "content_block_stop", index: 1 }
+				},
+			}))
+
+			const stream = handler.createMessage(systemPrompt, messages, {
+				taskId: "test-task",
+				tools: [
+					{
+						type: "function" as const,
+						function: {
+							name: "get_weather",
+							description: "Get weather",
+							parameters: { type: "object", properties: { location: { type: "string" } } },
+						},
+					},
+				],
+			})
+
+			const chunks: any[] = []
+			for await (const chunk of stream) {
+				chunks.push(chunk)
+			}
+
+			// Verify thinking_complete was emitted for the thinking block
+			const thinkingCompleteChunk = chunks.find((chunk) => chunk.type === "thinking_complete")
+			expect(thinkingCompleteChunk).toBeDefined()
+			expect(thinkingCompleteChunk.signature).toBe("thinking_signature_abc")
+
+			// Verify signature is available for tool use continuation
+			expect(handler.getThoughtSignature()).toBe("thinking_signature_abc")
+
+			// Verify tool_call_partial was also emitted
+			const toolChunks = chunks.filter((chunk) => chunk.type === "tool_call_partial")
+			expect(toolChunks.length).toBeGreaterThan(0)
+		})
+	})
 })
diff --git a/src/api/providers/anthropic.ts b/src/api/providers/anthropic.ts
index 4faf341d28..16cfedd588 100644
--- a/src/api/providers/anthropic.ts
+++ b/src/api/providers/anthropic.ts
@@ -34,6 +34,12 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
 	private options: ApiHandlerOptions
 	private client: Anthropic
 	private readonly providerName = "Anthropic"
+	/**
+	 * Store the last thinking block signature for interleaved thinking with tool use.
+	 * This is captured from signature_delta events during streaming and
+	 * must be passed back to the API when providing tool results.
+	 */
+	private lastThinkingSignature?: string
 
 	constructor(options: ApiHandlerOptions) {
 		super()
@@ -48,11 +54,23 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
 		})
 	}
 
+	/**
+	 * Get the thinking signature from the last response.
+	 * Used by Task.addToApiConversationHistory to persist the signature
+	 * so it can be passed back to the API for tool use continuations.
+	 */
+	public getThoughtSignature(): string | undefined {
+		return this.lastThinkingSignature
+	}
+
 	async *createMessage(
 		systemPrompt: string,
 		messages: Anthropic.Messages.MessageParam[],
 		metadata?: ApiHandlerCreateMessageMetadata,
 	): ApiStream {
+		// Reset per-request state
+		this.lastThinkingSignature = undefined
+
 		let stream: AnthropicStream
 		const cacheControl: CacheControlEphemeral = { type: "ephemeral" }
 		let {
@@ -220,6 +238,10 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
 		let cacheWriteTokens = 0
 		let cacheReadTokens = 0
 
+		// Track thinking blocks by index to capture text and signature for interleaved thinking
+		// This is critical for tool use continuations where thinking blocks must be passed back
+		const thinkingBlocks: Map<number, { text: string; signature?: string }> = new Map()
+
 		for await (const chunk of stream) {
 			switch (chunk.type) {
 				case "message_start": {
@@ -262,6 +284,11 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
 				case "content_block_start":
 					switch (chunk.content_block.type) {
 						case "thinking":
+							// Initialize thinking block tracking for interleaved thinking
+							thinkingBlocks.set(chunk.index, {
+								text: chunk.content_block.thinking || "",
+							})
+
 							// We may receive multiple text blocks, in which
 							// case just insert a line break between them.
 							if (chunk.index > 0) {
@@ -292,13 +319,37 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
 						}
 					}
 					break
-				case "content_block_delta":
-					switch (chunk.delta.type) {
-						case "thinking_delta":
-							yield { type: "reasoning", text: chunk.delta.thinking }
+				case "content_block_delta": {
+					const delta = chunk.delta as {
+						type: string
+						thinking?: string
+						signature?: string
+						text?: string
+						partial_json?: string
+					}
+					switch (delta.type) {
+						case "thinking_delta": {
+							// Accumulate thinking text for the block
+							const block = thinkingBlocks.get(chunk.index)
+							if (block && delta.thinking) {
+								block.text += delta.thinking
+							}
+							yield { type: "reasoning", text: delta.thinking || "" }
 							break
+						}
+						case "signature_delta": {
+							// Capture signature for interleaved thinking with tool use
+							// This signature must be passed back to the API for tool use continuations
+							const block = thinkingBlocks.get(chunk.index)
+							if (block && delta.signature) {
+								block.signature = (block.signature || "") + delta.signature
+								// Store the last signature for retrieval via getThoughtSignature()
+								this.lastThinkingSignature = block.signature
+							}
+							break
+						}
 						case "text_delta":
-							yield { type: "text", text: chunk.delta.text }
+							yield { type: "text", text: delta.text || "" }
 							break
 						case "input_json_delta": {
 							// Emit tool call partial chunks as arguments stream in
@@ -307,19 +358,27 @@ export class AnthropicHandler extends BaseProvider implements SingleCompletionHa
 								index: chunk.index,
 								id: undefined,
 								name: undefined,
-								arguments: chunk.delta.partial_json,
+								arguments: delta.partial_json,
 							}
 							break
 						}
 					}
 					break
-				case "content_block_stop":
-					// Block complete - no action needed for now.
+				}
+				case "content_block_stop": {
+					// Emit thinking_complete when a thinking block finishes with its signature
+					// This is critical for tool use continuations in interleaved thinking
+					const completedBlock = thinkingBlocks.get(chunk.index)
+					if (completedBlock?.signature) {
+						yield {
+							type: "thinking_complete",
+							signature: completedBlock.signature,
+						}
+					}
 					// NativeToolCallParser handles tool call completion
-					// Note: Signature for multi-turn thinking would require using stream.finalMessage()
-					// after iteration completes, which requires restructuring the streaming approach.
 					break
+				}
 			}
 		}