Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions): new service integration classes for Lambda, SNS, and SQS #7946

Merged
merged 25 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bf66763
feat(stepfunctions): new service integration classes that merge that …
shivlaks May 13, 2020
f58149c
get rid of TODO
shivlaks May 14, 2020
dfb89d4
remove unnecessary resultPath
shivlaks May 14, 2020
6bf1eae
functionName -> functionArn
shivlaks May 14, 2020
375ae58
update error message for supported service integration patterns
shivlaks May 14, 2020
0cde795
make policies and metrics properties
shivlaks May 15, 2020
193b048
make taskMetrics and taskPolicies abstract properties
shivlaks May 15, 2020
e87f1bf
simplify rendering of task json. move common state json into base class
shivlaks May 16, 2020
dd07450
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/invoke.ts
shivlaks May 16, 2020
4f02d8b
add unit tests
shivlaks May 17, 2020
57ed3ca
add integ tests
shivlaks May 17, 2020
dbdd526
Merge branch 'master' into shivlaks/sfn-merge-task-and-state-lambda
shivlaks May 17, 2020
c30e9b6
update comments and doc strings
shivlaks May 17, 2020
6a001e6
README updates
shivlaks May 17, 2020
77f03ea
add dimension for qualifier if it was specified
shivlaks May 18, 2020
5604abc
update default strings so they're prefixed with '-'
shivlaks May 18, 2020
d352300
simplify unit tests
shivlaks May 20, 2020
28b7165
cleanup
shivlaks May 20, 2020
7183cb2
Merge branch 'master' into shivlaks/sfn-merge-task-and-state-lambda
shivlaks May 20, 2020
e40f1cc
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/invoke.ts
shivlaks May 20, 2020
69b2a45
mark older implementations of SNS, SQS, Lambda as deprecated
shivlaks May 20, 2020
6e695a8
Merge branch 'shivlaks/sfn-merge-task-and-state-lambda' of https://gi…
shivlaks May 20, 2020
76c5918
Merge branch 'master' into shivlaks/sfn-merge-task-and-state-lambda
shivlaks May 20, 2020
9134c43
Merge branch 'master' into shivlaks/sfn-merge-task-and-state-lambda
mergify[bot] May 20, 2020
fab0b69
Merge branch 'master' into shivlaks/sfn-merge-task-and-state-lambda
mergify[bot] May 20, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 60 additions & 50 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,24 @@ The following snippet invokes a Lambda Function with the state input as the payl
by referencing the `$` path.

```ts
new sfn.Task(this, 'Invoke with state input');
import * as lambda from '@aws-cdk/aws-lambda';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';

const myLambda = new lambda.Function(this, 'my sample lambda', {
code: Code.fromInline(`exports.handler = async () => {
return {
statusCode: '200',
body: 'hello, world!'
};
};`),
runtime: Runtime.NODEJS_12_X,
handler: 'index.handler',
});

new tasks.LambdaInvoke(this, 'Invoke with state input', {
lambdaFunction: myLambda,
});
```

When a function is invoked, the Lambda service sends [these response
Expand All @@ -545,27 +562,25 @@ The following snippet invokes a Lambda Function by referencing the `$.Payload` p
to reference the output of a Lambda executed before it.

```ts
new sfn.Task(this, 'Invoke with empty object as payload', {
task: new tasks.RunLambdaTask(myLambda, {
payload: sfn.TaskInput.fromObject({})
}),
new tasks.LambdaInvoke(this, 'Invoke with empty object as payload', {
lambdaFunction: myLambda,
payload: sfn.TaskInput.fromObject({}),
});

new sfn.Task(this, 'Invoke with payload field in the state input', {
task: new tasks.RunLambdaTask(myOtherLambda, {
payload: sfn.TaskInput.fromDataAt('$.Payload'),
}),
// use the output of myLambda as input
new tasks.LambdaInvoke(this, 'Invoke with payload field in the state input', {
lambdaFunction: myOtherLambda,
payload: sfn.TaskInput.fromDataAt('$.Payload'),
});
```

The following snippet invokes a Lambda and sets the task output to only include
the Lambda function response.

```ts
new sfn.Task(this, 'Invoke and set function response as task output', {
task: new tasks.RunLambdaTask(myLambda, {
payload: sfn.TaskInput.fromDataAt('$'),
}),
new tasks.LambdaInvoke(this, 'Invoke and set function response as task output', {
lambdaFunction: myLambda,
payload: sfn.TaskInput.fromDataAt('$'),
outputPath: '$.Payload',
});
```
Expand All @@ -581,15 +596,14 @@ The following snippet invokes a Lambda with the task token as part of the input
to the Lambda.

```ts
const task = new sfn.Task(stack, 'Invoke with callback', {
task: new tasks.RunLambdaTask(myLambda, {
integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: {
token: sfn.Context.taskToken,
input: sfn.TaskInput.fromDataAt('$.someField'),
}
})
});
new tasks.LambdaInvoke(stack, 'Invoke with callback', {
lambdaFunction: myLambda,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: sfn.TaskInput.fromObject({
token: sfn.Context.taskToken,
input: sfn.Data.stringAt('$.someField'),
}),
});
```

⚠️ The task will pause until it receives that task token back with a `SendTaskSuccess` or `SendTaskFailure`
Expand Down Expand Up @@ -677,28 +691,28 @@ You can call the [`Publish`](https://docs.aws.amazon.com/sns/latest/api/API_Publ

```ts
import * as sns from '@aws-cdk/aws-sns';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';

// ...

const topic = new sns.Topic(this, 'Topic');

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Publish1', {
task: new tasks.PublishToTopic(topic, {
integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
message: TaskInput.fromDataAt('$.state.message'),
})
const task1 = new tasks.SnsPublish(this, 'Publish1', {
topic,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
message: sfn.TaskInput.fromDataAt('$.state.message'),
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Publish2', {
task: new tasks.PublishToTopic(topic, {
message: TaskInput.fromObject({
field1: 'somedata',
field2: Data.stringAt('$.field2'),
})
})
const task2 = new tasks.SnsPublish(this, 'Publish2', {
topic,
message: sfn.TaskInput.fromObject({
field1: 'somedata',
field2: sfn.Data.stringAt('$.field2'),
})
});
```

Expand Down Expand Up @@ -740,31 +754,27 @@ You can call the [`SendMessage`](https://docs.aws.amazon.com/AWSSimpleQueueServi
to send a message to an SQS queue.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
import * as sqs from '@aws-cdk/aws-sqs';

// ...

const queue = new sns.Queue(this, 'Queue');
const queue = new sqs.Queue(this, 'Queue');

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Send1', {
task: new tasks.SendToQueue(queue, {
messageBody: TaskInput.fromDataAt('$.message'),
// Only for FIFO queues
messageGroupId: '1234'
})
const task1 = new tasks.SqsSendMessage(this, 'Send1', {
queue,
messageBody: sfn.TaskInput.fromDataAt('$.message'),
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Send2', {
task: new tasks.SendToQueue(queue, {
messageBody: TaskInput.fromObject({
field1: 'somedata',
field2: Data.stringAt('$.field2'),
}),
// Only for FIFO queues
messageGroupId: '1234'
})
const task2 = new tasks.SqsSendMessage(this, 'Send2', {
queue,
messageBody: sfn.TaskInput.fromObject({
field1: 'somedata',
field2: sfn.Data.stringAt('$.field2'),
}),
});
```
3 changes: 3 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
export * from './lambda/invoke-function';
export * from './lambda/run-lambda-task';
export * from './lambda/invoke';
export * from './invoke-activity';
export * from './ecs/run-ecs-task-base'; // Remove this once we can
export * from './ecs/run-ecs-task-base-types';
export * from './sns/publish-to-topic';
export * from './sns/publish';
export * from './sqs/send-to-queue';
export * from './sqs/send-message';
export * from './ecs/run-ecs-ec2-task';
export * from './ecs/run-ecs-fargate-task';
export * from './sagemaker/sagemaker-task-base-types';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import * as sfn from '@aws-cdk/aws-stepfunctions';
/**
* Properties for InvokeFunction
*
* @deprecated use `RunLambdaTask`
* @deprecated use `LambdaInvoke`
*/
export interface InvokeFunctionProps {
/**
Expand All @@ -25,7 +25,7 @@ export interface InvokeFunctionProps {
*
* OUTPUT: the output of this task is the return value of the Lambda Function.
*
* @deprecated Use `RunLambdaTask`
* @deprecated Use `LambdaInvoke`
*/
export class InvokeFunction implements sfn.IStepFunctionsTask {
constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps = {}) {
Expand Down
137 changes: 137 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/invoke.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for invoking a Lambda function with LambdaInvoke
*/
export interface LambdaInvokeProps extends sfn.TaskStateBaseProps {

/**
* Lambda function to invoke
*/
readonly lambdaFunction: lambda.IFunction;

/**
* The JSON that will be supplied as input to the Lambda function
*
* @default - The state input (JSON path '$')
*/
readonly payload?: sfn.TaskInput;

/**
* Invocation type of the Lambda function
*
* @default InvocationType.REQUEST_RESPONSE
*/
readonly invocationType?: LambdaInvocationType;

/**
* Up to 3583 bytes of base64-encoded data about the invoking client
* to pass to the function.
*
* @default - No context
*/
readonly clientContext?: string;

/**
* Version or alias to invoke a published version of the function
*
* You only need to supply this if you want the version of the Lambda Function to depend
* on data in the state machine state. If not, you can pass the appropriate Alias or Version object
* directly as the `lambdaFunction` argument.
*
* @default - Version or alias inherent to the `lambdaFunction` object.
*/
readonly qualifier?: string;
shivlaks marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Invoke a Lambda function as a Task
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html
*/
export class LambdaInvoke extends sfn.TaskStateBase {
shivlaks marked this conversation as resolved.
Show resolved Hide resolved

private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: cdk.Construct, id: string, private readonly props: LambdaInvokeProps) {
super(scope, id, props);
this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;

validatePatternSupported(this.integrationPattern, LambdaInvoke.SUPPORTED_INTEGRATION_PATTERNS);

if (this.integrationPattern === sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN
&& !sfn.FieldUtils.containsTaskToken(props.payload)) {
throw new Error('Task Token is required in `payload` for callback. Use Context.taskToken to set the token.');
}

this.taskMetrics = {
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: {
LambdaFunctionArn: this.props.lambdaFunction.functionArn,
...(this.props.qualifier && { Qualifier: this.props.qualifier }),
},
};

this.taskPolicies = [
new iam.PolicyStatement({
resources: [this.props.lambdaFunction.functionArn],
actions: ['lambda:InvokeFunction'],
}),
];
}

/**
* Provides the Lambda Invoke service integration task configuration
*/
protected renderTask(): any {
return {
Resource: integrationResourceArn('lambda', 'invoke', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
FunctionName: this.props.lambdaFunction.functionArn,
Payload: this.props.payload ? this.props.payload.value : sfn.TaskInput.fromDataAt('$').value,
InvocationType: this.props.invocationType,
ClientContext: this.props.clientContext,
Qualifier: this.props.qualifier,
}),
};
}
}

/**
* Invocation type of a Lambda
*/
export enum LambdaInvocationType {
/**
* Invoke the function synchronously.
*
* Keep the connection open until the function returns a response or times out.
* The API response includes the function response and additional data.
*/
REQUEST_RESPONSE = 'RequestResponse',

/**
* Invoke the function asynchronously.
*
* Send events that fail multiple times to the function's dead-letter queue (if it's configured).
* The API response only includes a status code.
*/
EVENT = 'Event',

/**
* Validate parameter values and verify that the user or role has permission to invoke the function.
*/
DRY_RUN = 'DryRun'
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { getResourceArn } from '../resource-arn-suffix';

/**
* Properties for RunLambdaTask
*
* @deprecated Use `LambdaInvoke`
*/
export interface RunLambdaTaskProps {
/**
Expand Down Expand Up @@ -58,6 +60,7 @@ export interface RunLambdaTaskProps {
* `SendTaskSuccess/SendTaskFailure` in `waitForTaskToken` mode.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html
* @deprecated Use `LambdaInvoke`
*/
export class RunLambdaTask implements sfn.IStepFunctionsTask {
private readonly integrationPattern: sfn.ServiceIntegrationPattern;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import {
IntegrationPattern,
} from '@aws-cdk/aws-stepfunctions';
import { Aws } from '@aws-cdk/core';

/**
* Verifies that a validation pattern is supported for a service integration
*
*/
export function validatePatternSupported(integrationPattern: IntegrationPattern, supportedPatterns: IntegrationPattern[]) {
if (!supportedPatterns.includes(integrationPattern)) {
throw new Error(`Unsupported service integration pattern. Supported Patterns: ${supportedPatterns}. Received: ${integrationPattern}`);
}
}

/**
* Suffixes corresponding to different service integration patterns
*
* Key is the service integration pattern, value is the resource ARN suffix.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html
*/
const resourceArnSuffix: Record<IntegrationPattern, string> = {
[IntegrationPattern.REQUEST_RESPONSE]: '',
[IntegrationPattern.RUN_JOB]: '.sync',
[IntegrationPattern.WAIT_FOR_TASK_TOKEN]: '.waitForTaskToken',
};

export function integrationResourceArn(service: string, api: string, integrationPattern: IntegrationPattern): string {
if (!service || !api) {
throw new Error("Both 'service' and 'api' must be provided to build the resource ARN.");
}
return `arn:${Aws.PARTITION}:states:::${service}:${api}` +
(integrationPattern ? resourceArnSuffix[integrationPattern] : '');
}
Loading