Skip to content

Commit

Permalink
feat(stepfunctions-tasks): task for invoking a Step Functions activit…
Browse files Browse the repository at this point in the history
…y worker (#8840)

Replacement class for `InvokeActivity` which currently uses the embedded task.

The notable changes are:
* This change merges task and service integration level properties by extending
`TaskStateBase`, similar to all the other task states.
* `activity` is now a property in the new class and not specified in the constructor

I've left the current tests intact for fidelity and updated the `README`

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
shivlaks authored Jul 6, 2020
1 parent 4ba20fd commit 021533c
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 0 deletions.
31 changes: 31 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aw
- [Create Transform Job](#create-transform-job)
- [SNS](#sns)
- [Step Functions](#step-functions)
- [Start Execution](#start-execution)
- [Invoke Activity Worker](#invoke-activity)
- [SQS](#sqs)

## Task
Expand Down Expand Up @@ -750,6 +752,8 @@ const task2 = new tasks.SnsPublish(this, 'Publish2', {

## Step Functions

### Start Execution

You can manage [AWS Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html) executions.

AWS Step Functions supports it's own [`StartExecution`](https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html) API as a service integration.
Expand Down Expand Up @@ -777,6 +781,33 @@ new sfn.StateMachine(stack, 'ParentStateMachine', {
});
```

### Invoke Activity

You can invoke a [Step Functions Activity](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html) which enables you to have
a task in your state machine where the work is performed by a *worker* that can
be hosted on Amazon EC2, Amazon ECS, AWS Lambda, basically anywhere. Activities
are a way to associate code running somewhere (known as an activity worker) with
a specific task in a state machine.

When Step Functions reaches an activity task state, the workflow waits for an
activity worker to poll for a task. An activity worker polls Step Functions by
using GetActivityTask, and sending the ARN for the related activity.

After the activity worker completes its work, it can provide a report of its
success or failure by using `SendTaskSuccess` or `SendTaskFailure`. These two
calls use the taskToken provided by GetActivityTask to associate the result
with that task.

The following example creates an activity and creates a task that invokes the activity.

```ts
const submitJobActivity = new sfn.Activity(this, 'SubmitJob');

new tasks.StepFunctionsInvokeActivity(this, 'Submit Job', {
activity: submitJobActivity,
});
```

## SQS

Step Functions supports [Amazon SQS](https://docs.aws.amazon.com/step-functions/latest/dg/connect-sqs.html)
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export * from './sagemaker/create-training-job';
export * from './sagemaker/create-transform-job';
export * from './start-execution';
export * from './stepfunctions/start-execution';
export * from './stepfunctions/invoke-activity';
export * from './evaluate-expression';
export * from './emr/emr-create-cluster';
export * from './emr/emr-set-cluster-termination-protection';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ export interface InvokeActivityProps {
* A Step Functions Task to invoke an Activity worker.
*
* An Activity can be used directly as a Resource.
*
* @deprecated - use `StepFunctionsInvokeActivity`
*/
export class InvokeActivity implements sfn.IStepFunctionsTask {
constructor(private readonly activity: sfn.IActivity, private readonly props: InvokeActivityProps = {}) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';

/**
* Properties for invoking an Activity worker
*/
export interface StepFunctionsInvokeActivityProps extends sfn.TaskStateBaseProps {

/**
* Step Functions Activity to invoke
*/
readonly activity: sfn.IActivity
}

/**
* A Step Functions Task to invoke an Activity worker.
*
* An Activity can be used directly as a Resource.
*/
export class StepFunctionsInvokeActivity extends sfn.TaskStateBase {
protected readonly taskMetrics?: sfn.TaskMetricsConfig;
// No IAM permissions necessary, execution role implicitly has Activity permissions.
protected readonly taskPolicies?: iam.PolicyStatement[];

constructor(scope: cdk.Construct, id: string, private readonly props: StepFunctionsInvokeActivityProps) {
super(scope, id, props);

this.taskMetrics = {
metricDimensions: { ActivityArn: this.props.activity.activityArn },
metricPrefixSingular: 'Activity',
metricPrefixPlural: 'Activities',
};
}

/**
* @internal
*/
protected _renderTask(): any {
return {
Resource: this.props.activity.activityArn,
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{
"Resources": {
"SubmitJobFB773A16": {
"Type": "AWS::StepFunctions::Activity",
"Properties": {
"Name": "awsstepfunctionsintegSubmitJobA2508960"
}
},
"CheckJob5FFC1D6F": {
"Type": "AWS::StepFunctions::Activity",
"Properties": {
"Name": "awsstepfunctionsintegCheckJobC4AC762D"
}
},
"StateMachineRoleB840431D": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::Join": [
"",
[
"states.",
{
"Ref": "AWS::Region"
},
".amazonaws.com"
]
]
}
}
}
],
"Version": "2012-10-17"
}
}
},
"StateMachine2E01A3A5": {
"Type": "AWS::StepFunctions::StateMachine",
"Properties": {
"RoleArn": {
"Fn::GetAtt": [
"StateMachineRoleB840431D",
"Arn"
]
},
"DefinitionString": {
"Fn::Join": [
"",
[
"{\"StartAt\":\"Submit Job\",\"States\":{\"Submit Job\":{\"Next\":\"Wait X Seconds\",\"Type\":\"Task\",\"ResultPath\":\"$.guid\",\"Resource\":\"",
{
"Ref": "SubmitJobFB773A16"
},
"\"},\"Wait X Seconds\":{\"Type\":\"Wait\",\"SecondsPath\":\"$.wait_time\",\"Next\":\"Get Job Status\"},\"Get Job Status\":{\"Next\":\"Job Complete?\",\"Type\":\"Task\",\"InputPath\":\"$.guid\",\"ResultPath\":\"$.status\",\"Resource\":\"",
{
"Ref": "CheckJob5FFC1D6F"
},
"\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Get Final Job Status\"}],\"Default\":\"Wait X Seconds\"},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Get Final Job Status\":{\"End\":true,\"Type\":\"Task\",\"InputPath\":\"$.guid\",\"Resource\":\"",
{
"Ref": "CheckJob5FFC1D6F"
},
"\"}},\"TimeoutSeconds\":300}"
]
]
}
},
"DependsOn": [
"StateMachineRoleB840431D"
]
}
},
"Outputs": {
"stateMachineArn": {
"Value": {
"Ref": "StateMachine2E01A3A5"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import * as tasks from '../../lib';

/*
* Creates a state machine with a job poller sample project
* https://docs.aws.amazon.com/step-functions/latest/dg/sample-project-job-poller.html
*
* Stack verification steps:
* The generated State Machine can be executed from the CLI (or Step Functions console)
* and runs with an execution status of `Running`.
*
* An external process can call the state machine to send a heartbeat or response before it times out.
*
* -- aws stepfunctions start-execution --state-machine-arn <state-machine-arn-from-output> provides execution arn
* -- aws stepfunctions describe-execution --execution-arn <state-machine-arn-from-output> returns a status of `Running`
*
* CHANGEME: extend this test to create the external resources to report heartbeats
*/
class InvokeActivityStack extends cdk.Stack {
constructor(scope: cdk.App, id: string, props: cdk.StackProps = {}) {
super(scope, id, props);

const submitJobActivity = new sfn.Activity(this, 'SubmitJob');
const checkJobActivity = new sfn.Activity(this, 'CheckJob');

const submitJob = new tasks.StepFunctionsInvokeActivity(this, 'Submit Job', {
activity: submitJobActivity,
resultPath: '$.guid',
});
const waitX = new sfn.Wait(this, 'Wait X Seconds', { time: sfn.WaitTime.secondsPath('$.wait_time') });
const getStatus = new tasks.StepFunctionsInvokeActivity(this, 'Get Job Status', {
activity: checkJobActivity,
inputPath: '$.guid',
resultPath: '$.status',
});
const isComplete = new sfn.Choice(this, 'Job Complete?');
const jobFailed = new sfn.Fail(this, 'Job Failed', {
cause: 'AWS Batch Job Failed',
error: 'DescribeJob returned FAILED',
});
const finalStatus = new tasks.StepFunctionsInvokeActivity(this, 'Get Final Job Status', {
activity: checkJobActivity,
inputPath: '$.guid',
});

const chain = sfn.Chain
.start(submitJob)
.next(waitX)
.next(getStatus)
.next(isComplete
.when(sfn.Condition.stringEquals('$.status', 'FAILED'), jobFailed)
.when(sfn.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus)
.otherwise(waitX));

const sm = new sfn.StateMachine(this, 'StateMachine', {
definition: chain,
timeout: cdk.Duration.seconds(300),
});

new cdk.CfnOutput(this, 'stateMachineArn', {
value: sm.stateMachineArn,
});
}
}

const app = new cdk.App();
new InvokeActivityStack(app, 'aws-stepfunctions-integ');
app.synth();
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import '@aws-cdk/assert/jest';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Stack } from '@aws-cdk/core';
import { StepFunctionsInvokeActivity } from '../../lib/stepfunctions/invoke-activity';

test('Activity can be used in a Task', () => {
// GIVEN
const stack = new Stack();

// WHEN
const activity = new sfn.Activity(stack, 'Activity');
const task = new StepFunctionsInvokeActivity(stack, 'Task', { activity });
new sfn.StateMachine(stack, 'SM', {
definition: task,
});

// THEN
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
'Fn::Join': ['', [
'{"StartAt":"Task","States":{"Task":{"End":true,"Type":"Task","Resource":"',
{ Ref: 'Activity04690B0A' },
'"}}}',
]],
},
});
});

test('Activity Task metrics and Activity metrics are the same', () => {
// GIVEN
const stack = new Stack();
const activity = new sfn.Activity(stack, 'Activity');
const task = new StepFunctionsInvokeActivity(stack, 'Invoke', {activity });

// WHEN
const activityMetrics = [
activity.metricFailed(),
activity.metricHeartbeatTimedOut(),
activity.metricRunTime(),
activity.metricScheduled(),
activity.metricScheduleTime(),
activity.metricStarted(),
activity.metricSucceeded(),
activity.metricTime(),
activity.metricTimedOut(),
];

const taskMetrics = [
task.metricFailed(),
task.metricHeartbeatTimedOut(),
task.metricRunTime(),
task.metricScheduled(),
task.metricScheduleTime(),
task.metricStarted(),
task.metricSucceeded(),
task.metricTime(),
task.metricTimedOut(),
];

// THEN
for (let i = 0; i < activityMetrics.length; i++) {
expect(activityMetrics[i]).toEqual(taskMetrics[i]);
}
});

0 comments on commit 021533c

Please sign in to comment.