aws-stepfunctions-tasks: implement EMR Service Integrations #5224
Comments
@SomayaB the integration is enabled in the same way the SNS, SQS, and Lambda Invocation service integrations for Step Functions are enabled. The CDK currently supports these integrations without specific CFN implementations. The EMR Integrations can be enabled in the same way, with an IStepFunctionsTask for each API that generates the ResourceArn, Parameters, Policy, etc.
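To illustrate what "generates the ResourceArn, Parameters, Policy" could look like for one of the EMR APIs, here is a minimal, self-contained sketch. The types `TaskConfigSketch` and `PolicyStatementSketch` and the function `setTerminationProtectionConfig` are hypothetical stand-ins, not the real `@aws-cdk` interfaces, and the partition is hard-coded to `aws` for brevity.

```typescript
// Hypothetical, simplified stand-ins for the CDK types; not the real @aws-cdk API.
interface PolicyStatementSketch {
  actions: string[];
  resources: string[];
}

interface TaskConfigSketch {
  resourceArn: string;
  parameters: Record<string, unknown>;
  policyStatements: PolicyStatementSketch[];
}

// Builds the pieces a task for setClusterTerminationProtection would supply.
function setTerminationProtectionConfig(
  clusterId: string,
  terminationProtected: boolean
): TaskConfigSketch {
  return {
    // Step Functions service integration resource for this EMR API
    // (assumption: standard "aws" partition).
    resourceArn: 'arn:aws:states:::elasticmapreduce:setClusterTerminationProtection',
    parameters: {
      ClusterId: clusterId,
      TerminationProtected: terminationProtected,
    },
    // The state machine role needs permission for the underlying EMR call.
    policyStatements: [
      {
        actions: ['elasticmapreduce:SetTerminationProtection'],
        resources: ['arn:aws:elasticmapreduce:*:*:cluster/*'],
      },
    ],
  };
}

const config = setTerminationProtectionConfig('j-EXAMPLE', true);
console.log(JSON.stringify(config, null, 2));
```

A real task would scope the policy resource to the stack's region and account rather than using wildcards; the wildcards here only keep the sketch short.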
@chamcca Ah yes I see that now, sorry about the confusion. If you would like to submit a PR, someone from the team will review it; otherwise someone will update this issue when there is movement.
We would need an L2 for EMR before we could properly tackle this.
@rix0rrr re:Invent has slowed me this week, but I'm hoping to have an implementation for review next week.
@SomayaB I have a PR ready, with 7 new aws-stepfunctions-tasks. I'll be creating a number of different Step Functions today to deploy and test the various tasks to ensure they work as expected. Would it be possible to get a set of eyes on the 7 tasks and unit tests I've added? If they are going to require a significant change I'd like to find that out before putting too much effort into deploying and testing them all. The new tasks are the 7 emr-*.ts files located here: Unit tests are the 7 emr-*.test.ts files located here:
Hey @chamcca, Thank you for putting time into this! We always appreciate contributions, especially on this scale. Unfortunately, the way the tasks are structured is a bit contrary to our desired format. The most notable, egregious of those differences is passing a cluster reference by After some thought, the best way forward is probably to author a small portion of an EMR L2 that includes I will follow up shortly with a link to our guide for authoring L2s. 😸
@NGL321 if I understand this correctly, using an EMR L2 and an
One of the primary use cases for these EMR/Step Function integrations is the management of transient EMR clusters, where the cluster is created at run time of the Step Function and exists only for the duration of the Step Function execution. In this case, the purpose of the Step Function is to manage the cluster. The Step Function's stability is directly tied to the cluster's. An example of this was just published on our AWS Big Data Blog: https://aws.amazon.com/blogs/aws/new-using-step-functions-to-orchestrate-amazon-emr-workloads/
A simple example of this is shown in the State Machine definition below, which I was using to test some of the Tasks:
import iam = require('@aws-cdk/aws-iam');
import sfn = require('@aws-cdk/aws-stepfunctions');
import sfn_tasks = require('@aws-cdk/aws-stepfunctions-tasks');
import cdk = require('@aws-cdk/core');

export class AwsCdkEmrTasksStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const roles = [
      iam.Role.fromRoleArn(this, 'ServiceRole', this.formatArn({
        region: '',
        service: 'iam',
        resource: 'role',
        resourceName: 'EMR_DefaultRole'
      })),
      iam.Role.fromRoleArn(this, 'InstanceRole', this.formatArn({
        region: '',
        service: 'iam',
        resource: 'role',
        resourceName: 'EMR_EC2_DefaultRole'
      }))
    ];

    const createCluster = new sfn.Task(this, 'CreateCluster', {
      task: new sfn_tasks.EmrCreateCluster({
        clusterRoles: roles,
        integrationPattern: sfn.ServiceIntegrationPattern.SYNC,
        clusterConfiguration: sfn.TaskInput.fromObject({
          'Name': sfn.TaskInput.fromDataAt('$.ClusterName').value,
          'VisibleToAllUsers': true,
          'ReleaseLabel': 'emr-5.28.0',
          'Applications': [{ 'Name': 'Hive' }],
          'ServiceRole': 'EMR_DefaultRole',
          'JobFlowRole': 'EMR_EC2_DefaultRole',
          'LogUri': 's3://chamcca-emr-launch-logs-uw2/elasticmapreduce/',
          'Instances': {
            'KeepJobFlowAliveWhenNoSteps': true,
            'InstanceFleets': [
              {
                'Name': 'MasterFleet',
                'InstanceFleetType': 'MASTER',
                'TargetOnDemandCapacity': 1,
                'InstanceTypeConfigs': [
                  {
                    'InstanceType': 'm4.xlarge'
                  }
                ]
              },
              {
                'Name': 'CoreFleet',
                'InstanceFleetType': 'CORE',
                'TargetOnDemandCapacity': 1,
                'InstanceTypeConfigs': [
                  {
                    'InstanceType': 'm4.xlarge'
                  }
                ]
              }
            ]
          }
        })
      }),
      resultPath: '$.CreateClusterResult'
    });

    const setTerminationProtected = new sfn.Task(this, 'TerminationProtected', {
      task: new sfn_tasks.EmrSetClusterTerminationProtection({
        clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value,
        terminationProtected: true
      }),
      resultPath: '$.TerminationProtectedResult'
    });

    const modifyInstanceFleet = new sfn.Task(this, 'ModifyInstanceFleet', {
      task: new sfn_tasks.EmrModifyInstanceFleetByName({
        clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value,
        instanceFleetName: 'CoreFleet',
        instanceFleetConfiguration: sfn.TaskInput.fromObject({
          'TargetOnDemandCapacity': 2,
          'TargetSpotCapacity': 0
        })
      }),
      resultPath: '$.ModifyInstanceFleetResult'
    });

    const step = new sfn.Task(this, 'Step', {
      task: new sfn_tasks.EmrAddStep({
        clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value,
        integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
        stepConfiguration: sfn.TaskInput.fromObject({
          "Name": "The first step",
          "ActionOnFailure": "CONTINUE",
          "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
              "hive-script",
              "--run-hive-script",
              "--args",
              "-f",
              "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
              "-d",
              "INPUT=s3://eu-west-1.elasticmapreduce.samples",
              "-d",
              "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"
            ]
          }
        })
      }),
      resultPath: '$.StepResult'
    });

    const cancelStep = new sfn.Task(this, 'CancelStep', {
      task: new sfn_tasks.EmrCancelStep({
        clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value,
        stepId: sfn.TaskInput.fromDataAt('$.StepResult.StepId').value
      }),
      resultPath: '$.CancelStepResult'
    });

    const setTerminationUnprotected = new sfn.Task(this, 'TerminationUnprotected', {
      task: new sfn_tasks.EmrSetClusterTerminationProtection({
        clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value,
        terminationProtected: false
      }),
      resultPath: '$.TerminationUnprotectedResult'
    });

    const terminateCluster = new sfn.Task(this, 'TerminateCluster', {
      task: new sfn_tasks.EmrTerminateCluster({
        clusterId: sfn.TaskInput.fromDataAt('$.CreateClusterResult.ClusterId').value,
        integrationPattern: sfn.ServiceIntegrationPattern.SYNC
      }),
      resultPath: '$.TerminateClusterResult'
    });

    const chain = sfn.Chain
      .start(createCluster)
      .next(setTerminationProtected)
      .next(modifyInstanceFleet)
      .next(step)
      .next(cancelStep)
      .next(setTerminationUnprotected)
      .next(terminateCluster);

    new sfn.StateMachine(this, 'StateMachine', {
      definition: chain
    });
  }
}
Please send along the info on authoring L2s, I'm happy to assist with that. But in order to get the full value of these integrations, a solution that works with run-time as well as deployment-time resources will be needed.
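The run-time point can be made concrete with a small sketch of how a cluster ID that only exists during execution ends up in the state's parameters. In Amazon States Language, a parameter key ending in `.$` takes a JSONPath that is resolved against the state input when the state runs. The `RuntimePath` type and `renderParameters` helper below are hypothetical illustrations, not part of the CDK.

```typescript
// Marker for a value that is only known when the state machine runs.
// Hypothetical helper type, not a CDK class.
interface RuntimePath {
  path: string;
}

type ParamValue = string | number | boolean | RuntimePath;

function isRuntimePath(value: ParamValue): value is RuntimePath {
  return typeof value === 'object';
}

// Renders parameters in Amazon States Language form: keys whose values come
// from the execution state get a ".$" suffix and a JSONPath as the value.
function renderParameters(
  params: Record<string, ParamValue>
): Record<string, string | number | boolean> {
  const rendered: Record<string, string | number | boolean> = {};
  for (const key of Object.keys(params)) {
    const value = params[key];
    if (value === undefined) {
      continue;
    }
    if (isRuntimePath(value)) {
      rendered[key + '.$'] = value.path;
    } else {
      rendered[key] = value;
    }
  }
  return rendered;
}

const terminateParams = renderParameters({
  ClusterId: { path: '$.CreateClusterResult.ClusterId' },
});
console.log(JSON.stringify(terminateParams));
// prints {"ClusterId.$":"$.CreateClusterResult.ClusterId"}
```

Because the path is resolved per execution, no cluster has to exist at deployment time, which is what the transient-cluster use case relies on.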
Hi, just bumping this for an update. I see #5372 has been waiting for either a rebase or CR for a week now. Would really love to see this change merged soon as it fits my use-case. Thank you!
This PR implements aws-stepfunctions-tasks for the newly released EMR/Step Functions service integrations. These include:
* createCluster
* setClusterTerminationProtection
* terminateCluster
* addStep
* cancelStep
* modifyInstanceFleetByName
* modifyInstanceGroupByName
Closes #5224
Implement newly released Step Functions/EMR Service Integrations
createCluster
setClusterTerminationProtection
terminateCluster
addStep
cancelStep
modifyInstanceFleetByName
modifyInstanceGroupByName
Described here:
https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html
Use Case
Enable the CDK to be used to build simplified orchestration pipelines for EMR using Step Functions
Proposed Solution
Add aws-stepfunctions-tasks for each of these Service Integration APIs
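As a rough sketch of what "one task per API" has to encode, the seven APIs can be mapped to their Step Functions resource ARNs. The `SUPPORTS_SYNC` table reflects my reading of the linked documentation (only createCluster, terminateCluster, and addStep offer the `.sync` run-a-job pattern) and should be checked against it. The names `EmrApi`, `SUPPORTS_SYNC`, and `emrResourceArn` are hypothetical, and the partition is assumed to be `aws`.

```typescript
type EmrApi =
  | 'createCluster'
  | 'setClusterTerminationProtection'
  | 'terminateCluster'
  | 'addStep'
  | 'cancelStep'
  | 'modifyInstanceFleetByName'
  | 'modifyInstanceGroupByName';

// Assumption: which APIs offer the synchronous (.sync) pattern,
// per the Step Functions EMR integration documentation.
const SUPPORTS_SYNC: Record<EmrApi, boolean> = {
  createCluster: true,
  setClusterTerminationProtection: false,
  terminateCluster: true,
  addStep: true,
  cancelStep: false,
  modifyInstanceFleetByName: false,
  modifyInstanceGroupByName: false,
};

// Returns the service integration resource ARN for an EMR API,
// rejecting the .sync suffix where it is not available.
function emrResourceArn(api: EmrApi, sync: boolean): string {
  if (sync && !SUPPORTS_SYNC[api]) {
    throw new Error(api + ' does not support the .sync integration pattern');
  }
  return 'arn:aws:states:::elasticmapreduce:' + api + (sync ? '.sync' : '');
}

console.log(emrResourceArn('createCluster', true));
console.log(emrResourceArn('cancelStep', false));
```

Validating the pattern when the task is constructed surfaces an unsupported combination at synth time instead of as a failed deployment.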
This is a 🚀 Feature Request