From 10a261489ca56c8f37a3c2bbe48ecc75592badc6 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Thu, 11 Jul 2024 12:20:42 -0700 Subject: [PATCH] java-cdk: better debezium logging (#41212) Co-authored-by: Akash Kulkarni --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../src/main/resources/version.properties | 2 +- .../debezium/AirbyteDebeziumHandler.kt | 23 +++-- .../internals/DebeziumRecordIterator.kt | 95 ++++++++++++++++--- 4 files changed, 95 insertions(+), 26 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 64e21ad29ce7..1387a16e9f2b 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.41.1 | 2024-07-11 | [\#41212](https://github.com/airbytehq/airbyte/pull/41212) | Improve debezium logging. | | 0.41.0 | 2024-07-11 | [\#38240](https://github.com/airbytehq/airbyte/pull/38240) | Sources : Changes in CDC interfaces to support WASS algorithm | | 0.40.11 | 2024-07-08 | [\#41041](https://github.com/airbytehq/airbyte/pull/41041) | Destinations: Fix truncate refreshes incorrectly discarding data if successful attempt had 0 records | | 0.40.10 | 2024-07-05 | [\#40719](https://github.com/airbytehq/airbyte/pull/40719) | Update test to refrlect isResumable field in catalog | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index a1e1bd3c1a8c..1cdaa4e6d53a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.41.0 \ No newline at end of file +version=0.41.1 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt index ce424a4254bf..c86c9836d225 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt @@ -22,6 +22,7 @@ import java.time.Instant import java.time.temporal.ChronoUnit import java.util.* import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.AtomicLong private val LOGGER = KotlinLogging.logger {} /** @@ -38,16 +39,18 @@ class AirbyteDebeziumHandler( ) { internal inner class CapacityReportingBlockingQueue(capacity: Int) : LinkedBlockingQueue(capacity) { - private var lastReport: Instant? = null + private var lastReport: Instant = Instant.MIN + private var puts = AtomicLong() + private var polls = AtomicLong() - private fun reportQueueUtilization() { - if ( - lastReport == null || - Duration.between(lastReport, Instant.now()) - .compareTo(Companion.REPORT_DURATION) > 0 - ) { + private fun reportQueueUtilization(put: Long = 0L, poll: Long = 0L) { + if (Duration.between(lastReport, Instant.now()) > REPORT_DURATION) { LOGGER.info { - "CDC events queue size: ${this.size}. remaining ${this.remainingCapacity()}" + "CDC events queue stats: " + + "size=${this.size}, " + + "cap=${this.remainingCapacity()}, " + + "puts=${puts.addAndGet(put)}, " + + "polls=${polls.addAndGet(poll)}" } synchronized(this) { lastReport = Instant.now() } } @@ -55,12 +58,12 @@ class AirbyteDebeziumHandler( @Throws(InterruptedException::class) override fun put(e: E) { - reportQueueUtilization() + reportQueueUtilization(put = 1L) super.put(e) } override fun poll(): E { - reportQueueUtilization() + reportQueueUtilization(poll = 1L) return super.poll() } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt index 1e2389b43cbf..25a2e22a909d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt @@ -15,6 +15,7 @@ import io.debezium.engine.ChangeEvent import io.github.oshai.kotlinlogging.KotlinLogging import java.lang.reflect.Field import java.time.Duration +import java.time.Instant import java.time.LocalDateTime import java.util.* import java.util.concurrent.* @@ -51,34 +52,60 @@ class DebeziumRecordIterator( private var lastHeartbeatPosition: T? = null private var maxInstanceOfNoRecordsFound = 0 private var signalledDebeziumEngineShutdown = false + private var numUnloggedPolls: Int = -1 + private var lastLoggedPoll: Instant = Instant.MIN - // The following logic incorporates heartbeat (CDC postgres only for now): + // The following logic incorporates heartbeat: // 1. Wait on queue either the configured time first or 1 min after a record received // 2. If nothing came out of queue finish sync // 3. If received heartbeat: check if hearbeat_lsn reached target or hasn't changed in a while // finish sync // 4. If change event lsn reached target finish sync - // 5. Otherwise check message queuen again + // 5. Otherwise check message queue again override fun computeNext(): ChangeEventWithMetadata? { // keep trying until the publisher is closed or until the queue is empty. the latter case is // possible when the publisher has shutdown but the consumer has not yet processed all - // messages it - // emitted. + // messages it emitted. while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) { val next: ChangeEvent? - val waitTime = if (receivedFirstRecord) this.subsequentRecordWaitTime else this.firstRecordWaitTime + val instantBeforePoll: Instant = Instant.now() try { next = queue.poll(waitTime.seconds, TimeUnit.SECONDS) } catch (e: InterruptedException) { throw RuntimeException(e) } + val instantAfterPoll: Instant = Instant.now() + val isEventLogged: Boolean = + numUnloggedPolls >= POLL_LOG_MAX_CALLS_INTERVAL - 1 || + Duration.between(lastLoggedPoll, instantAfterPoll) > pollLogMaxTimeInterval || + next == null || + isHeartbeatEvent(next) + if (isEventLogged) { + val pollDuration: Duration = Duration.between(instantBeforePoll, Instant.now()) + LOGGER.info { + "CDC events queue poll(): " + + when (numUnloggedPolls) { + -1 -> "blocked for $pollDuration in its first call." + 0 -> + "blocked for $pollDuration after " + + "its previous call which was also logged." + else -> + "blocked for $pollDuration after " + + "$numUnloggedPolls previous call(s) which were not logged." + } + } + numUnloggedPolls = 0 + lastLoggedPoll = instantAfterPoll + } else { + numUnloggedPolls++ + } // if within the timeout, the consumer could not get a record, it is time to tell the - // producer to - // shutdown. + // producer to shutdown. if (next == null) { + LOGGER.info { "CDC events queue poll(): returned nothing." } if ( !receivedFirstRecord || hasSnapshotFinished || maxInstanceOfNoRecordsFound >= 10 ) { @@ -90,17 +117,35 @@ class DebeziumRecordIterator( DebeziumCloseReason.TIMEOUT ) } - LOGGER.info { "no record found. polling again." } + maxInstanceOfNoRecordsFound++ + LOGGER.info { + "CDC events queue poll(): " + + "returned nothing, polling again, attempt $maxInstanceOfNoRecordsFound." + } continue } if (isHeartbeatEvent(next)) { if (!hasSnapshotFinished) { + LOGGER.info { + "CDC events queue poll(): " + + "returned a heartbeat event while snapshot is not finished yet." + } continue } val heartbeatPos = getHeartbeatPosition(next) + val isProgressing = heartbeatPos != lastHeartbeatPosition + LOGGER.info { + "CDC events queue poll(): " + + "returned a heartbeat event: " + + if (isProgressing) { + "progressing to $heartbeatPos." + } else { + "no progress since last heartbeat." + } + } // wrap up sync if heartbeat position crossed the target OR heartbeat position // hasn't changed for // too long @@ -118,7 +163,7 @@ class DebeziumRecordIterator( ) } - if (heartbeatPos != lastHeartbeatPosition) { + if (isProgressing) { this.tsLastHeartbeat = LocalDateTime.now() this.lastHeartbeatPosition = heartbeatPos } @@ -128,6 +173,14 @@ class DebeziumRecordIterator( val changeEventWithMetadata = ChangeEventWithMetadata(next) hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent + if (isEventLogged) { + val source: JsonNode? = changeEventWithMetadata.eventValueAsJson()["source"] + LOGGER.info { + "CDC events queue poll(): " + + "returned a change event with \"source\": $source." + } + } + // if the last record matches the target file position, it is time to tell the producer // to shutdown. if (targetPosition.reachedTargetPosition(changeEventWithMetadata)) { @@ -137,6 +190,9 @@ class DebeziumRecordIterator( ) } this.tsLastHeartbeat = null + if (!receivedFirstRecord) { + LOGGER.info { "Received first record from debezium." } + } this.receivedFirstRecord = true this.maxInstanceOfNoRecordsFound = 0 return changeEventWithMetadata @@ -197,15 +253,21 @@ class DebeziumRecordIterator( } private fun heartbeatPosNotChanging(): Boolean { - // Closing debezium due to heartbeat position not changing only exists as an escape hatch - // for - // testing setups. In production, we rely on the platform heartbeats to kill the sync - if (!isTest() || this.tsLastHeartbeat == null) { + if (this.tsLastHeartbeat == null) { + return false + } else if (!isTest() && receivedFirstRecord) { + // Closing debezium due to heartbeat position not changing only exists as an escape + // hatch + // for testing setups. In production, we rely on the platform heartbeats to kill the + // sync + // ONLY if we haven't received a record from Debezium. If a record has not been received + // from Debezium and the heartbeat isn't changing, the sync should be shut down due to + // heartbeat position not changing. return false } val timeElapsedSinceLastHeartbeatTs = Duration.between(this.tsLastHeartbeat, LocalDateTime.now()) - return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime.dividedBy(2)) > 0 + return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime) > 0 } private fun requestClose(closeLogMessage: String, closeReason: DebeziumCloseReason) { @@ -272,5 +334,8 @@ class DebeziumRecordIterator( HEARTBEAT_NOT_PROGRESSING } - companion object {} + companion object { + val pollLogMaxTimeInterval: Duration = Duration.ofSeconds(5) + const val POLL_LOG_MAX_CALLS_INTERVAL = 1_000 + } }