[JS] feat: streaming support for Tools Augmentation #2195

Merged · 10 commits · Dec 9, 2024
9 changes: 6 additions & 3 deletions getting-started/CONCEPTS/STREAMING.md
@@ -25,15 +25,18 @@ AI-powered bots tend to have slower response times which can disengage users. Th

A common solution is to stream the bot’s response to users while the LLM generates its full response. Through streaming, your bot can offer an experience that feels engaging, responsive, and on par with leading AI products.

-There are two parts to streaming:
+There are three parts to streaming:

- **Informative Updates**: Provide users with insights into what your bot is doing before it has started generating its response.

- **Response Streaming**: Provide users with chunks of the response as they are generated by the LLM. This feels like the bot is actively typing out its message.

- **Tools Streaming**: Initiate tool (action) calls as part of the streamed response. Streaming can now be paired with the `tools` augmentation so that action calling becomes part of the streaming experience (see the sketch below).
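
To see how these pieces fit together, here is a minimal sketch of a bot wired for streaming plus the `tools` augmentation. The prompt name, model name, folder layout, and environment variable are assumptions, and the adapter/server wiring is omitted:

```ts
import path from 'path';
import { TurnContext } from 'botbuilder';
import { ActionPlanner, Application, OpenAIModel, PromptManager, TurnState } from '@microsoft/teams-ai';

// Streaming (and tools streaming) is enabled on the model itself.
const model = new OpenAIModel({
    apiKey: process.env.OPENAI_KEY!,
    defaultModel: 'gpt-4o',
    stream: true
});

// The prompt named 'tools' is assumed to declare the tools augmentation in its config.json,
// e.g. "augmentation": { "augmentation_type": "tools" }.
const prompts = new PromptManager({ promptsFolder: path.join(__dirname, '../src/prompts') });
const planner = new ActionPlanner({ model, prompts, defaultPrompt: 'tools' });

const app = new Application<TurnState>({ ai: { planner } });

// Actions registered this way can now be called while the response is being streamed.
app.ai.action('LightsOn', async (_context: TurnContext, _state: TurnState) => {
    console.log('[Turning lights on]');
    return 'the lights are now on';
});
```

The `stream: true` flag mirrors the sample change at the bottom of this PR; the rest follows the usual ActionPlanner setup.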

## Sample Bots
- [C# Streaming ChefBot](https://github.com/microsoft/teams-ai/tree/main/dotnet/samples/04.ai.g.teamsChefBot-streaming)
- [JS Streaming ChefBot](https://github.com/microsoft/teams-ai/tree/main/js/samples/04.ai-apps/i.teamsChefBot-streaming)
- [JS Streaming+Tools LightBot](https://github.com/microsoft/teams-ai/tree/main/js/samples/03.ai-concepts/c.actionMapping-lightBot)
- [Python Streaming ListBot](https://github.com/microsoft/teams-ai/tree/main/python/samples/04.ai.h.chainedActions.listBot-streaming)

## Streaming Response Class
@@ -45,8 +48,6 @@ The expected sequence of calls is:
2. `queueTextChunk()`, ...,
3. `endStream()`.

Once `endStream()` is called, the stream is considered ended and no further updates can be sent.
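
As an illustration, here is a minimal sketch of that sequence in isolation. The helper and its chunk source are hypothetical, since in a real bot the planner manages the `StreamingResponse` (see the `LLMClient` changes below):

```ts
import { TurnContext } from 'botbuilder';
import { StreamingResponse } from '@microsoft/teams-ai';

// Hypothetical helper showing the expected call order.
async function streamReply(context: TurnContext, chunks: AsyncIterable<string>): Promise<void> {
    const streamer = new StreamingResponse(context);

    // 1. Optional informative update before generation starts.
    streamer.queueInformativeUpdate('Finding the right ingredients...');

    // 2. Queue text chunks as the LLM produces them.
    for await (const chunk of chunks) {
        streamer.queueTextChunk(chunk);
    }

    // 3. End the stream; afterwards no further updates can be sent.
    await streamer.endStream();
}
```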


## Configuration with Azure OpenAI / OpenAI

@@ -66,6 +67,8 @@ Once `endStream()` is called, the stream is considered ended and no further upda
- Attachments can only be sent in the final streamed chunk.
- Streaming is not available in conjunction with AI SDK's function calls yet.
- Streaming does not work with OpenAI's `o1` models.
- Tools Streaming only works with the `tools` augmentation. The `sequence` and `monologue` augmentations do not currently support streaming.
- Streaming without tools works with the `default` augmentation (see the configuration sketch below).
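
For reference, a sketch of how the augmentation is selected in a prompt's `config.json`. Only the `augmentation_type` field is taken from this PR's code; the surrounding structure is an assumption:

```json
{
  "augmentation": {
    "augmentation_type": "tools"
  }
}
```

Using `"default"` here instead keeps plain response streaming without tool calls.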


### Setup Instructions:
54 changes: 40 additions & 14 deletions js/packages/teams-ai/src/models/OpenAIModel.ts
@@ -354,6 +354,10 @@ export class OpenAIModel implements PromptCompletionModel {
params.presence_penalty = 0;
}

// Check for tools augmentation
const isToolsAugmentation =
template.config.augmentation && template.config.augmentation?.augmentation_type == 'tools';

// Call chat completion API
let message: Message<string>;
const completion = await this._client.chat.completions.create(params);
@@ -373,20 +377,44 @@
if (delta.content) {
message.content += delta.content;
}

// Handle tool calls
-if (delta.tool_calls) {
-    message.action_calls = delta.tool_calls.map(
-        (toolCall: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall) => {
-            return {
-                id: toolCall.id,
-                function: {
-                    name: toolCall.function!.name,
-                    arguments: toolCall.function!.arguments
-                },
-                type: toolCall.type
-            } as ActionCall;
-        }
-    );
-}
+// - We don't know how many tool calls there will be so we need to add them one-by-one.
+if (isToolsAugmentation && delta.tool_calls) {
+    // Create action calls array if it doesn't exist
+    if (!Array.isArray(message.action_calls)) {
+        message.action_calls = [];
+    }
+
+    // Add tool calls to action calls
+    for (const toolCall of delta.tool_calls) {
+        // Add empty tool call to message if new index
+        // - Note that a single tool call can span multiple chunks.
+        const index = toolCall.index;
+        if (index >= message.action_calls.length) {
+            message.action_calls.push({ id: '', function: { name: '', arguments: '' }, type: '' } as any);
+        }
+
+        // Set ID if provided
+        if (toolCall.id) {
+            message.action_calls[index].id = toolCall.id;
+        }
+
+        // Set type if provided
+        if (toolCall.type) {
+            message.action_calls[index].type = toolCall.type;
+        }
+
+        // Append function name if provided
+        if (toolCall.function?.name) {
+            message.action_calls[index].function.name += toolCall.function.name;
+        }
+
+        // Append function arguments if provided
+        if (toolCall.function?.arguments) {
+            message.action_calls[index].function.arguments += toolCall.function.arguments;
+        }
+    }
+}

// Signal chunk received
@@ -415,8 +443,6 @@
message.context = messageWithContext.context;
}
const actionCalls: ActionCall[] = [];
-const isToolsAugmentation =
-    template.config.augmentation && template.config.augmentation?.augmentation_type == 'tools';

// Log tool calls to be added to message of type Message<string> as action_calls
if (isToolsAugmentation && responseMessage?.tool_calls) {
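To see why the loop above appends fragments rather than assigning whole values, here is a small self-contained sketch of the same accumulation outside the model class. The delta shapes and sample values are simplified stand-ins for OpenAI's streamed chunks, not the SDK's actual types:

```ts
// Simplified stand-ins for the streamed chunk deltas; shapes and sample values are invented.
type ActionCall = { id: string; type: string; function: { name: string; arguments: string } };
type ToolCallDelta = { index: number; id?: string; type?: string; function?: { name?: string; arguments?: string } };

function mergeToolCallDeltas(chunks: ToolCallDelta[][]): ActionCall[] {
    const calls: ActionCall[] = [];
    for (const deltas of chunks) {
        for (const delta of deltas) {
            // A single tool call can span multiple chunks, so grow the array lazily...
            if (delta.index >= calls.length) {
                calls.push({ id: '', type: '', function: { name: '', arguments: '' } });
            }
            const call = calls[delta.index];
            // ...and append each fragment to what has arrived so far.
            if (delta.id) call.id = delta.id;
            if (delta.type) call.type = delta.type;
            if (delta.function?.name) call.function.name += delta.function.name;
            if (delta.function?.arguments) call.function.arguments += delta.function.arguments;
        }
    }
    return calls;
}

// Example: the arguments of one 'Pause' call arrive split across three chunks.
const merged = mergeToolCallDeltas([
    [{ index: 0, id: 'call_1', type: 'function', function: { name: 'Pause' } }],
    [{ index: 0, function: { arguments: '{"time":' } }],
    [{ index: 0, function: { arguments: ' 3000}' } }]
]);
console.log(merged[0].function.arguments); // '{"time": 3000}'
```
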
65 changes: 43 additions & 22 deletions js/packages/teams-ai/src/planners/LLMClient.ts
@@ -284,7 +284,6 @@ export class LLMClient<TContent = any> {
functions: PromptFunctions
): Promise<PromptResponse<TContent>> {
// Define event handlers
-let isStreaming = false;
let streamer: StreamingResponse | undefined;
const beforeCompletion: PromptCompletionModelBeforeCompletionEvent = (
ctx,
@@ -301,20 +300,23 @@

// Check for a streaming response
if (streaming) {
-    isStreaming = true;
-
-    // Create streamer and send initial message
-    streamer = new StreamingResponse(context);
-    memory.setValue('temp.streamer', streamer);
-
-    if (this._enableFeedbackLoop != null) {
-        streamer.setFeedbackLoop(this._enableFeedbackLoop);
-    }
-
-    streamer.setGeneratedByAILabel(true);
-
-    if (this._startStreamingMessage) {
-        streamer.queueInformativeUpdate(this._startStreamingMessage);
-    }
+    // Attach to any existing streamer
+    // - see tool call note below to understand.
+    streamer = memory.getValue('temp.streamer');
+    if (!streamer) {
+        // Create streamer and send initial message
+        streamer = new StreamingResponse(context);
+        memory.setValue('temp.streamer', streamer);
+
+        if (this._enableFeedbackLoop != null) {
+            streamer.setFeedbackLoop(this._enableFeedbackLoop);
+        }
+
+        streamer.setGeneratedByAILabel(true);
+
+        if (this._startStreamingMessage) {
+            streamer.queueInformativeUpdate(this._startStreamingMessage);
+        }
+    }
}
};
@@ -325,6 +327,12 @@
return;
}

// Ignore tool calls
// - see the tool call note below to understand why we're ignoring them.
if ((chunk.delta as any)?.tool_calls || chunk.delta?.action_calls) {
return;
}

// Send chunk to client
const text = chunk.delta?.content ?? '';
const citations = chunk.delta?.context?.citations ?? undefined;
@@ -347,15 +355,28 @@
try {
// Complete the prompt
const response = await this.callCompletePrompt(context, memory, functions);
-if (response.status == 'success' && isStreaming) {
-    // Delete message from response to avoid sending it twice
-    delete response.message;
-}
-
-// End the stream if streaming
-// - We're not listening for the response received event because we can't await the completion of events.
+// Handle streaming responses
if (streamer) {
-    await streamer.endStream();
+    // Tool call handling
+    // - We need to keep the streamer around during tool calls so we're just letting them return as normal
+    //   messages minus the message content. The text content is being streamed to the client in chunks.
+    // - When the tool call completes we'll call back into ActionPlanner and end up re-attaching to the
+    //   streamer. This will result in us continuing to stream the response to the client.
+    if (Array.isArray(response.message?.action_calls)) {
+        // Ensure content is empty for tool calls
+        response.message!.content = '' as TContent;
+    } else {
+        if (response.status == 'success') {
+            // Delete message from response to avoid sending it twice
+            delete response.message;
+        }
+
+        // End the stream and remove pointer from memory
+        // - We're not listening for the response received event because we can't await the completion of events.
+        await streamer.endStream();
+        memory.deleteValue('temp.streamer');
+    }
}

return response;
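To make the new control flow easier to follow, here is a condensed, self-contained sketch of the streamer handoff. Memory is reduced to a `Map`, the streamer to a stub, and the names are invented for illustration; it is not the library's code:

```ts
type Streamer = { endStream(): Promise<void> };

interface ModelResult {
    actionCalls: string[]; // tool calls returned by the model, if any
    content: string;       // text content (already streamed to the client chunk by chunk)
}

async function completeTurn(
    memory: Map<string, unknown>,
    result: ModelResult,
    createStreamer: () => Streamer
): Promise<'run-actions-then-repeat' | 'done'> {
    // Re-attach to a streamer left over from a previous pass, or create one on the first pass.
    let streamer = memory.get('temp.streamer') as Streamer | undefined;
    if (!streamer) {
        streamer = createStreamer();
        memory.set('temp.streamer', streamer);
    }

    if (result.actionCalls.length > 0) {
        // Tool calls: keep the streamer alive, blank the content, and let the planner run the
        // actions before calling back in for another pass.
        result.content = '';
        return 'run-actions-then-repeat';
    }

    // Final pass: close the stream and drop the pointer so the next turn starts fresh.
    await streamer.endStream();
    memory.delete('temp.streamer');
    return 'done';
}
```
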
@@ -96,7 +96,8 @@ const model = new OpenAIModel({
azureApiVersion: '2023-03-15-preview',

// Request logging
-logRequests: true
+logRequests: true,
+stream: true
});

const prompts = new PromptManager({
@@ -131,13 +132,13 @@ app.ai.action('LightStatus', async (context: TurnContext, state: ApplicationTurn
// Register action handlers
app.ai.action('LightsOn', async (context: TurnContext, state: ApplicationTurnState) => {
state.conversation.lightsOn = true;
-await context.sendActivity(`[lights on]`);
+console.log('[Turning lights on]');
return `the lights are now on`;
});

app.ai.action('LightsOff', async (context: TurnContext, state: ApplicationTurnState) => {
state.conversation.lightsOn = false;
-await context.sendActivity(`[lights off]`);
+console.log('[Turning lights off]');
return `the lights are now off`;
});

@@ -146,7 +147,7 @@ interface PauseParameters {
}

app.ai.action('Pause', async (context: TurnContext, state: ApplicationTurnState, parameters: PauseParameters) => {
-await context.sendActivity(`[pausing for ${parameters.time / 1000} seconds]`);
+console.log(`[Pausing for ${parameters.time / 1000} seconds]`);
await new Promise((resolve) => setTimeout(resolve, parameters.time));
return `done pausing`;
});