Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions-tasks): implement emr service integrations #5372

Merged
merged 64 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
6f8c1c5
added emr-create-cluster task
Dec 10, 2019
ba262db
convert clusterConfigurations to TaskInput
Dec 10, 2019
e94953b
remove console.log debugging
Dec 10, 2019
b869656
emr-set-cluster-termination-protection unit tests
Dec 10, 2019
ca0b2b8
Merge remote-tracking branch 'upstream/master'
Dec 10, 2019
22f66ba
emr -> elasticmapreduce in task ARNs
Dec 10, 2019
9fd0b42
reset noImplicitAny
Dec 10, 2019
c595aeb
added emr-terminate-cluster task
Dec 10, 2019
fe1dfe4
add emr-add-step task
Dec 10, 2019
228e0b7
add emr-cancel-step task
Dec 10, 2019
08fe474
add emr-modify-instance-fleet-by-name task
Dec 10, 2019
13c6d16
add emr-modify-instance-group-by-name task
Dec 11, 2019
9dc9404
standardize on single quotes
Dec 11, 2019
3517784
default emr tasks to SYNC instead of FIRE_AND_FORGET
Dec 11, 2019
6a8c01b
Merge remote-tracking branch 'upstream/master' into emr-tasks
Dec 11, 2019
9fb3397
fix emr task tests with specific service integrations
Dec 11, 2019
8361541
add @experimental to interfaces and classes
Dec 23, 2019
d9fa287
add REGION and ACCOUNT_ID to generated policies
Dec 23, 2019
1b2ce19
replace TaskInput.fromDataAt() with Data.stringAt()
Dec 23, 2019
56f8ed8
remove IDE generated trailing whitespace
Dec 23, 2019
402bb59
explicitly define cluster config parameters
Dec 24, 2019
4b9f867
updated props-default-doc exclusions
Dec 30, 2019
6ae496f
fully parameterized emr-add-step
Dec 30, 2019
fd04f13
additional emr-add-step test
Dec 30, 2019
71cf074
merge master
Dec 30, 2019
0f56de6
undo changes to .gitignore
Dec 30, 2019
879b459
undo changes to .gitignore
Dec 30, 2019
d5fb0fe
Merge branch 'master' into explicit-create-cluster-params
Dec 30, 2019
9030a0c
Merge remote-tracking branch 'upstream/master' into emr-stepfunctions…
Dec 30, 2019
763bc9e
Merge branch 'master' into explicit-create-cluster-params
Dec 30, 2019
2d2c18f
resolve A style import is forbidden errors
Dec 30, 2019
6272991
resolve A style import is forbidden errors
Dec 30, 2019
2584334
resolve A 'require()' style import is forbidden errors
Dec 30, 2019
00c0a3a
resolve A 'require()' style import is forbidden errors
Dec 30, 2019
6e7290a
Merge branch 'master' into explicit-create-cluster-params
Dec 30, 2019
b8a9347
fix CfnTag import dropped during merge
Dec 30, 2019
e0f1f28
explicitly set default VisibleToAllUsers
Dec 30, 2019
49146bc
optional cluster roles, create if not provided
Jan 21, 2020
4a0ffbf
Merge remote-tracking branch 'upstream/master' into optional-create-c…
Jan 21, 2020
2f95eba
Role tests, fix autoScalingRole binding
Jan 22, 2020
197aaca
Merge remote-tracking branch 'upstream/master' into optional-create-c…
Jan 22, 2020
802c8c7
drop extraneous console.log
Jan 22, 2020
fc1921a
added Application, BootstrapAction, Configuration, KerberosAttributes…
Jan 22, 2020
6f87428
added Instances and InstanceFleet
Jan 22, 2020
6c8ee2c
Merge remote-tracking branch 'upstream/master' into full-depth-create…
Jan 22, 2020
c0a5009
EbsBlockDeviceType enum
Jan 23, 2020
1a5e915
added InstanceFleet test
Jan 23, 2020
0274ed8
implement Instances.InstanceGroups
Jan 23, 2020
4299776
Merge remote-tracking branch 'upstream/master' into full-depth-create…
Jan 23, 2020
3a423b1
fix instanceFleeType -> instanceFleetType
Jan 23, 2020
58290a8
fix instanceFleeType -> instanceFleetType
Jan 24, 2020
f08d982
nullable enum handling
Jan 26, 2020
f4463ae
fix: don't create AutoScaling role if InstanceFleets are used
Jan 27, 2020
fb0fb90
create instance profile for instance role
Jan 27, 2020
5500dbf
fix: rolename -> roleArn
Jan 27, 2020
2bfc810
fix: replace generic TaskInput with specific named parameters
Jan 27, 2020
1e47c29
require both target capacities
Jan 27, 2020
e11d647
replace generic TaskInput with specific named parameters
Jan 27, 2020
c45c09f
replace TaskInput.fromDataAt with Data.numberAt
Jan 27, 2020
fa63ce4
Merge remote-tracking branch 'upstream/master'
Jan 27, 2020
cf015f7
use cdk.Duration for timeouts and periods
Feb 9, 2020
0f227f4
fixed linter documentation errors
Feb 9, 2020
2caae05
Merge remote-tracking branch 'upstream/master' into emr-stepfunctions…
Feb 9, 2020
53818ba
Merge branch 'master' into emr-stepfunctions-tasks
mergify[bot] Feb 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/emr-add-step.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Aws, Stack } from '@aws-cdk/core';
import { getResourceArn } from './resource-arn-suffix';

/**
* The action to take when the cluster step fails.
* @see https://docs.aws.amazon.com/emr/latest/APIReference/API_StepConfig.html
*
* This enum exposes TERMINATE_CLUSTER, CANCEL_AND_WAIT, and CONTINUE; each member's value is the
* identically named string that is placed in the task parameters.
*
* NOTE(review): an earlier revision of this comment also listed TERMINATE_JOB_FLOW, which has no
* member here - confirm that the omission is intentional.
*
* EmrAddStep falls back to CONTINUE when no action is supplied.
*
* @default CONTINUE
*
* @experimental
*/
export enum ActionOnFailure {
/**
* Terminate the Cluster on Step Failure
*/
TERMINATE_CLUSTER = 'TERMINATE_CLUSTER',

/**
* Cancel Step execution and enter WAITING state
*/
CANCEL_AND_WAIT = 'CANCEL_AND_WAIT',

/**
* Continue to the next Step
*/
CONTINUE = 'CONTINUE'
}

/**
* Properties for EmrAddStep
*
* Identifies the target cluster and describes the Hadoop JAR step that the task submits.
*
* @experimental
*/
export interface EmrAddStepProps {
/**
* The ClusterId to add the Step to.
*
* The value can also be read from the state input with a JSON path, for example through
* sfn.Data.stringAt() with the path $.ClusterId.
*/
readonly clusterId: string;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chamcca, one more question about using this.

I will probably want to get clusterId out of the create cluster state output and pass it here. Normally this would be done with passing parameters using paths but how could I do that here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use either the sfn.Data.stringAt() or sfn.TaskInput.fromDataAt().value methods. These both take a JSON path parameter.

take a look at the /test/emr-add-step.test.ts there is an example there that gets the clusterId from "$.ClusterId"

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!


/**
* The name of the Step
*
* @see https://docs.aws.amazon.com/emr/latest/APIReference/API_StepConfig.html
*/
readonly name: string;

/**
* The action to take when the cluster step fails.
*
* @see https://docs.aws.amazon.com/emr/latest/APIReference/API_StepConfig.html
*
* @default CONTINUE
*/
readonly actionOnFailure?: ActionOnFailure;

/**
* A path to a JAR file run during the step.
*
* @see https://docs.aws.amazon.com/emr/latest/APIReference/API_HadoopJarStepConfig.html
*/
readonly jar: string;

/**
* The name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file.
*
* @see https://docs.aws.amazon.com/emr/latest/APIReference/API_HadoopJarStepConfig.html
*
* @default No mainClass
*/
readonly mainClass?: string;

/**
* A list of command line arguments passed to the JAR file's main function when executed.
*
* @see https://docs.aws.amazon.com/emr/latest/APIReference/API_HadoopJarStepConfig.html
*
* @default No args
*/
readonly args?: string[];

/**
* A list of Java properties that are set when the step runs. You can use these properties to pass key value pairs to your main function.
*
* Each entry of this map is emitted as one Key/Value pair in the task parameters.
*
* @see https://docs.aws.amazon.com/emr/latest/APIReference/API_HadoopJarStepConfig.html
*
* @default No properties
*/
readonly properties?: { [key: string]: string };

/**
* The service integration pattern indicates different ways to call AddStep.
*
* The valid value is either FIRE_AND_FORGET or SYNC; the EmrAddStep constructor throws for any other pattern.
*
* @default SYNC
*/
readonly integrationPattern?: sfn.ServiceIntegrationPattern;
}

/**
 * A Step Functions Task to add a Step to an EMR Cluster
 *
 * The StepConfiguration is defined as Parameters in the state machine definition.
 *
 * OUTPUT: the StepId
 *
 * @experimental
 */
export class EmrAddStep implements sfn.IStepFunctionsTask {

  private readonly actionOnFailure: ActionOnFailure;
  private readonly integrationPattern: sfn.ServiceIntegrationPattern;

  /**
   * @param props the cluster to target and the step to submit
   * @throws Error when the requested integration pattern is neither FIRE_AND_FORGET nor SYNC
   */
  constructor(private readonly props: EmrAddStepProps) {
    this.actionOnFailure = props.actionOnFailure || ActionOnFailure.CONTINUE;
    this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.SYNC;

    const supportedPatterns = [
      sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
      sfn.ServiceIntegrationPattern.SYNC
    ];

    if (!supportedPatterns.includes(this.integrationPattern)) {
      throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call AddStep.`);
    }
  }

  /**
   * Produces the task configuration: the addStep resource ARN for the chosen integration
   * pattern, the IAM statements the state machine role needs, and the AddStep parameters.
   *
   * @param _task the Task this integration is bound to; used to locate the enclosing Stack
   * @returns the configuration consumed by the Step Functions Task
   */
  public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
    return {
      resourceArn: getResourceArn('elasticmapreduce', 'addStep', this.integrationPattern),
      policyStatements: this.createPolicyStatements(_task),
      parameters: {
        ClusterId: this.props.clusterId,
        Step: {
          Name: this.props.name,
          ActionOnFailure: this.actionOnFailure.valueOf(),
          HadoopJarStep: {
            Jar: this.props.jar,
            MainClass: this.props.mainClass,
            Args: this.props.args,
            // The properties map is flattened into a list of { Key, Value } objects;
            // an absent map stays undefined so that no Properties entry is supplied.
            Properties: (this.props.properties === undefined) ?
              undefined :
              Object.entries(this.props.properties).map(
                kv => ({
                  Key: kv[0],
                  Value: kv[1]
                })
              )
          }
        }
      }
    };
  }

  /**
   * This generates the PolicyStatements required by the Task to call AddStep.
   *
   * @param task the Task being bound; its Stack is used to format the events rule ARN
   * @returns the EMR statement, plus the CloudWatch Events statement when the pattern is SYNC
   */
  private createPolicyStatements(task: sfn.Task): iam.PolicyStatement[] {
    const stack = Stack.of(task);

    const policyStatements = [
      new iam.PolicyStatement({
        actions: [
          'elasticmapreduce:AddJobFlowSteps',
          'elasticmapreduce:DescribeStep',
          'elasticmapreduce:CancelSteps'
        ],
        // Use the partition pseudo parameter rather than a literal "aws" so the ARN is
        // also valid outside the standard partition (e.g. aws-cn, aws-us-gov).
        resources: [`arn:${Aws.PARTITION}:elasticmapreduce:${Aws.REGION}:${Aws.ACCOUNT_ID}:cluster/*`]
      })
    ];

    // Only the SYNC pattern gets the additional statement on the managed events rule.
    if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) {
      policyStatements.push(new iam.PolicyStatement({
        actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
        resources: [stack.formatArn({
          service: 'events',
          resource: 'rule',
          resourceName: 'StepFunctionsGetEventForEMRAddJobFlowStepsRule'
        })]
      }));
    }

    return policyStatements;
  }
}
48 changes: 48 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/emr-cancel-step.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Aws } from '@aws-cdk/core';
import { getResourceArn } from './resource-arn-suffix';

/**
* Properties for EmrCancelStep
*
* Both values are passed unchanged as the ClusterId and StepId task parameters.
*
* @experimental
*/
export interface EmrCancelStepProps {
/**
* The ClusterId of the cluster on which the Step is to be cancelled.
*/
readonly clusterId: string;

/**
* The StepId to cancel.
*/
readonly stepId: string;
}

/**
 * A Step Functions Task to cancel a Step on an EMR Cluster.
 *
 * The task always uses the FIRE_AND_FORGET service integration pattern.
 *
 * @experimental
 */
export class EmrCancelStep implements sfn.IStepFunctionsTask {

  /**
   * @param props the cluster and step identifiers forwarded to the cancelStep call
   */
  constructor(private readonly props: EmrCancelStepProps) {}

  /**
   * Produces the task configuration: the cancelStep resource ARN, the IAM statement
   * allowing elasticmapreduce:CancelSteps, and the ClusterId/StepId parameters.
   *
   * @param _task the Task this integration is bound to (not used)
   * @returns the configuration consumed by the Step Functions Task
   */
  public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
    return {
      resourceArn: getResourceArn('elasticmapreduce', 'cancelStep',
        sfn.ServiceIntegrationPattern.FIRE_AND_FORGET),
      policyStatements: [
        new iam.PolicyStatement({
          actions: ['elasticmapreduce:CancelSteps'],
          // Use the partition pseudo parameter rather than a literal "aws" so the ARN is
          // also valid outside the standard partition (e.g. aws-cn, aws-us-gov).
          resources: [`arn:${Aws.PARTITION}:elasticmapreduce:${Aws.REGION}:${Aws.ACCOUNT_ID}:cluster/*`]
        })
      ],
      parameters: {
        ClusterId: this.props.clusterId,
        StepId: this.props.stepId
      }
    };
  }
}
Loading