Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions-tasks): start a nested state machine execution as a construct #8178

Merged
merged 12 commits into from
Jun 3, 2020
Merged
17 changes: 8 additions & 9 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -728,15 +728,14 @@ const child = new sfn.StateMachine(stack, 'ChildStateMachine', {
});

// Include the state machine in a Task state with callback pattern
const task = new sfn.Task(stack, 'ChildTask', {
task: new tasks.ExecuteStateMachine(child, {
integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
input: {
token: sfn.Context.taskToken,
foo: 'bar'
},
name: 'MyExecutionName'
})
const task = new StepFunctionsStartExecution(stack, 'ChildTask', {
stateMachine: child,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
input: sfn.TaskInput.fromObject({
token: sfn.Context.taskToken,
foo: 'bar'
}),
name: 'MyExecutionName'
});

// Define a second state machine with the Task state above
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export * from './sagemaker/sagemaker-task-base-types';
export * from './sagemaker/sagemaker-train-task';
export * from './sagemaker/sagemaker-transform-task';
export * from './start-execution';
export * from './stepfunctions/start-execution';
export * from './evaluate-expression';
export * from './emr/emr-create-cluster';
export * from './emr/emr-set-cluster-termination-protection';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { getResourceArn } from './resource-arn-suffix';

/**
* Properties for StartExecution
*
* @deprecated - use 'StepFunctionsStartExecution'
*/
export interface StartExecutionProps {
/**
Expand Down Expand Up @@ -39,6 +41,8 @@ export interface StartExecutionProps {
* A Step Functions Task to call StartExecution on another state machine.
*
* It supports three service integration patterns: FIRE_AND_FORGET, SYNC and WAIT_FOR_TASK_TOKEN.
*
* @deprecated - use 'StepFunctionsStartExecution'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as with batch.

Can we call new StepFunctionsStartExecution() in the constructor here and delete most of the code in this file?

Copy link
Contributor Author

@shivlaks shivlaks Jun 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's pretty natural and organic if we were replacing one API with another to create an adapter and call into the replacement. I don't think this is a scenario where we can just proxy into the replacement as the replacement contains and configures properties (i.e. inputPath, outputPath, ...) that the older

In this case, we are replacing a class that implemented a bind method to contribute a portion of the properties to represent a Task with a class that's a construct and configures all of the properties for a Task state.

The user experience with the current format will be to embed the call to start execution into the invocation of a new Task. I don't think proxying to invoke a new StepFunctionStartExeuction yields the right replacement.

new Task(this, 'my-task', {
  task: new tasks.ExecuteStateMachine(child, {...}),
...
}

Some of the other ones to come like DynamoDB and ECS, I've tried to reuse the shared types. I could perhaps move more into a central location here and DRY it up a bit.

Did you have something else in mind? If so, are you opposed to me making a subsequent PR to do some refactoring?

*/
export class StartExecution implements sfn.IStepFunctionsTask {
private readonly integrationPattern: sfn.ServiceIntegrationPattern;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Construct, Stack } from '@aws-cdk/core';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for StartExecution
*/
export interface StepFunctionsStartExecutionProps extends sfn.TaskStateBaseProps {
/**
* The Step Functions state machine to start the execution on.
*/
readonly stateMachine: sfn.IStateMachine;

/**
* The JSON input for the execution, same as that of StartExecution.
*
* @see https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html
*
* @default - The state input (JSON path '$')
*/
readonly input?: sfn.TaskInput;

/**
* The name of the execution, same as that of StartExecution.
*
* @see https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html
*
* @default - None
*/
readonly name?: string;
}

/**
* A Step Functions Task to call StartExecution on another state machine.
*
* It supports three service integration patterns: FIRE_AND_FORGET, SYNC and WAIT_FOR_TASK_TOKEN.
*/
export class StepFunctionsStartExecution extends sfn.TaskStateBase {
private static readonly SUPPORTED_INTEGRATION_PATTERNS = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.RUN_JOB,
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: Construct, id: string, private readonly props: StepFunctionsStartExecutionProps) {
super(scope, id, props);

this.integrationPattern = props.integrationPattern || sfn.IntegrationPattern.REQUEST_RESPONSE;
validatePatternSupported(this.integrationPattern, StepFunctionsStartExecution.SUPPORTED_INTEGRATION_PATTERNS);

if (this.integrationPattern === sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN && !sfn.FieldUtils.containsTaskToken(props.input)) {
throw new Error('Task Token is required in `input` for callback. Use Context.taskToken to set the token.');
}

this.taskPolicies = this.createScopedAccessPolicy();
}

protected renderTask(): any {
// suffix of ':2' indicates that the output of the nested state machine should be JSON
// suffix is only applicable when waiting for a nested state machine to complete (RUN_JOB)
// https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html
const suffix = this.integrationPattern === sfn.IntegrationPattern.RUN_JOB ? ':2' : '';
return {
Resource: `${integrationResourceArn('states', 'startExecution', this.integrationPattern)}${suffix}`,
Parameters: sfn.FieldUtils.renderObject({
Input: this.props.input ? this.props.input.value : sfn.TaskInput.fromDataAt('$').value,
StateMachineArn: this.props.stateMachine.stateMachineArn,
Name: this.props.name,
}),
};
}

/**
* As StateMachineArn is extracted automatically from the state machine object included in the constructor,
*
* the scoped access policy should be generated accordingly.
*
* This means the action of StartExecution should be restricted on the given state machine, instead of being granted to all the resources (*).
*/
private createScopedAccessPolicy(): iam.PolicyStatement[] {
const stack = Stack.of(this);

const policyStatements = [
new iam.PolicyStatement({
actions: ['states:StartExecution'],
resources: [this.props.stateMachine.stateMachineArn],
}),
];

// Step Functions use Cloud Watch managed rules to deal with synchronous tasks.
if (this.integrationPattern === sfn.IntegrationPattern.RUN_JOB) {
policyStatements.push(
new iam.PolicyStatement({
actions: ['states:DescribeExecution', 'states:StopExecution'],
// https://docs.aws.amazon.com/step-functions/latest/dg/concept-create-iam-advanced.html#concept-create-iam-advanced-execution
resources: [
stack.formatArn({
service: 'states',
resource: 'execution',
sep: ':',
resourceName: `${stack.parseArn(this.props.stateMachine.stateMachineArn, ':').resourceName}*`,
}),
],
}),
);

policyStatements.push(
new iam.PolicyStatement({
actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
resources: [
stack.formatArn({
service: 'events',
resource: 'rule',
resourceName: 'StepFunctionsGetEventsForStepFunctionsExecutionRule',
}),
],
}),
);
}

return policyStatements;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
{
"Resources": {
"ChildRole1E3E0EF5": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::Join": [
"",
[
"states.",
{
"Ref": "AWS::Region"
},
".amazonaws.com"
]
]
}
}
}
],
"Version": "2012-10-17"
}
}
},
"ChildDAB30558": {
"Type": "AWS::StepFunctions::StateMachine",
"Properties": {
"DefinitionString": "{\"StartAt\":\"Pass\",\"States\":{\"Pass\":{\"Type\":\"Pass\",\"End\":true}}}",
"RoleArn": {
"Fn::GetAtt": ["ChildRole1E3E0EF5", "Arn"]
}
},
"DependsOn": ["ChildRole1E3E0EF5"]
},
"ParentRole5F0C366C": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::Join": [
"",
[
"states.",
{
"Ref": "AWS::Region"
},
".amazonaws.com"
]
]
}
}
}
],
"Version": "2012-10-17"
}
}
},
"ParentRoleDefaultPolicy9BDC56DC": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "states:StartExecution",
"Effect": "Allow",
"Resource": {
"Ref": "ChildDAB30558"
}
},
{
"Action": ["states:DescribeExecution", "states:StopExecution"],
"Effect": "Allow",
"Resource": {
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":states:",
{
"Ref": "AWS::Region"
},
":",
{
"Ref": "AWS::AccountId"
},
":execution:",
{
"Fn::Select": [
6,
{
"Fn::Split": [
":",
{
"Ref": "ChildDAB30558"
}
]
}
]
},
"*"
]
]
}
},
{
"Action": ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
"Effect": "Allow",
"Resource": {
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":events:",
{
"Ref": "AWS::Region"
},
":",
{
"Ref": "AWS::AccountId"
},
":rule/StepFunctionsGetEventsForStepFunctionsExecutionRule"
]
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "ParentRoleDefaultPolicy9BDC56DC",
"Roles": [
{
"Ref": "ParentRole5F0C366C"
}
]
}
},
"Parent8B210403": {
"Type": "AWS::StepFunctions::StateMachine",
"Properties": {
"DefinitionString": {
"Fn::Join": [
"",
[
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
":states:::states:startExecution.sync:2\",\"Parameters\":{\"Input\":{\"hello.$\":\"$.hello\"},\"StateMachineArn\":\"",
{
"Ref": "ChildDAB30558"
},
"\"}}}}"
]
]
},
"RoleArn": {
"Fn::GetAtt": ["ParentRole5F0C366C", "Arn"]
}
},
"DependsOn": ["ParentRoleDefaultPolicy9BDC56DC", "ParentRole5F0C366C"]
}
},
"Outputs": {
"StateMachineARN": {
"Value": {
"Ref": "Parent8B210403"
}
}
}
}
Loading