Bulk Load CDK: Checkpoint flush every 15 minutes #46382

Merged
@@ -19,6 +19,12 @@ abstract class DestinationConfiguration : Configuration {
open val estimatedRecordMemoryOverheadRatio: Double =
0.1 // 0 => No overhead, 1.0 => 100% overhead

/**
* If we have not flushed state checkpoints in this amount of time, make a best-effort attempt
* to force a flush.
*/
open val maxCheckpointFlushTimeMs: Long = 15 * 60 * 1000L // 15 minutes

/**
* Micronaut factory which glues [ConfigurationSpecificationSupplier] and
* [DestinationConfigurationFactory] together to produce a [DestinationConfiguration] singleton.
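maxCheckpointFlushTimeMs is declared as an open val, so a destination that wants a tighter (or looser) window can simply override it in its own configuration. A minimal sketch, assuming a hypothetical configuration class and omitting whatever other members a real configuration needs:

// Hypothetical subclass for illustration; not part of this PR.
class HypotheticalDestinationConfiguration : DestinationConfiguration() {
    // Force a best-effort checkpoint flush after 5 minutes instead of the default 15.
    override val maxCheckpointFlushTimeMs: Long = 5 * 60 * 1000L
}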
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.file

import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

interface TimeProvider {
fun currentTimeMillis(): Long
suspend fun delay(ms: Long)
}

@Singleton
@Secondary
class DefaultTimeProvider : TimeProvider {
override fun currentTimeMillis(): Long {
return System.currentTimeMillis()
}

override suspend fun delay(ms: Long) {
kotlinx.coroutines.delay(ms)
}
}
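DefaultTimeProvider is registered @Secondary, so any more specific TimeProvider bean wins the injection. That is what makes the flush timing testable: a test can supply a controllable clock and skip real delays. A minimal test-only sketch, with FakeTimeProvider an assumed name rather than something added by this PR:

// Controllable clock for unit tests; advances instead of sleeping.
class FakeTimeProvider(private var nowMs: Long = 0L) : TimeProvider {
    override fun currentTimeMillis(): Long = nowMs

    override suspend fun delay(ms: Long) {
        // Advance the fake clock instead of suspending.
        nowMs += ms
    }
}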
@@ -6,6 +6,7 @@ package io.airbyte.cdk.state

import io.airbyte.cdk.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.file.TimeProvider
import io.airbyte.cdk.message.CheckpointMessage
import io.airbyte.cdk.message.MessageConverter
import io.airbyte.protocol.models.v0.AirbyteMessage
@@ -15,18 +16,24 @@ import io.micronaut.core.util.clhm.ConcurrentLinkedHashMap
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

/**
* Interface for checkpoint management. Should accept stream and global checkpoints, as well as
* requests to flush all data-sufficient checkpoints.
*/
interface CheckpointManager<K, T> {
fun addStreamCheckpoint(key: K, index: Long, checkpointMessage: T)
fun addGlobalCheckpoint(keyIndexes: List<Pair<K, Long>>, checkpointMessage: T)
suspend fun addStreamCheckpoint(key: K, index: Long, checkpointMessage: T)
suspend fun addGlobalCheckpoint(keyIndexes: List<Pair<K, Long>>, checkpointMessage: T)
suspend fun flushReadyCheckpointMessages()
suspend fun getLastSuccessfulFlushTimeMs(): Long
suspend fun getNextCheckpointIndexes(): Map<K, Long>
}

/**
@@ -41,15 +48,17 @@ interface CheckpointManager<K, T> {
* TODO: Ensure that checkpoint is flushed at the end, and require that all checkpoints be flushed
* before the destination can succeed.
*/
abstract class StreamsCheckpointManager<T, U>() :
CheckpointManager<DestinationStream.Descriptor, T> {
abstract class StreamsCheckpointManager<T, U> : CheckpointManager<DestinationStream.Descriptor, T> {

private val log = KotlinLogging.logger {}
private val flushLock = Mutex()
protected val lastFlushTimeMs = AtomicLong(0L)

abstract val catalog: DestinationCatalog
abstract val syncManager: SyncManager
abstract val outputFactory: MessageConverter<T, U>
abstract val outputConsumer: Consumer<U>
abstract val timeProvider: TimeProvider

data class GlobalCheckpoint<T>(
val streamIndexes: List<Pair<DestinationStream.Descriptor, Long>>,
@@ -63,65 +72,73 @@ abstract class StreamsCheckpointManager<T, U>() :
private val globalCheckpoints: ConcurrentLinkedQueue<GlobalCheckpoint<T>> =
ConcurrentLinkedQueue()

override fun addStreamCheckpoint(
override suspend fun addStreamCheckpoint(
key: DestinationStream.Descriptor,
index: Long,
checkpointMessage: T
) {
if (checkpointsAreGlobal.updateAndGet { it == true } != false) {
throw IllegalStateException(
"Global checkpoints cannot be mixed with non-global checkpoints"
)
}
flushLock.withLock {
if (checkpointsAreGlobal.updateAndGet { it == true } != false) {
throw IllegalStateException(
"Global checkpoints cannot be mixed with non-global checkpoints"
)
}

streamCheckpoints.compute(key) { _, indexToMessage ->
val map =
if (indexToMessage == null) {
// If the map doesn't exist yet, build it.
ConcurrentLinkedHashMap.Builder<Long, T>().maximumWeightedCapacity(1000).build()
} else {
if (indexToMessage.isNotEmpty()) {
// Make sure the messages are coming in order
val oldestIndex = indexToMessage.ascendingKeySet().first()
if (oldestIndex > index) {
throw IllegalStateException(
"Checkpoint message received out of order ($oldestIndex before $index)"
)
streamCheckpoints.compute(key) { _, indexToMessage ->
val map =
if (indexToMessage == null) {
// If the map doesn't exist yet, build it.
ConcurrentLinkedHashMap.Builder<Long, T>()
.maximumWeightedCapacity(1000)
.build()
} else {
if (indexToMessage.isNotEmpty()) {
// Make sure the messages are coming in order
val oldestIndex = indexToMessage.ascendingKeySet().first()
if (oldestIndex > index) {
throw IllegalStateException(
"Checkpoint message received out of order ($oldestIndex before $index)"
)
}
}
indexToMessage
}
indexToMessage
}
// Actually add the message
map[index] = checkpointMessage
map
}
// Actually add the message
map[index] = checkpointMessage
map
}

log.info { "Added checkpoint for stream: $key at index: $index" }
log.info { "Added checkpoint for stream: $key at index: $index" }
}
}

// TODO: Is it an error if we don't get all the streams every time?
override fun addGlobalCheckpoint(
override suspend fun addGlobalCheckpoint(
keyIndexes: List<Pair<DestinationStream.Descriptor, Long>>,
checkpointMessage: T
) {
if (checkpointsAreGlobal.updateAndGet { it != false } != true) {
throw IllegalStateException(
"Global checkpoint cannot be mixed with non-global checkpoints"
)
}
flushLock.withLock {
if (checkpointsAreGlobal.updateAndGet { it != false } != true) {
throw IllegalStateException(
"Global checkpoint cannot be mixed with non-global checkpoints"
)
}

val head = globalCheckpoints.peek()
if (head != null) {
val keyIndexesByStream = keyIndexes.associate { it.first to it.second }
head.streamIndexes.forEach {
if (keyIndexesByStream[it.first]!! < it.second) {
throw IllegalStateException("Global checkpoint message received out of order")
val head = globalCheckpoints.peek()
if (head != null) {
val keyIndexesByStream = keyIndexes.associate { it.first to it.second }
head.streamIndexes.forEach {
if (keyIndexesByStream[it.first]!! < it.second) {
throw IllegalStateException(
"Global checkpoint message received out of order"
)
}
}
}
}

globalCheckpoints.add(GlobalCheckpoint(keyIndexes, checkpointMessage))
log.info { "Added global checkpoint with stream indexes: $keyIndexes" }
globalCheckpoints.add(GlobalCheckpoint(keyIndexes, checkpointMessage))
log.info { "Added global checkpoint with stream indexes: $keyIndexes" }
}
}

override suspend fun flushReadyCheckpointMessages() {
@@ -146,7 +163,7 @@
}
}

private fun flushGlobalCheckpoints() {
private suspend fun flushGlobalCheckpoints() {
while (!globalCheckpoints.isEmpty()) {
val head = globalCheckpoints.peek()
val allStreamsPersisted =
@@ -155,15 +172,14 @@ abstract class StreamsCheckpointManager<T, U>() :
}
if (allStreamsPersisted) {
globalCheckpoints.poll()
val outMessage = outputFactory.from(head.checkpointMessage)
outputConsumer.accept(outMessage)
sendMessage(head.checkpointMessage)
} else {
break
}
}
}

private fun flushStreamCheckpoints() {
private suspend fun flushStreamCheckpoints() {
for (stream in catalog.streams) {
val manager = syncManager.getStreamManager(stream.descriptor)
val streamCheckpoints = streamCheckpoints[stream.descriptor] ?: return
@@ -173,14 +189,45 @@ abstract class StreamsCheckpointManager<T, U>() :
streamCheckpoints.remove(index)
?: throw IllegalStateException("Checkpoint not found for index: $index")
log.info { "Flushing checkpoint for stream: $stream at index: $index" }
val outMessage = outputFactory.from(checkpointMessage)
outputConsumer.accept(outMessage)
sendMessage(checkpointMessage)
} else {
break
}
}
}
}

private suspend fun sendMessage(checkpointMessage: T) =
withContext(Dispatchers.IO) {
lastFlushTimeMs.set(timeProvider.currentTimeMillis())
val outMessage = outputFactory.from(checkpointMessage)
outputConsumer.accept(outMessage)
}

override suspend fun getLastSuccessfulFlushTimeMs(): Long =
// Return inside the lock to ensure the value reflects flushes in progress
flushLock.withLock { lastFlushTimeMs.get() }

override suspend fun getNextCheckpointIndexes(): Map<DestinationStream.Descriptor, Long> {
flushLock.withLock {
return when (checkpointsAreGlobal.get()) {
null -> {
emptyMap()
}
true -> {
val head = globalCheckpoints.peek()
head?.streamIndexes?.associate { it } ?: emptyMap()
}
false -> {
println("streamCheckpoints: $streamCheckpoints")
Contributor: protip: if you use logger.info instead of println, you can pretend that these are all intentional :P

Contributor Author: I am bad brain
streamCheckpoints
.mapValues { it.value.ascendingKeySet().firstOrNull() }
.filterValues { it != null }
.mapValues { it.value!! }
}
}
}
}
}

@Singleton
@@ -189,5 +236,10 @@ class DefaultCheckpointManager(
override val catalog: DestinationCatalog,
override val syncManager: SyncManager,
override val outputFactory: MessageConverter<CheckpointMessage, AirbyteMessage>,
override val outputConsumer: Consumer<AirbyteMessage>
) : StreamsCheckpointManager<CheckpointMessage, AirbyteMessage>()
override val outputConsumer: Consumer<AirbyteMessage>,
override val timeProvider: TimeProvider
) : StreamsCheckpointManager<CheckpointMessage, AirbyteMessage>() {
init {
lastFlushTimeMs.set(timeProvider.currentTimeMillis())
}
}
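Together with maxCheckpointFlushTimeMs from the configuration change above, getLastSuccessfulFlushTimeMs and flushReadyCheckpointMessages are the hooks a time-based flush can build on. A hedged sketch (not code from this PR) of a loop that forces a best-effort flush attempt once the window is exceeded; the function name and poll interval are assumptions:

import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.file.TimeProvider
import io.airbyte.cdk.message.CheckpointMessage
import io.airbyte.cdk.state.CheckpointManager

// Hypothetical background loop; it would run in the sync's coroutine scope and be
// cancelled when the sync finishes.
suspend fun checkpointFlushLoop(
    maxCheckpointFlushTimeMs: Long,
    timeProvider: TimeProvider,
    checkpointManager: CheckpointManager<DestinationStream.Descriptor, CheckpointMessage>,
) {
    while (true) {
        // Poll well inside the window; the divisor is arbitrary for this sketch.
        timeProvider.delay(maxCheckpointFlushTimeMs / 10)
        val sinceLastFlushMs =
            timeProvider.currentTimeMillis() - checkpointManager.getLastSuccessfulFlushTimeMs()
        if (sinceLastFlushMs >= maxCheckpointFlushTimeMs) {
            // Best-effort: only checkpoints whose data is already persisted are emitted.
            checkpointManager.flushReadyCheckpointMessages()
        }
    }
}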
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.state

/**
* A multi-reader consumer of events produced by a single-writer [EventProducer].
*
* To use:
* - set up an [EventProducer] with the same type parameter as described in the producer's
* documentation
* - declare a subclass of [EventConsumer] and mark it `@Prototype` (multi-reader)
* - inject the producer and consumers where needed
*/
abstract class EventConsumer<T>(producer: EventProducer<T>) {
val channel = producer.subscribe()

suspend fun consumeMaybe(): T? {
return channel.tryReceive().getOrNull()
}
}
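A hedged sketch of the consumer side described above; HypotheticalFlushEvent and the consumer class are illustrative names, not classes added by this PR (the matching producer sketch follows the EventProducer file below):

import io.airbyte.cdk.state.EventConsumer
import io.airbyte.cdk.state.EventProducer
import io.micronaut.context.annotation.Prototype

// Assumed event payload, for illustration only.
data class HypotheticalFlushEvent(val tickedAtMs: Long)

// @Prototype (multi-reader): each injection point gets its own subscription channel.
@Prototype
class HypotheticalFlushEventConsumer(
    producer: EventProducer<HypotheticalFlushEvent>
) : EventConsumer<HypotheticalFlushEvent>(producer)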
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.state

import java.util.concurrent.ConcurrentLinkedQueue
import kotlinx.coroutines.channels.Channel

/**
* A single-writer event producer for a multi-reader consumer.
*
* To use
* - declare a subclass of [EventProducer] with the type parameter of the events to produce
* - mark it `@Singleton` (single-writer!)
* - configure [EventConsumer]s as described in the consumer's documentation
* - inject the producer and consumers where needed
*
* TODO: If we need to support different paradigms (multi-writer, etc.), abstract this into an
* interface and provide abstract implementations for each type.
*/
abstract class EventProducer<T> {
private val subscribers = ConcurrentLinkedQueue<Channel<T>>()

fun subscribe(): Channel<T> {
val channel = Channel<T>(Channel.UNLIMITED)
subscribers.add(channel)
return channel
}

suspend fun produce(event: T) {
subscribers.forEach { it.send(event) }
}
}
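And the matching single-writer side, plus a small usage sketch; these build on the assumed HypotheticalFlushEvent types from the consumer sketch above and are not code from this PR:

import io.airbyte.cdk.state.EventProducer
import jakarta.inject.Singleton

// @Singleton (single-writer): one bean fans each event out to every subscribed channel.
@Singleton
class HypotheticalFlushEventProducer : EventProducer<HypotheticalFlushEvent>()

// Usage sketch: the writer publishes; each consumer drains its own channel without blocking.
suspend fun exampleRoundTrip(
    producer: HypotheticalFlushEventProducer,
    consumer: HypotheticalFlushEventConsumer
) {
    producer.produce(HypotheticalFlushEvent(tickedAtMs = 0L))
    val maybeEvent = consumer.consumeMaybe() // null if nothing was produced yet
    requireNotNull(maybeEvent) { "expected the event we just produced" }
}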