feat(stepfunctions-tasks): implement emr service integrations #5372
Thanks so much for taking the time to contribute to the AWS CDK ❤️ We will shortly assign someone to review this pull request and help get it
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository
A note on design decisions. A simple State Machine using these Step Functions Tasks is demonstrated below as a reference for the following explanations.

L2s and Interfaces: These Step Functions Tasks do not use L2 constructs for their inputs because in the majority of use cases the constructs wouldn't exist or be created at CDK deployment time. Most notable is the

One of the primary uses of the EMR/Step Functions integrations these new tasks implement is to support the management of transient EMR clusters, where the cluster is created by the Step Function at run time and exists only for the lifetime of that execution. This is demonstrated in the example below. We just published an example of this workflow on our AWS Big Data Blog here: https://aws.amazon.com/blogs/aws/new-using-step-functions-to-orchestrate-amazon-emr-workloads/

In the EmrAddStep example below, the clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value, is automatically populated with the

TaskInputs: 'Name': sfn.TaskInput.fromDataAt('$.ClusterName').value, within the 'Name.$': '$.ClusterName', in the ASL generated during deployment. This frees the user from having to do this manually. The same is true for the

Example State Machine

import iam = require('@aws-cdk/aws-iam');
import sfn = require('@aws-cdk/aws-stepfunctions');
import sfn_tasks = require('@aws-cdk/aws-stepfunctions-tasks');
import sqs = require('@aws-cdk/aws-sqs');
import cdk = require('@aws-cdk/core');

export class AwsCdkEmrTasksStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Reference the existing default EMR service and instance roles by ARN.
    const roles = [
      iam.Role.fromRoleArn(this, 'ServiceRole', this.formatArn({
        region: '',
        service: 'iam',
        resource: 'role',
        resourceName: 'EMR_DefaultRole'
      })),
      iam.Role.fromRoleArn(this, 'InstanceRole', this.formatArn({
        region: '',
        service: 'iam',
        resource: 'role',
        resourceName: 'EMR_EC2_DefaultRole'
      }))
    ];

    // Create the cluster and wait for it to be ready (SYNC).
    // The cluster name is read from the state input at run time.
    const createCluster = new sfn.Task(this, 'CreateCluster', {
      task: new sfn_tasks.EmrCreateCluster({
        clusterRoles: roles,
        integrationPattern: sfn.ServiceIntegrationPattern.SYNC,
        clusterConfiguration: sfn.TaskInput.fromObject({
          'Name': sfn.TaskInput.fromDataAt('$.ClusterName').value,
          'VisibleToAllUsers': true,
          'ReleaseLabel': 'emr-5.28.0',
          'Applications': [{ 'Name': 'Hive' }],
          'ServiceRole': 'EMR_DefaultRole',
          'JobFlowRole': 'EMR_EC2_DefaultRole',
          'LogUri': 's3://chamcca-emr-launch-logs-uw2/elasticmapreduce/',
          'Instances': {
            'KeepJobFlowAliveWhenNoSteps': true,
            'InstanceFleets': [
              {
                'Name': 'MasterFleet',
                'InstanceFleetType': 'MASTER',
                'TargetOnDemandCapacity': 1,
                'InstanceTypeConfigs': [
                  {
                    'InstanceType': 'm4.xlarge'
                  }
                ]
              },
              {
                'Name': 'CoreFleet',
                'InstanceFleetType': 'CORE',
                'TargetOnDemandCapacity': 1,
                'InstanceTypeConfigs': [
                  {
                    'InstanceType': 'm4.xlarge'
                  }
                ]
              }
            ]
          }
        })
      }),
      resultPath: '$.CreateClusterResult'
    });

    // Submit a Hive step to the cluster created above without waiting for it.
    const step = new sfn.Task(this, 'Step', {
      task: new sfn_tasks.EmrAddStep({
        clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value,
        integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
        stepConfiguration: sfn.TaskInput.fromObject({
          "Name": "The first step",
          "ActionOnFailure": "CONTINUE",
          "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
              "hive-script",
              "--run-hive-script",
              "--args",
              "-f",
              "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
              "-d",
              "INPUT=s3://eu-west-1.elasticmapreduce.samples",
              "-d",
              "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"
            ]
          }
        })
      }),
      resultPath: '$.StepResult'
    });

    // Cancel the step, using the StepId returned by the previous task.
    const cancelStep = new sfn.Task(this, 'CancelStep', {
      task: new sfn_tasks.EmrCancelStep({
        clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value,
        stepId: sfn.TaskInput.fromDataAt('$.StepResult.StepId').value
      }),
      resultPath: '$.CancelStepResult'
    });

    // Terminate the cluster and wait for termination to complete (SYNC).
    const terminateCluster = new sfn.Task(this, 'TerminateCluster', {
      task: new sfn_tasks.EmrTerminateCluster({
        clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value,
        integrationPattern: sfn.ServiceIntegrationPattern.SYNC
      }),
      resultPath: '$.TerminateClusterResult'
    });

    // Note: setTerminationProtected, modifyInstanceFleet and
    // setTerminationUnprotected are not defined in this excerpt.
    const chain = sfn.Chain
      .start(createCluster)
      .next(setTerminationProtected)
      .next(modifyInstanceFleet)
      .next(step)
      .next(cancelStep)
      .next(setTerminationUnprotected)
      .next(terminateCluster);

    new sfn.StateMachine(this, 'StateMachine', {
      definition: chain
    });
  }
}
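The 'Name.$': '$.ClusterName' rewrite mentioned in the design note can be illustrated with a toy renderer. This is only a sketch under assumptions, not the CDK's actual implementation: `JsonPathRef`, `renderValue` and `renderParameters` are hypothetical names introduced here, and the real library tracks run-time values through its own token mechanism.

```typescript
// Hypothetical marker for a value that is read from the state input at run time.
class JsonPathRef {
  constructor(public readonly path: string) {}
}

type ParamValue =
  | string
  | number
  | boolean
  | JsonPathRef
  | ParamValue[]
  | { [key: string]: ParamValue };

// Render a nested value; references are only legal as the value of a named field.
function renderValue(value: ParamValue): unknown {
  if (value instanceof JsonPathRef) {
    throw new Error('A JSONPath reference must be the value of a named field');
  }
  if (Array.isArray(value)) {
    return value.map((item) => renderValue(item));
  }
  if (typeof value === 'object') {
    return renderParameters(value);
  }
  return value;
}

// Render a parameters object in ASL style: a field holding a JsonPathRef
// is emitted under "<key>.$" with the raw path as its value.
function renderParameters(params: { [key: string]: ParamValue }): { [key: string]: unknown } {
  const out: { [key: string]: unknown } = {};
  for (const key of Object.keys(params)) {
    const value = params[key];
    if (value instanceof JsonPathRef) {
      out[key + '.$'] = value.path;
    } else {
      out[key] = renderValue(value);
    }
  }
  return out;
}

const rendered = renderParameters({
  Name: new JsonPathRef('$.ClusterName'),
  VisibleToAllUsers: true,
  Applications: [{ Name: 'Hive' }],
});
console.log(JSON.stringify(rendered, null, 2));
```

The point of the sketch is that the user writes an ordinary `Name` field and the suffixing happens at render time, which is the convenience the design note describes.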
Thanks for the thorough work!
To a first approximation, I like it, but I have two concerns:
- It seems very targeted at ephemeral cluster workloads, which is fine (I wouldn't have thought of that myself), but I feel the constructs will probably need to change in the future to accommodate persistent clusters as well.
- I would like to see stronger typing for all the step parameters (just like is being done for the ECS integrations). I consider it one of the big value adds of the CDK that you don't have to know which field goes exactly where; you just specify what you want done and the CDK takes care of all the dirty little machinery. Let me know if that's unfounded though, or if there are strong practical objections to it.
   *
   * @see https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html
   */
  readonly clusterConfiguration: sfn.TaskInput;
Personally, I think I would prefer all the fields of clusterConfiguration to be inlined here. I know this is a lot more work for you as a construct writer, but it adds the autocomplete and type safety that is one of the big benefits the CDK provides.
I updated the emr-add-step task with strongly typed input parameters rather than the TaskInput. I have a first pass at doing the same for the emr-create-cluster in a branch of my fork linked below.
In that branch, I only added strong typing for the top-level cluster configuration parameters, taking inputs and requirements from the RunJobFlow API. I used L2 Constructs for the input Roles, and renamed the JobFlowRole to ClusterRole to coincide with similar renames in the EMR/Step Functions Integration.
I didn't implement all levels of the cluster configuration parameters, as this would be duplicating the generated aws-emr.CfnClusterProps and associated nested interfaces. The requirements of those nested parameters are dependent on the EMR Release Label requested at launch and are validated by the API. Attempting to implement that fully on the Step Functions Task seems like the wrong place...
Link to the branch with my first pass:
https://github.com/chamcca/aws-cdk/blob/explicit-create-cluster-params/packages/%40aws-cdk/aws-stepfunctions-tasks/lib/emr-create-cluster.ts
I'm not sure about the duplication. Can you point me to specific instances of duplication that you're unsure about?
As another note, in the CDK we tend to generate IAM roles for you instead of requiring you to do this yourself. Any chance that is a pattern you can take on as well?
At first glance, what you did for AddStep is exactly what I had in mind. CDK wants to be a complete abstraction for AWS: we don't stop at saying "and this is where you put in your API parameters, go figure out what those are"; we help you all the way, removing the need to cross-ref other docs as much as possible.
> The requirements of those nested parameters are dependent on the EMR Release Label requested at launch and are validated by the API
Unfortunately I don't know what that means.
A deep implementation of the cluster configuration parameters becomes a recreation of the CfnClusterProps and other nested interfaces that are generated in https://github.com/aws/aws-cdk/tree/master/packages/%40aws-cdk/aws-emr/lib in emr.generated.ts.
I'm looking into whether it's possible to use some of the generated interfaces for the nested parameters. For example, using the CfnCluster.JobFlowInstancesConfigProperty and related nested interfaces from emr.generated.ts as the instances parameter on the EmrCreateCluster task. If this works, is use of these Cfn escape hatches acceptable? Or is a deep recreation of those interfaces required? There are a lot of nested interfaces.
The validation of some of the nested parameters is dependent on the value of the top-level emrReleaseLabel version (e.g. 'emr-5.20.0' vs 'emr-5.28.0'). The EMR API performs the validation checks. I don't think it's feasible or wise to try to perform those validations in the EmrCreateCluster task.
Also, what do you think of making the Roles optional and exposing them as properties of the EmrCreateCluster task? If not provided to the constructor, the task will generate them. We frequently see customers creating these Roles, setting up access policies, then reusing them across fleets of transient and/or persistent clusters.
> For example, using the CfnCluster.JobFlowInstancesConfigProperty
No, and I'll tell you why not: we want to keep the L1 and L2 layers separate, so that we can provide interface stability on the L2 even if the L1 changes.
The ideal solution would have been to make an L2 mirror of CfnCluster.JobFlowInstancesConfigProperty (let's say it's called JobFlowInstances), and you would re-use or extend that here.
That is still a deep mirror of all the properties, but at least it's not 2 deep mirrors.
> making the Roles optional and exposing them as properties
Yes, that seems like a good idea.
It turns out that using the CfnCluster.JobFlowInstancesConfigProperty wouldn't work anyway. The StepFunctions CreateCluster integration expects the same JSON input as the EMR RunJobFlow API. This JSON varies from the CloudFormation definition.
For example, to define InstanceGroups with the RunJobFlow API the JSON would contain an array of InstanceGroup objects, one for the Master group and another for the Core group. In CloudFormation this is defined with specific MasterInstanceGroup and CoreInstanceGroup attributes instead of an array.
I'm working on @experimental interfaces for the aws-stepfunctions-tasks.EmrCreateCluster that mirror the JSON expected by the StepFunctions and RunJobFlow APIs. I considered creating these interfaces in the aws-emr package, but since there are no L2s yet defined, I'm unsure if the L2 definition should mirror the API or CloudFormation...
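To make the shape difference concrete, here is a small sketch that maps a CloudFormation-style instances object onto the array form the RunJobFlow API expects. The interface and function names (`CfnStyleInstances`, `ApiStyleInstances`, `toRunJobFlowInstances`) are hypothetical and only a few fields are modelled; the field names follow the attributes named in the comment above plus `InstanceRole`, `InstanceType` and `InstanceCount`, which are assumed here from the public API shape.

```typescript
// CloudFormation-style shape: one named attribute per group (subset of fields).
interface CfnStyleGroup {
  Name?: string;
  InstanceType: string;
  InstanceCount: number;
}

interface CfnStyleInstances {
  MasterInstanceGroup: CfnStyleGroup;
  CoreInstanceGroup?: CfnStyleGroup;
}

// RunJobFlow-style shape: a single array, each entry tagged with its role.
interface ApiStyleGroup {
  Name?: string;
  InstanceRole: 'MASTER' | 'CORE' | 'TASK';
  InstanceType: string;
  InstanceCount: number;
}

interface ApiStyleInstances {
  InstanceGroups: ApiStyleGroup[];
}

// Convert the named-attribute form into the array form.
function toRunJobFlowInstances(cfn: CfnStyleInstances): ApiStyleInstances {
  const master: ApiStyleGroup = { ...cfn.MasterInstanceGroup, InstanceRole: 'MASTER' };
  const groups: ApiStyleGroup[] = [master];
  if (cfn.CoreInstanceGroup) {
    const core: ApiStyleGroup = { ...cfn.CoreInstanceGroup, InstanceRole: 'CORE' };
    groups.push(core);
  }
  return { InstanceGroups: groups };
}

const apiInstances = toRunJobFlowInstances({
  MasterInstanceGroup: { Name: 'Master', InstanceType: 'm4.xlarge', InstanceCount: 1 },
  CoreInstanceGroup: { Name: 'Core', InstanceType: 'm4.xlarge', InstanceCount: 2 },
});
console.log(JSON.stringify(apiInstances, null, 2));
```

Because the two shapes are not interchangeable, an interface generated from the CloudFormation definition cannot be passed straight through to the Step Functions integration.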
@rix0rrr I've pushed a large update to this PR that implements most of the requested changes to EmrCreateCluster. It implements Instances.InstanceFleets but not the Instances.InstanceGroups; I'm still working on those and running through deployment tests for both Fleets and Groups. Can you look over what I've pushed so far? I'll make any necessary changes while continuing to work on the InstanceGroups.
Also, Roles are now optional and created if not provided.
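The "optional, created if not provided, and exposed as properties" pattern can be sketched without any CDK dependency. Everything below is hypothetical: `RoleLike` stands in for an IAM role construct, and `EmrCreateClusterSketch` is not the task added by this PR; the generated role names are placeholders.

```typescript
// Hypothetical stand-in for an IAM role.
interface RoleLike {
  readonly roleName: string;
}

interface CreateClusterRolesSketch {
  // Both roles are optional; missing ones are generated by the task.
  readonly serviceRole?: RoleLike;
  readonly clusterRole?: RoleLike;
}

class EmrCreateClusterSketch {
  // Exposed so callers can attach policies or reuse the roles elsewhere.
  public readonly serviceRole: RoleLike;
  public readonly clusterRole: RoleLike;

  constructor(props: CreateClusterRolesSketch = {}) {
    this.serviceRole = props.serviceRole ?? { roleName: 'GeneratedServiceRole' };
    this.clusterRole = props.clusterRole ?? { roleName: 'GeneratedClusterRole' };
  }
}

// A customer-managed role is passed in; the cluster role is generated.
const sharedServiceRole: RoleLike = { roleName: 'SharedEmrServiceRole' };
const firstTask = new EmrCreateClusterSketch({ serviceRole: sharedServiceRole });

// A second task reuses both roles of the first one.
const secondTask = new EmrCreateClusterSketch({
  serviceRole: firstTask.serviceRole,
  clusterRole: firstTask.clusterRole,
});
console.log(secondTask.clusterRole.roleName);
```

Exposing the roles as properties is what allows the reuse across several clusters that the earlier comment describes.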
Full implementation of the Instances.InstanceFleets and Instances.InstanceGroups has been pushed. I am running additional deployment tests, but these take some time. So far everything looks good.
packages/@aws-cdk/aws-stepfunctions-tasks/test/emr-add-step.test.ts
   *
   * @see https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html
   */
  readonly stepConfiguration: sfn.TaskInput;
Same comment as for clusterConfiguration.
    'elasticmapreduce:DescribeStep',
    'elasticmapreduce:CancelSteps'
  ],
  resources: ['arn:aws:elasticmapreduce:*:*:cluster/*']
Can we at least scope this to the right account and region?
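As an illustration of the requested scoping, the sketch below builds the cluster ARN with the region and account filled in instead of wildcards. `emrClusterArn` is a hypothetical helper written for this example; inside a CDK stack, the `formatArn` helper used in the example state machine above would be the more natural way to produce the same shape.

```typescript
// Build an EMR cluster ARN scoped to one partition, region and account.
// The cluster ID defaults to a wildcard because it is not known until run time.
function emrClusterArn(
  partition: string,
  region: string,
  account: string,
  clusterId: string = '*',
): string {
  return `arn:${partition}:elasticmapreduce:${region}:${account}:cluster/${clusterId}`;
}

// Placeholder region and account values for illustration only.
const scopedResource = emrClusterArn('aws', 'us-west-2', '123456789012');
console.log(scopedResource);
```

This keeps the policy limited to clusters in the stack's own account and region while still allowing any cluster ID.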
@rix0rrr I've completed my deployment testing with various cluster configurations and have pushed all fixes. I've also replaced generic These last updates should address the final outstanding change requests.
Okay, thanks for doing all of this work. I completely lack the domain knowledge to accurately judge it, but since we've marked everything
@@ -179,7 +179,31 @@
 "props-default-doc:@aws-cdk/aws-stepfunctions-tasks.TransformOutput.encryptionKey",
 "props-default-doc:@aws-cdk/aws-stepfunctions-tasks.TransformResources.volumeKmsKeyId",
 "docs-public-apis:@aws-cdk/aws-stepfunctions-tasks.VpcConfig",
-"props-default-doc:@aws-cdk/aws-stepfunctions-tasks.VpcConfig.subnets"
+"props-default-doc:@aws-cdk/aws-stepfunctions-tasks.VpcConfig.subnets",
Oh one more thing, sorry: you shouldn't really be ignoring all these warnings, you should be fixing them instead.
Why not add documentation on default behavior to all props?
I added documentation on the default values where the EMR RunJobFlow API documentation gave information on the behavior of the defaults, or where I made a sane choice on behalf of the user. But the docs don't provide info on the default behavior of everything. SubnetId and SubnetIds are good examples. Neither is required and the docs don't mention what happens when neither is provided, but a Subnet is somehow picked.
If you prefer, I can fix the errors by indicating that None is the default for the interface, but I won't be able to document the underlying behavior.
We can probably guess at the behavior, but if you don't want to do that, let's make our lack of knowledge explicit using something like:
@default - EMR will automatically select a subnet (details are service-specific)
Maybe you can think of a better phrasing.
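A sketch of how such a doc comment could look on an optional prop follows. The interface name `InstancesSketchProps`, the property name `ec2SubnetId` and the helper `describeSubnetChoice` are assumptions made for this example and are not taken from the PR.

```typescript
// Hypothetical props interface showing an explicit "we don't know the details" default.
interface InstancesSketchProps {
  /**
   * The subnet to launch the cluster into.
   *
   * @default - EMR will automatically select a subnet (details are service-specific)
   */
  readonly ec2SubnetId?: string;
}

// Small helper that mirrors the documented default in plain words.
function describeSubnetChoice(props: InstancesSketchProps): string {
  return props.ec2SubnetId === undefined
    ? 'subnet chosen by EMR'
    : `subnet ${props.ec2SubnetId}`;
}

console.log(describeSubnetChoice({}));
console.log(describeSubnetChoice({ ec2SubnetId: 'subnet-0example' }));
```

Stating the unknown explicitly satisfies the linter rule without claiming knowledge of the service's selection logic.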
linter errors fixed
"props-default-doc:@aws-cdk/aws-stepfunctions-tasks.EmrCreateClusterProps.visibleToAllUsers",
"props-default-doc:@aws-cdk/aws-stepfunctions-tasks.EmrCreateCluster.*",
"docs-public-apis:@aws-cdk/aws-stepfunctions-tasks.EmrCreateCluster.*",
"duration-prop-type:@aws-cdk/aws-stepfunctions-tasks.EmrCreateCluster.CloudWatchAlarmDefinitionProperty.period",
Periods should definitely be modeled using core.Duration.
I originally fixed these linter messages and made them Durations. I also made the SubnetId and SubnetIds L2s.
But by using CDK value types these values can only be set at deployment time and not at State Machine execution time. By using simple data types we can set the values using sfn.Data.stringAt() or sfn.Data.numberAt() to dynamically set the values during State Machine execution.
You should be able to do
Duration.seconds(sfn.Data.numberAt('$.timeout'))
As long as no conversion needs to take place, you can pass through lazy values that way.
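The "as long as no conversion needs to take place" condition can be illustrated with a toy model. This is not the CDK's `Duration` class: `DurationSketch` and `RuntimeNumber` are hypothetical stand-ins, and the real library represents run-time values differently. The sketch only shows why a same-unit read can pass a run-time value through while a unit conversion cannot.

```typescript
// Toy placeholder for a number that is only known when the state machine runs.
class RuntimeNumber {
  constructor(public readonly path: string) {}
}

// Toy duration wrapper: stores the amount together with its unit size in seconds.
class DurationSketch {
  static seconds(amount: number | RuntimeNumber): DurationSketch {
    return new DurationSketch(amount, 1);
  }

  static minutes(amount: number | RuntimeNumber): DurationSketch {
    return new DurationSketch(amount, 60);
  }

  private constructor(
    private readonly amount: number | RuntimeNumber,
    private readonly unitInSeconds: number,
  ) {}

  toSeconds(): number | RuntimeNumber {
    // Same unit: nothing to compute, so a run-time value passes through untouched.
    if (this.unitInSeconds === 1) {
      return this.amount;
    }
    // Different unit: arithmetic is needed, which is impossible on an unknown value.
    if (this.amount instanceof RuntimeNumber) {
      throw new Error('Cannot convert a run-time value between units');
    }
    return this.amount * this.unitInSeconds;
  }
}

const passedThrough = DurationSketch.seconds(new RuntimeNumber('$.timeout')).toSeconds();
console.log(passedThrough instanceof RuntimeNumber);
```

Under this model, a prop documented in seconds can accept a run-time value as long as the caller also specifies it in seconds.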
Periods and Timeouts converted to cdk.Durations
  /**
   * The ClusterId to add the Step to.
   */
  readonly clusterId: string;
Hi @chamcca, one more question about using this.
I will probably want to get clusterId out of the create cluster state output and pass it here. Normally this would be done by passing parameters using paths, but how could I do that here?
You can use either the sfn.Data.stringAt() or sfn.TaskInput.fromDataAt().value methods. These both take a JSON path parameter.
Take a look at /test/emr-add-step.test.ts; there is an example there that gets the clusterId from "$.ClusterId".
thanks!
Thanks!
Thank you for contributing! Your pull request is now being automatically merged.
This PR implements aws-stepfunctions-tasks for the newly released EMR/Step Functions service integrations. These include:
createCluster
setClusterTerminationProtection
terminateCluster
addStep
cancelStep
modifyInstanceFleetByName
modifyInstanceGroupByName
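For orientation, the sketch below shows how these action names could map onto Step Functions task resource ARNs. `emrTaskResource` is a hypothetical helper written for this example, not part of the PR; the ARN layout and the set of actions assumed to support the `.sync` suffix reflect the Step Functions service integration conventions as understood here and should be checked against the service documentation.

```typescript
type EmrAction =
  | 'createCluster'
  | 'setClusterTerminationProtection'
  | 'terminateCluster'
  | 'addStep'
  | 'cancelStep'
  | 'modifyInstanceFleetByName'
  | 'modifyInstanceGroupByName';

// Actions assumed to support the "run a job" (.sync) pattern.
const SYNC_CAPABLE: EmrAction[] = ['createCluster', 'terminateCluster', 'addStep'];

// Build the Resource string for a Task state that calls an EMR integration.
function emrTaskResource(action: EmrAction, sync: boolean = false, partition: string = 'aws'): string {
  if (sync && SYNC_CAPABLE.indexOf(action) === -1) {
    throw new Error(`${action} does not support the .sync pattern in this sketch`);
  }
  const suffix = sync ? '.sync' : '';
  return `arn:${partition}:states:::elasticmapreduce:${action}${suffix}`;
}

console.log(emrTaskResource('createCluster', true));
console.log(emrTaskResource('cancelStep'));
```

In the example state machine, the `integrationPattern` setting on each task plays the role of the `sync` flag here.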
closes: #5224
Testing:
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license
Commit Message
feat(stepfunctions): EMR service integrations
This PR implements aws-stepfunctions-tasks for the newly released EMR/Step Functions service integrations. These include:
Closes #5224