Skip to content

Commit

Permalink
get correct catalog instance
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Dec 16, 2024
1 parent 0f2354d commit c780537
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,13 @@ object IcebergV2DataDumper : DestinationDataDumper {
spec: ConfigurationSpecification,
stream: DestinationStream
): List<OutputRecord> {
val config =
IcebergV2ConfigurationFactory()
.makeWithoutExceptionHandling(spec as IcebergV2Specification)
val config = IcebergV2TestUtil.getConfig(spec)
val pipeline = ParquetMapperPipelineFactory().create(stream)
val schema = pipeline.finalSchema as ObjectType
val catalog = getNessieCatalog(config)
val catalog = IcebergV2TestUtil.getCatalog(config)
val table =
catalog.loadTable(
TableIdentifier.of(stream.descriptor.namespace, stream.descriptor.name)
TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor)
)

val outputRecords = mutableListOf<OutputRecord>()
Expand Down Expand Up @@ -121,7 +119,9 @@ object IcebergV2DataDumper : DestinationDataDumper {
}
}

catalog.close()
if (catalog is AutoCloseable) {
catalog.close()
}
return outputRecords
}

Expand All @@ -131,13 +131,4 @@ object IcebergV2DataDumper : DestinationDataDumper {
): List<String> {
throw NotImplementedError("Iceberg doesn't support universal file transfer")
}

private fun getNessieCatalog(config: IcebergV2Configuration): NessieCatalog {
val catalogProperties = IcebergUtil(SimpleTableIdGenerator()).toCatalogProperties(config)

val catalog = NessieCatalog()
catalog.setConf(Configuration())
catalog.initialize("nessie", catalogProperties)
return catalog
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path
Expand All @@ -15,9 +17,16 @@ object IcebergV2TestUtil {
val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json")

fun parseConfig(path: Path) =
IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(
ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path))
)
getConfig(ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)))

fun getConfig(spec: ConfigurationSpecification) =
IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification)

fun getCatalog(config: IcebergV2Configuration) =
IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil ->
val props = icebergUtil.toCatalogProperties(config)
icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props)
}

private fun getResourceUri(path: String): URI =
this::class.java.classLoader.getResource(path)?.toURI()
Expand Down

0 comments on commit c780537

Please sign in to comment.