From 9d892696801d67387a652e122d3adb1e0045725f Mon Sep 17 00:00:00 2001 From: Ben Chaimberg Date: Tue, 14 Jan 2020 12:05:57 -0800 Subject: [PATCH 01/12] feat(stepfunctions-tasks): add step functions task to run glue job * add new task RunGlueJobTask and associated unit tests * since Job construct does not yet exist, uses job name as required parameter closes #5266 --- .../aws-stepfunctions-tasks/lib/index.ts | 1 + .../lib/run-glue-job-task.ts | 116 ++++++++++++++++++ .../aws-stepfunctions-tasks/package.json | 4 +- .../test/run-glue-job-task.test.ts | 100 +++++++++++++++ packages/@aws-cdk/aws-stepfunctions/README.md | 14 +++ 5 files changed, 234 insertions(+), 1 deletion(-) create mode 100644 packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts index c1ab798e39367..9908c6bb4e422 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts @@ -19,3 +19,4 @@ export * from './emr-add-step'; export * from './emr-cancel-step'; export * from './emr-modify-instance-fleet-by-name'; export * from './emr-modify-instance-group-by-name'; +export * from './run-glue-job-task'; diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts new file mode 100644 index 0000000000000..3220abf747730 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts @@ -0,0 +1,116 @@ +import * as glue from '@aws-cdk/aws-glue'; +import * as iam from '@aws-cdk/aws-iam'; +import * as sfn from '@aws-cdk/aws-stepfunctions'; +import { Duration } from '@aws-cdk/core'; +import { getResourceArn } from './resource-arn-suffix'; + +/** + * Properties for RunGlueJobTask + */ +export interface RunGlueJobTaskProps { + + /** + * The service integration pattern indicates different ways to start the Glue job. + * + * The valid value for Glue is either FIRE_AND_FORGET or SYNC. + * + * @default FIRE_AND_FORGET + */ + readonly integrationPattern?: sfn.ServiceIntegrationPattern; + + /** + * The ID of a previous JobRun to retry. + * + * @default - Creates new run + */ + readonly jobRunId?: string; + + /** + * The job arguments specifically for this run. For this job run, they replace the + * default arguments set in the job definition itself. + * + * @default - Only arguments consumed by Glue + */ + readonly arguments?: { [key: string]: string }; + + /** + * The number of AWS Glue data processing units (DPUs) to allocate to this JobRun. + * + * @default - 10 + */ + readonly allocatedCapacity?: number; + + /** + * The JobRun timeout in minutes. This is the maximum time that a job run can consume + * resources before it is terminated and enters TIMEOUT status. + * + * @default - 2,880 (48 hours) + */ + readonly timeout?: Duration; + + /** + * The name of the SecurityConfiguration structure to be used with this job run. + * + * @default - No configuration + */ + readonly securityConfiguration?: string; + + /** + * Specifies configuration properties of a job run notification. + * + * @default - No configuration + */ + readonly notificationProperty?: glue.CfnJob.NotificationPropertyProperty; +} + +/** + * Invoke a Glue job as a Task + * + * OUTPUT: the output of this task is a JobRun structure, for details consult + * https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-runs.html#aws-glue-api-jobs-runs-JobRun + * + * @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html + */ +export class RunGlueJobTask implements sfn.IStepFunctionsTask { + private readonly integrationPattern: sfn.ServiceIntegrationPattern; + + constructor(private readonly glueJobName: string, private readonly props: RunGlueJobTaskProps = {}) { + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.SYNC + ]; + + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call Glue.`); + } + } + + public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { + return { + resourceArn: getResourceArn("glue", "startJobRun", this.integrationPattern), + policyStatements: [new iam.PolicyStatement({ + resources: ["*"], + actions: [ + "glue:StartJobRun", + "glue:GetJobRun", + "glue:GetJobRuns", + "glue:BatchStopJobRun" + ], + })], + metricPrefixSingular: 'GlueJob', + metricPrefixPlural: 'GlueJobs', + metricDimensions: { GlueJobName: this.glueJobName }, + parameters: { + JobName: this.glueJobName, + JobRunId: this.props.jobRunId, + Arguments: this.props.arguments, + AllocatedCapacity: this.props.allocatedCapacity, + Timeout: this.props.timeout, + SecurityConfiguration: this.props.securityConfiguration, + NotificationProperty: this.props.notificationProperty + } + }; + } +} diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/package.json b/packages/@aws-cdk/aws-stepfunctions-tasks/package.json index 309e3f55c0b27..8aa257d1c12e3 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/package.json +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/package.json @@ -92,6 +92,7 @@ "@aws-cdk/aws-ecr": "1.23.0", "@aws-cdk/aws-ecr-assets": "1.23.0", "@aws-cdk/aws-ecs": "1.23.0", + "@aws-cdk/aws-glue": "1.23.0", "@aws-cdk/aws-iam": "1.23.0", "@aws-cdk/aws-kms": "1.23.0", "@aws-cdk/aws-lambda": "1.23.0", @@ -109,6 +110,7 @@ "@aws-cdk/aws-ecr": "1.23.0", "@aws-cdk/aws-ecr-assets": "1.23.0", "@aws-cdk/aws-ecs": "1.23.0", + "@aws-cdk/aws-glue": "1.23.0", "@aws-cdk/aws-iam": "1.23.0", "@aws-cdk/aws-kms": "1.23.0", "@aws-cdk/aws-lambda": "1.23.0", @@ -182,4 +184,4 @@ "props-default-doc:@aws-cdk/aws-stepfunctions-tasks.VpcConfig.subnets" ] } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts new file mode 100644 index 0000000000000..3ed0e69869f3d --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts @@ -0,0 +1,100 @@ +import '@aws-cdk/assert/jest'; +import * as sfn from '@aws-cdk/aws-stepfunctions'; +import { Duration, Stack } from '@aws-cdk/core'; +import * as tasks from '../lib'; + +const jobName = "GlueJob"; +let stack: Stack; +beforeEach(() => { + stack = new Stack(); +}); + +test('Invoke glue job with just job ARN', () => { + const task = new sfn.Task(stack, 'Task', { + task: new tasks.RunGlueJobTask(jobName) + }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + "Fn::Join": [ + "", + [ + "arn:", + { + Ref: "AWS::Partition", + }, + ":states:::glue:startJobRun", + ], + ], + }, + End: true, + Parameters: { + JobName: jobName + }, + }); +}); + +test('Invoke glue job with full properties', () => { + const jobRunId = "jobRunId"; + const jobArguments = { + key: "value" + }; + const allocatedCapacity = 100; + const timeout = Duration.minutes(1440); + const securityConfiguration = "securityConfiguration"; + const notificationProperty = { notifyDelayAfter: 10 }; + const task = new sfn.Task(stack, 'Task', { + task: new tasks.RunGlueJobTask(jobName, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + jobRunId, + arguments: jobArguments, + allocatedCapacity, + timeout, + securityConfiguration, + notificationProperty + }) + }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + "Fn::Join": [ + "", + [ + "arn:", + { + Ref: "AWS::Partition", + }, + ":states:::glue:startJobRun.sync", + ], + ], + }, + End: true, + Parameters: { + JobName: jobName, + JobRunId: jobRunId, + Arguments: jobArguments, + AllocatedCapacity: allocatedCapacity, + Timeout: timeout, + SecurityConfiguration: securityConfiguration, + NotificationProperty: notificationProperty + }, + }); +}); + +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied as service integration pattern', () => { + expect(() => { + new sfn.Task(stack, 'Task', { + task: new tasks.RunGlueJobTask(jobName, { + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + }) + }); + }).toThrow(/Invalid Service Integration Pattern: WAIT_FOR_TASK_TOKEN is not supported to call Glue./i); +}); diff --git a/packages/@aws-cdk/aws-stepfunctions/README.md b/packages/@aws-cdk/aws-stepfunctions/README.md index c15bdc7c86412..fd7c819dd1534 100644 --- a/packages/@aws-cdk/aws-stepfunctions/README.md +++ b/packages/@aws-cdk/aws-stepfunctions/README.md @@ -126,6 +126,7 @@ couple of the tasks available are: queue that you poll on a compute fleet you manage yourself) * `tasks.InvokeFunction` -- invoke a Lambda function with function ARN * `tasks.RunLambdaTask` -- call Lambda as integrated service with magic ARN +* `tasks.RunGlueJobTask` -- call Glue Job as integrated service * `tasks.PublishToTopic` -- publish a message to an SNS topic * `tasks.SendToQueue` -- send a message to an SQS queue * `tasks.RunEcsFargateTask`/`ecs.RunEcsEc2Task` -- run a container task, @@ -185,6 +186,19 @@ task.next(nextState); }); ``` +#### Glue Job example + +```ts + const task = new sfn.Task(stack, 'ETL', { + task: new tasks.RunGlueJobTask(glueJobArn, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + arguments: { + "--table-prefix": "myTable" + } + }) + }); +``` + #### SNS example ```ts From 402a76ce00c18a469786ba07f0c1438e6da64ec1 Mon Sep 17 00:00:00 2001 From: Ben Chaimberg Date: Fri, 21 Feb 2020 15:48:38 -0800 Subject: [PATCH 02/12] cleanup constructor properties, add integration test --- .../lib/run-glue-job-task.ts | 30 ++- .../test/integ.glue-task.expected.json | 185 ++++++++++++++++++ .../test/integ.glue-task.ts | 55 ++++++ .../test/my-glue-script/job.py | 1 + .../test/run-glue-job-task.test.ts | 17 +- packages/@aws-cdk/aws-stepfunctions/README.md | 2 +- 6 files changed, 263 insertions(+), 27 deletions(-) create mode 100644 packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json create mode 100644 packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions-tasks/test/my-glue-script/job.py diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts index 3220abf747730..ef81a51b10c53 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts @@ -1,4 +1,3 @@ -import * as glue from '@aws-cdk/aws-glue'; import * as iam from '@aws-cdk/aws-iam'; import * as sfn from '@aws-cdk/aws-stepfunctions'; import { Duration } from '@aws-cdk/core'; @@ -29,22 +28,16 @@ export interface RunGlueJobTaskProps { * The job arguments specifically for this run. For this job run, they replace the * default arguments set in the job definition itself. * - * @default - Only arguments consumed by Glue + * @default - Default arguments set in the job definition */ readonly arguments?: { [key: string]: string }; /** - * The number of AWS Glue data processing units (DPUs) to allocate to this JobRun. + * The job run timeout. This is the maximum time that a job run can consume + * resources before it is terminated and enters TIMEOUT status. Must be at least 1 + * minute. * - * @default - 10 - */ - readonly allocatedCapacity?: number; - - /** - * The JobRun timeout in minutes. This is the maximum time that a job run can consume - * resources before it is terminated and enters TIMEOUT status. - * - * @default - 2,880 (48 hours) + * @default Duration.hours(48) */ readonly timeout?: Duration; @@ -56,11 +49,12 @@ export interface RunGlueJobTaskProps { readonly securityConfiguration?: string; /** - * Specifies configuration properties of a job run notification. + * After a job run starts, the number of minutes to wait before sending a job run delay + * notification. Must be at least 1 minute. * - * @default - No configuration + * @default - No delay */ - readonly notificationProperty?: glue.CfnJob.NotificationPropertyProperty; + readonly notifyDelayAfter?: Duration; } /** @@ -88,6 +82,7 @@ export class RunGlueJobTask implements sfn.IStepFunctionsTask { } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { + const notificationProperty = this.props.notifyDelayAfter ? { NotifyDelayAfter: this.props.notifyDelayAfter.toMinutes() } : null return { resourceArn: getResourceArn("glue", "startJobRun", this.integrationPattern), policyStatements: [new iam.PolicyStatement({ @@ -106,10 +101,9 @@ export class RunGlueJobTask implements sfn.IStepFunctionsTask { JobName: this.glueJobName, JobRunId: this.props.jobRunId, Arguments: this.props.arguments, - AllocatedCapacity: this.props.allocatedCapacity, - Timeout: this.props.timeout, + Timeout: this.props.timeout?.toMinutes(), SecurityConfiguration: this.props.securityConfiguration, - NotificationProperty: this.props.notificationProperty + NotificationProperty: notificationProperty } }; } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json new file mode 100644 index 0000000000000..b8c30102dc8e1 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json @@ -0,0 +1,185 @@ +{ + "Parameters": { + "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3BucketCE4DD78A": { + "Type": "String", + "Description": "S3 bucket for asset \"00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819\"" + }, + "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3VersionKeyBC9A77D3": { + "Type": "String", + "Description": "S3 key for asset version \"00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819\"" + }, + "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819ArtifactHash34012B55": { + "Type": "String", + "Description": "Artifact hash for asset \"00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819\"" + } + }, + "Resources": { + "GlueJobRole1CD031E0": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "glue.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSGlueServiceRole" + ] + ] + } + ] + } + }, + "GlueJob": { + "Type": "AWS::Glue::Job", + "Properties": { + "Name": "My Glue Job", + "Command": { + "Name": "glueetl", + "PythonVersion": "3", + "ScriptLocation": { + "Fn::Join": [ + "", + [ + "s3://", + { + "Ref": "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3BucketCE4DD78A" + }, + "/", + { + "Fn::Select": [ + 0, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3VersionKeyBC9A77D3" + } + ] + } + ] + }, + { + "Fn::Select": [ + 1, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3VersionKeyBC9A77D3" + } + ] + } + ] + } + ] + ] + } + }, + "Role": { + "Fn::GetAtt": [ + "GlueJobRole1CD031E0", + "Arn" + ] + }, + "GlueVersion": "1.0" + } + }, + "StateMachineRole543B9670": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "states.", + { + "Ref": "AWS::Region" + }, + ".amazonaws.com" + ] + ] + } + } + } + ], + "Version": "2012-10-17" + } + } + }, + "StateMachineRoleDefaultPolicyDA5F7DA8": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "glue:StartJobRun", + "glue:GetJobRun", + "glue:GetJobRuns", + "glue:BatchStopJobRun" + ], + "Effect": "Allow", + "Resource": "*" + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "StateMachineRoleDefaultPolicyDA5F7DA8", + "Roles": [ + { + "Ref": "StateMachineRole543B9670" + } + ] + } + }, + "StateMachine81935E76": { + "Type": "AWS::StepFunctions::StateMachine", + "Properties": { + "DefinitionString": { + "Fn::Join": [ + "", + [ + "{\"StartAt\":\"Start Task\",\"States\":{\"Start Task\":{\"Type\":\"Pass\",\"Next\":\"Glue Job Task\"},\"Glue Job Task\":{\"Next\":\"End Task\",\"Parameters\":{\"JobName\":\"My Glue Job\",\"Arguments\":{\"--enable-metrics\":\"true\"}},\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::glue:startJobRun.sync\"},\"End Task\":{\"Type\":\"Pass\",\"End\":true}}}" + ] + ] + }, + "RoleArn": { + "Fn::GetAtt": [ + "StateMachineRole543B9670", + "Arn" + ] + } + }, + "DependsOn": [ + "StateMachineRoleDefaultPolicyDA5F7DA8", + "StateMachineRole543B9670" + ] + } + } +} diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts new file mode 100644 index 0000000000000..97d5a03738e55 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts @@ -0,0 +1,55 @@ +import * as glue from '@aws-cdk/aws-glue'; +import * as iam from '@aws-cdk/aws-iam'; +import * as sfn from '@aws-cdk/aws-stepfunctions'; +import * as assets from '@aws-cdk/aws-s3-assets'; +import * as cdk from '@aws-cdk/core'; +import * as path from 'path'; +import * as tasks from '../lib'; + +/* + * Stack verification steps: + * * aws stepfunctions start-execution --state-machine-arn + */ + +const app = new cdk.App(); +const stack = new cdk.Stack(app, 'aws-stepfunctions-integ'); + +const codeAsset = new assets.Asset(stack, 'Glue Job Script', { + path: path.join(__dirname, 'my-glue-script/job.py') +}); + +const jobRole = new iam.Role(stack, 'Glue Job Role', { + assumedBy: new iam.ServicePrincipal('glue'), + managedPolicies: [ + iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole') + ] +}); + +const job = new glue.CfnJob(stack, 'Glue Job', { + name: 'My Glue Job', + glueVersion: '1.0', + command: { + name: 'glueetl', + pythonVersion: '3', + scriptLocation: `s3://${codeAsset.s3BucketName}/${codeAsset.s3ObjectKey}` + }, + role: jobRole.roleArn +}); + +const jobTask = new sfn.Task(stack, 'Glue Job Task', { + task: new tasks.RunGlueJobTask(job.name!, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + arguments: { + "--enable-metrics": "true" + } + }) +}); + +const startTask = new sfn.Pass(stack, 'Start Task'); +const endTask = new sfn.Pass(stack, 'End Task'); + +new sfn.StateMachine(stack, 'State Machine', { + definition: sfn.Chain.start(startTask).next(jobTask).next(endTask) +}); + +app.synth() diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-glue-script/job.py b/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-glue-script/job.py new file mode 100644 index 0000000000000..7df869a15e76c --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-glue-script/job.py @@ -0,0 +1 @@ +print("Hello, World!") diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts index 3ed0e69869f3d..3f30f2a1ad6bf 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts @@ -43,19 +43,19 @@ test('Invoke glue job with full properties', () => { const jobArguments = { key: "value" }; - const allocatedCapacity = 100; - const timeout = Duration.minutes(1440); + const timeoutMinutes = 1440; + const timeout = Duration.minutes(timeoutMinutes); const securityConfiguration = "securityConfiguration"; - const notificationProperty = { notifyDelayAfter: 10 }; + const notifyDelayAfterMinutes = 10; + const notifyDelayAfter = Duration.minutes(notifyDelayAfterMinutes); const task = new sfn.Task(stack, 'Task', { task: new tasks.RunGlueJobTask(jobName, { integrationPattern: sfn.ServiceIntegrationPattern.SYNC, jobRunId, arguments: jobArguments, - allocatedCapacity, timeout, securityConfiguration, - notificationProperty + notifyDelayAfter }) }); new sfn.StateMachine(stack, 'SM', { @@ -81,10 +81,11 @@ test('Invoke glue job with full properties', () => { JobName: jobName, JobRunId: jobRunId, Arguments: jobArguments, - AllocatedCapacity: allocatedCapacity, - Timeout: timeout, + Timeout: timeoutMinutes, SecurityConfiguration: securityConfiguration, - NotificationProperty: notificationProperty + NotificationProperty: { + NotifyDelayAfter: notifyDelayAfterMinutes + } }, }); }); diff --git a/packages/@aws-cdk/aws-stepfunctions/README.md b/packages/@aws-cdk/aws-stepfunctions/README.md index fd7c819dd1534..1104aa3633003 100644 --- a/packages/@aws-cdk/aws-stepfunctions/README.md +++ b/packages/@aws-cdk/aws-stepfunctions/README.md @@ -190,7 +190,7 @@ task.next(nextState); ```ts const task = new sfn.Task(stack, 'ETL', { - task: new tasks.RunGlueJobTask(glueJobArn, { + task: new tasks.RunGlueJobTask(glueJobName, { integrationPattern: sfn.ServiceIntegrationPattern.SYNC, arguments: { "--table-prefix": "myTable" From 56d8371947533435ff9001d7ad2e2e36e1fe2e5d Mon Sep 17 00:00:00 2001 From: Ben Chaimberg Date: Mon, 24 Feb 2020 10:39:11 -0800 Subject: [PATCH 03/12] remove job run ID from props, update default prop descriptions --- .../lib/run-glue-job-task.ts | 14 +++----------- .../test/run-glue-job-task.test.ts | 3 --- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts index ef81a51b10c53..f842b6b724e7b 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts @@ -17,13 +17,6 @@ export interface RunGlueJobTaskProps { */ readonly integrationPattern?: sfn.ServiceIntegrationPattern; - /** - * The ID of a previous JobRun to retry. - * - * @default - Creates new run - */ - readonly jobRunId?: string; - /** * The job arguments specifically for this run. For this job run, they replace the * default arguments set in the job definition itself. @@ -37,14 +30,14 @@ export interface RunGlueJobTaskProps { * resources before it is terminated and enters TIMEOUT status. Must be at least 1 * minute. * - * @default Duration.hours(48) + * @default - Default timeout set in the job definition */ readonly timeout?: Duration; /** * The name of the SecurityConfiguration structure to be used with this job run. * - * @default - No configuration + * @default - Default configuration set in the job definition */ readonly securityConfiguration?: string; @@ -52,7 +45,7 @@ export interface RunGlueJobTaskProps { * After a job run starts, the number of minutes to wait before sending a job run delay * notification. Must be at least 1 minute. * - * @default - No delay + * @default - Default delay set in the job definition */ readonly notifyDelayAfter?: Duration; } @@ -99,7 +92,6 @@ export class RunGlueJobTask implements sfn.IStepFunctionsTask { metricDimensions: { GlueJobName: this.glueJobName }, parameters: { JobName: this.glueJobName, - JobRunId: this.props.jobRunId, Arguments: this.props.arguments, Timeout: this.props.timeout?.toMinutes(), SecurityConfiguration: this.props.securityConfiguration, diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts index 3f30f2a1ad6bf..1eaf4ef0d9472 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts @@ -39,7 +39,6 @@ test('Invoke glue job with just job ARN', () => { }); test('Invoke glue job with full properties', () => { - const jobRunId = "jobRunId"; const jobArguments = { key: "value" }; @@ -51,7 +50,6 @@ test('Invoke glue job with full properties', () => { const task = new sfn.Task(stack, 'Task', { task: new tasks.RunGlueJobTask(jobName, { integrationPattern: sfn.ServiceIntegrationPattern.SYNC, - jobRunId, arguments: jobArguments, timeout, securityConfiguration, @@ -79,7 +77,6 @@ test('Invoke glue job with full properties', () => { End: true, Parameters: { JobName: jobName, - JobRunId: jobRunId, Arguments: jobArguments, Timeout: timeoutMinutes, SecurityConfiguration: securityConfiguration, From f4285b3b01dc544f8b86c5c34f1a00c9c5ba088c Mon Sep 17 00:00:00 2001 From: Ben Chaimberg Date: Mon, 24 Feb 2020 11:30:08 -0800 Subject: [PATCH 04/12] add s3 assets package to module --- packages/@aws-cdk/aws-stepfunctions-tasks/package.json | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/package.json b/packages/@aws-cdk/aws-stepfunctions-tasks/package.json index 2a85034d7cf96..7ede6a4312cb2 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/package.json +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/package.json @@ -97,6 +97,7 @@ "@aws-cdk/aws-kms": "1.25.0", "@aws-cdk/aws-lambda": "1.25.0", "@aws-cdk/aws-s3": "1.25.0", + "@aws-cdk/aws-s3-assets": "1.25.0", "@aws-cdk/aws-sns": "1.25.0", "@aws-cdk/aws-sqs": "1.25.0", "@aws-cdk/aws-stepfunctions": "1.25.0", @@ -115,6 +116,7 @@ "@aws-cdk/aws-kms": "1.25.0", "@aws-cdk/aws-lambda": "1.25.0", "@aws-cdk/aws-s3": "1.25.0", + "@aws-cdk/aws-s3-assets": "1.25.0", "@aws-cdk/aws-sns": "1.25.0", "@aws-cdk/aws-sqs": "1.25.0", "@aws-cdk/aws-stepfunctions": "1.25.0", From 442f2cde948dd66ffc9e1405127334b94a462313 Mon Sep 17 00:00:00 2001 From: Ben Chaimberg Date: Mon, 24 Feb 2020 12:40:46 -0800 Subject: [PATCH 05/12] fix linting errors --- .../@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts | 2 +- .../@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts | 4 ++-- .../aws-stepfunctions-tasks/test/run-glue-job-task.test.ts | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts index f842b6b724e7b..69db461e23031 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts @@ -75,7 +75,7 @@ export class RunGlueJobTask implements sfn.IStepFunctionsTask { } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { - const notificationProperty = this.props.notifyDelayAfter ? { NotifyDelayAfter: this.props.notifyDelayAfter.toMinutes() } : null + const notificationProperty = this.props.notifyDelayAfter ? { NotifyDelayAfter: this.props.notifyDelayAfter.toMinutes() } : null; return { resourceArn: getResourceArn("glue", "startJobRun", this.integrationPattern), policyStatements: [new iam.PolicyStatement({ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts index 97d5a03738e55..603233f6a131b 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts @@ -1,7 +1,7 @@ import * as glue from '@aws-cdk/aws-glue'; import * as iam from '@aws-cdk/aws-iam'; -import * as sfn from '@aws-cdk/aws-stepfunctions'; import * as assets from '@aws-cdk/aws-s3-assets'; +import * as sfn from '@aws-cdk/aws-stepfunctions'; import * as cdk from '@aws-cdk/core'; import * as path from 'path'; import * as tasks from '../lib'; @@ -52,4 +52,4 @@ new sfn.StateMachine(stack, 'State Machine', { definition: sfn.Chain.start(startTask).next(jobTask).next(endTask) }); -app.synth() +app.synth(); diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts index 1eaf4ef0d9472..75d2e761b2a83 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts @@ -81,7 +81,7 @@ test('Invoke glue job with full properties', () => { Timeout: timeoutMinutes, SecurityConfiguration: securityConfiguration, NotificationProperty: { - NotifyDelayAfter: notifyDelayAfterMinutes + NotifyDelayAfter: notifyDelayAfterMinutes } }, }); From 876e05ee4ebf150be599a641717370635f64ad4b Mon Sep 17 00:00:00 2001 From: BenChaimberg Date: Wed, 26 Feb 2020 23:13:54 +0000 Subject: [PATCH 06/12] clean up documentation, add links to docs and glue task example --- .../lib/run-glue-job-task.ts | 20 ++++++++++++------- .../aws-stepfunctions-tasks/package.json | 3 +-- packages/@aws-cdk/aws-stepfunctions/README.md | 2 ++ 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts index 69db461e23031..1751357fd970e 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts @@ -18,17 +18,19 @@ export interface RunGlueJobTaskProps { readonly integrationPattern?: sfn.ServiceIntegrationPattern; /** - * The job arguments specifically for this run. For this job run, they replace the - * default arguments set in the job definition itself. + * The job arguments specifically for this run. + * + * For this job run, they replace the default arguments set in the job definition itself. * * @default - Default arguments set in the job definition */ readonly arguments?: { [key: string]: string }; /** - * The job run timeout. This is the maximum time that a job run can consume - * resources before it is terminated and enters TIMEOUT status. Must be at least 1 - * minute. + * The job run timeout. + * + * This is the maximum time that a job run can consume resources before it is terminated and enters TIMEOUT status. + * Must be at least 1 minute. * * @default - Default timeout set in the job definition */ @@ -37,13 +39,17 @@ export interface RunGlueJobTaskProps { /** * The name of the SecurityConfiguration structure to be used with this job run. * + * This must match the Glue API + * [single-line string pattern](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-regex-oneLine). + * * @default - Default configuration set in the job definition */ readonly securityConfiguration?: string; /** - * After a job run starts, the number of minutes to wait before sending a job run delay - * notification. Must be at least 1 minute. + * After a job run starts, the number of minutes to wait before sending a job run delay notification. + * + * Must be at least 1 minute. * * @default - Default delay set in the job definition */ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/package.json b/packages/@aws-cdk/aws-stepfunctions-tasks/package.json index 3adcde0154f7b..b1d80d5db8110 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/package.json +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/package.json @@ -80,6 +80,7 @@ "license": "Apache-2.0", "devDependencies": { "@aws-cdk/assert": "999.0.0", + "@aws-cdk/aws-s3-assets": "999.0.0", "cdk-build-tools": "999.0.0", "cdk-integ-tools": "999.0.0", "jest": "^24.9.0", @@ -97,7 +98,6 @@ "@aws-cdk/aws-kms": "999.0.0", "@aws-cdk/aws-lambda": "999.0.0", "@aws-cdk/aws-s3": "999.0.0", - "@aws-cdk/aws-s3-assets": "999.0.0", "@aws-cdk/aws-sns": "999.0.0", "@aws-cdk/aws-sqs": "999.0.0", "@aws-cdk/aws-stepfunctions": "999.0.0", @@ -116,7 +116,6 @@ "@aws-cdk/aws-kms": "999.0.0", "@aws-cdk/aws-lambda": "999.0.0", "@aws-cdk/aws-s3": "999.0.0", - "@aws-cdk/aws-s3-assets": "999.0.0", "@aws-cdk/aws-sns": "999.0.0", "@aws-cdk/aws-sqs": "999.0.0", "@aws-cdk/aws-stepfunctions": "999.0.0", diff --git a/packages/@aws-cdk/aws-stepfunctions/README.md b/packages/@aws-cdk/aws-stepfunctions/README.md index 1104aa3633003..2c3aa2720d823 100644 --- a/packages/@aws-cdk/aws-stepfunctions/README.md +++ b/packages/@aws-cdk/aws-stepfunctions/README.md @@ -199,6 +199,8 @@ task.next(nextState); }); ``` +[Example CDK app](../aws-stepfunctions-tasks/test/integ.glue-task.ts) + #### SNS example ```ts From 2d9c360562f34fa5a5d3d6af2f04bc08da32636d Mon Sep 17 00:00:00 2001 From: BenChaimberg Date: Wed, 26 Feb 2020 23:14:19 +0000 Subject: [PATCH 07/12] add verification step to integration step, ensure job succeeds --- .../test/integ.glue-task.expected.json | 88 ++++++++++++++++--- .../test/integ.glue-task.ts | 9 +- .../test/my-glue-script/job.py | 15 ++++ 3 files changed, 99 insertions(+), 13 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json index b8c30102dc8e1..9da6cd522fc3c 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json @@ -1,16 +1,16 @@ { "Parameters": { - "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3BucketCE4DD78A": { + "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3BucketB960DFE2": { "Type": "String", - "Description": "S3 bucket for asset \"00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819\"" + "Description": "S3 bucket for asset \"634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54\"" }, - "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3VersionKeyBC9A77D3": { + "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3VersionKeyF9932D20": { "Type": "String", - "Description": "S3 key for asset version \"00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819\"" + "Description": "S3 key for asset version \"634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54\"" }, - "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819ArtifactHash34012B55": { + "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54ArtifactHash5EEBE9B2": { "Type": "String", - "Description": "Artifact hash for asset \"00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819\"" + "Description": "Artifact hash for asset \"634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54\"" } }, "Resources": { @@ -45,10 +45,66 @@ ] } }, + "GlueJobRoleDefaultPolicy3D94D6F1": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "s3:GetObject*", + "s3:GetBucket*", + "s3:List*" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":s3:::", + { + "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3BucketB960DFE2" + } + ] + ] + }, + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":s3:::", + { + "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3BucketB960DFE2" + }, + "/*" + ] + ] + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "GlueJobRoleDefaultPolicy3D94D6F1", + "Roles": [ + { + "Ref": "GlueJobRole1CD031E0" + } + ] + } + }, "GlueJob": { "Type": "AWS::Glue::Job", "Properties": { - "Name": "My Glue Job", "Command": { "Name": "glueetl", "PythonVersion": "3", @@ -58,7 +114,7 @@ [ "s3://", { - "Ref": "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3BucketCE4DD78A" + "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3BucketB960DFE2" }, "/", { @@ -68,7 +124,7 @@ "Fn::Split": [ "||", { - "Ref": "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3VersionKeyBC9A77D3" + "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3VersionKeyF9932D20" } ] } @@ -81,7 +137,7 @@ "Fn::Split": [ "||", { - "Ref": "AssetParameters00e9009a1958b2a7ac277352874efb747a15aebd9bd8fb3c1e8b5a4a12f16819S3VersionKeyBC9A77D3" + "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3VersionKeyF9932D20" } ] } @@ -97,7 +153,8 @@ "Arn" ] }, - "GlueVersion": "1.0" + "GlueVersion": "1.0", + "Name": "My Glue Job" } }, "StateMachineRole543B9670": { @@ -181,5 +238,12 @@ "StateMachineRole543B9670" ] } + }, + "Outputs": { + "StateMachineARNOutput": { + "Value": { + "Ref": "StateMachine81935E76" + } + } } -} +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts index 603233f6a131b..feff9ed994d0d 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts @@ -9,6 +9,8 @@ import * as tasks from '../lib'; /* * Stack verification steps: * * aws stepfunctions start-execution --state-machine-arn + * * aws stepfunctions describe-execution --execution-arn + * * should eventually return status "SUCCEEDED" */ const app = new cdk.App(); @@ -24,6 +26,7 @@ const jobRole = new iam.Role(stack, 'Glue Job Role', { iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole') ] }); +codeAsset.grantRead(jobRole); const job = new glue.CfnJob(stack, 'Glue Job', { name: 'My Glue Job', @@ -48,8 +51,12 @@ const jobTask = new sfn.Task(stack, 'Glue Job Task', { const startTask = new sfn.Pass(stack, 'Start Task'); const endTask = new sfn.Pass(stack, 'End Task'); -new sfn.StateMachine(stack, 'State Machine', { +const stateMachine = new sfn.StateMachine(stack, 'State Machine', { definition: sfn.Chain.start(startTask).next(jobTask).next(endTask) }); +new cdk.CfnOutput(stack, 'State Machine ARN Output', { + value: stateMachine.stateMachineArn +}); + app.synth(); diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-glue-script/job.py b/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-glue-script/job.py index 7df869a15e76c..054cefb5a8804 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-glue-script/job.py +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-glue-script/job.py @@ -1 +1,16 @@ +import sys + +from awsglue.context import GlueContext +from awsglue.job import Job +from awsglue.utils import getResolvedOptions +from pyspark.context import SparkContext + +spark_context = SparkContext() +glue_context = GlueContext(spark_context) +job = Job(glue_context) +args = getResolvedOptions(sys.argv, ["JOB_NAME"]) +job.init(args["JOB_NAME"], args) + print("Hello, World!") + +job.commit() From bf5f39a7e11db847d97549108548fdfe0ab4c2f6 Mon Sep 17 00:00:00 2001 From: BenChaimberg Date: Wed, 26 Feb 2020 23:55:57 +0000 Subject: [PATCH 08/12] update expected integration test stack (asset names) --- .../test/integ.glue-task.expected.json | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json index 9da6cd522fc3c..aa23dcb439c45 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json @@ -1,16 +1,16 @@ { "Parameters": { - "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3BucketB960DFE2": { + "AssetParametersd030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0dS3BucketB8F6851B": { "Type": "String", - "Description": "S3 bucket for asset \"634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54\"" + "Description": "S3 bucket for asset \"d030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0d\"" }, - "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3VersionKeyF9932D20": { + "AssetParametersd030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0dS3VersionKey7BCC06FC": { "Type": "String", - "Description": "S3 key for asset version \"634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54\"" + "Description": "S3 key for asset version \"d030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0d\"" }, - "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54ArtifactHash5EEBE9B2": { + "AssetParametersd030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0dArtifactHashEC764944": { "Type": "String", - "Description": "Artifact hash for asset \"634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54\"" + "Description": "Artifact hash for asset \"d030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0d\"" } }, "Resources": { @@ -68,7 +68,7 @@ }, ":s3:::", { - "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3BucketB960DFE2" + "Ref": "AssetParametersd030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0dS3BucketB8F6851B" } ] ] @@ -83,7 +83,7 @@ }, ":s3:::", { - "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3BucketB960DFE2" + "Ref": "AssetParametersd030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0dS3BucketB8F6851B" }, "/*" ] @@ -114,7 +114,7 @@ [ "s3://", { - "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3BucketB960DFE2" + "Ref": "AssetParametersd030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0dS3BucketB8F6851B" }, "/", { @@ -124,7 +124,7 @@ "Fn::Split": [ "||", { - "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3VersionKeyF9932D20" + "Ref": "AssetParametersd030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0dS3VersionKey7BCC06FC" } ] } @@ -137,7 +137,7 @@ "Fn::Split": [ "||", { - "Ref": "AssetParameters634604e16697683c518163141aefd2814b73b010e143b29dfbecc48dc98c3c54S3VersionKeyF9932D20" + "Ref": "AssetParametersd030bb7913ca422df69f29b2ea678ab4e5085bb3cbb17029e4b101d2dc4e3e0dS3VersionKey7BCC06FC" } ] } From 8933a676c05c38a5a4e95fc530698067a94bf160 Mon Sep 17 00:00:00 2001 From: BenChaimberg Date: Thu, 27 Feb 2020 19:10:33 +0000 Subject: [PATCH 09/12] add integ test verification comment about glue cold start --- .../aws-stepfunctions-tasks/test/integ.glue-task.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts index feff9ed994d0d..c72d937c64964 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts @@ -10,7 +10,11 @@ import * as tasks from '../lib'; * Stack verification steps: * * aws stepfunctions start-execution --state-machine-arn * * aws stepfunctions describe-execution --execution-arn - * * should eventually return status "SUCCEEDED" + * The "describe-execution" call should eventually return status "SUCCEEDED", but it may take up to + * 15 minutes to do so. This is because the cold start time for AWS Glue (as of 02/2020) is around + * 10-15 minutes and so even a job with a functional 20 second runtime (as this job is) can take + * that long to complete. There can also be a delay between the job finishing and the step functions + * task registering its complation but this delay is dominated by the above cold start time. */ const app = new cdk.App(); From 7db0ab038b150e4794d64c60b505628af1fb8512 Mon Sep 17 00:00:00 2001 From: Niranjan Jayakar Date: Fri, 28 Feb 2020 09:58:15 +0000 Subject: [PATCH 10/12] cleaned up the note around cold start --- .../aws-stepfunctions-tasks/test/integ.glue-task.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts index c72d937c64964..0c7573f5e9a26 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.ts @@ -10,11 +10,9 @@ import * as tasks from '../lib'; * Stack verification steps: * * aws stepfunctions start-execution --state-machine-arn * * aws stepfunctions describe-execution --execution-arn - * The "describe-execution" call should eventually return status "SUCCEEDED", but it may take up to - * 15 minutes to do so. This is because the cold start time for AWS Glue (as of 02/2020) is around - * 10-15 minutes and so even a job with a functional 20 second runtime (as this job is) can take - * that long to complete. There can also be a delay between the job finishing and the step functions - * task registering its complation but this delay is dominated by the above cold start time. + * The "describe-execution" call should eventually return status "SUCCEEDED". + * NOTE: It will take up to 15 minutes for the step function to completem due to the cold start time + * for AWS Glue, which as of 02/2020, is around 10-15 minutes. */ const app = new cdk.App(); From f7b2147d6a1af11e708e9db7a92f3d46aba385ab Mon Sep 17 00:00:00 2001 From: BenChaimberg Date: Fri, 28 Feb 2020 18:56:17 +0000 Subject: [PATCH 11/12] specify glue job ARN in state machine role permissions --- .../lib/run-glue-job-task.ts | 12 ++++++++--- .../test/integ.glue-task.expected.json | 21 ++++++++++++++++++- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts index 1751357fd970e..f5e403225f269 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts @@ -1,6 +1,6 @@ import * as iam from '@aws-cdk/aws-iam'; import * as sfn from '@aws-cdk/aws-stepfunctions'; -import { Duration } from '@aws-cdk/core'; +import { Duration, Stack } from '@aws-cdk/core'; import { getResourceArn } from './resource-arn-suffix'; /** @@ -80,12 +80,18 @@ export class RunGlueJobTask implements sfn.IStepFunctionsTask { } } - public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { + public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig { const notificationProperty = this.props.notifyDelayAfter ? { NotifyDelayAfter: this.props.notifyDelayAfter.toMinutes() } : null; return { resourceArn: getResourceArn("glue", "startJobRun", this.integrationPattern), policyStatements: [new iam.PolicyStatement({ - resources: ["*"], + resources: [ + Stack.of(task).formatArn({ + service: "glue", + resource: "job", + resourceName: this.glueJobName + }) + ], actions: [ "glue:StartJobRun", "glue:GetJobRun", diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json index aa23dcb439c45..8f9e9aba0102f 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.glue-task.expected.json @@ -198,7 +198,26 @@ "glue:BatchStopJobRun" ], "Effect": "Allow", - "Resource": "*" + "Resource": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":glue:", + { + "Ref": "AWS::Region" + }, + ":", + { + "Ref": "AWS::AccountId" + }, + ":job/My Glue Job" + ] + ] + } } ], "Version": "2012-10-17" From f81ee8de22ec140a59a1211e0fabed848786677d Mon Sep 17 00:00:00 2001 From: BenChaimberg Date: Fri, 28 Feb 2020 20:08:46 +0000 Subject: [PATCH 12/12] change state machine role permissions based on service integration pattern --- .../lib/run-glue-job-task.ts | 18 +++++--- .../test/run-glue-job-task.test.ts | 43 +++++++++++++++++++ 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts index f5e403225f269..725edfa615866 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-glue-job-task.ts @@ -82,6 +82,17 @@ export class RunGlueJobTask implements sfn.IStepFunctionsTask { public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig { const notificationProperty = this.props.notifyDelayAfter ? { NotifyDelayAfter: this.props.notifyDelayAfter.toMinutes() } : null; + let iamActions: string[] | undefined; + if (this.integrationPattern === sfn.ServiceIntegrationPattern.FIRE_AND_FORGET) { + iamActions = ["glue:StartJobRun"]; + } else if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) { + iamActions = [ + "glue:StartJobRun", + "glue:GetJobRun", + "glue:GetJobRuns", + "glue:BatchStopJobRun" + ]; + } return { resourceArn: getResourceArn("glue", "startJobRun", this.integrationPattern), policyStatements: [new iam.PolicyStatement({ @@ -92,12 +103,7 @@ export class RunGlueJobTask implements sfn.IStepFunctionsTask { resourceName: this.glueJobName }) ], - actions: [ - "glue:StartJobRun", - "glue:GetJobRun", - "glue:GetJobRuns", - "glue:BatchStopJobRun" - ], + actions: iamActions })], metricPrefixSingular: 'GlueJob', metricPrefixPlural: 'GlueJobs', diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts index 75d2e761b2a83..b702d8c3f3d1f 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-glue-job-task.test.ts @@ -87,6 +87,49 @@ test('Invoke glue job with full properties', () => { }); }); +test('permitted role actions limited to start job run if service integration pattern is FIRE_AND_FORGET', () => { + const task = new sfn.Task(stack, 'Task', { + task: new tasks.RunGlueJobTask(jobName, { + integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + }) + }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + expect(stack).toHaveResourceLike('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [{ + Action: "glue:StartJobRun" + }] + } + }); +}); + +test('permitted role actions include start, get, and stop job run if service integration pattern is SYNC', () => { + const task = new sfn.Task(stack, 'Task', { + task: new tasks.RunGlueJobTask(jobName, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + }) + }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + expect(stack).toHaveResourceLike('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [{ + Action: [ + "glue:StartJobRun", + "glue:GetJobRun", + "glue:GetJobRuns", + "glue:BatchStopJobRun" + ] + }] + } + }); +}); + test('Task throws if WAIT_FOR_TASK_TOKEN is supplied as service integration pattern', () => { expect(() => { new sfn.Task(stack, 'Task', {