Skip to content

Commit

Permalink
Bulk load CDK: Even more tests (#47377)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Oct 28, 2024
1 parent 9106d24 commit 9dcb7e7
Show file tree
Hide file tree
Showing 10 changed files with 322 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class MockBasicFunctionalityIntegrationTest :
NoopExpectedRecordMapper,
NoopNameMapper,
isStreamSchemaRetroactive = false,
supportsDedup = true,
) {
@Test
override fun testBasicWrite() {
Expand All @@ -36,8 +37,8 @@ class MockBasicFunctionalityIntegrationTest :
}

@Test
override fun testFunkyStreamAndColumnNames() {
super.testFunkyStreamAndColumnNames()
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}

@Test
Expand All @@ -54,4 +55,9 @@ class MockBasicFunctionalityIntegrationTest :
override fun testAppendSchemaEvolution() {
super.testAppendSchemaEvolution()
}

// NOTE(review): re-declares the inherited test so the test runner discovers it on
// this concrete class; the body simply delegates to the base implementation.
@Test
override fun testDedup() {
    super.testDedup()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ package io.airbyte.cdk.load.mock_integration_test

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.test.util.RecordDiffer
import java.util.concurrent.ConcurrentHashMap

object MockDestinationBackend {
Expand All @@ -17,6 +21,63 @@ object MockDestinationBackend {
getFile(filename).addAll(records)
}

/**
 * Upsert [records] into [filename], deduplicating on [primaryKey].
 *
 * For each incoming record: if no stored record shares its PK, it is appended. Otherwise the
 * incoming record wins only if it has a later [cursor] value, or an equal cursor but a later
 * extractedAt; losing incoming records are discarded. When an incoming record wins and carries a
 * non-null `_ab_cdc_deleted_at` field, the stored record is removed and the incoming record is
 * NOT added (CDC deletion).
 */
fun upsert(
    filename: String,
    primaryKey: List<List<String>>,
    cursor: List<String>,
    vararg records: OutputRecord
) {
    // Resolve a (possibly nested) field path within a record's data.
    // Returns null if any intermediate object along the path is missing or null;
    // throws if a non-terminal path element resolves to a non-object value.
    fun getField(path: List<String>, record: OutputRecord): AirbyteValue? {
        var currentValue: ObjectValue = record.data
        // Iterate over the path, except the final element, which is read directly below.
        // (Bug fix: this previously used path.size - 2, which skipped the second-to-last
        // path element for nested paths and performed no traversal at all for
        // two-element paths.)
        for (pathElement in path.subList(0, (path.size - 1).coerceAtLeast(0))) {
            when (val next = currentValue.values[pathElement]) {
                null,
                is NullValue -> return null
                !is ObjectValue -> {
                    throw IllegalStateException(
                        "Attempted to traverse field list in ${record.data} but found non-object value at $pathElement: $next"
                    )
                }
                else -> currentValue = next
            }
        }
        return currentValue.values[path.last()]
    }
    fun getPk(record: OutputRecord): List<AirbyteValue?> =
        primaryKey.map { pkField -> getField(pkField, record) }
    fun getCursor(record: OutputRecord): AirbyteValue? = getField(cursor, record)

    val file = getFile(filename)
    records.forEach { incomingRecord ->
        val incomingPk = getPk(incomingRecord)
        // Assume that in dedup mode, we don't have duplicates - so we can just find the first
        // record with the same PK as the incoming record
        val existingRecord =
            file.firstOrNull { RecordDiffer.comparePks(incomingPk, getPk(it)) == 0 }
        if (existingRecord == null) {
            file.add(incomingRecord)
        } else {
            val incomingCursor = getCursor(incomingRecord)
            val existingCursor = getCursor(existingRecord)
            val compare = RecordDiffer.valueComparator.compare(incomingCursor, existingCursor)
            // If the incoming record has a later cursor,
            // or the same cursor but a later extractedAt,
            // then upsert. (otherwise discard the incoming record.)
            if (
                compare > 0 ||
                    (compare == 0 && incomingRecord.extractedAt > existingRecord.extractedAt)
            ) {
                file.remove(existingRecord)
                // A set _ab_cdc_deleted_at means "delete this PK": drop the stored
                // record without adding the incoming one.
                val deletion = getField(listOf("_ab_cdc_deleted_at"), incomingRecord)
                if (deletion == null || deletion is NullValue) {
                    file.add(incomingRecord)
                }
            }
        }
    }
}

/** Returns every record currently stored under [filename]. */
fun readFile(filename: String): List<OutputRecord> = getFile(filename)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.mock_integration_test

import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.message.Batch
Expand Down Expand Up @@ -50,8 +51,8 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
return when (batch) {
is LocalBatch -> {
batch.records.forEach {
MockDestinationBackend.insert(
getFilename(it.stream),
val filename = getFilename(it.stream)
val record =
OutputRecord(
UUID.randomUUID(),
Instant.ofEpochMilli(it.emittedAtMs),
Expand All @@ -63,7 +64,17 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
syncId = stream.syncId
),
)
)
val importType = stream.importType
if (importType is Dedupe) {
MockDestinationBackend.upsert(
filename,
importType.primaryKey,
importType.cursor,
record
)
} else {
MockDestinationBackend.insert(filename, record)
}
}
PersistedBatch(batch.records)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@
package io.airbyte.cdk.load.test.util

import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.TimeValue
import io.airbyte.cdk.load.data.TimestampValue
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.format.DateTimeParseException
import kotlin.reflect.jvm.jvmName

class RecordDiffer(
Expand Down Expand Up @@ -62,12 +70,7 @@ class RecordDiffer(
)
}

// Compare each PK field in order, until we find a field that the two records differ in.
// If all the fields are equal, then these two records have the same PK.
pk1.zip(pk2)
.map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) }
.firstOrNull { it != 0 }
?: 0
comparePks(pk1, pk2)
}

/**
Expand Down Expand Up @@ -235,7 +238,7 @@ class RecordDiffer(
// with it explicitly in the condition)
val expectedValue = expectedRecord.data.values[key]
val actualValue = actualRecord.data.values[key]
if (expectedValue != actualValue) {
if (valueComparator.compare(expectedValue, actualValue) != 0) {
diff.append("$key: Expected $expectedValue, but was $actualValue\n")
}
}
Expand All @@ -248,16 +251,55 @@ class RecordDiffer(
val valueComparator: Comparator<AirbyteValue> =
Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!) }

/**
* Compare each PK field in order, until we find a field that the two records differ in. If
* all the fields are equal, then these two records have the same PK.
*/
/**
 * Compare each PK field in order, until we find a field that the two records differ in. If
 * all the fields are equal, then these two records have the same PK.
 */
fun comparePks(pk1: List<AirbyteValue?>, pk2: List<AirbyteValue?>): Int {
    for ((field1, field2) in pk1.zip(pk2)) {
        val result = valueComparator.compare(field1, field2)
        if (result != 0) {
            return result
        }
    }
    return 0
}

/**
 * Compare two [AirbyteValue]s that may have different concrete types.
 *
 * Values of different classes are ordered by class name (arbitrary but stable and total).
 * Temporal values are parsed before comparing, trying the local (offset-free) form first
 * and falling back to the offset-aware form; all other same-class values rely on their
 * natural [Comparable] ordering.
 */
private fun compare(v1: AirbyteValue, v2: AirbyteValue): Int {
    // when comparing values of different types, just sort by their class name.
    // in theory, we could check for numeric types and handle them smartly...
    // that's a lot of work though
    if (v1::class != v2::class) {
        return v1::class.jvmName.compareTo(v2::class.jvmName)
    }
    // From here on, v1 and v2 are known to have the same class, so the casts below are safe.
    // Handle temporal types specifically, because they require explicit parsing.
    // Catch only DateTimeParseException (not Exception) so that genuine bugs
    // surface instead of being swallowed by the offset-aware fallback.
    return when (v1) {
        is DateValue ->
            LocalDate.parse(v1.value).compareTo(LocalDate.parse((v2 as DateValue).value))
        is TimeValue -> {
            try {
                LocalTime.parse(v1.value).compareTo(LocalTime.parse((v2 as TimeValue).value))
            } catch (e: DateTimeParseException) {
                OffsetTime.parse(v1.value).compareTo(OffsetTime.parse((v2 as TimeValue).value))
            }
        }
        is TimestampValue -> {
            try {
                LocalDateTime.parse(v1.value)
                    .compareTo(LocalDateTime.parse((v2 as TimestampValue).value))
            } catch (e: DateTimeParseException) {
                OffsetDateTime.parse(v1.value)
                    .compareTo(OffsetDateTime.parse((v2 as TimestampValue).value))
            }
        }
        // otherwise, just be a terrible person.
        // we know these are the same type, so this is safe to do.
        else -> @Suppress("UNCHECKED_CAST") (v1 as Comparable<AirbyteValue>).compareTo(v2)
    }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import io.micronaut.context.env.yaml.YamlPropertySourceLoader
import java.nio.file.Files
import java.nio.file.Path

const val DOCKERIZED_TEST_ENV = "DOCKERIZED_INTEGRATION_TEST"

/**
* Represents a destination process, whether running in-JVM via micronaut, or as a separate Docker
* container. The general lifecycle is:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@ import io.airbyte.protocol.models.v0.AirbyteLogMessage
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import java.io.BufferedWriter
import java.io.OutputStreamWriter
import java.nio.file.Files
import java.nio.file.Path
import java.time.Clock
import java.util.Locale
import java.util.Scanner
import javax.inject.Singleton
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
Expand Down Expand Up @@ -238,20 +235,9 @@ class DockerizedDestination(
}
}

@Singleton
@Requires(env = [DOCKERIZED_TEST_ENV])
class DockerizedDestinationFactory(
// Note that this is not the same property as in MetadataYamlPropertySource.
// We get this because IntegrationTest manually sets "classpath:metadata.yaml"
// as a property source.
// MetadataYamlPropertySource has nothing to do with this property.
@Value("\${data.docker-repository}") val imageName: String,
// Most tests will just use micronaut to inject this.
// But some tests will want to manually instantiate an instance,
// e.g. to run an older version of the connector.
// So we just hardcode 'dev' here; manual callers can pass in
// whatever they want.
@Value("dev") val imageVersion: String,
private val imageName: String,
private val imageVersion: String,
) : DestinationProcessFactory() {
override fun createDestinationProcess(
command: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.micronaut.context.annotation.Requires
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.io.PrintWriter
import java.util.concurrent.Executors
import javax.inject.Singleton
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -93,11 +91,6 @@ class NonDockerizedDestination(
}
}

// Notably, not actually a Micronaut factory. We want to inject the actual
// factory into our tests, not a pre-instantiated destination, because we want
// to run multiple destination processes per test.
@Singleton
@Requires(notEnv = [DOCKERIZED_TEST_ENV])
class NonDockerizedDestinationFactory : DestinationProcessFactory() {
override fun createDestinationProcess(
command: String,
Expand Down
Loading

0 comments on commit 9dcb7e7

Please sign in to comment.