
feat: validate airbyte metadata added to iceberg schema #48604

Merged
merged 6 commits on Nov 22, 2024
Changes from 4 commits
@@ -5,10 +5,10 @@
package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.DestinationRecord
import java.util.LinkedHashMap
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class AirbyteTypeToAirbyteTypeWithMetaTest {
internal class AirbyteTypeToAirbyteTypeWithMetaTest {
private val expectedMeta =
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to FieldType(StringType, nullable = false),
@@ -46,6 +46,7 @@ class AirbyteTypeToAirbyteTypeWithMetaTest {
FieldType(IntegerType, nullable = false)
)

@Test
fun testWithoutFlattening() {
val schema =
ObjectType(
@@ -56,11 +57,13 @@ class AirbyteTypeToAirbyteTypeWithMetaTest {
)
)
val withMeta = schema.withAirbyteMeta(flatten = false)
val expected = LinkedHashMap(expectedMeta)
expected[DestinationRecord.Meta.COLUMN_NAME_DATA] = FieldType(schema, nullable = false)
Assertions.assertEquals(expected, withMeta)
val expected = ObjectType(expectedMeta)
expected.properties[DestinationRecord.Meta.COLUMN_NAME_DATA] =
FieldType(schema, nullable = false)
assertEquals(expected, withMeta)
}

@Test
fun testWithFlattening() {
val schema =
ObjectType(
@@ -71,8 +74,8 @@ class AirbyteTypeToAirbyteTypeWithMetaTest {
)
)
val withMeta = schema.withAirbyteMeta(flatten = true)
val expected = LinkedHashMap(expectedMeta)
schema.properties.forEach { (name, field) -> expected[name] = field }
Assertions.assertEquals(expected, withMeta)
val expected = ObjectType(expectedMeta)
schema.properties.forEach { (name, field) -> expected.properties[name] = field }
assertEquals(expected, withMeta)
}
}
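A rough sketch of the behavior these updated tests assert, using the CDK types shown in the diff (ObjectType, FieldType, StringType, withAirbyteMeta); the single "name" column is purely illustrative:

// Sketch only: withAirbyteMeta now yields an ObjectType (not a raw LinkedHashMap) whose
// properties begin with the Airbyte meta columns.
val userSchema = ObjectType(linkedMapOf("name" to FieldType(StringType, nullable = true)))

// flatten = false: meta columns plus a single COLUMN_NAME_DATA field typed as the whole record schema
val nested = userSchema.withAirbyteMeta(flatten = false)

// flatten = true: meta columns plus the stream's own fields promoted to the top level
val flattened = userSchema.withAirbyteMeta(flatten = true)
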
@@ -96,10 +96,9 @@ fun ObjectType.toIcebergSchema(primaryKeys: List<List<String>>): Schema {
val mutableListOf = mutableListOf<NestedField>()
val identifierFields = mutableSetOf<Int>()
val identifierFieldNames = primaryKeys.flatten().toSet()

val icebergTypeConverter = AirbyteTypeToIcebergSchema()
this.properties.entries.forEach { (name, field) ->
val id = UUID.randomUUID().hashCode()
val id = generatedSchemaFieldId()
mutableListOf.add(
NestedField.of(
id,
@@ -114,3 +113,5 @@ fun ObjectType.toIcebergSchema(primaryKeys: List<List<String>>): Schema {
}
return Schema(mutableListOf, identifierFields)
}

private fun generatedSchemaFieldId() = UUID.randomUUID().hashCode()
@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
dockerRepository: airbyte/destination-iceberg-v2
githubIssueLabel: destination-iceberg-v2
icon: s3.svg
@@ -16,7 +16,6 @@ import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil.constructGenerationIdSuffix
import io.github.oshai.kotlinlogging.KotlinLogging
import org.apache.iceberg.Table

@@ -25,6 +24,7 @@ class IcebergStreamLoader(
override val stream: DestinationStream,
private val table: Table,
private val icebergTableWriterFactory: IcebergTableWriterFactory,
private val icebergUtil: IcebergUtil,
private val pipeline: MapperPipeline,
private val stagingBranchName: String,
private val mainBranchName: String
@@ -36,12 +36,12 @@
totalSizeBytes: Long
): Batch {
icebergTableWriterFactory
.create(table = table, generationId = constructGenerationIdSuffix(stream))
.create(table = table, generationId = icebergUtil.constructGenerationIdSuffix(stream))
.use { writer ->
log.info { "Writing records to branch $stagingBranchName" }
records.forEach { record ->
val icebergRecord =
IcebergUtil.toRecord(
icebergUtil.toRecord(
record = record,
stream = stream,
tableSchema = table.schema(),
@@ -77,8 +77,10 @@
table.manageSnapshots().fastForwardBranch(mainBranchName, stagingBranchName).commit()
if (stream.minimumGenerationId > 0) {
val generationIdsToDelete =
(0 until stream.minimumGenerationId).map { constructGenerationIdSuffix(it) }
val icebergTableCleaner = IcebergTableCleaner()
(0 until stream.minimumGenerationId).map(
icebergUtil::constructGenerationIdSuffix
)
val icebergTableCleaner = IcebergTableCleaner(icebergUtil = icebergUtil)
icebergTableCleaner.deleteGenerationId(
table,
DEFAULT_STAGING_BRANCH,
@@ -4,11 +4,8 @@

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
import io.airbyte.cdk.load.data.withAirbyteMeta
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
@@ -20,21 +17,17 @@ import org.apache.iceberg.Schema
class IcebergV2Writer(
private val icebergTableWriterFactory: IcebergTableWriterFactory,
private val icebergConfiguration: IcebergV2Configuration,
private val icebergUtil: IcebergUtil,
) : DestinationWriter {

override fun createStreamLoader(stream: DestinationStream): StreamLoader {
val properties =
IcebergUtil.toCatalogProperties(icebergConfiguration = icebergConfiguration)
val catalog = IcebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
icebergUtil.toCatalogProperties(icebergConfiguration = icebergConfiguration)
val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
val pipeline = ParquetMapperPipelineFactory().create(stream)
val primaryKeys =
when (stream.importType) {
is Dedupe -> (stream.importType as Dedupe).primaryKey
else -> emptyList()
}
val schema = pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys)
val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
val table =
IcebergUtil.createTable(
icebergUtil.createTable(
streamDescriptor = stream.descriptor,
catalog = catalog,
schema = schema,
@@ -47,6 +40,7 @@
stream = stream,
table = table,
icebergTableWriterFactory = icebergTableWriterFactory,
icebergUtil = icebergUtil,
pipeline = pipeline,
stagingBranchName = DEFAULT_STAGING_BRANCH,
mainBranchName = icebergConfiguration.nessieServerConfiguration.mainBranchName,
@@ -58,13 +52,13 @@
catalogSchema
.asStruct()
.fields()
.map { Triple(it.name(), it.type(), it.isOptional) }
.map { Triple(it.name(), it.type().typeId(), it.isOptional) }
.toSet()
val existingFieldSet =
tableSchema
.asStruct()
.fields()
.map { Triple(it.name(), it.type(), it.isOptional) }
.map { Triple(it.name(), it.type().typeId(), it.isOptional) }
.toSet()

val missingInIncoming = existingFieldSet - incomingFieldSet
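The updated comparison above reduces each schema to a set of (field name, type id, optionality) triples, so two structurally identical schemas match even when the underlying type objects differ. A standalone sketch of the same idea (schemasMatch is a hypothetical helper, not the PR's code):

import org.apache.iceberg.Schema

// Hypothetical helper mirroring the comparison in the diff above: schemas are treated as
// equivalent when their (field name, type id, optionality) triples form the same set.
private fun schemasMatch(catalogSchema: Schema, tableSchema: Schema): Boolean {
    fun Schema.signature() =
        asStruct().fields().map { Triple(it.name(), it.type().typeId(), it.isOptional) }.toSet()
    return catalogSchema.signature() == tableSchema.signature()
}
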
@@ -4,7 +4,6 @@

package io.airbyte.integrations.destination.iceberg.v2.io

import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil.assertGenerationIdSuffixIsOfValidFormat
import jakarta.inject.Singleton
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.Catalog
@@ -17,7 +16,7 @@ import org.apache.iceberg.io.SupportsPrefixOperations
* catalog implementations do not clear the underlying files written to table storage.
*/
@Singleton
class IcebergTableCleaner {
class IcebergTableCleaner(private val icebergUtil: IcebergUtil) {

/**
* Clears the table identified by the provided [TableIdentifier]. This removes all data and
@@ -50,7 +49,7 @@ class IcebergTableCleaner {
val genIdsToDelete =
generationIdSuffix
.filter {
assertGenerationIdSuffixIsOfValidFormat(it)
icebergUtil.assertGenerationIdSuffixIsOfValidFormat(it)
true
}
.toSet()
@@ -4,7 +4,6 @@

package io.airbyte.integrations.destination.iceberg.v2.io

import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil.assertGenerationIdSuffixIsOfValidFormat
import jakarta.inject.Singleton
import java.util.UUID
import org.apache.iceberg.FileFormat
@@ -26,15 +25,15 @@ import org.apache.iceberg.util.PropertyUtil
* and whether primary keys are configured on the destination table's schema.
*/
@Singleton
class IcebergTableWriterFactory {
class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) {
/**
* Creates a new [BaseTaskWriter] based on the configuration of the destination target [Table].
*
* @param table An Iceberg [Table]
* @return The [BaseTaskWriter] that writes records to the target [Table].
*/
fun create(table: Table, generationId: String): BaseTaskWriter<Record> {
assertGenerationIdSuffixIsOfValidFormat(generationId)
icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
val format =
FileFormat.valueOf(
table
@@ -4,13 +4,17 @@

package io.airbyte.integrations.destination.iceberg.v2.io

import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.ImportType
import io.airbyte.cdk.load.data.MapperPipeline
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
import io.airbyte.cdk.load.data.withAirbyteMeta
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.CatalogProperties.URI
@@ -32,6 +36,8 @@ import org.apache.iceberg.data.Record

private val logger = KotlinLogging.logger {}

const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at"

/**
* Extension function for the [DestinationStream.Descriptor] class that converts the descriptor to an
* Iceberg [TableIdentifier].
@@ -43,10 +49,12 @@ fun DestinationStream.Descriptor.toIcebergTableIdentifier(): TableIdentifier {
}

/** Collection of Iceberg related utilities. */
object IcebergUtil {
@Singleton
class IcebergUtil {
internal class InvalidFormatException(message: String) : Exception(message)

private val generationIdRegex = Regex("""ab-generation-id-\d+-e""")

fun assertGenerationIdSuffixIsOfValidFormat(generationId: String) {
if (!generationIdRegex.matches(generationId)) {
throw InvalidFormatException(
@@ -142,7 +150,7 @@ object IcebergUtil {
// TODO figure out how to detect the actual operation value
return RecordWrapper(
delegate = dataMapped.toIcebergRecord(tableSchema),
operation = Operation.INSERT
operation = getOperation(record = record, importType = stream.importType)
)
}

@@ -180,9 +188,36 @@
}
}

fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema {
val primaryKeys =
when (stream.importType) {
is Dedupe -> (stream.importType as Dedupe).primaryKey
else -> emptyList()
}
return pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys)
}

private fun getSortOrder(schema: Schema): SortOrder {
val builder = SortOrder.builderFor(schema)
schema.identifierFieldNames().forEach { builder.asc(it) }
return builder.build()
}

private fun getOperation(
@Suppress("UNUSED_PARAMETER") record: DestinationRecord,
importType: ImportType,
): Operation =
// TODO This is disabled out of caution due to the potential cost of serializing every
// record to check the deletion status. This should be revisited when there is a cheaper
// way to do this, such as after a protocol change that explicitly states the operation.
// if (record.data.toJson().get(AIRBYTE_CDC_DELETE_COLUMN) != null) {
Review thread on the disabled check above:

Contributor: can we do something like this to dodge the serialize?

    (record.data as ObjectValue).properties[DELETE_COLUMN] != null

Contributor Author: @edgao Sure? I'll test it out and update the PR if it works

Contributor Author: @edgao That appeared to work. I will update the PR to include the DELETE operation detection based on your suggestion.

// Operation.DELETE
// } else
if (importType is Dedupe) {
Operation.UPDATE
} else {
Operation.INSERT
}
}
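
Based on the review exchange above, a minimal sketch of the follow-up the author describes: it assumes record.data can be cast to an ObjectValue exposing a properties map (the reviewer's suggestion), which avoids serializing each record to JSON; the helper name is hypothetical:

// Sketch only: assumes record.data is an ObjectValue with a `properties` map, per the review
// thread. Inspects the already-parsed value rather than calling toJson() on every record.
private fun getOperationWithCdcDeletes(record: DestinationRecord, importType: ImportType): Operation =
    if ((record.data as? ObjectValue)?.properties?.get(AIRBYTE_CDC_DELETE_COLUMN) != null) {
        Operation.DELETE
    } else if (importType is Dedupe) {
        Operation.UPDATE
    } else {
        Operation.INSERT
    }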