Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions-tasks): task for starting a job run in AWS Glue #8143

Merged
merged 5 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -509,14 +509,13 @@ Step Functions supports [AWS Glue](https://docs.aws.amazon.com/step-functions/la
You can call the [`StartJobRun`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-runs.html#aws-glue-api-jobs-runs-StartJobRun) API from a `Task` state.

```ts
new sfn.Task(stack, 'Task', {
task: new tasks.RunGlueJobTask(jobName, {
arguments: {
key: 'value',
},
timeout: cdk.Duration.minutes(30),
notifyDelayAfter: cdk.Duration.minutes(5),
}),
new GlueStartJobRun(stack, 'Task', {
jobName: 'my-glue-job',
arguments: {
key: 'value',
},
timeout: cdk.Duration.minutes(30),
notifyDelayAfter: cdk.Duration.minutes(5),
});
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { getResourceArn } from '../resource-arn-suffix';

/**
* Properties for RunGlueJobTask
*
* @deprecated use `GlueStartJobRun`
*/
export interface RunGlueJobTaskProps {

Expand Down Expand Up @@ -63,6 +65,8 @@ export interface RunGlueJobTaskProps {
* https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-runs.html#aws-glue-api-jobs-runs-JobRun
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html
*
* @deprecated use `GlueStartJobRun`
*/
export class RunGlueJobTask implements sfn.IStepFunctionsTask {
private readonly integrationPattern: sfn.ServiceIntegrationPattern;
Expand Down
119 changes: 119 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/glue/start-job-run.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Construct, Duration, Stack } from '@aws-cdk/core';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for starting an AWS Glue job as a task
*/
export interface GlueStartJobRunProps extends sfn.TaskStateBaseProps {

/**
* Glue job name
*/
readonly glueJobName: string;

/**
* The job arguments specifically for this run.
*
* For this job run, they replace the default arguments set in the job
* definition itself.
*
* @default - Default arguments set in the job definition
*/
readonly arguments?: sfn.TaskInput;

/**
* The name of the SecurityConfiguration structure to be used with this job run.
*
* This must match the Glue API
* @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-regex-oneLine
*
* @default - Default configuration set in the job definition
*/
readonly securityConfiguration?: string;

/**
* After a job run starts, the number of minutes to wait before sending a job run delay notification.
*
* Must be at least 1 minute.
*
* @default - Default delay set in the job definition
*/
readonly notifyDelayAfter?: Duration;
}

/**
* Starts an AWS Glue job in a Task state
*
* OUTPUT: the output of this task is a JobRun structure, for details consult
* https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-runs.html#aws-glue-api-jobs-runs-JobRun
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html
*/
export class GlueStartJobRun extends sfn.TaskStateBase {
private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.RUN_JOB,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: Construct, id: string, private readonly props: GlueStartJobRunProps) {
super(scope, id, props);
this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;

validatePatternSupported(this.integrationPattern, GlueStartJobRun.SUPPORTED_INTEGRATION_PATTERNS);

this.taskPolicies = this.getPolicies();

this.taskMetrics = {
metricPrefixSingular: 'GlueJob',
metricPrefixPlural: 'GlueJobs',
metricDimensions: { GlueJobName: this.props.glueJobName },
};
}

protected renderTask(): any {
const notificationProperty = this.props.notifyDelayAfter ? { NotifyDelayAfter: this.props.notifyDelayAfter.toMinutes() } : null;
return {
Resource: integrationResourceArn('glue', 'startJobRun', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
JobName: this.props.glueJobName,
Arguments: this.props.arguments?.value,
Timeout: this.props.timeout?.toMinutes(),
SecurityConfiguration: this.props.securityConfiguration,
NotificationProperty: notificationProperty,
}),
TimeoutSeconds: undefined,
};
}

private getPolicies(): iam.PolicyStatement[] {
let iamActions: string[] | undefined;
if (this.integrationPattern === sfn.IntegrationPattern.REQUEST_RESPONSE) {
iamActions = ['glue:StartJobRun'];
} else if (this.integrationPattern === sfn.IntegrationPattern.RUN_JOB) {
iamActions = [
'glue:StartJobRun',
'glue:GetJobRun',
'glue:GetJobRuns',
'glue:BatchStopJobRun',
];
}

return [new iam.PolicyStatement({
resources: [
Stack.of(this).formatArn({
service: 'glue',
resource: 'job',
resourceName: this.props.glueJobName,
}),
],
actions: iamActions,
})];
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ export * from './emr/emr-cancel-step';
export * from './emr/emr-modify-instance-fleet-by-name';
export * from './emr/emr-modify-instance-group-by-name';
export * from './glue/run-glue-job-task';
export * from './glue/start-job-run';
export * from './batch/run-batch-job';
export * from './dynamodb/call-dynamodb';
Loading