Skip to content

Commit

Permalink
feat(glue): Job construct (#12506)
Browse files Browse the repository at this point in the history
Closes #12443

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
humanzz authored Sep 8, 2021
1 parent 15edd67 commit fc74110
Show file tree
Hide file tree
Showing 13 changed files with 3,309 additions and 11 deletions.
82 changes: 72 additions & 10 deletions packages/@aws-cdk/aws-glue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,69 @@

This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aws-cdk) project.

## Job

A `Job` encapsulates a script that connects to data sources, processes them, and then writes output to a data target.

There are 3 types of jobs supported by AWS Glue: Spark ETL, Spark Streaming, and Python Shell jobs.

The `glue.JobExecutable` allows you to specify the type of job, the language to use and the code assets required by the job.

`glue.Code` allows you to refer to the different code assets required by the job, either from an existing S3 location or from a local file path.

### Spark Jobs

These jobs run in an Apache Spark environment managed by AWS Glue.

#### ETL Jobs

An ETL job processes data in batches using Apache Spark.

```ts
new glue.Job(stack, 'ScalaSparkEtlJob', {
executable: glue.JobExecutable.scalaEtl({
glueVersion: glue.GlueVersion.V2_0,
script: glue.Code.fromBucket(bucket, 'src/com/example/HelloWorld.scala'),
className: 'com.example.HelloWorld',
extraJars: [glue.Code.fromBucket(bucket, 'jars/HelloWorld.jar')],
}),
description: 'an example Scala ETL job',
});
```

#### Streaming Jobs

A Streaming job is similar to an ETL job, except that it performs ETL on data streams. It uses the Apache Spark Structured Streaming framework. Some Spark job features are not available to streaming ETL jobs.

```ts
new glue.Job(stack, 'PythonSparkStreamingJob', {
executable: glue.JobExecutable.pythonStreaming({
glueVersion: glue.GlueVersion.V2_0,
pythonVersion: glue.PythonVersion.THREE,
script: glue.Code.fromAsset(path.join(__dirname, 'job-script/hello_world.py')),
}),
description: 'an example Python Streaming job',
});
```

### Python Shell Jobs

A Python shell job runs Python scripts as a shell and supports a Python version that depends on the AWS Glue version you are using.
This can be used to schedule and run tasks that don't require an Apache Spark environment.

```ts
new glue.Job(stack, 'PythonShellJob', {
executable: glue.JobExecutable.pythonShell({
glueVersion: glue.GlueVersion.V1_0,
pythonVersion: PythonVersion.THREE,
script: glue.Code.fromBucket(bucket, 'script.py'),
}),
description: 'an example Python Shell job',
});
```

See [documentation](https://docs.aws.amazon.com/glue/latest/dg/add-job.html) for more information on adding jobs in Glue.

## Connection

A `Connection` allows Glue jobs, crawlers and development endpoints to access certain types of data stores. For example, to create a network connection to connect to a data source within a VPC:
Expand All @@ -41,16 +104,6 @@ If you need to use a connection type that doesn't exist as a static member on `C

See [Adding a Connection to Your Data Store](https://docs.aws.amazon.com/glue/latest/dg/populate-add-connection.html) and [Connection Structure](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-connections.html#aws-glue-api-catalog-connections-Connection) documentation for more information on the supported data stores and their configurations.

## Database

A `Database` is a logical grouping of `Tables` in the Glue Catalog.

```ts
new glue.Database(stack, 'MyDatabase', {
databaseName: 'my_database'
});
```

## SecurityConfiguration

A `SecurityConfiguration` is a set of security properties that can be used by AWS Glue to encrypt data at rest.
Expand Down Expand Up @@ -84,6 +137,15 @@ new glue.SecurityConfiguration(stack, 'MySecurityConfiguration', {

See [documentation](https://docs.aws.amazon.com/glue/latest/dg/encryption-security-configuration.html) for more info for Glue encrypting data written by Crawlers, Jobs, and Development Endpoints.

## Database

A `Database` is a logical grouping of `Tables` in the Glue Catalog.

```ts
new glue.Database(stack, 'MyDatabase', {
databaseName: 'my_database'
});
```

## Table

Expand Down
112 changes: 112 additions & 0 deletions packages/@aws-cdk/aws-glue/lib/code.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import * as crypto from 'crypto';
import * as fs from 'fs';
import * as iam from '@aws-cdk/aws-iam';
import * as s3 from '@aws-cdk/aws-s3';
import * as s3assets from '@aws-cdk/aws-s3-assets';
import * as cdk from '@aws-cdk/core';
import * as constructs from 'constructs';

/**
* Represents a Glue Job's Code assets (an asset can be a scripts, a jar, a python file or any other file).
*/
export abstract class Code {

/**
* Job code as an S3 object.
* @param bucket The S3 bucket
* @param key The object key
*/
public static fromBucket(bucket: s3.IBucket, key: string): S3Code {
return new S3Code(bucket, key);
}

/**
* Job code from a local disk path.
*
* @param path code file (not a directory).
*/
public static fromAsset(path: string, options?: s3assets.AssetOptions): AssetCode {
return new AssetCode(path, options);
}

/**
* Called when the Job is initialized to allow this object to bind.
*/
public abstract bind(scope: constructs.Construct, grantable: iam.IGrantable): CodeConfig;
}

/**
* Glue job Code from an S3 bucket.
*/
export class S3Code extends Code {
constructor(private readonly bucket: s3.IBucket, private readonly key: string) {
super();
}

public bind(_scope: constructs.Construct, grantable: iam.IGrantable): CodeConfig {
this.bucket.grantRead(grantable, this.key);
return {
s3Location: {
bucketName: this.bucket.bucketName,
objectKey: this.key,
},
};
}
}

/**
* Job Code from a local file.
*/
export class AssetCode extends Code {
private asset?: s3assets.Asset;

/**
* @param path The path to the Code file.
*/
constructor(private readonly path: string, private readonly options: s3assets.AssetOptions = { }) {
super();

if (fs.lstatSync(this.path).isDirectory()) {
throw new Error(`Code path ${this.path} is a directory. Only files are supported`);
}
}

public bind(scope: constructs.Construct, grantable: iam.IGrantable): CodeConfig {
// If the same AssetCode is used multiple times, retain only the first instantiation.
if (!this.asset) {
this.asset = new s3assets.Asset(scope, `Code${this.hashcode(this.path)}`, {
path: this.path,
...this.options,
});
} else if (cdk.Stack.of(this.asset) !== cdk.Stack.of(scope)) {
throw new Error(`Asset is already associated with another stack '${cdk.Stack.of(this.asset).stackName}'. ` +
'Create a new Code instance for every stack.');
}
this.asset.grantRead(grantable);
return {
s3Location: {
bucketName: this.asset.s3BucketName,
objectKey: this.asset.s3ObjectKey,
},
};
}

/**
* Hash a string
*/
private hashcode(s: string): string {
const hash = crypto.createHash('md5');
hash.update(s);
return hash.digest('hex');
};
}

/**
* Result of binding `Code` into a `Job`.
*/
export interface CodeConfig {
/**
* The location of the code in S3.
*/
readonly s3Location: s3.Location;
}
3 changes: 3 additions & 0 deletions packages/@aws-cdk/aws-glue/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ export * from './glue.generated';
export * from './connection';
export * from './data-format';
export * from './database';
export * from './job';
export * from './job-executable';
export * from './code';
export * from './schema';
export * from './security-configuration';
export * from './table';
Loading

0 comments on commit fc74110

Please sign in to comment.