From c780537a05ef08319c21ffb99e70abb4949932d4 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 10:22:42 -0800 Subject: [PATCH] get correct catalog instance --- .../iceberg/v2/IcebergV2DataDumper.kt | 21 ++++++------------- .../iceberg/v2/IcebergV2TestUtil.kt | 15 ++++++++++--- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index 5bf76c748afd..d3fe9f0173a9 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -83,15 +83,13 @@ object IcebergV2DataDumper : DestinationDataDumper { spec: ConfigurationSpecification, stream: DestinationStream ): List { - val config = - IcebergV2ConfigurationFactory() - .makeWithoutExceptionHandling(spec as IcebergV2Specification) + val config = IcebergV2TestUtil.getConfig(spec) val pipeline = ParquetMapperPipelineFactory().create(stream) val schema = pipeline.finalSchema as ObjectType - val catalog = getNessieCatalog(config) + val catalog = IcebergV2TestUtil.getCatalog(config) val table = catalog.loadTable( - TableIdentifier.of(stream.descriptor.namespace, stream.descriptor.name) + TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor) ) val outputRecords = mutableListOf() @@ -121,7 +119,9 @@ object IcebergV2DataDumper : DestinationDataDumper { } } - catalog.close() + if (catalog is AutoCloseable) { + catalog.close() + } return outputRecords } @@ -131,13 +131,4 @@ object IcebergV2DataDumper : DestinationDataDumper { ): List { throw NotImplementedError("Iceberg doesn't support universal file transfer") } - - private fun getNessieCatalog(config: IcebergV2Configuration): NessieCatalog { - val catalogProperties = IcebergUtil(SimpleTableIdGenerator()).toCatalogProperties(config) - - val catalog = NessieCatalog() - catalog.setConf(Configuration()) - catalog.initialize("nessie", catalogProperties) - return catalog - } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index f87741015f4f..07d00365d322 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -4,7 +4,9 @@ package io.airbyte.integrations.destination.iceberg.v2 +import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ValidatedJsonUtils +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import java.net.URI import java.nio.file.Files import java.nio.file.Path @@ -15,9 +17,16 @@ object IcebergV2TestUtil { val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") fun parseConfig(path: Path) = - IcebergV2ConfigurationFactory().makeWithoutExceptionHandling( - ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)) - ) + getConfig(ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path))) + + fun getConfig(spec: ConfigurationSpecification) = + IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification) + + fun getCatalog(config: IcebergV2Configuration) = + IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil -> + val props = icebergUtil.toCatalogProperties(config) + icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) + } private fun getResourceUri(path: String): URI = this::class.java.classLoader.getResource(path)?.toURI()