diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts index f7e173dd344e9..7d3573548ea58 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts @@ -1,6 +1,7 @@ import iam = require('@aws-cdk/aws-iam'); import lambda = require('@aws-cdk/aws-lambda'); import sfn = require('@aws-cdk/aws-stepfunctions'); +import { FieldUtils } from '../../aws-stepfunctions/lib/fields'; /** * Properties for InvokeFunction @@ -9,7 +10,7 @@ export interface InvokeFunctionProps { /** * The JSON that you want to provide to your Lambda function as input. */ - readonly payload?: { [key: string]: string }; + readonly payload?: { [key: string]: any }; /** * Whether to pause the workflow until a task token is returned @@ -17,6 +18,14 @@ export interface InvokeFunctionProps { * @default false */ readonly waitForTaskToken?: boolean; + + /** + * Whether to invoke lambda via integrated service ARN "arn:aws:states:::lambda:invoke" + * or via Function ARN. + * + * @default false + */ + readonly invokeAsIntegratedService?: boolean; } /** @@ -28,16 +37,28 @@ export interface InvokeFunctionProps { export class InvokeFunction implements sfn.IStepFunctionsTask { private readonly waitForTaskToken: boolean; + private readonly invokeAsIntegratedService: boolean; - constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps) { + constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps = {}) { this.waitForTaskToken = props.waitForTaskToken === true; + + // Invoke function as integrated service if flag is in props, or if waitForTaskToken property is true + this.invokeAsIntegratedService = props.invokeAsIntegratedService === true || this.waitForTaskToken; + + if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) { + throw new Error('Task Token is missing in payload'); + } } public bind(_task: sfn.Task): sfn.StepFunctionsTaskProperties { + const resourceArn = this.invokeAsIntegratedService + ? 'arn:aws:states:::lambda:invoke' + this.waitForTaskToken ? '.waitForTaskToken' : '' + : this.lambdaFunction.functionArn; + + const includeParameters = this.invokeAsIntegratedService || this.props.payload; + return { - resourceArn: this.waitForTaskToken - ? 'arn:aws:states:::lambda:invoke.waitForTaskToken' - : this.lambdaFunction.functionArn, + resourceArn, policyStatements: [new iam.PolicyStatement() .addResource(this.lambdaFunction.functionArn) .addActions("lambda:InvokeFunction") @@ -45,9 +66,11 @@ export class InvokeFunction implements sfn.IStepFunctionsTask { metricPrefixSingular: 'LambdaFunction', metricPrefixPlural: 'LambdaFunctions', metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn }, - parameters: { - FunctionName: this.lambdaFunction.functionName, - ...this.props.payload && { Payload: this.props.payload }, + ...includeParameters && { + parameters: { + ...this.invokeAsIntegratedService && { FunctionName: this.lambdaFunction.functionName }, + ...this.props.payload && { Payload: this.props.payload }, + } } }; } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json new file mode 100644 index 0000000000000..70c5732cbba7e --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json @@ -0,0 +1,317 @@ +{ + "Resources": { + "HandlerServiceRoleFCDC14AE": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "lambda.", + { + "Ref": "AWS::URLSuffix" + } + ] + ] + } + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "Handler886CB40B": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": { + "Ref": "HandlerCodeS3Bucket8DD11ED9" + }, + "S3Key": { + "Fn::Join": [ + "", + [ + { + "Fn::Select": [ + 0, + { + "Fn::Split": [ + "||", + { + "Ref": "HandlerCodeS3VersionKey0BB5191E" + } + ] + } + ] + }, + { + "Fn::Select": [ + 1, + { + "Fn::Split": [ + "||", + { + "Ref": "HandlerCodeS3VersionKey0BB5191E" + } + ] + } + ] + } + ] + ] + } + }, + "Handler": "index.main", + "Role": { + "Fn::GetAtt": [ + "HandlerServiceRoleFCDC14AE", + "Arn" + ] + }, + "Runtime": "python3.6" + }, + "DependsOn": [ + "HandlerServiceRoleFCDC14AE" + ] + }, + "CallbackHandlerServiceRole3689695E": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "lambda.", + { + "Ref": "AWS::URLSuffix" + } + ] + ] + } + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "CallbackHandler4434C38D": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": { + "Ref": "CallbackHandlerCodeS3Bucket806D7490" + }, + "S3Key": { + "Fn::Join": [ + "", + [ + { + "Fn::Select": [ + 0, + { + "Fn::Split": [ + "||", + { + "Ref": "CallbackHandlerCodeS3VersionKeyDD40A461" + } + ] + } + ] + }, + { + "Fn::Select": [ + 1, + { + "Fn::Split": [ + "||", + { + "Ref": "CallbackHandlerCodeS3VersionKeyDD40A461" + } + ] + } + ] + } + ] + ] + } + }, + "Handler": "index.main", + "Role": { + "Fn::GetAtt": [ + "CallbackHandlerServiceRole3689695E", + "Arn" + ] + }, + "Runtime": "python3.6" + }, + "DependsOn": [ + "CallbackHandlerServiceRole3689695E" + ] + }, + "StateMachineRoleB840431D": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "states.", + { + "Ref": "AWS::Region" + }, + ".amazonaws.com" + ] + ] + } + } + } + ], + "Version": "2012-10-17" + } + } + }, + "StateMachineRoleDefaultPolicyDF1E6607": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": "lambda:InvokeFunction", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "Handler886CB40B", + "Arn" + ] + } + }, + { + "Action": "lambda:InvokeFunction", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "CallbackHandler4434C38D", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "StateMachineRoleDefaultPolicyDF1E6607", + "Roles": [ + { + "Ref": "StateMachineRoleB840431D" + } + ] + } + }, + "StateMachine2E01A3A5": { + "Type": "AWS::StepFunctions::StateMachine", + "Properties": { + "DefinitionString": { + "Fn::Join": [ + "", + [ + "{\"StartAt\":\"Invoke Handler\",\"States\":{\"Invoke Handler\":{\"Next\":\"Invoke Handler with task token\",\"Type\":\"Task\",\"Resource\":\"", + { + "Fn::GetAtt": [ + "Handler886CB40B", + "Arn" + ] + }, + "\"},\"Invoke Handler with task token\":{\"Next\":\"Job Complete?\",\"InputPath\":\"$.guid\",\"Type\":\"Task\",\"Resource\":\"", + { + "Fn::GetAtt": [ + "CallbackHandler4434C38D", + "Arn" + ] + }, + "\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}" + ] + ] + }, + "RoleArn": { + "Fn::GetAtt": [ + "StateMachineRoleB840431D", + "Arn" + ] + } + } + } + }, + "Parameters": { + "HandlerCodeS3Bucket8DD11ED9": { + "Type": "String", + "Description": "S3 bucket for asset \"aws-stepfunctions-integ/Handler/Code\"" + }, + "HandlerCodeS3VersionKey0BB5191E": { + "Type": "String", + "Description": "S3 key for asset version \"aws-stepfunctions-integ/Handler/Code\"" + }, + "HandlerCodeArtifactHashD7814EF8": { + "Type": "String", + "Description": "Artifact hash for asset \"aws-stepfunctions-integ/Handler/Code\"" + }, + "CallbackHandlerCodeS3Bucket806D7490": { + "Type": "String", + "Description": "S3 bucket for asset \"aws-stepfunctions-integ/CallbackHandler/Code\"" + }, + "CallbackHandlerCodeS3VersionKeyDD40A461": { + "Type": "String", + "Description": "S3 key for asset version \"aws-stepfunctions-integ/CallbackHandler/Code\"" + }, + "CallbackHandlerCodeArtifactHash2D279BFF": { + "Type": "String", + "Description": "Artifact hash for asset \"aws-stepfunctions-integ/CallbackHandler/Code\"" + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts new file mode 100644 index 0000000000000..014b15def3ec1 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts @@ -0,0 +1,57 @@ +import sfn = require('@aws-cdk/aws-stepfunctions'); +import cdk = require('@aws-cdk/cdk'); +import path = require('path'); +import { Code, Function, Runtime } from '../../aws-lambda/lib'; +import tasks = require('../lib'); + +const app = new cdk.App(); +const stack = new cdk.Stack(app, 'aws-stepfunctions-integ'); + +const handler = new Function(stack, 'Handler', { + code: Code.asset(path.join(__dirname, 'my-lambda-handler')), + handler: 'index.main', + runtime: Runtime.Python36 +}); + +const submitJob = new sfn.Task(stack, 'Invoke Handler', { + task: new tasks.InvokeFunction(handler), +}); + +const callbackHandler = new Function(stack, 'CallbackHandler', { + code: Code.asset(path.join(__dirname, 'my-lambda-handler')), + handler: 'index.main', + runtime: Runtime.Python36 +}); + +const taskTokenHandler = new sfn.Task(stack, 'Invoke Handler with task token', { + task: new tasks.InvokeFunction(callbackHandler, { + waitForTaskToken: true, + payload: { + "token.$": "$$.Task.Token" + } + }), + inputPath: '$.guid', + resultPath: '$.status', +}); + +const isComplete = new sfn.Choice(stack, 'Job Complete?'); +const jobFailed = new sfn.Fail(stack, 'Job Failed', { + cause: 'AWS Batch Job Failed', + error: 'DescribeJob returned FAILED', +}); +const finalStatus = new sfn.Pass(stack, 'Final step'); + +const chain = sfn.Chain + .start(submitJob) + .next(taskTokenHandler) + .next(isComplete + .when(sfn.Condition.stringEquals('$.status', 'FAILED'), jobFailed) + .when(sfn.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus) + ); + +new sfn.StateMachine(stack, 'StateMachine', { + definition: chain, + timeoutSec: 30 +}); + +app.run(); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts index 1d506caed19a2..1322af017ed45 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts @@ -8,6 +8,33 @@ test('Lambda function can be used in a Task', () => { // GIVEN const stack = new Stack(); + // WHEN + const fn = new lambda.Function(stack, 'Fn', { + code: lambda.Code.inline('hello'), + handler: 'index.hello', + runtime: lambda.Runtime.Python27, + }); + const task = new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn) }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + // THEN + expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { + DefinitionString: { + "Fn::Join": ["", [ + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"", + { "Fn::GetAtt": ["Fn9270CBC0", "Arn"] }, + "\"}}}" + ]] + }, + }); +}); + +test('Lambda function can be used in a Task with Task Token', () => { + // GIVEN + const stack = new Stack(); + // WHEN const fn = new lambda.Function(stack, 'Fn', { code: lambda.Code.inline('hello'), @@ -15,7 +42,41 @@ test('Lambda function can be used in a Task', () => { runtime: lambda.Runtime.Python27, }); const task = new sfn.Task(stack, 'Task', { - task: new tasks.InvokeFunction(fn, { waitForTaskToken: false }) + task: new tasks.InvokeFunction(fn, { + waitForTaskToken: true, + payload: { + "token.$": "$$.Task.Token" + } + }) + }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + // THEN + expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { + DefinitionString: { + "Fn::Join": ["", [ + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"", + { "Fn::GetAtt": ["Fn9270CBC0", "Arn"] }, + "\"}}}" + ]] + }, + }); +}); + +test('Lambda function can be used in a Task with integrated service ARN', () => { + // GIVEN + const stack = new Stack(); + + // WHEN + const fn = new lambda.Function(stack, 'Fn', { + code: lambda.Code.inline('hello'), + handler: 'index.hello', + runtime: lambda.Runtime.Python27, + }); + const task = new sfn.Task(stack, 'Task', { + task: new tasks.InvokeFunction(fn, { invokeAsIntegratedService: true }) }); new sfn.StateMachine(stack, 'SM', { definition: task @@ -31,16 +92,44 @@ test('Lambda function can be used in a Task', () => { { Ref: "Fn9270CBC0" }, - "\"},\"Type\":\"Task\",\"Resource\":\"", - { - "Fn::GetAtt": [ - "Fn9270CBC0", - "Arn" - ] - }, - "\"}}}" + "\"},\"Type\":\"Task\",\"Resource\":\".waitForTaskToken\"}}}" ] ] }, }); +}); + +test('Lambda function can be used in a Task with integrated service ARN and with Task Token', () => { + // GIVEN + const stack = new Stack(); + + // WHEN + const fn = new lambda.Function(stack, 'Fn', { + code: lambda.Code.inline('hello'), + handler: 'index.hello', + runtime: lambda.Runtime.Python27, + }); + const task = new sfn.Task(stack, 'Task', { + task: new tasks.InvokeFunction(fn, { + invokeAsIntegratedService: true, + waitForTaskToken: true, + payload: { + "token.$": "$$.Task.Token" + } + }) + }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + // THEN + expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { + DefinitionString: { + "Fn::Join": ["", [ + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"", + { "Fn::GetAtt": ["Fn9270CBC0", "Arn"] }, + "\"}}}" + ]] + }, + }); }); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-lambda-handler/index.py b/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-lambda-handler/index.py new file mode 100644 index 0000000000000..179dcbbb27423 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-lambda-handler/index.py @@ -0,0 +1,4 @@ +def main(event, context): + return { + 'message': 'Hello, world!' + } \ No newline at end of file