💾 feat: Anthropic Prompt Caching (#3670)
* wip: initial cache control implementation, add typing for transaction handling

* feat: first pass of Anthropic Prompt Caching

* feat: standardize passing in stream usage when calculating token counts

* feat: Add getCacheMultiplier function to calculate cache multiplier for different valueKeys and cacheTypes

* chore: imports order

* refactor: token usage recording in AnthropicClient, no need to "correct" as we have the correct amount

* feat: more accurate token counting using stream usage data

* feat: Improve token counting accuracy with stream usage data

* refactor: ensure token estimations stay more accurate than not when custom instructions or files are not resent with every request

* refactor: clean up updateUserMessageTokenCount so transactions stay as accurate as possible even when user message token counts should not be updated

* ci: fix tests
danny-avila committed Aug 17, 2024
1 parent 83ada25 commit 1b716f3
Showing 17 changed files with 973 additions and 34 deletions.
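The squashed commits above introduce Anthropic prompt caching and structured (cache-aware) token spending. For orientation, here is a minimal sketch of the request shape this feature builds on, assuming the `@anthropic-ai/sdk` package; the beta header and `cache_control` field mirror what the diff below enables, while the model name, prompt text, and token figures are illustrative only:

const Anthropic = require('@anthropic-ai/sdk');

async function cachedCompletionSketch() {
  const client = new Anthropic({
    apiKey: process.env.ANTHROPIC_API_KEY,
    // Prompt caching was gated behind this beta header at the time of this commit.
    defaultHeaders: { 'anthropic-beta': 'prompt-caching-2024-07-31' },
  });

  const response = await client.messages.create({
    model: 'claude-3-5-sonnet-20240620',
    max_tokens: 1024,
    // Marking the system prompt as ephemeral lets subsequent requests reuse the cached prefix.
    system: [
      {
        type: 'text',
        text: 'You are a helpful assistant.',
        cache_control: { type: 'ephemeral' },
      },
    ],
    messages: [{ role: 'user', content: 'Hello!' }],
  });

  // Usage now reports cache activity alongside the regular counts, e.g.:
  // { input_tokens, cache_creation_input_tokens, cache_read_input_tokens, output_tokens }
  return response.usage;
}

Responses then carry cache_creation_input_tokens and cache_read_input_tokens, which the changes below record via spendStructuredTokens instead of the flat spendTokens call.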
159 changes: 149 additions & 10 deletions api/app/clients/AnthropicClient.js
@@ -12,12 +12,13 @@ const { encodeAndFormat } = require('~/server/services/Files/images/encode');
const {
truncateText,
formatMessage,
addCacheControl,
titleFunctionPrompt,
parseParamFromPrompt,
createContextHandlers,
} = require('./prompts');
const spendTokens = require('~/models/spendTokens');
const { getModelMaxTokens } = require('~/utils');
const { spendTokens, spendStructuredTokens } = require('~/models/spendTokens');
const { getModelMaxTokens, matchModelName } = require('~/utils');
const { sleep } = require('~/server/utils');
const BaseClient = require('./BaseClient');
const { logger } = require('~/config');
@@ -32,6 +33,7 @@ function delayBeforeRetry(attempts, baseDelay = 1000) {
return new Promise((resolve) => setTimeout(resolve, baseDelay * attempts));
}

const tokenEventTypes = new Set(['message_start', 'message_delta']);
const { legacy } = anthropicSettings;

class AnthropicClient extends BaseClient {
@@ -44,6 +46,24 @@ class AnthropicClient extends BaseClient {
? options.contextStrategy.toLowerCase()
: 'discard';
this.setOptions(options);
/** @type {string | undefined} */
this.systemMessage;
/** @type {AnthropicMessageStartEvent | undefined} */
this.message_start;
/** @type {AnthropicMessageDeltaEvent | undefined} */
this.message_delta;
/** Whether the model is part of the Claude 3 Family
* @type {boolean} */
this.isClaude3;
/** Whether to use Messages API or Completions API
* @type {boolean} */
this.useMessages;
/** Whether or not the model is limited to the legacy amount of output tokens
* @type {boolean} */
this.isLegacyOutput;
/** Whether or not the model supports Prompt Caching
* @type {boolean} */
this.supportsCacheControl;
}

setOptions(options) {
@@ -69,8 +89,10 @@
model: modelOptions.model || anthropicSettings.model.default,
};

this.isClaude3 = this.modelOptions.model.includes('claude-3');
this.isLegacyOutput = !this.modelOptions.model.includes('claude-3-5-sonnet');
const modelMatch = matchModelName(this.modelOptions.model, EModelEndpoint.anthropic);
this.isClaude3 = modelMatch.startsWith('claude-3');
this.isLegacyOutput = !modelMatch.startsWith('claude-3-5-sonnet');
this.supportsCacheControl = this.checkPromptCacheSupport(modelMatch);

if (
this.isLegacyOutput &&
@@ -147,19 +169,74 @@
options.baseURL = this.options.reverseProxyUrl;
}

if (requestOptions?.model && requestOptions.model.includes('claude-3-5-sonnet')) {
if (
this.supportsCacheControl &&
requestOptions?.model &&
requestOptions.model.includes('claude-3-5-sonnet')
) {
options.defaultHeaders = {
'anthropic-beta': 'max-tokens-3-5-sonnet-2024-07-15',
'anthropic-beta': 'max-tokens-3-5-sonnet-2024-07-15,prompt-caching-2024-07-31',
};
} else if (this.supportsCacheControl) {
options.defaultHeaders = {
'anthropic-beta': 'prompt-caching-2024-07-31',
};
}

return new Anthropic(options);
}

getTokenCountForResponse(response) {
/**
* Get stream usage as returned by this client's API response.
* @returns {AnthropicStreamUsage} The stream usage object.
*/
getStreamUsage() {
const inputUsage = this.message_start?.message?.usage ?? {};
const outputUsage = this.message_delta?.usage ?? {};
return Object.assign({}, inputUsage, outputUsage);
}
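/*
 * Example (hypothetical figures): a message_start usage of
 * { input_tokens: 12, cache_creation_input_tokens: 1500, cache_read_input_tokens: 0 }
 * merged with a message_delta usage of { output_tokens: 220 } yields one object
 * containing all four fields.
 */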

/**
* Calculates the correct token count for the current message based on the token count map and API usage.
* Edge case: If the calculation results in a negative value, it returns the original estimate.
* If revisiting a conversation with a chat history entirely composed of token estimates,
* the cumulative token count going forward should become more accurate as the conversation progresses.
* @param {Object} params - The parameters for the calculation.
* @param {Record<string, number>} params.tokenCountMap - A map of message IDs to their token counts.
* @param {string} params.currentMessageId - The ID of the current message to calculate.
* @param {AnthropicStreamUsage} params.usage - The usage object returned by the API.
* @returns {number} The correct token count for the current message.
*/
calculateCurrentTokenCount({ tokenCountMap, currentMessageId, usage }) {
const originalEstimate = tokenCountMap[currentMessageId] || 0;

if (!usage || typeof usage.input_tokens !== 'number') {
return originalEstimate;
}

tokenCountMap[currentMessageId] = 0;
const totalTokensFromMap = Object.values(tokenCountMap).reduce((sum, count) => {
const numCount = Number(count);
return sum + (isNaN(numCount) ? 0 : numCount);
}, 0);
const totalInputTokens =
(usage.input_tokens ?? 0) +
(usage.cache_creation_input_tokens ?? 0) +
(usage.cache_read_input_tokens ?? 0);

const currentMessageTokens = totalInputTokens - totalTokensFromMap;
return currentMessageTokens > 0 ? currentMessageTokens : originalEstimate;
}
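/*
 * Worked example (hypothetical figures): with tokenCountMap = { a: 120, b: 80, c: 95 } and
 * currentMessageId = 'c', the map minus 'c' sums to 200. Given usage of
 * { input_tokens: 150, cache_creation_input_tokens: 90, cache_read_input_tokens: 20 },
 * total input is 260, so the current message is recorded as 260 - 200 = 60 tokens.
 * Were the result negative, the original estimate of 95 would be kept instead.
 */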

/**
* Get Token Count for LibreChat Message
* @param {TMessage} responseMessage
* @returns {number}
*/
getTokenCountForResponse(responseMessage) {
return this.getTokenCountForMessage({
role: 'assistant',
content: response.text,
content: responseMessage.text,
});
}

@@ -212,7 +289,38 @@
return files;
}

async recordTokenUsage({ promptTokens, completionTokens, model, context = 'message' }) {
/**
* @param {object} params
* @param {number} params.promptTokens
* @param {number} params.completionTokens
* @param {AnthropicStreamUsage} [params.usage]
* @param {string} [params.model]
* @param {string} [params.context='message']
* @returns {Promise<void>}
*/
async recordTokenUsage({ promptTokens, completionTokens, usage, model, context = 'message' }) {
if (usage != null && usage?.input_tokens != null) {
const input = usage.input_tokens ?? 0;
const write = usage.cache_creation_input_tokens ?? 0;
const read = usage.cache_read_input_tokens ?? 0;

await spendStructuredTokens(
{
context,
user: this.user,
conversationId: this.conversationId,
model: model ?? this.modelOptions.model,
endpointTokenConfig: this.options.endpointTokenConfig,
},
{
promptTokens: { input, write, read },
completionTokens,
},
);

return;
}
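/*
 * Example (hypothetical figures): usage of { input_tokens: 100, cache_creation_input_tokens: 2000,
 * cache_read_input_tokens: 0 } with 300 completion tokens is spent as
 * promptTokens: { input: 100, write: 2000, read: 0 }, so cache writes and reads can be priced
 * at their own rates. Without usable stream usage, the legacy estimate path below applies.
 */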

await spendTokens(
{
context,
@@ -560,6 +668,18 @@
: await client.completions.create(options);
}

/**
* @param {string} modelName
* @returns {boolean}
*/
checkPromptCacheSupport(modelName) {
const modelMatch = matchModelName(modelName, EModelEndpoint.anthropic);
if (modelMatch === 'claude-3-5-sonnet' || modelMatch === 'claude-3-haiku') {
return true;
}
return false;
}
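/*
 * Assuming matchModelName normalizes dated model names, e.g.
 * checkPromptCacheSupport('claude-3-5-sonnet-20240620') === true and
 * checkPromptCacheSupport('claude-3-opus-20240229') === false.
 */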

async sendCompletion(payload, { onProgress, abortController }) {
if (!abortController) {
abortController = new AbortController();
@@ -606,10 +726,22 @@
requestOptions.max_tokens_to_sample = maxOutputTokens || 1500;
}

if (this.systemMessage) {
if (this.systemMessage && this.supportsCacheControl === true) {
requestOptions.system = [
{
type: 'text',
text: this.systemMessage,
cache_control: { type: 'ephemeral' },
},
];
} else if (this.systemMessage) {
requestOptions.system = this.systemMessage;
}

if (this.supportsCacheControl === true && this.useMessages) {
requestOptions.messages = addCacheControl(requestOptions.messages);
}

logger.debug('[AnthropicClient]', { ...requestOptions });

const handleChunk = (currentChunk) => {
@@ -639,6 +771,11 @@

for await (const completion of response) {
// Handle each completion as before
const type = completion?.type ?? '';
if (tokenEventTypes.has(type)) {
logger.debug(`[AnthropicClient] ${type}`, completion);
this[type] = completion;
}
if (completion?.delta?.text) {
handleChunk(completion.delta.text);
} else if (completion.completion) {
@@ -727,6 +864,8 @@
*/
async titleConvo({ text, responseText = '' }) {
let title = 'New Chat';
this.message_delta = undefined;
this.message_start = undefined;
const convo = `<initial_message>
${truncateText(text)}
</initial_message>
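The `addCacheControl` helper imported at the top of this file and applied to `requestOptions.messages` above is not part of this excerpt. A helper like this typically marks the most recent user turns as ephemeral cache breakpoints so the growing conversation prefix can be written to and read from the prompt cache. The sketch below is illustrative and not necessarily the repository's implementation:

function addCacheControl(messages) {
  if (!Array.isArray(messages) || messages.length < 2) {
    return messages;
  }

  const result = [...messages];
  let marked = 0;

  // Walk backwards and tag up to the two most recent user messages as ephemeral cache points.
  for (let i = result.length - 1; i >= 0 && marked < 2; i--) {
    const message = result[i];
    if (message.role !== 'user') {
      continue;
    }

    if (typeof message.content === 'string') {
      message.content = [
        { type: 'text', text: message.content, cache_control: { type: 'ephemeral' } },
      ];
    } else if (Array.isArray(message.content) && message.content.length > 0) {
      message.content[message.content.length - 1].cache_control = { type: 'ephemeral' };
    }

    marked += 1;
  }

  return result;
}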
104 changes: 99 additions & 5 deletions api/app/clients/BaseClient.js
@@ -54,10 +54,22 @@ class BaseClient {
throw new Error('Subclasses attempted to call summarizeMessages without implementing it');
}

async getTokenCountForResponse(response) {
logger.debug('`[BaseClient] recordTokenUsage` not implemented.', response);
/**
* Abstract method to get the token count for a message. Subclasses must implement this method.
* @param {TMessage} responseMessage
* @returns {number}
*/
getTokenCountForResponse(responseMessage) {
logger.debug('`[BaseClient] getTokenCountForResponse` not implemented.', responseMessage);
}

/**
* Abstract method to record token usage. Subclasses must implement this method.
* If a correction to the token usage is needed, the method should return an object with the corrected token counts.
* @param {object} params
* @param {number} params.promptTokens
* @param {number} params.completionTokens
* @returns {Promise<void>}
*/
async recordTokenUsage({ promptTokens, completionTokens }) {
logger.debug('`[BaseClient] recordTokenUsage` not implemented.', {
promptTokens,
@@ -536,13 +548,31 @@
this.getTokenCountForResponse &&
this.getTokenCount
) {
responseMessage.tokenCount = this.getTokenCountForResponse(responseMessage);
const completionTokens = this.getTokenCount(completion);
await this.recordTokenUsage({ promptTokens, completionTokens });
let completionTokens;

/**
* Metadata about input/output costs for the current message. The client
* should provide a function to get the current stream usage metadata; if not,
* use the legacy token estimations.
* @type {StreamUsage | null} */
const usage = this.getStreamUsage != null ? this.getStreamUsage() : null;

if (usage != null && Number(usage.output_tokens) > 0) {
responseMessage.tokenCount = usage.output_tokens;
completionTokens = responseMessage.tokenCount;
await this.updateUserMessageTokenCount({ usage, tokenCountMap, userMessage, opts });
} else {
responseMessage.tokenCount = this.getTokenCountForResponse(responseMessage);
completionTokens = this.getTokenCount(completion);
}

await this.recordTokenUsage({ promptTokens, completionTokens, usage });
}

if (this.userMessagePromise) {
await this.userMessagePromise;
}

this.responsePromise = this.saveMessageToDatabase(responseMessage, saveOptions, user);
const messageCache = getLogStores(CacheKeys.MESSAGES);
messageCache.set(
@@ -557,6 +587,66 @@
return responseMessage;
}

/**
* Stream usage should only be used for user message token count re-calculation if:
* - the stream usage is available, with input tokens greater than 0,
* - the client provides a function to calculate the current token count,
* - files are resent with every message (the default behavior), or `resendFiles` is `false` and the request has no attachments,
* - the `promptPrefix` (custom instructions) is not set.
*
* If any of these conditions are not met, the legacy token estimations are likely to be more accurate, so the count is left unchanged.
*
* TODO: include system messages in the `orderedMessages` accounting, potentially as a
* separate message in the UI. ChatGPT does this through "hidden" system messages.
* @param {object} params
* @param {StreamUsage} params.usage
* @param {Record<string, number>} params.tokenCountMap
* @param {TMessage} params.userMessage
* @param {object} params.opts
*/
async updateUserMessageTokenCount({ usage, tokenCountMap, userMessage, opts }) {
/** @type {boolean} */
const shouldUpdateCount =
this.calculateCurrentTokenCount != null &&
Number(usage.input_tokens) > 0 &&
(this.options.resendFiles ||
(!this.options.resendFiles && !this.options.attachments?.length)) &&
!this.options.promptPrefix;
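/*
 * When custom instructions or non-resent files contribute to the API-reported input tokens
 * but are absent from `tokenCountMap`, the derived count would absorb those extra tokens,
 * so the original estimate is kept instead.
 */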

if (!shouldUpdateCount) {
return;
}

const userMessageTokenCount = this.calculateCurrentTokenCount({
currentMessageId: userMessage.messageId,
tokenCountMap,
usage,
});

if (userMessageTokenCount === userMessage.tokenCount) {
return;
}

userMessage.tokenCount = userMessageTokenCount;
/*
Note: `AskController` saves the user message, so we update the count of its `userMessage` reference
*/
if (typeof opts?.getReqData === 'function') {
opts.getReqData({
userMessage,
});
}
/*
Note: we update the user message to be sure it gets the calculated token count;
though `AskController` saves the user message, EditController does not
*/
await this.userMessagePromise;
await this.updateMessageInDatabase({
messageId: userMessage.messageId,
tokenCount: userMessageTokenCount,
});
}

async loadHistory(conversationId, parentMessageId = null) {
logger.debug('[BaseClient] Loading history:', { conversationId, parentMessageId });

@@ -644,6 +734,10 @@
return { message: savedMessage, conversation };
}

/**
* Update a message in the database.
* @param {Partial<TMessage>} message
*/
async updateMessageInDatabase(message) {
await updateMessage(this.options.req, message);
}
2 changes: 1 addition & 1 deletion api/app/clients/OpenAIClient.js
@@ -27,9 +27,9 @@ const {
createContextHandlers,
} = require('./prompts');
const { encodeAndFormat } = require('~/server/services/Files/images/encode');
const { spendTokens } = require('~/models/spendTokens');
const { isEnabled, sleep } = require('~/server/utils');
const { handleOpenAIErrors } = require('./tools/util');
const spendTokens = require('~/models/spendTokens');
const { createLLM, RunManager } = require('./llm');
const ChatGPTClient = require('./ChatGPTClient');
const { summaryBuffer } = require('./memory');
