Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

java-cdk: better debezium logging #41212

Merged
merged 4 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.41.1 | 2024-07-11 | [\#41212](https://github.com/airbytehq/airbyte/pull/41212) | Improve debezium logging. |
| 0.41.0 | 2024-07-11 | [\#38240](https://github.com/airbytehq/airbyte/pull/38240) | Sources : Changes in CDC interfaces to support WASS algorithm |
| 0.40.11 | 2024-07-08 | [\#41041](https://github.com/airbytehq/airbyte/pull/41041) | Destinations: Fix truncate refreshes incorrectly discarding data if successful attempt had 0 records |
| 0.40.10 | 2024-07-05 | [\#40719](https://github.com/airbytehq/airbyte/pull/40719) | Update test to refrlect isResumable field in catalog |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.41.0
version=0.41.1
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

private val LOGGER = KotlinLogging.logger {}
/**
Expand All @@ -38,29 +39,31 @@ class AirbyteDebeziumHandler<T>(
) {
internal inner class CapacityReportingBlockingQueue<E>(capacity: Int) :
LinkedBlockingQueue<E>(capacity) {
private var lastReport: Instant? = null
private var lastReport: Instant = Instant.MIN
private var puts = AtomicLong()
private var polls = AtomicLong()

private fun reportQueueUtilization() {
if (
lastReport == null ||
Duration.between(lastReport, Instant.now())
.compareTo(Companion.REPORT_DURATION) > 0
) {
private fun reportQueueUtilization(put: Long = 0L, poll: Long = 0L) {
if (Duration.between(lastReport, Instant.now()) > REPORT_DURATION) {
LOGGER.info {
"CDC events queue size: ${this.size}. remaining ${this.remainingCapacity()}"
"CDC events queue stats: " +
"size=${this.size}, " +
"cap=${this.remainingCapacity()}, " +
"puts=${puts.addAndGet(put)}, " +
"polls=${polls.addAndGet(poll)}"
}
synchronized(this) { lastReport = Instant.now() }
}
}

@Throws(InterruptedException::class)
override fun put(e: E) {
reportQueueUtilization()
reportQueueUtilization(put = 1L)
super.put(e)
}

override fun poll(): E {
reportQueueUtilization()
reportQueueUtilization(poll = 1L)
return super.poll()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.debezium.engine.ChangeEvent
import io.github.oshai.kotlinlogging.KotlinLogging
import java.lang.reflect.Field
import java.time.Duration
import java.time.Instant
import java.time.LocalDateTime
import java.util.*
import java.util.concurrent.*
Expand Down Expand Up @@ -51,34 +52,60 @@ class DebeziumRecordIterator<T>(
private var lastHeartbeatPosition: T? = null
private var maxInstanceOfNoRecordsFound = 0
private var signalledDebeziumEngineShutdown = false
private var numUnloggedPolls: Int = -1
private var lastLoggedPoll: Instant = Instant.MIN

// The following logic incorporates heartbeat (CDC postgres only for now):
// The following logic incorporates heartbeat:
// 1. Wait on queue either the configured time first or 1 min after a record received
// 2. If nothing came out of queue finish sync
// 3. If received heartbeat: check if hearbeat_lsn reached target or hasn't changed in a while
// finish sync
// 4. If change event lsn reached target finish sync
// 5. Otherwise check message queuen again
// 5. Otherwise check message queue again
override fun computeNext(): ChangeEventWithMetadata? {
// keep trying until the publisher is closed or until the queue is empty. the latter case is
// possible when the publisher has shutdown but the consumer has not yet processed all
// messages it
// emitted.
// messages it emitted.
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
val next: ChangeEvent<String?, String?>?

val waitTime =
if (receivedFirstRecord) this.subsequentRecordWaitTime else this.firstRecordWaitTime
val instantBeforePoll: Instant = Instant.now()
try {
next = queue.poll(waitTime.seconds, TimeUnit.SECONDS)
} catch (e: InterruptedException) {
throw RuntimeException(e)
}
val instantAfterPoll: Instant = Instant.now()
val isEventLogged: Boolean =
numUnloggedPolls >= POLL_LOG_MAX_CALLS_INTERVAL - 1 ||
Duration.between(lastLoggedPoll, instantAfterPoll) > pollLogMaxTimeInterval ||
next == null ||
isHeartbeatEvent(next)
if (isEventLogged) {
val pollDuration: Duration = Duration.between(instantBeforePoll, Instant.now())
LOGGER.info {
"CDC events queue poll(): " +
when (numUnloggedPolls) {
-1 -> "blocked for $pollDuration in its first call."
0 ->
"blocked for $pollDuration after " +
"its previous call which was also logged."
else ->
"blocked for $pollDuration after " +
"$numUnloggedPolls previous call(s) which were not logged."
}
}
numUnloggedPolls = 0
lastLoggedPoll = instantAfterPoll
} else {
numUnloggedPolls++
}

// if within the timeout, the consumer could not get a record, it is time to tell the
// producer to
// shutdown.
// producer to shutdown.
if (next == null) {
LOGGER.info { "CDC events queue poll(): returned nothing." }
if (
!receivedFirstRecord || hasSnapshotFinished || maxInstanceOfNoRecordsFound >= 10
) {
Expand All @@ -90,17 +117,35 @@ class DebeziumRecordIterator<T>(
DebeziumCloseReason.TIMEOUT
)
}
LOGGER.info { "no record found. polling again." }

maxInstanceOfNoRecordsFound++
LOGGER.info {
"CDC events queue poll(): " +
"returned nothing, polling again, attempt $maxInstanceOfNoRecordsFound."
}
continue
}

if (isHeartbeatEvent(next)) {
if (!hasSnapshotFinished) {
LOGGER.info {
"CDC events queue poll(): " +
"returned a heartbeat event while snapshot is not finished yet."
}
continue
}

val heartbeatPos = getHeartbeatPosition(next)
val isProgressing = heartbeatPos != lastHeartbeatPosition
LOGGER.info {
"CDC events queue poll(): " +
"returned a heartbeat event: " +
if (isProgressing) {
"progressing to $heartbeatPos."
} else {
"no progress since last heartbeat."
}
}
// wrap up sync if heartbeat position crossed the target OR heartbeat position
// hasn't changed for
// too long
Expand All @@ -118,7 +163,7 @@ class DebeziumRecordIterator<T>(
)
}

if (heartbeatPos != lastHeartbeatPosition) {
if (isProgressing) {
this.tsLastHeartbeat = LocalDateTime.now()
this.lastHeartbeatPosition = heartbeatPos
}
Expand All @@ -128,6 +173,14 @@ class DebeziumRecordIterator<T>(
val changeEventWithMetadata = ChangeEventWithMetadata(next)
hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent

if (isEventLogged) {
val source: JsonNode? = changeEventWithMetadata.eventValueAsJson()["source"]
LOGGER.info {
"CDC events queue poll(): " +
"returned a change event with \"source\": $source."
}
}

// if the last record matches the target file position, it is time to tell the producer
// to shutdown.
if (targetPosition.reachedTargetPosition(changeEventWithMetadata)) {
Expand All @@ -137,6 +190,9 @@ class DebeziumRecordIterator<T>(
)
}
this.tsLastHeartbeat = null
if (!receivedFirstRecord) {
LOGGER.info { "Received first record from debezium." }
}
this.receivedFirstRecord = true
this.maxInstanceOfNoRecordsFound = 0
return changeEventWithMetadata
Expand Down Expand Up @@ -197,15 +253,21 @@ class DebeziumRecordIterator<T>(
}

private fun heartbeatPosNotChanging(): Boolean {
// Closing debezium due to heartbeat position not changing only exists as an escape hatch
// for
// testing setups. In production, we rely on the platform heartbeats to kill the sync
if (!isTest() || this.tsLastHeartbeat == null) {
if (this.tsLastHeartbeat == null) {
return false
} else if (!isTest() && receivedFirstRecord) {
// Closing debezium due to heartbeat position not changing only exists as an escape
// hatch
// for testing setups. In production, we rely on the platform heartbeats to kill the
// sync
// ONLY if we haven't received a record from Debezium. If a record has not been received
// from Debezium and the heartbeat isn't changing, the sync should be shut down due to
// heartbeat position not changing.
return false
}
val timeElapsedSinceLastHeartbeatTs =
Duration.between(this.tsLastHeartbeat, LocalDateTime.now())
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime.dividedBy(2)) > 0
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime) > 0
}

private fun requestClose(closeLogMessage: String, closeReason: DebeziumCloseReason) {
Expand Down Expand Up @@ -272,5 +334,8 @@ class DebeziumRecordIterator<T>(
HEARTBEAT_NOT_PROGRESSING
}

companion object {}
companion object {
val pollLogMaxTimeInterval: Duration = Duration.ofSeconds(5)
const val POLL_LOG_MAX_CALLS_INTERVAL = 1_000
}
}
Loading