Skip to content

Commit

Permalink
feat(kinesisfirehose): add support for backing up source records to S3 (
Browse files Browse the repository at this point in the history
aws#15725)

Closes aws#15724 


----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
madeline-k authored and hollanddd committed Aug 26, 2021
1 parent c9fe386 commit 2c7f2eb
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 74 deletions.
124 changes: 122 additions & 2 deletions packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/common.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import * as iam from '@aws-cdk/aws-iam';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import * as kms from '@aws-cdk/aws-kms';
import * as logs from '@aws-cdk/aws-logs';
import * as s3 from '@aws-cdk/aws-s3';
import * as cdk from '@aws-cdk/core';

/**
* Possible compression options Kinesis Data Firehose can use to compress data on delivery.
Expand Down Expand Up @@ -40,9 +43,24 @@ export class Compression {
}

/**
* Generic properties for defining a delivery stream destination.
* Options for S3 record backup of a delivery stream.
*/
export enum BackupMode {
/**
* All records are backed up.
*/
ALL,

/**
* Only records that failed to deliver or transform are backed up.
*/
FAILED,
}

/**
* Logging related properties for a delivery stream destination.
*/
export interface CommonDestinationProps {
interface DestinationLoggingProps {
/**
* If true, log errors when data transformation or data delivery fails.
*
Expand All @@ -58,7 +76,102 @@ export interface CommonDestinationProps {
* @default - if `logging` is set to `true`, a log group will be created for you.
*/
readonly logGroup?: logs.ILogGroup;
}

/**
* Common properties for defining a backup, intermediary, or final S3 destination for a Kinesis Data Firehose delivery stream.
*/
export interface CommonDestinationS3Props {
/**
* The length of time that Firehose buffers incoming data before delivering
* it to the S3 bucket.
*
* Minimum: Duration.seconds(60)
* Maximum: Duration.seconds(900)
*
* @default Duration.seconds(300)
*/
readonly bufferingInterval?: cdk.Duration;

/**
* The size of the buffer that Kinesis Data Firehose uses for incoming data before
* delivering it to the S3 bucket.
*
* Minimum: Size.mebibytes(1)
* Maximum: Size.mebibytes(128)
*
* @default Size.mebibytes(5)
*/
readonly bufferingSize?: cdk.Size;

/**
* The type of compression that Kinesis Data Firehose uses to compress the data
* that it delivers to the Amazon S3 bucket.
*
* The compression formats SNAPPY or ZIP cannot be specified for Amazon Redshift
* destinations because they are not supported by the Amazon Redshift COPY operation
* that reads from the S3 bucket.
*
* @default - UNCOMPRESSED
*/
readonly compression?: Compression;

/**
* The AWS KMS key used to encrypt the data that it delivers to your Amazon S3 bucket.
*
* @default - Data is not encrypted.
*/
readonly encryptionKey?: kms.IKey;

/**
* A prefix that Kinesis Data Firehose evaluates and adds to failed records before writing them to S3.
*
* This prefix appears immediately following the bucket name.
* @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
*
* @default "YYYY/MM/DD/HH"
*/
readonly errorOutputPrefix?: string;

/**
* A prefix that Kinesis Data Firehose evaluates and adds to records before writing them to S3.
*
* This prefix appears immediately following the bucket name.
* @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
*
* @default "YYYY/MM/DD/HH"
*/
readonly dataOutputPrefix?: string;
}

/**
* Properties for defining an S3 backup destination.
*
* S3 backup is available for all destinations, regardless of whether the final destination is S3 or not.
*/
export interface DestinationS3BackupProps extends DestinationLoggingProps, CommonDestinationS3Props {
/**
* The S3 bucket that will store data and failed records.
*
* @default - If `backup` is set to `BackupMode.ALL` or `BackupMode.FAILED`, a bucket will be created for you.
*/
readonly bucket?: s3.IBucket;

/**
* Indicates the mode by which incoming records should be backed up to S3, if any.
*
* If `backupBucket ` is provided, this will be implicitly set to `BackupMode.ALL`.
*
* @default - If `backupBucket` is provided, the default will be `BackupMode.ALL`. Otherwise,
* source records are not backed up to S3.
*/
readonly mode?: BackupMode;
}

/**
* Generic properties for defining a delivery stream destination.
*/
export interface CommonDestinationProps extends DestinationLoggingProps {
/**
* The IAM role associated with this destination.
*
Expand All @@ -74,4 +187,11 @@ export interface CommonDestinationProps {
* @default - no data transformation will occur.
*/
readonly processor?: firehose.IDataProcessor;

/**
* The configuration for backing up source records to S3.
*
* @default - source records will not be backed up to S3.
*/
readonly s3Backup?: DestinationS3BackupProps;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import * as iam from '@aws-cdk/aws-iam';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import * as kms from '@aws-cdk/aws-kms';
import * as logs from '@aws-cdk/aws-logs';
import * as s3 from '@aws-cdk/aws-s3';
import * as cdk from '@aws-cdk/core';
import { Construct, Node } from 'constructs';
import { DestinationS3BackupProps } from '../common';

export interface DestinationLoggingProps {
/**
Expand Down Expand Up @@ -35,19 +37,28 @@ export interface DestinationLoggingProps {
readonly streamId: string;
}

export interface DestinationLoggingOutput {
interface ConfigWithDependables {
/**
* Resources that were created by the sub-config creator that must be deployed before the delivery stream is deployed.
*/
readonly dependables: cdk.IDependable[];
}

export interface DestinationLoggingConfig extends ConfigWithDependables {
/**
* Logging options that will be injected into the destination configuration.
*/
readonly loggingOptions: firehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty;
}

export interface DestinationBackupConfig extends ConfigWithDependables {
/**
* Resources that were created by the sub-config creator that must be deployed before the delivery stream is deployed.
* S3 backup configuration that will be injected into the destination configuration.
*/
readonly dependables: cdk.IDependable[];
readonly backupConfig: firehose.CfnDeliveryStream.S3DestinationConfigurationProperty;
}

export function createLoggingOptions(scope: Construct, props: DestinationLoggingProps): DestinationLoggingOutput | undefined {
export function createLoggingOptions(scope: Construct, props: DestinationLoggingProps): DestinationLoggingConfig | undefined {
if (props.logging === false && props.logGroup) {
throw new Error('logging cannot be set to false when logGroup is provided');
}
Expand Down Expand Up @@ -130,3 +141,33 @@ function renderDataProcessor(
parameters,
};
}

export function createBackupConfig(scope: Construct, role: iam.IRole, props?: DestinationS3BackupProps): DestinationBackupConfig | undefined {
if (!props || (props.mode === undefined && !props.bucket)) {
return undefined;
}

const bucket = props.bucket ?? new s3.Bucket(scope, 'BackupBucket');
const bucketGrant = bucket.grantReadWrite(role);

const { loggingOptions, dependables: loggingDependables } = createLoggingOptions(scope, {
logging: props.logging,
logGroup: props.logGroup,
role,
streamId: 'S3Backup',
}) ?? {};

return {
backupConfig: {
bucketArn: bucket.bucketArn,
roleArn: role.roleArn,
prefix: props.dataOutputPrefix,
errorOutputPrefix: props.errorOutputPrefix,
bufferingHints: createBufferingHints(props.bufferingInterval, props.bufferingSize),
compressionFormat: props.compression?.value,
encryptionConfiguration: createEncryptionConfig(role, props.encryptionKey),
cloudWatchLoggingOptions: loggingOptions,
},
dependables: [bucketGrant, ...(loggingDependables ?? [])],
};
}
85 changes: 18 additions & 67 deletions packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/s3-bucket.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,25 @@
import * as iam from '@aws-cdk/aws-iam';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import * as kms from '@aws-cdk/aws-kms';
import * as s3 from '@aws-cdk/aws-s3';
import { Duration, Size } from '@aws-cdk/core';
import { Construct } from 'constructs';
import { CommonDestinationProps, Compression } from './common';
import { createBufferingHints, createEncryptionConfig, createLoggingOptions, createProcessingConfig } from './private/helpers';
import { BackupMode, CommonDestinationProps, CommonDestinationS3Props } from './common';
import { createBackupConfig, createBufferingHints, createEncryptionConfig, createLoggingOptions, createProcessingConfig } from './private/helpers';

/**
* Props for defining an S3 destination of a Kinesis Data Firehose delivery stream.
*/
export interface S3BucketProps extends CommonDestinationProps {
/**
* The length of time that Firehose buffers incoming data before delivering
* it to the S3 bucket.
*
* Minimum: Duration.seconds(60)
* Maximum: Duration.seconds(900)
*
* @default Duration.seconds(300)
*/
readonly bufferingInterval?: Duration;

/**
* The size of the buffer that Kinesis Data Firehose uses for incoming data before
* delivering it to the S3 bucket.
*
* Minimum: Size.mebibytes(1)
* Maximum: Size.mebibytes(128)
*
* @default Size.mebibytes(5)
*/
readonly bufferingSize?: Size;

/**
* The type of compression that Kinesis Data Firehose uses to compress the data
* that it delivers to the Amazon S3 bucket.
*
* The compression formats SNAPPY or ZIP cannot be specified for Amazon Redshift
* destinations because they are not supported by the Amazon Redshift COPY operation
* that reads from the S3 bucket.
*
* @default - no compression is applied
*/
readonly compression?: Compression;

/**
* The AWS KMS key used to encrypt the data that it delivers to your Amazon S3 bucket.
*
* @default - Data is not encrypted.
*/
readonly encryptionKey?: kms.IKey;

/**
* A prefix that Kinesis Data Firehose evaluates and adds to failed records before writing them to S3.
*
* This prefix appears immediately following the bucket name.
* @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
*
* @default "YYYY/MM/DD/HH"
*/
readonly errorOutputPrefix?: string;

/**
* A prefix that Kinesis Data Firehose evaluates and adds to records before writing them to S3.
*
* This prefix appears immediately following the bucket name.
* @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
*
* @default "YYYY/MM/DD/HH"
*/
readonly dataOutputPrefix?: string;
export interface S3BucketProps extends CommonDestinationS3Props, CommonDestinationProps {
}

/**
* An S3 bucket destination for data from a Kinesis Data Firehose delivery stream.
*/
export class S3Bucket implements firehose.IDestination {
constructor(private readonly bucket: s3.IBucket, private readonly props: S3BucketProps = {}) { }
constructor(private readonly bucket: s3.IBucket, private readonly props: S3BucketProps = {}) {
if (this.props.s3Backup?.mode === BackupMode.FAILED) {
throw new Error('S3 destinations do not support BackupMode.FAILED');
}
}

bind(scope: Construct, _options: firehose.DestinationBindOptions): firehose.DestinationConfig {
const role = this.props.role ?? new iam.Role(scope, 'S3 Destination Role', {
Expand All @@ -93,19 +35,28 @@ export class S3Bucket implements firehose.IDestination {
streamId: 'S3Destination',
}) ?? {};

const { backupConfig, dependables: backupDependables } = createBackupConfig(scope, role, this.props.s3Backup) ?? {};
return {
extendedS3DestinationConfiguration: {
cloudWatchLoggingOptions: loggingOptions,
processingConfiguration: createProcessingConfig(scope, role, this.props.processor),
roleArn: role.roleArn,
s3BackupConfiguration: backupConfig,
s3BackupMode: this.getS3BackupMode(),
bufferingHints: createBufferingHints(this.props.bufferingInterval, this.props.bufferingSize),
bucketArn: this.bucket.bucketArn,
compressionFormat: this.props.compression?.value,
encryptionConfiguration: createEncryptionConfig(role, this.props.encryptionKey),
errorOutputPrefix: this.props.errorOutputPrefix,
prefix: this.props.dataOutputPrefix,
},
dependables: [bucketGrant, ...(loggingDependables ?? [])],
dependables: [bucketGrant, ...(loggingDependables ?? []), ...(backupDependables ?? [])],
};
}

private getS3BackupMode(): string | undefined {
return this.props.s3Backup?.bucket || this.props.s3Backup?.mode === BackupMode.ALL
? 'Enabled'
: undefined;
}
}
Loading

0 comments on commit 2c7f2eb

Please sign in to comment.