Skip to content

Commit

Permalink
Re-sync in case of stale offset (#48578)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored and frifriSF59 committed Nov 21, 2024
1 parent 822862e commit 9c160d5
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ interface StateQuerier {

/** Returns the current state value for the given [feed]. */
fun current(feed: Feed): OpaqueStateValue?

/** Rolls back each feed state. This is required when resyncing CDC from scratch */
fun resetFeedStates()
}

/** Singleton object which tracks the state of an ongoing READ operation. */
Expand Down Expand Up @@ -56,6 +59,10 @@ class StateManager(

override fun current(feed: Feed): OpaqueStateValue? = scoped(feed).current()

override fun resetFeedStates() {
feeds.forEach { f -> scoped(f).set(Jsons.objectNode(), 0) }
}

/** Returns a [StateManagerScopedToFeed] instance scoped to this [feed]. */
fun scoped(feed: Feed): StateManagerScopedToFeed =
when (feed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class FeedBootstrapTest {
is Global -> globalStateValue
is Stream -> streamStateValue
}

override fun resetFeedStates() {
// no-op
}
}

fun Feed.bootstrap(stateQuerier: StateQuerier): FeedBootstrap<*> =
Expand Down Expand Up @@ -140,6 +144,9 @@ class FeedBootstrapTest {
object : StateQuerier {
override val feeds: List<Feed> = listOf(stream)
override fun current(feed: Feed): OpaqueStateValue? = null
override fun resetFeedStates() {
// no-op
}
}
val streamBootstrap = stream.bootstrap(stateQuerier) as StreamFeedBootstrap
val consumer: StreamRecordConsumer = streamBootstrap.streamRecordConsumer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.read.cdc

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.read.ConcurrencyResource
import io.airbyte.cdk.read.GlobalFeedBootstrap
import io.airbyte.cdk.read.PartitionReader
Expand All @@ -26,6 +27,8 @@ class CdcPartitionsCreator<T : Comparable<T>>(
private val log = KotlinLogging.logger {}
private val acquiredThread = AtomicReference<ConcurrencyResource.AcquiredThread>()

class OffsetInvalidNeedsResyncIllegalStateException() : IllegalStateException()

override fun tryAcquireResources(): PartitionsCreator.TryAcquireResourcesStatus {
val acquiredThread: ConcurrencyResource.AcquiredThread =
concurrencyResource.tryAcquire()
Expand All @@ -39,6 +42,12 @@ class CdcPartitionsCreator<T : Comparable<T>>(
}

override suspend fun run(): List<PartitionReader> {
if (CDCNeedsRestart) {
globalLockResource.markCdcAsComplete()
throw TransientErrorException(
"Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch."
)
}
val activeStreams: List<Stream> by lazy {
feedBootstrap.feed.streams.filter { feedBootstrap.stateQuerier.current(it) != null }
}
Expand All @@ -60,6 +69,13 @@ class CdcPartitionsCreator<T : Comparable<T>>(
log.error(ex) { "Existing state is invalid." }
globalLockResource.markCdcAsComplete()
throw ex
} catch (_: OffsetInvalidNeedsResyncIllegalStateException) {
// If deserialization concludes we need a re-sync we rollback stream states
// and put the creator in a Need Restart mode.
// The next round will throw a transient error to kickoff the resync
feedBootstrap.stateQuerier.resetFeedStates()
CDCNeedsRestart = true
syntheticInput
}
}
}
Expand Down Expand Up @@ -102,4 +118,8 @@ class CdcPartitionsCreator<T : Comparable<T>>(
log.info { "Current position '$lowerBound' does not exceed target position '$upperBound'." }
return listOf(partitionReader)
}

companion object {
var CDCNeedsRestart: Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ sealed class JdbcPartitionsCreator<

private val acquiredResources = AtomicReference<AcquiredResources>()

// A reader that only checkpoints the complete state of a partition
// used for empty tables
inner class CheckpointOnlyPartitionReader() : PartitionReader {
override fun tryAcquireResources(): PartitionReader.TryAcquireResourcesStatus =
PartitionReader.TryAcquireResourcesStatus.READY_TO_RUN

override suspend fun run() {}

override fun checkpoint(): PartitionReadCheckpoint =
PartitionReadCheckpoint(partition.completeState, 0)

override fun releaseResources() {}
}

/** Calling [close] releases the resources acquired for the [JdbcPartitionsCreator]. */
fun interface AcquiredResources : AutoCloseable

Expand Down Expand Up @@ -128,7 +142,7 @@ class JdbcSequentialPartitionsCreator<
ensureCursorUpperBound()
if (streamState.cursorUpperBound?.isNull == true) {
log.info { "Maximum cursor column value query found that the table was empty." }
return listOf()
return listOf(CheckpointOnlyPartitionReader())
}
}
if (streamState.fetchSize == null) {
Expand All @@ -140,7 +154,9 @@ class JdbcSequentialPartitionsCreator<
log.info { "Table memory size estimated at ${expectedTableByteSize shr 20} MiB." }
if (rowByteSizeSample.kind == Sample.Kind.EMPTY) {
log.info { "Sampling query found that the table was empty." }
return listOf()
// An empty table will checkpoint once in order to emit its complete state
// Otherwise on each sync we would try to read this partition
return listOf(CheckpointOnlyPartitionReader())
}
streamState.fetchSize =
sharedState.jdbcFetchSizeEstimator().apply(rowByteSizeSample)
Expand Down Expand Up @@ -178,7 +194,7 @@ class JdbcConcurrentPartitionsCreator<
ensureCursorUpperBound()
if (streamState.cursorUpperBound?.isNull == true) {
log.info { "Maximum cursor column value query found that the table was empty." }
return listOf()
return listOf(CheckpointOnlyPartitionReader())
}
}
// Handle edge case where the table can't be sampled.
Expand All @@ -198,7 +214,7 @@ class JdbcConcurrentPartitionsCreator<
}
if (sample.kind == Sample.Kind.EMPTY) {
log.info { "Sampling query found that the table was empty." }
return listOf()
return listOf(CheckpointOnlyPartitionReader())
}
val rowByteSizeSample: Sample<Long> = sample.map { (_, rowByteSize: Long) -> rowByteSize }
streamState.fetchSize = sharedState.jdbcFetchSizeEstimator().apply(rowByteSizeSample)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ object TestFixtures {
object MockStateQuerier : StateQuerier {
override val feeds: List<Feed> = listOf()
override fun current(feed: Feed): OpaqueStateValue? = null
override fun resetFeedStates() {
// no-op
}
}

object MockMetaFieldDecorator : MetaFieldDecorator {
Expand All @@ -212,6 +215,9 @@ object TestFixtures {
object : StateQuerier {
override val feeds: List<Feed> = listOf(this@bootstrap)
override fun current(feed: Feed): OpaqueStateValue? = opaqueStateValue
override fun resetFeedStates() {
// no-op
}
},
stream = this
)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.0-rc.16
dockerImageTag: 3.9.0-rc.17
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.data.LocalDateTimeCodec
import io.airbyte.cdk.data.LocalDateTimeCodec.formatter
import io.airbyte.cdk.data.OffsetDateTimeCodec
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.jdbc.JdbcConnectionFactory
Expand All @@ -30,7 +29,7 @@ import io.micronaut.context.annotation.Primary
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException
import java.util.Base64
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Singleton

Expand Down Expand Up @@ -158,7 +157,10 @@ class MysqlJdbcPartitionFactory(
val stream: Stream = streamFeedBootstrap.feed
val streamState: DefaultJdbcStreamState = streamState(streamFeedBootstrap)
val opaqueStateValue: OpaqueStateValue =
streamFeedBootstrap.currentState ?: return coldStart(streamState)
when (streamFeedBootstrap.currentState?.isEmpty) {
false -> streamFeedBootstrap.currentState!!
else -> return coldStart(streamState)
}

val isCursorBased: Boolean = !sharedState.configuration.global

Expand Down Expand Up @@ -315,7 +317,7 @@ class MysqlJdbcPartitionFactory(
LocalDateTime.parse(stateValue, formatter)
.format(LocalDateTimeCodec.formatter)
)
} catch (e: DateTimeParseException) {
} catch (_: DateTimeParseException) {
// Resolve to use the new format.
Jsons.valueToTree(stateValue)
}
Expand All @@ -331,7 +333,7 @@ class MysqlJdbcPartitionFactory(
.atOffset(java.time.ZoneOffset.UTC)
.format(OffsetDateTimeCodec.formatter)
)
} catch (e: DateTimeParseException) {
} catch (_: DateTimeParseException) {
// Resolve to use the new format.
Jsons.valueToTree(stateValue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.airbyte.cdk.jdbc.JdbcConnectionFactory
import io.airbyte.cdk.jdbc.LongFieldType
import io.airbyte.cdk.jdbc.StringFieldType
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.cdc.CdcPartitionsCreator.OffsetInvalidNeedsResyncIllegalStateException
import io.airbyte.cdk.read.cdc.DebeziumInput
import io.airbyte.cdk.read.cdc.DebeziumOffset
import io.airbyte.cdk.read.cdc.DebeziumOperations
Expand Down Expand Up @@ -307,6 +308,9 @@ class MySqlDebeziumOperations(
"Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention and/or increase sync frequency."
)
}
if (cdcValidationResult == CdcStateValidateResult.INVALID_RESET) {
throw OffsetInvalidNeedsResyncIllegalStateException()
}
return synthesize()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ class MysqlJdbcPartitionFactoryTest {
override val feeds: List<Feed> = listOf(stream)
override fun current(feed: Feed): OpaqueStateValue? =
if (feed == stream) incumbentStateValue else null
override fun resetFeedStates() {
/* no-op */
}
},
stream,
)
Expand Down

0 comments on commit 9c160d5

Please sign in to comment.