Skip to content

Commit

Permalink
convert #36333 to kotlin
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 27, 2024
1 parent b618796 commit 75c80ce
Show file tree
Hide file tree
Showing 14 changed files with 95 additions and 264 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import java.util.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import org.slf4j.Logger
import org.slf4j.LoggerFactory

/** Source operation skeleton for JDBC compatible databases. */
abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
JdbcCompatibleSourceOperations<Datatype> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class JdbcSourceOperations :
preparedStatement: PreparedStatement,
parameterIndex: Int,
cursorFieldType: JDBCType?,
value: String?
value: String
) {
when (cursorFieldType) {
JDBCType.TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value)
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@

/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@

/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
import io.debezium.engine.ChangeEvent
import io.debezium.engine.DebeziumEngine
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import org.slf4j.Logger
import org.slf4j.LoggerFactory

/**
* This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
Expand Down Expand Up @@ -121,17 +121,25 @@ class AirbyteDebeziumHandler<T>(
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY].asLong()
else DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS.toLong()

val messageProducer: DebeziumMessageProducer<T> = DebeziumMessageProducer<T>(cdcStateHandler,
val messageProducer: DebeziumMessageProducer<T> =
DebeziumMessageProducer<T>(
cdcStateHandler,
targetPosition,
eventConverter,
offsetManager,
schemaHistoryManager)

schemaHistoryManager
)

// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is not used
// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is
// not used
// at all thus we will pass in null.
val iterator: SourceStateIterator<ChangeEventWithMetadata> =
SourceStateIterator<ChangeEventWithMetadata>(eventIterator, null, messageProducer!!, StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration))
SourceStateIterator<ChangeEventWithMetadata>(
eventIterator,
null,
messageProducer!!,
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)
)
return AutoCloseableIterators.fromIterator<AirbyteMessage>(iterator)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ interface CdcTargetPosition<T> {
* @param event Event from the CDC load
* @return Returns `true` when the event is ahead of the offset. Otherwise, it returns `false`
*/
fun isEventAheadOffset(
offset: Map<String, String>?,
event: ChangeEventWithMetadata?
): Boolean {
fun isEventAheadOffset(offset: Map<String, String>?, event: ChangeEventWithMetadata?): Boolean {
return false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,33 @@ import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageP
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import java.util.*
import org.apache.kafka.connect.errors.ConnectException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.*

class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
targetPosition: CdcTargetPosition<T>,
eventConverter: DebeziumEventConverter,
offsetManager: AirbyteFileOffsetBackingStore?,
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>) : SourceStateMessageProducer<ChangeEventWithMetadata> {
class DebeziumMessageProducer<T>(
private val cdcStateHandler: CdcStateHandler,
targetPosition: CdcTargetPosition<T>,
eventConverter: DebeziumEventConverter,
offsetManager: AirbyteFileOffsetBackingStore?,
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>
) : SourceStateMessageProducer<ChangeEventWithMetadata> {
/**
* `checkpointOffsetToSend` is used as temporal storage for the offset that we want to send as
* message. As Debezium is reading records faster that we process them, if we try to send
* `offsetManger.read()` offset, it is possible that the state is behind the record we are currently
* propagating. To avoid that, we store the offset as soon as we reach the checkpoint threshold
* (time or records) and we wait to send it until we are sure that the record we are processing is
* behind the offset to be sent.
* `offsetManger.read()` offset, it is possible that the state is behind the record we are
* currently propagating. To avoid that, we store the offset as soon as we reach the checkpoint
* threshold (time or records) and we wait to send it until we are sure that the record we are
* processing is behind the offset to be sent.
*/
private val checkpointOffsetToSend = HashMap<String, String>()

/**
* `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same
* offset. Is it possible that the offset Debezium report doesn't move for a period of time, and if
* we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states, generating
* an unneeded usage of networking and processing.
* offset. Is it possible that the offset Debezium report doesn't move for a period of time, and
* if we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states,
* generating an unneeded usage of networking and processing.
*/
private val initialOffset: HashMap<String, String>
private val previousCheckpointOffset: HashMap<String?, String?>
Expand All @@ -57,7 +59,9 @@ class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
this.initialOffset = HashMap(this.previousCheckpointOffset)
}

override fun generateStateMessageAtCheckpoint(stream: ConfiguredAirbyteStream?): AirbyteStateMessage {
override fun generateStateMessageAtCheckpoint(
stream: ConfiguredAirbyteStream?
): AirbyteStateMessage {
LOGGER.info("Sending CDC checkpoint state message.")
val stateMessage = createStateMessage(checkpointOffsetToSend)
previousCheckpointOffset.clear()
Expand All @@ -72,15 +76,20 @@ class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
* @param message
* @return
*/
override fun processRecordMessage(stream: ConfiguredAirbyteStream?, message: ChangeEventWithMetadata): AirbyteMessage {
override fun processRecordMessage(
stream: ConfiguredAirbyteStream?,
message: ChangeEventWithMetadata
): AirbyteMessage {
if (checkpointOffsetToSend.isEmpty()) {
try {
val temporalOffset = offsetManager!!.read()
if (!targetPosition.isSameOffset(previousCheckpointOffset, temporalOffset)) {
checkpointOffsetToSend.putAll(temporalOffset)
}
} catch (e: ConnectException) {
LOGGER.warn("Offset file is being written by Debezium. Skipping CDC checkpoint in this loop.")
LOGGER.warn(
"Offset file is being written by Debezium. Skipping CDC checkpoint in this loop."
)
}
}

Expand Down Expand Up @@ -122,7 +131,14 @@ class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
*/
private fun createStateMessage(offset: Map<String, String>): AirbyteStateMessage {
val message =
cdcStateHandler.saveState(offset, schemaHistoryManager.map { obj: AirbyteSchemaHistoryStorage -> obj.read() }.orElse(null))!!.state
cdcStateHandler
.saveState(
offset,
schemaHistoryManager
.map { obj: AirbyteSchemaHistoryStorage -> obj.read() }
.orElse(null)
)!!
.state
return message
}

Expand Down

This file was deleted.

Loading

0 comments on commit 75c80ce

Please sign in to comment.