Skip to content

Commit

Permalink
Add snapshots and new integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
msambol committed Aug 29, 2023
1 parent ccd8ab7 commit 51a9499
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as eks from 'aws-cdk-lib/aws-eks';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as cdk from 'aws-cdk-lib';
import * as integ from '@aws-cdk/integ-tests-alpha';
import {
Classification, VirtualClusterInput, EksClusterInput, EmrContainersDeleteVirtualCluster,
EmrContainersCreateVirtualCluster, EmrContainersStartJobRun, ReleaseLabel,
} from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { EC2_RESTRICT_DEFAULT_SECURITY_GROUP } from 'aws-cdk-lib/cx-api';

/**
* Stack verification steps:
* Everything in the links below must be setup for the EKS Cluster and Execution Role before running the state machine.
* @see https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-cluster-access.html
* @see https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-enable-IAM.html
* @see https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-trust-policy.html
*
* aws stepfunctions start-execution --state-machine-arn <deployed state machine arn> : should return execution arn
* aws stepfunctions describe-execution --execution-arn <exection-arn generated before> : should return status as SUCCEEDED
*/

const app = new cdk.App();
const stack = new cdk.Stack(app, 'aws-stepfunctions-tasks-emr-containers-all-services-test');
stack.node.setContext(EC2_RESTRICT_DEFAULT_SECURITY_GROUP, false);

const eksCluster = new eks.Cluster(stack, 'integration-test-eks-cluster', {
version: eks.KubernetesVersion.V1_22,
defaultCapacity: 3,
defaultCapacityInstance: ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.XLARGE),
});

const jobExecutionRole = new iam.Role(stack, 'JobExecutionRole', {
assumedBy: new iam.CompositePrincipal(
new iam.ServicePrincipal('emr-containers.amazonaws.com'),
new iam.ServicePrincipal('states.amazonaws.com'),
),
});

const createVirtualCluster = new EmrContainersCreateVirtualCluster(stack, 'Create a virtual Cluster', {
virtualClusterName: 'Virtual-Cluster-Name',
eksCluster: EksClusterInput.fromCluster(eksCluster),
resultPath: '$.cluster',
});

const startJobRunWithRoleFromArn = new EmrContainersStartJobRun(stack, 'Start a Job Run With Role From ARN', {
virtualCluster: VirtualClusterInput.fromTaskInput(sfn.TaskInput.fromJsonPathAt('$.cluster.Id')),
releaseLabel: ReleaseLabel.EMR_6_2_0,
jobName: 'EMR-Containers-Job-Role-Arn',
executionRoleArn: jobExecutionRole.roleArn,
jobDriver: {
sparkSubmitJobDriver: {
entryPoint: sfn.TaskInput.fromText('local:///usr/lib/spark/examples/src/main/python/pi.py'),
entryPointArguments: sfn.TaskInput.fromObject(['2']),
sparkSubmitParameters: '--conf spark.driver.memory=512M --conf spark.kubernetes.driver.request.cores=0.2 --conf spark.kubernetes.executor.request.cores=0.2 --conf spark.sql.shuffle.partitions=60 --conf spark.dynamicAllocation.enabled=false',
},
},
monitoring: {
logging: true,
persistentAppUI: true,
},
applicationConfig: [{
classification: Classification.SPARK_DEFAULTS,
properties: {
'spark.executor.instances': '1',
'spark.executor.memory': '512M',
},
}],
resultPath: '$.job',
});

const deleteVirtualCluster = new EmrContainersDeleteVirtualCluster(stack, 'Delete a Virtual Cluster', {
virtualClusterId: sfn.TaskInput.fromJsonPathAt('$.job.VirtualClusterId'),
});

const chain = sfn.Chain
.start(createVirtualCluster)
.next(startJobRunWithRoleFromArn)
.next(deleteVirtualCluster);

const sm = new sfn.StateMachine(stack, 'StateMachine', {
definitionBody: sfn.DefinitionBody.fromChainable(chain),
timeout: cdk.Duration.minutes(20),
});

new cdk.CfnOutput(stack, 'stateMachineArn', {
value: sm.stateMachineArn,
});

new integ.IntegTest(app, 'aws-stepfunctions-tasks-emr-containers-all-services', {
testCases: [stack],
// Test includes assets that are updated weekly. If not disabled, the upgrade PR will fail.
diffAssets: false,
cdkCommandOptions: {
deploy: {
args: {
rollback: true,
},
},
},
});

app.synth();
Original file line number Diff line number Diff line change
Expand Up @@ -70,40 +70,13 @@ const startJobRun = new EmrContainersStartJobRun(stack, 'Start a Job Run', {
resultPath: '$.job',
});

const startJobRunWithRoleFromArn = new EmrContainersStartJobRun(stack, 'Start a Job Run With Role From ARN', {
virtualCluster: VirtualClusterInput.fromTaskInput(sfn.TaskInput.fromJsonPathAt('$.cluster.Id')),
releaseLabel: ReleaseLabel.EMR_6_2_0,
jobName: 'EMR-Containers-Job-Role-Arn',
executionRoleArn: jobExecutionRole.roleArn,
jobDriver: {
sparkSubmitJobDriver: {
entryPoint: sfn.TaskInput.fromText('local:///usr/lib/spark/examples/src/main/python/pi.py'),
entryPointArguments: sfn.TaskInput.fromObject(['2']),
sparkSubmitParameters: '--conf spark.driver.memory=512M --conf spark.kubernetes.driver.request.cores=0.2 --conf spark.kubernetes.executor.request.cores=0.2 --conf spark.sql.shuffle.partitions=60 --conf spark.dynamicAllocation.enabled=false',
},
},
monitoring: {
logging: true,
persistentAppUI: true,
},
applicationConfig: [{
classification: Classification.SPARK_DEFAULTS,
properties: {
'spark.executor.instances': '1',
'spark.executor.memory': '512M',
},
}],
resultPath: '$.job',
});

const deleteVirtualCluster = new EmrContainersDeleteVirtualCluster(stack, 'Delete a Virtual Cluster', {
virtualClusterId: sfn.TaskInput.fromJsonPathAt('$.job.VirtualClusterId'),
});

const chain = sfn.Chain
.start(createVirtualCluster)
.next(startJobRun)
.next(startJobRunWithRoleFromArn)
.next(deleteVirtualCluster);

const sm = new sfn.StateMachine(stack, 'StateMachine', {
Expand Down

0 comments on commit 51a9499

Please sign in to comment.