feat: validate airbyte metadata added to iceberg schema #48604
This looks a lot like https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToAirbyteTypeWithMeta.kt, which we can maybe reuse in some way?
    }
}

private fun createAirbyteMetadataFields(): List<NestedField> {
Most of these columns are already added to the schema through the mapper architecture. Take a look at
val schema = pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys)
in IcebergV2Writer. The withAirbyteMeta method adds a bunch of them. If we want to add more, such as the generation ID, we should use the same design.
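To illustrate the design being suggested, here is a minimal sketch of a single mapping step that adds the metadata columns, plus a check that they are present. Field, Schema, AIRBYTE_META_FIELDS, and missingMetaColumns are hypothetical stand-ins, and the column list is an assumption; the real CDK and Iceberg types and the real withAirbyteMeta signature differ.

```kotlin
// Hypothetical stand-ins for illustration; the real CDK and Iceberg types differ.
data class Field(val name: String, val type: String, val nullable: Boolean)
data class Schema(val fields: List<Field>)

// Assumed metadata columns, not taken from the PR.
val AIRBYTE_META_FIELDS = listOf(
    Field("_airbyte_raw_id", "string", nullable = false),
    Field("_airbyte_extracted_at", "timestamp", nullable = false),
    Field("_airbyte_meta", "object", nullable = false),
    Field("_airbyte_generation_id", "long", nullable = false),
)

// One mapping step that prepends the metadata columns, skipping any already present,
// so applying it twice does not duplicate fields.
fun Schema.withAirbyteMeta(): Schema {
    val existing = fields.map { it.name }.toSet()
    return Schema(AIRBYTE_META_FIELDS.filter { it.name !in existing } + fields)
}

// Returns the names of metadata columns missing from a schema (empty when all are present).
fun missingMetaColumns(schema: Schema): List<String> {
    val names = schema.fields.map { it.name }.toSet()
    return AIRBYTE_META_FIELDS.map { it.name }.filter { it !in names }
}

fun main() {
    val userSchema = Schema(listOf(Field("id", "long", nullable = false)))
    println(missingMetaColumns(userSchema))
    println(missingMetaColumns(userSchema.withAirbyteMeta()))
}
```

Keeping the column list in one place means a new column such as the generation ID only has to be added once, and the same list can drive the validation.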
...-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt
@edgao It does, and I wasn't sure how to reuse it, given that it returns Airbyte types rather than Iceberg types. I suspect what is really being pointed out here is a refactor in the CDK to make the common parts more common.
PR has been updated to validate that the metadata is added to the schema. I have removed the code that I originally added to do this, as the existing logic already adds the fields. This is confirmed via new tests added to this PR.
lgtm, just a small question about the deleted_at detection
// the deletion status. This should be revisited when there is a cheaper way to do
// this, such
// as after a protocol change that explicitly states the operation.
// if (record.data.toJson().get(AIRBYTE_CDC_DELETE_COLUMN) != null) {
Can we do something like this to dodge the serialization?
(record.data as ObjectValue).properties[DELETE_COLUMN] != null
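A minimal sketch of that check, assuming the record payload is an object-like value whose properties can be read directly. AirbyteValue, StringValue, ObjectValue, the DELETE_COLUMN value, and isDelete are hypothetical stand-ins here; the real CDK classes may be shaped differently.

```kotlin
// Hypothetical stand-ins for the CDK value types; the real classes may differ in shape.
sealed interface AirbyteValue
data class StringValue(val value: String) : AirbyteValue
data class ObjectValue(val properties: Map<String, AirbyteValue?>) : AirbyteValue

// Assumed column name, for illustration only.
const val DELETE_COLUMN = "_ab_cdc_deleted_at"

// True when the record carries a non-null delete marker.
// Reads the property map directly instead of serializing the record to JSON.
// The safe cast (as?) returns false rather than throwing if the payload is not an object.
fun isDelete(data: AirbyteValue): Boolean =
    (data as? ObjectValue)?.properties?.get(DELETE_COLUMN) != null

fun main() {
    val deleted = ObjectValue(mapOf("id" to StringValue("1"), DELETE_COLUMN to StringValue("sample-timestamp")))
    val live = ObjectValue(mapOf("id" to StringValue("1"), DELETE_COLUMN to null))
    println(isDelete(deleted))
    println(isDelete(live))
}
```

The sketch uses a safe cast where the suggestion uses a plain cast; which one fits depends on whether a non-object payload should fail loudly or be treated as a non-delete.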
@edgao Sure? I'll test it out and update the PR if it works
@edgao That appeared to work. I will update the PR to include the DELETE operation detection based on your suggestion.
What
How
withMetadata() is called
IcebergUtil to a Micronaut singleton to better facilitate testing
Review guide
IcebergUtil.kt
IcebergUtilTest.kt
IcebergV2WriterTest.kt
AirbyteTypeToAirbyteTypeWithMetaTest.kt
Can this PR be safely reverted and rolled back?