Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Load CDK: Tolerate empty streams and update on empty batches #48591

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ class DefaultStreamManager(
return false
}

/* A closed empty stream is always complete. */
if (recordCount.get() == 0L) {
return true
}

return isProcessingCompleteForState(recordCount.get(), Batch.State.COMPLETE)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.load.message.DestinationStreamEvent
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory
Expand Down Expand Up @@ -204,9 +205,15 @@ class DefaultDestinationTaskLauncher(
stream: DestinationStream.Descriptor,
file: SpilledRawMessagesLocalFile
) {
log.info { "Starting process records task for $stream, file $file" }
val task = processRecordsTaskFactory.make(this, stream, file)
enqueue(task)
if (file.totalSizeBytes > 0L) {
log.info { "Starting process records task for ${stream}, file $file" }
val task = processRecordsTaskFactory.make(this, stream, file)
enqueue(task)
} else {
log.info { "No records to process in $file, skipping process records" }
// TODO: Make this `maybeCloseStream` or something
handleNewBatch(stream, BatchEnvelope(SimpleBatch(Batch.State.COMPLETE)))
}
if (!file.endOfStream) {
log.info { "End-of-stream not reached, restarting spill-to-disk task for $stream" }
val spillTask = spillToDiskTaskFactory.make(this, stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,11 @@ class StreamManagerTest {
// Can close now
Assertions.assertDoesNotThrow(manager::markSucceeded)
}

@Test
fun testEmptyCompletedStreamYieldsBatchProcessingComplete() {
val manager = DefaultStreamManager(stream1)
manager.markEndOfStream()
Assertions.assertTrue(manager.isBatchProcessingComplete())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,21 @@ class DestinationTaskLauncherTest<T> where T : LeveledTask, T : ScopedTask {
)
}

@Test
fun testHandleEmptySpilledFile() = runTest {
taskLauncher.handleNewSpilledFile(
MockDestinationCatalogFactory.stream1.descriptor,
SpilledRawMessagesLocalFile(Path("not/a/real/file"), 0L, Range.singleton(0))
)

mockSpillToDiskTaskFactory.streamHasRun[MockDestinationCatalogFactory.stream1.descriptor]
?.receive()
?: Assertions.fail("SpillToDiskTask not run")

delay(500)
Assertions.assertTrue(processRecordsTaskFactory.hasRun.tryReceive().isFailure)
}

@Test
fun testHandleNewBatch() = runTest {
val range = TreeRangeSet.create(listOf(Range.closed(0L, 100L)))
Expand Down Expand Up @@ -494,6 +509,18 @@ class DestinationTaskLauncherTest<T> where T : LeveledTask, T : ScopedTask {
Assertions.assertTrue(true)
}

@Test
fun handleEmptyBatch() = runTest {
val range = TreeRangeSet.create(listOf(Range.closed(0L, 0L)))
val streamManager =
syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
streamManager.markEndOfStream()

val emptyBatch = BatchEnvelope(MockBatch(Batch.State.COMPLETE), range)
taskLauncher.handleNewBatch(MockDestinationCatalogFactory.stream1.descriptor, emptyBatch)
closeStreamTaskFactory.hasRun.receive()
}

@Test
fun testHandleStreamClosed() = runTest {
// This should run teardown unconditionally.
Expand Down
Loading