Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(conversation): support response streaming #2986

Merged
merged 7 commits into from
Nov 4, 2024

Conversation

atierian
Copy link
Member

@atierian atierian commented Oct 29, 2024

Description of changes

  • Adds support for streaming assistant responses.
  • Updates E2E test cases for streaming.
  • Adds E2E test case for streaming with client tools.
  • Adds snapshot test cases for streaming resolvers.

Design

The key change here is that the lambda function sends chunks / events to the assistant response mutation rather than a full message.

Example
Bedrock streaming response “hello world” is broken up into two chunks “hello” and “ world”.

// first event
{ 
  // mutation response 
   conversationId: '123',
  associatedUserMessageId: 'abc',
  contentBlockIndex: 0,
  contentBlockDeltaIndex: 0,
  contentBlockText: 'hello',
  
  // persisted in messages table
  accumulatedTurnContent: [{ text: 'hello' }]
}

// second event
{
  // mutation response  
  conversationId: '123',
  associatedUserMessageId: 'abc',
  contentBlockIndex: 0,
  contentBlockDeltaIndex: 1,
  contentBlockText: ' world',
  
  // persisted in messages table
  accumulatedTurnContent: [{ text: 'hello world' }]
}

Types

GraphQL Input
The input type for the assistant response stream mutation invoked by the Lambda function.

input CreateConversationMessageRouteAssistantStreamingInput {
  # always included
  conversationId: ID!
  associatedUserMessageId: ID!
  contentBlockIndex: Int!
  accumulatedTurnContent: [ContentBlock]
  
  # text chunk 
  contentBlockDeltaIndex: Int
  contentBlockText: String
 
  # end of block. applicable to text blocks.
  contentBlockDoneAtIndex: Int
   
  # well-formed tool use (client tool)
  contentBlockToolUse: AWSJSON 
  
  # turn complete
  stopReason: String  
}

ConversationMessageStreamPart Type
The response type of the assistant response stream mutation and paired subscription

type ConversationMessageStreamPart {
  id: ID!
  owner: String
  conversationId: ID!
  associatedUserMessageId: ID!
  contentBlockIndex: Int!
  contentBlockText: String
  contentBlockDeltaIndex: Int
  contentBlockToolUse: ToolUseBlock
  contentBlockDoneAtIndex: Int
  stopReason: String
}

Data Flow

image

Related PRs

CDK / CloudFormation Parameters Changed

N/A

Issue #, if available

N/A

Description of how you validated changes

Checklist

  • PR description included
  • yarn test passes
  • E2E test run linked
  • Tests are changed or added
  • Relevant documentation is changed or added (and PR referenced)
  • New AWS SDK calls or CloudFormation actions have been added to relevant test and service IAM policies
  • Any CDK or CloudFormation parameter changes are called out explicitly

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Comment on lines +289 to +311
export const constructStreamResponseType = (): ObjectTypeDefinitionNode => {
return {
kind: 'ObjectTypeDefinition',
name: { kind: 'Name', value: STREAM_RESPONSE_TYPE_NAME },
fields: [
makeField('id', [], makeNonNullType(makeNamedType('ID'))),
makeField('owner', [], makeNamedType('String')),
makeField('conversationId', [], makeNonNullType(makeNamedType('ID'))),
makeField('associatedUserMessageId', [], makeNonNullType(makeNamedType('ID'))),

makeField('contentBlockIndex', [], makeNonNullType(makeNamedType('Int'))),

makeField('contentBlockText', [], makeNamedType('String')),
makeField('contentBlockDeltaIndex', [], makeNamedType('Int')),

makeField('contentBlockToolUse', [], makeNamedType('AWSJSON')),

makeField('contentBlockDoneAtIndex', [], makeNamedType('Int')),

makeField('stopReason', [], makeNamedType('String')),
],
};
};
Copy link
Member Author

@atierian atierian Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This currently isn't being used; the type comes from data-schema.
However, that is changing in a follow up PR where the supporting types are consolidated within the transformer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the type is coming from data-schema, do we need to refactor the definition so it's common to both data-schema and the transformer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This currently isn't being used. I have a WIP change that removes all supporting type definitions from data-schema. For that change, this type is necessary. Happy to remove it from this PR if you'd like; in hindsight it shouldn't have been included here.

p5quared
p5quared previously approved these changes Oct 31, 2024
// reconstruct the message from the events
const sortedEvents = events
.filter((event) => event.contentBlockText)
.sort((a, b) => a.contentBlockDeltaIndex - b.contentBlockDeltaIndex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we are able to assume that all of the events from from this single subscription have the same contentBlockIndex? I'm curious what would happen if I sent two messages at the same time with the same conversationID e.g. does one fail or does one hang or is it undefined?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we are able to assume that all of the events from from this single subscription have the same contentBlockIndex?

Close, but not quite.
The contentBlockIndex represents the index of a content block for a given assistant response message. In the example below, the first content block has a contentBlockIndex of 0, the second 1.

{
  // other fields
  content: [
    {
       text: "Checking the weather",
    },
    {
       text: "The temperature in Charleston, SC is 84° F currently",
    },
  ]
}

This test is indeed assuming that there will only be one content block returned from the assistant because the test implementation guarantees it.

In hindsight, this test should just be using the reconcileStreamEvents function. I'll switch to using that in a follow up.

I'm curious what would happen if I sent two messages at the same time with the same conversationID e.g. does one fail or does one hang or is it undefined?

No blocking or failing 😄
When a user sends a message, we read the conversation history from the ConversationMessage DDB table. If an assistant response hasn't yet been written to that table for a separate in-flight user message, that other in-flight user message isn't included in history.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarification on contentBlockIndex and double messages for me.

- adds assistant response mutation streaming resolver implementation.
- adds asistant response mutation resolver pipeline.
- adds assistant response stream mutation input type to schema.
- updates invoke-lambda resolver payload to include streaming metadata.
@atierian atierian force-pushed the ai.conversation-streaming-accumulated-turn-content branch from 51ae7fc to d8cdc68 Compare November 4, 2024 15:19
Comment on lines 101 to 102
const assistantResponsePipelineResolver = generateResolverPipeline(assistantResponsePipelineDefinition, directive, ctx);
ctx.resolvers.addResolver(parentName, directive.assistantResponseMutation.field.name.value, assistantResponsePipelineResolver);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the non-streaming response pipeline; it will be removed in a follow up.


makeInputValueDefinition('stopReason', makeNamedType('String')),

makeInputValueDefinition('accumulatedTurnContent', makeListType(makeNamedType('ContentBlockInput'))),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re ContentBlockInput: I'm mildly nervous about referring to predefined AI types defined with a string literal, especially when that type is not local to the module. Do we have a formal definition of this somewhere that we can reference rather than using string typing? If not, maybe we just refactor it into a module-scoped constant like you do with STREAM_RESPONSE_TYPE_NAME?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good point. I'm going to be changing names of supporting types in an upcoming PR to reduce the likelihood of naming collisions with existing schema types. I'll const the names there.

@@ -256,3 +283,29 @@ const constructConversationMessageModel = (

return object;
};

const STREAM_RESPONSE_TYPE_NAME = 'ConversationMessageStreamPart';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future PR nit: move to top of file for easier discovery

Comment on lines +289 to +311
export const constructStreamResponseType = (): ObjectTypeDefinitionNode => {
return {
kind: 'ObjectTypeDefinition',
name: { kind: 'Name', value: STREAM_RESPONSE_TYPE_NAME },
fields: [
makeField('id', [], makeNonNullType(makeNamedType('ID'))),
makeField('owner', [], makeNamedType('String')),
makeField('conversationId', [], makeNonNullType(makeNamedType('ID'))),
makeField('associatedUserMessageId', [], makeNonNullType(makeNamedType('ID'))),

makeField('contentBlockIndex', [], makeNonNullType(makeNamedType('Int'))),

makeField('contentBlockText', [], makeNamedType('String')),
makeField('contentBlockDeltaIndex', [], makeNamedType('Int')),

makeField('contentBlockToolUse', [], makeNamedType('AWSJSON')),

makeField('contentBlockDoneAtIndex', [], makeNamedType('Int')),

makeField('stopReason', [], makeNamedType('String')),
],
};
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the type is coming from data-schema, do we need to refactor the definition so it's common to both data-schema and the transformer?

/**
* The init slot for the assistant response mutation resolver.
*/
function init(): ResolverFunctionDefinition {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We prefer arrow functions rather than function declarations. I'm not sure how this even passed linting?

Comment on lines +27 to +30
fileName: 'init-resolver-fn.template.js',
generateTemplate: (_, code) => MappingTemplate.inlineTemplateFromString(code),
substitutions: (_, ctx) => ({
GRAPHQL_API_ENDPOINT: ctx.api.graphqlUrl,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future refactor nit: we should join up the template and substitution keys in a structure so they're easier to manage.

}

/**
* The auth slot for the assistant response mutation resolver.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit here & throughout: it'd be great to document the important functionality going on in each of the slots rather than reiterating the name. e.g., auth enforces CUP & ownership; session owner verifies session owned by sub, etc...

* Creates a template generator specific to the assistant response pipeline for a given slot name.
*/
function templateGenerator(slotName: string) {
return createS3AssetMappingTemplateGenerator('Mutation', slotName, fieldName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to make sure I'm reading this correctly: you're intentionally passing the fieldName function as an argument to createS3AssetMappingTemplateGenerator right?

@@ -129,3 +131,4 @@ function templateGenerator(slotName: string) {
}

const selectionSet = `id conversationId content { image { format source { bytes }} text toolUse { toolUseId name input } toolResult { status toolUseId content { json text image { format source { bytes }} document { format name source { bytes }} }}} role owner createdAt updatedAt`;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this in use any more?

':updatedAt': updatedAt,
});

// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What reserved word prompted this?

@@ -65,15 +66,17 @@ export class ConversationPrepareHandler {
*/
private prepareResourcesForDirective(directive: ConversationDirectiveConfiguration, ctx: TransformerPrepareStepContextProvider): void {
// TODO: Add @aws_cognito_user_pools directive to send messages mutation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO covered by this change?

@atierian atierian merged commit 815d51f into main Nov 4, 2024
7 checks passed
@atierian atierian deleted the ai.conversation-streaming-accumulated-turn-content branch November 4, 2024 17:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants