Skip to content

Commit

Permalink
PySpark and Scala Flex ETL jobs L2 constructs
Browse files Browse the repository at this point in the history
  • Loading branch information
pras-b committed Apr 12, 2024
1 parent 5b9a16c commit 6118d80
Show file tree
Hide file tree
Showing 27 changed files with 2,253 additions and 0 deletions.
7 changes: 7 additions & 0 deletions packages/@aws-cdk/aws-glue-alpha/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ export enum WorkerType {
Z_2X = 'Z.2X',
}

/**
* The number of workers of a defined workerType that are allocated when a job runs.
*
* @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html
*/

/**
* Job states emitted by Glue to CloudWatch Events.
*
Expand Down Expand Up @@ -196,6 +202,7 @@ export enum PythonVersion {
* Python 3.9 (the exact version depends on GlueVersion and JobCommand used)
*/
THREE_NINE = '3.9',

}

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-glue-alpha/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ export * from './constants';
export * from './jobs/job';
// export * from './jobs/flex-job';
export * from './jobs/pyspark-etl-job';
export * from './jobs/pysparkflex-etl-job';
// export * from './jobs/python-shell-job';
// export * from './jobs/ray-job';
// export * from './jobs/scala-spark-etl-job';
export * from './jobs/scala-spark-flex-etl-job';
export * from './jobs/spark-ui-utils';
// export * from './jobs/spark-etl-job';
//export * from './jobs/streaming-job';
Expand Down
180 changes: 180 additions & 0 deletions packages/@aws-cdk/aws-glue-alpha/lib/jobs/pysparkflex-etl-job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import * as iam from 'aws-cdk-lib/aws-iam';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { CfnJob } from 'aws-cdk-lib/aws-glue';
import { Job, JobProperties } from './job';
import { Construct } from 'constructs';
import { JobType, GlueVersion, JobLanguage, PythonVersion, WorkerType, ExecutionClass } from '../constants';
import { SparkUIProps, SparkUILoggingLocation, validateSparkUiPrefix, cleanSparkUiPrefixForGrant } from './spark-ui-utils';

/**
* Flex Jobs class
*
* Flex jobs supports Python and Scala language.
* The flexible execution class is appropriate for non-urgent jobs such as
* pre-production jobs, testing, and one-time data loads.
* Flexible job runs are supported for jobs using AWS Glue version 3.0 or later and G.1X or
* G.2X worker types but will default to the latest version of Glue (currently Glue 3.0.)
*
* Similar to ETL, we’ll enable these features: —enable-metrics, —enable-spark-ui,
* —enable-continuous-cloudwatch-log
*
*/

export interface PySparkFlexEtlJobProps extends JobProperties {

/**
* Enables the Spark UI debugging and monitoring with the specified props.
*
* @default - Spark UI debugging and monitoring is disabled.
*
* @see https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui-jobs.html
* @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
*/
readonly sparkUI?: SparkUIProps;

/**
* Extra Python Files S3 URL (optional)
* S3 URL where additional python dependencies are located
* @default - no extra files
*/
readonly extraPythonFiles?: string[];

}

/**
* A Python Spark ETL Glue Job
*/
export class PySparkFlexEtlJob extends Job {

// Implement abstract Job attributes
public readonly jobArn: string;
public readonly jobName: string;
public readonly role: iam.IRole;
public readonly grantPrincipal: iam.IPrincipal;

/**
* The Spark UI logs location if Spark UI monitoring and debugging is enabled.
*
* @see https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui-jobs.html
* @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
*/
public readonly sparkUILoggingLocation?: SparkUILoggingLocation;

/**
* PySparkFlexEtlJob constructor
*
* @param scope
* @param id
* @param props
*/
constructor(scope: Construct, id: string, props: PySparkFlexEtlJobProps) {
super(scope, id, {
physicalName: props.jobName,
});

// Set up role and permissions for principal
this.role = props.role, {
assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
managedPolicies: [iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole')],
};
this.grantPrincipal = this.role;

// Enable SparkUI by default as a best practice
const sparkUIArgs = props.sparkUI?.bucket ? this.setupSparkUI(this.role, props.sparkUI) : undefined;
this.sparkUILoggingLocation = sparkUIArgs?.location;

// Enable CloudWatch metrics and continuous logging by default as a best practice
const continuousLoggingArgs = props.continuousLogging?.enabled ? this.setupContinuousLogging(this.role, props.continuousLogging) : {};
const profilingMetricsArgs = { '--enable-metrics': '' };

// Gather executable arguments
const execuatbleArgs = this.executableArguments(props);

// Conbine command line arguments into a single line item
const defaultArguments = {
...execuatbleArgs,
...continuousLoggingArgs,
...profilingMetricsArgs,
...sparkUIArgs?.args,
...this.checkNoReservedArgs(props.defaultArguments),
};

/*if ((!props.workerType && props.numberOrWorkers !== undefined) || (props.workerType && props.numberOrWorkers === undefined)) {
throw new Error('Both workerType and numberOrWorkers must be set');
} */

const jobResource = new CfnJob(this, 'Resource', {
name: props.jobName,
description: props.description,
role: this.role.roleArn,
command: {
name: JobType.ETL,
scriptLocation: this.codeS3ObjectUrl(props.script),
pythonVersion: PythonVersion.THREE,
},
glueVersion: props.glueVersion ? props.glueVersion : GlueVersion.V3_0,
workerType: props.workerType ? props.workerType : WorkerType.G_2X,
numberOfWorkers: props.numberOrWorkers ? props.numberOrWorkers : 10,
maxRetries: props.maxRetries,
executionProperty: props.maxConcurrentRuns ? { maxConcurrentRuns: props.maxConcurrentRuns } : undefined,
//notificationProperty: props.notifyDelayAfter ? { notifyDelayAfter: props.notifyDelayAfter.toMinutes() } : undefined,
timeout: props.timeout?.toMinutes(),
connections: props.connections ? { connections: props.connections.map((connection) => connection.connectionName) } : undefined,
securityConfiguration: props.securityConfiguration?.securityConfigurationName,
tags: props.tags,
executionClass: ExecutionClass.FLEX,
defaultArguments,
});

const resourceName = this.getResourceNameAttribute(jobResource.ref);
this.jobArn = this.buildJobArn(this, resourceName);
this.jobName = resourceName;
}

/**
* Set the executable arguments with best practices enabled by default
*
* @param props
* @returns An array of arguments for Glue to use on execution
*/
private executableArguments(props: PySparkFlexEtlJobProps) {
const args: { [key: string]: string } = {};
args['--job-language'] = JobLanguage.PYTHON;

// TODO: Confirm with Glue service team what the mapping is from extra-x to job language, if any
if (props.extraPythonFiles && props.extraPythonFiles.length > 0) {
//args['--extra-py-files'] = props.extraPythonFiles.map(code => this.codeS3ObjectUrl(code)).join(',');
}

// if (props.extraJars && props.extraJars?.length > 0) {
// args['--extra-jars'] = props.extraJars.map(code => this.codeS3ObjectUrl(code)).join(',');
// }
// if (props.extraFiles && props.extraFiles.length > 0) {
// args['--extra-files'] = props.extraFiles.map(code => this.codeS3ObjectUrl(code)).join(',');
// }
// if (props.extraJarsFirst) {
// args['--user-jars-first'] = 'true';
// }

return args;
}

private setupSparkUI(role: iam.IRole, sparkUiProps: SparkUIProps) {

validateSparkUiPrefix(sparkUiProps.prefix);
const bucket = sparkUiProps.bucket ?? new Bucket(this, 'SparkUIBucket');
bucket.grantReadWrite(role, cleanSparkUiPrefixForGrant(sparkUiProps.prefix));
const args = {
'--enable-spark-ui': 'true',
'--spark-event-logs-path': bucket.s3UrlForObject(sparkUiProps.prefix),
};

return {
location: {
prefix: sparkUiProps.prefix,
bucket,
},
args,
};
}
}
Loading

0 comments on commit 6118d80

Please sign in to comment.