diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt index 2fe9488d03a4b..c1e324ec63277 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt @@ -5,6 +5,7 @@ package io.airbyte.cdk.load.message import io.airbyte.cdk.load.util.CloseableCoroutine +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.receiveAsFlow @@ -16,6 +17,7 @@ interface QueueReader { interface QueueWriter : CloseableCoroutine { suspend fun publish(message: T) + fun isClosedForPublish(): Boolean } interface MessageQueue : QueueReader, QueueWriter @@ -29,6 +31,8 @@ abstract class ChannelMessageQueue : MessageQueue { override suspend fun close() { channel.close() } + @OptIn(DelicateCoroutinesApi::class) + override fun isClosedForPublish(): Boolean = channel.isClosedForSend } interface MessageQueueSupplier { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/TimeWindowTrigger.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/TimeWindowTrigger.kt index ca2693a8a355d..4e3f86f78b6fa 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/TimeWindowTrigger.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/TimeWindowTrigger.kt @@ -15,7 +15,8 @@ data class TimeWindowTrigger( private val clock: Clock, private val windowWidthMs: Long, ) { - private var openedAtMs: Long? = null + var openedAtMs: Long? = null + private set /* * Sets window open timestamp for computing completeness. Idempotent. Mutative. diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt index 6c4a6caf83fe6..e460968341f20 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt @@ -14,10 +14,12 @@ import io.airbyte.cdk.load.message.StreamFlushEvent import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.task.KillableScope import io.airbyte.cdk.load.task.SyncLevel +import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Value import jakarta.inject.Singleton import java.time.Clock +import kotlinx.coroutines.channels.ClosedSendChannelException @Singleton @Secondary @@ -29,6 +31,8 @@ class FlushTickTask( private val recordQueueSupplier: MessageQueueSupplier>, ) : SyncLevel, KillableScope { + private val log = KotlinLogging.logger {} + override suspend fun execute() { while (true) { waitAndPublishFlushTick() @@ -41,7 +45,14 @@ class FlushTickTask( catalog.streams.forEach { val queue = recordQueueSupplier.get(it.descriptor) - queue.publish(Reserved(value = StreamFlushEvent(clock.millis()))) + if (queue.isClosedForPublish()) { + return@forEach + } + try { + queue.publish(Reserved(value = StreamFlushEvent(clock.millis()))) + } catch (e: ClosedSendChannelException) { + log.info { "Attempted to flush closed queue for ${it.descriptor}. Ignoring..." } + } } } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt index dd2f6f0804c06..295258b055258 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt @@ -102,6 +102,11 @@ class DefaultSpillToDiskTask( } is StreamFlushEvent -> { val forceFlush = timeWindow.isComplete() + if (forceFlush) { + log.info { + "Time window complete for $streamDescriptor@${timeWindow.openedAtMs} closing $tmpFile of (${sizeBytes}b)" + } + } ReadResult(range, sizeBytes, forceFlush = forceFlush) } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index 9ae9af6778520..a223966a3bfb0 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -117,7 +117,7 @@ class DestinationTaskLauncherTest where T : LeveledTask, T : ScopedTask { @Requires(env = ["DestinationTaskLauncherTest"]) class MockQueueWriter : QueueWriter> { override suspend fun publish(message: Reserved) {} - + override fun isClosedForPublish(): Boolean = false override suspend fun close() {} } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTaskTest.kt index 31b24d2baf54c..2ffeff5af451e 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTaskTest.kt @@ -17,6 +17,7 @@ import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.StreamFlushEvent import io.airbyte.cdk.load.state.Reserved +import io.mockk.coEvery import io.mockk.coVerify import io.mockk.every import io.mockk.impl.annotations.MockK @@ -25,6 +26,7 @@ import io.mockk.mockk import io.mockk.slot import java.time.Clock import java.util.stream.Stream +import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -89,6 +91,58 @@ class FlushTickTaskTest { } } + @Test + fun `does not attempt to send flush events for closed queues`() = runTest { + every { catalog.streams } returns + listOf(Fixtures.stream1, Fixtures.stream2, Fixtures.stream3) + val queue1 = mockk>>(relaxed = true) + val queue2 = + mockk>>(relaxed = true) { + every { isClosedForPublish() } returns true + } + val queue3 = mockk>>(relaxed = true) + + every { recordQueueSupplier.get(Fixtures.stream1.descriptor) } returns queue1 + every { recordQueueSupplier.get(Fixtures.stream2.descriptor) } returns queue2 + every { recordQueueSupplier.get(Fixtures.stream3.descriptor) } returns queue3 + + task.waitAndPublishFlushTick() + + val msgSlot1 = slot>() + coVerify(exactly = 1) { queue1.publish(capture(msgSlot1)) } + assert(msgSlot1.captured.value is StreamFlushEvent) + + // no event should be sent for 2 + coVerify(exactly = 0) { queue2.publish(any()) } + + val msgSlot3 = slot>() + coVerify(exactly = 1) { queue3.publish(capture(msgSlot3)) } + assert(msgSlot3.captured.value is StreamFlushEvent) + } + + @Test + fun `handles channel closed exceptions due to race`() = runTest { + every { catalog.streams } returns listOf(Fixtures.stream1, Fixtures.stream2) + val queue1 = + mockk>>(relaxed = true) { + coEvery { publish(any()) } throws ClosedSendChannelException("Closed.") + } + val queue2 = mockk>>(relaxed = true) + + every { recordQueueSupplier.get(Fixtures.stream1.descriptor) } returns queue1 + every { recordQueueSupplier.get(Fixtures.stream2.descriptor) } returns queue2 + + task.waitAndPublishFlushTick() + + val msgSlot1 = slot>() + coVerify(exactly = 1) { queue1.publish(capture(msgSlot1)) } + assert(msgSlot1.captured.value is StreamFlushEvent) + + val msgSlot2 = slot>() + coVerify(exactly = 1) { queue2.publish(capture(msgSlot2)) } + assert(msgSlot2.captured.value is StreamFlushEvent) + } + companion object { @JvmStatic fun streamMatrix(): Stream {