feat: validate airbyte metadata added to iceberg schema (#48604)
jdpgrailsdev authored Nov 22, 2024
1 parent d6ad2c2 commit 5db1bb1
Showing 12 changed files with 609 additions and 75 deletions.
@@ -5,10 +5,10 @@
package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.DestinationRecord
import java.util.LinkedHashMap
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class AirbyteTypeToAirbyteTypeWithMetaTest {
internal class AirbyteTypeToAirbyteTypeWithMetaTest {
private val expectedMeta =
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to FieldType(StringType, nullable = false),
@@ -46,6 +46,7 @@ class AirbyteTypeToAirbyteTypeWithMetaTest {
FieldType(IntegerType, nullable = false)
)

@Test
fun testWithoutFlattening() {
val schema =
ObjectType(
@@ -56,11 +57,13 @@
)
)
val withMeta = schema.withAirbyteMeta(flatten = false)
val expected = LinkedHashMap(expectedMeta)
expected[DestinationRecord.Meta.COLUMN_NAME_DATA] = FieldType(schema, nullable = false)
Assertions.assertEquals(expected, withMeta)
val expected = ObjectType(expectedMeta)
expected.properties[DestinationRecord.Meta.COLUMN_NAME_DATA] =
FieldType(schema, nullable = false)
assertEquals(expected, withMeta)
}

@Test
fun testWithFlattening() {
val schema =
ObjectType(
@@ -71,8 +74,8 @@
)
)
val withMeta = schema.withAirbyteMeta(flatten = true)
val expected = LinkedHashMap(expectedMeta)
schema.properties.forEach { (name, field) -> expected[name] = field }
Assertions.assertEquals(expected, withMeta)
val expected = ObjectType(expectedMeta)
schema.properties.forEach { (name, field) -> expected.properties[name] = field }
assertEquals(expected, withMeta)
}
}
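For context, `withAirbyteMeta` now returns an `ObjectType` (rather than a raw `LinkedHashMap`), with the Airbyte metadata columns leading the property map. Below is a minimal sketch of the two modes, assuming the CDK types and constants exactly as they appear in this diff; the sample column is made up.

```kotlin
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.withAirbyteMeta

fun sketchWithAirbyteMeta() {
    // A user schema with a single (made-up) string column.
    val schema = ObjectType(linkedMapOf("name" to FieldType(StringType, nullable = true)))

    // flatten = false: the user schema is kept whole, nested under the data column
    // that follows the Airbyte metadata columns.
    val nested = schema.withAirbyteMeta(flatten = false)

    // flatten = true: the user columns are promoted to the top level, directly
    // alongside the Airbyte metadata columns.
    val flattened = schema.withAirbyteMeta(flatten = true)

    println(nested)
    println(flattened)
}
```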
@@ -96,10 +96,9 @@ fun ObjectType.toIcebergSchema(primaryKeys: List<List<String>>): Schema {
val mutableListOf = mutableListOf<NestedField>()
val identifierFields = mutableSetOf<Int>()
val identifierFieldNames = primaryKeys.flatten().toSet()

val icebergTypeConverter = AirbyteTypeToIcebergSchema()
this.properties.entries.forEach { (name, field) ->
val id = UUID.randomUUID().hashCode()
val id = generatedSchemaFieldId()
mutableListOf.add(
NestedField.of(
id,
@@ -114,3 +113,5 @@ fun ObjectType.toIcebergSchema(primaryKeys: List<List<String>>): Schema {
}
return Schema(mutableListOf, identifierFields)
}

private fun generatedSchemaFieldId() = UUID.randomUUID().hashCode()
@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
dockerRepository: airbyte/destination-iceberg-v2
githubIssueLabel: destination-iceberg-v2
icon: s3.svg
@@ -16,7 +16,6 @@ import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil.constructGenerationIdSuffix
import io.github.oshai.kotlinlogging.KotlinLogging
import org.apache.iceberg.Table

@@ -25,6 +24,7 @@ class IcebergStreamLoader(
override val stream: DestinationStream,
private val table: Table,
private val icebergTableWriterFactory: IcebergTableWriterFactory,
private val icebergUtil: IcebergUtil,
private val pipeline: MapperPipeline,
private val stagingBranchName: String,
private val mainBranchName: String
@@ -36,12 +36,12 @@
totalSizeBytes: Long
): Batch {
icebergTableWriterFactory
.create(table = table, generationId = constructGenerationIdSuffix(stream))
.create(table = table, generationId = icebergUtil.constructGenerationIdSuffix(stream))
.use { writer ->
log.info { "Writing records to branch $stagingBranchName" }
records.forEach { record ->
val icebergRecord =
IcebergUtil.toRecord(
icebergUtil.toRecord(
record = record,
stream = stream,
tableSchema = table.schema(),
@@ -77,8 +77,10 @@
table.manageSnapshots().fastForwardBranch(mainBranchName, stagingBranchName).commit()
if (stream.minimumGenerationId > 0) {
val generationIdsToDelete =
(0 until stream.minimumGenerationId).map { constructGenerationIdSuffix(it) }
val icebergTableCleaner = IcebergTableCleaner()
(0 until stream.minimumGenerationId).map(
icebergUtil::constructGenerationIdSuffix
)
val icebergTableCleaner = IcebergTableCleaner(icebergUtil = icebergUtil)
icebergTableCleaner.deleteGenerationId(
table,
DEFAULT_STAGING_BRANCH,
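For intuition about the cleanup above: one suffix is built per superseded generation and handed to `IcebergTableCleaner`. A self-contained sketch follows, assuming `constructGenerationIdSuffix` produces strings of the `ab-generation-id-<id>-e` shape that the validation regex shown later in this diff expects; the helper below is a stand-in, not the connector code.

```kotlin
// Stand-in for IcebergUtil.constructGenerationIdSuffix; the real implementation lives in
// the connector, but the validation regex in this diff (ab-generation-id-\d+-e) suggests
// suffixes of this shape.
fun generationIdSuffixStandIn(generationId: Long): String = "ab-generation-id-$generationId-e"

fun main() {
    val minimumGenerationId = 3L
    // Mirrors the loader's cleanup: every generation below the minimum is targeted for deletion.
    val generationIdsToDelete = (0 until minimumGenerationId).map(::generationIdSuffixStandIn)
    println(generationIdsToDelete) // [ab-generation-id-0-e, ab-generation-id-1-e, ab-generation-id-2-e]
}
```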
@@ -4,11 +4,8 @@

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
import io.airbyte.cdk.load.data.withAirbyteMeta
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
@@ -20,21 +17,17 @@ import org.apache.iceberg.Schema
class IcebergV2Writer(
private val icebergTableWriterFactory: IcebergTableWriterFactory,
private val icebergConfiguration: IcebergV2Configuration,
private val icebergUtil: IcebergUtil,
) : DestinationWriter {

override fun createStreamLoader(stream: DestinationStream): StreamLoader {
val properties =
IcebergUtil.toCatalogProperties(icebergConfiguration = icebergConfiguration)
val catalog = IcebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
icebergUtil.toCatalogProperties(icebergConfiguration = icebergConfiguration)
val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
val pipeline = ParquetMapperPipelineFactory().create(stream)
val primaryKeys =
when (stream.importType) {
is Dedupe -> (stream.importType as Dedupe).primaryKey
else -> emptyList()
}
val schema = pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys)
val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
val table =
IcebergUtil.createTable(
icebergUtil.createTable(
streamDescriptor = stream.descriptor,
catalog = catalog,
schema = schema,
@@ -47,6 +40,7 @@ class IcebergV2Writer(
stream = stream,
table = table,
icebergTableWriterFactory = icebergTableWriterFactory,
icebergUtil = icebergUtil,
pipeline = pipeline,
stagingBranchName = DEFAULT_STAGING_BRANCH,
mainBranchName = icebergConfiguration.nessieServerConfiguration.mainBranchName,
@@ -58,13 +52,13 @@
catalogSchema
.asStruct()
.fields()
.map { Triple(it.name(), it.type(), it.isOptional) }
.map { Triple(it.name(), it.type().typeId(), it.isOptional) }
.toSet()
val existingFieldSet =
tableSchema
.asStruct()
.fields()
.map { Triple(it.name(), it.type(), it.isOptional) }
.map { Triple(it.name(), it.type().typeId(), it.isOptional) }
.toSet()

val missingInIncoming = existingFieldSet - incomingFieldSet
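The schema-compatibility check above now compares each top-level field by name, type id, and optionality instead of by the full Iceberg type. A rough illustration of the difference, assuming plain Iceberg API usage; the field names and ids below are made up.

```kotlin
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.NestedField

fun main() {
    // Same logical column ("payload", a struct with one string field), but the
    // nested field ids differ between the incoming and existing schemas.
    val catalogSchema = Schema(
        NestedField.required(1, "payload",
            Types.StructType.of(NestedField.optional(2, "name", Types.StringType.get())))
    )
    val tableSchema = Schema(
        NestedField.required(10, "payload",
            Types.StructType.of(NestedField.optional(11, "name", Types.StringType.get())))
    )

    val byFullType = { s: Schema ->
        s.asStruct().fields().map { Triple(it.name(), it.type(), it.isOptional) }.toSet()
    }
    val byTypeId = { s: Schema ->
        s.asStruct().fields().map { Triple(it.name(), it.type().typeId(), it.isOptional) }.toSet()
    }

    // Full types differ, because nested field ids take part in StructType equality...
    println(byFullType(catalogSchema) == byFullType(tableSchema)) // false
    // ...but the top-level type ids match, so the writer now treats the schemas as compatible.
    println(byTypeId(catalogSchema) == byTypeId(tableSchema))     // true
}
```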
@@ -4,7 +4,6 @@

package io.airbyte.integrations.destination.iceberg.v2.io

import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil.assertGenerationIdSuffixIsOfValidFormat
import jakarta.inject.Singleton
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.Catalog
@@ -17,7 +16,7 @@ import org.apache.iceberg.io.SupportsPrefixOperations
* catalog implementations do not clear the underlying files written to table storage.
*/
@Singleton
class IcebergTableCleaner {
class IcebergTableCleaner(private val icebergUtil: IcebergUtil) {

/**
* Clears the table identified by the provided [TableIdentifier]. This removes all data and
@@ -50,7 +49,7 @@ class IcebergTableCleaner {
val genIdsToDelete =
generationIdSuffix
.filter {
assertGenerationIdSuffixIsOfValidFormat(it)
icebergUtil.assertGenerationIdSuffixIsOfValidFormat(it)
true
}
.toSet()
@@ -4,7 +4,6 @@

package io.airbyte.integrations.destination.iceberg.v2.io

import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil.assertGenerationIdSuffixIsOfValidFormat
import jakarta.inject.Singleton
import java.util.UUID
import org.apache.iceberg.FileFormat
@@ -26,15 +25,15 @@ import org.apache.iceberg.util.PropertyUtil
* and whether primary keys are configured on the destination table's schema.
*/
@Singleton
class IcebergTableWriterFactory {
class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) {
/**
* Creates a new [BaseTaskWriter] based on the configuration of the destination target [Table].
*
* @param table An Iceberg [Table]
* @return The [BaseTaskWriter] that writes records to the target [Table].
*/
fun create(table: Table, generationId: String): BaseTaskWriter<Record> {
assertGenerationIdSuffixIsOfValidFormat(generationId)
icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
val format =
FileFormat.valueOf(
table
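With `IcebergUtil` becoming an injectable `@Singleton` class (see the next file), its collaborators now receive it through their constructors instead of calling a global object. A minimal sketch of that wiring outside of Micronaut, using only constructors that appear in this diff:

```kotlin
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil

fun wireIcebergIoComponents(): Pair<IcebergTableWriterFactory, IcebergTableCleaner> {
    // IcebergUtil is now a plain class rather than a Kotlin object, so it can be
    // constructed (or mocked) directly and handed to its collaborators; in production
    // Micronaut performs the same wiring through constructor injection.
    val icebergUtil = IcebergUtil()
    return IcebergTableWriterFactory(icebergUtil = icebergUtil) to
        IcebergTableCleaner(icebergUtil = icebergUtil)
}
```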
@@ -4,13 +4,18 @@

package io.airbyte.integrations.destination.iceberg.v2.io

import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.ImportType
import io.airbyte.cdk.load.data.MapperPipeline
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
import io.airbyte.cdk.load.data.withAirbyteMeta
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.CatalogProperties.URI
@@ -32,6 +37,8 @@ import org.apache.iceberg.data.Record

private val logger = KotlinLogging.logger {}

const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at"

/**
* Extension function for the [DestinationStream.Descriptor] class that converts the descriptor to an
* Iceberg [TableIdentifier].
@@ -43,10 +50,12 @@ fun DestinationStream.Descriptor.toIcebergTableIdentifier(): TableIdentifier {
}

/** Collection of Iceberg related utilities. */
object IcebergUtil {
@Singleton
class IcebergUtil {
internal class InvalidFormatException(message: String) : Exception(message)

private val generationIdRegex = Regex("""ab-generation-id-\d+-e""")

fun assertGenerationIdSuffixIsOfValidFormat(generationId: String) {
if (!generationIdRegex.matches(generationId)) {
throw InvalidFormatException(
@@ -142,7 +151,7 @@ object IcebergUtil {
// TODO figure out how to detect the actual operation value
return RecordWrapper(
delegate = dataMapped.toIcebergRecord(tableSchema),
operation = Operation.INSERT
operation = getOperation(record = record, importType = stream.importType)
)
}

@@ -180,9 +189,33 @@
}
}

fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema {
val primaryKeys =
when (stream.importType) {
is Dedupe -> (stream.importType as Dedupe).primaryKey
else -> emptyList()
}
return pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys)
}

private fun getSortOrder(schema: Schema): SortOrder {
val builder = SortOrder.builderFor(schema)
schema.identifierFieldNames().forEach { builder.asc(it) }
return builder.build()
}

private fun getOperation(
record: DestinationRecord,
importType: ImportType,
): Operation =
if (
record.data is ObjectValue &&
(record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] != null
) {
Operation.DELETE
} else if (importType is Dedupe) {
Operation.UPDATE
} else {
Operation.INSERT
}
}
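In summary, the new `getOperation` resolves the Iceberg operation as follows: a populated `_ab_cdc_deleted_at` value means DELETE, otherwise Dedupe streams become UPDATEs, and everything else remains an INSERT. Below is a standalone sketch of that decision using stand-in types; the real code works on `DestinationRecord`/`ObjectValue` and Iceberg's `Operation`.

```kotlin
// Stand-ins for the CDK and Iceberg types involved; only the decision logic mirrors the
// new getOperation, the types themselves are illustrative.
enum class Operation { INSERT, UPDATE, DELETE }
sealed interface ImportType
object Append : ImportType
data class Dedupe(val primaryKey: List<List<String>>) : ImportType

fun resolveOperation(recordValues: Map<String, Any?>, importType: ImportType): Operation =
    when {
        // A populated _ab_cdc_deleted_at column marks a CDC deletion.
        recordValues["_ab_cdc_deleted_at"] != null -> Operation.DELETE
        // Dedupe syncs upsert into the destination table.
        importType is Dedupe -> Operation.UPDATE
        // Append (and anything else) stays a plain insert.
        else -> Operation.INSERT
    }

fun main() {
    println(resolveOperation(mapOf("id" to 1), Append))                                      // INSERT
    println(resolveOperation(mapOf("id" to 1), Dedupe(listOf(listOf("id")))))                // UPDATE
    println(resolveOperation(mapOf("_ab_cdc_deleted_at" to "2024-11-22T00:00:00Z"), Append)) // DELETE
}
```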