diff --git a/.github/workflows/publish-bulk-cdk.yml b/.github/workflows/publish-bulk-cdk.yml index a1ef1e716c85..d0f0e4c91c33 100644 --- a/.github/workflows/publish-bulk-cdk.yml +++ b/.github/workflows/publish-bulk-cdk.yml @@ -76,6 +76,16 @@ jobs: gradle-distribution-sha-256-sum-warning: false arguments: --scan :airbyte-cdk:bulk:bulkCdkBuild + - name: Integration test Bulk CDK + uses: burrunan/gradle-cache-action@v1 + env: + CI: true + with: + job-id: bulk-cdk-publish + concurrent: true + gradle-distribution-sha-256-sum-warning: false + arguments: --scan :airbyte-cdk:bulk:bulkCdkIntegrationTest + - name: Publish Poms and Jars to CloudRepo uses: burrunan/gradle-cache-action@v1 env: diff --git a/airbyte-cdk/bulk/build.gradle b/airbyte-cdk/bulk/build.gradle index b8474f81a507..6f5cb2ee9a2a 100644 --- a/airbyte-cdk/bulk/build.gradle +++ b/airbyte-cdk/bulk/build.gradle @@ -61,6 +61,12 @@ allprojects { } } +tasks.register('bulkCdkIntegrationTest').configure { + // findByName returns the task, or null if no such task exists. + // we need this because not all submodules have an integrationTest task. + dependsOn allprojects.collect {it.tasks.findByName('integrationTest')}.findAll {it != null} +} + if (buildNumberFile.exists()) { tasks.register('bulkCdkBuild').configure { dependsOn allprojects.collect {it.tasks.named('build')} diff --git a/airbyte-cdk/bulk/core/load/build.gradle b/airbyte-cdk/bulk/core/load/build.gradle index e670f0cbe328..c593e6e9446b 100644 --- a/airbyte-cdk/bulk/core/load/build.gradle +++ b/airbyte-cdk/bulk/core/load/build.gradle @@ -1,3 +1,16 @@ +// simply declaring the source sets is sufficient to populate them with +// src/integrationTest/java+resources + src/integrationTest/kotlin. +sourceSets { + integrationTest { + } +} +kotlin { + sourceSets { + testIntegration { + } + } +} + dependencies { implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') implementation 'org.apache.commons:commons-lang3:3.17.0' @@ -10,3 +23,18 @@ dependencies { testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1") implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20" } + +task integrationTest(type: Test) { + description = 'Runs the integration tests.' + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath + useJUnitPlatform() + mustRunAfter tasks.check +} +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntimeOnly.extendsFrom testRuntimeOnly +} +// These tests are lightweight enough to run on every PR. +rootProject.check.dependsOn(integrationTest) diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt new file mode 100644 index 000000000000..0513c31bdf09 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.mock_integration_test + +import io.airbyte.cdk.test.util.NoopDestinationCleaner +import io.airbyte.cdk.test.util.NoopExpectedRecordMapper +import io.airbyte.cdk.test.util.NoopNameMapper +import io.airbyte.cdk.test.write.BasicFunctionalityIntegrationTest + +class MockBasicFunctionalityIntegrationTest : + BasicFunctionalityIntegrationTest( + MockDestinationSpecification(), + MockDestinationDataDumper, + NoopDestinationCleaner, + NoopExpectedRecordMapper, + NoopNameMapper + ) diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationBackend.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationBackend.kt new file mode 100644 index 000000000000..99353873200f --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationBackend.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.mock_integration_test + +import io.airbyte.cdk.test.util.DestinationDataDumper +import io.airbyte.cdk.test.util.OutputRecord +import java.util.concurrent.ConcurrentHashMap + +object MockDestinationBackend { + private val files: MutableMap> = ConcurrentHashMap() + + fun insert(filename: String, vararg records: OutputRecord) { + getFile(filename).addAll(records) + } + + fun readFile(filename: String): List { + return getFile(filename) + } + + private fun getFile(filename: String): MutableList { + return files.getOrPut(filename) { mutableListOf() } + } +} + +object MockDestinationDataDumper : DestinationDataDumper { + override fun dumpRecords(streamName: String, streamNamespace: String?): List { + return MockDestinationBackend.readFile( + MockStreamLoader.getFilename(streamNamespace, streamName) + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationChecker.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationChecker.kt new file mode 100644 index 000000000000..824e880dc134 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationChecker.kt @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.mock_integration_test + +import io.airbyte.cdk.check.DestinationChecker +import javax.inject.Singleton + +@Singleton +class MockDestinationChecker : DestinationChecker { + override fun check(config: MockDestinationConfiguration) {} +} diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationConfiguration.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationConfiguration.kt new file mode 100644 index 000000000000..ac92cf54aba6 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationConfiguration.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.mock_integration_test + +import io.airbyte.cdk.command.ConfigurationSpecification +import io.airbyte.cdk.command.DestinationConfiguration +import io.airbyte.cdk.command.DestinationConfigurationFactory +import io.micronaut.context.annotation.Factory +import jakarta.inject.Singleton + +class MockDestinationConfiguration : DestinationConfiguration() + +@Singleton class MockDestinationSpecification : ConfigurationSpecification() + +@Singleton +class MockDestinationConfigurationFactory : + DestinationConfigurationFactory { + + override fun makeWithoutExceptionHandling( + pojo: MockDestinationSpecification + ): MockDestinationConfiguration { + return MockDestinationConfiguration() + } +} + +@Factory +class MockDestinationConfigurationProvider(private val config: DestinationConfiguration) { + @Singleton + fun get(): MockDestinationConfiguration { + return config as MockDestinationConfiguration + } +} diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationWriter.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationWriter.kt new file mode 100644 index 000000000000..7b8c5c4522f6 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationWriter.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.mock_integration_test + +import io.airbyte.cdk.command.DestinationStream +import io.airbyte.cdk.data.ObjectValue +import io.airbyte.cdk.message.Batch +import io.airbyte.cdk.message.DestinationRecord +import io.airbyte.cdk.message.SimpleBatch +import io.airbyte.cdk.test.util.OutputRecord +import io.airbyte.cdk.write.DestinationWriter +import io.airbyte.cdk.write.StreamLoader +import java.time.Instant +import java.util.UUID +import javax.inject.Singleton + +@Singleton +class MockDestinationWriter : DestinationWriter { + override fun createStreamLoader(stream: DestinationStream): StreamLoader { + return MockStreamLoader(stream) + } +} + +class MockStreamLoader(override val stream: DestinationStream) : StreamLoader { + data class LocalBatch(val records: List) : Batch { + override val state = Batch.State.LOCAL + } + data class PersistedBatch(val records: List) : Batch { + override val state = Batch.State.PERSISTED + } + + override suspend fun processRecords( + records: Iterator, + totalSizeBytes: Long + ): Batch { + return LocalBatch(records.asSequence().toList()) + } + + override suspend fun processBatch(batch: Batch): Batch { + return when (batch) { + is LocalBatch -> { + batch.records.forEach { + MockDestinationBackend.insert( + getFilename(it.stream), + OutputRecord( + UUID.randomUUID(), + Instant.ofEpochMilli(it.emittedAtMs), + Instant.ofEpochMilli(System.currentTimeMillis()), + stream.generationId, + it.data as ObjectValue, + OutputRecord.Meta(changes = it.meta?.changes, syncId = stream.syncId), + ) + ) + } + PersistedBatch(batch.records) + } + is PersistedBatch -> SimpleBatch(state = Batch.State.COMPLETE) + else -> throw IllegalStateException("Unexpected batch type: $batch") + } + } + + companion object { + fun getFilename(stream: DestinationStream.Descriptor) = + getFilename(stream.namespace, stream.name) + fun getFilename(namespace: String?, name: String) = "(${namespace},${name})" + } +} diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/resources/metadata.yaml b/airbyte-cdk/bulk/core/load/src/integrationTest/resources/metadata.yaml new file mode 100644 index 000000000000..3424155343d1 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/resources/metadata.yaml @@ -0,0 +1,5 @@ +# This is a minimal metadata.yaml that allows a destination connector to run. +# A real metadata.yaml obviously contains much more stuff, but we don't strictly +# need any of it at runtime. +data: + dockerRepository: "airbyte/fake-destination" diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt index ae55eabb84a4..2c1183dfa0d0 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt @@ -58,7 +58,7 @@ class RecordDifferTest { "phone" to "1234", "email" to "charlie@example.com" ), - airbyteMeta = """{"sync_id": 12}""", + airbyteMeta = OutputRecord.Meta(syncId = 42), ), ), actualRecords = @@ -109,7 +109,7 @@ class RecordDifferTest { Missing record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00Z), name=StringValue(value=alice), phone=StringValue(value=1234)}), airbyteMeta=null) Incorrect record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:02Z)): generationId: Expected 42, got 41 - airbyteMeta: Expected {"sync_id":12}, got null + airbyteMeta: Expected Meta(changes=null, syncId=42), got null phone: Expected StringValue(value=1234), but was StringValue(value=5678) email: Expected StringValue(value=charlie@example.com), but was address: Expected , but was StringValue(value=1234 charlie street) diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt index 89e8f1a0b2d0..18ac7545d50b 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt @@ -4,9 +4,8 @@ package io.airbyte.cdk.test.util -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.ObjectMapper import io.airbyte.cdk.data.ObjectValue +import io.airbyte.cdk.message.DestinationRecord.Change import java.time.Instant import java.util.UUID @@ -23,8 +22,17 @@ data class OutputRecord( * Destinations _must_ filter out the airbyte_* fields from this map. */ val data: ObjectValue, - val airbyteMeta: JsonNode?, + val airbyteMeta: Meta?, ) { + /** + * Much like [io.airbyte.cdk.message.DestinationRecord.Meta], but includes the [syncId] field + * that we write to the destination. + */ + data class Meta( + val changes: List? = null, + val syncId: Long? = null, + ) + /** Utility constructor with easier types to write by hand */ constructor( rawId: String, @@ -32,14 +40,14 @@ data class OutputRecord( loadedAt: Long?, generationId: Long?, data: Map, - airbyteMeta: String?, + airbyteMeta: Meta?, ) : this( UUID.fromString(rawId), Instant.ofEpochMilli(extractedAt), loadedAt?.let { Instant.ofEpochMilli(it) }, generationId, ObjectValue.from(data), - airbyteMeta?.let { ObjectMapper().readTree(it) }, + airbyteMeta, ) /** @@ -51,13 +59,13 @@ data class OutputRecord( extractedAt: Long, generationId: Long?, data: Map, - airbyteMeta: String?, + airbyteMeta: Meta?, ) : this( null, Instant.ofEpochMilli(extractedAt), loadedAt = null, generationId, ObjectValue.from(data), - airbyteMeta?.let { ObjectMapper().readTree(it) }, + airbyteMeta, ) } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/write/BasicFunctionalityIntegrationTest.kt index e06142bd34f2..7cd18ae8bda0 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/write/BasicFunctionalityIntegrationTest.kt @@ -20,6 +20,8 @@ import io.airbyte.cdk.test.util.NoopNameMapper import io.airbyte.cdk.test.util.OutputRecord import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason import kotlin.test.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll @@ -73,6 +75,14 @@ abstract class BasicFunctionalityIntegrationTest( name = "test_stream", data = """{"id": 5678}""", emittedAtMs = 1234, + changes = + listOf( + DestinationRecord.Change( + field = "foo", + change = Change.NULLED, + reason = Reason.SOURCE_FIELD_SIZE_LIMITATION + ) + ) ), StreamCheckpoint( streamName = "test_stream", @@ -111,7 +121,18 @@ abstract class BasicFunctionalityIntegrationTest( extractedAt = 1234, generationId = 0, data = mapOf("id" to 5678), - airbyteMeta = """{"changes": [], "sync_id": 42}""" + airbyteMeta = + OutputRecord.Meta( + changes = + listOf( + DestinationRecord.Change( + field = "foo", + change = Change.NULLED, + reason = Reason.SOURCE_FIELD_SIZE_LIMITATION + ) + ), + syncId = 42 + ) ) ), "test_stream",