Skip to content

Commit

Permalink
feat(stepfunctions): new service integration classes for Lambda, SNS,…
Browse files Browse the repository at this point in the history
… and SQS (aws#7946)

merge functionality currently modeled under task and state into a single object that represents a task state as an abstract class. Service integrations extend the base class and add service specific configuration, metrics, and policies.

this commit introduces the "new" service integrations for Lambda, SNS, and SQS

Motivation: 
The current service integrations that are offered in `aws-stepfunctions-tasks` all currently implement a `bind()` method and contribute a portion of the configuration that creates a `Task` state.

However, it's often useful to configure state level properties such as paths, retries, errors based on the service integration and the pattern that has been requested.

Implementation:
* Duplicate the current `Task` class and merge the properties of a task state and a task service integration into a new abstract base class.
* Concrete implementation per service integration class where all of the best practices and user intents can be encoded
* After all the service integrations have been migrated, we will want to retire the `Task` class as well since we don't want it to be instantiated.

Paves the way for:
aws#6715 by simplifying the invocation of service integration calls. We would be able to start adding properties such as retries and errors.

aws#6489 where we will be making service integrations consistent

### End Commit Message

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
shivlaks authored and karupanerura committed May 21, 2020
1 parent 8f95e8d commit baafb83
Show file tree
Hide file tree
Showing 25 changed files with 2,491 additions and 67 deletions.
110 changes: 60 additions & 50 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,24 @@ The following snippet invokes a Lambda Function with the state input as the payl
by referencing the `$` path.

```ts
new sfn.Task(this, 'Invoke with state input');
import * as lambda from '@aws-cdk/aws-lambda';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';

const myLambda = new lambda.Function(this, 'my sample lambda', {
code: Code.fromInline(`exports.handler = async () => {
return {
statusCode: '200',
body: 'hello, world!'
};
};`),
runtime: Runtime.NODEJS_12_X,
handler: 'index.handler',
});

new tasks.LambdaInvoke(this, 'Invoke with state input', {
lambdaFunction: myLambda,
});
```

When a function is invoked, the Lambda service sends [these response
Expand All @@ -545,27 +562,25 @@ The following snippet invokes a Lambda Function by referencing the `$.Payload` p
to reference the output of a Lambda executed before it.

```ts
new sfn.Task(this, 'Invoke with empty object as payload', {
task: new tasks.RunLambdaTask(myLambda, {
payload: sfn.TaskInput.fromObject({})
}),
new tasks.LambdaInvoke(this, 'Invoke with empty object as payload', {
lambdaFunction: myLambda,
payload: sfn.TaskInput.fromObject({}),
});

new sfn.Task(this, 'Invoke with payload field in the state input', {
task: new tasks.RunLambdaTask(myOtherLambda, {
payload: sfn.TaskInput.fromDataAt('$.Payload'),
}),
// use the output of myLambda as input
new tasks.LambdaInvoke(this, 'Invoke with payload field in the state input', {
lambdaFunction: myOtherLambda,
payload: sfn.TaskInput.fromDataAt('$.Payload'),
});
```

The following snippet invokes a Lambda and sets the task output to only include
the Lambda function response.

```ts
new sfn.Task(this, 'Invoke and set function response as task output', {
task: new tasks.RunLambdaTask(myLambda, {
payload: sfn.TaskInput.fromDataAt('$'),
}),
new tasks.LambdaInvoke(this, 'Invoke and set function response as task output', {
lambdaFunction: myLambda,
payload: sfn.TaskInput.fromDataAt('$'),
outputPath: '$.Payload',
});
```
Expand All @@ -581,15 +596,14 @@ The following snippet invokes a Lambda with the task token as part of the input
to the Lambda.

```ts
const task = new sfn.Task(stack, 'Invoke with callback', {
task: new tasks.RunLambdaTask(myLambda, {
integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: {
token: sfn.Context.taskToken,
input: sfn.TaskInput.fromDataAt('$.someField'),
}
})
});
new tasks.LambdaInvoke(stack, 'Invoke with callback', {
lambdaFunction: myLambda,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: sfn.TaskInput.fromObject({
token: sfn.Context.taskToken,
input: sfn.Data.stringAt('$.someField'),
}),
});
```

⚠️ The task will pause until it receives that task token back with a `SendTaskSuccess` or `SendTaskFailure`
Expand Down Expand Up @@ -677,28 +691,28 @@ You can call the [`Publish`](https://docs.aws.amazon.com/sns/latest/api/API_Publ

```ts
import * as sns from '@aws-cdk/aws-sns';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';

// ...

const topic = new sns.Topic(this, 'Topic');

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Publish1', {
task: new tasks.PublishToTopic(topic, {
integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
message: TaskInput.fromDataAt('$.state.message'),
})
const task1 = new tasks.SnsPublish(this, 'Publish1', {
topic,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
message: sfn.TaskInput.fromDataAt('$.state.message'),
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Publish2', {
task: new tasks.PublishToTopic(topic, {
message: TaskInput.fromObject({
field1: 'somedata',
field2: Data.stringAt('$.field2'),
})
})
const task2 = new tasks.SnsPublish(this, 'Publish2', {
topic,
message: sfn.TaskInput.fromObject({
field1: 'somedata',
field2: sfn.Data.stringAt('$.field2'),
})
});
```

Expand Down Expand Up @@ -740,31 +754,27 @@ You can call the [`SendMessage`](https://docs.aws.amazon.com/AWSSimpleQueueServi
to send a message to an SQS queue.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
import * as sqs from '@aws-cdk/aws-sqs';

// ...

const queue = new sns.Queue(this, 'Queue');
const queue = new sqs.Queue(this, 'Queue');

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Send1', {
task: new tasks.SendToQueue(queue, {
messageBody: TaskInput.fromDataAt('$.message'),
// Only for FIFO queues
messageGroupId: '1234'
})
const task1 = new tasks.SqsSendMessage(this, 'Send1', {
queue,
messageBody: sfn.TaskInput.fromDataAt('$.message'),
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Send2', {
task: new tasks.SendToQueue(queue, {
messageBody: TaskInput.fromObject({
field1: 'somedata',
field2: Data.stringAt('$.field2'),
}),
// Only for FIFO queues
messageGroupId: '1234'
})
const task2 = new tasks.SqsSendMessage(this, 'Send2', {
queue,
messageBody: sfn.TaskInput.fromObject({
field1: 'somedata',
field2: sfn.Data.stringAt('$.field2'),
}),
});
```
3 changes: 3 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
export * from './lambda/invoke-function';
export * from './lambda/run-lambda-task';
export * from './lambda/invoke';
export * from './invoke-activity';
export * from './ecs/run-ecs-task-base'; // Remove this once we can
export * from './ecs/run-ecs-task-base-types';
export * from './sns/publish-to-topic';
export * from './sns/publish';
export * from './sqs/send-to-queue';
export * from './sqs/send-message';
export * from './ecs/run-ecs-ec2-task';
export * from './ecs/run-ecs-fargate-task';
export * from './sagemaker/sagemaker-task-base-types';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import * as sfn from '@aws-cdk/aws-stepfunctions';
/**
* Properties for InvokeFunction
*
* @deprecated use `RunLambdaTask`
* @deprecated use `LambdaInvoke`
*/
export interface InvokeFunctionProps {
/**
Expand All @@ -25,7 +25,7 @@ export interface InvokeFunctionProps {
*
* OUTPUT: the output of this task is the return value of the Lambda Function.
*
* @deprecated Use `RunLambdaTask`
* @deprecated Use `LambdaInvoke`
*/
export class InvokeFunction implements sfn.IStepFunctionsTask {
constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps = {}) {
Expand Down
137 changes: 137 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/invoke.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for invoking a Lambda function with LambdaInvoke
*/
export interface LambdaInvokeProps extends sfn.TaskStateBaseProps {

/**
* Lambda function to invoke
*/
readonly lambdaFunction: lambda.IFunction;

/**
* The JSON that will be supplied as input to the Lambda function
*
* @default - The state input (JSON path '$')
*/
readonly payload?: sfn.TaskInput;

/**
* Invocation type of the Lambda function
*
* @default InvocationType.REQUEST_RESPONSE
*/
readonly invocationType?: LambdaInvocationType;

/**
* Up to 3583 bytes of base64-encoded data about the invoking client
* to pass to the function.
*
* @default - No context
*/
readonly clientContext?: string;

/**
* Version or alias to invoke a published version of the function
*
* You only need to supply this if you want the version of the Lambda Function to depend
* on data in the state machine state. If not, you can pass the appropriate Alias or Version object
* directly as the `lambdaFunction` argument.
*
* @default - Version or alias inherent to the `lambdaFunction` object.
*/
readonly qualifier?: string;
}

/**
* Invoke a Lambda function as a Task
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html
*/
export class LambdaInvoke extends sfn.TaskStateBase {

private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: cdk.Construct, id: string, private readonly props: LambdaInvokeProps) {
super(scope, id, props);
this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;

validatePatternSupported(this.integrationPattern, LambdaInvoke.SUPPORTED_INTEGRATION_PATTERNS);

if (this.integrationPattern === sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN
&& !sfn.FieldUtils.containsTaskToken(props.payload)) {
throw new Error('Task Token is required in `payload` for callback. Use Context.taskToken to set the token.');
}

this.taskMetrics = {
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: {
LambdaFunctionArn: this.props.lambdaFunction.functionArn,
...(this.props.qualifier && { Qualifier: this.props.qualifier }),
},
};

this.taskPolicies = [
new iam.PolicyStatement({
resources: [this.props.lambdaFunction.functionArn],
actions: ['lambda:InvokeFunction'],
}),
];
}

/**
* Provides the Lambda Invoke service integration task configuration
*/
protected renderTask(): any {
return {
Resource: integrationResourceArn('lambda', 'invoke', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
FunctionName: this.props.lambdaFunction.functionArn,
Payload: this.props.payload ? this.props.payload.value : sfn.TaskInput.fromDataAt('$').value,
InvocationType: this.props.invocationType,
ClientContext: this.props.clientContext,
Qualifier: this.props.qualifier,
}),
};
}
}

/**
* Invocation type of a Lambda
*/
export enum LambdaInvocationType {
/**
* Invoke the function synchronously.
*
* Keep the connection open until the function returns a response or times out.
* The API response includes the function response and additional data.
*/
REQUEST_RESPONSE = 'RequestResponse',

/**
* Invoke the function asynchronously.
*
* Send events that fail multiple times to the function's dead-letter queue (if it's configured).
* The API response only includes a status code.
*/
EVENT = 'Event',

/**
* Validate parameter values and verify that the user or role has permission to invoke the function.
*/
DRY_RUN = 'DryRun'
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { getResourceArn } from '../resource-arn-suffix';

/**
* Properties for RunLambdaTask
*
* @deprecated Use `LambdaInvoke`
*/
export interface RunLambdaTaskProps {
/**
Expand Down Expand Up @@ -58,6 +60,7 @@ export interface RunLambdaTaskProps {
* `SendTaskSuccess/SendTaskFailure` in `waitForTaskToken` mode.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html
* @deprecated Use `LambdaInvoke`
*/
export class RunLambdaTask implements sfn.IStepFunctionsTask {
private readonly integrationPattern: sfn.ServiceIntegrationPattern;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import {
IntegrationPattern,
} from '@aws-cdk/aws-stepfunctions';
import { Aws } from '@aws-cdk/core';

/**
* Verifies that a validation pattern is supported for a service integration
*
*/
export function validatePatternSupported(integrationPattern: IntegrationPattern, supportedPatterns: IntegrationPattern[]) {
if (!supportedPatterns.includes(integrationPattern)) {
throw new Error(`Unsupported service integration pattern. Supported Patterns: ${supportedPatterns}. Received: ${integrationPattern}`);
}
}

/**
* Suffixes corresponding to different service integration patterns
*
* Key is the service integration pattern, value is the resource ARN suffix.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html
*/
const resourceArnSuffix: Record<IntegrationPattern, string> = {
[IntegrationPattern.REQUEST_RESPONSE]: '',
[IntegrationPattern.RUN_JOB]: '.sync',
[IntegrationPattern.WAIT_FOR_TASK_TOKEN]: '.waitForTaskToken',
};

export function integrationResourceArn(service: string, api: string, integrationPattern: IntegrationPattern): string {
if (!service || !api) {
throw new Error("Both 'service' and 'api' must be provided to build the resource ARN.");
}
return `arn:${Aws.PARTITION}:states:::${service}:${api}` +
(integrationPattern ? resourceArnSuffix[integrationPattern] : '');
}
Loading

0 comments on commit baafb83

Please sign in to comment.