Skip to content

Commit

Permalink
feat(stepfunctions-tasks): add 'Emr on Eks' tasks (#17103)
Browse files Browse the repository at this point in the history
This CDK feature adds support for Emr on Eks by implementing API service integrations for the following three APIs.

This PR adds three tasks which support Emr on Eks:
1) [Create Virtual Cluster](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_CreateVirtualCluster.html)
2) [ Start a job run](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_StartJobRun.html)
3) [Delete virtual cluster ](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DeleteVirtualCluster.html)


Continuation of #15262 by @matthewsvu and @BenChaimberg:

Closes #15234.

----
*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
sanava2010 authored Nov 24, 2021
1 parent b432822 commit f2bf322
Show file tree
Hide file tree
Showing 14 changed files with 6,787 additions and 0 deletions.
167 changes: 167 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aw
- [Cancel Step](#cancel-step)
- [Modify Instance Fleet](#modify-instance-fleet)
- [Modify Instance Group](#modify-instance-group)
- [EMR on EKS](#emr-on-eks)
- [Create Virtual Cluster](#create-virtual-cluster)
- [Delete Virtual Cluster](#delete-virtual-cluster)
- [Start Job Run](#start-job-run)
- [EKS](#eks)
- [Call](#call)
- [EventBridge](#eventbridge)
Expand Down Expand Up @@ -783,6 +787,169 @@ new tasks.EmrModifyInstanceGroupByName(this, 'Task', {
});
```

## EMR on EKS

Step Functions supports Amazon EMR on EKS through the service integration pattern.
The service integration APIs correspond to Amazon EMR on EKS APIs, but differ in the parameters that are used.

[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr-eks.html) about the differences when using these service integrations.

[Setting up](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up.html) the EKS cluster is required.

### Create Virtual Cluster

The [CreateVirtualCluster](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_CreateVirtualCluster.html) API creates a single virtual cluster that's mapped to a single Kubernetes namespace.

The EKS cluster containing the Kubernetes namespace where the virtual cluster will be mapped can be passed in from the task input.

```ts
new tasks.EmrContainersCreateVirtualCluster(this, 'Create a Virtual Cluster', {
eksCluster: tasks.EksClusterInput.fromTaskInput(sfn.TaskInput.fromText('clusterId')),
});
```

The EKS cluster can also be passed in directly.

```ts
import * as eks from '@aws-cdk/aws-eks';

declare const eksCluster: eks.Cluster;

new tasks.EmrContainersCreateVirtualCluster(this, 'Create a Virtual Cluster', {
eksCluster: tasks.EksClusterInput.fromCluster(eksCluster),
});
```

By default, the Kubernetes namespace that a virtual cluster maps to is "default", but a specific namespace within an EKS cluster can be selected.

```ts
new tasks.EmrContainersCreateVirtualCluster(this, 'Create a Virtual Cluster', {
eksCluster: tasks.EksClusterInput.fromTaskInput(sfn.TaskInput.fromText('clusterId')),
eksNamespace: 'specified-namespace',
});
```

### Delete Virtual Cluster

The [DeleteVirtualCluster](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DeleteVirtualCluster.html) API deletes a virtual cluster.

```ts
new tasks.EmrContainersDeleteVirtualCluster(this, 'Delete a Virtual Cluster', {
virtualClusterId: sfn.TaskInput.fromJsonPathAt('$.virtualCluster'),
});
```

### Start Job Run

The [StartJobRun](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_StartJobRun.html) API starts a job run. A job is a unit of work that you submit to Amazon EMR on EKS for execution. The work performed by the job can be defined by a Spark jar, PySpark script, or SparkSQL query. A job run is an execution of the job on the virtual cluster.

Required setup:

- If not done already, follow the [steps](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up.html) to setup EMR on EKS and [create an EKS Cluster](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-eks-readme.html#quick-start).
- Enable [Cluster access](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-cluster-access.html)
- Enable [IAM Role access](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-enable-IAM.html)

The following actions must be performed if the virtual cluster ID is supplied from the task input. Otherwise, if it is supplied statically in the state machine definition, these actions will be done automatically.

- Create an [IAM role](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-iam.Role.html)
- Update the [Role Trust Policy](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-trust-policy.html) of the Job Execution Role.

The job can be configured with spark submit parameters:

```ts
new tasks.EmrContainersStartJobRun(this, 'EMR Containers Start Job Run', {
virtualCluster: tasks.VirtualClusterInput.fromVirtualClusterId('de92jdei2910fwedz'),
releaseLabel: tasks.ReleaseLabel.EMR_6_2_0,
jobDriver: {
sparkSubmitJobDriver: {
entryPoint: sfn.TaskInput.fromText('local:///usr/lib/spark/examples/src/main/python/pi.py'),
sparkSubmitParameters: '--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1',
},
},
});
```

Configuring the job can also be done via application configuration:

```ts
new tasks.EmrContainersStartJobRun(this, 'EMR Containers Start Job Run', {
virtualCluster: tasks.VirtualClusterInput.fromVirtualClusterId('de92jdei2910fwedz'),
releaseLabel: tasks.ReleaseLabel.EMR_6_2_0,
jobName: 'EMR-Containers-Job',
jobDriver: {
sparkSubmitJobDriver: {
entryPoint: sfn.TaskInput.fromText('local:///usr/lib/spark/examples/src/main/python/pi.py'),
},
},
applicationConfig: [{
classification: tasks.Classification.SPARK_DEFAULTS,
properties: {
'spark.executor.instances': '1',
'spark.executor.memory': '512M',
},
}],
});
```

Job monitoring can be enabled if `monitoring.logging` is set true. This automatically generates an S3 bucket and CloudWatch logs.

```ts
new tasks.EmrContainersStartJobRun(this, 'EMR Containers Start Job Run', {
virtualCluster: tasks.VirtualClusterInput.fromVirtualClusterId('de92jdei2910fwedz'),
releaseLabel: tasks.ReleaseLabel.EMR_6_2_0,
jobDriver: {
sparkSubmitJobDriver: {
entryPoint: sfn.TaskInput.fromText('local:///usr/lib/spark/examples/src/main/python/pi.py'),
sparkSubmitParameters: '--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1',
},
},
monitoring: {
logging: true,
},
});
```

Otherwise, providing monitoring for jobs with existing log groups and log buckets is also available.

```ts
import * as logs from '@aws-cdk/aws-logs';

const logGroup = new logs.LogGroup(this, 'Log Group');
const logBucket = new s3.Bucket(this, 'S3 Bucket')

new tasks.EmrContainersStartJobRun(this, 'EMR Containers Start Job Run', {
virtualCluster: tasks.VirtualClusterInput.fromVirtualClusterId('de92jdei2910fwedz'),
releaseLabel: tasks.ReleaseLabel.EMR_6_2_0,
jobDriver: {
sparkSubmitJobDriver: {
entryPoint: sfn.TaskInput.fromText('local:///usr/lib/spark/examples/src/main/python/pi.py'),
sparkSubmitParameters: '--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1',
},
},
monitoring: {
logGroup: logGroup,
logBucket: logBucket,
},
});
```

Users can provide their own existing Job Execution Role.

```ts
new tasks.EmrContainersStartJobRun(this, 'EMR Containers Start Job Run', {
virtualCluster:tasks.VirtualClusterInput.fromTaskInput(sfn.TaskInput.fromJsonPathAt('$.VirtualClusterId')),
releaseLabel: tasks.ReleaseLabel.EMR_6_2_0,
jobName: 'EMR-Containers-Job',
executionRole: iam.Role.fromRoleArn(this, 'Job-Execution-Role', 'arn:aws:iam::xxxxxxxxxxxx:role/JobExecutionRole'),
jobDriver: {
sparkSubmitJobDriver: {
entryPoint: sfn.TaskInput.fromText('local:///usr/lib/spark/examples/src/main/python/pi.py'),
sparkSubmitParameters: '--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1',
},
},
});
```

## EKS

Step Functions supports Amazon EKS through the service integration pattern.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import * as eks from '@aws-cdk/aws-eks';
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Stack } from '@aws-cdk/core';
import { Construct } from 'constructs';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Class for supported types of EMR Containers' Container Providers
*/
enum ContainerProviderTypes {

/**
* Supported container provider type for a EKS Cluster
*/
EKS = 'EKS'
}

/**
* Class that supports methods which return the EKS cluster name depending on input type.
*/
export class EksClusterInput {

/**
* Specify an existing EKS Cluster as the name for this Cluster
*/
static fromCluster(cluster: eks.ICluster): EksClusterInput {
return new EksClusterInput(cluster.clusterName);
}

/**
* Specify a Task Input as the name for this Cluster
*/
static fromTaskInput(taskInput: sfn.TaskInput): EksClusterInput {
return new EksClusterInput(taskInput.value);
}

/**
* Initializes the clusterName
*
* @param clusterName The name of the EKS Cluster
*/
private constructor(readonly clusterName: string) { }
}

/**
* Properties to define a EMR Containers CreateVirtualCluster Task on an EKS cluster
*/
export interface EmrContainersCreateVirtualClusterProps extends sfn.TaskStateBaseProps {

/**
* EKS Cluster or task input that contains the name of the cluster
*/
readonly eksCluster: EksClusterInput;

/**
* The namespace of an EKS cluster
*
* @default - 'default'
*/
readonly eksNamespace?: string;

/**
* Name of the virtual cluster that will be created.
*
* @default - the name of the state machine execution that runs this task and state name
*/
readonly virtualClusterName?: string;

/**
* The tags assigned to the virtual cluster
*
* @default {}
*/
readonly tags?: { [key: string]: string };
}

/**
* Task that creates an EMR Containers virtual cluster from an EKS cluster
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr-eks.html
*/
export class EmrContainersCreateVirtualCluster extends sfn.TaskStateBase {

private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: Construct, id: string, private readonly props: EmrContainersCreateVirtualClusterProps) {
super(scope, id, props);
this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;
validatePatternSupported(this.integrationPattern, EmrContainersCreateVirtualCluster.SUPPORTED_INTEGRATION_PATTERNS);

this.taskPolicies = this.createPolicyStatements();
}

/**
* @internal
*/
protected _renderTask(): any {
return {
Resource: integrationResourceArn('emr-containers', 'createVirtualCluster', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
Name: this.props.virtualClusterName ?? sfn.JsonPath.stringAt('States.Format(\'{}/{}\', $$.Execution.Name, $$.State.Name)'),
ContainerProvider: {
Id: this.props.eksCluster.clusterName,
Info: {
EksInfo: {
Namespace: this.props.eksNamespace ?? 'default',
},
},
Type: ContainerProviderTypes.EKS,
},
Tags: this.props.tags,
}),
};
};

private createPolicyStatements(): iam.PolicyStatement[] {
return [
new iam.PolicyStatement({
resources: ['*'], // We need * permissions for creating a virtual cluster https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-iam.html
actions: ['emr-containers:CreateVirtualCluster'],
}),
new iam.PolicyStatement({
resources: [
Stack.of(this).formatArn({
service: 'iam',
region: '',
resource: 'role/aws-service-role/emr-containers.amazonaws.com',
resourceName: 'AWSServiceRoleForAmazonEMRContainers',
}),
],
actions: ['iam:CreateServiceLinkedRole'],
conditions: {
StringLike: { 'iam:AWSServiceName': 'emr-containers.amazonaws.com' },
},
}),
];
}
}
Loading

0 comments on commit f2bf322

Please sign in to comment.