From 1930d448b33489d8a96ee446d3465fceb3d0d7c2 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 28 Aug 2024 15:14:02 -0700 Subject: [PATCH] deprecate old record differ --- .../cdk/test/util/RecordDifferTest.kt} | 4 +- .../io/airbyte/cdk/test/util/OutputRecord.kt | 62 ++++++++++++ .../io/airbyte/cdk/test/util}/RecordDiffer.kt | 96 ++++--------------- .../BaseSqlGeneratorIntegrationTest.kt | 4 +- .../typing_deduping/BaseTypingDedupingTest.kt | 4 +- ...{RecordDiffer.kt => LegacyRecordDiffer.kt} | 7 +- .../destination/e2e_test/E2EDestination.kt | 5 - 7 files changed, 92 insertions(+), 90 deletions(-) rename airbyte-cdk/bulk/core/{extract/src/test/kotlin/io/airbyte/cdk/test/MaybeRecordDifferTest.kt => load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt} (98%) create mode 100644 airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt rename airbyte-cdk/bulk/core/{extract/src/testFixtures/kotlin/io/airbyte/cdk/test => load/src/testFixtures/kotlin/io/airbyte/cdk/test/util}/RecordDiffer.kt (75%) rename airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/{RecordDiffer.kt => LegacyRecordDiffer.kt} (98%) diff --git a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/test/MaybeRecordDifferTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt similarity index 98% rename from airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/test/MaybeRecordDifferTest.kt rename to airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt index e8f5d5ef543d..01b08195f325 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/test/MaybeRecordDifferTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt @@ -2,13 +2,13 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.cdk.test +package io.airbyte.cdk.test.util import java.time.Instant import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test -class MaybeRecordDifferTest { +class RecordDifferTest { @Test fun testBasicBehavior() { val differ = diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt new file mode 100644 index 000000000000..15328f4a72d0 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.test.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import java.time.Instant +import java.util.UUID + +/** A record that we expect to exist in the destination, whether raw _or_ final. */ +data class OutputRecord( + val rawId: UUID?, + val extractedAt: Instant, + val loadedAt: Instant?, + val generationId: Long?, + /** + * strongly-typed map, e.g. ZonedDateTime for timestamp_with_timezone. this makes destination + * test implementations easier. values can be null, b/c warehouse destinations with a JSON + * column type can be either SQL null, or JSON null, and we want to distinguish between those. + * Destinations _must_ filter out the airbyte_* fields from this map. + */ + val data: Map, + val airbyteMeta: JsonNode?, +) { + /** Utility constructor with easier types to write by hand */ + constructor( + rawId: String, + extractedAt: Long, + loadedAt: Long?, + generationId: Long?, + data: Map, + airbyteMeta: String?, + ) : this( + UUID.fromString(rawId), + Instant.ofEpochMilli(extractedAt), + loadedAt?.let { Instant.ofEpochMilli(it) }, + generationId, + data, + airbyteMeta?.let { ObjectMapper().readTree(it) }, + ) + + /** + * Utility constructor for "expected records". [rawId] and [loadedAt] are generated by the + * destination at runtime, so we don't have those when writing the test. Just generate arbitrary + * values for them. + */ + constructor( + extractedAt: Long, + generationId: Long?, + data: Map, + airbyteMeta: String?, + ) : this( + null, + Instant.ofEpochMilli(extractedAt), + loadedAt = null, + generationId, + data, + airbyteMeta?.let { ObjectMapper().readTree(it) }, + ) +} diff --git a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/test/RecordDiffer.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt similarity index 75% rename from airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/test/RecordDiffer.kt rename to airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt index eace95573c1a..d2a7fe42b12e 100644 --- a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/test/RecordDiffer.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt @@ -2,67 +2,10 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.cdk.test +package io.airbyte.cdk.test.util -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.ObjectMapper -import io.airbyte.cdk.test.RecordDiffer.MaybeRecordDiff -import java.time.Instant -import java.util.UUID import kotlin.reflect.jvm.jvmName -/** A record that we expect to exist in the destination, whether raw _or_ final. */ -data class OutputRecord( - val rawId: UUID?, - val extractedAt: Instant, - val loadedAt: Instant?, - val generationId: Long?, - /** - * strongly-typed map, e.g. ZonedDateTime for timestamp_with_timezone. this makes destination - * test implementations easier. values can be null, b/c warehouse destinations with a JSON - * column type can be either SQL null, or JSON null, and we want to distinguish between those. - * Destinations _must_ filter out the airbyte_* fields from this map. - */ - val data: Map, - val airbyteMeta: JsonNode?, -) { - /** Utility constructor with easier types to write by hand */ - constructor( - rawId: String, - extractedAt: Long, - loadedAt: Long?, - generationId: Long?, - data: Map, - airbyteMeta: String?, - ) : this( - UUID.fromString(rawId), - Instant.ofEpochMilli(extractedAt), - loadedAt?.let { Instant.ofEpochMilli(it) }, - generationId, - data, - airbyteMeta?.let { ObjectMapper().readTree(it) }, - ) - - /** - * Utility constructor for "expected records". [rawId] and [loadedAt] are generated by the - * destination at runtime, so we don't have those when writing the test. Just generate arbitrary - * values for them. - */ - constructor( - extractedAt: Long, - generationId: Long?, - data: Map, - airbyteMeta: String?, - ) : this( - null, - Instant.ofEpochMilli(extractedAt), - loadedAt = null, - generationId, - data, - airbyteMeta?.let { ObjectMapper().readTree(it) }, - ) -} - class RecordDiffer( /** * A function to extract primary key fields from a record. Most streams will have some `id` @@ -80,13 +23,11 @@ class RecordDiffer( * This class implicitly also sorts records by extracted_at; this comparator does _not_ need to * do that sorting. * - * See [MaybeRecordDiff.generateRecordIdentifier] for why this is nullable. + * See [MatchingRecords.generateRecordIdentifier] for why this is nullable. */ val extractCursor: ((OutputRecord) -> Any?)? = null, ) { - /** - * Comparator that sorts records by their primary key - */ + /** Comparator that sorts records by their primary key */ private val identityComparator: Comparator = Comparator { rec1, rec2 -> val pk1 = extractPrimaryKey(rec1) val pk2 = extractPrimaryKey(rec2) @@ -112,11 +53,12 @@ class RecordDiffer( .thenComparing { it -> it.extractedAt } /** - * The actual comparator we'll use to sort the expected/actual record lists. - * I.e. group records by their PK, then within each PK, sort by cursor/extractedAt. + * The actual comparator we'll use to sort the expected/actual record lists. I.e. group records + * by their PK, then within each PK, sort by cursor/extractedAt. */ private val everythingComparator = identityComparator.thenComparing(sortComparator) + /** Returns a pretty-printed diff of the two lists, or null if they were identical */ fun diffRecords( expectedRecords: List, actualRecords: List @@ -127,7 +69,7 @@ class RecordDiffer( // Match up all the records between the expected and actual records, // or if there's no matching record then detect that also. // We'll filter this list down to actual differing records later on. - val diffs = mutableListOf() + val matches = mutableListOf() var expectedRecordIndex = 0 var actualRecordIndex = 0 while ( @@ -136,45 +78,43 @@ class RecordDiffer( ) { val expectedRecord = expectedRecords[expectedRecordIndex] val actualRecord = actualRecords[actualRecordIndex] - val compare = - everythingComparator - .compare(expectedRecord, actualRecord) + val compare = everythingComparator.compare(expectedRecord, actualRecord) if (compare == 0) { // These records are the same underlying record - diffs.add(MaybeRecordDiff(expectedRecord, actualRecord)) + matches.add(MatchingRecords(expectedRecord, actualRecord)) expectedRecordIndex++ actualRecordIndex++ } else if (compare < 0) { // There's an extra expected record - diffs.add(MaybeRecordDiff(expectedRecord, actualRecord = null)) + matches.add(MatchingRecords(expectedRecord, actualRecord = null)) expectedRecordIndex++ } else { // There's an extra actual record - diffs.add(MaybeRecordDiff(expectedRecord = null, actualRecord)) + matches.add(MatchingRecords(expectedRecord = null, actualRecord)) actualRecordIndex++ } } // Tail loops in case we reached the end of one list before the other. while (expectedRecordIndex < expectedRecords.size) { - diffs.add(MaybeRecordDiff(expectedRecords[expectedRecordIndex], actualRecord = null)) + matches.add(MatchingRecords(expectedRecords[expectedRecordIndex], actualRecord = null)) expectedRecordIndex++ } while (actualRecordIndex < actualRecords.size) { - diffs.add(MaybeRecordDiff(expectedRecord = null, actualRecords[actualRecordIndex])) + matches.add(MatchingRecords(expectedRecord = null, actualRecords[actualRecordIndex])) actualRecordIndex++ } // We've paired up all the records, now find just the ones that are wrong. - val actualDiffs = diffs.filter { it.isMismatch() } - return if (actualDiffs.isEmpty()) { + val diffs = matches.filter { it.isMismatch() } + return if (diffs.isEmpty()) { null } else { - actualDiffs.joinToString("\n") { it.prettyPrintMismatch() } + diffs.joinToString("\n") { it.prettyPrintMismatch() } } } - private inner class MaybeRecordDiff( + private inner class MatchingRecords( val expectedRecord: OutputRecord?, val actualRecord: OutputRecord?, ) { @@ -189,7 +129,7 @@ class RecordDiffer( } else if (actualRecord == null) { "Missing record (${generateRecordIdentifier(expectedRecord)}): $expectedRecord" } else { - "Incorrect record ((${generateRecordIdentifier(actualRecord)}):\n" + + "Incorrect record (${generateRecordIdentifier(actualRecord)}):\n" + generateDiffString(expectedRecord, actualRecord).prependIndent(" ") } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 30bb00b8fee5..6cc2619d1aa8 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -47,7 +47,7 @@ private val LOGGER = KotlinLogging.logger {} */ @Execution(ExecutionMode.CONCURRENT) abstract class BaseSqlGeneratorIntegrationTest { - protected var DIFFER: RecordDiffer = mock() + protected var DIFFER: LegacyRecordDiffer = mock() /** Subclasses may use these four StreamConfigs in their tests. */ protected var incrementalDedupStream: StreamConfig = mock() @@ -200,7 +200,7 @@ abstract class BaseSqlGeneratorIntegrationTest, diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/E2EDestination.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/E2EDestination.kt index f2d9ffb2600a..8a812670051e 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/E2EDestination.kt +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/E2EDestination.kt @@ -5,11 +5,6 @@ package io.airbyte.integrations.destination.e2e_test import io.airbyte.cdk.AirbyteDestinationRunner -import io.micronaut.context.annotation.Factory -import io.micronaut.context.annotation.Primary -import jakarta.inject.Singleton -import java.io.FileInputStream -import java.io.InputStream class E2EDestination { companion object {