diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/run-lambda-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/run-lambda-task.ts index 4f3c66c92178b..891f3596ecd62 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/run-lambda-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/run-lambda-task.ts @@ -10,9 +10,9 @@ export interface RunLambdaTaskProps { /** * The JSON that you want to provide to your Lambda function as input. * - * @default - No payload + * @default - The state input (JSON path '$') */ - readonly payload?: { [key: string]: any }; + readonly payload?: sfn.TaskInput; /** * The service integration pattern indicates different ways to invoke Lambda function. @@ -92,7 +92,7 @@ export class RunLambdaTask implements sfn.IStepFunctionsTask { metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn }, parameters: { FunctionName: this.lambdaFunction.functionName, - Payload: this.props.payload, + Payload: this.props.payload ? this.props.payload.value : sfn.TaskInput.fromDataAt('$').value, InvocationType: this.props.invocationType, ClientContext: this.props.clientContext, Qualifier: this.props.qualifier, diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.invoke-function.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.invoke-function.ts index 4b6ab021ac64d..d3791f59782ed 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.invoke-function.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.invoke-function.ts @@ -26,9 +26,9 @@ const callbackHandler = new Function(stack, 'CallbackHandler', { const taskTokenHandler = new sfn.Task(stack, 'Invoke Handler with task token', { task: new tasks.RunLambdaTask(callbackHandler, { integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, - payload: { + payload: sfn.TaskInput.fromObject({ token: sfn.Context.taskToken, - }, + }), }), inputPath: '$.guid', resultPath: '$.status', diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.run-lambda.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.run-lambda.expected.json new file mode 100644 index 0000000000000..64f8d2444d7f2 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.run-lambda.expected.json @@ -0,0 +1,207 @@ +{ + "Resources": { + "submitJobLambdaServiceRole4D897ABD": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "submitJobLambdaEFB00F3C": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "exports.handler = async () => {\n return {\n statusCode: '200',\n body: 'hello, world!'\n };\n };" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "submitJobLambdaServiceRole4D897ABD", + "Arn" + ] + }, + "Runtime": "nodejs10.x" + }, + "DependsOn": [ + "submitJobLambdaServiceRole4D897ABD" + ] + }, + "checkJobStateLambdaServiceRoleB8B57B65": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "checkJobStateLambda4618B7B7": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "exports.handler = async function(event, context) {\n return {\n status: event.statusCode === '200' ? 'SUCCEEDED' : 'FAILED'\n };\n };" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "checkJobStateLambdaServiceRoleB8B57B65", + "Arn" + ] + }, + "Runtime": "nodejs10.x" + }, + "DependsOn": [ + "checkJobStateLambdaServiceRoleB8B57B65" + ] + }, + "StateMachineRoleB840431D": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "states.", + { + "Ref": "AWS::Region" + }, + ".amazonaws.com" + ] + ] + } + } + } + ], + "Version": "2012-10-17" + } + } + }, + "StateMachineRoleDefaultPolicyDF1E6607": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": "lambda:InvokeFunction", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "submitJobLambdaEFB00F3C", + "Arn" + ] + } + }, + { + "Action": "lambda:InvokeFunction", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "checkJobStateLambda4618B7B7", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "StateMachineRoleDefaultPolicyDF1E6607", + "Roles": [ + { + "Ref": "StateMachineRoleB840431D" + } + ] + } + }, + "StateMachine2E01A3A5": { + "Type": "AWS::StepFunctions::StateMachine", + "Properties": { + "DefinitionString": { + "Fn::Join": [ + "", + [ + "{\"StartAt\":\"Invoke Handler\",\"States\":{\"Invoke Handler\":{\"Next\":\"Check the job state\",\"Parameters\":{\"FunctionName\":\"", + { + "Ref": "submitJobLambdaEFB00F3C" + }, + "\",\"Payload.$\":\"$\"},\"OutputPath\":\"$.Payload\",\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::lambda:invoke\"},\"Check the job state\":{\"Next\":\"Job Complete?\",\"Parameters\":{\"FunctionName\":\"", + { + "Ref": "checkJobStateLambda4618B7B7" + }, + "\",\"Payload.$\":\"$\"},\"OutputPath\":\"$.Payload\",\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::lambda:invoke\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"Received a status that was not 200\",\"Cause\":\"Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}" + ] + ] + }, + "RoleArn": { + "Fn::GetAtt": [ + "StateMachineRoleB840431D", + "Arn" + ] + } + }, + "DependsOn": [ + "StateMachineRoleDefaultPolicyDF1E6607", + "StateMachineRoleB840431D" + ] + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.run-lambda.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.run-lambda.ts new file mode 100644 index 0000000000000..dcce6c0ed5d08 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/integ.run-lambda.ts @@ -0,0 +1,68 @@ +import { Code, Function, Runtime } from '@aws-cdk/aws-lambda'; +import * as sfn from '@aws-cdk/aws-stepfunctions'; +import * as cdk from '@aws-cdk/core'; +import * as tasks from '../../lib'; + +/* + * Stack verification steps: + * The generated State Machine can be executed from the CLI (or Step Functions console) + * and runs with an execution status of `Succeeded`. + * + * -- aws stepfunctions start-execution --state-machine-arn provides execution arn + * -- aws stepfunctions describe-execution --execution-arn returns a status of `Succeeded` + */ +const app = new cdk.App(); +const stack = new cdk.Stack(app, 'aws-stepfunctions-tasks-run-lambda-integ'); + +const submitJobLambda = new Function(stack, 'submitJobLambda', { + code: Code.fromInline(`exports.handler = async () => { + return { + statusCode: '200', + body: 'hello, world!' + }; + };`), + runtime: Runtime.NODEJS_10_X, + handler: 'index.handler', +}); + +const submitJob = new sfn.Task(stack, 'Invoke Handler', { + task: new tasks.RunLambdaTask(submitJobLambda), + outputPath: '$.Payload', +}); + +const checkJobStateLambda = new Function(stack, 'checkJobStateLambda', { + code: Code.fromInline(`exports.handler = async function(event, context) { + return { + status: event.statusCode === '200' ? 'SUCCEEDED' : 'FAILED' + }; + };`), + runtime: Runtime.NODEJS_10_X, + handler: 'index.handler', +}); + +const checkJobState = new sfn.Task(stack, 'Check the job state', { + task: new tasks.RunLambdaTask(checkJobStateLambda), + outputPath: '$.Payload', +}); + +const isComplete = new sfn.Choice(stack, 'Job Complete?'); +const jobFailed = new sfn.Fail(stack, 'Job Failed', { + cause: 'Job Failed', + error: 'Received a status that was not 200', +}); +const finalStatus = new sfn.Pass(stack, 'Final step'); + +const chain = sfn.Chain.start(submitJob) + .next(checkJobState) + .next( + isComplete + .when(sfn.Condition.stringEquals('$.status', 'FAILED'), jobFailed) + .when(sfn.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus), + ); + +new sfn.StateMachine(stack, 'StateMachine', { + definition: chain, + timeout: cdk.Duration.seconds(30), +}); + +app.synth(); diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/run-lambda-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/run-lambda-task.test.ts index 3aee22f9124ee..6d46eeaf368cb 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/run-lambda-task.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/lambda/run-lambda-task.test.ts @@ -18,9 +18,9 @@ beforeEach(() => { test('Invoke lambda with default magic ARN', () => { const task = new sfn.Task(stack, 'Task', { task: new tasks.RunLambdaTask(fn, { - payload: { + payload: sfn.TaskInput.fromObject({ foo: 'bar', - }, + }), invocationType: tasks.InvocationType.REQUEST_RESPONSE, clientContext: 'eyJoZWxsbyI6IndvcmxkIn0=', qualifier: '1', @@ -63,9 +63,9 @@ test('Lambda function can be used in a Task with Task Token', () => { const task = new sfn.Task(stack, 'Task', { task: new tasks.RunLambdaTask(fn, { integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, - payload: { + payload: sfn.TaskInput.fromObject({ token: sfn.Context.taskToken, - }, + }), }), }); new sfn.StateMachine(stack, 'SM', { @@ -98,6 +98,72 @@ test('Lambda function can be used in a Task with Task Token', () => { }); }); +test('Lambda function is invoked with the state input as payload by default', () => { + const task = new sfn.Task(stack, 'Task', { + task: new tasks.RunLambdaTask(fn), + }); + new sfn.StateMachine(stack, 'SM', { + definition: task, + }); + + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':states:::lambda:invoke', + ], + ], + }, + End: true, + Parameters: { + 'FunctionName': { + Ref: 'Fn9270CBC0', + }, + 'Payload.$': '$', + }, + }); +}); + +test('Lambda function can be provided with the state input as the payload', () => { + const task = new sfn.Task(stack, 'Task', { + task: new tasks.RunLambdaTask(fn, { + payload: sfn.TaskInput.fromDataAt('$'), + }), + }); + new sfn.StateMachine(stack, 'SM', { + definition: task, + }); + + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':states:::lambda:invoke', + ], + ], + }, + End: true, + Parameters: { + 'FunctionName': { + Ref: 'Fn9270CBC0', + }, + 'Payload.$': '$', + }, + }); +}); + test('Task throws if WAIT_FOR_TASK_TOKEN is supplied but task token is not included in payLoad', () => { expect(() => { new sfn.Task(stack, 'Task', { diff --git a/packages/@aws-cdk/aws-stepfunctions/README.md b/packages/@aws-cdk/aws-stepfunctions/README.md index 534ce7e1f3aa1..78233bd085c17 100644 --- a/packages/@aws-cdk/aws-stepfunctions/README.md +++ b/packages/@aws-cdk/aws-stepfunctions/README.md @@ -159,17 +159,80 @@ similar to (for example) `inputPath`. #### Lambda example +[Invoke](https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html) a Lambda function. + +You can specify the input to your Lambda function through the `payload` attribute. +By default, Step Functions invokes Lambda function with the state input (JSON path '$') +as the input. + +The following snippet invokes a Lambda Function with the state input as the payload +by referencing the `$` path. + +```ts +new sfn.Task(this, 'Invoke with state input'); +``` + +When a function is invoked, the Lambda service sends [these response +elements](https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html#API_Invoke_ResponseElements) +back. + +⚠️ The response from the Lambda function is in an attribute called `Payload` + +The following snippet invokes a Lambda Function by referencing the `$.Payload` path +to reference the output of a Lambda executed before it. + ```ts - const task = new sfn.Task(stack, 'Invoke2', { +new sfn.Task(this, 'Invoke with empty object as payload', { + task: new tasks.RunLambdaTask(myLambda, { + payload: sfn.TaskInput.fromObject({}) + }), +}); + +new sfn.Task(this, 'Invoke with payload field in the state input', { + task: new tasks.RunLambdaTask(myOtherLambda, { + payload: sfn.TaskInput.fromDataAt('$.Payload'), + }), +}); +``` + +The following snippet invokes a Lambda and sets the task output to only include +the Lambda function response. + +```ts +new sfn.Task(this, 'Invoke and set function response as task output', { + task: new tasks.RunLambdaTask(myLambda, { + payload: sfn.TaskInput.fromDataAt('$'), + }), + outputPath: '$.Payload', +}); +``` + +You can have Step Functions pause a task, and wait for an external process to +return a task token. Read more about the [callback pattern](https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html#call-back-lambda-example) + +To use the callback pattern, set the `token` property on the task. Call the Step +Functions `SendTaskSuccess` or `SendTaskFailure` APIs with the token to +indicate that the task has completed and the state machine should resume execution. + +The following snippet invokes a Lambda with the task token as part of the input +to the Lambda. + +```ts + const task = new sfn.Task(stack, 'Invoke with callback', { task: new tasks.RunLambdaTask(myLambda, { integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, payload: { - token: sfn.Context.taskToken + token: sfn.Context.taskToken, + input: sfn.TaskInput.fromDataAt('$.someField'), } }) }); ``` +⚠️ The task will pause until it receives that task token back with a `SendTaskSuccess` or `SendTaskFailure` +call. Learn more about [Callback with the Task +Token](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token). + #### Glue Job example ```ts