
Commit 79bcba2

feat(kinesisfirehose): support DeliveryStream record format conversion for S3 Bucket Destination (#35410)
### Issue

Closes #15501.

### Reason for this change

From the Kinesis Firehose RFC, the ability to define record format conversion is still missing: https://github.com/aws/aws-cdk-rfcs/blob/main/text/0340-firehose-l2.md#record-format-conversion-using-aws-glue

### Description of changes

See my comment in the issue thread #15501 (comment) for a few more details about the changes.

These changes add several classes and data models to support Firehose's [record format conversion](https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html) feature in the L2 `DeliveryStream` construct, as specified in the [RFC](https://github.com/aws/aws-cdk-rfcs/blob/main/text/0340-firehose-l2.md#record-format-conversion-using-aws-glue). The main design goal is to keep the underlying settings configurable while still providing sane defaults for the 99% of users who won't touch the advanced settings.

One note: the RFC shows usage of the (as of now) alpha `glue.Table` construct. Since that construct is not yet in a stable release, we should support `glue.CfnTable` for now and provide a way to adopt the L2 construct later, once it is released, without any breaking changes.
With these changes merged, users can specify record format conversion like so:

```ts
import * as glue from '@aws-cdk/aws-glue';
import * as s3 from '@aws-cdk/aws-s3';
import * as destinations from '@aws-cdk/aws-kinesisfirehose-destinations';

declare const myGlueSchemaTable: glue.CfnTable;
declare const bucket: s3.Bucket;

new firehose.DeliveryStream(this, 'Delivery Stream', {
  destination: new destinations.S3Bucket(bucket, {
    dataFormatConversion: {
      schemaConfiguration: firehose.SchemaConfiguration.fromCfnTable(myGlueSchemaTable),
      inputFormat: firehose.InputFormat.OPENX_JSON,
      outputFormat: firehose.OutputFormat.PARQUET,
    },
  }),
});
```

If you want to customize the parameters of the OPENX_JSON input format, specify it like so:

```ts
const inputFormat = new firehose.OpenXJsonInputFormat({
  lowercaseColumnNames: false, // default true
  columnToJsonKeyMappings: {
    'columnA': 'path.to.key',
    'columnB': 'path.to.key2',
  },
  convertDotsInJsonKeysToUnderscores: true, // default false
});
```

If you want to customize the parameters of the HIVE_JSON input format, specify it like so:

```ts
const inputFormat = new firehose.HiveJsonInputFormat({
  // Joda-Time pattern: https://www.joda.org/joda-time/key_format.html
  timestampParsers: [firehose.TimestampParser.fromFormatString('yyyy-MM-dd')],
});
```

If you want to customize the parameters of the PARQUET output format, specify it like so:

```ts
const outputFormat = new firehose.ParquetOutputFormat({
  blockSize: core.Size.mebibytes(512),
  compression: firehose.ParquetCompression.UNCOMPRESSED,
  enableDictionaryCompression: true,
  maxPadding: core.Size.bytes(10),
  pageSize: core.Size.mebibytes(2),
  writerVersion: firehose.ParquetWriterVersion.V2,
});
```

If you want to customize the parameters of the ORC output format, specify it like so:

```ts
const outputFormat = new firehose.OrcOutputFormat({
  formatVersion: firehose.OrcFormatVersion.V0_11,
  blockSize: core.Size.mebibytes(256),
  compression: firehose.OrcCompression.NONE,
  bloomFilterColumns: ['columnA'],
  bloomFilterFalsePositiveProbability: 0.1,
  dictionaryKeyThreshold: 0.7,
  enablePadding: true,
  paddingTolerance: 0.2,
  rowIndexStride: 9000,
  stripeSize: core.Size.mebibytes(32),
});
```

Changelist:
- Add support for record format conversion
- Readme updates

### Describe any new or updated permissions being added

The following permissions are added to the S3 destination role when `dataFormatConversion` is set:

```json
{
  "Effect": "Allow",
  "Action": [
    "glue:GetTable",
    "glue:GetTableVersion",
    "glue:GetTableVersions"
  ],
  "Resource": [
    "arn:aws:glue:region:account-id:catalog",
    "arn:aws:glue:region:account-id:database/databaseName",
    "arn:aws:glue:region:account-id:table/databaseName/tableName"
  ]
}
```

```json
{
  "Effect": "Allow",
  "Action": "glue:GetSchemaVersion",
  "Resource": "*"
}
```

The permissions come from the AWS docs (https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html#using-iam-glue), though those docs are a bit misleading: they specify only `table-arn` as the resource, but you need to grant permissions on the database and catalog as well. See https://docs.aws.amazon.com/glue/latest/dg/glue-specifying-resource-arns.html:

> Actions on a table, partition, or table version require permission on the table, database, and catalog.

### Description of how you validated changes

Added a unit test file and an integration test.

Unit tests:
- Split across tests for schema creation, input format creation, and output format creation.
- Tests for the policies added when creating a schema.

Integ test:
- Creates several delivery streams with different configurations and uploads the same JSON input. Successful outputs are written to a single S3 prefix; the assertions expect X files if there are X delivery streams.
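The usage examples above assume an existing `glue.CfnTable` to use as the schema source. As a rough sketch (the database/table names and columns below are illustrative, not from this PR), such a table could be defined like so:

```ts
import * as cdk from '@aws-cdk/core';
import * as glue from '@aws-cdk/aws-glue';

declare const stack: cdk.Stack;

// Hypothetical database and table names; the column definitions must match
// the shape of the JSON records the delivery stream receives.
const myGlueSchemaTable = new glue.CfnTable(stack, 'SchemaTable', {
  catalogId: stack.account,
  databaseName: 'my_database',
  tableInput: {
    name: 'my_table',
    storageDescriptor: {
      columns: [
        { name: 'columnA', type: 'string' },
        { name: 'columnB', type: 'int' },
      ],
    },
  },
});
```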
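The policy unit tests described above could, as a rough sketch (not the PR's actual test code, and assuming a `stack` that contains a delivery stream with `dataFormatConversion` set), assert the Glue permissions with the `assertions` module:

```ts
import * as cdk from '@aws-cdk/core';
import { Match, Template } from '@aws-cdk/assertions';

declare const stack: cdk.Stack;

// Assert that the destination role's policy includes the Glue read actions
// needed for record format conversion.
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
  PolicyDocument: {
    Statement: Match.arrayWith([
      Match.objectLike({
        Effect: 'Allow',
        Action: ['glue:GetTable', 'glue:GetTableVersion', 'glue:GetTableVersions'],
      }),
    ]),
  },
});
```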
### Checklist

- [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md)

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent dbd7626 commit 79bcba2

File tree: 33 files changed, +41421 −10 lines changed

packages/@aws-cdk-testing/framework-integ/test/aws-kinesisfirehose/test/integ.record-format-conversion-schema.js.snapshot/RecordFormatConversionSchema.assets.json

Lines changed: 34 additions & 0 deletions

0 commit comments
