
Conversation

@Kasra-G
Contributor

@Kasra-G Kasra-G commented Sep 4, 2025

Issue #

Closes #15501.

Reason for this change

The ability to define record format conversion, described in the Kinesis Firehose RFC, is still missing:
https://github.com/aws/aws-cdk-rfcs/blob/main/text/0340-firehose-l2.md#record-format-conversion-using-aws-glue

Description of changes

See my comment in the issue thread #15501 (comment) for a few more details about the changes

These changes add several classes and data models to support Firehose's record format conversion feature with the L2 DeliveryStream construct, as specified in the RFC.

The main focus of the design is to allow configurability of the underlying settings while still providing sane defaults for the 99% of users who won't touch the advanced settings.

One note: the RFC shows usage of the (as of now) alpha glue.Table construct. Since those constructs are not yet in a stable release, we should support glue.CfnTable for now and provide a way to adopt the L2 construct later, once it is released, without any breaking changes.

With these changes merged, users can specify record format conversion like so:

import * as glue from '@aws-cdk/aws-glue';
import * as s3 from '@aws-cdk/aws-s3';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import * as destinations from '@aws-cdk/aws-kinesisfirehose-destinations';

declare const myGlueSchemaTable: glue.CfnTable;
declare const bucket: s3.Bucket;

new firehose.DeliveryStream(this, 'Delivery Stream', {
  destination: new destinations.S3Bucket(bucket, {
    dataFormatConversion: {
      schemaConfiguration: firehose.SchemaConfiguration.fromCfnTable(myGlueSchemaTable),
      inputFormat: firehose.InputFormat.OPENX_JSON,
      outputFormat: firehose.OutputFormat.PARQUET,
    },
  }),
});

If you want to customize the parameters of the OPENX_JSON input format, specify it like so:

const inputFormat = new firehose.OpenXJsonInputFormat({
  lowercaseColumnNames: false, // default true
  columnToJsonKeyMappings: {
    'columnA': 'path.to.key',
    'columnB': 'path.to.key2',
  },
  convertDotsInJsonKeysToUnderscores: true, // default false
});

If you want to customize the parameters of the HIVE_JSON input format, specify it like so:

const inputFormat = new firehose.HiveJsonInputFormat({
  timestampParsers: [firehose.TimestampParser.fromFormatString('yyyy-MM-dd')], // in Joda-Time pattern https://www.joda.org/joda-time/key_format.html
});

If you want to customize the parameters of the PARQUET output format, specify it like so:

const outputFormat = new firehose.ParquetOutputFormat({
  blockSize: core.Size.mebibytes(512),
  compression: firehose.ParquetCompression.UNCOMPRESSED,
  enableDictionaryCompression: true,
  maxPadding: core.Size.bytes(10),
  pageSize: core.Size.mebibytes(2),
  writerVersion: firehose.ParquetWriterVersion.V2,
});

If you want to customize the parameters of the ORC output format, specify it like so:

const outputFormat = new firehose.OrcOutputFormat({
  formatVersion: firehose.OrcFormatVersion.V0_11,
  blockSize: core.Size.mebibytes(256),
  compression: firehose.OrcCompression.NONE,
  bloomFilterColumns: ['columnA'],
  bloomFilterFalsePositiveProbability: 0.1,
  dictionaryKeyThreshold: 0.7,
  enablePadding: true,
  paddingTolerance: 0.2,
  rowIndexStride: 9000,
  stripeSize: core.Size.mebibytes(32),
});

Changelist:

  • Add support for record format conversion
  • Readme updates

Describe any new or updated permissions being added

The following permissions are added to the S3 Destination role when dataFormatConversion is set:

{
    "Effect": "Allow",
    "Action": [
        "glue:GetTable",
        "glue:GetTableVersion",
        "glue:GetTableVersions"
    ],
    "Resource": [
        "arn:aws:glue:region:account-id:catalog",
        "arn:aws:glue:region:account-id:database/databaseName",
        "arn:aws:glue:region:account-id:table/databaseName/tableName",
    ]
}
{
    "Effect": "Allow",
    "Action":  "glue:GetSchemaVersion",
    "Resource": "*"
}

The permissions are taken from the AWS docs (https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html#using-iam-glue), though those docs are a bit misleading: they specify only the table ARN as the resource, but you also need to grant permissions on the database and the catalog. See https://docs.aws.amazon.com/glue/latest/dg/glue-specifying-resource-arns.html:

Actions on a table, partition, or table version require permission on the table, database, and catalog.
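
As a rough illustration only (not the exact code from this PR; the import paths follow the snippet style above, and destinationRole, databaseName, and tableName are assumed names), the grant for those three resources could look like this:

import * as cdk from '@aws-cdk/core';
import * as iam from '@aws-cdk/aws-iam';

declare const stack: cdk.Stack;
declare const destinationRole: iam.IRole; // role the delivery stream assumes for the S3 destination
declare const databaseName: string;
declare const tableName: string;

// Table-level Glue actions also require permission on the database and the catalog,
// so all three ARNs are listed as resources.
destinationRole.addToPrincipalPolicy(new iam.PolicyStatement({
  actions: ['glue:GetTable', 'glue:GetTableVersion', 'glue:GetTableVersions'],
  resources: [
    stack.formatArn({ service: 'glue', resource: 'catalog' }),
    stack.formatArn({ service: 'glue', resource: 'database', resourceName: databaseName }),
    stack.formatArn({ service: 'glue', resource: 'table', resourceName: `${databaseName}/${tableName}` }),
  ],
}));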

Description of how you validated changes

Added unit test file and integration test.

Unit tests:

  • Split across tests for schema creation, input format creation, and output format creation.
  • Tests for policies added when creating a schema

Integ test:

  • Creates several delivery streams with different configurations and uploads the same JSON input to each. Successful outputs are written to a single S3 prefix; the assertions expect one output file per delivery stream (see the sketch below).
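
A rough sketch of the kind of assertion involved (assuming the integ-tests assertions API; the bucket name and the expected count are placeholders, not the exact assertions in this PR):

import * as cdk from 'aws-cdk-lib';
import { ExpectedResult, IntegTest } from '@aws-cdk/integ-tests-alpha';

declare const app: cdk.App;
declare const stack: cdk.Stack;          // stack containing the delivery streams and the output bucket
declare const outputBucketName: string;  // bucket all delivery streams write their converted files to

const integ = new IntegTest(app, 'RecordFormatConversionTest', { testCases: [stack] });

// List the shared output prefix and expect one delivered file per delivery stream.
integ.assertions
  .awsApiCall('S3', 'listObjectsV2', { Bucket: outputBucketName })
  .expect(ExpectedResult.objectLike({ KeyCount: 3 })); // 3 = number of delivery streams in the test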

Checklist


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license

@aws-cdk-automation aws-cdk-automation requested a review from a team September 4, 2025 07:50
@github-actions github-actions bot added the effort/medium, feature-request, p1, and beginning-contributor labels Sep 4, 2025
Collaborator

@aws-cdk-automation aws-cdk-automation left a comment


(This review is outdated)

@Kasra-G Kasra-G changed the title feat(kinesisfirehose): support L2 record format conversion feat(kinesisfirehose): support DeliveryStream record format conversion for S3 Bucket Destination Sep 4, 2025
@Kasra-G

This comment was marked as outdated.

@Kasra-G Kasra-G force-pushed the ft/kinesis-firehose/record-format-conversion branch 3 times, most recently from f936570 to d57bd01 Compare September 5, 2025 21:11
@aws-cdk-automation aws-cdk-automation dismissed their stale review September 6, 2025 16:36

✅ Updated pull request passes all PRLinter validations. Dismissing previous PRLinter review.

@Kasra-G
Contributor Author

Kasra-G commented Sep 6, 2025

I'd consider this to be in a reviewable state now. A few things I am thinking of adding:

  • Schema.fromColumns() - create the schema table and database automatically from given columns (a hypothetical shape is sketched after this list)
  • Cover more input/output format configs in integ tests
  • Map the Compression setting on the S3 Destination to the Compression setting used in the OutputFormat.
    • Currently these are two separate fields, with some validation. This mirrors the delivery stream CloudFormation spec, but may not be intuitive to users.
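
For illustration only, here is one hypothetical shape that Schema.fromColumns() could take; nothing in this snippet exists yet, and all names and parameters are assumptions:

// Hypothetical API sketch: Schema.fromColumns() is a future idea, not part of this PR.
// It would create the Glue database and table behind the scenes from a plain column list,
// so users would not need to define a glue.CfnTable themselves.
const schema = firehose.Schema.fromColumns(this, 'ConversionSchema', {
  databaseName: 'my_database',   // assumed: created automatically if it does not exist
  tableName: 'my_table',
  columns: [
    { name: 'ticker_symbol', type: 'string' },  // assumed column descriptor shape
    { name: 'price', type: 'double' },
    { name: 'event_time', type: 'timestamp' },
  ],
});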

@Kasra-G Kasra-G marked this pull request as ready for review September 6, 2025 18:49
@aws-cdk-automation aws-cdk-automation added the pr/needs-maintainer-review label Sep 6, 2025
replace IDeserializer and ISerializer with IInputFormat and
IOutputFormat

rename DataFormatConversionSchema to ConversionSchema

Rename bind() to render(), add validations and remove placeholder types
to ParquetOutputFormatProps

add OrcOutputFormat validations

create HiveJson and OpenXJson validation/wrapper classes

add comments to Hive Json props and run linter [skip actions]

add comments for OpenX JSON and rename ConversionSchema to Schema

add some more comments

separate record format conversion changes to different files

some linting

add some more validation when record data format conversion is enabled

initial README changes

Add comments for parquet props

add more comments on Schema and Parquet OutputFormat

added comments for ORC format

update README with ORC sample props

update fixture

add exports

fix jsii errors

fix build errors

fix rosetta errors

rename core to cdk

move some validation to s3 destination instantiation

add unit tests

add tests for schema iam policy creation

fix default value being incorrect when data formatting enabled

fix permission issue and remove invalid DEFAULT timestamp parser

add integ test

update permissions
@Kasra-G Kasra-G force-pushed the ft/kinesis-firehose/record-format-conversion branch from cefc863 to 9a756a2 Compare September 8, 2025 16:10
@Kasra-G

This comment was marked as outdated.

@kumvprat kumvprat self-assigned this Sep 30, 2025
@aws-cdk-automation aws-cdk-automation added the pr/needs-further-review label Sep 30, 2025
Contributor

@kumvprat kumvprat left a comment


Thanks for the contribution @Kasra-G

I have added a couple of inline comments.

Since this is for native conversion of input records from one format to another in Firehose, should this be limited to Glue tables as the source?
Maybe it can be applied to any source of Firehose; I don't have full context here, so feel free to share some documentation that you might have.

@Kasra-G
Contributor Author

Kasra-G commented Oct 15, 2025

Maybe it can be applied to any source of Firehose

Yes, the record format conversion can run on any input source for Firehose (Kinesis data streams, Direct PUT, etc.):
https://docs.aws.amazon.com/firehose/latest/dev/create-transform.html

You can use a Lambda function that runs on each record put into the DeliveryStream, which will convert the input JSON to an output JSON. But this does not affect the file format that Firehose uses to write to S3.

Record format conversion is specifically for writing Parquet/ORC files to S3 instead of JSON, and it runs after the Lambda (though the Lambda is optional). This is useful for analytics use cases, since Parquet/ORC files are much more efficient to query than JSONL, and for saving space, since the compression is typically better.
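
To make the ordering concrete, here is a minimal sketch combining both steps, reusing the API from the usage example above and assuming the existing LambdaFunctionProcessor/processor option from the Firehose module (transformFn, myGlueSchemaTable, and bucket are placeholder names):

import * as lambda from '@aws-cdk/aws-lambda';

declare const transformFn: lambda.IFunction; // optional per-record transformation (JSON in, JSON out)
declare const myGlueSchemaTable: glue.CfnTable;
declare const bucket: s3.Bucket;

new firehose.DeliveryStream(this, 'Delivery Stream', {
  destination: new destinations.S3Bucket(bucket, {
    // Step 1 (optional): the Lambda processor transforms each incoming record.
    processor: new firehose.LambdaFunctionProcessor(transformFn),
    // Step 2: the (possibly transformed) records are converted to Parquet before landing in S3.
    dataFormatConversion: {
      schemaConfiguration: firehose.SchemaConfiguration.fromCfnTable(myGlueSchemaTable),
      inputFormat: firehose.InputFormat.OPENX_JSON,
      outputFormat: firehose.OutputFormat.PARQUET,
    },
  }),
});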

@Kasra-G
Contributor Author

Kasra-G commented Oct 15, 2025

Curious: any thoughts on the Schema class implementation?
As a reminder, the only things we need from a Schema are these properties:

{
  tableName: tableName,
  databaseName: databaseName,
  region: region, // region of the table
  catalogId: catalogId, // this is just the account id of the table
  versionId: versionId, // this can be passed in or default to 'LATEST'
};

This is a CloudFormation requirement.

If the AWS Glue L2 constructs were in a stable release, we could just do an implementation that follows the RFC:

dataFormatConversionConfiguration: {
  schema: myGlueTable,
  inputFormat: destinations.InputFormat.OPENX_JSON,
  outputFormat: destinations.OutputFormat.PARQUET,
},

However, the glue constructs are not in a stable release, so I cannot do this. Additionally I can see some abstractions we can provide for format conversion, such as just providing a list of columns and types, so that the glue table/database creation can be done behind the scenes. This is a level of abstraction that even the console doesn't provide, however.

I wonder if the new ITableRef would be something that could be used for this. That would allow us to use the same function for both L1 and L2 glue tables.
It seems ITableRef does not expose the table ARN, so that would not work either.

I am thinking of this kind of usage: Schema.fromTableArn(arn). That leaves open the possibility of adding Schema.fromGlueTable(table) in a later release, once the glue constructs are stable. This is probably an okay middle ground to settle on.
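
To illustrate why a table ARN would be enough, here is a rough sketch (a hypothetical helper, not code from this PR; the import path is an assumption) of how the properties above could be derived from it:

import { Arn, ArnFormat } from 'aws-cdk-lib';

// A Glue table ARN has the shape arn:aws:glue:<region>:<account>:table/<databaseName>/<tableName>,
// so everything the CloudFormation schema configuration needs can be parsed out of it.
function schemaFromTableArn(tableArn: string, versionId: string = 'LATEST') {
  const parts = Arn.split(tableArn, ArnFormat.SLASH_RESOURCE_NAME);
  const [databaseName, tableName] = parts.resourceName!.split('/');
  return {
    tableName,
    databaseName,
    region: parts.region,      // region of the table
    catalogId: parts.account,  // the catalog id is the account id of the table
    versionId,
  };
}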

@kumvprat
Contributor

For the comment: #35410 (comment)

I think the documentation here (https://docs.aws.amazon.com/firehose/latest/APIReference/API_DataFormatConversionConfiguration.html) calls out that it should be a Glue catalog table, and the code changes here point to the same fact.

My assumption: S3 is the only data sink that supports this record conversion natively, and it can be used with any input source (direct puts, streams, etc.).
I am not well-versed with Firehose, so I might be confusing some terms.

So basically any data source can use this, BUT the main requirement is to use the Glue catalog to store the schema and S3 as the destination; do correct me if I am wrong here.

Curious: any thoughts on the Schema class implementation?

I think SchemaProperties/SchemaConfiguration would be a better fit, since we are using metadata to identify the schema rather than the schema itself.
I am debating whether this class is needed, since we can use fromCfnTable() to create the construct, or better yet make the constructor private so that users are forced to have a Glue table to use this functionality. (?)

Overall the class looks good with potential renaming.

Additionally I can see some abstractions we can provide for format conversion, such as just providing a list of columns and types, so that the glue table/database creation can be done behind the scenes.

This is a great idea, but maybe the right home for this would be the glue package itself rather than firehose.
Once stable, we can use glue's constructs here and provide a helper function (like your suggestion) to automatically create the Glue catalog table and use it internally.

@Kasra-G

This comment was marked as outdated.

@Kasra-G

This comment was marked as resolved.

@kumvprat
Contributor

I do see other PR builds running fine; I will try updating your branch with main.

@Kasra-G
Contributor Author

Kasra-G commented Oct 20, 2025

Still getting the eslint OOM error; going to retry the build with a merge. I see other PR builds failing with the same error, so I wonder if the Actions worker needs more memory.

Contributor

@kumvprat kumvprat left a comment


Added a comment on the helper class

Rest looks good

@aws-cdk-automation aws-cdk-automation removed the pr/needs-maintainer-review label Oct 21, 2025
@mergify mergify bot dismissed kumvprat’s stale review October 21, 2025 06:52

Pull request has been modified.

Contributor

@kumvprat kumvprat left a comment


LGTM

Thank you for the work on this

@mergify
Contributor

mergify bot commented Oct 21, 2025

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

@mergify mergify bot added the queued label Oct 21, 2025
@mergify
Contributor

mergify bot commented Oct 21, 2025

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

@mergify mergify bot merged commit 79bcba2 into aws:main Oct 21, 2025
19 of 20 checks passed
@mergify mergify bot removed the queued label Oct 21, 2025
@github-actions
Contributor

Comments on closed issues and PRs are hard for our team to see.
If you need help, please open a new issue that references this one.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Oct 21, 2025
@Kasra-G Kasra-G deleted the ft/kinesis-firehose/record-format-conversion branch October 21, 2025 09:33