Skip to content

Commit

Permalink
add table id generator
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Dec 14, 2024
1 parent 5ce5dac commit 59aea3b
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import io.airbyte.cdk.load.check.DestinationChecker
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import io.airbyte.integrations.destination.iceberg.v2.io.toIcebergTableIdentifier
import javax.inject.Singleton
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types

@Singleton
class IcebergV2Checker(
private val icebergTableCleaner: IcebergTableCleaner,
private val icebergUtil: IcebergUtil
private val icebergUtil: IcebergUtil,
private val tableIdGenerator: TableIdGenerator,
) : DestinationChecker<IcebergV2Configuration> {

override fun check(config: IcebergV2Configuration) {
Expand All @@ -43,7 +43,7 @@ class IcebergV2Checker(

icebergTableCleaner.clearTable(
catalog,
testTableIdentifier.toIcebergTableIdentifier(),
tableIdGenerator.toTableIdentifier(testTableIdentifier),
table.io(),
table.location()
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration
import io.micronaut.context.annotation.Factory
import javax.inject.Singleton
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier

/**
 * Converts our internal stream descriptor into an Iceberg [TableIdentifier].
 *
 * Implementations should handle catalog-specific naming restrictions. The implementations
 * in this file are [SimpleTableIdGenerator] (namespace and name used unchanged) and
 * [GlueTableIdGenerator] (namespace and name lowercased).
 */
// TODO accept default namespace in config as a val here
interface TableIdGenerator {
/**
 * Returns the Iceberg [TableIdentifier] that corresponds to [stream].
 *
 * @param stream the descriptor (namespace and name) of the stream to map.
 * @return the identifier built from the descriptor, after any catalog-specific rewriting.
 */
fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier
}

/**
 * [TableIdGenerator] that uses the stream's namespace and name exactly as given, with no
 * catalog-specific rewriting.
 */
class SimpleTableIdGenerator : TableIdGenerator {
    /**
     * Builds the identifier from the unmodified namespace and name of [stream].
     *
     * @throws IllegalArgumentException if [stream] has no namespace.
     */
    override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
        // Fail with a message that names the stream rather than a bare NPE from `!!`.
        val namespace =
            requireNotNull(stream.namespace) {
                "Stream '${stream.name}' has no namespace, but an Iceberg table identifier requires one"
            }
        return tableIdOf(namespace, stream.name)
    }
}

/**
 * [TableIdGenerator] for AWS Glue catalogs.
 *
 * AWS Glue requires lowercase database+table names, so both the namespace and the name are
 * lowercased before the identifier is built.
 */
class GlueTableIdGenerator : TableIdGenerator {
    /**
     * Builds the identifier from the lowercased namespace and name of [stream].
     *
     * @throws IllegalArgumentException if [stream] has no namespace.
     */
    override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
        // Fail with a message that names the stream rather than a bare NPE from `!!`.
        val namespace =
            requireNotNull(stream.namespace) {
                "Stream '${stream.name}' has no namespace, but an Iceberg table identifier requires one"
            }
        return tableIdOf(namespace.lowercase(), stream.name.lowercase())
    }
}

/**
 * Factory that provides the [TableIdGenerator] singleton, choosing the implementation from the
 * catalog configured in [IcebergV2Configuration].
 */
@Factory
class TableIdGeneratorFactory(private val icebergConfiguration: IcebergV2Configuration) {
    /**
     * Selects the generator for the configured catalog.
     *
     * The return type is declared explicitly so the produced bean is typed as the
     * [TableIdGenerator] interface instead of depending on type inference over the branches.
     *
     * @return a [GlueTableIdGenerator] when the catalog configuration is a
     * [GlueCatalogConfiguration]; a [SimpleTableIdGenerator] for every other catalog.
     */
    @Singleton
    fun create(): TableIdGenerator =
        when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) {
            is GlueCatalogConfiguration -> GlueTableIdGenerator()
            else -> SimpleTableIdGenerator()
        }
}

// Both parameters are non-null Strings because an Iceberg identifier needs a namespace and a
// name; the namespace string is wrapped in a Namespace before the identifier is created.
private fun tableIdOf(namespace: String, name: String) =
    Namespace.of(namespace).let { icebergNamespace -> TableIdentifier.of(icebergNamespace, name) }
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.airbyte.integrations.destination.iceberg.v2.ACCESS_KEY_ID
import io.airbyte.integrations.destination.iceberg.v2.GlueCredentialsProvider
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration
import io.airbyte.integrations.destination.iceberg.v2.SECRET_ACCESS_KEY
import io.airbyte.integrations.destination.iceberg.v2.TableIdGenerator
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -51,19 +52,9 @@ private val logger = KotlinLogging.logger {}

const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at"

/**
 * Converts this [DestinationStream.Descriptor] into an Iceberg [TableIdentifier].
 *
 * The descriptor's namespace is wrapped in an Iceberg [Namespace] and its name becomes the
 * table name; neither value is modified.
 *
 * @return the Iceberg [TableIdentifier] for this stream descriptor.
 */
fun DestinationStream.Descriptor.toIcebergTableIdentifier(): TableIdentifier =
    TableIdentifier.of(Namespace.of(namespace), name)

/** Collection of Iceberg related utilities. */
@Singleton
class IcebergUtil {
class IcebergUtil(private val tableIdGenerator: TableIdGenerator) {
internal class InvalidFormatException(message: String) : Exception(message)

private val generationIdRegex = Regex("""ab-generation-id-\d+-e""")
Expand Down Expand Up @@ -117,7 +108,7 @@ class IcebergUtil {
schema: Schema,
properties: Map<String, String>
): Table {
val tableIdentifier = streamDescriptor.toIcebergTableIdentifier()
val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
synchronized(tableIdentifier.namespace()) {
if (
catalog is SupportsNamespaces &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ object IcebergV2DataDumper : DestinationDataDumper {
}

private fun getNessieCatalog(config: IcebergV2Configuration): NessieCatalog {
val catalogProperties = IcebergUtil().toCatalogProperties(config)
val catalogProperties = IcebergUtil(SimpleTableIdGenerator()).toCatalogProperties(config)

val catalog = NessieCatalog()
catalog.setConf(Configuration())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_META
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration
import io.airbyte.integrations.destination.iceberg.v2.SimpleTableIdGenerator
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
Expand All @@ -52,10 +53,11 @@ import org.junit.jupiter.api.assertThrows
internal class IcebergUtilTest {

private lateinit var icebergUtil: IcebergUtil
private val tableIdGenerator = SimpleTableIdGenerator()

/** Creates a fresh [IcebergUtil] wired to the shared [tableIdGenerator] before each test. */
@BeforeEach
fun setup() {
    // A single assignment: the earlier no-arg construction was overwritten immediately and
    // does not match the constructor, which takes a TableIdGenerator.
    icebergUtil = IcebergUtil(tableIdGenerator)
}

@Test
Expand Down Expand Up @@ -86,11 +88,11 @@ internal class IcebergUtilTest {
every { create() } returns mockk()
}
val catalog: NessieCatalog = mockk {
every { buildTable(streamDescriptor.toIcebergTableIdentifier(), any()) } returns
every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns
tableBuilder
every { createNamespace(any()) } returns Unit
every { namespaceExists(any()) } returns false
every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false
}
val table =
icebergUtil.createTable(
Expand All @@ -101,7 +103,7 @@ internal class IcebergUtilTest {
)
assertNotNull(table)
verify(exactly = 1) {
catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace())
catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace())
}
verify(exactly = 1) { tableBuilder.create() }
}
Expand All @@ -119,10 +121,10 @@ internal class IcebergUtilTest {
every { create() } returns mockk()
}
val catalog: NessieCatalog = mockk {
every { buildTable(streamDescriptor.toIcebergTableIdentifier(), any()) } returns
every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns
tableBuilder
every { namespaceExists(any()) } returns true
every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false
}
val table =
icebergUtil.createTable(
Expand All @@ -133,7 +135,7 @@ internal class IcebergUtilTest {
)
assertNotNull(table)
verify(exactly = 0) {
catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace())
catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace())
}
verify(exactly = 1) { tableBuilder.create() }
}
Expand All @@ -144,9 +146,9 @@ internal class IcebergUtilTest {
val streamDescriptor = DestinationStream.Descriptor("namespace", "name")
val schema = Schema()
val catalog: NessieCatalog = mockk {
every { loadTable(streamDescriptor.toIcebergTableIdentifier()) } returns mockk()
every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns mockk()
every { namespaceExists(any()) } returns true
every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns true
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true
}
val table =
icebergUtil.createTable(
Expand All @@ -157,9 +159,9 @@ internal class IcebergUtilTest {
)
assertNotNull(table)
verify(exactly = 0) {
catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace())
catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace())
}
verify(exactly = 1) { catalog.loadTable(streamDescriptor.toIcebergTableIdentifier()) }
verify(exactly = 1) { catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) }
}

@Test
Expand Down

0 comments on commit 59aea3b

Please sign in to comment.