From 5db1bb16a7042bf671a026e8db4ea3382ef3a796 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Fri, 22 Nov 2024 13:57:15 -0500 Subject: [PATCH] feat: validate airbyte metadata added to iceberg schema (#48604) --- .../AirbyteTypeToAirbyteTypeWithMetaTest.kt | 21 +- .../parquet/AirbyteTypeToIcebergSchema.kt | 5 +- .../destination-iceberg-v2/metadata.yaml | 2 +- .../iceberg/v2/IcebergStreamLoader.kt | 12 +- .../destination/iceberg/v2/IcebergV2Writer.kt | 22 +- .../iceberg/v2/io/IcebergTableCleaner.kt | 5 +- .../v2/io/IcebergTableWriterFactory.kt | 5 +- .../destination/iceberg/v2/io/IcebergUtil.kt | 37 ++- .../iceberg/v2/IcebergV2WriterTest.kt | 314 ++++++++++++++++++ .../iceberg/v2/io/IcebergTableCleanerTest.kt | 26 +- .../v2/io/IcebergTableWriterFactoryTest.kt | 36 +- .../iceberg/v2/io/IcebergUtilTest.kt | 199 ++++++++++- 12 files changed, 609 insertions(+), 75 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToAirbyteTypeWithMetaTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToAirbyteTypeWithMetaTest.kt index 685c34823d01..af6aa4740ece 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToAirbyteTypeWithMetaTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToAirbyteTypeWithMetaTest.kt @@ -5,10 +5,10 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.message.DestinationRecord -import java.util.LinkedHashMap -import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test -class AirbyteTypeToAirbyteTypeWithMetaTest { +internal class AirbyteTypeToAirbyteTypeWithMetaTest { private val expectedMeta = linkedMapOf( DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to FieldType(StringType, nullable = false), @@ -46,6 +46,7 @@ class AirbyteTypeToAirbyteTypeWithMetaTest { FieldType(IntegerType, nullable = false) ) + @Test fun testWithoutFlattening() { val schema = ObjectType( @@ -56,11 +57,13 @@ class AirbyteTypeToAirbyteTypeWithMetaTest { ) ) val withMeta = schema.withAirbyteMeta(flatten = false) - val expected = LinkedHashMap(expectedMeta) - expected[DestinationRecord.Meta.COLUMN_NAME_DATA] = FieldType(schema, nullable = false) - Assertions.assertEquals(expected, withMeta) + val expected = ObjectType(expectedMeta) + expected.properties[DestinationRecord.Meta.COLUMN_NAME_DATA] = + FieldType(schema, nullable = false) + assertEquals(expected, withMeta) } + @Test fun testWithFlattening() { val schema = ObjectType( @@ -71,8 +74,8 @@ class AirbyteTypeToAirbyteTypeWithMetaTest { ) ) val withMeta = schema.withAirbyteMeta(flatten = true) - val expected = LinkedHashMap(expectedMeta) - schema.properties.forEach { (name, field) -> expected[name] = field } - Assertions.assertEquals(expected, withMeta) + val expected = ObjectType(expectedMeta) + schema.properties.forEach { (name, field) -> expected.properties[name] = field } + assertEquals(expected, withMeta) } } diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt index 1ec297f53e13..2da7d8ac0e0d 100644 --- 
a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt
+++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt
@@ -96,10 +96,9 @@ fun ObjectType.toIcebergSchema(primaryKeys: List<List<String>>): Schema {
     val mutableListOf = mutableListOf<NestedField>()
     val identifierFields = mutableSetOf<Int>()
     val identifierFieldNames = primaryKeys.flatten().toSet()
-    val icebergTypeConverter = AirbyteTypeToIcebergSchema()
     this.properties.entries.forEach { (name, field) ->
-        val id = UUID.randomUUID().hashCode()
+        val id = generatedSchemaFieldId()
         mutableListOf.add(
             NestedField.of(
                 id,
@@ -114,3 +113,5 @@ fun ObjectType.toIcebergSchema(primaryKeys: List<List<String>>): Schema {
     }
     return Schema(mutableListOf, identifierFields)
 }
+
+private fun generatedSchemaFieldId() = UUID.randomUUID().hashCode()
diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml
index c02cfb67e1cf..f308b663d750 100644
--- a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
-  dockerImageTag: 0.1.3
+  dockerImageTag: 0.1.4
   dockerRepository: airbyte/destination-iceberg-v2
   githubIssueLabel: destination-iceberg-v2
   icon: s3.svg
diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt
index d080cf7c471b..1578756933fb 100644
--- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt
+++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt
@@ -16,7 +16,6 @@ import io.airbyte.cdk.load.write.StreamLoader
 import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
 import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
 import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
-import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil.constructGenerationIdSuffix
 import io.github.oshai.kotlinlogging.KotlinLogging
 import org.apache.iceberg.Table
 
@@ -25,6 +24,7 @@ class IcebergStreamLoader(
     override val stream: DestinationStream,
     private val table: Table,
     private val icebergTableWriterFactory: IcebergTableWriterFactory,
+    private val icebergUtil: IcebergUtil,
     private val pipeline: MapperPipeline,
     private val stagingBranchName: String,
     private val mainBranchName: String
@@ -36,12 +36,12 @@ class IcebergStreamLoader(
         totalSizeBytes: Long
     ): Batch {
         icebergTableWriterFactory
-            .create(table = table, generationId = constructGenerationIdSuffix(stream))
+            .create(table = table, generationId = icebergUtil.constructGenerationIdSuffix(stream))
             .use { writer ->
                 log.info { "Writing records to branch $stagingBranchName" }
                 records.forEach { record ->
                     val icebergRecord =
-                        IcebergUtil.toRecord(
+                        icebergUtil.toRecord(
                             record = record,
                             stream = stream,
                             tableSchema = table.schema(),
@@ -77,8 +77,10 @@
table.manageSnapshots().fastForwardBranch(mainBranchName, stagingBranchName).commit() if (stream.minimumGenerationId > 0) { val generationIdsToDelete = - (0 until stream.minimumGenerationId).map { constructGenerationIdSuffix(it) } - val icebergTableCleaner = IcebergTableCleaner() + (0 until stream.minimumGenerationId).map( + icebergUtil::constructGenerationIdSuffix + ) + val icebergTableCleaner = IcebergTableCleaner(icebergUtil = icebergUtil) icebergTableCleaner.deleteGenerationId( table, DEFAULT_STAGING_BRANCH, diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Writer.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Writer.kt index 2da2d4cb1288..c2cdf54bbfd6 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Writer.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Writer.kt @@ -4,11 +4,8 @@ package io.airbyte.integrations.destination.iceberg.v2 -import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory -import io.airbyte.cdk.load.data.withAirbyteMeta import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory @@ -20,21 +17,17 @@ import org.apache.iceberg.Schema class IcebergV2Writer( private val icebergTableWriterFactory: IcebergTableWriterFactory, private val icebergConfiguration: IcebergV2Configuration, + private val icebergUtil: IcebergUtil, ) : DestinationWriter { override fun createStreamLoader(stream: DestinationStream): StreamLoader { val properties = - IcebergUtil.toCatalogProperties(icebergConfiguration = icebergConfiguration) - val catalog = IcebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties) + icebergUtil.toCatalogProperties(icebergConfiguration = icebergConfiguration) + val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties) val pipeline = ParquetMapperPipelineFactory().create(stream) - val primaryKeys = - when (stream.importType) { - is Dedupe -> (stream.importType as Dedupe).primaryKey - else -> emptyList() - } - val schema = pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys) + val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline) val table = - IcebergUtil.createTable( + icebergUtil.createTable( streamDescriptor = stream.descriptor, catalog = catalog, schema = schema, @@ -47,6 +40,7 @@ class IcebergV2Writer( stream = stream, table = table, icebergTableWriterFactory = icebergTableWriterFactory, + icebergUtil = icebergUtil, pipeline = pipeline, stagingBranchName = DEFAULT_STAGING_BRANCH, mainBranchName = icebergConfiguration.nessieServerConfiguration.mainBranchName, @@ -58,13 +52,13 @@ class IcebergV2Writer( catalogSchema .asStruct() .fields() - .map { Triple(it.name(), it.type(), it.isOptional) } + .map { Triple(it.name(), it.type().typeId(), it.isOptional) } .toSet() val existingFieldSet = tableSchema .asStruct() .fields() - .map { Triple(it.name(), it.type(), it.isOptional) } + .map { Triple(it.name(), it.type().typeId(), it.isOptional) } .toSet() val missingInIncoming = 
existingFieldSet - incomingFieldSet diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableCleaner.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableCleaner.kt index 7311d2798bff..fb664641de53 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableCleaner.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableCleaner.kt @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.iceberg.v2.io -import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil.assertGenerationIdSuffixIsOfValidFormat import jakarta.inject.Singleton import org.apache.iceberg.Table import org.apache.iceberg.catalog.Catalog @@ -17,7 +16,7 @@ import org.apache.iceberg.io.SupportsPrefixOperations * catalog implementations do not clear the underlying files written to table storage. */ @Singleton -class IcebergTableCleaner { +class IcebergTableCleaner(private val icebergUtil: IcebergUtil) { /** * Clears the table identified by the provided [TableIdentifier]. This removes all data and @@ -50,7 +49,7 @@ class IcebergTableCleaner { val genIdsToDelete = generationIdSuffix .filter { - assertGenerationIdSuffixIsOfValidFormat(it) + icebergUtil.assertGenerationIdSuffixIsOfValidFormat(it) true } .toSet() diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableWriterFactory.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableWriterFactory.kt index 698263aac3b0..1760d154b75f 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableWriterFactory.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableWriterFactory.kt @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.iceberg.v2.io -import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil.assertGenerationIdSuffixIsOfValidFormat import jakarta.inject.Singleton import java.util.UUID import org.apache.iceberg.FileFormat @@ -26,7 +25,7 @@ import org.apache.iceberg.util.PropertyUtil * and whether primary keys are configured on the destination table's schema. */ @Singleton -class IcebergTableWriterFactory { +class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) { /** * Creates a new [BaseTaskWriter] based on the configuration of the destination target [Table]. * @@ -34,7 +33,7 @@ class IcebergTableWriterFactory { * @return The [BaseTaskWriter] that writes records to the target [Table]. 
*/
     fun create(table: Table, generationId: String): BaseTaskWriter {
-        assertGenerationIdSuffixIsOfValidFormat(generationId)
+        icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
         val format =
             FileFormat.valueOf(
                 table
diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt
index 5f8ca349f649..052e3b1d5996 100644
--- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt
+++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt
@@ -4,13 +4,18 @@
 
 package io.airbyte.integrations.destination.iceberg.v2.io
 
+import io.airbyte.cdk.load.command.Dedupe
 import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.command.ImportType
 import io.airbyte.cdk.load.data.MapperPipeline
+import io.airbyte.cdk.load.data.ObjectValue
 import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord
+import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
 import io.airbyte.cdk.load.data.withAirbyteMeta
 import io.airbyte.cdk.load.message.DestinationRecord
 import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration
 import io.github.oshai.kotlinlogging.KotlinLogging
+import jakarta.inject.Singleton
 import org.apache.hadoop.conf.Configuration
 import org.apache.iceberg.CatalogProperties
 import org.apache.iceberg.CatalogProperties.URI
@@ -32,6 +37,8 @@ import org.apache.iceberg.data.Record
 
 private val logger = KotlinLogging.logger {}
 
+const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at"
+
 /**
  * Extension function for the [DestinationStream.Descriptor] class that converts the descriptor to an
  * Iceberg [TableIdentifier].
@@ -43,10 +50,12 @@ fun DestinationStream.Descriptor.toIcebergTableIdentifier(): TableIdentifier {
 }
 
 /** Collection of Iceberg related utilities.
*/ -object IcebergUtil { +@Singleton +class IcebergUtil { internal class InvalidFormatException(message: String) : Exception(message) private val generationIdRegex = Regex("""ab-generation-id-\d+-e""") + fun assertGenerationIdSuffixIsOfValidFormat(generationId: String) { if (!generationIdRegex.matches(generationId)) { throw InvalidFormatException( @@ -142,7 +151,7 @@ object IcebergUtil { // TODO figure out how to detect the actual operation value return RecordWrapper( delegate = dataMapped.toIcebergRecord(tableSchema), - operation = Operation.INSERT + operation = getOperation(record = record, importType = stream.importType) ) } @@ -180,9 +189,33 @@ object IcebergUtil { } } + fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema { + val primaryKeys = + when (stream.importType) { + is Dedupe -> (stream.importType as Dedupe).primaryKey + else -> emptyList() + } + return pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys) + } + private fun getSortOrder(schema: Schema): SortOrder { val builder = SortOrder.builderFor(schema) schema.identifierFieldNames().forEach { builder.asc(it) } return builder.build() } + + private fun getOperation( + record: DestinationRecord, + importType: ImportType, + ): Operation = + if ( + record.data is ObjectValue && + (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] != null + ) { + Operation.DELETE + } else if (importType is Dedupe) { + Operation.UPDATE + } else { + Operation.INSERT + } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt new file mode 100644 index 000000000000..05b718f3bbb1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt @@ -0,0 +1,314 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.iceberg.v2 + +import io.airbyte.cdk.load.command.Append +import io.airbyte.cdk.load.command.Dedupe +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfiguration +import io.airbyte.cdk.load.command.iceberg.parquet.NessieServerConfiguration +import io.airbyte.cdk.load.command.s3.S3BucketConfiguration +import io.airbyte.cdk.load.command.s3.S3BucketRegion +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.MapperPipeline +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema +import io.airbyte.cdk.load.data.withAirbyteMeta +import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT +import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID +import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_META +import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_RAW_ID +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil +import io.mockk.every +import io.mockk.mockk +import org.apache.iceberg.Schema +import org.apache.iceberg.Table +import org.apache.iceberg.catalog.Catalog +import org.apache.iceberg.types.Types +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows + +internal class IcebergV2WriterTest { + + @Test + fun testCreateStreamLoader() { + val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name") + val stream = + DestinationStream( + descriptor = streamDescriptor, + importType = Append, + schema = + ObjectType( + linkedMapOf( + "id" to FieldType(IntegerType, nullable = true), + "name" to FieldType(StringType, nullable = true), + ) + ), + generationId = 1, + minimumGenerationId = 1, + syncId = 1, + ) + val icebergSchema = + Schema( + Types.NestedField.of(1, true, "id", Types.LongType.get()), + Types.NestedField.of(2, true, "name", Types.StringType.get()), + Types.NestedField.of(3, false, COLUMN_NAME_AB_RAW_ID, Types.StringType.get()), + Types.NestedField.of(4, false, COLUMN_NAME_AB_EXTRACTED_AT, Types.LongType.get()), + Types.NestedField.of( + 5, + false, + COLUMN_NAME_AB_META, + Types.StructType.of( + Types.NestedField.of(6, false, "sync_id", Types.LongType.get()), + Types.NestedField.of( + 7, + false, + "changes", + Types.ListType.ofRequired( + 8, + Types.StructType.of( + Types.NestedField.of(9, false, "field", Types.StringType.get()), + Types.NestedField.of( + 10, + false, + "change", + Types.StringType.get() + ), + Types.NestedField.of( + 11, + false, + "reason", + Types.StringType.get() + ) + ) + ) + ) + ) + ), + Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()), + ) + val icebergTableWriterFactory: IcebergTableWriterFactory = mockk() + val awsConfiguration: AWSAccessKeyConfiguration = mockk { + every { accessKeyId } returns "access-key" + every { secretAccessKey } returns "secret-access-key" + } + val bucketConfiguration: S3BucketConfiguration = mockk { + every { s3BucketRegion } returns S3BucketRegion.`us-east-1` + every { s3BucketName } returns "bucket" + every { s3Endpoint } returns 
"http://localhost:8080" + } + val nessieConfiguration: NessieServerConfiguration = mockk { + every { accessToken } returns "access-token" + every { mainBranchName } returns "main" + every { serverUri } returns "http://localhost:8080/api/v1" + every { warehouseLocation } returns "s3://bucket/" + } + val icebergConfiguration: IcebergV2Configuration = mockk { + every { awsAccessKeyConfiguration } returns awsConfiguration + every { nessieServerConfiguration } returns nessieConfiguration + every { s3BucketConfiguration } returns bucketConfiguration + } + val catalog: Catalog = mockk() + val table: Table = mockk { every { schema() } returns icebergSchema } + val icebergUtil: IcebergUtil = mockk { + every { createCatalog(any(), any()) } returns catalog + every { createTable(any(), any(), any(), any()) } returns table + every { toCatalogProperties(any()) } returns mapOf() + every { toIcebergSchema(any(), any()) } answers + { + val pipeline = secondArg() as MapperPipeline + pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(emptyList()) + } + } + val icebergV2Writer = + IcebergV2Writer( + icebergTableWriterFactory = icebergTableWriterFactory, + icebergConfiguration = icebergConfiguration, + icebergUtil = icebergUtil, + ) + val streamLoader = icebergV2Writer.createStreamLoader(stream = stream) + assertNotNull(streamLoader) + } + + @Test + fun testCreateStreamLoaderWithMismatchedSchemas() { + val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name") + val stream = + DestinationStream( + descriptor = streamDescriptor, + importType = Append, + schema = + ObjectType( + linkedMapOf( + "id" to FieldType(IntegerType, nullable = true), + "name" to FieldType(StringType, nullable = true), + ) + ), + generationId = 1, + minimumGenerationId = 1, + syncId = 1, + ) + val icebergSchema = + Schema( + Types.NestedField.of(2, true, "name", Types.StringType.get()), + ) + val icebergTableWriterFactory: IcebergTableWriterFactory = mockk() + val awsConfiguration: AWSAccessKeyConfiguration = mockk { + every { accessKeyId } returns "access-key" + every { secretAccessKey } returns "secret-access-key" + } + val bucketConfiguration: S3BucketConfiguration = mockk { + every { s3BucketRegion } returns S3BucketRegion.`us-east-1` + every { s3BucketName } returns "bucket" + every { s3Endpoint } returns "http://localhost:8080" + } + val nessieConfiguration: NessieServerConfiguration = mockk { + every { accessToken } returns "access-token" + every { mainBranchName } returns "main" + every { serverUri } returns "http://localhost:8080/api/v1" + every { warehouseLocation } returns "s3://bucket/" + } + val icebergConfiguration: IcebergV2Configuration = mockk { + every { awsAccessKeyConfiguration } returns awsConfiguration + every { nessieServerConfiguration } returns nessieConfiguration + every { s3BucketConfiguration } returns bucketConfiguration + } + val catalog: Catalog = mockk() + val table: Table = mockk { every { schema() } returns icebergSchema } + val icebergUtil: IcebergUtil = mockk { + every { createCatalog(any(), any()) } returns catalog + every { createTable(any(), any(), any(), any()) } returns table + every { toCatalogProperties(any()) } returns mapOf() + every { toIcebergSchema(any(), any()) } answers + { + val pipeline = secondArg() as MapperPipeline + pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(emptyList()) + } + } + val icebergV2Writer = + IcebergV2Writer( + icebergTableWriterFactory = icebergTableWriterFactory, + icebergConfiguration = icebergConfiguration, + 
icebergUtil = icebergUtil,
+            )
+        val e =
+            assertThrows<IllegalArgumentException> {
+                icebergV2Writer.createStreamLoader(stream = stream)
+            }
+        assertTrue(
+            e.message?.startsWith("Table schema fields are different than catalog schema")
+                ?: false
+        )
+    }
+
+    @Test
+    fun testCreateStreamLoaderMismatchedPrimaryKeys() {
+        val primaryKeys = listOf("id")
+        val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name")
+        val stream =
+            DestinationStream(
+                descriptor = streamDescriptor,
+                importType = Dedupe(primaryKey = listOf(primaryKeys), cursor = primaryKeys),
+                schema =
+                    ObjectType(
+                        linkedMapOf(
+                            "id" to FieldType(IntegerType, nullable = false),
+                            "name" to FieldType(StringType, nullable = true),
+                        )
+                    ),
+                generationId = 1,
+                minimumGenerationId = 1,
+                syncId = 1,
+            )
+        val columns =
+            listOf(
+                Types.NestedField.of(1, false, "id", Types.LongType.get()),
+                Types.NestedField.of(2, true, "name", Types.StringType.get()),
+                Types.NestedField.of(3, false, COLUMN_NAME_AB_RAW_ID, Types.StringType.get()),
+                Types.NestedField.of(4, false, COLUMN_NAME_AB_EXTRACTED_AT, Types.LongType.get()),
+                Types.NestedField.of(
+                    5,
+                    false,
+                    COLUMN_NAME_AB_META,
+                    Types.StructType.of(
+                        Types.NestedField.of(6, false, "sync_id", Types.LongType.get()),
+                        Types.NestedField.of(
+                            7,
+                            false,
+                            "changes",
+                            Types.ListType.ofRequired(
+                                8,
+                                Types.StructType.of(
+                                    Types.NestedField.of(9, false, "field", Types.StringType.get()),
+                                    Types.NestedField.of(
+                                        10,
+                                        false,
+                                        "change",
+                                        Types.StringType.get()
+                                    ),
+                                    Types.NestedField.of(
+                                        11,
+                                        false,
+                                        "reason",
+                                        Types.StringType.get()
+                                    )
+                                )
+                            )
+                        )
+                    )
+                ),
+                Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()),
+            )
+        val icebergSchema = Schema(columns, emptySet())
+        val icebergTableWriterFactory: IcebergTableWriterFactory = mockk()
+        val awsConfiguration: AWSAccessKeyConfiguration = mockk {
+            every { accessKeyId } returns "access-key"
+            every { secretAccessKey } returns "secret-access-key"
+        }
+        val bucketConfiguration: S3BucketConfiguration = mockk {
+            every { s3BucketRegion } returns S3BucketRegion.`us-east-1`
+            every { s3BucketName } returns "bucket"
+            every { s3Endpoint } returns "http://localhost:8080"
+        }
+        val nessieConfiguration: NessieServerConfiguration = mockk {
+            every { accessToken } returns "access-token"
+            every { mainBranchName } returns "main"
+            every { serverUri } returns "http://localhost:8080/api/v1"
+            every { warehouseLocation } returns "s3://bucket/"
+        }
+        val icebergConfiguration: IcebergV2Configuration = mockk {
+            every { awsAccessKeyConfiguration } returns awsConfiguration
+            every { nessieServerConfiguration } returns nessieConfiguration
+            every { s3BucketConfiguration } returns bucketConfiguration
+        }
+        val catalog: Catalog = mockk()
+        val table: Table = mockk { every { schema() } returns icebergSchema }
+        val icebergUtil: IcebergUtil = mockk {
+            every { createCatalog(any(), any()) } returns catalog
+            every { createTable(any(), any(), any(), any()) } returns table
+            every { toCatalogProperties(any()) } returns mapOf()
+            every { toIcebergSchema(any(), any()) } answers
+                {
+                    val pipeline = secondArg() as MapperPipeline
+                    pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(listOf(primaryKeys))
+                }
+        }
+        val icebergV2Writer =
+            IcebergV2Writer(
+                icebergTableWriterFactory = icebergTableWriterFactory,
+                icebergConfiguration = icebergConfiguration,
+                icebergUtil = icebergUtil,
+            )
+        val e =
+            assertThrows<IllegalArgumentException> {
+                icebergV2Writer.createStreamLoader(stream = stream)
+            }
+        assertTrue(e.message?.startsWith("Identifier fields are different") ?: false)
+    }
+}
diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableCleanerTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableCleanerTest.kt
index 70aaf4344370..e2df9ebb3b2a 100644
--- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableCleanerTest.kt
+++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableCleanerTest.kt
@@ -19,7 +19,6 @@ import org.apache.iceberg.catalog.Catalog
 import org.apache.iceberg.catalog.TableIdentifier
 import org.apache.iceberg.io.CloseableIterable
 import org.apache.iceberg.io.CloseableIterator
-import org.apache.iceberg.io.CloseableIterator.empty
 import org.apache.iceberg.io.FileIO
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.assertDoesNotThrow
@@ -29,11 +28,12 @@ internal class IcebergTableCleanerTest {
     @Test
     fun testClearingTableWithPrefix() {
         val catalog: Catalog = mockk { every { dropTable(any(), true) } returns true }
+        val icebergUtil: IcebergUtil = mockk()
         val tableIdentifier: TableIdentifier = mockk()
         val fileIo: S3FileIO = mockk { every { deletePrefix(any()) } returns Unit }
         val tableLocation = "table/location"
-        val cleaner = IcebergTableCleaner()
+        val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil)
 
         cleaner.clearTable(
             catalog = catalog,
@@ -49,11 +49,12 @@
     @Test
     fun testClearingTableWithoutPrefix() {
         val catalog: Catalog = mockk { every { dropTable(any(), true) } returns true }
+        val icebergUtil: IcebergUtil = mockk()
         val tableIdentifier: TableIdentifier = mockk()
         val fileIo: FileIO = mockk()
         val tableLocation = "table/location"
-        val cleaner = IcebergTableCleaner()
+        val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil)
 
         cleaner.clearTable(
             catalog = catalog,
@@ -68,7 +69,10 @@
     @Test
     fun `deleteGenerationId handles empty scan results gracefully`() {
-        val cleaner = IcebergTableCleaner()
+        val icebergUtil: IcebergUtil = mockk {
+            every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
+        }
+        val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil)
         val generationIdSuffix = "ab-generation-id-0-e"
 
         val tasks = CloseableIterable.empty<FileScanTask>()
@@ -83,7 +87,10 @@
     @Test
     fun `deleteGenerationId deletes matching file via deleteFile`() {
-        val cleaner = IcebergTableCleaner()
+        val icebergUtil: IcebergUtil = mockk {
+            every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
+        }
+        val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil)
         val generationIdSuffix = "ab-generation-id-0-e"
         val filePathToDelete = "path/to/gen-5678/foo-bar-ab-generation-id-0-e.parquet"
         val fileScanTask = mockk<FileScanTask>()
@@ -107,7 +114,7 @@
         }
 
         verify {
-            IcebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix)
+            icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix)
             table.newDelete().toBranch(eq("staging"))
             delete.deleteFile(fileScanTask.file().path())
             delete.commit()
         }
@@ -116,7 +123,10 @@
     @Test
     fun `deleteGenerationId should not delete non matching file via deleteFile`() {
-        val cleaner = IcebergTableCleaner()
+        val icebergUtil: IcebergUtil = mockk {
+            every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
+        }
+        val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil)
         val generationIdSuffix = "ab-generation-id-10-e"
         val filePathToDelete = "path/to/gen-5678/foo-bar-ab-generation-id-10-e.parquet"
         val fileScanTask = mockk<FileScanTask>()
@@ -140,7 +150,7 @@
         }
 
         verify(exactly = 0) {
-            IcebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix)
+            icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix)
             table.newDelete().toBranch(any())
             delete.deleteFile(any())
             delete.commit()
         }
diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableWriterFactoryTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableWriterFactoryTest.kt
index ef341b77063c..64b6bae01770 100644
--- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableWriterFactoryTest.kt
+++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergTableWriterFactoryTest.kt
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
 
 internal class IcebergTableWriterFactoryTest {
+    private val generationIdSuffix: String = "ab-generation-id-0-e"
 
     @Test
@@ -64,8 +65,11 @@ internal class IcebergTableWriterFactoryTest {
             every { schema() } returns tableSchema
             every { spec() } returns tableSpec
         }
+        val icebergUtil: IcebergUtil = mockk {
+            every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
+        }
 
-        val factory = IcebergTableWriterFactory()
+        val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil)
         val writer = factory.create(table = table, generationId = generationIdSuffix)
         assertNotNull(writer)
         assertEquals(PartitionedDeltaWriter::class.java, writer.javaClass)
@@ -110,13 +114,13 @@ internal class IcebergTableWriterFactoryTest {
             every { schema() } returns tableSchema
             every { spec() } returns tableSpec
         }
+        val icebergUtil: IcebergUtil = mockk {
+            every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
+        }
 
-        val factory = IcebergTableWriterFactory()
+        val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil)
         val writer =
-            factory.create(
-                table = table,
-                generationId = IcebergUtil.constructGenerationIdSuffix(Random.nextLong(100))
-            )
+            factory.create(table = table, generationId = "ab-generation-id-${Random.nextLong(100)}")
         assertNotNull(writer)
         assertEquals(UnpartitionedDeltaWriter::class.java, writer.javaClass)
     }
@@ -159,13 +163,13 @@ internal class IcebergTableWriterFactoryTest {
             every { schema() } returns tableSchema
             every { spec() } returns tableSpec
         }
+        val icebergUtil: IcebergUtil = mockk {
+            every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
+        }
 
-        val factory = IcebergTableWriterFactory()
+        val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil)
         val writer =
-            factory.create(
-                table = table,
-                generationId = IcebergUtil.constructGenerationIdSuffix(Random.nextLong(100))
-            )
+            factory.create(table = table, generationId = "ab-generation-id-${Random.nextLong(100)}")
         assertNotNull(writer)
         assertEquals(PartitionedAppendWriter::class.java, writer.javaClass)
     }
@@ -208,13 +212,13 @@ internal class
IcebergTableWriterFactoryTest { every { schema() } returns tableSchema every { spec() } returns tableSpec } + val icebergUtil: IcebergUtil = mockk { + every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit + } - val factory = IcebergTableWriterFactory() + val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil) val writer = - factory.create( - table = table, - generationId = IcebergUtil.constructGenerationIdSuffix(Random.nextLong(100)) - ) + factory.create(table = table, generationId = "ab-generation-id-${Random.nextLong(100)}") assertNotNull(writer) assertEquals(UnpartitionedAppendWriter::class.java, writer.javaClass) } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt index d20ac502e919..82c4641f003f 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.iceberg.v2.io import io.airbyte.cdk.load.command.Append +import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfiguration import io.airbyte.cdk.load.command.iceberg.parquet.NessieServerConfiguration @@ -17,8 +18,13 @@ import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimestampValue import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT +import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID +import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_META +import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_RAW_ID import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration import io.mockk.every import io.mockk.mockk @@ -37,12 +43,20 @@ import org.apache.iceberg.nessie.NessieCatalog import org.apache.iceberg.types.Types import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows internal class IcebergUtilTest { + private lateinit var icebergUtil: IcebergUtil + + @BeforeEach + fun setup() { + icebergUtil = IcebergUtil() + } + @Test fun testCreateCatalog() { val catalogName = "test-catalog" @@ -52,7 +66,7 @@ internal class IcebergUtilTest { URI to "http://localhost:19120/api/v1", WAREHOUSE_LOCATION to "s3://test/" ) - val catalog = IcebergUtil.createCatalog(catalogName = catalogName, properties = properties) + val catalog = icebergUtil.createCatalog(catalogName = catalogName, properties = properties) assertNotNull(catalog) assertEquals(catalogName, catalog.name()) assertEquals(NessieCatalog::class.java, catalog.javaClass) @@ -78,7 +92,7 @@ internal 
class IcebergUtilTest { every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false } val table = - IcebergUtil.createTable( + icebergUtil.createTable( streamDescriptor = streamDescriptor, catalog = catalog, schema = schema, @@ -110,7 +124,7 @@ internal class IcebergUtilTest { every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false } val table = - IcebergUtil.createTable( + icebergUtil.createTable( streamDescriptor = streamDescriptor, catalog = catalog, schema = schema, @@ -134,7 +148,7 @@ internal class IcebergUtilTest { every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns true } val table = - IcebergUtil.createTable( + icebergUtil.createTable( streamDescriptor = streamDescriptor, catalog = catalog, schema = schema, @@ -148,7 +162,7 @@ internal class IcebergUtilTest { } @Test - fun testConvertAirbyteRecordToIcebergRecord() { + fun testConvertAirbyteRecordToIcebergRecordInsert() { val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name") val airbyteStream = DestinationStream( @@ -184,7 +198,7 @@ internal class IcebergUtilTest { ) val schema = Schema(columns) val icebergRecord = - IcebergUtil.toRecord( + icebergUtil.toRecord( record = airbyteRecord, pipeline = pipeline, tableSchema = schema, @@ -195,6 +209,106 @@ internal class IcebergUtilTest { assertEquals(Operation.INSERT, (icebergRecord as RecordWrapper).operation) } + @Test + fun testConvertAirbyteRecordToIcebergRecordDelete() { + val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name") + val airbyteStream = + DestinationStream( + descriptor = streamDescriptor, + importType = Append, + schema = + ObjectType( + linkedMapOf( + "id" to FieldType(IntegerType, nullable = true), + "name" to FieldType(StringType, nullable = true), + ) + ), + generationId = 1, + minimumGenerationId = 1, + syncId = 1, + ) + val airbyteRecord = + DestinationRecord( + stream = airbyteStream.descriptor, + data = + ObjectValue( + linkedMapOf( + "id" to IntegerValue(42L), + "name" to StringValue("John Doe"), + AIRBYTE_CDC_DELETE_COLUMN to TimestampValue("2024-01-01T00:00:00Z"), + ) + ), + emittedAtMs = System.currentTimeMillis(), + meta = DestinationRecord.Meta(), + serialized = "{\"id\":42, \"name\":\"John Doe\"}" + ) + val pipeline = ParquetMapperPipelineFactory().create(airbyteStream) + val columns = + mutableListOf( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + ) + val schema = Schema(columns, setOf(1)) + val icebergRecord = + icebergUtil.toRecord( + record = airbyteRecord, + pipeline = pipeline, + tableSchema = schema, + stream = airbyteStream + ) + assertNotNull(icebergRecord) + assertEquals(RecordWrapper::class.java, icebergRecord.javaClass) + assertEquals(Operation.DELETE, (icebergRecord as RecordWrapper).operation) + } + + @Test + fun testConvertAirbyteRecordToIcebergRecordUpdate() { + val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name") + val airbyteStream = + DestinationStream( + descriptor = streamDescriptor, + importType = Dedupe(primaryKey = listOf(listOf("id")), cursor = listOf("id")), + schema = + ObjectType( + linkedMapOf( + "id" to FieldType(IntegerType, nullable = false), + "name" to FieldType(StringType, nullable = true), + ) + ), + generationId = 1, + minimumGenerationId = 1, + syncId = 1, + ) + val airbyteRecord = + DestinationRecord( + stream = airbyteStream.descriptor, + data = + 
ObjectValue(
+                    linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))
+                ),
+                emittedAtMs = System.currentTimeMillis(),
+                meta = DestinationRecord.Meta(),
+                serialized = "{\"id\":42, \"name\":\"John Doe\"}"
+            )
+        val pipeline = ParquetMapperPipelineFactory().create(airbyteStream)
+        val columns =
+            mutableListOf(
+                Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                Types.NestedField.required(2, "name", Types.StringType.get()),
+            )
+        val schema = Schema(columns, setOf(1))
+        val icebergRecord =
+            icebergUtil.toRecord(
+                record = airbyteRecord,
+                pipeline = pipeline,
+                tableSchema = schema,
+                stream = airbyteStream
+            )
+        assertNotNull(icebergRecord)
+        assertEquals(RecordWrapper::class.java, icebergRecord.javaClass)
+        assertEquals(Operation.UPDATE, (icebergRecord as RecordWrapper).operation)
+    }
+
     @Test
     fun testCatalogProperties() {
         val awsAccessKey = "access-key"
@@ -229,7 +343,7 @@
             s3BucketConfiguration = s3BucketConfiguration,
         )
         val catalogProperties =
-            IcebergUtil.toCatalogProperties(icebergConfiguration = configuration)
+            icebergUtil.toCatalogProperties(icebergConfiguration = configuration)
         assertEquals(ICEBERG_CATALOG_TYPE_NESSIE, catalogProperties[ICEBERG_CATALOG_TYPE])
         assertEquals(nessieServerUri, catalogProperties[URI])
         assertEquals(warehouseLocation, catalogProperties[WAREHOUSE_LOCATION])
@@ -248,7 +362,7 @@ fun `assertGenerationIdSuffixIsOfValidFormat accepts valid format`() {
         val validGenerationId = "ab-generation-id-123-e"
         assertDoesNotThrow {
-            IcebergUtil.assertGenerationIdSuffixIsOfValidFormat(validGenerationId)
+            icebergUtil.assertGenerationIdSuffixIsOfValidFormat(validGenerationId)
         }
     }
 
@@ -257,7 +371,7 @@
         val invalidGenerationId = "invalid-generation-id-123"
         val exception =
             assertThrows<IcebergUtil.InvalidFormatException> {
-                IcebergUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId)
+                icebergUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId)
             }
         assertEquals(
             "Invalid format: $invalidGenerationId. Expected format is 'ab-generation-id-<generation-id>-e'",
@@ -270,7 +384,7 @@
         val invalidGenerationId = "ab-generation-id-"
         val exception =
             assertThrows<IcebergUtil.InvalidFormatException> {
-                IcebergUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId)
+                icebergUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId)
             }
         assertEquals(
             "Invalid format: $invalidGenerationId. Expected format is 'ab-generation-id-<generation-id>-e'",
@@ -283,7 +397,7 @@
         val stream = mockk<DestinationStream>()
         every { stream.generationId } returns 42
         val expectedSuffix = "ab-generation-id-42-e"
-        val result = IcebergUtil.constructGenerationIdSuffix(stream)
+        val result = icebergUtil.constructGenerationIdSuffix(stream)
         assertEquals(expectedSuffix, result)
     }
 
@@ -293,11 +407,72 @@
         val stream = mockk<DestinationStream>()
         every { stream.generationId } returns -1
         val exception =
             assertThrows<IllegalArgumentException> {
-                IcebergUtil.constructGenerationIdSuffix(stream)
+                icebergUtil.constructGenerationIdSuffix(stream)
             }
         assertEquals(
             "GenerationId must be non-negative. Provided: ${stream.generationId}",
            exception.message
         )
     }
+
+    @Test
+    fun testConversionToIcebergSchemaWithMetadataAndPrimaryKey() {
+        val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name")
+        val primaryKeys = listOf("id")
+        val stream =
+            DestinationStream(
+                descriptor = streamDescriptor,
+                importType = Dedupe(primaryKey = listOf(primaryKeys), cursor = primaryKeys),
+                schema =
+                    ObjectType(
+                        linkedMapOf(
+                            "id" to FieldType(IntegerType, nullable = false),
+                            "name" to FieldType(StringType, nullable = true),
+                        )
+                    ),
+                generationId = 1,
+                minimumGenerationId = 1,
+                syncId = 1,
+            )
+        val pipeline = ParquetMapperPipelineFactory().create(stream)
+        val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
+        assertEquals(primaryKeys.toSet(), schema.identifierFieldNames())
+        assertEquals(6, schema.columns().size)
+        assertNotNull(schema.findField("id"))
+        assertNotNull(schema.findField("name"))
+        assertNotNull(schema.findField(COLUMN_NAME_AB_RAW_ID))
+        assertNotNull(schema.findField(COLUMN_NAME_AB_EXTRACTED_AT))
+        assertNotNull(schema.findField(COLUMN_NAME_AB_META))
+        assertNotNull(schema.findField(COLUMN_NAME_AB_GENERATION_ID))
+    }
+
+    @Test
+    fun testConversionToIcebergSchemaWithMetadataAndWithoutPrimaryKey() {
+        val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name")
+        val stream =
+            DestinationStream(
+                descriptor = streamDescriptor,
+                importType = Append,
+                schema =
+                    ObjectType(
+                        linkedMapOf(
+                            "id" to FieldType(IntegerType, nullable = true),
+                            "name" to FieldType(StringType, nullable = true),
+                        )
+                    ),
+                generationId = 1,
+                minimumGenerationId = 1,
+                syncId = 1,
+            )
+        val pipeline = ParquetMapperPipelineFactory().create(stream)
+        val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
+        assertEquals(emptySet<String>(), schema.identifierFieldNames())
+        assertEquals(6, schema.columns().size)
+        assertNotNull(schema.findField("id"))
+        assertNotNull(schema.findField("name"))
+        assertNotNull(schema.findField(COLUMN_NAME_AB_RAW_ID))
+        assertNotNull(schema.findField(COLUMN_NAME_AB_EXTRACTED_AT))
+        assertNotNull(schema.findField(COLUMN_NAME_AB_META))
+        assertNotNull(schema.findField(COLUMN_NAME_AB_GENERATION_ID))
+    }
 }
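
A note on the record-operation change made in IcebergUtil.toRecord above: getOperation maps a record to DELETE when its data carries a non-null _ab_cdc_deleted_at value, to UPDATE when the stream's import type is Dedupe, and to INSERT otherwise. The following is a minimal, dependency-free sketch of that rule; it swaps the CDK's DestinationRecord/ObjectValue types for a plain Map, so resolveOperation and isDedupe are illustrative names rather than the connector's API.

// Sketch of IcebergUtil.getOperation's decision rule over a plain Map instead of
// the Airbyte CDK's ObjectValue (resolveOperation/isDedupe are illustrative names).
enum class Operation { INSERT, UPDATE, DELETE }

const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at"

fun resolveOperation(data: Map<String, Any?>, isDedupe: Boolean): Operation =
    when {
        // A populated CDC tombstone column always wins, regardless of sync mode.
        data[AIRBYTE_CDC_DELETE_COLUMN] != null -> Operation.DELETE
        // Dedupe syncs upsert by primary key, so surviving records are updates.
        isDedupe -> Operation.UPDATE
        // Everything else is a plain append.
        else -> Operation.INSERT
    }

fun main() {
    val tombstone = mapOf("id" to 42, AIRBYTE_CDC_DELETE_COLUMN to "2024-01-01T00:00:00Z")
    check(resolveOperation(tombstone, isDedupe = true) == Operation.DELETE)
    check(resolveOperation(mapOf("id" to 42), isDedupe = true) == Operation.UPDATE)
    check(resolveOperation(mapOf("id" to 42), isDedupe = false) == Operation.INSERT)
}

This mirrors the three unit tests testConvertAirbyteRecordToIcebergRecordInsert/Delete/Update added by the patch.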
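
The mismatched-schema tests in IcebergV2WriterTest assert only message prefixes ("Table schema fields are different than catalog schema", "Identifier fields are different"). Below is a standalone sketch of the comparison they exercise, assuming the writer compares (name, type id, optionality) triples plus identifier-field names as shown in the IcebergV2Writer hunk; Field and validateSchemas are illustrative names, not the connector's API, and the message text beyond the asserted prefixes is invented for the sketch.

// Sketch of the schema checks behind the failure-path tests in IcebergV2WriterTest.
// The real code builds these triples from org.apache.iceberg.Schema fields;
// require(...) throws IllegalArgumentException, matching the assertThrows above.
data class Field(val name: String, val typeId: String, val optional: Boolean)

fun validateSchemas(
    incoming: Set<Field>,     // schema generated from the Airbyte stream
    existing: Set<Field>,     // schema of the Iceberg table in the catalog
    incomingIds: Set<String>, // identifier (primary key) field names, incoming
    existingIds: Set<String>, // identifier field names on the table
) {
    val missingInIncoming = existing - incoming
    val missingInExisting = incoming - existing
    require(missingInIncoming.isEmpty() && missingInExisting.isEmpty()) {
        "Table schema fields are different than catalog schema: " +
            "missing=$missingInIncoming, extra=$missingInExisting"
    }
    require(incomingIds == existingIds) {
        "Identifier fields are different: incoming=$incomingIds, existing=$existingIds"
    }
}

fun main() {
    val fields = setOf(Field("id", "LONG", false), Field("name", "STRING", true))
    validateSchemas(fields, fields, setOf("id"), setOf("id")) // passes
    runCatching { validateSchemas(fields, fields, setOf("id"), emptySet()) }
        .onFailure { println(it.message) } // "Identifier fields are different: ..."
}

Comparing type ids rather than full types (note the switch to it.type().typeId() in the diff) keeps the check insensitive to nested field-id differences, which is what the mismatched-primary-key test relies on.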
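
Finally, the generation-id suffix validated throughout these tests must match the regex ab-generation-id-\d+-e, and suffix construction rejects negative generation ids. A small self-contained sketch of both rules follows; the regex and messages come from the diff, but IllegalArgumentException stands in for the connector's own InvalidFormatException, and the Long parameter replaces the DestinationStream argument for brevity.

// Sketch of the generation-id suffix rules from IcebergUtil (simplified signatures).
private val generationIdRegex = Regex("""ab-generation-id-\d+-e""")

fun constructGenerationIdSuffix(generationId: Long): String {
    require(generationId >= 0) { "GenerationId must be non-negative. Provided: $generationId" }
    return "ab-generation-id-$generationId-e"
}

fun assertGenerationIdSuffixIsOfValidFormat(generationId: String) {
    // The connector throws IcebergUtil.InvalidFormatException here; require(...)
    // stands in so this sketch stays dependency-free.
    require(generationIdRegex.matches(generationId)) {
        "Invalid format: $generationId. Expected format is 'ab-generation-id-<generation-id>-e'"
    }
}

fun main() {
    val suffix = constructGenerationIdSuffix(42) // "ab-generation-id-42-e"
    assertGenerationIdSuffixIsOfValidFormat(suffix) // passes
    runCatching { assertGenerationIdSuffixIsOfValidFormat("ab-generation-id-") }
        .onFailure { println(it.message) } // rejected: no digits before the "-e"
}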