Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions): task and heartbeat timeout specified by a path #23755

Merged
merged 10 commits into from
Jan 26, 2023
Merged
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ new tasks.GlueStartJobRun(this, 'Task', {
arguments: sfn.TaskInput.fromObject({
key: 'value',
}),
timeout: Duration.minutes(30),
taskTimeout: sfn.Timeout.duration(Duration.minutes(30)),
notifyDelayAfter: Duration.minutes(5),
});
```
Expand Down
27 changes: 20 additions & 7 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/batch/submit-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,14 @@ export class BatchSubmitJob extends sfn.TaskStateBase {
});

// validate timeout
props.timeout !== undefined && withResolved(props.timeout.toSeconds(), (timeout) => {
if (timeout < 60) {
throw new Error(`attempt duration must be greater than 60 seconds. Received ${timeout} seconds.`);
}
});
(props.timeout !== undefined || props.taskTimeout !== undefined) && withResolved(
props.timeout?.toSeconds(),
props.taskTimeout?.seconds, (timeout, taskTimeout) => {
const definedTimeout = timeout ?? taskTimeout;
if (definedTimeout && definedTimeout < 60) {
throw new Error(`attempt duration must be greater than 60 seconds. Received ${definedTimeout} seconds.`);
}
});

// This is required since environment variables must not start with AWS_BATCH;
// this naming convention is reserved for variables that are set by the AWS Batch service.
Expand All @@ -216,6 +219,15 @@ export class BatchSubmitJob extends sfn.TaskStateBase {
* @internal
*/
protected _renderTask(): any {
let timeout: number | undefined = undefined;
if (this.props.timeout) {
timeout = this.props.timeout.toSeconds();
} else if (this.props.taskTimeout?.seconds) {
timeout = this.props.taskTimeout.seconds;
} else if (this.props.taskTimeout?.path) {
timeout = sfn.JsonPath.numberAt(this.props.taskTimeout.path);
}

return {
Resource: integrationResourceArn('batch', 'submitJob', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
Expand Down Expand Up @@ -244,11 +256,12 @@ export class BatchSubmitJob extends sfn.TaskStateBase {
? { Attempts: this.props.attempts }
: undefined,

Timeout: this.props.timeout
? { AttemptDurationSeconds: this.props.timeout.toSeconds() }
Timeout: timeout
? { AttemptDurationSeconds: timeout }
: undefined,
}),
TimeoutSeconds: undefined,
TimeoutSecondsPath: undefined,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,27 @@ export class GlueStartJobRun extends sfn.TaskStateBase {
*/
protected _renderTask(): any {
const notificationProperty = this.props.notifyDelayAfter ? { NotifyDelayAfter: this.props.notifyDelayAfter.toMinutes() } : null;

let timeout: number | undefined = undefined;
if (this.props.timeout) {
timeout = this.props.timeout.toMinutes();
} else if (this.props.taskTimeout?.seconds) {
timeout = Duration.seconds(this.props.taskTimeout.seconds).toMinutes();
} else if (this.props.taskTimeout?.path) {
timeout = sfn.JsonPath.numberAt(this.props.taskTimeout.path);
}

return {
Resource: integrationResourceArn('glue', 'startJobRun', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
JobName: this.props.glueJobName,
Arguments: this.props.arguments?.value,
Timeout: this.props.timeout?.toMinutes(),
Timeout: timeout,
SecurityConfiguration: this.props.securityConfiguration,
NotificationProperty: notificationProperty,
}),
TimeoutSeconds: undefined,
TimeoutSecondsPath: undefined,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class RunBatchStack extends cdk.Stack {
foo: sfn.JsonPath.stringAt('$.bar'),
}),
attempts: 3,
timeout: cdk.Duration.seconds(60),
taskTimeout: sfn.Timeout.duration(cdk.Duration.seconds(60)),
});

const definition = new sfn.Pass(this, 'Start', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ test('Task with all the parameters', () => {
foo: sfn.JsonPath.stringAt('$.bar'),
}),
attempts: 3,
timeout: cdk.Duration.seconds(60),
taskTimeout: sfn.Timeout.duration(cdk.Duration.seconds(60)),
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
});

Expand Down Expand Up @@ -132,7 +132,7 @@ test('supports tokens', () => {
jobDefinitionArn: batchJobDefinition.jobDefinitionArn,
jobQueueArn: batchJobQueue.jobQueueArn,
arraySize: sfn.JsonPath.numberAt('$.arraySize'),
timeout: cdk.Duration.seconds(sfn.JsonPath.numberAt('$.timeout')),
taskTimeout: sfn.Timeout.at('$.timeout'),
attempts: sfn.JsonPath.numberAt('$.attempts'),
});

Expand Down Expand Up @@ -328,7 +328,7 @@ test('Task throws if attempt duration is less than 60 sec', () => {
jobDefinitionArn: batchJobDefinition.jobDefinitionArn,
jobName: 'JobName',
jobQueueArn: batchJobQueue.jobQueueArn,
timeout: cdk.Duration.seconds(59),
taskTimeout: sfn.Timeout.duration(cdk.Duration.seconds(59)),
});
}).toThrow(
/attempt duration must be greater than 60 seconds./,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
{
"version": "21.0.0",
"version": "29.0.0",
"files": {
"998dbb594bb7c679342b230e008a9707c46a79167240b84e7fd60529137d2fe5": {
"0ee63f9e3418b742ec39af14137ee6aff6c03be8cbee3f04c27b663df1cbabd6": {
"source": {
"path": "aws-sfn-tasks-ecs-fargate-integ.template.json",
"packaging": "file"
},
"destinations": {
"current_account-current_region": {
"bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}",
"objectKey": "998dbb594bb7c679342b230e008a9707c46a79167240b84e7fd60529137d2fe5.json",
"objectKey": "0ee63f9e3418b742ec39af14137ee6aff6c03be8cbee3f04c27b663df1cbabd6.json",
"assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@
"Fn::Join": [
"",
[
"{\"StartAt\":\"Start\",\"States\":{\"Start\":{\"Type\":\"Pass\",\"Result\":{\"SomeKey\":\"SomeValue\"},\"Next\":\"FargateTask\"},\"FargateTask\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"arn:",
"{\"StartAt\":\"Start\",\"States\":{\"Start\":{\"Type\":\"Pass\",\"Result\":{\"SomeKey\":\"SomeValue\",\"Timeout\":900},\"Next\":\"FargateTask\"},\"FargateTask\":{\"End\":true,\"Type\":\"Task\",\"TimeoutSecondsPath\":\"$.Timeout\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"version":"21.0.0"}
{"version":"29.0.0"}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "21.0.0",
"version": "29.0.0",
"testCases": {
"integ.fargate-run-task": {
"stacks": [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "21.0.0",
"version": "29.0.0",
"artifacts": {
"aws-sfn-tasks-ecs-fargate-integ.assets": {
"type": "cdk:asset-manifest",
Expand All @@ -17,7 +17,7 @@
"validateOnSynth": false,
"assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}",
"cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}",
"stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/998dbb594bb7c679342b230e008a9707c46a79167240b84e7fd60529137d2fe5.json",
"stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/0ee63f9e3418b742ec39af14137ee6aff6c03be8cbee3f04c27b663df1cbabd6.json",
"requiresBootstrapStackVersion": 6,
"bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version",
"additionalDependencies": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,14 @@
"id": "TaskRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/TaskRole",
"children": {
"ImportTaskRole": {
"id": "ImportTaskRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/TaskRole/ImportTaskRole",
"constructInfo": {
"fqn": "@aws-cdk/core.Resource",
"version": "0.0.0"
}
},
"Resource": {
"id": "Resource",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/TaskRole/Resource",
Expand Down Expand Up @@ -828,6 +836,14 @@
"id": "ExecutionRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/ExecutionRole",
"children": {
"ImportExecutionRole": {
"id": "ImportExecutionRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/ExecutionRole/ImportExecutionRole",
"constructInfo": {
"fqn": "@aws-cdk/core.Resource",
"version": "0.0.0"
}
},
"Resource": {
"id": "Resource",
"path": "aws-sfn-tasks-ecs-fargate-integ/TaskDef/ExecutionRole/Resource",
Expand Down Expand Up @@ -1008,6 +1024,14 @@
"id": "Role",
"path": "aws-sfn-tasks-ecs-fargate-integ/StateMachine/Role",
"children": {
"ImportRole": {
"id": "ImportRole",
"path": "aws-sfn-tasks-ecs-fargate-integ/StateMachine/Role/ImportRole",
"constructInfo": {
"fqn": "@aws-cdk/core.Resource",
"version": "0.0.0"
}
},
"Resource": {
"id": "Resource",
"path": "aws-sfn-tasks-ecs-fargate-integ/StateMachine/Role/Resource",
Expand Down Expand Up @@ -1258,7 +1282,7 @@
"Fn::Join": [
"",
[
"{\"StartAt\":\"Start\",\"States\":{\"Start\":{\"Type\":\"Pass\",\"Result\":{\"SomeKey\":\"SomeValue\"},\"Next\":\"FargateTask\"},\"FargateTask\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"arn:",
"{\"StartAt\":\"Start\",\"States\":{\"Start\":{\"Type\":\"Pass\",\"Result\":{\"SomeKey\":\"SomeValue\",\"Timeout\":900},\"Next\":\"FargateTask\"},\"FargateTask\":{\"End\":true,\"Type\":\"Task\",\"TimeoutSecondsPath\":\"$.Timeout\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
Expand Down Expand Up @@ -1336,7 +1360,7 @@
"path": "Tree",
"constructInfo": {
"fqn": "constructs.Construct",
"version": "10.1.140"
"version": "10.1.216"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const containerDefinition = taskDefinition.addContainer('TheContainer', {

// Build state machine
const definition = new sfn.Pass(stack, 'Start', {
result: sfn.Result.fromObject({ SomeKey: 'SomeValue' }),
result: sfn.Result.fromObject({ SomeKey: 'SomeValue', Timeout: 900 }),
}).next(
new tasks.EcsRunTask(stack, 'FargateTask', {
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
Expand All @@ -53,6 +53,7 @@ const definition = new sfn.Pass(stack, 'Start', {
launchTarget: new tasks.EcsFargateLaunchTarget({
platformVersion: ecs.FargatePlatformVersion.VERSION1_4,
}),
taskTimeout: sfn.Timeout.at('$.Timeout'),
}),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,4 @@ describeDeprecated('RunGlueJobTask', () => {
});
}).toThrow(/Invalid Service Integration Pattern: WAIT_FOR_TASK_TOKEN is not supported to call Glue./i);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ test('Invoke glue job with full properties', () => {
glueJobName,
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
arguments: sfn.TaskInput.fromObject(jobArguments),
timeout: glueJobTimeout,
taskTimeout: sfn.Timeout.duration(glueJobTimeout),
securityConfiguration,
notifyDelayAfter,
});
Expand Down Expand Up @@ -87,6 +87,37 @@ test('Invoke glue job with full properties', () => {
});
});

test('Invoke glue job with Timeout.at()', () => {
const task = new GlueStartJobRun(stack, 'Task', {
glueJobName,
taskTimeout: sfn.Timeout.at('$.timeout'),
});
new sfn.StateMachine(stack, 'SM', {
definition: task,
});

expect(stack.resolve(task.toStateJson())).toEqual({
Type: 'Task',
Resource: {
'Fn::Join': [
'',
[
'arn:',
{
Ref: 'AWS::Partition',
},
':states:::glue:startJobRun',
],
],
},
End: true,
Parameters: {
'JobName': glueJobName,
'Timeout.$': '$.timeout',
},
});
});

test('job arguments can reference state input', () => {
const task = new GlueStartJobRun(stack, 'Task', {
glueJobName,
Expand Down
Loading