Bulk Load CDK: Add integration test using in-memory mock destination (#…
edgao authored Oct 3, 2024
1 parent 45cf615 commit 4c680b4
Showing 12 changed files with 256 additions and 10 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/publish-bulk-cdk.yml
@@ -76,6 +76,16 @@ jobs:
gradle-distribution-sha-256-sum-warning: false
arguments: --scan :airbyte-cdk:bulk:bulkCdkBuild

- name: Integration test Bulk CDK
uses: burrunan/gradle-cache-action@v1
env:
CI: true
with:
job-id: bulk-cdk-publish
concurrent: true
gradle-distribution-sha-256-sum-warning: false
arguments: --scan :airbyte-cdk:bulk:bulkCdkIntegrationTest

- name: Publish Poms and Jars to CloudRepo
uses: burrunan/gradle-cache-action@v1
env:
6 changes: 6 additions & 0 deletions airbyte-cdk/bulk/build.gradle
@@ -61,6 +61,12 @@ allprojects {
}
}

tasks.register('bulkCdkIntegrationTest').configure {
// findByName returns the task, or null if no such task exists.
// We need this because not all submodules have an integrationTest task.
dependsOn allprojects.collect {it.tasks.findByName('integrationTest')}.findAll {it != null}
}

if (buildNumberFile.exists()) {
tasks.register('bulkCdkBuild').configure {
dependsOn allprojects.collect {it.tasks.named('build')}
28 changes: 28 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
@@ -1,3 +1,16 @@
// Simply declaring the source sets is sufficient to populate them from
// src/integrationTest/{java,kotlin,resources}.
sourceSets {
integrationTest {
}
}
kotlin {
sourceSets {
testIntegration {
}
}
}

dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation 'org.apache.commons:commons-lang3:3.17.0'
@@ -10,3 +23,18 @@ dependencies {
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20"
}

task integrationTest(type: Test) {
description = 'Runs the integration tests.'
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
useJUnitPlatform()
mustRunAfter tasks.check
}
configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
}
// These tests are lightweight enough to run on every PR.
rootProject.check.dependsOn(integrationTest)
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.test.util.NoopNameMapper
import io.airbyte.cdk.test.write.BasicFunctionalityIntegrationTest

class MockBasicFunctionalityIntegrationTest :
BasicFunctionalityIntegrationTest(
MockDestinationSpecification(),
MockDestinationDataDumper,
NoopDestinationCleaner,
NoopExpectedRecordMapper,
NoopNameMapper
)
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.test.util.DestinationDataDumper
import io.airbyte.cdk.test.util.OutputRecord
import java.util.concurrent.ConcurrentHashMap

object MockDestinationBackend {
private val files: MutableMap<String, MutableList<OutputRecord>> = ConcurrentHashMap()

fun insert(filename: String, vararg records: OutputRecord) {
getFile(filename).addAll(records)
}

fun readFile(filename: String): List<OutputRecord> {
return getFile(filename)
}

private fun getFile(filename: String): MutableList<OutputRecord> {
return files.getOrPut(filename) { mutableListOf() }
}
}

object MockDestinationDataDumper : DestinationDataDumper {
override fun dumpRecords(streamName: String, streamNamespace: String?): List<OutputRecord> {
return MockDestinationBackend.readFile(
MockStreamLoader.getFilename(streamNamespace, streamName)
)
}
}
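
For orientation, a minimal usage sketch of the in-memory backend (not part of the commit; it assumes the classes above plus the OutputRecord test constructor introduced later in this diff). Records inserted under a stream's filename are exactly what the data dumper reads back for that stream.

import io.airbyte.cdk.mock_integration_test.MockDestinationBackend
import io.airbyte.cdk.mock_integration_test.MockDestinationDataDumper
import io.airbyte.cdk.mock_integration_test.MockStreamLoader
import io.airbyte.cdk.test.util.OutputRecord

fun backendRoundTripSketch() {
    // Writer and dumper derive the same key from (namespace, name).
    val filename = MockStreamLoader.getFilename("test_ns", "test_stream") // "(test_ns,test_stream)"
    MockDestinationBackend.insert(
        filename,
        OutputRecord(
            extractedAt = 1234,
            generationId = 0,
            data = mapOf("id" to 1),
            airbyteMeta = null,
        ),
    )
    // dumpRecords resolves the identical filename and returns the buffered records.
    val records = MockDestinationDataDumper.dumpRecords(streamName = "test_stream", streamNamespace = "test_ns")
    check(records.size == 1)
}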
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.check.DestinationChecker
import javax.inject.Singleton

@Singleton
class MockDestinationChecker : DestinationChecker<MockDestinationConfiguration> {
override fun check(config: MockDestinationConfiguration) {}
}
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationConfigurationFactory
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

class MockDestinationConfiguration : DestinationConfiguration()

@Singleton class MockDestinationSpecification : ConfigurationSpecification()

@Singleton
class MockDestinationConfigurationFactory :
DestinationConfigurationFactory<MockDestinationSpecification, MockDestinationConfiguration> {

override fun makeWithoutExceptionHandling(
pojo: MockDestinationSpecification
): MockDestinationConfiguration {
return MockDestinationConfiguration()
}
}

@Factory
class MockDestinationConfigurationProvider(private val config: DestinationConfiguration) {
@Singleton
fun get(): MockDestinationConfiguration {
return config as MockDestinationConfiguration
}
}
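
The @Factory above re-exposes the generic DestinationConfiguration bean under its concrete type, so components in this module can inject MockDestinationConfiguration directly. A hypothetical consumer (an assumption for illustration, not part of the commit) would look like this:

import io.airbyte.cdk.mock_integration_test.MockDestinationConfiguration
import jakarta.inject.Singleton

// Hypothetical bean: Micronaut satisfies this constructor via the @Factory-provided
// MockDestinationConfiguration rather than the generic DestinationConfiguration.
@Singleton
class MockConfigurationConsumer(private val config: MockDestinationConfiguration)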
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.data.ObjectValue
import io.airbyte.cdk.message.Batch
import io.airbyte.cdk.message.DestinationRecord
import io.airbyte.cdk.message.SimpleBatch
import io.airbyte.cdk.test.util.OutputRecord
import io.airbyte.cdk.write.DestinationWriter
import io.airbyte.cdk.write.StreamLoader
import java.time.Instant
import java.util.UUID
import javax.inject.Singleton

@Singleton
class MockDestinationWriter : DestinationWriter {
override fun createStreamLoader(stream: DestinationStream): StreamLoader {
return MockStreamLoader(stream)
}
}

class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
data class LocalBatch(val records: List<DestinationRecord>) : Batch {
override val state = Batch.State.LOCAL
}
data class PersistedBatch(val records: List<DestinationRecord>) : Batch {
override val state = Batch.State.PERSISTED
}

override suspend fun processRecords(
records: Iterator<DestinationRecord>,
totalSizeBytes: Long
): Batch {
return LocalBatch(records.asSequence().toList())
}

override suspend fun processBatch(batch: Batch): Batch {
return when (batch) {
is LocalBatch -> {
batch.records.forEach {
MockDestinationBackend.insert(
getFilename(it.stream),
OutputRecord(
UUID.randomUUID(),
Instant.ofEpochMilli(it.emittedAtMs),
Instant.ofEpochMilli(System.currentTimeMillis()),
stream.generationId,
it.data as ObjectValue,
OutputRecord.Meta(changes = it.meta?.changes, syncId = stream.syncId),
)
)
}
PersistedBatch(batch.records)
}
is PersistedBatch -> SimpleBatch(state = Batch.State.COMPLETE)
else -> throw IllegalStateException("Unexpected batch type: $batch")
}
}

companion object {
fun getFilename(stream: DestinationStream.Descriptor) =
getFilename(stream.namespace, stream.name)
fun getFilename(namespace: String?, name: String) = "(${namespace},${name})"
}
}
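
For readers new to the load CDK's batch lifecycle, a hedged sketch of how the framework would drive this loader (illustrative only; in practice the CDK makes these calls): processRecords buffers incoming records into a LOCAL batch, the first processBatch call flushes them to MockDestinationBackend and reports PERSISTED, and the second call reports COMPLETE.

import io.airbyte.cdk.message.Batch
import io.airbyte.cdk.message.DestinationRecord
import io.airbyte.cdk.mock_integration_test.MockStreamLoader

// Illustrative driver, not part of the commit: LOCAL -> PERSISTED -> COMPLETE.
suspend fun driveLoaderSketch(loader: MockStreamLoader, records: Iterator<DestinationRecord>) {
    val local = loader.processRecords(records, totalSizeBytes = 0L)  // LocalBatch, Batch.State.LOCAL
    val persisted = loader.processBatch(local)                       // inserts into the backend, Batch.State.PERSISTED
    val complete = loader.processBatch(persisted)                    // SimpleBatch, Batch.State.COMPLETE
    check(complete.state == Batch.State.COMPLETE)
}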
@@ -0,0 +1,5 @@
# This is a minimal metadata.yaml that allows a destination connector to run.
# A real metadata.yaml obviously contains much more stuff, but we don't strictly
# need any of it at runtime.
data:
dockerRepository: "airbyte/fake-destination"
@@ -58,7 +58,7 @@ class RecordDifferTest {
"phone" to "1234",
"email" to "charlie@example.com"
),
airbyteMeta = """{"sync_id": 12}""",
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
),
actualRecords =
@@ -109,7 +109,7 @@ class RecordDifferTest {
Missing record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00Z), name=StringValue(value=alice), phone=StringValue(value=1234)}), airbyteMeta=null)
Incorrect record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:02Z)):
generationId: Expected 42, got 41
airbyteMeta: Expected {"sync_id":12}, got null
airbyteMeta: Expected Meta(changes=null, syncId=42), got null
phone: Expected StringValue(value=1234), but was StringValue(value=5678)
email: Expected StringValue(value=charlie@example.com), but was <unset>
address: Expected <unset>, but was StringValue(value=1234 charlie street)
@@ -4,9 +4,8 @@

package io.airbyte.cdk.test.util

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.data.ObjectValue
import io.airbyte.cdk.message.DestinationRecord.Change
import java.time.Instant
import java.util.UUID

@@ -23,23 +22,32 @@ data class OutputRecord(
* Destinations _must_ filter out the airbyte_* fields from this map.
*/
val data: ObjectValue,
val airbyteMeta: JsonNode?,
val airbyteMeta: Meta?,
) {
/**
* Much like [io.airbyte.cdk.message.DestinationRecord.Meta], but includes the [syncId] field
* that we write to the destination.
*/
data class Meta(
val changes: List<Change>? = null,
val syncId: Long? = null,
)

/** Utility constructor with easier types to write by hand */
constructor(
rawId: String,
extractedAt: Long,
loadedAt: Long?,
generationId: Long?,
data: Map<String, Any?>,
airbyteMeta: String?,
airbyteMeta: Meta?,
) : this(
UUID.fromString(rawId),
Instant.ofEpochMilli(extractedAt),
loadedAt?.let { Instant.ofEpochMilli(it) },
generationId,
ObjectValue.from(data),
airbyteMeta?.let { ObjectMapper().readTree(it) },
airbyteMeta,
)

/**
@@ -51,13 +59,13 @@
extractedAt: Long,
generationId: Long?,
data: Map<String, Any?>,
airbyteMeta: String?,
airbyteMeta: Meta?,
) : this(
null,
Instant.ofEpochMilli(extractedAt),
loadedAt = null,
generationId,
ObjectValue.from(data),
airbyteMeta?.let { ObjectMapper().readTree(it) },
airbyteMeta,
)
}
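
With this change, airbyteMeta is no longer a raw JSON string parsed through Jackson; tests build the typed Meta value directly. A small hedged example, mirroring the constructor above (the values are illustrative):

import io.airbyte.cdk.test.util.OutputRecord

// Expected-record construction with the new typed metadata.
val expectedRecord = OutputRecord(
    extractedAt = 1234,
    generationId = 42,
    data = mapOf("id" to 5678),
    airbyteMeta = OutputRecord.Meta(changes = null, syncId = 42),
)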
@@ -20,6 +20,8 @@ import io.airbyte.cdk.test.util.NoopNameMapper
import io.airbyte.cdk.test.util.OutputRecord
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
import kotlin.test.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
@@ -73,6 +75,14 @@ abstract class BasicFunctionalityIntegrationTest(
name = "test_stream",
data = """{"id": 5678}""",
emittedAtMs = 1234,
changes =
listOf(
DestinationRecord.Change(
field = "foo",
change = Change.NULLED,
reason = Reason.SOURCE_FIELD_SIZE_LIMITATION
)
)
),
StreamCheckpoint(
streamName = "test_stream",
@@ -111,7 +121,18 @@
extractedAt = 1234,
generationId = 0,
data = mapOf("id" to 5678),
airbyteMeta = """{"changes": [], "sync_id": 42}"""
airbyteMeta =
OutputRecord.Meta(
changes =
listOf(
DestinationRecord.Change(
field = "foo",
change = Change.NULLED,
reason = Reason.SOURCE_FIELD_SIZE_LIMITATION
)
),
syncId = 42
)
)
),
"test_stream",
