diff --git a/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/common.ts b/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/common.ts index 4f93758dbe35f..9155346f06b09 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/common.ts +++ b/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/common.ts @@ -1,6 +1,9 @@ import * as iam from '@aws-cdk/aws-iam'; import * as firehose from '@aws-cdk/aws-kinesisfirehose'; +import * as kms from '@aws-cdk/aws-kms'; import * as logs from '@aws-cdk/aws-logs'; +import * as s3 from '@aws-cdk/aws-s3'; +import * as cdk from '@aws-cdk/core'; /** * Possible compression options Kinesis Data Firehose can use to compress data on delivery. @@ -40,9 +43,24 @@ export class Compression { } /** - * Generic properties for defining a delivery stream destination. + * Options for S3 record backup of a delivery stream. + */ +export enum BackupMode { + /** + * All records are backed up. + */ + ALL, + + /** + * Only records that failed to deliver or transform are backed up. + */ + FAILED, +} + +/** + * Logging related properties for a delivery stream destination. */ -export interface CommonDestinationProps { +interface DestinationLoggingProps { /** * If true, log errors when data transformation or data delivery fails. * @@ -58,7 +76,102 @@ export interface CommonDestinationProps { * @default - if `logging` is set to `true`, a log group will be created for you. */ readonly logGroup?: logs.ILogGroup; +} +/** + * Common properties for defining a backup, intermediary, or final S3 destination for a Kinesis Data Firehose delivery stream. + */ +export interface CommonDestinationS3Props { + /** + * The length of time that Firehose buffers incoming data before delivering + * it to the S3 bucket. 
+ * + * Minimum: Duration.seconds(60) + * Maximum: Duration.seconds(900) + * + * @default Duration.seconds(300) + */ + readonly bufferingInterval?: cdk.Duration; + + /** + * The size of the buffer that Kinesis Data Firehose uses for incoming data before + * delivering it to the S3 bucket. + * + * Minimum: Size.mebibytes(1) + * Maximum: Size.mebibytes(128) + * + * @default Size.mebibytes(5) + */ + readonly bufferingSize?: cdk.Size; + + /** + * The type of compression that Kinesis Data Firehose uses to compress the data + * that it delivers to the Amazon S3 bucket. + * + * The compression formats SNAPPY or ZIP cannot be specified for Amazon Redshift + * destinations because they are not supported by the Amazon Redshift COPY operation + * that reads from the S3 bucket. + * + * @default - UNCOMPRESSED + */ + readonly compression?: Compression; + + /** + * The AWS KMS key used to encrypt the data that it delivers to your Amazon S3 bucket. + * + * @default - Data is not encrypted. + */ + readonly encryptionKey?: kms.IKey; + + /** + * A prefix that Kinesis Data Firehose evaluates and adds to failed records before writing them to S3. + * + * This prefix appears immediately following the bucket name. + * @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html + * + * @default "YYYY/MM/DD/HH" + */ + readonly errorOutputPrefix?: string; + + /** + * A prefix that Kinesis Data Firehose evaluates and adds to records before writing them to S3. + * + * This prefix appears immediately following the bucket name. + * @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html + * + * @default "YYYY/MM/DD/HH" + */ + readonly dataOutputPrefix?: string; +} + +/** + * Properties for defining an S3 backup destination. + * + * S3 backup is available for all destinations, regardless of whether the final destination is S3 or not. 
 */
+export interface DestinationS3BackupProps extends DestinationLoggingProps, CommonDestinationS3Props {
+  /**
+   * The S3 bucket that will store data and failed records.
+   *
+   * @default - If `mode` is set to `BackupMode.ALL` or `BackupMode.FAILED`, a bucket will be created for you.
+   */
+  readonly bucket?: s3.IBucket;
+
+  /**
+   * Indicates the mode by which incoming records should be backed up to S3, if any.
+   *
+   * If `bucket` is provided, this will be implicitly set to `BackupMode.ALL`.
+   *
+   * @default - If `bucket` is provided, the default will be `BackupMode.ALL`. Otherwise,
+   * source records are not backed up to S3.
+   */
+  readonly mode?: BackupMode;
+}
+
+/**
+ * Generic properties for defining a delivery stream destination.
+ */
+export interface CommonDestinationProps extends DestinationLoggingProps {
   /**
    * The IAM role associated with this destination.
    *
    * @default - a role will be created with default permissions.
    */
   readonly role?: iam.IRole;
@@ -74,4 +187,11 @@ export interface CommonDestinationProps {
    * @default - no data transformation will occur.
    */
   readonly processor?: firehose.IDataProcessor;
+
+  /**
+   * The configuration for backing up source records to S3.
+   *
+   * @default - source records will not be backed up to S3.
+ */ + readonly s3Backup?: DestinationS3BackupProps; } diff --git a/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/private/helpers.ts b/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/private/helpers.ts index c3d1e4515715b..d8ef3d4ac43a5 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/private/helpers.ts +++ b/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/private/helpers.ts @@ -2,8 +2,10 @@ import * as iam from '@aws-cdk/aws-iam'; import * as firehose from '@aws-cdk/aws-kinesisfirehose'; import * as kms from '@aws-cdk/aws-kms'; import * as logs from '@aws-cdk/aws-logs'; +import * as s3 from '@aws-cdk/aws-s3'; import * as cdk from '@aws-cdk/core'; import { Construct, Node } from 'constructs'; +import { DestinationS3BackupProps } from '../common'; export interface DestinationLoggingProps { /** @@ -35,19 +37,28 @@ export interface DestinationLoggingProps { readonly streamId: string; } -export interface DestinationLoggingOutput { +interface ConfigWithDependables { + /** + * Resources that were created by the sub-config creator that must be deployed before the delivery stream is deployed. + */ + readonly dependables: cdk.IDependable[]; +} + +export interface DestinationLoggingConfig extends ConfigWithDependables { /** * Logging options that will be injected into the destination configuration. */ readonly loggingOptions: firehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty; +} +export interface DestinationBackupConfig extends ConfigWithDependables { /** - * Resources that were created by the sub-config creator that must be deployed before the delivery stream is deployed. + * S3 backup configuration that will be injected into the destination configuration. 
*/ - readonly dependables: cdk.IDependable[]; + readonly backupConfig: firehose.CfnDeliveryStream.S3DestinationConfigurationProperty; } -export function createLoggingOptions(scope: Construct, props: DestinationLoggingProps): DestinationLoggingOutput | undefined { +export function createLoggingOptions(scope: Construct, props: DestinationLoggingProps): DestinationLoggingConfig | undefined { if (props.logging === false && props.logGroup) { throw new Error('logging cannot be set to false when logGroup is provided'); } @@ -130,3 +141,33 @@ function renderDataProcessor( parameters, }; } + +export function createBackupConfig(scope: Construct, role: iam.IRole, props?: DestinationS3BackupProps): DestinationBackupConfig | undefined { + if (!props || (props.mode === undefined && !props.bucket)) { + return undefined; + } + + const bucket = props.bucket ?? new s3.Bucket(scope, 'BackupBucket'); + const bucketGrant = bucket.grantReadWrite(role); + + const { loggingOptions, dependables: loggingDependables } = createLoggingOptions(scope, { + logging: props.logging, + logGroup: props.logGroup, + role, + streamId: 'S3Backup', + }) ?? {}; + + return { + backupConfig: { + bucketArn: bucket.bucketArn, + roleArn: role.roleArn, + prefix: props.dataOutputPrefix, + errorOutputPrefix: props.errorOutputPrefix, + bufferingHints: createBufferingHints(props.bufferingInterval, props.bufferingSize), + compressionFormat: props.compression?.value, + encryptionConfiguration: createEncryptionConfig(role, props.encryptionKey), + cloudWatchLoggingOptions: loggingOptions, + }, + dependables: [bucketGrant, ...(loggingDependables ?? 
[])], + }; +} diff --git a/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/s3-bucket.ts b/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/s3-bucket.ts index ff2008eb9221c..100c73af16b10 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/s3-bucket.ts +++ b/packages/@aws-cdk/aws-kinesisfirehose-destinations/lib/s3-bucket.ts @@ -1,83 +1,25 @@ import * as iam from '@aws-cdk/aws-iam'; import * as firehose from '@aws-cdk/aws-kinesisfirehose'; -import * as kms from '@aws-cdk/aws-kms'; import * as s3 from '@aws-cdk/aws-s3'; -import { Duration, Size } from '@aws-cdk/core'; import { Construct } from 'constructs'; -import { CommonDestinationProps, Compression } from './common'; -import { createBufferingHints, createEncryptionConfig, createLoggingOptions, createProcessingConfig } from './private/helpers'; +import { BackupMode, CommonDestinationProps, CommonDestinationS3Props } from './common'; +import { createBackupConfig, createBufferingHints, createEncryptionConfig, createLoggingOptions, createProcessingConfig } from './private/helpers'; /** * Props for defining an S3 destination of a Kinesis Data Firehose delivery stream. */ -export interface S3BucketProps extends CommonDestinationProps { - /** - * The length of time that Firehose buffers incoming data before delivering - * it to the S3 bucket. - * - * Minimum: Duration.seconds(60) - * Maximum: Duration.seconds(900) - * - * @default Duration.seconds(300) - */ - readonly bufferingInterval?: Duration; - - /** - * The size of the buffer that Kinesis Data Firehose uses for incoming data before - * delivering it to the S3 bucket. - * - * Minimum: Size.mebibytes(1) - * Maximum: Size.mebibytes(128) - * - * @default Size.mebibytes(5) - */ - readonly bufferingSize?: Size; - - /** - * The type of compression that Kinesis Data Firehose uses to compress the data - * that it delivers to the Amazon S3 bucket. 
- * - * The compression formats SNAPPY or ZIP cannot be specified for Amazon Redshift - * destinations because they are not supported by the Amazon Redshift COPY operation - * that reads from the S3 bucket. - * - * @default - no compression is applied - */ - readonly compression?: Compression; - - /** - * The AWS KMS key used to encrypt the data that it delivers to your Amazon S3 bucket. - * - * @default - Data is not encrypted. - */ - readonly encryptionKey?: kms.IKey; - - /** - * A prefix that Kinesis Data Firehose evaluates and adds to failed records before writing them to S3. - * - * This prefix appears immediately following the bucket name. - * @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html - * - * @default "YYYY/MM/DD/HH" - */ - readonly errorOutputPrefix?: string; - - /** - * A prefix that Kinesis Data Firehose evaluates and adds to records before writing them to S3. - * - * This prefix appears immediately following the bucket name. - * @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html - * - * @default "YYYY/MM/DD/HH" - */ - readonly dataOutputPrefix?: string; +export interface S3BucketProps extends CommonDestinationS3Props, CommonDestinationProps { } /** * An S3 bucket destination for data from a Kinesis Data Firehose delivery stream. */ export class S3Bucket implements firehose.IDestination { - constructor(private readonly bucket: s3.IBucket, private readonly props: S3BucketProps = {}) { } + constructor(private readonly bucket: s3.IBucket, private readonly props: S3BucketProps = {}) { + if (this.props.s3Backup?.mode === BackupMode.FAILED) { + throw new Error('S3 destinations do not support BackupMode.FAILED'); + } + } bind(scope: Construct, _options: firehose.DestinationBindOptions): firehose.DestinationConfig { const role = this.props.role ?? new iam.Role(scope, 'S3 Destination Role', { @@ -93,11 +35,14 @@ export class S3Bucket implements firehose.IDestination { streamId: 'S3Destination', }) ?? 
{}; + const { backupConfig, dependables: backupDependables } = createBackupConfig(scope, role, this.props.s3Backup) ?? {}; return { extendedS3DestinationConfiguration: { cloudWatchLoggingOptions: loggingOptions, processingConfiguration: createProcessingConfig(scope, role, this.props.processor), roleArn: role.roleArn, + s3BackupConfiguration: backupConfig, + s3BackupMode: this.getS3BackupMode(), bufferingHints: createBufferingHints(this.props.bufferingInterval, this.props.bufferingSize), bucketArn: this.bucket.bucketArn, compressionFormat: this.props.compression?.value, @@ -105,7 +50,13 @@ export class S3Bucket implements firehose.IDestination { errorOutputPrefix: this.props.errorOutputPrefix, prefix: this.props.dataOutputPrefix, }, - dependables: [bucketGrant, ...(loggingDependables ?? [])], + dependables: [bucketGrant, ...(loggingDependables ?? []), ...(backupDependables ?? [])], }; } + + private getS3BackupMode(): string | undefined { + return this.props.s3Backup?.bucket || this.props.s3Backup?.mode === BackupMode.ALL + ? 
'Enabled' + : undefined; + } } diff --git a/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/integ.s3-bucket.lit.expected.json b/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/integ.s3-bucket.lit.expected.json index 0bbccefc6e17c..2a85e28a6984d 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/integ.s3-bucket.lit.expected.json +++ b/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/integ.s3-bucket.lit.expected.json @@ -165,6 +165,81 @@ "CustomS3AutoDeleteObjectsCustomResourceProviderRole3B1BD092" ] }, + "BackupBucket26B8E51C": { + "Type": "AWS::S3::Bucket", + "UpdateReplacePolicy": "Delete", + "DeletionPolicy": "Delete" + }, + "BackupBucketPolicy8C403F71": { + "Type": "AWS::S3::BucketPolicy", + "Properties": { + "Bucket": { + "Ref": "BackupBucket26B8E51C" + }, + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "s3:GetBucket*", + "s3:List*", + "s3:DeleteObject*" + ], + "Effect": "Allow", + "Principal": { + "AWS": { + "Fn::GetAtt": [ + "CustomS3AutoDeleteObjectsCustomResourceProviderRole3B1BD092", + "Arn" + ] + } + }, + "Resource": [ + { + "Fn::GetAtt": [ + "BackupBucket26B8E51C", + "Arn" + ] + }, + { + "Fn::Join": [ + "", + [ + { + "Fn::GetAtt": [ + "BackupBucket26B8E51C", + "Arn" + ] + }, + "/*" + ] + ] + } + ] + } + ], + "Version": "2012-10-17" + } + } + }, + "BackupBucketAutoDeleteObjectsCustomResourceD2F511C5": { + "Type": "Custom::S3AutoDeleteObjects", + "Properties": { + "ServiceToken": { + "Fn::GetAtt": [ + "CustomS3AutoDeleteObjectsCustomResourceProviderHandler9D90184F", + "Arn" + ] + }, + "BucketName": { + "Ref": "BackupBucket26B8E51C" + } + }, + "DependsOn": [ + "BackupBucketPolicy8C403F71" + ], + "UpdateReplacePolicy": "Delete", + "DeletionPolicy": "Delete" + }, "LogGroupF5B46931": { "Type": "AWS::Logs::LogGroup", "Properties": { @@ -309,6 +384,41 @@ "UpdateReplacePolicy": "Delete", "DeletionPolicy": "Delete" }, + "BackupKey60B97760": { + "Type": "AWS::KMS::Key", + "Properties": { + "KeyPolicy": { + 
"Statement": [ + { + "Action": "kms:*", + "Effect": "Allow", + "Principal": { + "AWS": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::", + { + "Ref": "AWS::AccountId" + }, + ":root" + ] + ] + } + }, + "Resource": "*" + } + ], + "Version": "2012-10-17" + } + }, + "UpdateReplacePolicy": "Delete", + "DeletionPolicy": "Delete" + }, "DeliveryStreamServiceRole964EEBCC": { "Type": "AWS::IAM::Role", "Properties": { @@ -394,6 +504,67 @@ ] } }, + { + "Action": [ + "s3:GetObject*", + "s3:GetBucket*", + "s3:List*", + "s3:DeleteObject*", + "s3:PutObject", + "s3:Abort*" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::GetAtt": [ + "BackupBucket26B8E51C", + "Arn" + ] + }, + { + "Fn::Join": [ + "", + [ + { + "Fn::GetAtt": [ + "BackupBucket26B8E51C", + "Arn" + ] + }, + "/*" + ] + ] + } + ] + }, + { + "Action": [ + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "DeliveryStreamLogGroup9D8FA3BB", + "Arn" + ] + } + }, + { + "Action": [ + "kms:Decrypt", + "kms:Encrypt", + "kms:ReEncrypt*", + "kms:GenerateDataKey*" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "BackupKey60B97760", + "Arn" + ] + } + }, { "Action": "lambda:InvokeFunction", "Effect": "Allow", @@ -430,6 +601,24 @@ ] } }, + "DeliveryStreamLogGroup9D8FA3BB": { + "Type": "AWS::Logs::LogGroup", + "Properties": { + "RetentionInDays": 731 + }, + "UpdateReplacePolicy": "Retain", + "DeletionPolicy": "Retain" + }, + "DeliveryStreamLogGroupS3BackupD5DF41B2": { + "Type": "AWS::Logs::LogStream", + "Properties": { + "LogGroupName": { + "Ref": "DeliveryStreamLogGroup9D8FA3BB" + } + }, + "UpdateReplacePolicy": "Retain", + "DeletionPolicy": "Retain" + }, "DeliveryStreamF6D5572D": { "Type": "AWS::KinesisFirehose::DeliveryStream", "Properties": { @@ -512,7 +701,48 @@ "DeliveryStreamS3DestinationRole500FC089", "Arn" ] - } + }, + "S3BackupConfiguration": { + "BucketARN": { + "Fn::GetAtt": [ + "BackupBucket26B8E51C", + "Arn" + ] + 
}, + "BufferingHints": { + "IntervalInSeconds": 60, + "SizeInMBs": 1 + }, + "CloudWatchLoggingOptions": { + "Enabled": true, + "LogGroupName": { + "Ref": "DeliveryStreamLogGroup9D8FA3BB" + }, + "LogStreamName": { + "Ref": "DeliveryStreamLogGroupS3BackupD5DF41B2" + } + }, + "CompressionFormat": "ZIP", + "EncryptionConfiguration": { + "KMSEncryptionConfig": { + "AWSKMSKeyARN": { + "Fn::GetAtt": [ + "BackupKey60B97760", + "Arn" + ] + } + } + }, + "ErrorOutputPrefix": "backupErrorPrefix", + "Prefix": "backupPrefix", + "RoleARN": { + "Fn::GetAtt": [ + "DeliveryStreamS3DestinationRole500FC089", + "Arn" + ] + } + }, + "S3BackupMode": "Enabled" } }, "DependsOn": [ diff --git a/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/integ.s3-bucket.lit.ts b/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/integ.s3-bucket.lit.ts index 83cc143c28654..0527cbd26f7d7 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/integ.s3-bucket.lit.ts +++ b/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/integ.s3-bucket.lit.ts @@ -17,6 +17,10 @@ const bucket = new s3.Bucket(stack, 'Bucket', { autoDeleteObjects: true, }); +const backupBucket = new s3.Bucket(stack, 'BackupBucket', { + removalPolicy: cdk.RemovalPolicy.DESTROY, + autoDeleteObjects: true, +}); const logGroup = new logs.LogGroup(stack, 'LogGroup', { removalPolicy: cdk.RemovalPolicy.DESTROY, }); @@ -36,6 +40,10 @@ const key = new kms.Key(stack, 'Key', { removalPolicy: cdk.RemovalPolicy.DESTROY, }); +const backupKey = new kms.Key(stack, 'BackupKey', { + removalPolicy: cdk.RemovalPolicy.DESTROY, +}); + new firehose.DeliveryStream(stack, 'Delivery Stream', { destinations: [new destinations.S3Bucket(bucket, { logging: true, @@ -47,6 +55,16 @@ new firehose.DeliveryStream(stack, 'Delivery Stream', { bufferingInterval: cdk.Duration.seconds(60), bufferingSize: cdk.Size.mebibytes(1), encryptionKey: key, + s3Backup: { + mode: destinations.BackupMode.ALL, + bucket: backupBucket, + compression: 
destinations.Compression.ZIP, + dataOutputPrefix: 'backupPrefix', + errorOutputPrefix: 'backupErrorPrefix', + bufferingInterval: cdk.Duration.seconds(60), + bufferingSize: cdk.Size.mebibytes(1), + encryptionKey: backupKey, + }, })], }); diff --git a/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/s3-bucket.test.ts b/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/s3-bucket.test.ts index fb283b2aac911..dce9a250cbd43 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/s3-bucket.test.ts +++ b/packages/@aws-cdk/aws-kinesisfirehose-destinations/test/s3-bucket.test.ts @@ -476,4 +476,132 @@ describe('S3 destination', () => { }); }); }); + + describe('s3 backup configuration', () => { + it('set backupMode to ALL creates resources', () => { + const destination = new firehosedestinations.S3Bucket(bucket, { + role: destinationRole, + s3Backup: { + mode: firehosedestinations.BackupMode.ALL, + }, + }); + new firehose.DeliveryStream(stack, 'DeliveryStream', { + destinations: [destination], + }); + + expect(stack).toHaveResourceLike('AWS::KinesisFirehose::DeliveryStream', { + ExtendedS3DestinationConfiguration: { + S3BackupConfiguration: { + BucketARN: anything(), + CloudWatchLoggingOptions: { + Enabled: true, + LogGroupName: anything(), + LogStreamName: anything(), + }, + RoleARN: stack.resolve(destinationRole.roleArn), + }, + S3BackupMode: 'Enabled', + }, + }); + }); + + it('sets backup configuration if backup bucket provided', () => { + const backupBucket = new s3.Bucket(stack, 'MyBackupBucket'); + const destination = new firehosedestinations.S3Bucket(bucket, { + role: destinationRole, + s3Backup: { + bucket: backupBucket, + }, + }); + new firehose.DeliveryStream(stack, 'DeliveryStream', { + destinations: [destination], + }); + + expect(stack).toHaveResourceLike('AWS::KinesisFirehose::DeliveryStream', { + ExtendedS3DestinationConfiguration: { + S3BackupConfiguration: { + BucketARN: stack.resolve(backupBucket.bucketArn), + 
CloudWatchLoggingOptions: { + Enabled: true, + LogGroupName: anything(), + LogStreamName: anything(), + }, + RoleARN: stack.resolve(destinationRole.roleArn), + }, + S3BackupMode: 'Enabled', + }, + }); + }); + + it('throws error if backupMode set to FAILED', () => { + expect(() => new firehosedestinations.S3Bucket(bucket, { + role: destinationRole, + s3Backup: { + mode: firehosedestinations.BackupMode.FAILED, + }, + })).toThrowError('S3 destinations do not support BackupMode.FAILED'); + }); + + it('by default does not create resources', () => { + const destination = new firehosedestinations.S3Bucket(bucket); + new firehose.DeliveryStream(stack, 'DeliveryStream', { + destinations: [destination], + }); + + expect(stack).toHaveResource('AWS::S3::Bucket', 1); + expect(stack).toHaveResourceLike('AWS::KinesisFirehose::DeliveryStream', { + ExtendedS3DestinationConfiguration: { + S3BackupConfiguration: ABSENT, + }, + }); + }); + + it('sets full backup configuration', () => { + const backupBucket = new s3.Bucket(stack, 'MyBackupBucket'); + const key = new kms.Key(stack, 'Key'); + const logGroup = new logs.LogGroup(stack, 'BackupLogGroup'); + const destination = new firehosedestinations.S3Bucket(bucket, { + role: destinationRole, + s3Backup: { + mode: firehosedestinations.BackupMode.ALL, + bucket: backupBucket, + dataOutputPrefix: 'myBackupPrefix', + errorOutputPrefix: 'myBackupErrorPrefix', + bufferingSize: cdk.Size.mebibytes(1), + bufferingInterval: cdk.Duration.minutes(1), + compression: firehosedestinations.Compression.ZIP, + encryptionKey: key, + logging: true, + logGroup: logGroup, + }, + }); + new firehose.DeliveryStream(stack, 'DeliveryStream', { + destinations: [destination], + }); + + expect(stack).toHaveResourceLike('AWS::KinesisFirehose::DeliveryStream', { + ExtendedS3DestinationConfiguration: { + S3BackupConfiguration: { + BucketARN: stack.resolve(backupBucket.bucketArn), + CloudWatchLoggingOptions: { + Enabled: true, + LogGroupName: 
stack.resolve(logGroup.logGroupName), + LogStreamName: anything(), + }, + RoleARN: stack.resolve(destinationRole.roleArn), + EncryptionConfiguration: { + KMSEncryptionConfig: { + AWSKMSKeyARN: stack.resolve(key.keyArn), + }, + }, + Prefix: 'myBackupPrefix', + ErrorOutputPrefix: 'myBackupErrorPrefix', + BufferingHints: {}, + CompressionFormat: 'ZIP', + }, + S3BackupMode: 'Enabled', + }, + }); + }); + }); }); diff --git a/packages/@aws-cdk/aws-kinesisfirehose/README.md b/packages/@aws-cdk/aws-kinesisfirehose/README.md index 20a1883f19065..b8ec936b46901 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose/README.md +++ b/packages/@aws-cdk/aws-kinesisfirehose/README.md @@ -311,6 +311,57 @@ new DeliveryStream(this, 'Delivery Stream', { }); ``` +## Backup + +A delivery stream can be configured to backup data to S3 that it attempted to deliver to +the configured destination. Backed up data can be all the data that the delivery stream +attempted to deliver or just data that it failed to deliver (Redshift and S3 destinations +can only backup all data). CDK can create a new S3 bucket where it will back up data or +you can provide a bucket where data will be backed up. You can also provide a prefix under +which your backed-up data will be placed within the bucket. By default, source data is not +backed up to S3. + +```ts fixture=with-bucket +import * as destinations from '@aws-cdk/aws-kinesisfirehose-destinations'; +import * as s3 from '@aws-cdk/aws-s3'; + +// Enable backup of all source records (to an S3 bucket created by CDK). +new DeliveryStream(this, 'Delivery Stream Backup All', { + destinations: [ + new destinations.S3Bucket(bucket, { + s3Backup: { + mode: BackupMode.ALL, + } + }), + ], +}); +// Explicitly provide an S3 bucket to which all source records will be backed up. 
+const backupBucket = new s3.Bucket(this, 'Bucket');
+new DeliveryStream(this, 'Delivery Stream Backup All Explicit Bucket', {
+  destinations: [
+    new destinations.S3Bucket(bucket, {
+      s3Backup: {
+        bucket: backupBucket,
+      }
+    }),
+  ],
+});
+// Explicitly provide an S3 prefix under which all source records will be backed up.
+new DeliveryStream(this, 'Delivery Stream Backup All Explicit Prefix', {
+  destinations: [
+    new destinations.S3Bucket(bucket, {
+      s3Backup: {
+        mode: destinations.BackupMode.ALL,
+        dataOutputPrefix: 'mybackup',
+      },
+    }),
+  ],
+});
+```
+
+If any Data Processing or Transformation is configured on your Delivery Stream, the source
+records will be backed up in their original format.
+
 ## Data Processing/Transformation
 
 Data can be transformed before being delivered to destinations. There are two types of