Skip to content

Commit

Permalink
feat(kinesisfirehose-destinations): add support for prefixes in the S…
Browse files Browse the repository at this point in the history
…3 destination (#15552)

Closes #15551 

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
madeline-k authored Jul 28, 2021
1 parent 1eb56a0 commit d227e48
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@ export interface S3BucketProps extends CommonDestinationProps {
* @default - no compression is applied
*/
readonly compression?: Compression;

/**
* A prefix that Kinesis Data Firehose evaluates and adds to failed records before writing them to S3.
*
* This prefix appears immediately following the bucket name.
* @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
*
* @default "YYYY/MM/DD/HH"
*/
readonly errorOutputPrefix?: string;

/**
* A prefix that Kinesis Data Firehose evaluates and adds to records before writing them to S3.
*
* This prefix appears immediately following the bucket name.
* @see https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
*
* @default "YYYY/MM/DD/HH"
*/
readonly dataOutputPrefix?: string;
}

/**
Expand Down Expand Up @@ -48,6 +68,8 @@ export class S3Bucket implements firehose.IDestination {
roleArn: role.roleArn,
bucketArn: this.bucket.bucketArn,
compressionFormat: this.props.compression?.value,
errorOutputPrefix: this.props.errorOutputPrefix,
prefix: this.props.dataOutputPrefix,
},
dependables: [bucketGrant, ...(loggingDependables ?? [])],
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@
}
},
"CompressionFormat": "GZIP",
"ErrorOutputPrefix": "errorPrefix",
"Prefix": "regularPrefix",
"RoleARN": {
"Fn::GetAtt": [
"DeliveryStreamS3DestinationRole500FC089",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ new firehose.DeliveryStream(stack, 'Delivery Stream', {
logging: true,
logGroup: logGroup,
compression: destinations.Compression.GZIP,
dataOutputPrefix: 'regularPrefix',
errorOutputPrefix: 'errorPrefix',
})],
});

app.synth();
app.synth();
13 changes: 13 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ new DeliveryStream(this, 'Delivery Stream', {
});
```

The S3 destination also supports custom dynamic prefixes. `prefix` will be used for files
successfully delivered to S3. `errorOutputPrefix` will be added to failed records before
writing them to S3.

```ts fixture=with-bucket
const s3Destination = new destinations.S3Bucket(bucket, {
dataOutputPrefix: 'myFirehose/DeliveredYear=!{timestamp:yyyy}/anyMonth/rand=!{firehose:random-string}',
errorOutputPrefix: 'myFirehoseFailures/!{firehose:error-output-type}/!{timestamp:yyyy}/anyMonth/!{timestamp:dd}',
});
```

See: [Custom S3 Prefixes](https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html) in the *Kinesis Data Firehose Developer Guide*.

## Server-side Encryption

Enabling server-side encryption (SSE) requires Kinesis Data Firehose to encrypt all data
Expand Down

1 comment on commit d227e48

@jakejscott
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madeline-k

I've been playing around with this today, and I needed to add property overrides to the ExtendedS3DestinationConfiguration because my dataOutputPrefix contained partitionKeyFromQuery fragments.

const s3Destination = new destinations.S3Bucket(s3DestinationBucket, {
  logging: true,
  logGroup: s3DestinationLogGroup,
  bufferingInterval: cdk.Duration.seconds(60),
  bufferingSize: cdk.Size.mebibytes(64),
  compression: undefined,
  dataOutputPrefix:
    "user/!{partitionKeyFromQuery:userId}/order/!{partitionKeyFromQuery:orderId}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/",
  errorOutputPrefix:
    "failed/!{firehose:error-output-type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/",
});

const s3DeliveryStream = new kinesisfirehose.DeliveryStream(
  this,
  "S3DeliveryStream",
  {
    destinations: [s3Destination],
  }
);

s3DeliveryStreamEscapeHatch.addPropertyOverride(
  "ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration",
  {
    Enabled: true,
    RetryOptions: {
      DurationInSeconds: 300,
    },
  }
);

s3DeliveryStreamEscapeHatch.addPropertyOverride(
  "ExtendedS3DestinationConfiguration.ProcessingConfiguration",
  {
    Enabled: true,
    Processors: [
      {
        Type: "MetadataExtraction",
        Parameters: [
          {
            ParameterName: "MetadataExtractionQuery",
            ParameterValue: "{userId:.userId,orderId:.orderId}",
          },
          {
            ParameterName: "JsonParsingEngine",
            ParameterValue: "JQ-1.6",
          },
        ],
      },
    ],
  }
);

Please sign in to comment.