Skip to content

Commit

Permalink
chore: skip closed streams in flush tick. Log when flushing due to ti…
Browse files Browse the repository at this point in the history
…me wind… (#48605)
  • Loading branch information
tryangul authored Nov 21, 2024
1 parent 2ddbaf3 commit eb25860
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.load.message

import io.airbyte.cdk.load.util.CloseableCoroutine
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
Expand All @@ -16,6 +17,7 @@ interface QueueReader<T> {

/**
 * Write side of a message queue carrying messages of type [T].
 *
 * Extends [CloseableCoroutine], so a writer can also be closed from a coroutine.
 */
interface QueueWriter<T> : CloseableCoroutine {
/** Publishes [message] to the queue. Declared `suspend`, so it must be called from a coroutine. */
suspend fun publish(message: T)
/**
 * Reports whether the queue has been closed for publishing.
 *
 * NOTE(review): this is a plain (non-suspending) snapshot; presumably the queue can be closed
 * between this check and a later [publish], so callers should not treat `false` as a guarantee
 * that [publish] will succeed — confirm against each implementation.
 */
fun isClosedForPublish(): Boolean
}

/** A queue of [T] that can be both read from ([QueueReader]) and written to ([QueueWriter]). */
interface MessageQueue<T> : QueueReader<T>, QueueWriter<T>
Expand All @@ -29,6 +31,8 @@ abstract class ChannelMessageQueue<T> : MessageQueue<T> {
/**
 * Closes the backing [channel].
 *
 * The value returned by `channel.close()` is intentionally discarded.
 * NOTE(review): assuming `channel` is a kotlinx.coroutines `Channel`, closing only stops further
 * sends; elements already buffered remain available to the reader — confirm against the
 * `channel` declaration, which is not visible in this hunk.
 */
override suspend fun close() {
channel.close()
}
/**
 * Reports whether the backing [channel] is currently closed for sending.
 *
 * The underlying `isClosedForSend` property requires opting in to [DelicateCoroutinesApi]:
 * the value is only a point-in-time observation, so a `false` result does not guarantee that
 * a subsequent publish will succeed.
 */
@OptIn(DelicateCoroutinesApi::class)
override fun isClosedForPublish(): Boolean {
    return channel.isClosedForSend
}
}

interface MessageQueueSupplier<K, T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ data class TimeWindowTrigger(
private val clock: Clock,
private val windowWidthMs: Long,
) {
private var openedAtMs: Long? = null
var openedAtMs: Long? = null
private set

/*
* Sets window open timestamp for computing completeness. Idempotent. Mutative.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import io.airbyte.cdk.load.message.StreamFlushEvent
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.task.KillableScope
import io.airbyte.cdk.load.task.SyncLevel
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton
import java.time.Clock
import kotlinx.coroutines.channels.ClosedSendChannelException

@Singleton
@Secondary
Expand All @@ -29,6 +31,8 @@ class FlushTickTask(
private val recordQueueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>>,
) : SyncLevel, KillableScope {
private val log = KotlinLogging.logger {}

override suspend fun execute() {
while (true) {
waitAndPublishFlushTick()
Expand All @@ -41,7 +45,14 @@ class FlushTickTask(

catalog.streams.forEach {
val queue = recordQueueSupplier.get(it.descriptor)
queue.publish(Reserved(value = StreamFlushEvent(clock.millis())))
if (queue.isClosedForPublish()) {
return@forEach
}
try {
queue.publish(Reserved(value = StreamFlushEvent(clock.millis())))
} catch (e: ClosedSendChannelException) {
log.info { "Attempted to flush closed queue for ${it.descriptor}. Ignoring..." }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class DefaultSpillToDiskTask(
}
is StreamFlushEvent -> {
val forceFlush = timeWindow.isComplete()
if (forceFlush) {
log.info {
"Time window complete for $streamDescriptor@${timeWindow.openedAtMs} closing $tmpFile of (${sizeBytes}b)"
}
}
ReadResult(range, sizeBytes, forceFlush = forceFlush)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class DestinationTaskLauncherTest<T> where T : LeveledTask, T : ScopedTask {
@Requires(env = ["DestinationTaskLauncherTest"])
class MockQueueWriter : QueueWriter<Reserved<CheckpointMessageWrapped>> {
    /** No-op: published checkpoint messages are dropped. */
    override suspend fun publish(message: Reserved<CheckpointMessageWrapped>) = Unit

    /** This mock never reports itself as closed. */
    override fun isClosedForPublish(): Boolean {
        return false
    }

    /** No-op: there is nothing to release. */
    override suspend fun close() = Unit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import io.airbyte.cdk.load.message.MessageQueue
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.StreamFlushEvent
import io.airbyte.cdk.load.state.Reserved
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.every
import io.mockk.impl.annotations.MockK
Expand All @@ -25,6 +26,7 @@ import io.mockk.mockk
import io.mockk.slot
import java.time.Clock
import java.util.stream.Stream
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -89,6 +91,58 @@ class FlushTickTaskTest {
}
}

@Test
fun `does not attempt to send flush events for closed queues`() = runTest {
    // Three streams: the middle one is backed by a queue that reports itself as closed.
    val firstOpenQueue = mockk<MessageQueue<Reserved<DestinationStreamEvent>>>(relaxed = true)
    val closedQueue = mockk<MessageQueue<Reserved<DestinationStreamEvent>>>(relaxed = true)
    every { closedQueue.isClosedForPublish() } returns true
    val lastOpenQueue = mockk<MessageQueue<Reserved<DestinationStreamEvent>>>(relaxed = true)

    val wiring =
        listOf(
            Fixtures.stream1 to firstOpenQueue,
            Fixtures.stream2 to closedQueue,
            Fixtures.stream3 to lastOpenQueue,
        )
    every { catalog.streams } returns wiring.map { it.first }
    wiring.forEach { (stream, queue) ->
        every { recordQueueSupplier.get(stream.descriptor) } returns queue
    }

    task.waitAndPublishFlushTick()

    // Each open queue receives exactly one flush event.
    val firstPublished = slot<Reserved<DestinationStreamEvent>>()
    coVerify(exactly = 1) { firstOpenQueue.publish(capture(firstPublished)) }
    assert(firstPublished.captured.value is StreamFlushEvent)

    // The closed queue must be skipped entirely.
    coVerify(exactly = 0) { closedQueue.publish(any()) }

    val lastPublished = slot<Reserved<DestinationStreamEvent>>()
    coVerify(exactly = 1) { lastOpenQueue.publish(capture(lastPublished)) }
    assert(lastPublished.captured.value is StreamFlushEvent)
}

@Test
fun `handles channel closed exceptions due to race`() = runTest {
    // The relaxed mock answers `false` to isClosedForPublish(), yet publish() fails as if the
    // queue had been closed between the check and the send.
    val racingQueue = mockk<MessageQueue<Reserved<DestinationStreamEvent>>>(relaxed = true)
    coEvery { racingQueue.publish(any()) } throws ClosedSendChannelException("Closed.")
    val healthyQueue = mockk<MessageQueue<Reserved<DestinationStreamEvent>>>(relaxed = true)

    val wiring = listOf(Fixtures.stream1 to racingQueue, Fixtures.stream2 to healthyQueue)
    every { catalog.streams } returns wiring.map { it.first }
    wiring.forEach { (stream, queue) ->
        every { recordQueueSupplier.get(stream.descriptor) } returns queue
    }

    task.waitAndPublishFlushTick()

    // A publish was attempted on both queues: the failure on the first one must not stop the
    // tick from reaching the second.
    wiring.forEach { (_, queue) ->
        val published = slot<Reserved<DestinationStreamEvent>>()
        coVerify(exactly = 1) { queue.publish(capture(published)) }
        assert(published.captured.value is StreamFlushEvent)
    }
}

companion object {
@JvmStatic
fun streamMatrix(): Stream<Arguments> {
Expand Down

0 comments on commit eb25860

Please sign in to comment.