Skip to content

Commit

Permalink
basic cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Dec 16, 2024
1 parent 59aea3b commit 0f2354d
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ abstract class IntegrationTest(

@Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4)
private val timestampString =
LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC)
.format(DateTimeFormatter.ofPattern("YYYYMMDD"))
LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC).format(randomizedNamespaceDateFormatter)
// stream name doesn't need to be randomized, only the namespace.
val randomizedNamespace = "test$timestampString$randomSuffix"

Expand Down Expand Up @@ -262,6 +261,9 @@ abstract class IntegrationTest(
}

companion object {
val randomizedNamespaceRegex = Regex("test(\\d{8})[A-Za-z]{4}")
val randomizedNamespaceDateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd")

private val hasRunCleaner = AtomicBoolean(false)

// Connectors are calling System.getenv rather than using micronaut-y properties,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceDateFormatter
import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import java.time.LocalDate
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.SupportsNamespaces

class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationCleaner {
constructor(configuration: IcebergV2Configuration) : this(
IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil ->
val props = icebergUtil.toCatalogProperties(configuration)
icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props)
}
)

override fun cleanup() {
val cleanupCutoffDate = LocalDate.now().minusDays(15)
val namespaces: List<Namespace> =
(catalog as SupportsNamespaces)
.listNamespaces()
.filter {
val matchResult = randomizedNamespaceRegex.find(it.level(0))
if (matchResult == null) {
false
} else {
val namespaceCreationDate = LocalDate.parse(
matchResult.groupValues[1],
randomizedNamespaceDateFormatter
)
namespaceCreationDate.isBefore(cleanupCutoffDate)
}
}
namespaces.forEach { namespace ->
catalog.listTables(namespace)
.forEach { table -> catalog.dropTable(table, /* purge = */ true) }
catalog.dropNamespace(namespace)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.command.ValidatedJsonUtils
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path

object IcebergV2TestUtil {
// TODO this is just here as an example, we should remove it + add real configs
val MINIMAL_CONFIG_PATH: Path = Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json"))
val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json")

fun parseConfig(path: Path) =
IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(
ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path))
)

private fun getResourceUri(path: String): URI =
this::class.java.classLoader.getResource(path)?.toURI()
?: throw IllegalArgumentException("File not found in resources")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.StronglyTyped
import java.util.Base64
import okhttp3.*
import okhttp3.FormBody
import okhttp3.OkHttpClient
import okhttp3.Request
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import java.nio.file.Files
import java.util.Base64

abstract class IcebergV2WriteTest(
configContents: String,
Expand All @@ -26,7 +28,7 @@ abstract class IcebergV2WriteTest(
configContents,
IcebergV2Specification::class.java,
IcebergV2DataDumper,
NoopDestinationCleaner,
destinationCleaner,
NoopExpectedRecordMapper,
isStreamSchemaRetroactive = true,
supportsDedup = false,
Expand All @@ -36,19 +38,11 @@ abstract class IcebergV2WriteTest(
commitDataIncrementally = false,
supportFileTransfer = false,
allTypesBehavior = StronglyTyped(),
) {
companion object {
@JvmStatic
@BeforeAll
fun setup() {
NessieTestContainers.start()
}
}
}
)

class IcebergGlueWriteTest : IcebergV2WriteTest(
Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH),
NoopDestinationCleaner,
IcebergDestinationCleaner(IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)),
) {
@Test
override fun testBasicWrite() {
Expand Down Expand Up @@ -159,5 +153,11 @@ class IcebergNessieMinioWriteTest : IcebergV2WriteTest(
}
""".trimIndent()
}

@JvmStatic
@BeforeAll
fun setup() {
NessieTestContainers.start()
}
}
}

0 comments on commit 0f2354d

Please sign in to comment.