Skip to content

Commit

Permalink
even more test
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Aug 2, 2024
1 parent 0d9c69b commit 7185b25
Showing 1 changed file with 129 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,135 @@ abstract class BaseTypingDedupingTest {
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}

/**
* Emulates this sequence of events:
* 1. User runs a normal incremental sync
* 2. User initiates a truncate refresh, but it fails.
* 3. User cancels the truncate refresh, and initiates a normal incremental sync.
*
* In particular, we must retain all records from both the first sync, _and_ the truncate sync's
* temporary raw table.
*/
@Test
@Throws(Exception::class)
open fun resumeAfterCancelledTruncate() {
val catalog1 =
io.airbyte.protocol.models.v0
.ConfiguredAirbyteCatalog()
.withStreams(
java.util.List.of(
ConfiguredAirbyteStream()
.withSyncId(42)
.withGenerationId(43)
.withMinimumGenerationId(0)
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(listOf("updated_at"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(
AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(SCHEMA)
)
)
)

// Normal sync
runSync(catalog1, readMessages("dat/sync1_messages.jsonl"))

val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())

val catalog2 =
io.airbyte.protocol.models.v0
.ConfiguredAirbyteCatalog()
.withStreams(
java.util.List.of(
ConfiguredAirbyteStream()
.withSyncId(42)
// Generation ID is incremented
.withGenerationId(44)
.withMinimumGenerationId(44)
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(listOf("updated_at"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(
AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(SCHEMA)
)
)
)
// Interrupted truncate sync
assertThrows<Exception> {
runSync(
catalog2,
readMessages("dat/sync2_messages.jsonl"),
streamStatus = AirbyteStreamStatus.INCOMPLETE,
)
}

// We should still have the exact same records as after the initial sync
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())

val catalog3 =
io.airbyte.protocol.models.v0
.ConfiguredAirbyteCatalog()
.withStreams(
java.util.List.of(
ConfiguredAirbyteStream()
.withSyncId(42)
// Same generation as the truncate sync, but now with
// min gen = 0
.withGenerationId(44)
.withMinimumGenerationId(0)
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(listOf("updated_at"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(
AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(SCHEMA)
)
)
)

// Third sync
runSync(catalog3, readMessages("dat/sync2_messages.jsonl"))

// We wrote the sync2 records twice, so expect duplicates.
// But we didn't write the sync1 records twice, so filter those out in a dumb way.
// Also override the generation ID to the correct value on the sync2 records,
// but leave the sync1 records with their original generation.
val expectedRawRecords2 =
readRecords("dat/sync2_expectedrecords_raw.jsonl").let { baseRecords ->
val sync2Records =
baseRecords.subList(expectedRawRecords1.size, baseRecords.size).onEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
44,
)
}
expectedRawRecords1 + sync2Records + sync2Records
}
val expectedFinalRecords2 =
readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").let {
baseRecords ->
val sync2Records =
baseRecords.subList(expectedFinalRecords1.size, baseRecords.size).onEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
44,
)
}
expectedFinalRecords1 + sync2Records + sync2Records
}
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}

open val manyStreamCount = 20

@Test
Expand Down

0 comments on commit 7185b25

Please sign in to comment.