diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt index 5093fab4cf29..361eb2156a10 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt @@ -19,6 +19,7 @@ class MockBasicFunctionalityIntegrationTest : NoopExpectedRecordMapper, NoopNameMapper, isStreamSchemaRetroactive = false, + supportsDedup = true, ) { @Test override fun testBasicWrite() { @@ -36,8 +37,8 @@ class MockBasicFunctionalityIntegrationTest : } @Test - override fun testFunkyStreamAndColumnNames() { - super.testFunkyStreamAndColumnNames() + override fun testFunkyCharacters() { + super.testFunkyCharacters() } @Test @@ -54,4 +55,9 @@ class MockBasicFunctionalityIntegrationTest : override fun testAppendSchemaEvolution() { super.testAppendSchemaEvolution() } + + @Test + override fun testDedup() { + super.testDedup() + } } diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt index 11cbd75f644a..174d55aad58b 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt @@ -6,8 +6,12 @@ package io.airbyte.cdk.load.mock_integration_test import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.load.command.DestinationStream 
+import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.OutputRecord +import io.airbyte.cdk.load.test.util.RecordDiffer import java.util.concurrent.ConcurrentHashMap object MockDestinationBackend { @@ -17,6 +21,63 @@ object MockDestinationBackend { getFile(filename).addAll(records) } + fun upsert( + filename: String, + primaryKey: List>, + cursor: List, + vararg records: OutputRecord + ) { + fun getField(path: List, record: OutputRecord): AirbyteValue? { + var currentValue: ObjectValue = record.data + // Walk every path element except the last (subList's end index is exclusive) + for (pathElement in path.subList(0, (path.size - 1).coerceAtLeast(0))) { + when (val next = currentValue.values[pathElement]) { + null, + is NullValue -> return null + !is ObjectValue -> { + throw IllegalStateException( + "Attempted to traverse field list in ${record.data} but found non-object value at $pathElement: $next" + ) + } + else -> currentValue = next + } + } + return currentValue.values[path.last()] + } + fun getPk(record: OutputRecord): List = + primaryKey.map { pkField -> getField(pkField, record) } + fun getCursor(record: OutputRecord): AirbyteValue? 
= getField(cursor, record) + + val file = getFile(filename) + records.forEach { incomingRecord -> + val incomingPk = getPk(incomingRecord) + // Assume that in dedup mode, we don't have duplicates - so we can just find the first + // record with the same PK as the incoming record + val existingRecord = + file.firstOrNull { RecordDiffer.comparePks(incomingPk, getPk(it)) == 0 } + if (existingRecord == null) { + file.add(incomingRecord) + } else { + val incomingCursor = getCursor(incomingRecord) + val existingCursor = getCursor(existingRecord) + val compare = RecordDiffer.valueComparator.compare(incomingCursor, existingCursor) + // If the incoming record has a later cursor, + // or the same cursor but a later extractedAt, + // then upsert. (otherwise discard the incoming record.) + if ( + compare > 0 || + (compare == 0 && incomingRecord.extractedAt > existingRecord.extractedAt) + ) { + file.remove(existingRecord) + val deletion = getField(listOf("_ab_cdc_deleted_at"), incomingRecord) + if (deletion == null || deletion is NullValue) { + file.add(incomingRecord) + } + } + } + } + } + fun readFile(filename: String): List { return getFile(filename) } diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt index 35219fd4ef74..e1524187f2ba 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt @@ -4,6 +4,7 @@ package io.airbyte.cdk.load.mock_integration_test +import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.message.Batch @@ -50,8 +51,8 @@ class 
MockStreamLoader(override val stream: DestinationStream) : StreamLoader { return when (batch) { is LocalBatch -> { batch.records.forEach { - MockDestinationBackend.insert( - getFilename(it.stream), + val filename = getFilename(it.stream) + val record = OutputRecord( UUID.randomUUID(), Instant.ofEpochMilli(it.emittedAtMs), @@ -63,7 +64,17 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader { syncId = stream.syncId ), ) - ) + val importType = stream.importType + if (importType is Dedupe) { + MockDestinationBackend.upsert( + filename, + importType.primaryKey, + importType.cursor, + record + ) + } else { + MockDestinationBackend.insert(filename, record) + } } PersistedBatch(batch.records) } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt index ce1b70262532..b5e260c8268a 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt @@ -5,9 +5,17 @@ package io.airbyte.cdk.load.test.util import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.DateValue import io.airbyte.cdk.load.data.IntegerValue import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.TimeValue +import io.airbyte.cdk.load.data.TimestampValue +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime import kotlin.reflect.jvm.jvmName class RecordDiffer( @@ -62,12 +70,7 @@ class RecordDiffer( ) } - // Compare each PK field in order, until we find a field that the two records differ in. - // If all the fields are equal, then these two records have the same PK. 
- pk1.zip(pk2) - .map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) } - .firstOrNull { it != 0 } - ?: 0 + comparePks(pk1, pk2) } /** @@ -235,7 +238,7 @@ class RecordDiffer( // with it explicitly in the condition) val expectedValue = expectedRecord.data.values[key] val actualValue = actualRecord.data.values[key] - if (expectedValue != actualValue) { + if (valueComparator.compare(expectedValue, actualValue) != 0) { diff.append("$key: Expected $expectedValue, but was $actualValue\n") } } @@ -248,6 +251,16 @@ class RecordDiffer( val valueComparator: Comparator = Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!) } + /** + * Compare each PK field in order, until we find a field that the two records differ in. If + * all the fields are equal, then these two records have the same PK. + */ + fun comparePks(pk1: List, pk2: List) = + (pk1.zip(pk2) + .map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) } + .firstOrNull { it != 0 } + ?: 0) + private fun compare(v1: AirbyteValue, v2: AirbyteValue): Int { // when comparing values of different types, just sort by their class name. // in theory, we could check for numeric types and handle them smartly... @@ -255,9 +268,38 @@ class RecordDiffer( return if (v1::class != v2::class) { v1::class.jvmName.compareTo(v2::class.jvmName) } else { - // otherwise, just be a terrible person. - // we know these are the same type, so this is safe to do. 
- @Suppress("UNCHECKED_CAST") (v1 as Comparable).compareTo(v2) + // Handle temporal types specifically, because they require explicit parsing + return when (v1) { + is DateValue -> + LocalDate.parse(v1.value) + .compareTo(LocalDate.parse((v2 as DateValue).value)) + is TimeValue -> { + try { + val time1 = LocalTime.parse(v1.value) + val time2 = LocalTime.parse((v2 as TimeValue).value) + time1.compareTo(time2) + } catch (e: Exception) { + val time1 = OffsetTime.parse(v1.value) + val time2 = OffsetTime.parse((v2 as TimeValue).value) + time1.compareTo(time2) + } + } + is TimestampValue -> { + try { + val ts1 = LocalDateTime.parse(v1.value) + val ts2 = LocalDateTime.parse((v2 as TimestampValue).value) + ts1.compareTo(ts2) + } catch (e: Exception) { + val ts1 = OffsetDateTime.parse(v1.value) + val ts2 = OffsetDateTime.parse((v2 as TimestampValue).value) + ts1.compareTo(ts2) + } + } + // otherwise, just be a terrible person. + // we know these are the same type, so this is safe to do. + else -> + @Suppress("UNCHECKED_CAST") (v1 as Comparable).compareTo(v2) + } } } } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DestinationProcess.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DestinationProcess.kt index c25f37993f3a..5b9f849960d5 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DestinationProcess.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DestinationProcess.kt @@ -13,8 +13,6 @@ import io.micronaut.context.env.yaml.YamlPropertySourceLoader import java.nio.file.Files import java.nio.file.Path -const val DOCKERIZED_TEST_ENV = "DOCKERIZED_INTEGRATION_TEST" - /** * Represents a destination process, whether running in-JVM via micronaut, or as a separate Docker * container. 
The general lifecycle is: diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt index 3cb91a11fbd3..ac6f435269fd 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt @@ -11,8 +11,6 @@ import io.airbyte.protocol.models.v0.AirbyteLogMessage import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.annotation.Requires -import io.micronaut.context.annotation.Value import java.io.BufferedWriter import java.io.OutputStreamWriter import java.nio.file.Files @@ -20,7 +18,6 @@ import java.nio.file.Path import java.time.Clock import java.util.Locale import java.util.Scanner -import javax.inject.Singleton import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope @@ -238,20 +235,9 @@ class DockerizedDestination( } } -@Singleton -@Requires(env = [DOCKERIZED_TEST_ENV]) class DockerizedDestinationFactory( - // Note that this is not the same property as in MetadataYamlPropertySource. - // We get this because IntegrationTest manually sets "classpath:metadata.yaml" - // as a property source. - // MetadataYamlPropertySource has nothing to do with this property. - @Value("\${data.docker-repository}") val imageName: String, - // Most tests will just use micronaut to inject this. - // But some tests will want to manually instantiate an instance, - // e.g. to run an older version of the connector. 
- // So we just hardcode 'dev' here; manual callers can pass in - // whatever they want. - @Value("dev") val imageVersion: String, + private val imageName: String, + private val imageVersion: String, ) : DestinationProcessFactory() { override fun createDestinationProcess( command: String, diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt index 6ed0a480ac2b..f1726b5702d3 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt @@ -11,12 +11,10 @@ import io.airbyte.cdk.command.FeatureFlag import io.airbyte.protocol.models.Jsons import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog -import io.micronaut.context.annotation.Requires import java.io.PipedInputStream import java.io.PipedOutputStream import java.io.PrintWriter import java.util.concurrent.Executors -import javax.inject.Singleton import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.launch @@ -93,11 +91,6 @@ class NonDockerizedDestination( } } -// Notably, not actually a Micronaut factory. We want to inject the actual -// factory into our tests, not a pre-instantiated destination, because we want -// to run multiple destination processes per test. 
-@Singleton -@Requires(notEnv = [DOCKERIZED_TEST_ENV]) class NonDockerizedDestinationFactory : DestinationProcessFactory() { override fun createDestinationProcess( command: String, diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 80e1b6751748..abcde861434b 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -7,14 +7,18 @@ package io.airbyte.cdk.load.write import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ValidatedJsonUtils import io.airbyte.cdk.load.command.Append +import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.AirbyteValue import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.IntegerValue import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.message.StreamCheckpoint import io.airbyte.cdk.load.test.util.DestinationCleaner @@ -30,6 +34,7 @@ import io.airbyte.cdk.util.Jsons import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import io.airbyte.protocol.models.v0.AirbyteStateMessage +import java.time.OffsetDateTime import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -63,6 
+68,7 @@ abstract class BasicFunctionalityIntegrationTest( * retroactive schemas: writing a new file without a column has no effect on older files. */ val isStreamSchemaRetroactive: Boolean, + val supportsDedup: Boolean, ) : IntegrationTest(dataDumper, destinationCleaner, recordMangler, nameMapper) { val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configContents) @@ -413,7 +419,7 @@ abstract class BasicFunctionalityIntegrationTest( @Test @Disabled - open fun testFunkyStreamAndColumnNames() { + open fun testFunkyCharacters() { assumeTrue(verifyDataWriting) fun makeStream( name: String, @@ -427,7 +433,8 @@ abstract class BasicFunctionalityIntegrationTest( minimumGenerationId = 0, syncId = 42, ) - // Catalog with some weird schemas + // Catalog with some weird schemas. + // Every stream has an int `id`, and maybe some string fields. val catalog = DestinationCatalog( listOf( @@ -436,34 +443,41 @@ abstract class BasicFunctionalityIntegrationTest( makeStream("STREAM_WITH_ALL_CAPS"), makeStream("CapitalCase"), makeStream( - "stream_with_edge_case_field_names", + "stream_with_edge_case_field_names_and_values", linkedMapOf( "id" to intType, - "fieldWithCamelCase" to intType, - "field_with_underscore" to intType, - "FIELD_WITH_ALL_CAPS" to intType, - "field_with_spécial_character" to intType, + "fieldWithCamelCase" to stringType, + "field_with_underscore" to stringType, + "FIELD_WITH_ALL_CAPS" to stringType, + "field_with_spécial_character" to stringType, // "order" is a reserved word in many sql engines - "order" to intType, - "ProperCase" to intType, + "order" to stringType, + "ProperCase" to stringType, ) ), // this is apparently trying to test for reserved words? 
// https://github.com/airbytehq/airbyte/pull/1753 - makeStream("groups", linkedMapOf("id" to intType, "authorization" to intType)), + makeStream( + "groups", + linkedMapOf("id" to intType, "authorization" to stringType) + ), ) ) - // For each stream, generate a record containing every field in the schema + // For each stream, generate a record containing every field in the schema. + // The id field is always 42, and the string fields are always "foo\nbar". val messages = - catalog.streams.map { + catalog.streams.map { stream -> DestinationRecord( - it.descriptor, + stream.descriptor, ObjectValue( - (it.schema as ObjectType).properties.mapValuesTo(linkedMapOf()) { - IntegerValue(42) - } + (stream.schema as ObjectType) + .properties + .mapValuesTo(linkedMapOf()) { + StringValue("foo\nbar") + } + .also { it["id"] = IntegerValue(42) } ), - 1234, + emittedAtMs = 1234, meta = null, serialized = "", ) @@ -479,9 +493,10 @@ abstract class BasicFunctionalityIntegrationTest( extractedAt = 1234, generationId = 0, data = - (stream.schema as ObjectType).properties.mapValuesTo( - linkedMapOf() - ) { 42 }, + (stream.schema as ObjectType) + .properties + .mapValuesTo(linkedMapOf()) { "foo\nbar" } + .also { it["id"] = 42 }, airbyteMeta = OutputRecord.Meta(syncId = 42) ) ), @@ -684,8 +699,151 @@ abstract class BasicFunctionalityIntegrationTest( ) } + @Test + open fun testDedup() { + assumeTrue(supportsDedup) + fun makeStream(syncId: Long) = + DestinationStream( + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + importType = + Dedupe( + primaryKey = listOf(listOf("id1"), listOf("id2")), + cursor = listOf("updated_at"), + ), + schema = + ObjectType( + properties = + linkedMapOf( + "id1" to intType, + "id2" to intType, + "updated_at" to timestamptzType, + "name" to stringType, + "_ab_cdc_deleted_at" to timestamptzType, + ) + ), + generationId = 42, + minimumGenerationId = 0, + syncId = syncId, + ) + fun makeRecord(data: String, extractedAt: Long) = + 
DestinationRecord( + randomizedNamespace, + "test_stream", + data, + emittedAtMs = extractedAt, + ) + + val sync1Stream = makeStream(syncId = 42) + runSync( + configContents, + sync1Stream, + listOf( + // emitted_at:1000 is equal to 1970-01-01 00:00:01Z. + // This obviously makes no sense in relation to updated_at being in the year 2000, + // but that's OK because (from destinations POV) updated_at has no relation to + // extractedAt. + makeRecord( + """{"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice1", "_ab_cdc_deleted_at": null}""", + extractedAt = 1000, + ), + // Emit a second record for id=(1,200) with a different updated_at. + makeRecord( + """{"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice2", "_ab_cdc_deleted_at": null}""", + extractedAt = 1000, + ), + // Emit a record with no _ab_cdc_deleted_at field. CDC sources typically emit an + // explicit null, but we should handle both cases. + makeRecord( + """{"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob1"}""", + extractedAt = 1000, + ), + ), + ) + dumpAndDiffRecords( + parsedConfig, + listOf( + // Alice has only the newer record, and Bob also exists + OutputRecord( + extractedAt = 1000, + generationId = 42, + data = + mapOf( + "id1" to 1, + "id2" to 200, + "updated_at" to OffsetDateTime.parse("2000-01-01T00:01:00Z"), + "name" to "Alice2", + "_ab_cdc_deleted_at" to null + ), + airbyteMeta = OutputRecord.Meta(syncId = 42), + ), + OutputRecord( + extractedAt = 1000, + generationId = 42, + data = + mapOf( + "id1" to 1, + "id2" to 201, + "updated_at" to OffsetDateTime.parse("2000-01-01T00:02:00Z"), + "name" to "Bob1" + ), + airbyteMeta = OutputRecord.Meta(syncId = 42), + ), + ), + sync1Stream, + primaryKey = listOf(listOf("id1"), listOf("id2")), + cursor = listOf("updated_at"), + ) + + val sync2Stream = makeStream(syncId = 43) + runSync( + configContents, + sync2Stream, + listOf( + // Update both Alice and Bob + makeRecord( + """{"id1": 
1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice3", "_ab_cdc_deleted_at": null}""", + extractedAt = 2000, + ), + makeRecord( + """{"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob2"}""", + extractedAt = 2000, + ), + // And delete Bob. Again, T+D doesn't check the actual _value_ of deleted_at (i.e. + // the fact that it's in the past is irrelevant). It only cares whether deleted_at + // is non-null. So the destination should delete Bob. + makeRecord( + """{"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}""", + extractedAt = 2000, + ), + ), + ) + dumpAndDiffRecords( + parsedConfig, + listOf( + // Alice still exists (and has been updated to the latest version), but Bob is gone + OutputRecord( + extractedAt = 2000, + generationId = 42, + data = + mapOf( + "id1" to 1, + "id2" to 200, + "updated_at" to OffsetDateTime.parse("2000-01-02T00:00:00Z"), + "name" to "Alice3", + "_ab_cdc_deleted_at" to null + ), + airbyteMeta = OutputRecord.Meta(syncId = 43), + ) + ), + sync2Stream, + primaryKey = listOf(listOf("id1"), listOf("id2")), + cursor = listOf("updated_at"), + ) + } + companion object { private val intType = FieldType(IntegerType, nullable = true) private val stringType = FieldType(StringType, nullable = true) + private val timestamptzType = FieldType(TimestampTypeWithTimezone, nullable = true) } } diff --git a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt index b6af7de7e6e1..f3eb7195f1bf 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt +++ 
b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt @@ -18,6 +18,7 @@ class DevNullBasicFunctionalityIntegrationTest : NoopExpectedRecordMapper, verifyDataWriting = false, isStreamSchemaRetroactive = false, + supportsDedup = false, ) { @Test override fun testBasicWrite() { diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 73c829fd9e8d..4a36648c9a5c 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -18,12 +18,18 @@ abstract class S3V2WriteTest(path: String) : NoopDestinationCleaner, NoopExpectedRecordMapper, isStreamSchemaRetroactive = false, + supportsDedup = false, ) { @Test override fun testBasicWrite() { super.testBasicWrite() } + @Test + override fun testFunkyCharacters() { + super.testFunkyCharacters() + } + @Disabled @Test override fun testMidSyncCheckpointingStreamState() {