Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stepfunctions-tasks): fix bedrock input/output path in step-funct… #31305

Merged
merged 19 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
{
"Ref": "AWS::Region"
},
"::foundation-model/amazon.titan-text-express-v1\",\"Body\":{\"inputText.$\":\"States.Format('Alphabetize this list of first names:\\n{}', $.names)\",\"textGenerationConfig\":{\"maxTokenCount\":100,\"temperature\":1}}}},\"Prompt3\":{\"End\":true,\"Type\":\"Task\",\"InputPath\":\"$.names\",\"OutputPath\":\"$.names\",\"Resource\":\"arn:",
"::foundation-model/amazon.titan-text-express-v1\",\"Body\":{\"inputText.$\":\"States.Format('Alphabetize this list of first names: {}', $.names)\",\"textGenerationConfig\":{\"maxTokenCount\":100,\"temperature\":1}}}},\"Prompt3\":{\"Next\":\"Prompt4\",\"Type\":\"Task\",\"OutputPath\":\"$.Body.results[0].outputText\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
Expand All @@ -115,7 +115,19 @@
{
"Ref": "AWS::Region"
},
"::foundation-model/amazon.titan-text-express-v1\",\"Input\":{\"S3Uri.$\":\"$.names\"},\"Output\":{\"S3Uri.$\":\"$.names\"}}}},\"TimeoutSeconds\":30}"
"::foundation-model/amazon.titan-text-express-v1\",\"Body\":{\"inputText.$\":\"States.Format('Echo list of first names: {}', $.names)\",\"textGenerationConfig\":{\"maxTokenCount\":100,\"temperature\":1}}}},\"Prompt4\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
":states:::bedrock:invokeModel\",\"Parameters\":{\"ModelId\":\"arn:",
{
"Ref": "AWS::Partition"
},
":bedrock:",
{
"Ref": "AWS::Region"
},
"::foundation-model/amazon.titan-text-express-v1\",\"Input\":{\"S3Uri\":\"$.names\"},\"Output\":{\"S3Uri\":\"$.names\"}}}},\"TimeoutSeconds\":30}"
]
]
},
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import { BedrockInvokeModel } from 'aws-cdk-lib/aws-stepfunctions-tasks';
* * aws stepfunctions describe-execution --execution-arn <exection-arn generated before> : should return status as SUCCEEDED
* This integ test does not actually verify a Step Functions execution, as not all AWS accounts have Bedrock model access.
*/
const app = new cdk.App();
const app = new cdk.App({
postCliContext: {
'@aws-cdk/aws-cdk.aws-stepfunctions-tasks:useNewS3UriParametersForBedrockInvokeModelTask': true,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the default value of the flag is True, so we do not need to set this flag, and instead you need to update the old integration test cases to set the flag to false to test the current behaviour (using Input path, and output path instead of the new S3 uris)

});
const stack = new cdk.Stack(app, 'aws-stepfunctions-tasks-bedrock-invoke-model-integ');

const model = bedrock.FoundationModel.fromFoundationModelId(stack, 'Model', bedrock.FoundationModelIdentifier.AMAZON_TITAN_TEXT_G1_EXPRESS_V1);
Expand All @@ -37,7 +41,7 @@ const prompt2 = new BedrockInvokeModel(stack, 'Prompt2', {
body: sfn.TaskInput.fromObject(
{
inputText: sfn.JsonPath.format(
'Alphabetize this list of first names:\n{}',
'Alphabetize this list of first names: {}',
sfn.JsonPath.stringAt('$.names'),
),
textGenerationConfig: {
Expand All @@ -52,13 +56,33 @@ const prompt2 = new BedrockInvokeModel(stack, 'Prompt2', {
resultPath: '$',
});

/** Test for Bedrock Output Path */
const prompt3 = new BedrockInvokeModel(stack, 'Prompt3', {
model,
inputPath: sfn.JsonPath.stringAt('$.names'),
outputPath: sfn.JsonPath.stringAt('$.names'),
body: sfn.TaskInput.fromObject(
{
inputText: sfn.JsonPath.format(
'Echo list of first names: {}',
sfn.JsonPath.stringAt('$.names'),
),
textGenerationConfig: {
maxTokenCount: 100,
temperature: 1,
},
},
),
outputPath: '$.Body.results[0].outputText',
});

/** Test for Bedrock s3 URI Path */
//Execution will fail for the following input as it expects a valid s3 URI from previous prompt
const prompt4 = new BedrockInvokeModel(stack, 'Prompt4', {
model,
input: { s3InputUri: '$.names' },
output: { s3OutputUri: '$.names' },
});

const chain = sfn.Chain.start(prompt1).next(prompt2).next(prompt3);
const chain = sfn.Chain.start(prompt1).next(prompt2).next(prompt3).next(prompt4);

new sfn.StateMachine(stack, 'StateMachine', {
definitionBody: sfn.DefinitionBody.fromChainable(chain),
Expand Down
34 changes: 34 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,45 @@ const task = new tasks.BedrockInvokeModel(this, 'Prompt Model', {
names: sfn.JsonPath.stringAt('$.Body.results[0].outputText'),
},
});

```
### Using Input Path for S3 URI

Provide S3 URI as an input or output path to invoke a model

To specify the S3 URI as JSON path to your input or output fields, use props `s3InputUri` and `s3OutputUri` under BedrockInvokeModelProps and set
feature flag `@aws-cdk/aws-stepfunctions-tasks:useNewS3UriParametersForBedrockInvokeModelTask` to true.

If this flag is not set, then the existing behaviour of populating the S3Uri from `InputPath` and `OutputPath` will take effect.
shikha372 marked this conversation as resolved.
Show resolved Hide resolved

```ts

import * as bedrock from 'aws-cdk-lib/aws-bedrock';

const model = bedrock.FoundationModel.fromFoundationModelId(
this,
'Model',
bedrock.FoundationModelIdentifier.AMAZON_TITAN_TEXT_G1_EXPRESS_V1,
);

const task = new tasks.BedrockInvokeModel(this, 'Prompt Model', {
model,
input : { s3InputUri: sfn.JsonPath.stringAt('$.prompt') },
output: { s3OutputUri: sfn.JsonPath.stringAt('$.prompt') },
});

```

### Using Input Path

Provide S3 URI as an input or output path to invoke a model

Currently, input and output Path provided in the BedrockInvokeModelProps input is defined as S3URI field under task definition of state machine.
To modify the existing behaviour, set `@aws-cdk/aws-stepfunctions-tasks:useNewS3UriParametersForBedrockInvokeModelTask` to true.
shikha372 marked this conversation as resolved.
Show resolved Hide resolved

If this feature flag is enabled, S3URI fields will be generated from other Props(`s3InputUri` and `s3OutputUri`), and the given inputPath, OutputPath will be rendered as
it is in the JSON task definition.

```ts

import * as bedrock from 'aws-cdk-lib/aws-bedrock';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import * as bedrock from '../../../aws-bedrock';
import * as iam from '../../../aws-iam';
import * as s3 from '../../../aws-s3';
import * as sfn from '../../../aws-stepfunctions';
import { Stack } from '../../../core';
import { Annotations, Stack, FeatureFlags } from '../../../core';
import * as cxapi from '../../../cx-api';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
Expand All @@ -22,6 +23,15 @@ export interface BedrockInvokeModelInputProps {
* @default - Input data is retrieved from the `body` field
*/
readonly s3Location?: s3.Location;

/**
* The source location where the API response is written.
*
* This field can be used to specify s3 URI in the form of token
*
* @default - The API response body is returned in the result.
*/
readonly s3InputUri?: string;
}

/**
Expand All @@ -40,6 +50,15 @@ export interface BedrockInvokeModelOutputProps {
* @default - Response body is returned in the task result
*/
readonly s3Location?: s3.Location;

/**
* The destination location where the API response is written.
*
* This field can be used to specify s3 URI in the form of token
*
* @default - The API response body is returned in the result.
*/
readonly s3OutputUri?: string;
}

/**
Expand Down Expand Up @@ -145,9 +164,17 @@ export class BedrockInvokeModel extends sfn.TaskStateBase {

validatePatternSupported(this.integrationPattern, BedrockInvokeModel.SUPPORTED_INTEGRATION_PATTERNS);

const useNewS3UriParamsForTask = FeatureFlags.of(this).isEnabled(cxapi.USE_NEW_S3URI_PARAMETERS_FOR_BEDROCK_INVOKE_MODEL_TASK);

const isBodySpecified = props.body !== undefined;
//Either specific props.input with bucket name and object key or input s3 path
const isInputSpecified = (props.input !== undefined && props.input.s3Location !== undefined) || (props.inputPath !== undefined);

let isInputSpecified: boolean;
if (!useNewS3UriParamsForTask) {
isInputSpecified = (props.input !== undefined && props.input.s3Location !== undefined) || (props.inputPath !== undefined);
} else {
//Either specific props.input with bucket name and object key or input s3 path
isInputSpecified = props.input!==undefined ? props.input?.s3Location !== undefined || props.input?.s3InputUri !== undefined : false;
shikha372 marked this conversation as resolved.
Show resolved Hide resolved
}

if (isBodySpecified && isInputSpecified) {
throw new Error('Either `body` or `input` must be specified, but not both.');
Expand All @@ -161,19 +188,33 @@ export class BedrockInvokeModel extends sfn.TaskStateBase {
if (props.output?.s3Location?.objectVersion !== undefined) {
throw new Error('Output S3 object version is not supported.');
}
if (props.input?.s3InputUri && props.input.s3Location) {
throw new Error('Cannot specify both S3 InputUri and S3 location');
}
if (props.input?.s3InputUri === '') {
throw new Error('S3 InputUri cannot be an empty string');
}
shikha372 marked this conversation as resolved.
Show resolved Hide resolved

//Warning to let users know about the newly introduced props
if (props.inputPath || props.outputPath && !useNewS3UriParamsForTask) {
Annotations.of(scope).addWarningV2('aws-cdk-lib/aws-stepfunctions-taks',
'These props will set the value of inputPath/outputPath as s3 URI under input/output field in state machine JSON definition. To modify the behaviour set feature flag `@aws-cdk/aws-stepfunctions-tasks:useNewS3UriParametersForBedrockInvokeModelTask": true` and use props input.s3InputUri/output.s3OutputUri');
}

this.taskPolicies = this.renderPolicyStatements();
}

private renderPolicyStatements(): iam.PolicyStatement[] {
const useNewS3UriParamsForTask = FeatureFlags.of(this).isEnabled(cxapi.USE_NEW_S3URI_PARAMETERS_FOR_BEDROCK_INVOKE_MODEL_TASK);
const policyStatements = [
new iam.PolicyStatement({
actions: ['bedrock:InvokeModel'],
resources: [this.props.model.modelArn],
}),
];

if (this.props.inputPath !== undefined) {
//For Compatibility with existing behaviour of input path
if (this.props.input?.s3InputUri !== undefined || (!useNewS3UriParamsForTask && this.props.inputPath !== undefined)) {
policyStatements.push(
new iam.PolicyStatement({
actions: ['s3:GetObject'],
Expand Down Expand Up @@ -204,7 +245,8 @@ export class BedrockInvokeModel extends sfn.TaskStateBase {
);
}

if (this.props.outputPath !== undefined) {
//For Compatibility with existing behaviour of output path
if (this.props.output?.s3OutputUri !== undefined || (!useNewS3UriParamsForTask && this.props.outputPath !== undefined)) {
policyStatements.push(
new iam.PolicyStatement({
actions: ['s3:PutObject'],
Expand Down Expand Up @@ -262,19 +304,19 @@ export class BedrockInvokeModel extends sfn.TaskStateBase {
* @internal
*/
protected _renderTask(): any {

const useNewS3UriParamsForTask = FeatureFlags.of(this).isEnabled(cxapi.USE_NEW_S3URI_PARAMETERS_FOR_BEDROCK_INVOKE_MODEL_TASK);
const inputSource = this.getInputSource(this.props.input, this.props.inputPath, useNewS3UriParamsForTask);
const outputSource = this.getOutputSource(this.props.output, this.props.outputPath, useNewS3UriParamsForTask);
return {
Resource: integrationResourceArn('bedrock', 'invokeModel'),
Parameters: sfn.FieldUtils.renderObject({
ModelId: this.props.model.modelArn,
Accept: this.props.accept,
ContentType: this.props.contentType,
Body: this.props.body?.value,
Input: this.props.input?.s3Location ? {
S3Uri: `s3://${this.props.input.s3Location.bucketName}/${this.props.input.s3Location.objectKey}`,
} : this.props.inputPath ? { S3Uri: this.props.inputPath } : undefined,
Output: this.props.output?.s3Location ? {
S3Uri: `s3://${this.props.output.s3Location.bucketName}/${this.props.output.s3Location.objectKey}`,
} : this.props.outputPath ? { S3Uri: this.props.outputPath }: undefined,
Input: inputSource ? { S3Uri: inputSource } : undefined,
Output: outputSource ? { S3Uri: outputSource } : undefined,
GuardrailIdentifier: this.props.guardrail?.guardrailIdentifier,
GuardrailVersion: this.props.guardrail?.guardrailVersion,
Trace: this.props.traceEnabled === undefined
Expand All @@ -285,5 +327,27 @@ export class BedrockInvokeModel extends sfn.TaskStateBase {
}),
};
};

private getInputSource(props?: BedrockInvokeModelInputProps, inputPath?: string, useNewS3UriParamsForTask?: boolean): string | undefined {
if (props?.s3Location) {
return `s3://${props.s3Location.bucketName}/${props.s3Location.objectKey}`;
} else if (useNewS3UriParamsForTask && props?.s3InputUri) {
return props.s3InputUri;
} else if (!useNewS3UriParamsForTask && inputPath) {
return inputPath;
}
return undefined;
}

private getOutputSource(props?: BedrockInvokeModelOutputProps, outputPath?: string, useNewS3UriParamsForTask?: boolean): string | undefined {
if (props?.s3Location) {
return `s3://${props.s3Location.bucketName}/${props.s3Location.objectKey}`;
} else if (useNewS3UriParamsForTask && props?.s3OutputUri) {
return props.s3OutputUri;
} else if (!useNewS3UriParamsForTask && outputPath) {
return outputPath;
}
return undefined;
}
}

Loading
Loading