Bulk Load CDK: Add integration test using in-memory mock destination #45634

Merged: 5 commits, Oct 3, 2024
10 changes: 10 additions & 0 deletions .github/workflows/publish-bulk-cdk.yml
@@ -76,6 +76,16 @@ jobs:
          gradle-distribution-sha-256-sum-warning: false
          arguments: --scan :airbyte-cdk:bulk:bulkCdkBuild

      - name: Integration test Bulk CDK
        uses: burrunan/gradle-cache-action@v1
        env:
          CI: true
        with:
          job-id: bulk-cdk-publish
          concurrent: true
Contributor commented:

There are some read-only flags that you can set here; not sure if they're actually helpful.

In any case, none of this seems wrong.

          gradle-distribution-sha-256-sum-warning: false
          arguments: --scan :airbyte-cdk:bulk:bulkCdkIntegrationTest

      - name: Publish Poms and Jars to CloudRepo
        uses: burrunan/gradle-cache-action@v1
        env:
6 changes: 6 additions & 0 deletions airbyte-cdk/bulk/build.gradle
@@ -61,6 +61,12 @@ allprojects {
    }
}

tasks.register('bulkCdkIntegrationTest').configure {
    // findByName returns the task, or null if no such task exists.
    // we need this because not all submodules have an integrationTest task.
Contributor commented:

FYI, tasks.matching is an alternative that is also useful for this purpose, and perhaps even preferred.

    dependsOn allprojects.collect {it.tasks.findByName('integrationTest')}.findAll {it != null}
}

if (buildNumberFile.exists()) {
    tasks.register('bulkCdkBuild').configure {
        dependsOn allprojects.collect {it.tasks.named('build')}
28 changes: 28 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
@@ -1,3 +1,16 @@
// simply declaring the source sets is sufficient to populate them with
// src/integrationTest/java+resources + src/integrationTest/kotlin.
sourceSets {
    integrationTest {
    }
}
kotlin {
    sourceSets {
        testIntegration {
        }
    }
}

dependencies {
    implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
    implementation 'org.apache.commons:commons-lang3:3.17.0'
@@ -10,3 +23,18 @@ dependencies {
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
    implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20"
}

task integrationTest(type: Test) {
    description = 'Runs the integration tests.'
    group = 'verification'
    testClassesDirs = sourceSets.integrationTest.output.classesDirs
    classpath = sourceSets.integrationTest.runtimeClasspath
    useJUnitPlatform()
    mustRunAfter tasks.check
Contributor commented:

What's this for? Why not have this run as part of check? Is this because of the dependency on assemble for the docker image? If it's the latter, it's better to declare that dependency explicitly with dependsOn assemble. These mustRunAfter constraints are typically not what you really want for these kinds of tasks.

Contributor commented:

This really tripped me up; did you in fact mean to have check depend on integrationTest? If so, please let me know @edgao

Contributor Author commented:

IIRC I copy-pasted this from Stack Overflow without reading it :P I did in fact want to have check depend on integrationTest.

(Which I think is what I did later on, with the rootProject.check.dependsOn thing? This mustRunAfter thing probably isn't really needed. The assumption on SO was probably that integrationTest is slow, and therefore only worth running if check succeeded, which isn't true here.)

Contributor commented:

Thanks!

FYI, ChatGPT is great at generating Gradle scripts.

}
configurations {
    integrationTestImplementation.extendsFrom testImplementation
    integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
}
// These tests are lightweight enough to run on every PR.
rootProject.check.dependsOn(integrationTest)
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.test.util.NoopNameMapper
import io.airbyte.cdk.test.write.BasicFunctionalityIntegrationTest

class MockBasicFunctionalityIntegrationTest :
    BasicFunctionalityIntegrationTest(
        MockDestinationSpecification(),
        MockDestinationDataDumper,
        NoopDestinationCleaner,
        NoopExpectedRecordMapper,
        NoopNameMapper
    )
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.test.util.DestinationDataDumper
import io.airbyte.cdk.test.util.OutputRecord
import java.util.concurrent.ConcurrentHashMap

object MockDestinationBackend {
    private val files: MutableMap<String, MutableList<OutputRecord>> = ConcurrentHashMap()

    fun insert(filename: String, vararg records: OutputRecord) {
        getFile(filename).addAll(records)
    }

    fun readFile(filename: String): List<OutputRecord> {
        return getFile(filename)
    }

    private fun getFile(filename: String): MutableList<OutputRecord> {
        return files.getOrPut(filename) { mutableListOf() }
    }
}

object MockDestinationDataDumper : DestinationDataDumper {
    override fun dumpRecords(streamName: String, streamNamespace: String?): List<OutputRecord> {
        return MockDestinationBackend.readFile(
            MockStreamLoader.getFilename(streamNamespace, streamName)
        )
    }
}
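
Note on the backend above, offered as a sketch rather than a required change. Because `files` is declared as `MutableMap`, the `getOrPut` call resolves to the plain `MutableMap` extension, which performs a separate get and put instead of an atomic insert, and `readFile` returns the live list rather than a copy. The version below assumes concurrent writers matter for this mock, which the PR does not state. `FakeRecord` and `SketchBackend` are hypothetical names standing in for `OutputRecord` and `MockDestinationBackend`.

```kotlin
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for io.airbyte.cdk.test.util.OutputRecord.
data class FakeRecord(val id: Int)

// Hypothetical variant of MockDestinationBackend; not part of the PR.
object SketchBackend {
    // Declared with the concrete type so the atomic computeIfAbsent is available.
    private val files = ConcurrentHashMap<String, MutableList<FakeRecord>>()

    fun insert(filename: String, vararg records: FakeRecord) {
        getFile(filename).addAll(records)
    }

    // Returns a snapshot, so callers do not observe later inserts.
    fun readFile(filename: String): List<FakeRecord> {
        val file = getFile(filename)
        return synchronized(file) { file.toList() }
    }

    private fun getFile(filename: String): MutableList<FakeRecord> =
        files.computeIfAbsent(filename) {
            Collections.synchronizedList(mutableListOf<FakeRecord>())
        }
}

fun main() {
    SketchBackend.insert("(null,test_stream)", FakeRecord(1), FakeRecord(2))
    val snapshot = SketchBackend.readFile("(null,test_stream)")
    SketchBackend.insert("(null,test_stream)", FakeRecord(3))
    println(snapshot.size) // 2: the snapshot is unaffected by the later insert
    println(SketchBackend.readFile("(null,test_stream)").size) // 3
}
```

If the mock is only ever driven from a single thread, the original implementation is simpler and probably sufficient.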
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.check.DestinationChecker
import javax.inject.Singleton

@Singleton
class MockDestinationChecker : DestinationChecker<MockDestinationConfiguration> {
    override fun check(config: MockDestinationConfiguration) {}
}
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationConfigurationFactory
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

class MockDestinationConfiguration : DestinationConfiguration()

@Singleton class MockDestinationSpecification : ConfigurationSpecification()

@Singleton
class MockDestinationConfigurationFactory :
    DestinationConfigurationFactory<MockDestinationSpecification, MockDestinationConfiguration> {

    override fun makeWithoutExceptionHandling(
        pojo: MockDestinationSpecification
    ): MockDestinationConfiguration {
        return MockDestinationConfiguration()
    }
}

@Factory
class MockDestinationConfigurationProvider(private val config: DestinationConfiguration) {
    @Singleton
    fun get(): MockDestinationConfiguration {
        return config as MockDestinationConfiguration
    }
}
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.data.ObjectValue
import io.airbyte.cdk.message.Batch
import io.airbyte.cdk.message.DestinationRecord
import io.airbyte.cdk.message.SimpleBatch
import io.airbyte.cdk.test.util.OutputRecord
import io.airbyte.cdk.write.DestinationWriter
import io.airbyte.cdk.write.StreamLoader
import java.time.Instant
import java.util.UUID
import javax.inject.Singleton

@Singleton
class MockDestinationWriter : DestinationWriter {
    override fun createStreamLoader(stream: DestinationStream): StreamLoader {
        return MockStreamLoader(stream)
    }
}

class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
    data class LocalBatch(val records: List<DestinationRecord>) : Batch {
        override val state = Batch.State.LOCAL
    }
    data class PersistedBatch(val records: List<DestinationRecord>) : Batch {
        override val state = Batch.State.PERSISTED
    }

    override suspend fun processRecords(
        records: Iterator<DestinationRecord>,
        totalSizeBytes: Long
    ): Batch {
        return LocalBatch(records.asSequence().toList())
    }

    override suspend fun processBatch(batch: Batch): Batch {
        return when (batch) {
            is LocalBatch -> {
                batch.records.forEach {
                    MockDestinationBackend.insert(
                        getFilename(it.stream),
                        OutputRecord(
                            UUID.randomUUID(),
                            Instant.ofEpochMilli(it.emittedAtMs),
                            Instant.ofEpochMilli(System.currentTimeMillis()),
                            stream.generationId,
                            it.data as ObjectValue,
                            OutputRecord.Meta(changes = it.meta?.changes, syncId = stream.syncId),
                        )
                    )
                }
                PersistedBatch(batch.records)
            }
            is PersistedBatch -> SimpleBatch(state = Batch.State.COMPLETE)
            else -> throw IllegalStateException("Unexpected batch type: $batch")
        }
    }

    companion object {
        fun getFilename(stream: DestinationStream.Descriptor) =
            getFilename(stream.namespace, stream.name)
        fun getFilename(namespace: String?, name: String) = "(${namespace},${name})"
    }
}
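
To make the in-memory "filename" scheme concrete, here is a minimal standalone sketch of the two-argument `getFilename` from the companion object above. The namespace and stream names used in `main` are illustrative only. It also shows one consequence of string interpolation: a null namespace and the literal string "null" map to the same key, which seems acceptable for a mock but would not be for a real destination.

```kotlin
// Standalone copy of the two-argument getFilename shown in the diff above.
fun getFilename(namespace: String?, name: String) = "(${namespace},${name})"

fun main() {
    println(getFilename("public", "users")) // (public,users)
    println(getFilename(null, "users"))     // (null,users)
    println(getFilename("null", "users"))   // (null,users), same key as the line above
}
```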
@@ -0,0 +1,5 @@
# This is a minimal metadata.yaml that allows a destination connector to run.
# A real metadata.yaml obviously contains much more stuff, but we don't strictly
# need any of it at runtime.
data:
  dockerRepository: "airbyte/fake-destination"
@@ -58,7 +58,7 @@ class RecordDifferTest {
                                 "phone" to "1234",
                                 "email" to "charlie@example.com"
                             ),
-                        airbyteMeta = """{"sync_id": 12}""",
+                        airbyteMeta = OutputRecord.Meta(syncId = 42),
                     ),
                 ),
             actualRecords =
@@ -109,7 +109,7 @@ class RecordDifferTest {
 Missing record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00Z), name=StringValue(value=alice), phone=StringValue(value=1234)}), airbyteMeta=null)
 Incorrect record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:02Z)):
 generationId: Expected 42, got 41
-airbyteMeta: Expected {"sync_id":12}, got null
+airbyteMeta: Expected Meta(changes=null, syncId=42), got null
 phone: Expected StringValue(value=1234), but was StringValue(value=5678)
 email: Expected StringValue(value=charlie@example.com), but was <unset>
 address: Expected <unset>, but was StringValue(value=1234 charlie street)
@@ -4,9 +4,8 @@

 package io.airbyte.cdk.test.util

-import com.fasterxml.jackson.databind.JsonNode
-import com.fasterxml.jackson.databind.ObjectMapper
 import io.airbyte.cdk.data.ObjectValue
+import io.airbyte.cdk.message.DestinationRecord.Change
 import java.time.Instant
 import java.util.UUID

@@ -23,23 +22,32 @@ data class OutputRecord(
      * Destinations _must_ filter out the airbyte_* fields from this map.
      */
     val data: ObjectValue,
-    val airbyteMeta: JsonNode?,
+    val airbyteMeta: Meta?,
 ) {
+    /**
+     * Much like [io.airbyte.cdk.message.DestinationRecord.Meta], but includes the [syncId] field
+     * that we write to the destination.
+     */
+    data class Meta(
+        val changes: List<Change>? = null,
+        val syncId: Long? = null,
+    )
+
     /** Utility constructor with easier types to write by hand */
     constructor(
         rawId: String,
         extractedAt: Long,
         loadedAt: Long?,
         generationId: Long?,
         data: Map<String, Any?>,
-        airbyteMeta: String?,
+        airbyteMeta: Meta?,
     ) : this(
         UUID.fromString(rawId),
         Instant.ofEpochMilli(extractedAt),
         loadedAt?.let { Instant.ofEpochMilli(it) },
         generationId,
         ObjectValue.from(data),
-        airbyteMeta?.let { ObjectMapper().readTree(it) },
+        airbyteMeta,
     )

     /**
@@ -51,13 +59,13 @@ data class OutputRecord(
         extractedAt: Long,
         generationId: Long?,
         data: Map<String, Any?>,
-        airbyteMeta: String?,
+        airbyteMeta: Meta?,
     ) : this(
         null,
         Instant.ofEpochMilli(extractedAt),
         loadedAt = null,
         generationId,
         ObjectValue.from(data),
-        airbyteMeta?.let { ObjectMapper().readTree(it) },
+        airbyteMeta,
     )
 }
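
The switch from a JSON string to a typed `Meta` can be illustrated with a small standalone sketch. `Meta` below has the same shape as `OutputRecord.Meta` in the diff; `Change` is a hypothetical simplified stand-in for `io.airbyte.cdk.message.DestinationRecord.Change`, whose real field types are not shown in this diff. The printed form matches the "Expected Meta(changes=null, syncId=42)" message in the updated `RecordDifferTest`.

```kotlin
// Hypothetical stand-in for io.airbyte.cdk.message.DestinationRecord.Change.
data class Change(val field: String, val change: String, val reason: String)

// Same shape as OutputRecord.Meta in the diff above.
data class Meta(
    val changes: List<Change>? = null,
    val syncId: Long? = null,
)

fun main() {
    val expected = Meta(syncId = 42)
    val actual = Meta(changes = null, syncId = 42)
    println(expected == actual) // true: data classes compare by field values
    println(expected)           // Meta(changes=null, syncId=42)
    // A null change list and an empty one are distinct values.
    println(expected == Meta(changes = emptyList(), syncId = 42)) // false
}
```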
@@ -20,6 +20,8 @@ import io.airbyte.cdk.test.util.NoopNameMapper
 import io.airbyte.cdk.test.util.OutputRecord
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
 import io.airbyte.protocol.models.v0.AirbyteMessage
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
 import kotlin.test.assertEquals
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.assertAll
@@ -73,6 +75,14 @@ abstract class BasicFunctionalityIntegrationTest(
                         name = "test_stream",
                         data = """{"id": 5678}""",
                         emittedAtMs = 1234,
+                        changes =
+                            listOf(
+                                DestinationRecord.Change(
+                                    field = "foo",
+                                    change = Change.NULLED,
+                                    reason = Reason.SOURCE_FIELD_SIZE_LIMITATION
+                                )
+                            )
                     ),
                     StreamCheckpoint(
                         streamName = "test_stream",
@@ -111,7 +121,18 @@ abstract class BasicFunctionalityIntegrationTest(
                         extractedAt = 1234,
                         generationId = 0,
                         data = mapOf("id" to 5678),
-                        airbyteMeta = """{"changes": [], "sync_id": 42}"""
+                        airbyteMeta =
+                            OutputRecord.Meta(
+                                changes =
+                                    listOf(
+                                        DestinationRecord.Change(
+                                            field = "foo",
+                                            change = Change.NULLED,
+                                            reason = Reason.SOURCE_FIELD_SIZE_LIMITATION
+                                        )
+                                    ),
+                                syncId = 42
+                            )
                     )
                 ),
                 "test_stream",