diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt index bfa977196f4e..d790c27560cd 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt @@ -8,7 +8,6 @@ import io.airbyte.cdk.load.check.DestinationChecker import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil -import io.airbyte.integrations.destination.iceberg.v2.io.toIcebergTableIdentifier import javax.inject.Singleton import org.apache.iceberg.Schema import org.apache.iceberg.types.Types @@ -16,7 +15,8 @@ import org.apache.iceberg.types.Types @Singleton class IcebergV2Checker( private val icebergTableCleaner: IcebergTableCleaner, - private val icebergUtil: IcebergUtil + private val icebergUtil: IcebergUtil, + private val tableIdGenerator: TableIdGenerator, ) : DestinationChecker { override fun check(config: IcebergV2Configuration) { @@ -43,7 +43,7 @@ class IcebergV2Checker( icebergTableCleaner.clearTable( catalog, - testTableIdentifier.toIcebergTableIdentifier(), + tableIdGenerator.toTableIdentifier(testTableIdentifier), table.io(), table.location() ) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt new file mode 100644 index 000000000000..5222c7a12d41 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt @@ -0,0 +1,44 @@ +package io.airbyte.integrations.destination.iceberg.v2 + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration +import io.micronaut.context.annotation.Factory +import javax.inject.Singleton +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.catalog.TableIdentifier + +/** + * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. + * Implementations should handle catalog-specific naming restrictions. + */ +// TODO accept default namespace in config as a val here +interface TableIdGenerator { + fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier +} + +class SimpleTableIdGenerator : TableIdGenerator { + override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = + tableIdOf(stream.namespace!!, stream.name) +} + +/** + * AWS Glue requires lowercase database+table names. + */ +class GlueTableIdGenerator : TableIdGenerator { + override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = + tableIdOf(stream.namespace!!.lowercase(), stream.name.lowercase()) +} + +@Factory +class TableIdGeneratorFactory(private val icebergConfiguration: IcebergV2Configuration) { + @Singleton + fun create() = + when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) { + is GlueCatalogConfiguration -> GlueTableIdGenerator() + else -> SimpleTableIdGenerator() + } +} + +// iceberg namespace+name must both be nonnull. +private fun tableIdOf(namespace: String, name: String) = + TableIdentifier.of(Namespace.of(namespace), name) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index 3771af1b36ac..e516b02ef1ca 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -20,6 +20,7 @@ import io.airbyte.integrations.destination.iceberg.v2.ACCESS_KEY_ID import io.airbyte.integrations.destination.iceberg.v2.GlueCredentialsProvider import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration import io.airbyte.integrations.destination.iceberg.v2.SECRET_ACCESS_KEY +import io.airbyte.integrations.destination.iceberg.v2.TableIdGenerator import io.github.oshai.kotlinlogging.KotlinLogging import jakarta.inject.Singleton import org.apache.hadoop.conf.Configuration @@ -51,19 +52,9 @@ private val logger = KotlinLogging.logger {} const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at" -/** - * Extension function for the[DestinationStream.Descriptor] class that converts the descriptor to an - * Iceberg [TableIdentifier]. - * - * @return An Iceberg [TableIdentifier] representation of the stream descriptor. - */ -fun DestinationStream.Descriptor.toIcebergTableIdentifier(): TableIdentifier { - return TableIdentifier.of(Namespace.of(this.namespace), this.name) -} - /** Collection of Iceberg related utilities. */ @Singleton -class IcebergUtil { +class IcebergUtil(private val tableIdGenerator: TableIdGenerator) { internal class InvalidFormatException(message: String) : Exception(message) private val generationIdRegex = Regex("""ab-generation-id-\d+-e""") @@ -117,7 +108,7 @@ class IcebergUtil { schema: Schema, properties: Map ): Table { - val tableIdentifier = streamDescriptor.toIcebergTableIdentifier() + val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor) synchronized(tableIdentifier.namespace()) { if ( catalog is SupportsNamespaces && diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index 5689320eedbb..5bf76c748afd 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -133,7 +133,7 @@ object IcebergV2DataDumper : DestinationDataDumper { } private fun getNessieCatalog(config: IcebergV2Configuration): NessieCatalog { - val catalogProperties = IcebergUtil().toCatalogProperties(config) + val catalogProperties = IcebergUtil(SimpleTableIdGenerator()).toCatalogProperties(config) val catalog = NessieCatalog() catalog.setConf(Configuration()) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt index 82279fbd0bef..fbd85281e0b3 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt @@ -27,6 +27,7 @@ import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_ import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_META import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_RAW_ID import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration +import io.airbyte.integrations.destination.iceberg.v2.SimpleTableIdGenerator import io.mockk.every import io.mockk.mockk import io.mockk.verify @@ -52,10 +53,11 @@ import org.junit.jupiter.api.assertThrows internal class IcebergUtilTest { private lateinit var icebergUtil: IcebergUtil + private val tableIdGenerator = SimpleTableIdGenerator() @BeforeEach fun setup() { - icebergUtil = IcebergUtil() + icebergUtil = IcebergUtil(tableIdGenerator) } @Test @@ -86,11 +88,11 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(streamDescriptor.toIcebergTableIdentifier(), any()) } returns + every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns tableBuilder every { createNamespace(any()) } returns Unit every { namespaceExists(any()) } returns false - every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false } val table = icebergUtil.createTable( @@ -101,7 +103,7 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 1) { - catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace()) + catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) } verify(exactly = 1) { tableBuilder.create() } } @@ -119,10 +121,10 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(streamDescriptor.toIcebergTableIdentifier(), any()) } returns + every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns tableBuilder every { namespaceExists(any()) } returns true - every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false } val table = icebergUtil.createTable( @@ -133,7 +135,7 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace()) + catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) } verify(exactly = 1) { tableBuilder.create() } } @@ -144,9 +146,9 @@ internal class IcebergUtilTest { val streamDescriptor = DestinationStream.Descriptor("namespace", "name") val schema = Schema() val catalog: NessieCatalog = mockk { - every { loadTable(streamDescriptor.toIcebergTableIdentifier()) } returns mockk() + every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns mockk() every { namespaceExists(any()) } returns true - every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns true + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true } val table = icebergUtil.createTable( @@ -157,9 +159,9 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace()) + catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) } - verify(exactly = 1) { catalog.loadTable(streamDescriptor.toIcebergTableIdentifier()) } + verify(exactly = 1) { catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } } @Test