Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk load CDK: Even more tests #47377

Merged
merged 3 commits
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class MockBasicFunctionalityIntegrationTest :
NoopExpectedRecordMapper,
NoopNameMapper,
isStreamSchemaRetroactive = false,
supportsDedup = true,
) {
@Test
override fun testBasicWrite() {
Expand All @@ -36,8 +37,8 @@ class MockBasicFunctionalityIntegrationTest :
}

@Test
override fun testFunkyStreamAndColumnNames() {
super.testFunkyStreamAndColumnNames()
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}

@Test
Expand All @@ -54,4 +55,9 @@ class MockBasicFunctionalityIntegrationTest :
override fun testAppendSchemaEvolution() {
super.testAppendSchemaEvolution()
}

@Test
override fun testDedup() {
super.testDedup()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ package io.airbyte.cdk.load.mock_integration_test

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.test.util.RecordDiffer
import java.util.concurrent.ConcurrentHashMap

object MockDestinationBackend {
Expand All @@ -17,6 +21,63 @@ object MockDestinationBackend {
getFile(filename).addAll(records)
}

fun upsert(
    filename: String,
    primaryKey: List<List<String>>,
    cursor: List<String>,
    vararg records: OutputRecord
) {
    // Resolve a (possibly nested) field path against a record's data, returning null
    // if any intermediate object along the path is missing/null.
    // Most (all?) destinations don't actually support traversing a jsonpath to fetch
    // a cursor/PK, but it's easy enough to implement here for the mock backend.
    fun getField(path: List<String>, record: OutputRecord): AirbyteValue? {
        var currentValue: ObjectValue = record.data
        // Iterate over the path, except the final element.
        // (Bug fix: this previously used path.size - 2, which skipped an intermediate
        // path element for any path of depth >= 2 - e.g. for ["a", "b"] it read "b"
        // off the root object instead of traversing into "a" first.)
        for (pathElement in path.subList(0, (path.size - 1).coerceAtLeast(0))) {
            when (val next = currentValue.values[pathElement]) {
                null,
                is NullValue -> return null
                !is ObjectValue -> {
                    throw IllegalStateException(
                        "Attempted to traverse field list in ${record.data} but found non-object value at $pathElement: $next"
                    )
                }
                else -> currentValue = next
            }
        }
        return currentValue.values[path.last()]
    }
    fun getPk(record: OutputRecord): List<AirbyteValue?> =
        primaryKey.map { pkField -> getField(pkField, record) }
    fun getCursor(record: OutputRecord): AirbyteValue? = getField(cursor, record)

    val file = getFile(filename)
    records.forEach { incomingRecord ->
        val incomingPk = getPk(incomingRecord)
        // Assume that in dedup mode, we don't have duplicates - so we can just find the first
        // record with the same PK as the incoming record.
        val existingRecord =
            file.firstOrNull { RecordDiffer.comparePks(incomingPk, getPk(it)) == 0 }
        if (existingRecord == null) {
            file.add(incomingRecord)
        } else {
            val incomingCursor = getCursor(incomingRecord)
            val existingCursor = getCursor(existingRecord)
            val compare = RecordDiffer.valueComparator.compare(incomingCursor, existingCursor)
            // If the incoming record has a later cursor,
            // or the same cursor but a later extractedAt,
            // then upsert. (otherwise discard the incoming record.)
            if (
                compare > 0 ||
                    (compare == 0 && incomingRecord.extractedAt > existingRecord.extractedAt)
            ) {
                file.remove(existingRecord)
                // CDC semantics: a non-null _ab_cdc_deleted_at on the winning record
                // means "delete this PK" - drop the row instead of re-inserting it.
                val deletion = getField(listOf("_ab_cdc_deleted_at"), incomingRecord)
                if (deletion == null || deletion is NullValue) {
                    file.add(incomingRecord)
                }
            }
        }
    }
}

/** Returns the current contents of the in-memory "file" identified by [filename]. */
fun readFile(filename: String): List<OutputRecord> = getFile(filename)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.mock_integration_test

import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.message.Batch
Expand Down Expand Up @@ -50,8 +51,8 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
return when (batch) {
is LocalBatch -> {
batch.records.forEach {
MockDestinationBackend.insert(
getFilename(it.stream),
val filename = getFilename(it.stream)
val record =
OutputRecord(
UUID.randomUUID(),
Instant.ofEpochMilli(it.emittedAtMs),
Expand All @@ -63,7 +64,17 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
syncId = stream.syncId
),
)
)
val importType = stream.importType
if (importType is Dedupe) {
MockDestinationBackend.upsert(
filename,
importType.primaryKey,
importType.cursor,
record
)
} else {
MockDestinationBackend.insert(filename, record)
}
}
PersistedBatch(batch.records)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@
package io.airbyte.cdk.load.test.util

import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.TimeValue
import io.airbyte.cdk.load.data.TimestampValue
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import kotlin.reflect.jvm.jvmName

class RecordDiffer(
Expand Down Expand Up @@ -62,12 +70,7 @@ class RecordDiffer(
)
}

// Compare each PK field in order, until we find a field that the two records differ in.
// If all the fields are equal, then these two records have the same PK.
pk1.zip(pk2)
.map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) }
.firstOrNull { it != 0 }
?: 0
comparePks(pk1, pk2)
}

/**
Expand Down Expand Up @@ -235,7 +238,7 @@ class RecordDiffer(
// with it explicitly in the condition)
val expectedValue = expectedRecord.data.values[key]
val actualValue = actualRecord.data.values[key]
if (expectedValue != actualValue) {
if (valueComparator.compare(expectedValue, actualValue) != 0) {
diff.append("$key: Expected $expectedValue, but was $actualValue\n")
}
}
Expand All @@ -248,16 +251,55 @@ class RecordDiffer(
// Null-safe comparator over AirbyteValue: nulls sort first, and non-null values
// are delegated to compare(). The !! is safe here because Comparator.nullsFirst
// only invokes the wrapped comparator when both arguments are non-null.
val valueComparator: Comparator<AirbyteValue> =
    Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!) }

/**
 * Compare two primary keys field-by-field, in order. The result is the first nonzero
 * field comparison; if every field compares equal, the PKs are considered equal.
 * (If the lists differ in length, trailing fields on the longer side are ignored,
 * matching zip() truncation.)
 */
fun comparePks(pk1: List<AirbyteValue?>, pk2: List<AirbyteValue?>): Int {
    // Eagerly compare every aligned pair (as the original did), then pick the
    // first disagreement; 0 means the PKs match.
    val fieldComparisons = pk1.zip(pk2) { field1, field2 -> valueComparator.compare(field1, field2) }
    return fieldComparisons.firstOrNull { it != 0 } ?: 0
}

/**
 * Compare two non-null [AirbyteValue]s. Values of different concrete classes are
 * ordered by class name (no smart numeric coercion). Temporal values are parsed
 * and compared semantically rather than lexically; everything else falls back to
 * the type's natural [Comparable] ordering.
 *
 * (This scrape embedded the pre-diff deleted lines inside the body; this is the
 * reconstructed post-diff block.)
 */
private fun compare(v1: AirbyteValue, v2: AirbyteValue): Int {
    // when comparing values of different types, just sort by their class name.
    // in theory, we could check for numeric types and handle them smartly...
    // that's a lot of work though
    return if (v1::class != v2::class) {
        v1::class.jvmName.compareTo(v2::class.jvmName)
    } else {
        // Handle temporal types specifically, because they require explicit parsing
        return when (v1) {
            is DateValue ->
                LocalDate.parse(v1.value)
                    .compareTo(LocalDate.parse((v2 as DateValue).value))
            is TimeValue -> {
                // Try timezone-less parsing first, then fall back to offset-aware.
                // NOTE(review): if v1 parses as LocalTime but v2 does not, the catch
                // re-parses v1 as OffsetTime, which will throw - assumes both values
                // use the same format. TODO confirm that's guaranteed upstream.
                try {
                    val time1 = LocalTime.parse(v1.value)
                    val time2 = LocalTime.parse((v2 as TimeValue).value)
                    time1.compareTo(time2)
                } catch (e: Exception) {
                    val time1 = OffsetTime.parse(v1.value)
                    val time2 = OffsetTime.parse((v2 as TimeValue).value)
                    time1.compareTo(time2)
                }
            }
            is TimestampValue -> {
                // Same local-then-offset fallback as TimeValue above.
                try {
                    val ts1 = LocalDateTime.parse(v1.value)
                    val ts2 = LocalDateTime.parse((v2 as TimestampValue).value)
                    ts1.compareTo(ts2)
                } catch (e: Exception) {
                    val ts1 = OffsetDateTime.parse(v1.value)
                    val ts2 = OffsetDateTime.parse((v2 as TimestampValue).value)
                    ts1.compareTo(ts2)
                }
            }
            // otherwise, just be a terrible person.
            // we know these are the same type, so this is safe to do.
            else ->
                @Suppress("UNCHECKED_CAST") (v1 as Comparable<AirbyteValue>).compareTo(v2)
        }
    }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import io.micronaut.context.env.yaml.YamlPropertySourceLoader
import java.nio.file.Files
import java.nio.file.Path

const val DOCKERIZED_TEST_ENV = "DOCKERIZED_INTEGRATION_TEST"

/**
* Represents a destination process, whether running in-JVM via micronaut, or as a separate Docker
* container. The general lifecycle is:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@ import io.airbyte.protocol.models.v0.AirbyteLogMessage
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import java.io.BufferedWriter
import java.io.OutputStreamWriter
import java.nio.file.Files
import java.nio.file.Path
import java.time.Clock
import java.util.Locale
import java.util.Scanner
import javax.inject.Singleton
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
Expand Down Expand Up @@ -238,20 +235,9 @@ class DockerizedDestination(
}
}

@Singleton
@Requires(env = [DOCKERIZED_TEST_ENV])
class DockerizedDestinationFactory(
// Note that this is not the same property as in MetadataYamlPropertySource.
// We get this because IntegrationTest manually sets "classpath:metadata.yaml"
// as a property source.
// MetadataYamlPropertySource has nothing to do with this property.
@Value("\${data.docker-repository}") val imageName: String,
// Most tests will just use micronaut to inject this.
// But some tests will want to manually instantiate an instance,
// e.g. to run an older version of the connector.
// So we just hardcode 'dev' here; manual callers can pass in
// whatever they want.
@Value("dev") val imageVersion: String,
private val imageName: String,
private val imageVersion: String,
) : DestinationProcessFactory() {
override fun createDestinationProcess(
command: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.micronaut.context.annotation.Requires
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.io.PrintWriter
import java.util.concurrent.Executors
import javax.inject.Singleton
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -93,11 +91,6 @@ class NonDockerizedDestination(
}
}

// Notably, not actually a Micronaut factory. We want to inject the actual
// factory into our tests, not a pre-instantiated destination, because we want
// to run multiple destination processes per test.
@Singleton
@Requires(notEnv = [DOCKERIZED_TEST_ENV])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because docker-v-nondocker is now controlled by an explicit env variable check in the process factory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, exactly. I just forgot to remove these annotations in #47006

class NonDockerizedDestinationFactory : DestinationProcessFactory() {
override fun createDestinationProcess(
command: String,
Expand Down
Loading
Loading