Skip to content

Commit

Permalink
anthropic[patch]: add vertex and bedrock support, streamResponseChunk… (
Browse files Browse the repository at this point in the history
#6206)

* anthropic[patch]: add vertex and bedrock support, streamResponseChunks optimize

* yarn.lock and format/lint

* drop bedrock/vertex

---------

Co-authored-by: tofuliang <tofuliang@apifox.com>
Co-authored-by: bracesproul <braceasproul@gmail.com>
  • Loading branch information
3 people authored Jul 26, 2024
1 parent 21f6fd7 commit 4b60a96
Showing 1 changed file with 38 additions and 134 deletions.
172 changes: 38 additions & 134 deletions libs/langchain-anthropic/src/chat_models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ import type {
MessageCreateParams,
Tool as AnthropicTool,
} from "@anthropic-ai/sdk/resources/index.mjs";
import { concat } from "@langchain/core/utils/stream";

import {
AnthropicToolsOutputParser,
extractToolCalls,
} from "./output_parsers.js";
import { AnthropicToolResponse } from "./types.js";
import {
AnthropicToolChoice,
AnthropicToolTypes,
handleToolChoice,
} from "./utils.js";
import { AnthropicToolResponse } from "./types.js";

type AnthropicMessage = Anthropic.MessageParam;
type AnthropicMessageCreateParams = Anthropic.MessageCreateParamsNonStreaming;
Expand Down Expand Up @@ -160,18 +160,10 @@ function _makeMessageChunkFromAnthropicEvent(
streamUsage: boolean;
coerceContentToString: boolean;
usageData: { input_tokens: number; output_tokens: number };
toolUse?: {
id: string;
name: string;
};
}
): {
chunk: AIMessageChunk;
usageData: { input_tokens: number; output_tokens: number };
toolUse?: {
id: string;
name: string;
};
} | null {
let usageDataCopy = { ...fields.usageData };

Expand Down Expand Up @@ -241,10 +233,6 @@ function _makeMessageChunkFromAnthropicEvent(
additional_kwargs: {},
}),
usageData: usageDataCopy,
toolUse: {
id: data.content_block.id,
name: data.content_block.name,
},
};
} else if (
data.type === "content_block_delta" &&
Expand Down Expand Up @@ -286,25 +274,27 @@ function _makeMessageChunkFromAnthropicEvent(
}),
usageData: usageDataCopy,
};
} else if (data.type === "content_block_stop" && fields.toolUse) {
// Only yield the ID & name when the tool_use block is complete.
// This is so the names & IDs do not get concatenated.
return {
chunk: new AIMessageChunk({
content: fields.coerceContentToString
? ""
: [
{
id: fields.toolUse.id,
name: fields.toolUse.name,
index: data.index,
type: "input_json_delta",
},
],
additional_kwargs: {},
}),
usageData: usageDataCopy,
};
} else if (
data.type === "content_block_start" &&
data.content_block.type === "text"
) {
const content = data.content_block?.text;
if (content !== undefined) {
return {
chunk: new AIMessageChunk({
content: fields.coerceContentToString
? content
: [
{
index: data.index,
...data.content_block,
},
],
additional_kwargs: {},
}),
usageData: usageDataCopy,
};
}
}

return null;
Expand Down Expand Up @@ -669,60 +659,14 @@ function extractToken(chunk: AIMessageChunk): string | undefined {
return typeof chunk.content[0].input === "string"
? chunk.content[0].input
: JSON.stringify(chunk.content[0].input);
}
return undefined;
}

function extractToolUseContent(
chunk: AIMessageChunk,
concatenatedChunks: AIMessageChunk | undefined
) {
let newConcatenatedChunks = concatenatedChunks;
// Remove `tool_use` content types until the last chunk.
let toolUseContent:
| {
id: string;
type: "tool_use";
name: string;
input: Record<string, unknown>;
}
| undefined;
if (!newConcatenatedChunks) {
newConcatenatedChunks = chunk;
} else {
newConcatenatedChunks = concat(newConcatenatedChunks, chunk);
}
if (
Array.isArray(newConcatenatedChunks.content) &&
newConcatenatedChunks.content.find((c) => c.type === "tool_use")
} else if (
Array.isArray(chunk.content) &&
chunk.content.length >= 1 &&
"text" in chunk.content[0]
) {
try {
const toolUseMsg = newConcatenatedChunks.content.find(
(c) => c.type === "tool_use"
);
if (
!toolUseMsg ||
!("input" in toolUseMsg || "name" in toolUseMsg || "id" in toolUseMsg)
)
return;
const parsedArgs = JSON.parse(toolUseMsg.input);
if (parsedArgs) {
toolUseContent = {
type: "tool_use",
id: toolUseMsg.id,
name: toolUseMsg.name,
input: parsedArgs,
};
}
} catch (_) {
// no-op
}
return chunk.content[0].text;
}

return {
toolUseContent,
concatenatedChunks: newConcatenatedChunks,
};
return undefined;
}

/**
Expand Down Expand Up @@ -814,9 +758,11 @@ export class ChatAnthropicMessages<
fields?.apiKey ??
fields?.anthropicApiKey ??
getEnvironmentVariable("ANTHROPIC_API_KEY");

if (!this.anthropicApiKey) {
throw new Error("Anthropic API key not found");
}
this.clientOptions = fields?.clientOptions ?? {};
/** Keep anthropicApiKey for backwards compatibility */
this.apiKey = this.anthropicApiKey;

Expand All @@ -837,7 +783,6 @@ export class ChatAnthropicMessages<
this.stopSequences = fields?.stopSequences ?? this.stopSequences;

this.streaming = fields?.streaming ?? false;
this.clientOptions = fields?.clientOptions ?? {};
this.streamUsage = fields?.streamUsage ?? this.streamUsage;
}

Expand Down Expand Up @@ -979,16 +924,6 @@ export class ChatAnthropicMessages<
});
let usageData = { input_tokens: 0, output_tokens: 0 };

let concatenatedChunks: AIMessageChunk | undefined;
// Anthropic only yields the tool name and id once, so we need to save those
// so we can yield them with the rest of the tool_use content.
let toolUse:
| {
id: string;
name: string;
}
| undefined;

for await (const data of stream) {
if (options.signal?.aborted) {
stream.controller.abort();
Expand All @@ -999,53 +934,21 @@ export class ChatAnthropicMessages<
streamUsage: !!(this.streamUsage || options.streamUsage),
coerceContentToString,
usageData,
toolUse: toolUse
? {
id: toolUse.id,
name: toolUse.name,
}
: undefined,
});
if (!result) continue;

const {
chunk,
usageData: updatedUsageData,
toolUse: updatedToolUse,
} = result;
const { chunk, usageData: updatedUsageData } = result;

usageData = updatedUsageData;

if (updatedToolUse) {
toolUse = updatedToolUse;
}

const newToolCallChunk = extractToolCallChunk(chunk);
// Maintain concatenatedChunks for accessing the complete `tool_use` content block.
concatenatedChunks = concatenatedChunks
? concat(concatenatedChunks, chunk)
: chunk;

let toolUseContent;
const extractedContent = extractToolUseContent(chunk, concatenatedChunks);
if (extractedContent) {
toolUseContent = extractedContent.toolUseContent;
concatenatedChunks = extractedContent.concatenatedChunks;
}

// Filter partial `tool_use` content, and only add `tool_use` chunks if complete JSON available.
const chunkContent = Array.isArray(chunk.content)
? chunk.content.filter((c) => c.type !== "tool_use")
: chunk.content;
if (Array.isArray(chunkContent) && toolUseContent) {
chunkContent.push(toolUseContent);
}

// Extract the text content token for text field and runManager.
const token = extractToken(chunk);
yield new ChatGenerationChunk({
message: new AIMessageChunk({
content: chunkContent,
// Just yield chunk as it is and tool_use will be concat by BaseChatModel._generateUncached().
content: chunk.content,
additional_kwargs: chunk.additional_kwargs,
tool_call_chunks: newToolCallChunk ? [newToolCallChunk] : undefined,
usage_metadata: chunk.usage_metadata,
Expand Down Expand Up @@ -1163,6 +1066,7 @@ export class ChatAnthropicMessages<
/**
* Creates a streaming request with retry.
* @param request The parameters for creating a completion.
* @param options
* @returns A streaming request.
*/
protected async createStreamWithRetry(
Expand Down Expand Up @@ -1196,11 +1100,11 @@ export class ChatAnthropicMessages<
request: AnthropicMessageCreateParams & Kwargs,
options: AnthropicRequestOptions
): Promise<Anthropic.Message> {
if (!this.apiKey) {
throw new Error("Missing Anthropic API key.");
}
if (!this.batchClient) {
const options = this.apiUrl ? { baseURL: this.apiUrl } : undefined;
if (!this.apiKey) {
throw new Error("Missing Anthropic API key.");
}
this.batchClient = new Anthropic({
...this.clientOptions,
...options,
Expand Down

0 comments on commit 4b60a96

Please sign in to comment.