From 45cf6156a07bbdc421ecff378e583853237ce70c Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Thu, 3 Oct 2024 15:12:04 -0700 Subject: [PATCH] Bulk Load CDK: Restore CheckpointManager Test (#46321) --- .../DestinationMessageQueueWriterTest.kt | 1 + .../cdk/state/CheckpointManagerTest.kt | 40 +++++++++++++------ .../io/airbyte/cdk/state/StreamManagerTest.kt | 1 + .../cdk/task/DestinationTaskLauncherTest.kt | 10 ++++- 4 files changed, 38 insertions(+), 14 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/message/DestinationMessageQueueWriterTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/message/DestinationMessageQueueWriterTest.kt index d4d175661bb3..09ad533422fa 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/message/DestinationMessageQueueWriterTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/message/DestinationMessageQueueWriterTest.kt @@ -186,6 +186,7 @@ class DestinationMessageQueueWriterTest { Assertions.assertEquals(manager2.endOfStreamRead(), false) Assertions.assertEquals(manager1.endOfStreamRead(), true) + Assertions.assertEquals(11, channel1.messages.size) Assertions.assertEquals(channel1.messages[10], StreamCompleteWrapped(10)) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/CheckpointManagerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/CheckpointManagerTest.kt index e295a1f7cc4e..b2667f392022 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/CheckpointManagerTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/CheckpointManagerTest.kt @@ -14,7 +14,6 @@ import io.airbyte.cdk.message.Batch import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.message.MessageConverter import io.airbyte.cdk.message.SimpleBatch -import io.micronaut.context.annotation.Prototype import io.micronaut.test.extensions.junit5.annotation.MicronautTest import jakarta.inject.Inject import jakarta.inject.Singleton @@ -29,6 +28,7 @@ import org.junit.jupiter.params.provider.ArgumentsProvider import org.junit.jupiter.params.provider.ArgumentsSource @MicronautTest( + rebuildContext = true, environments = [ "CheckpointManagerTest", @@ -37,6 +37,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource ) class CheckpointManagerTest { @Inject lateinit var checkpointManager: TestCheckpointManager + @Inject lateinit var syncManager: SyncManager /** * Test state messages. * @@ -64,20 +65,23 @@ class CheckpointManagerTest { } } - @Prototype + @Singleton class MockOutputConsumer : Consumer { - val collectedStreamOutput = mutableMapOf>() + val collectedStreamOutput = + mutableMapOf>() val collectedGlobalOutput = mutableListOf() override fun accept(t: MockCheckpointOut) { when (t) { is MockStreamCheckpointOut -> - collectedStreamOutput.getOrPut(t.stream) { mutableListOf() }.add(t.payload) + collectedStreamOutput + .getOrPut(t.stream.descriptor) { mutableListOf() } + .add(t.payload) is MockGlobalCheckpointOut -> collectedGlobalOutput.add(t.payload) } } } - @Prototype + @Singleton class TestCheckpointManager( override val catalog: DestinationCatalog, override val syncManager: SyncManager, @@ -104,7 +108,7 @@ class CheckpointManagerTest { val name: String, val events: List, // Order matters, but only per stream - val expectedStreamOutput: Map> = mapOf(), + val expectedStreamOutput: Map> = mapOf(), val expectedGlobalOutput: List = listOf(), val expectedException: Class? = null ) @@ -124,7 +128,7 @@ class CheckpointManagerTest { mapOf(stream1 to listOf(Range.closed(0L, 20L))) ) ), - expectedStreamOutput = mapOf(stream1 to listOf("1", "2")) + expectedStreamOutput = mapOf(stream1.descriptor to listOf("1", "2")) ), TestCase( name = "One stream, two messages, flush only the first", @@ -137,7 +141,7 @@ class CheckpointManagerTest { mapOf(stream1 to listOf(Range.closed(0L, 10L))) ) ), - expectedStreamOutput = mapOf(stream1 to listOf("1")) + expectedStreamOutput = mapOf(stream1.descriptor to listOf("1")) ), TestCase( name = "Two streams, two messages each, flush all", @@ -156,7 +160,10 @@ class CheckpointManagerTest { ) ), expectedStreamOutput = - mapOf(stream1 to listOf("11", "12"), stream2 to listOf("22", "21")) + mapOf( + stream1.descriptor to listOf("11", "12"), + stream2.descriptor to listOf("22", "21") + ) ), TestCase( name = "One stream, only later range persisted", @@ -335,7 +342,7 @@ class CheckpointManagerTest { TestStreamMessage(stream1, 30L, 3), FlushPoint(mapOf(stream1 to listOf(Range.closed(10L, 30L)))) ), - expectedStreamOutput = mapOf(stream1 to listOf("1", "2", "3")) + expectedStreamOutput = mapOf(stream1.descriptor to listOf("1", "2", "3")) ), TestCase( name = "Global checkpoint, multiple flush points, no output", @@ -402,7 +409,7 @@ class CheckpointManagerTest { @ParameterizedTest @ArgumentsSource(CheckpointManagerTestArgumentsProvider::class) - suspend fun testAddingAndFlushingCheckpoints(testCase: TestCase) = runTest { + fun testAddingAndFlushingCheckpoints(testCase: TestCase) = runTest { if (testCase.expectedException != null) { try { runTestCase(testCase) @@ -429,6 +436,15 @@ class CheckpointManagerTest { testCase.events.forEach { when (it) { is TestStreamMessage -> { + /** + * Mock the correct state of the stream manager by advancing the record count to + * the index of the message. + */ + val streamManager = syncManager.getStreamManager(it.stream.descriptor) + val recordCount = streamManager.recordCount() + (recordCount until it.index).forEach { _ -> + syncManager.getStreamManager(it.stream.descriptor).countRecordIn() + } checkpointManager.addStreamCheckpoint( it.stream.descriptor, it.index, @@ -444,7 +460,7 @@ class CheckpointManagerTest { val mockBatch = SimpleBatch(state = Batch.State.PERSISTED) val rangeSet = TreeRangeSet.create(ranges) val mockBatchEnvelope = BatchEnvelope(batch = mockBatch, ranges = rangeSet) - checkpointManager.syncManager + syncManager .getStreamManager(stream.descriptor) .updateBatchState(mockBatchEnvelope) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/StreamManagerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/StreamManagerTest.kt index 335d9d50688a..727041adbccd 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/StreamManagerTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/StreamManagerTest.kt @@ -68,6 +68,7 @@ class StreamManagerTest { Assertions.assertTrue(channel.tryReceive().isFailure) Assertions.assertThrows(IllegalStateException::class.java) { manager.markSucceeded() } manager.markEndOfStream() + manager.markSucceeded() Assertions.assertTrue(channel.receive()) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/task/DestinationTaskLauncherTest.kt index aa114ca90950..be850fbc6186 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/task/DestinationTaskLauncherTest.kt @@ -318,11 +318,17 @@ class DestinationTaskLauncherTest { streamManager.markEndOfStream() // Verify incomplete batch triggers process batch - val incompleteBatch = BatchEnvelope(MockBatch(Batch.State.PERSISTED), range) + val incompleteBatch = BatchEnvelope(MockBatch(Batch.State.LOCAL), range) taskLauncher.handleNewBatch(stream1, incompleteBatch) - Assertions.assertTrue(streamManager.areRecordsPersistedUntil(100L)) + Assertions.assertFalse(streamManager.areRecordsPersistedUntil(100L)) val batchReceived = processBatchTaskFactory.hasRun.receive() Assertions.assertEquals(incompleteBatch, batchReceived) + delay(500) + Assertions.assertTrue(flushCheckpointsTaskFactory.hasRun.tryReceive().isFailure) + + val persistedBatch = BatchEnvelope(MockBatch(Batch.State.PERSISTED), range) + taskLauncher.handleNewBatch(stream1, persistedBatch) + Assertions.assertTrue(streamManager.areRecordsPersistedUntil(100L)) Assertions.assertTrue(flushCheckpointsTaskFactory.hasRun.receive()) // Verify complete batch w/o batch processing complete does nothing