From a75dc024c583c976df32c56c7d12a0eb00104f18 Mon Sep 17 00:00:00 2001 From: Wenqian Wang Date: Wed, 19 Jun 2019 14:53:25 -0700 Subject: [PATCH 1/3] fix(aws-stepfunctions, aws-stepfunctions-tasks): missing suffix in field names of reference paths * add suffix ".$" for the fields whose values are reference paths, including json path and context object * fix unit tests and integration test for lambda task fixes #2937 --- .../test/integ.invoke-function.expected.json | 2 +- .../test/invoke-function.test.ts | 41 +---------- .../test/run-lambda-task.test.ts | 73 +++++++++++++++++++ .../aws-stepfunctions/lib/json-path.ts | 21 +++++- .../aws-stepfunctions/lib/state-graph.ts | 3 +- .../aws-stepfunctions/test/test.fields.ts | 8 +- 6 files changed, 104 insertions(+), 44 deletions(-) create mode 100644 packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json index 18f710ef22d77..16f8c78de01ff 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json @@ -272,7 +272,7 @@ { "Ref": "CallbackHandler4434C38D" }, - "\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}" + "\",\"Payload\":{\"token.$\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}" ] ] }, diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts index 611f30d627815..d9b3c8664af1d 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts @@ -15,7 +15,7 @@ beforeEach(() => { }); }); -test('Lambda function can be used in a Task', () => { +test('Invoke lambda with function ARN', () => { // WHEN const task = new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn) }); new sfn.StateMachine(stack, 'SM', { @@ -39,7 +39,7 @@ test('Lambda function payload ends up in Parameters', () => { definition: new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn, { payload: { - foo: 'bar' + foo: sfn.Data.stringAt('$.bar') } }) }) @@ -48,45 +48,10 @@ test('Lambda function payload ends up in Parameters', () => { expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { DefinitionString: { "Fn::Join": ["", [ - "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"foo\":\"bar\"},\"Type\":\"Task\",\"Resource\":\"", + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"foo.$\":\"$.bar\"},\"Type\":\"Task\",\"Resource\":\"", { "Fn::GetAtt": ["Fn9270CBC0", "Arn"] }, "\"}}}" ]] }, }); }); - -test('Lambda function can be used in a Task with Task Token', () => { - const task = new sfn.Task(stack, 'Task', { - task: new tasks.RunLambdaTask(fn, { - waitForTaskToken: true, - payload: { - token: sfn.Context.taskToken - } - }) - }); - new sfn.StateMachine(stack, 'SM', { - definition: task - }); - - // THEN - expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { - DefinitionString: { - "Fn::Join": ["", [ - "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"", - { Ref: "Fn9270CBC0" }, - "\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\"}}}" - ]] - }, - }); -}); - -test('Task throws if waitForTaskToken is supplied but task token is not included', () => { - expect(() => { - new sfn.Task(stack, 'Task', { - task: new tasks.RunLambdaTask(fn, { - waitForTaskToken: true - }) - }); - }).toThrow(/Task Token is missing in payload/i); -}); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts new file mode 100644 index 0000000000000..545158b911768 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts @@ -0,0 +1,73 @@ +import '@aws-cdk/assert/jest'; +import lambda = require('@aws-cdk/aws-lambda'); +import sfn = require('@aws-cdk/aws-stepfunctions'); +import { Stack } from '@aws-cdk/cdk'; +import tasks = require('../lib'); + +let stack: Stack; +let fn: lambda.Function; +beforeEach(() => { + stack = new Stack(); + fn = new lambda.Function(stack, 'Fn', { + code: lambda.Code.inline('hello'), + handler: 'index.hello', + runtime: lambda.Runtime.Python27, + }); +}); + +test('Invoke lambda with default magic ARN', () => { + const task = new sfn.Task(stack, 'Task', { + task: new tasks.RunLambdaTask(fn, { + payload: { + foo: 'bar' + } + }) + }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { + DefinitionString: { + "Fn::Join": ["", [ + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"", + { Ref: "Fn9270CBC0" }, + "\",\"Payload\":{\"foo\":\"bar\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke\"}}}" + ]] + }, + }); +}); + +test('Lambda function can be used in a Task with Task Token', () => { + const task = new sfn.Task(stack, 'Task', { + task: new tasks.RunLambdaTask(fn, { + waitForTaskToken: true, + payload: { + token: sfn.Context.taskToken + } + }) + }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { + DefinitionString: { + "Fn::Join": ["", [ + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"", + { Ref: "Fn9270CBC0" }, + "\",\"Payload\":{\"token.$\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\"}}}" + ]] + }, + }); +}); + +test('Task throws if waitForTaskToken is supplied but task token is not included', () => { + expect(() => { + new sfn.Task(stack, 'Task', { + task: new tasks.RunLambdaTask(fn, { + waitForTaskToken: true + }) + }); + }).toThrow(/Task Token is missing in payload/i); +}); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/json-path.ts b/packages/@aws-cdk/aws-stepfunctions/lib/json-path.ts index ab31ef372c37c..e9f07fa2f7a4a 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/json-path.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/json-path.ts @@ -36,7 +36,8 @@ export function renderObject(obj: object | undefined): object | undefined { return recurseObject(obj, { handleString: renderString, handleList: renderStringList, - handleNumber: renderNumber + handleNumber: renderNumber, + handleBoolean: renderBoolean, }); } @@ -63,6 +64,10 @@ export function findReferencedPaths(obj: object | undefined): Set { const path = jsonPathNumber(x); if (path !== undefined) { found.add(path); } return {}; + }, + + handleBoolean(_key: string, _x: boolean) { + return {}; } }); @@ -73,6 +78,7 @@ interface FieldHandlers { handleString(key: string, x: string): {[key: string]: string}; handleList(key: string, x: string[]): {[key: string]: string[] | string }; handleNumber(key: string, x: number): {[key: string]: number | string}; + handleBoolean(key: string, x: boolean): {[key: string]: boolean}; } export function recurseObject(obj: object | undefined, handlers: FieldHandlers): object | undefined { @@ -86,6 +92,8 @@ export function recurseObject(obj: object | undefined, handlers: FieldHandlers): Object.assign(ret, handlers.handleNumber(key, value)); } else if (Array.isArray(value)) { Object.assign(ret, recurseArray(key, value, handlers)); + } else if (typeof value === 'boolean') { + Object.assign(ret, handlers.handleBoolean(key, value)); } else if (value === null || value === undefined) { // Nothing } else if (typeof value === 'object') { @@ -144,7 +152,7 @@ function renderString(key: string, value: string): {[key: string]: string} { } /** - * Render a parameter string + * Render a parameter string list * * If the string value starts with '$.', render it as a path string, otherwise as a direct string. */ @@ -158,7 +166,7 @@ function renderStringList(key: string, value: string[]): {[key: string]: string[ } /** - * Render a parameter string + * Render a parameter number * * If the string value starts with '$.', render it as a path string, otherwise as a direct string. */ @@ -171,6 +179,13 @@ function renderNumber(key: string, value: number): {[key: string]: number | stri } } +/** + * Render a parameter boolean + */ +function renderBoolean(key: string, value: boolean): {[key: string]: boolean} { + return { [key]: value }; +} + /** * If the indicated string is an encoded JSON path, return the path * diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts b/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts index 50d368493ce63..e7b382f7dbfec 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts @@ -1,4 +1,5 @@ import iam = require('@aws-cdk/aws-iam'); +import { FieldUtils } from "./fields"; import { State } from "./states/state"; /** @@ -101,7 +102,7 @@ export class StateGraph { public toGraphJson(): object { const states: any = {}; for (const state of this.allStates) { - states[state.stateId] = state.toStateJson(); + states[state.stateId] = FieldUtils.renderObject(state.toStateJson()); } return { diff --git a/packages/@aws-cdk/aws-stepfunctions/test/test.fields.ts b/packages/@aws-cdk/aws-stepfunctions/test/test.fields.ts index 737ad04d43ec3..575b0d33d72e7 100644 --- a/packages/@aws-cdk/aws-stepfunctions/test/test.fields.ts +++ b/packages/@aws-cdk/aws-stepfunctions/test/test.fields.ts @@ -4,6 +4,8 @@ import { Context, Data, FieldUtils } from "../lib"; export = { 'deep replace correctly handles fields in arrays'(test: Test) { test.deepEqual(FieldUtils.renderObject({ + unknown: undefined, + bool: true, literal: 'literal', field: Data.stringAt('$.stringField'), listField: Data.listAt('$.listField'), @@ -14,6 +16,7 @@ export = { } ] }), { + 'bool': true, 'literal': 'literal', 'field.$': '$.stringField', 'listField.$': '$.listField', @@ -33,10 +36,12 @@ export = { str: Context.stringAt('$$.Execution.StartTime'), count: Context.numberAt('$$.State.RetryCount'), token: Context.taskToken, + entire: Context.entireContext }), { 'str.$': '$$.Execution.StartTime', 'count.$': '$$.State.RetryCount', - 'token.$': '$$.Task.Token' + 'token.$': '$$.Task.Token', + 'entire.$': '$$' }); test.done(); @@ -44,6 +49,7 @@ export = { 'find all referenced paths'(test: Test) { test.deepEqual(FieldUtils.findReferencedPaths({ + bool: false, literal: 'literal', field: Data.stringAt('$.stringField'), listField: Data.listAt('$.listField'), From dcf972b6cc4318a62677b84288e6b24df05509fb Mon Sep 17 00:00:00 2001 From: Rico Huijbers Date: Thu, 20 Jun 2019 11:05:25 +0200 Subject: [PATCH 2/3] Move renderObject() to consuming class so it can't be forgotten --- .../lib/publish-to-topic.ts | 8 +++---- .../lib/run-ecs-task-base.ts | 4 ++-- .../lib/sagemaker-train-task.ts | 2 +- .../lib/sagemaker-transform-task.ts | 2 +- .../lib/send-to-queue.ts | 10 ++++----- .../test/publish-to-topic.test.ts | 22 +++++++++++++++++++ .../aws-stepfunctions/lib/state-graph.ts | 3 +-- .../aws-stepfunctions/lib/states/task.ts | 3 ++- 8 files changed, 36 insertions(+), 18 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts index 3c222772e0a5b..011fe6d1c8e4e 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts @@ -63,11 +63,9 @@ export class PublishToTopic implements sfn.IStepFunctionsTask { })], parameters: { TopicArn: this.topic.topicArn, - ...sfn.FieldUtils.renderObject({ - Message: this.props.message.value, - MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined, - Subject: this.props.subject, - }) + Message: this.props.message.value, + MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined, + Subject: this.props.subject, } }; } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts index b51a08e08ff31..7e055275b6d08 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts @@ -166,7 +166,7 @@ function renderOverrides(containerOverrides?: ContainerOverride[]) { const ret = new Array(); for (const override of containerOverrides) { - ret.push(sfn.FieldUtils.renderObject({ + ret.push({ Name: override.containerName, Command: override.command, Cpu: override.cpu, @@ -176,7 +176,7 @@ function renderOverrides(containerOverrides?: ContainerOverride[]) { Name: e.name, Value: e.value, })) - })); + }); } return { ContainerOverrides: ret }; diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts index 6059174c5eb75..a6b7f7d75b9ae 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts @@ -151,7 +151,7 @@ export class SagemakerTrainTask implements ec2.IConnectable, sfn.IStepFunctionsT public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig { return { resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + (this.props.synchronous ? '.sync' : ''), - parameters: sfn.FieldUtils.renderObject(this.renderParameters()), + parameters: this.renderParameters(), policyStatements: this.makePolicyStatements(task), }; } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts index 9d485f5bfadcb..94f87e75f9342 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts @@ -125,7 +125,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask { public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig { return { resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + (this.props.synchronous ? '.sync' : ''), - parameters: sfn.FieldUtils.renderObject(this.renderParameters()), + parameters: this.renderParameters(), policyStatements: this.makePolicyStatements(task), }; } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts index 3a202c5962e0d..eff664452bc3e 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts @@ -72,12 +72,10 @@ export class SendToQueue implements sfn.IStepFunctionsTask { })], parameters: { QueueUrl: this.queue.queueUrl, - ...sfn.FieldUtils.renderObject({ - MessageBody: this.props.messageBody.value, - DelaySeconds: this.props.delaySeconds, - MessageDeduplicationId: this.props.messageDeduplicationId, - MessageGroupId: this.props.messageGroupId, - }) + MessageBody: this.props.messageBody.value, + DelaySeconds: this.props.delaySeconds, + MessageDeduplicationId: this.props.messageDeduplicationId, + MessageGroupId: this.props.messageGroupId, } }; } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts index 5dbfcd9cda4ae..76c746fbe7366 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts @@ -25,6 +25,28 @@ test('publish to SNS', () => { }); }); +test('publish to topic with ARN from payload', () => { + // GIVEN + const stack = new cdk.Stack(); + const topic = sns.Topic.fromTopicArn(stack, 'Topic', sfn.Data.stringAt('$.topicArn')); + + // WHEN + const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { + message: sfn.TaskInput.fromText('Send this message') + }) }); + + // THEN + expect(stack.resolve(pub.toStateJson())).toEqual({ + Type: 'Task', + Resource: 'arn:aws:states:::sns:publish', + End: true, + Parameters: { + 'TopicArn.$': '$.topicArn', + 'Message': 'Send this message' + }, + }); +}); + test('publish JSON to SNS', () => { // GIVEN const stack = new cdk.Stack(); diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts b/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts index e7b382f7dbfec..50d368493ce63 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts @@ -1,5 +1,4 @@ import iam = require('@aws-cdk/aws-iam'); -import { FieldUtils } from "./fields"; import { State } from "./states/state"; /** @@ -102,7 +101,7 @@ export class StateGraph { public toGraphJson(): object { const states: any = {}; for (const state of this.allStates) { - states[state.stateId] = FieldUtils.renderObject(state.toStateJson()); + states[state.stateId] = state.toStateJson(); } return { diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/task.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/task.ts index e39fd94561b83..783d0014b946c 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/states/task.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/task.ts @@ -1,6 +1,7 @@ import cloudwatch = require('@aws-cdk/aws-cloudwatch'); import cdk = require('@aws-cdk/cdk'); import { Chain } from '../chain'; +import { FieldUtils } from '../fields'; import { StateGraph } from '../state-graph'; import { IStepFunctionsTask, StepFunctionsTaskConfig } from '../step-functions-task'; import { CatchProps, IChainable, INextable, RetryProps } from '../types'; @@ -126,7 +127,7 @@ export class Task extends State implements INextable { Type: StateType.Task, Comment: this.comment, Resource: this.taskProps.resourceArn, - Parameters: this.taskProps.parameters, + Parameters: this.taskProps.parameters && FieldUtils.renderObject(this.taskProps.parameters), ResultPath: renderJsonPath(this.resultPath), TimeoutSeconds: this.timeoutSeconds, HeartbeatSeconds: this.taskProps.heartbeatSeconds, From 125a259f9a204d3d0ef00a18317382b681309726 Mon Sep 17 00:00:00 2001 From: Wenqian Wang Date: Thu, 20 Jun 2019 20:39:21 -0700 Subject: [PATCH 3/3] feat(aws-stepfunctions-tasks) complete supporting parameters for lambda * Add Qualifier in the list of supporting parameters for lambda * Increase unit test coverage for all the services which support "waitForTaskToken" --- .../lib/run-lambda-task.ts | 8 +++ .../test/publish-to-topic.test.ts | 53 ++++++++++------ .../test/run-lambda-task.test.ts | 10 +++- .../test/send-to-queue.test.ts | 60 +++++++++++++++---- 4 files changed, 98 insertions(+), 33 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts index c6071227854d2..b55405537c3d6 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts @@ -36,6 +36,13 @@ export interface RunLambdaTaskProps { * @default - No context */ readonly clientContext?: string; + + /** + * Version or alias of the function to be invoked + * + * @default - No qualifier + */ + readonly qualifier?: string; } /** @@ -75,6 +82,7 @@ export class RunLambdaTask implements sfn.IStepFunctionsTask { Payload: this.props.payload, InvocationType: this.props.invocationType, ClientContext: this.props.clientContext, + Qualifier: this.props.qualifier } }; } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts index 76c746fbe7366..ca349bc0a7732 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts @@ -3,14 +3,14 @@ import sfn = require('@aws-cdk/aws-stepfunctions'); import cdk = require('@aws-cdk/cdk'); import tasks = require('../lib'); -test('publish to SNS', () => { +test('Publish literal message to SNS topic', () => { // GIVEN const stack = new cdk.Stack(); const topic = new sns.Topic(stack, 'Topic'); // WHEN const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { - message: sfn.TaskInput.fromText('Send this message') + message: sfn.TaskInput.fromText('Publish this message') }) }); // THEN @@ -20,43 +20,62 @@ test('publish to SNS', () => { End: true, Parameters: { TopicArn: { Ref: 'TopicBFC7AF6E' }, - Message: 'Send this message' + Message: 'Publish this message' }, }); }); -test('publish to topic with ARN from payload', () => { +test('Publish JSON to SNS topic with task token', () => { // GIVEN const stack = new cdk.Stack(); - const topic = sns.Topic.fromTopicArn(stack, 'Topic', sfn.Data.stringAt('$.topicArn')); + const topic = new sns.Topic(stack, 'Topic'); // WHEN const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { - message: sfn.TaskInput.fromText('Send this message') + waitForTaskToken: true, + message: sfn.TaskInput.fromObject({ + Input: 'Publish this message', + Token: sfn.Context.taskToken + }) }) }); // THEN expect(stack.resolve(pub.toStateJson())).toEqual({ Type: 'Task', - Resource: 'arn:aws:states:::sns:publish', + Resource: 'arn:aws:states:::sns:publish.waitForTaskToken', End: true, Parameters: { - 'TopicArn.$': '$.topicArn', - 'Message': 'Send this message' + TopicArn: { Ref: 'TopicBFC7AF6E' }, + Message: { + 'Input': 'Publish this message', + 'Token.$': '$$.Task.Token' + } }, }); }); -test('publish JSON to SNS', () => { +test('Task throws if waitForTaskToken is supplied but task token is not included in message', () => { + expect(() => { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + // WHEN + new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { + waitForTaskToken: true, + message: sfn.TaskInput.fromText('Publish this message') + }) }); + // THEN + }).toThrow(/Task Token is missing in message/i); +}); + +test('Publish to topic with ARN from payload', () => { // GIVEN const stack = new cdk.Stack(); - const topic = new sns.Topic(stack, 'Topic'); + const topic = sns.Topic.fromTopicArn(stack, 'Topic', sfn.Data.stringAt('$.topicArn')); // WHEN const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { - message: sfn.TaskInput.fromObject({ - Input: 'Send this message' - }) + message: sfn.TaskInput.fromText('Publish this message') }) }); // THEN @@ -65,10 +84,8 @@ test('publish JSON to SNS', () => { Resource: 'arn:aws:states:::sns:publish', End: true, Parameters: { - TopicArn: { Ref: 'TopicBFC7AF6E' }, - Message: { - Input: 'Send this message' - } + 'TopicArn.$': '$.topicArn', + 'Message': 'Publish this message' }, }); }); diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts index 545158b911768..1884e76c0b8b5 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts @@ -20,7 +20,10 @@ test('Invoke lambda with default magic ARN', () => { task: new tasks.RunLambdaTask(fn, { payload: { foo: 'bar' - } + }, + invocationType: tasks.InvocationType.RequestResponse, + clientContext: "eyJoZWxsbyI6IndvcmxkIn0=", + qualifier: "1", }) }); new sfn.StateMachine(stack, 'SM', { @@ -32,7 +35,8 @@ test('Invoke lambda with default magic ARN', () => { "Fn::Join": ["", [ "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"", { Ref: "Fn9270CBC0" }, - "\",\"Payload\":{\"foo\":\"bar\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke\"}}}" + "\",\"Payload\":{\"foo\":\"bar\"},\"InvocationType\":\"RequestResponse\",\"ClientContext\":\"eyJoZWxsbyI6IndvcmxkIn0=\"," + + "\"Qualifier\":\"1\"},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke\"}}}" ]] }, }); @@ -62,7 +66,7 @@ test('Lambda function can be used in a Task with Task Token', () => { }); }); -test('Task throws if waitForTaskToken is supplied but task token is not included', () => { +test('Task throws if waitForTaskToken is supplied but task token is not included in payLoad', () => { expect(() => { new sfn.Task(stack, 'Task', { task: new tasks.RunLambdaTask(fn, { diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts index 2407e3c6a2d41..fc0e03d0aee13 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts @@ -12,15 +12,15 @@ beforeEach(() => { queue = new sqs.Queue(stack, 'Queue'); }); -test('publish to queue', () => { +test('Send message to queue', () => { // WHEN - const pub = new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { + const task = new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { messageBody: sfn.TaskInput.fromText('Send this message'), messageDeduplicationId: sfn.Data.stringAt('$.deduping'), }) }); // THEN - expect(stack.resolve(pub.toStateJson())).toEqual({ + expect(stack.resolve(task.toStateJson())).toEqual({ Type: 'Task', Resource: 'arn:aws:states:::sqs:sendMessage', End: true, @@ -32,16 +32,52 @@ test('publish to queue', () => { }); }); -test('message body can come from state', () => { +test('Send message to SQS queue with task token', () => { // WHEN - const pub = new sfn.Task(stack, 'Send', { + const task = new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { + waitForTaskToken: true, + messageBody: sfn.TaskInput.fromObject({ + Input: 'Send this message', + Token: sfn.Context.taskToken + }) + }) }); + + // THEN + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: 'arn:aws:states:::sqs:sendMessage.waitForTaskToken', + End: true, + Parameters: { + QueueUrl: { Ref: 'Queue4A7E3555' }, + MessageBody: { + 'Input': 'Send this message', + 'Token.$': '$$.Task.Token' + } + }, + }); +}); + +test('Task throws if waitForTaskToken is supplied but task token is not included in messageBody', () => { + expect(() => { + // WHEN + new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { + waitForTaskToken: true, + messageBody: sfn.TaskInput.fromText('Send this message') + }) }); + // THEN + }).toThrow(/Task Token is missing in messageBody/i); +}); + +test('Message body can come from state', () => { + // WHEN + const task = new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { messageBody: sfn.TaskInput.fromDataAt('$.theMessage') }) }); // THEN - expect(stack.resolve(pub.toStateJson())).toEqual({ + expect(stack.resolve(task.toStateJson())).toEqual({ Type: 'Task', Resource: 'arn:aws:states:::sqs:sendMessage', End: true, @@ -52,9 +88,9 @@ test('message body can come from state', () => { }); }); -test('message body can be an object', () => { +test('Message body can be an object', () => { // WHEN - const pub = new sfn.Task(stack, 'Send', { + const task = new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { messageBody: sfn.TaskInput.fromObject({ literal: 'literal', @@ -64,7 +100,7 @@ test('message body can be an object', () => { }); // THEN - expect(stack.resolve(pub.toStateJson())).toEqual({ + expect(stack.resolve(task.toStateJson())).toEqual({ Type: 'Task', Resource: 'arn:aws:states:::sqs:sendMessage', End: true, @@ -78,9 +114,9 @@ test('message body can be an object', () => { }); }); -test('message body object can contain references', () => { +test('Message body object can contain references', () => { // WHEN - const pub = new sfn.Task(stack, 'Send', { + const task = new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { messageBody: sfn.TaskInput.fromObject({ queueArn: queue.queueArn @@ -89,7 +125,7 @@ test('message body object can contain references', () => { }); // THEN - expect(stack.resolve(pub.toStateJson())).toEqual({ + expect(stack.resolve(task.toStateJson())).toEqual({ Type: 'Task', Resource: 'arn:aws:states:::sqs:sendMessage', End: true,