diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts index a57d0e1580409..f7e173dd344e9 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts @@ -2,6 +2,23 @@ import iam = require('@aws-cdk/aws-iam'); import lambda = require('@aws-cdk/aws-lambda'); import sfn = require('@aws-cdk/aws-stepfunctions'); +/** + * Properties for InvokeFunction + */ +export interface InvokeFunctionProps { + /** + * The JSON that you want to provide to your Lambda function as input. + */ + readonly payload?: { [key: string]: string }; + + /** + * Whether to pause the workflow until a task token is returned + * + * @default false + */ + readonly waitForTaskToken?: boolean; +} + /** * A StepFunctions Task to invoke a Lambda function. * @@ -9,12 +26,18 @@ import sfn = require('@aws-cdk/aws-stepfunctions'); * integration with other AWS services via a specific class instance. */ export class InvokeFunction implements sfn.IStepFunctionsTask { - constructor(private readonly lambdaFunction: lambda.IFunction) { + + private readonly waitForTaskToken: boolean; + + constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps) { + this.waitForTaskToken = props.waitForTaskToken === true; } public bind(_task: sfn.Task): sfn.StepFunctionsTaskProperties { return { - resourceArn: this.lambdaFunction.functionArn, + resourceArn: this.waitForTaskToken + ? 'arn:aws:states:::lambda:invoke.waitForTaskToken' + : this.lambdaFunction.functionArn, policyStatements: [new iam.PolicyStatement() .addResource(this.lambdaFunction.functionArn) .addActions("lambda:InvokeFunction") @@ -22,6 +45,10 @@ export class InvokeFunction implements sfn.IStepFunctionsTask { metricPrefixSingular: 'LambdaFunction', metricPrefixPlural: 'LambdaFunctions', metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn }, + parameters: { + FunctionName: this.lambdaFunction.functionName, + ...this.props.payload && { Payload: this.props.payload }, + } }; } } \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts index 6bcda1589857f..ec2fd3ef36e75 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts @@ -38,6 +38,13 @@ export interface PublishToTopicProps { * Message subject */ readonly subject?: string; + + /** + * Whether to pause the workflow until a task token is returned + * + * @default false + */ + readonly waitForTaskToken?: boolean; } /** @@ -47,7 +54,12 @@ export interface PublishToTopicProps { * integration with other AWS services via a specific class instance. */ export class PublishToTopic implements sfn.IStepFunctionsTask { + + private readonly waitForTaskToken: boolean; + constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) { + this.waitForTaskToken = props.waitForTaskToken === true; + if ((props.message === undefined) === (props.messageObject === undefined)) { throw new Error(`Supply exactly one of 'message' or 'messageObject'`); } @@ -55,7 +67,7 @@ export class PublishToTopic implements sfn.IStepFunctionsTask { public bind(task: sfn.Task): sfn.StepFunctionsTaskProperties { return { - resourceArn: 'arn:aws:states:::sns:publish', + resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''), policyStatements: [new iam.PolicyStatement() .addAction('sns:Publish') .addResource(this.topic.topicArn) @@ -63,8 +75,8 @@ export class PublishToTopic implements sfn.IStepFunctionsTask { parameters: { TopicArn: this.topic.topicArn, ...(this.props.messageObject - ? { Message: new cdk.Token(() => task.node.stringifyJson(this.props.messageObject)) } - : renderString('Message', this.props.message)), + ? { Message: new cdk.Token(() => task.node.stringifyJson(this.props.messageObject)) } + : renderString('Message', this.props.message)), MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined, ...renderString('Subject', this.props.subject), } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts index 820af3e1e83cf..22237069465c2 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts @@ -38,6 +38,13 @@ export interface SendToQueueProps { * @default No group ID */ readonly messageGroupId?: string; + + /** + * Whether to pause the workflow until a task token is returned + * + * @default false + */ + readonly waitForTaskToken?: boolean; } /** @@ -47,12 +54,16 @@ export interface SendToQueueProps { * integration with other AWS services via a specific class instance. */ export class SendToQueue implements sfn.IStepFunctionsTask { + + private readonly waitForTaskToken: boolean; + constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) { + this.waitForTaskToken = props.waitForTaskToken === true; } public bind(_task: sfn.Task): sfn.StepFunctionsTaskProperties { return { - resourceArn: 'arn:aws:states:::sqs:sendMessage', + resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''), policyStatements: [new iam.PolicyStatement() .addAction('sqs:SendMessage') .addResource(this.queue.queueArn) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts index e0fab6553ac17..1d506caed19a2 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts @@ -14,7 +14,9 @@ test('Lambda function can be used in a Task', () => { handler: 'index.hello', runtime: lambda.Runtime.Python27, }); - const task = new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn) }); + const task = new sfn.Task(stack, 'Task', { + task: new tasks.InvokeFunction(fn, { waitForTaskToken: false }) + }); new sfn.StateMachine(stack, 'SM', { definition: task }); @@ -22,11 +24,23 @@ test('Lambda function can be used in a Task', () => { // THEN expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { DefinitionString: { - "Fn::Join": ["", [ - "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"", - { "Fn::GetAtt": ["Fn9270CBC0", "Arn"] }, - "\"}}}" - ]] + "Fn::Join": [ + "", + [ + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"", + { + Ref: "Fn9270CBC0" + }, + "\"},\"Type\":\"Task\",\"Resource\":\"", + { + "Fn::GetAtt": [ + "Fn9270CBC0", + "Arn" + ] + }, + "\"}}}" + ] + ] }, }); }); \ No newline at end of file