Skip to content

Commit

Permalink
feat(stepfunctions): task and heartbeat timeout specified by a path (#…
Browse files Browse the repository at this point in the history
…23755)

Add support for dynamic timeouts referenced by a path in the state.

Closes #15531 

----

### All Submissions:

* [x] Have you followed the guidelines in our [Contributing guide?](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md)

### Adding new Construct Runtime Dependencies:

* [ ] This PR adds new construct runtime dependencies following the process described [here](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md/#adding-construct-runtime-dependencies)

### New Features

* [x] Have you added the new feature to an [integration test](https://github.com/aws/aws-cdk/blob/main/INTEGRATION_TESTS.md)?
	* [x] Did you use `yarn integ` to deploy the infrastructure and generate the snapshot (i.e. `yarn integ` without `--dry-run`)?

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
jogold authored Jan 26, 2023
1 parent 475dbef commit 26e48c7
Show file tree
Hide file tree
Showing 17 changed files with 215 additions and 36 deletions.
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ new tasks.GlueStartJobRun(this, 'Task', {
arguments: sfn.TaskInput.fromObject({
key: 'value',
}),
timeout: Duration.minutes(30),
taskTimeout: sfn.Timeout.duration(Duration.minutes(30)),
notifyDelayAfter: Duration.minutes(5),
});
```
Expand Down
27 changes: 20 additions & 7 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/batch/submit-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,14 @@ export class BatchSubmitJob extends sfn.TaskStateBase {
});

// validate timeout
props.timeout !== undefined && withResolved(props.timeout.toSeconds(), (timeout) => {
if (timeout < 60) {
throw new Error(`attempt duration must be greater than 60 seconds. Received ${timeout} seconds.`);
}
});
(props.timeout !== undefined || props.taskTimeout !== undefined) && withResolved(
props.timeout?.toSeconds(),
props.taskTimeout?.seconds, (timeout, taskTimeout) => {
const definedTimeout = timeout ?? taskTimeout;
if (definedTimeout && definedTimeout < 60) {
throw new Error(`attempt duration must be greater than 60 seconds. Received ${definedTimeout} seconds.`);
}
});

// This is required since environment variables must not start with AWS_BATCH;
// this naming convention is reserved for variables that are set by the AWS Batch service.
Expand All @@ -216,6 +219,15 @@ export class BatchSubmitJob extends sfn.TaskStateBase {
* @internal
*/
protected _renderTask(): any {
let timeout: number | undefined = undefined;
if (this.props.timeout) {
timeout = this.props.timeout.toSeconds();
} else if (this.props.taskTimeout?.seconds) {
timeout = this.props.taskTimeout.seconds;
} else if (this.props.taskTimeout?.path) {
timeout = sfn.JsonPath.numberAt(this.props.taskTimeout.path);
}

return {
Resource: integrationResourceArn('batch', 'submitJob', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
Expand Down Expand Up @@ -244,11 +256,12 @@ export class BatchSubmitJob extends sfn.TaskStateBase {
? { Attempts: this.props.attempts }
: undefined,

Timeout: this.props.timeout
? { AttemptDurationSeconds: this.props.timeout.toSeconds() }
Timeout: timeout
? { AttemptDurationSeconds: timeout }
: undefined,
}),
TimeoutSeconds: undefined,
TimeoutSecondsPath: undefined,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,27 @@ export class GlueStartJobRun extends sfn.TaskStateBase {
*/
protected _renderTask(): any {
const notificationProperty = this.props.notifyDelayAfter ? { NotifyDelayAfter: this.props.notifyDelayAfter.toMinutes() } : null;

let timeout: number | undefined = undefined;
if (this.props.timeout) {
timeout = this.props.timeout.toMinutes();
} else if (this.props.taskTimeout?.seconds) {
timeout = Duration.seconds(this.props.taskTimeout.seconds).toMinutes();
} else if (this.props.taskTimeout?.path) {
timeout = sfn.JsonPath.numberAt(this.props.taskTimeout.path);
}

return {
Resource: integrationResourceArn('glue', 'startJobRun', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
JobName: this.props.glueJobName,
Arguments: this.props.arguments?.value,
Timeout: this.props.timeout?.toMinutes(),
Timeout: timeout,
SecurityConfiguration: this.props.securityConfiguration,
NotificationProperty: notificationProperty,
}),
TimeoutSeconds: undefined,
TimeoutSecondsPath: undefined,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class RunBatchStack extends cdk.Stack {
foo: sfn.JsonPath.stringAt('$.bar'),
}),
attempts: 3,
timeout: cdk.Duration.seconds(60),
taskTimeout: sfn.Timeout.duration(cdk.Duration.seconds(60)),
});

const definition = new sfn.Pass(this, 'Start', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ test('Task with all the parameters', () => {
foo: sfn.JsonPath.stringAt('$.bar'),
}),
attempts: 3,
timeout: cdk.Duration.seconds(60),
taskTimeout: sfn.Timeout.duration(cdk.Duration.seconds(60)),
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
});

Expand Down Expand Up @@ -132,7 +132,7 @@ test('supports tokens', () => {
jobDefinitionArn: batchJobDefinition.jobDefinitionArn,
jobQueueArn: batchJobQueue.jobQueueArn,
arraySize: sfn.JsonPath.numberAt('$.arraySize'),
timeout: cdk.Duration.seconds(sfn.JsonPath.numberAt('$.timeout')),
taskTimeout: sfn.Timeout.at('$.timeout'),
attempts: sfn.JsonPath.numberAt('$.attempts'),
});

Expand Down Expand Up @@ -328,7 +328,7 @@ test('Task throws if attempt duration is less than 60 sec', () => {
jobDefinitionArn: batchJobDefinition.jobDefinitionArn,
jobName: 'JobName',
jobQueueArn: batchJobQueue.jobQueueArn,
timeout: cdk.Duration.seconds(59),
taskTimeout: sfn.Timeout.duration(cdk.Duration.seconds(59)),
});
}).toThrow(
/attempt duration must be greater than 60 seconds./,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
{
"version": "21.0.0",
"version": "29.0.0",
"files": {
"998dbb594bb7c679342b230e008a9707c46a79167240b84e7fd60529137d2fe5": {
"0ee63f9e3418b742ec39af14137ee6aff6c03be8cbee3f04c27b663df1cbabd6": {
"source": {
"path": "aws-sfn-tasks-ecs-fargate-integ.template.json",
"packaging": "file"
},
"destinations": {
"current_account-current_region": {
"bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}",
"objectKey": "998dbb594bb7c679342b230e008a9707c46a79167240b84e7fd60529137d2fe5.json",
"objectKey": "0ee63f9e3418b742ec39af14137ee6aff6c03be8cbee3f04c27b663df1cbabd6.json",
"assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@
"Fn::Join": [
"",
[
"{\"StartAt\":\"Start\",\"States\":{\"Start\":{\"Type\":\"Pass\",\"Result\":{\"SomeKey\":\"SomeValue\"},\"Next\":\"FargateTask\"},\"FargateTask\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"arn:",
"{\"StartAt\":\"Start\",\"States\":{\"Start\":{\"Type\":\"Pass\",\"Result\":{\"SomeKey\":\"SomeValue\",\"Timeout\":900},\"Next\":\"FargateTask\"},\"FargateTask\":{\"End\":true,\"Type\":\"Task\",\"TimeoutSecondsPath\":\"$.Timeout\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"version":"21.0.0"}
{"version":"29.0.0"}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "21.0.0",
"version": "29.0.0",
"testCases": {
"integ.fargate-run-task": {
"stacks": [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "21.0.0",
"version": "29.0.0",
"artifacts": {
"aws-sfn-tasks-ecs-fargate-integ.assets": {
"type": "cdk:asset-manifest",
Expand All @@ -17,7 +17,7 @@
"validateOnSynth": false,
"assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}",
"cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}",
"stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/998dbb594bb7c679342b230e008a9707c46a79167240b84e7fd60529137d2fe5.json",
"stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/0ee63f9e3418b742ec39af14137ee6aff6c03be8cbee3f04c27b663df1cbabd6.json",
"requiresBootstrapStackVersion": 6,
"bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version",
"additionalDependencies": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,14 @@
"id": "TaskRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/TaskRole",
"children": {
"ImportTaskRole": {
"id": "ImportTaskRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/TaskRole/ImportTaskRole",
"constructInfo": {
"fqn": "@aws-cdk/core.Resource",
"version": "0.0.0"
}
},
"Resource": {
"id": "Resource",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/TaskRole/Resource",
Expand Down Expand Up @@ -828,6 +836,14 @@
"id": "ExecutionRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/ExecutionRole",
"children": {
"ImportExecutionRole": {
"id": "ImportExecutionRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/ExecutionRole/ImportExecutionRole",
"constructInfo": {
"fqn": "@aws-cdk/core.Resource",
"version": "0.0.0"
}
},
"Resource": {
"id": "Resource",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/ExecutionRole/Resource",
Expand Down Expand Up @@ -1008,6 +1024,14 @@
"id": "Role",
"path": "aws-sfn-tasks-ecs-fargate-integ/StateMachine/Role",
"children": {
"ImportRole": {
"id": "ImportRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/StateMachine/Role/ImportRole",
"constructInfo": {
"fqn": "@aws-cdk/core.Resource",
"version": "0.0.0"
}
},
"Resource": {
"id": "Resource",
"path": "aws-sfn-tasks-ecs-fargate-integ/StateMachine/Role/Resource",
Expand Down Expand Up @@ -1258,7 +1282,7 @@
"Fn::Join": [
"",
[
"{\"StartAt\":\"Start\",\"States\":{\"Start\":{\"Type\":\"Pass\",\"Result\":{\"SomeKey\":\"SomeValue\"},\"Next\":\"FargateTask\"},\"FargateTask\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"arn:",
"{\"StartAt\":\"Start\",\"States\":{\"Start\":{\"Type\":\"Pass\",\"Result\":{\"SomeKey\":\"SomeValue\",\"Timeout\":900},\"Next\":\"FargateTask\"},\"FargateTask\":{\"End\":true,\"Type\":\"Task\",\"TimeoutSecondsPath\":\"$.Timeout\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
Expand Down Expand Up @@ -1336,7 +1360,7 @@
"path": "Tree",
"constructInfo": {
"fqn": "constructs.Construct",
"version": "10.1.140"
"version": "10.1.216"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const containerDefinition = taskDefinition.addContainer('TheContainer', {

// Build state machine
const definition = new sfn.Pass(stack, 'Start', {
result: sfn.Result.fromObject({ SomeKey: 'SomeValue' }),
result: sfn.Result.fromObject({ SomeKey: 'SomeValue', Timeout: 900 }),
}).next(
new tasks.EcsRunTask(stack, 'FargateTask', {
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
Expand All @@ -53,6 +53,7 @@ const definition = new sfn.Pass(stack, 'Start', {
launchTarget: new tasks.EcsFargateLaunchTarget({
platformVersion: ecs.FargatePlatformVersion.VERSION1_4,
}),
taskTimeout: sfn.Timeout.at('$.Timeout'),
}),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,4 @@ describeDeprecated('RunGlueJobTask', () => {
});
}).toThrow(/Invalid Service Integration Pattern: WAIT_FOR_TASK_TOKEN is not supported to call Glue./i);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ test('Invoke glue job with full properties', () => {
glueJobName,
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
arguments: sfn.TaskInput.fromObject(jobArguments),
timeout: glueJobTimeout,
taskTimeout: sfn.Timeout.duration(glueJobTimeout),
securityConfiguration,
notifyDelayAfter,
});
Expand Down Expand Up @@ -87,6 +87,37 @@ test('Invoke glue job with full properties', () => {
});
});

test('Invoke glue job with Timeout.at()', () => {
const task = new GlueStartJobRun(stack, 'Task', {
glueJobName,
taskTimeout: sfn.Timeout.at('$.timeout'),
});
new sfn.StateMachine(stack, 'SM', {
definition: task,
});

expect(stack.resolve(task.toStateJson())).toEqual({
Type: 'Task',
Resource: {
'Fn::Join': [
'',
[
'arn:',
{
Ref: 'AWS::Partition',
},
':states:::glue:startJobRun',
],
],
},
End: true,
Parameters: {
'JobName': glueJobName,
'Timeout.$': '$.timeout',
},
});
});

test('job arguments can reference state input', () => {
const task = new GlueStartJobRun(stack, 'Task', {
glueJobName,
Expand Down
Loading

0 comments on commit 26e48c7

Please sign in to comment.