Skip to content

Commit

Permalink
feat(aws-stepfunctions-tasks): allow specifying waitForTaskToken suff…
Browse files Browse the repository at this point in the history
…ix in resourceArn (aws#2658)

* Add waitForTaskToken property to SentToQueue, InvokeFunction, PublishToTopic task types
* Add payload parameter to InvokeLambda parameters
* Unit test

BREAKING CHANGE: InvokeFunction now requires props as second argument
  • Loading branch information
alberto-galimberti committed May 30, 2019
1 parent 01601c2 commit e50913e
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 12 deletions.
31 changes: 29 additions & 2 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,53 @@ import iam = require('@aws-cdk/aws-iam');
import lambda = require('@aws-cdk/aws-lambda');
import sfn = require('@aws-cdk/aws-stepfunctions');

/**
* Properties for InvokeFunction
*/
export interface InvokeFunctionProps {
/**
* The JSON that you want to provide to your Lambda function as input.
*/
readonly payload?: { [key: string]: string };

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
* A StepFunctions Task to invoke a Lambda function.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
*/
export class InvokeFunction implements sfn.IStepFunctionsTask {
constructor(private readonly lambdaFunction: lambda.IFunction) {

private readonly waitForTaskToken: boolean;

constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps) {
this.waitForTaskToken = props.waitForTaskToken === true;
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskProperties {
return {
resourceArn: this.lambdaFunction.functionArn,
resourceArn: this.waitForTaskToken
? 'arn:aws:states:::lambda:invoke.waitForTaskToken'
: this.lambdaFunction.functionArn,
policyStatements: [new iam.PolicyStatement()
.addResource(this.lambdaFunction.functionArn)
.addActions("lambda:InvokeFunction")
],
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
parameters: {
FunctionName: this.lambdaFunction.functionName,
...this.props.payload && { Payload: this.props.payload },
}
};
}
}
18 changes: 15 additions & 3 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ export interface PublishToTopicProps {
* Message subject
*/
readonly subject?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -47,24 +54,29 @@ export interface PublishToTopicProps {
* integration with other AWS services via a specific class instance.
*/
export class PublishToTopic implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) {
this.waitForTaskToken = props.waitForTaskToken === true;

if ((props.message === undefined) === (props.messageObject === undefined)) {
throw new Error(`Supply exactly one of 'message' or 'messageObject'`);
}
}

public bind(task: sfn.Task): sfn.StepFunctionsTaskProperties {
return {
resourceArn: 'arn:aws:states:::sns:publish',
resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement()
.addAction('sns:Publish')
.addResource(this.topic.topicArn)
],
parameters: {
TopicArn: this.topic.topicArn,
...(this.props.messageObject
? { Message: new cdk.Token(() => task.node.stringifyJson(this.props.messageObject)) }
: renderString('Message', this.props.message)),
? { Message: new cdk.Token(() => task.node.stringifyJson(this.props.messageObject)) }
: renderString('Message', this.props.message)),
MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined,
...renderString('Subject', this.props.subject),
}
Expand Down
13 changes: 12 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ export interface SendToQueueProps {
* @default No group ID
*/
readonly messageGroupId?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -47,12 +54,16 @@ export interface SendToQueueProps {
* integration with other AWS services via a specific class instance.
*/
export class SendToQueue implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) {
this.waitForTaskToken = props.waitForTaskToken === true;
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskProperties {
return {
resourceArn: 'arn:aws:states:::sqs:sendMessage',
resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement()
.addAction('sqs:SendMessage')
.addResource(this.queue.queueArn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,33 @@ test('Lambda function can be used in a Task', () => {
handler: 'index.hello',
runtime: lambda.Runtime.Python27,
});
const task = new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn) });
const task = new sfn.Task(stack, 'Task', {
task: new tasks.InvokeFunction(fn, { waitForTaskToken: false })
});
new sfn.StateMachine(stack, 'SM', {
definition: task
});

// THEN
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"",
{ "Fn::GetAtt": ["Fn9270CBC0", "Arn"] },
"\"}}}"
]]
"Fn::Join": [
"",
[
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
{
Ref: "Fn9270CBC0"
},
"\"},\"Type\":\"Task\",\"Resource\":\"",
{
"Fn::GetAtt": [
"Fn9270CBC0",
"Arn"
]
},
"\"}}}"
]
]
},
});
});

0 comments on commit e50913e

Please sign in to comment.