Skip to content

Commit

Permalink
java-cdk: better debezium logging (#41212)
Browse files Browse the repository at this point in the history
Co-authored-by: Akash Kulkarni <akash@airbyte.io>
  • Loading branch information
postamar and akashkulk authored Jul 11, 2024
1 parent c97cdb0 commit 10a2614
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 26 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.41.1 | 2024-07-11 | [\#41212](https://github.com/airbytehq/airbyte/pull/41212) | Improve debezium logging. |
| 0.41.0 | 2024-07-11 | [\#38240](https://github.com/airbytehq/airbyte/pull/38240) | Sources : Changes in CDC interfaces to support WASS algorithm |
| 0.40.11 | 2024-07-08 | [\#41041](https://github.com/airbytehq/airbyte/pull/41041) | Destinations: Fix truncate refreshes incorrectly discarding data if successful attempt had 0 records |
| 0.40.10 | 2024-07-05 | [\#40719](https://github.com/airbytehq/airbyte/pull/40719) | Update test to refrlect isResumable field in catalog |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.41.0
version=0.41.1
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

private val LOGGER = KotlinLogging.logger {}
/**
Expand All @@ -38,29 +39,31 @@ class AirbyteDebeziumHandler<T>(
) {
internal inner class CapacityReportingBlockingQueue<E>(capacity: Int) :
LinkedBlockingQueue<E>(capacity) {
private var lastReport: Instant? = null
private var lastReport: Instant = Instant.MIN
private var puts = AtomicLong()
private var polls = AtomicLong()

private fun reportQueueUtilization() {
if (
lastReport == null ||
Duration.between(lastReport, Instant.now())
.compareTo(Companion.REPORT_DURATION) > 0
) {
private fun reportQueueUtilization(put: Long = 0L, poll: Long = 0L) {
if (Duration.between(lastReport, Instant.now()) > REPORT_DURATION) {
LOGGER.info {
"CDC events queue size: ${this.size}. remaining ${this.remainingCapacity()}"
"CDC events queue stats: " +
"size=${this.size}, " +
"cap=${this.remainingCapacity()}, " +
"puts=${puts.addAndGet(put)}, " +
"polls=${polls.addAndGet(poll)}"
}
synchronized(this) { lastReport = Instant.now() }
}
}

@Throws(InterruptedException::class)
override fun put(e: E) {
reportQueueUtilization()
reportQueueUtilization(put = 1L)
super.put(e)
}

override fun poll(): E {
reportQueueUtilization()
reportQueueUtilization(poll = 1L)
return super.poll()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.debezium.engine.ChangeEvent
import io.github.oshai.kotlinlogging.KotlinLogging
import java.lang.reflect.Field
import java.time.Duration
import java.time.Instant
import java.time.LocalDateTime
import java.util.*
import java.util.concurrent.*
Expand Down Expand Up @@ -51,34 +52,60 @@ class DebeziumRecordIterator<T>(
private var lastHeartbeatPosition: T? = null
private var maxInstanceOfNoRecordsFound = 0
private var signalledDebeziumEngineShutdown = false
private var numUnloggedPolls: Int = -1
private var lastLoggedPoll: Instant = Instant.MIN

// The following logic incorporates heartbeat (CDC postgres only for now):
// The following logic incorporates heartbeat:
// 1. Wait on queue either the configured time first or 1 min after a record received
// 2. If nothing came out of queue finish sync
// 3. If received heartbeat: check if hearbeat_lsn reached target or hasn't changed in a while
// finish sync
// 4. If change event lsn reached target finish sync
// 5. Otherwise check message queuen again
// 5. Otherwise check message queue again
override fun computeNext(): ChangeEventWithMetadata? {
// keep trying until the publisher is closed or until the queue is empty. the latter case is
// possible when the publisher has shutdown but the consumer has not yet processed all
// messages it
// emitted.
// messages it emitted.
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
val next: ChangeEvent<String?, String?>?

val waitTime =
if (receivedFirstRecord) this.subsequentRecordWaitTime else this.firstRecordWaitTime
val instantBeforePoll: Instant = Instant.now()
try {
next = queue.poll(waitTime.seconds, TimeUnit.SECONDS)
} catch (e: InterruptedException) {
throw RuntimeException(e)
}
val instantAfterPoll: Instant = Instant.now()
val isEventLogged: Boolean =
numUnloggedPolls >= POLL_LOG_MAX_CALLS_INTERVAL - 1 ||
Duration.between(lastLoggedPoll, instantAfterPoll) > pollLogMaxTimeInterval ||
next == null ||
isHeartbeatEvent(next)
if (isEventLogged) {
val pollDuration: Duration = Duration.between(instantBeforePoll, Instant.now())
LOGGER.info {
"CDC events queue poll(): " +
when (numUnloggedPolls) {
-1 -> "blocked for $pollDuration in its first call."
0 ->
"blocked for $pollDuration after " +
"its previous call which was also logged."
else ->
"blocked for $pollDuration after " +
"$numUnloggedPolls previous call(s) which were not logged."
}
}
numUnloggedPolls = 0
lastLoggedPoll = instantAfterPoll
} else {
numUnloggedPolls++
}

// if within the timeout, the consumer could not get a record, it is time to tell the
// producer to
// shutdown.
// producer to shutdown.
if (next == null) {
LOGGER.info { "CDC events queue poll(): returned nothing." }
if (
!receivedFirstRecord || hasSnapshotFinished || maxInstanceOfNoRecordsFound >= 10
) {
Expand All @@ -90,17 +117,35 @@ class DebeziumRecordIterator<T>(
DebeziumCloseReason.TIMEOUT
)
}
LOGGER.info { "no record found. polling again." }

maxInstanceOfNoRecordsFound++
LOGGER.info {
"CDC events queue poll(): " +
"returned nothing, polling again, attempt $maxInstanceOfNoRecordsFound."
}
continue
}

if (isHeartbeatEvent(next)) {
if (!hasSnapshotFinished) {
LOGGER.info {
"CDC events queue poll(): " +
"returned a heartbeat event while snapshot is not finished yet."
}
continue
}

val heartbeatPos = getHeartbeatPosition(next)
val isProgressing = heartbeatPos != lastHeartbeatPosition
LOGGER.info {
"CDC events queue poll(): " +
"returned a heartbeat event: " +
if (isProgressing) {
"progressing to $heartbeatPos."
} else {
"no progress since last heartbeat."
}
}
// wrap up sync if heartbeat position crossed the target OR heartbeat position
// hasn't changed for
// too long
Expand All @@ -118,7 +163,7 @@ class DebeziumRecordIterator<T>(
)
}

if (heartbeatPos != lastHeartbeatPosition) {
if (isProgressing) {
this.tsLastHeartbeat = LocalDateTime.now()
this.lastHeartbeatPosition = heartbeatPos
}
Expand All @@ -128,6 +173,14 @@ class DebeziumRecordIterator<T>(
val changeEventWithMetadata = ChangeEventWithMetadata(next)
hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent

if (isEventLogged) {
val source: JsonNode? = changeEventWithMetadata.eventValueAsJson()["source"]
LOGGER.info {
"CDC events queue poll(): " +
"returned a change event with \"source\": $source."
}
}

// if the last record matches the target file position, it is time to tell the producer
// to shutdown.
if (targetPosition.reachedTargetPosition(changeEventWithMetadata)) {
Expand All @@ -137,6 +190,9 @@ class DebeziumRecordIterator<T>(
)
}
this.tsLastHeartbeat = null
if (!receivedFirstRecord) {
LOGGER.info { "Received first record from debezium." }
}
this.receivedFirstRecord = true
this.maxInstanceOfNoRecordsFound = 0
return changeEventWithMetadata
Expand Down Expand Up @@ -197,15 +253,21 @@ class DebeziumRecordIterator<T>(
}

private fun heartbeatPosNotChanging(): Boolean {
// Closing debezium due to heartbeat position not changing only exists as an escape hatch
// for
// testing setups. In production, we rely on the platform heartbeats to kill the sync
if (!isTest() || this.tsLastHeartbeat == null) {
if (this.tsLastHeartbeat == null) {
return false
} else if (!isTest() && receivedFirstRecord) {
// Closing debezium due to heartbeat position not changing only exists as an escape
// hatch
// for testing setups. In production, we rely on the platform heartbeats to kill the
// sync
// ONLY if we haven't received a record from Debezium. If a record has not been received
// from Debezium and the heartbeat isn't changing, the sync should be shut down due to
// heartbeat position not changing.
return false
}
val timeElapsedSinceLastHeartbeatTs =
Duration.between(this.tsLastHeartbeat, LocalDateTime.now())
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime.dividedBy(2)) > 0
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime) > 0
}

private fun requestClose(closeLogMessage: String, closeReason: DebeziumCloseReason) {
Expand Down Expand Up @@ -272,5 +334,8 @@ class DebeziumRecordIterator<T>(
HEARTBEAT_NOT_PROGRESSING
}

companion object {}
companion object {
val pollLogMaxTimeInterval: Duration = Duration.ofSeconds(5)
const val POLL_LOG_MAX_CALLS_INTERVAL = 1_000
}
}

0 comments on commit 10a2614

Please sign in to comment.