From 310e6bd315238144349e0f1c5ff28291893800ed Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Thu, 11 Jul 2024 08:54:54 -0700 Subject: [PATCH] [Source-mysql] : Implement WASS algo (#38240) Co-authored-by: Evan Tahler --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../src/main/resources/version.properties | 2 +- .../source/jdbc/AbstractJdbcSource.kt | 7 +- .../source/relationaldb/InitialLoadHandler.kt | 5 +- .../relationaldb/InitialLoadTimeoutUtil.kt | 56 +++++++ .../relationaldb/state/GlobalStateManager.kt | 12 +- .../integrations/debezium/CdcSourceTest.kt | 72 ++++---- .../connectors/source-mysql/build.gradle | 2 +- .../connectors/source-mysql/metadata.yaml | 2 +- .../source/mysql/MySqlSource.java | 2 +- .../initialsync/MySqlInitialLoadHandler.java | 11 +- .../MySqlInitialLoadRecordIterator.java | 38 ++++- .../initialsync/MySqlInitialReadUtil.java | 156 ++++++++++++++---- .../source-mysql/src/main/resources/spec.json | 10 ++ .../resources/expected_cloud_spec.json | 10 ++ .../resources/expected_oss_spec.json | 10 ++ .../source/mysql/CdcMysqlSourceTest.java | 66 ++++---- .../test/resources/expected_cloud_spec.json | 10 ++ .../src/test/resources/expected_oss_spec.json | 10 ++ docs/integrations/sources/mysql.md | 3 +- 20 files changed, 371 insertions(+), 114 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InitialLoadTimeoutUtil.kt diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 74ac40a4e10b..64e21ad29ce7 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.41.0 | 2024-07-11 | [\#38240](https://github.com/airbytehq/airbyte/pull/38240) | Sources : Changes in CDC interfaces to support WASS algorithm | | 0.40.11 | 2024-07-08 | [\#41041](https://github.com/airbytehq/airbyte/pull/41041) | Destinations: Fix truncate refreshes incorrectly discarding data if successful attempt had 0 records | | 0.40.10 | 2024-07-05 | [\#40719](https://github.com/airbytehq/airbyte/pull/40719) | Update test to refrlect isResumable field in catalog | | 0.40.9 | 2024-07-01 | [\#39473](https://github.com/airbytehq/airbyte/pull/39473) | minor changes around error logging and testing | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 1bd6a3adad51..a1e1bd3c1a8c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.40.11 +version=0.41.0 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index c2416828696d..e38de51cc107 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -158,7 +158,12 @@ abstract class AbstractJdbcSource( ) return augmentWithStreamStatus( airbyteStream, - initialLoadHandler.getIteratorForStream(airbyteStream, table, Instant.now()) + initialLoadHandler.getIteratorForStream( + airbyteStream, + table, + Instant.now(), + Optional.empty() + ) ) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InitialLoadHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InitialLoadHandler.kt index f14e406c7462..0d7dcc68acce 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InitialLoadHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InitialLoadHandler.kt @@ -8,12 +8,15 @@ import io.airbyte.commons.util.AutoCloseableIterator import io.airbyte.protocol.models.CommonField import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import java.time.Duration import java.time.Instant +import java.util.Optional interface InitialLoadHandler { fun getIteratorForStream( airbyteStream: ConfiguredAirbyteStream, table: TableInfo>, - emittedAt: Instant + emittedAt: Instant, + cdcInitialLoadTimeout: Optional, ): AutoCloseableIterator } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InitialLoadTimeoutUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InitialLoadTimeoutUtil.kt new file mode 100644 index 000000000000..cc9d43bbd6b9 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InitialLoadTimeoutUtil.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.source.relationaldb + +import com.fasterxml.jackson.databind.JsonNode +import io.github.oshai.kotlinlogging.KotlinLogging +import java.time.Duration +import java.util.* + +private val LOGGER = KotlinLogging.logger {} + +object InitialLoadTimeoutUtil { + + val MIN_INITIAL_LOAD_TIMEOUT: Duration = Duration.ofHours(4) + val MAX_INITIAL_LOAD_TIMEOUT: Duration = Duration.ofHours(24) + val DEFAULT_INITIAL_LOAD_TIMEOUT: Duration = Duration.ofHours(8) + + @JvmStatic + fun getInitialLoadTimeout(config: JsonNode): Duration { + val isTest = config.has("is_test") && config["is_test"].asBoolean() + var initialLoadTimeout = DEFAULT_INITIAL_LOAD_TIMEOUT + + val initialLoadTimeoutHours = getInitialLoadTimeoutHours(config) + + if (initialLoadTimeoutHours.isPresent) { + initialLoadTimeout = Duration.ofHours(initialLoadTimeoutHours.get().toLong()) + if (!isTest && initialLoadTimeout.compareTo(MIN_INITIAL_LOAD_TIMEOUT) < 0) { + LOGGER.warn { + "Initial Load timeout is overridden to ${MIN_INITIAL_LOAD_TIMEOUT.toHours()} hours, " + + "which is the min time allowed for safety." + } + initialLoadTimeout = MIN_INITIAL_LOAD_TIMEOUT + } else if (!isTest && initialLoadTimeout.compareTo(MAX_INITIAL_LOAD_TIMEOUT) > 0) { + LOGGER.warn { + "Initial Load timeout is overridden to ${MAX_INITIAL_LOAD_TIMEOUT.toHours()} hours, " + + "which is the max time allowed for safety." + } + initialLoadTimeout = MAX_INITIAL_LOAD_TIMEOUT + } + } + + LOGGER.info { "Initial Load timeout: ${initialLoadTimeout.seconds} seconds" } + return initialLoadTimeout + } + + fun getInitialLoadTimeoutHours(config: JsonNode): Optional { + val replicationMethod = config["replication_method"] + if (replicationMethod != null && replicationMethod.has("initial_load_timeout_hours")) { + val seconds = config["replication_method"]["initial_load_timeout_hours"].asInt() + return Optional.of(seconds) + } + return Optional.empty() + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt index 19aa245325dc..6cbf11b78813 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt @@ -64,7 +64,17 @@ class GlobalStateManager( // Populate global state val globalState = AirbyteGlobalState() globalState.sharedState = Jsons.jsonNode(cdcStateManager.cdcState) - globalState.streamStates = StateGeneratorUtils.generateStreamStateList(pairToCursorInfoMap) + // If stream state exists in the global manager, it should be used to reflect the partial + // states of initial loads. + if ( + cdcStateManager.rawStateMessage?.global?.streamStates != null && + cdcStateManager.rawStateMessage.global?.streamStates?.size != 0 + ) { + globalState.streamStates = cdcStateManager.rawStateMessage.global.streamStates + } else { + globalState.streamStates = + StateGeneratorUtils.generateStreamStateList(pairToCursorInfoMap) + } // Generate the legacy state for backwards compatibility val dbState = diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index 8ea2fc62b1e6..fbce7c522f84 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -117,7 +117,7 @@ abstract class CdcSourceTest> { Assertions.assertEquals( AirbyteMessage.Type.TRACE, actualMessage.type, - "[Debug] all Message: $allMessages" + "[Debug] all Message: $allMessages", ) var traceMessage = actualMessage.trace Assertions.assertNotNull(traceMessage.streamStatus) @@ -305,7 +305,7 @@ abstract class CdcSourceTest> { val recordsPerStream = extractRecordMessagesStreamWise(messages) val consolidatedRecords: MutableSet = HashSet() recordsPerStream.values.forEach( - Consumer { c: Set -> consolidatedRecords.addAll(c) } + Consumer { c: Set -> consolidatedRecords.addAll(c) }, ) return consolidatedRecords } @@ -415,8 +415,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED, + ), ) assertStreamStatusTraceMessageIndex( actualRecords.size - 1, @@ -424,8 +424,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), ) Assertions.assertNotNull(targetPosition) @@ -495,8 +495,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED, + ), ) assertStreamStatusTraceMessageIndex( actualRecords1.size - 1, @@ -504,8 +504,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), ) updateCommand(MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11) @@ -589,8 +589,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED, + ), ) assertStreamStatusTraceMessageIndex( dataFromSecondBatch.size - 1, @@ -598,8 +598,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), ) val stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch) @@ -711,8 +711,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED, + ), ) assertStreamStatusTraceMessageIndex( actualMessages1.size - 1, @@ -720,8 +720,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME_2, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), ) val recordMessages1 = extractRecordMessages(actualMessages1) @@ -753,8 +753,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME_2, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), ) assertStreamStatusTraceMessageIndex( MODEL_RECORDS_2.size + 1, @@ -762,8 +762,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED, + ), ) val state = Jsons.jsonNode(listOf(stateMessages1[stateMessages1.size - 1])) @@ -787,7 +787,7 @@ abstract class CdcSourceTest> { // We are expecting count match for all streams, including non RFR streams. assertExpectedStateMessageCountMatches( stateMessages1, - MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong() + MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(), ) // Expect state and record message from MODEL_RECORDS_2. @@ -797,8 +797,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), ) assertStreamStatusTraceMessageIndex( 2 * MODEL_RECORDS_2.size + 3, @@ -806,8 +806,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME_2, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED, + ), ) assertExpectedRecords( @@ -935,7 +935,7 @@ abstract class CdcSourceTest> { // Non resumeable full refresh will also get state messages with count. assertExpectedStateMessageCountMatches( stateMessages1, - MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong() + MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(), ) stateMessages1.map { state -> assertStateDoNotHaveDuplicateStreams(state) } assertExpectedRecords( @@ -1002,8 +1002,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED, + ), ) assertStreamStatusTraceMessageIndex( actualRecords.size - 1, @@ -1011,8 +1011,8 @@ abstract class CdcSourceTest> { createAirbteStreanStatusTraceMessage( modelsSchema(), MODELS_STREAM_NAME, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE - ) + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), ) } @@ -1058,11 +1058,11 @@ abstract class CdcSourceTest> { Assertions.assertEquals( expectedCatalog.streams.sortedWith( - Comparator.comparing { obj: AirbyteStream -> obj.name } + Comparator.comparing { obj: AirbyteStream -> obj.name }, ), actualCatalog.streams.sortedWith( - Comparator.comparing { obj: AirbyteStream -> obj.name } - ) + Comparator.comparing { obj: AirbyteStream -> obj.name }, + ), ) } @@ -1225,7 +1225,7 @@ abstract class CdcSourceTest> { recordsWrittenInRandomTable.add(record2) } - val state2 = stateAfterSecondBatch[stateAfterSecondBatch.size - 1].data + val state2 = Jsons.jsonNode(listOf(stateAfterSecondBatch[stateAfterSecondBatch.size - 1])) val thirdBatchIterator = source().read(config()!!, updatedCatalog, state2) val dataFromThirdBatch = AutoCloseableIterators.toListAndClose(thirdBatchIterator) diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index b503df498236..8b0fdf23cf9f 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.40.7' + cdkVersionRequired = '0.41.0' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 55ea492844af..9f5352d563e7 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.4.12 + dockerImageTag: 3.5.0 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 0eac319f64b5..90674ab134ad 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -484,7 +484,7 @@ public List> getIncrementalIterators(final final List> initialLoadIterator = new ArrayList<>(initialLoadHandler.getIncrementalIterators( new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()), tableNameToTable, - emittedAt, true, true)); + emittedAt, true, true, Optional.empty())); // Build Cursor based iterator final List> cursorBasedIterator = diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java index e005206676e3..f9684c33ea1f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java @@ -92,7 +92,8 @@ public List> getIncrementalIterators( final Map>> tableNameToTable, final Instant emittedAt, final boolean decorateWithStartedStatus, - final boolean decorateWithCompletedStatus) { + final boolean decorateWithCompletedStatus, + final Optional cdcInitialLoadTimeout) { final List> iteratorList = new ArrayList<>(); for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { final AirbyteStream stream = airbyteStream.getStream(); @@ -107,7 +108,7 @@ public List> getIncrementalIterators( new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED))); } - iteratorList.add(getIteratorForStream(airbyteStream, table, emittedAt)); + iteratorList.add(getIteratorForStream(airbyteStream, table, emittedAt, cdcInitialLoadTimeout)); if (decorateWithCompletedStatus) { iteratorList.add(new StreamStatusTraceEmitterIterator( new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE))); @@ -121,7 +122,8 @@ public List> getIncrementalIterators( public AutoCloseableIterator getIteratorForStream( @NotNull ConfiguredAirbyteStream airbyteStream, @NotNull TableInfo> table, - @NotNull Instant emittedAt) { + @NotNull Instant emittedAt, + @NotNull final Optional cdcInitialLoadTimeout) { final AirbyteStream stream = airbyteStream.getStream(); final String streamName = stream.getName(); @@ -134,7 +136,8 @@ public AutoCloseableIterator getIteratorForStream( .collect(Collectors.toList()); final AutoCloseableIterator queryStream = new MySqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair, - Long.min(calculateChunkSize(tableSizeInfoMap.get(pair), pair), MAX_CHUNK_SIZE), isCompositePrimaryKey(airbyteStream)); + Long.min(calculateChunkSize(tableSizeInfoMap.get(pair), pair), MAX_CHUNK_SIZE), isCompositePrimaryKey(airbyteStream), emittedAt, + cdcInitialLoadTimeout); final AutoCloseableIterator recordIterator = getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli()); final AutoCloseableIterator recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream, pair); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java index 7c1c600766a8..c21d2d3ce9da 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java @@ -4,12 +4,16 @@ package io.airbyte.integrations.source.mysql.initialsync; +import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcSnapshotForceShutdownMessage; + import com.google.common.collect.AbstractIterator; import com.mysql.cj.MysqlType; import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; import io.airbyte.cdk.db.jdbc.AirbyteRecordData; import io.airbyte.cdk.db.jdbc.JdbcDatabase; +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils; +import io.airbyte.commons.exceptions.TransientErrorException; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.PrimaryKeyInfo; @@ -18,7 +22,10 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.time.Duration; +import java.time.Instant; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import javax.annotation.CheckForNull; import org.slf4j.Logger; @@ -53,9 +60,14 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator currentIterator; + private Optional cdcInitialLoadTimeout; + private boolean isCdcSync; + MySqlInitialLoadRecordIterator( final JdbcDatabase database, final JdbcCompatibleSourceOperations sourceOperations, @@ -64,7 +76,9 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator columnNames, final AirbyteStreamNameNamespacePair pair, final long chunkSize, - final boolean isCompositeKeyLoad) { + final boolean isCompositeKeyLoad, + final Instant startInstant, + final Optional cdcInitialLoadTimeout) { this.database = database; this.sourceOperations = sourceOperations; this.quoteString = quoteString; @@ -74,11 +88,23 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator 0) { + final String cdcInitialLoadTimeoutMessage = String.format( + "Initial load for table %s has taken longer than %s, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume", + getAirbyteStream().get(), cdcInitialLoadTimeout.get()); + LOGGER.info(cdcInitialLoadTimeoutMessage); + AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcSnapshotForceShutdownMessage()); + throw new TransientErrorException(cdcInitialLoadTimeoutMessage); + } if (shouldBuildNextSubquery()) { try { // We will only issue one query for a composite key load. If we have already processed all the data @@ -187,4 +213,14 @@ public void close() throws Exception { } } + private boolean isCdcSync(MySqlInitialLoadStateManager initialLoadStateManager) { + if (initialLoadStateManager instanceof MySqlInitialLoadGlobalStateManager) { + LOGGER.info("Running a cdc sync"); + return true; + } else { + LOGGER.info("Not running a cdc sync"); + return false; + } + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index 04a62a270073..8f19a82a44d3 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -18,16 +18,20 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler; +import io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter; import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil; import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumEventConverter; import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager; import io.airbyte.cdk.integrations.source.relationaldb.CdcStateManager; import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil; +import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadTimeoutUtil; import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState; import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator; +import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.TransientErrorTraceEmitterIterator; import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.commons.exceptions.TransientErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.stream.AirbyteStreamStatusHolder; import io.airbyte.commons.util.AutoCloseableIterator; @@ -194,6 +198,7 @@ public static List> getCdcReadIterators(fi final JsonNode sourceConfig = database.getSourceConfig(); final Duration firstRecordWaitTime = RecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig); LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds()); + final Duration initialLoadTimeout = InitialLoadTimeoutUtil.getInitialLoadTimeout(sourceConfig); // Determine the streams that need to be loaded via primary key sync. final List> initialLoadIterator = new ArrayList<>(); final InitialLoadStreams initialLoadStreams = @@ -209,6 +214,17 @@ public static List> getCdcReadIterators(fi stateToBeUsed = cdcState; } + // Debezium is started for streams that have been started - that is they have been partially or + // fully completed. + final var startedCdcStreamList = catalog.getStreams().stream() + .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) + .filter(stream -> isStreamPartiallyOrFullyCompleted(stream, initialLoadStreams)) + .map(stream -> stream.getStream().getNamespace() + "." + stream.getStream().getName()).toList(); + + final var allCdcStreamList = catalog.getStreams().stream() + .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) + .map(stream -> stream.getStream().getNamespace() + "." + stream.getStream().getName()).toList(); + // If there are streams to sync via primary key load, build the relevant iterators. if (!initialLoadStreams.streamsForInitialLoad().isEmpty()) { @@ -218,23 +234,31 @@ public static List> getCdcReadIterators(fi getMySqlInitialLoadHandler(database, emittedAt, quoteString, initialLoadStreams, initialLoadGlobalStateManager, Optional.of(new CdcMetadataInjector(emittedAt.toString(), stateAttributes, metadataInjector))); - // Because initial load streams will be followed by cdc read of those stream, we only decorate with - // complete status trace - // after CDC read is done. + // Start and complete stream status messages are emitted while constructing the full set of initial + // load and incremental debezium iterators. initialLoadIterator.addAll(initialLoadHandler.getIncrementalIterators( new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()), tableNameToTable, - emittedAt, true, false)); + emittedAt, false, false, Optional.of(initialLoadTimeout))); } + // CDC stream status messages should be emitted for streams. final List> cdcStreamsStartStatusEmitters = catalog.getStreams().stream() - .filter(stream -> !initialLoadStreams.streamsForInitialLoad.contains(stream)) + .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) .map(stream -> (AutoCloseableIterator) new StreamStatusTraceEmitterIterator( new AirbyteStreamStatusHolder( new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()), AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED))) .toList(); + final List> cdcStreamsEndStatusEmitters = catalog.getStreams().stream() + .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) + .map(stream -> (AutoCloseableIterator) new StreamStatusTraceEmitterIterator( + new AirbyteStreamStatusHolder( + new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()), + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE))) + .toList(); + // Build the incremental CDC iterators. final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler( sourceConfig, @@ -243,38 +267,92 @@ public static List> getCdcReadIterators(fi firstRecordWaitTime, AirbyteDebeziumHandler.QUEUE_CAPACITY, false); - final var cdcStreamList = catalog.getStreams().stream() - .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) - .map(stream -> stream.getStream().getNamespace() + "." + stream.getStream().getName()).toList(); - final var propertiesManager = new RelationalDbDebeziumPropertiesManager( - MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog, cdcStreamList); final var eventConverter = new RelationalDbDebeziumEventConverter(metadataInjector, emittedAt); - final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( - propertiesManager, eventConverter, new MySqlCdcSavedInfoFetcher(stateToBeUsed), new MySqlCdcStateHandler(stateManager)); - - final List> allStreamsCompleteStatusEmitters = catalog.getStreams().stream() - .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) - .map(stream -> (AutoCloseableIterator) new StreamStatusTraceEmitterIterator( - new AirbyteStreamStatusHolder( - new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()), - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE))) - .toList(); + if (startedCdcStreamList.isEmpty()) { + LOGGER.info("First sync - no cdc streams have been completed or started"); + /* + * This is the first run case - no initial loads have been started. In this case, we want to run the + * iterators in the following order: 1. Run the initial load iterators. This step will timeout and + * throw a transient error if run for too long (> 8hrs by default). 2. Run the debezium iterators + * with ALL of the incremental streams configured. This is because if step 1 completes, the initial + * load can be considered finished. + */ + final var propertiesManager = new RelationalDbDebeziumPropertiesManager( + MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog, allCdcStreamList); + final Supplier> incrementalIteratorsSupplier = getCdcIncrementalIteratorsSupplier(handler, + propertiesManager, eventConverter, stateToBeUsed, stateManager); + return Collections.singletonList( + AutoCloseableIterators.concatWithEagerClose( + Stream + .of( + cdcStreamsStartStatusEmitters, + initialLoadIterator, + Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorsSupplier, null)), + cdcStreamsEndStatusEmitters) + .flatMap(Collection::stream) + .collect(Collectors.toList()), + AirbyteTraceMessageUtility::emitStreamStatusTrace)); + } else if (initialLoadIterator.isEmpty()) { + LOGGER.info("Initial load has finished completely - only reading the binlog"); + /* + * In this case, the initial load has completed and only debezium should be run. The iterators + * should be run in the following order: 1. Run the debezium iterators with ALL of the incremental + * streams configured. + */ + final var propertiesManager = new RelationalDbDebeziumPropertiesManager( + MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog, allCdcStreamList); + final Supplier> incrementalIteratorSupplier = getCdcIncrementalIteratorsSupplier(handler, + propertiesManager, eventConverter, stateToBeUsed, stateManager); + return Collections.singletonList( + AutoCloseableIterators.concatWithEagerClose( + Stream + .of( + cdcStreamsStartStatusEmitters, + Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)), + cdcStreamsEndStatusEmitters) + .flatMap(Collection::stream) + .collect(Collectors.toList()), + AirbyteTraceMessageUtility::emitStreamStatusTrace)); + } else { + LOGGER.info("Initial load is in progress - reading binlog first and then resuming with initial load."); + /* + * In this case, the initial load has partially completed (WASS case). The iterators should be run + * in the following order: 1. Run the debezium iterators with only the incremental streams which + * have been fully or partially completed configured. 2. Resume initial load for partially completed + * and not started streams. This step will timeout and throw a transient error if run for too long + * (> 8hrs by default). 3. Emit a transient error. This is to signal to the platform to restart the + * sync to clear the binlog. We cannot simply add the same cdc iterators as their target end + * position is fixed to the tip of the binlog at the start of the sync. + */ + final var propertiesManager = new RelationalDbDebeziumPropertiesManager( + MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog, startedCdcStreamList); + final Supplier> incrementalIteratorSupplier = getCdcIncrementalIteratorsSupplier(handler, + propertiesManager, eventConverter, stateToBeUsed, stateManager); + return Collections.singletonList( + AutoCloseableIterators.concatWithEagerClose( + Stream + .of( + cdcStreamsStartStatusEmitters, + Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)), + initialLoadIterator, + cdcStreamsEndStatusEmitters, + List.of(new TransientErrorTraceEmitterIterator( + new TransientErrorException("Forcing a new sync after the initial load to read the binlog")))) + .flatMap(Collection::stream) + .collect(Collectors.toList()), + AirbyteTraceMessageUtility::emitStreamStatusTrace)); + } + } - // This starts processing the binglogs as soon as initial sync is complete, this is a bit different - // from the current cdc syncs. - // We finish the current CDC once the initial snapshot is complete and the next sync starts - // processing the binlogs - return Collections.singletonList( - AutoCloseableIterators.concatWithEagerClose( - Stream - .of(initialLoadIterator, - cdcStreamsStartStatusEmitters, - Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)), - allStreamsCompleteStatusEmitters) - .flatMap(Collection::stream) - .collect(Collectors.toList()), - AirbyteTraceMessageUtility::emitStreamStatusTrace)); + @SuppressWarnings("unchecked") + private static Supplier> getCdcIncrementalIteratorsSupplier(AirbyteDebeziumHandler handler, + RelationalDbDebeziumPropertiesManager propertiesManager, + DebeziumEventConverter eventConverter, + CdcState stateToBeUsed, + StateManager stateManager) { + return () -> handler.getIncrementalIterators( + propertiesManager, eventConverter, new MySqlCdcSavedInfoFetcher(stateToBeUsed), new MySqlCdcStateHandler(stateManager)); } /** @@ -428,7 +506,7 @@ public static Map>> tableNameToTable, final String quoteString) { final Map pairToPkInfoMap = new HashMap<>(); - // For every stream that is in primary initial key sync, we want to maintain information about the + // For every stream that was in primary initial key sync, we want to maintain information about the // current primary key info associated with the // stream initialLoadStreams.streamsForInitialLoad().forEach(stream -> { @@ -477,6 +555,14 @@ private static Optional getPrimaryKeyInfo(final JdbcDatabase dat return Optional.of(new PrimaryKeyInfo(pkFieldName, pkFieldType, pkMaxValue)); } + private static boolean isStreamPartiallyOrFullyCompleted(ConfiguredAirbyteStream stream, InitialLoadStreams initialLoadStreams) { + boolean isStreamCompleted = !initialLoadStreams.streamsForInitialLoad.contains(stream); + // A stream has been partially completed if an initial load status exists. + boolean isStreamPartiallyCompleted = (initialLoadStreams.pairToInitialLoadStatus + .get(new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()))) != null; + return isStreamCompleted || isStreamPartiallyCompleted; + } + public record InitialLoadStreams(List streamsForInitialLoad, Map pairToInitialLoadStatus) { diff --git a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json index 78450b13aabd..5a9304326cdd 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json @@ -220,6 +220,16 @@ "default": "Fail sync", "order": 3, "always_show": true + }, + "initial_load_timeout_hours": { + "type": "integer", + "title": "Initial Load Timeout in Hours (Advanced)", + "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.", + "default": 8, + "min": 4, + "max": 24, + "order": 4, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json index 871b7c0c38bb..b76358180e65 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json @@ -198,6 +198,16 @@ "default": "Fail sync", "order": 3, "always_show": true + }, + "initial_load_timeout_hours": { + "type": "integer", + "title": "Initial Load Timeout in Hours (Advanced)", + "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.", + "default": 8, + "min": 4, + "max": 24, + "order": 4, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json index 7ffbbad5f718..d45898990ba5 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json @@ -220,6 +220,16 @@ "default": "Fail sync", "order": 3, "always_show": true + }, + "initial_load_timeout_hours": { + "type": "integer", + "title": "Initial Load Timeout in Hours (Advanced)", + "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.", + "default": 8, + "min": 4, + "max": 24, + "order": 4, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index 365218b49933..51f81be711e3 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -464,13 +464,28 @@ private void assertStateTypes(final List stateMes @Override protected void assertStateMessagesForNewTableSnapshotTest(final List stateMessages, final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) { + + // First message emitted in the WASS case is a CDC state message. This should have a different + // global state (LSN) as compared to the previous + // finishing state. The streams in snapshot phase should be the one that is completed at that point. assertEquals(7, stateMessages.size()); - for (int i = 0; i <= 4; i++) { + final AirbyteStateMessage cdcStateMessage = stateMessages.get(0); + assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(), cdcStateMessage.getGlobal().getSharedState()); + Set streamsInSnapshotState = cdcStateMessage.getGlobal().getStreamStates() + .stream() + .map(AirbyteStreamState::getStreamDescriptor) + .collect(Collectors.toSet()); + assertEquals(1, streamsInSnapshotState.size()); + assertTrue(streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(getDatabaseName()))); + + for (int i = 1; i <= 5; i++) { final AirbyteStateMessage stateMessage = stateMessages.get(i); assertEquals(AirbyteStateType.GLOBAL, stateMessage.getType()); - assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(), - stateMessage.getGlobal().getSharedState()); - final Set streamsInSnapshotState = stateMessage.getGlobal().getStreamStates() + // Shared state should not be the same as the first (CDC) state message as it should not change in + // initial sync. + assertEquals(cdcStateMessage.getGlobal().getSharedState(), stateMessage.getGlobal().getSharedState()); + streamsInSnapshotState.clear(); + streamsInSnapshotState = stateMessage.getGlobal().getStreamStates() .stream() .map(AirbyteStreamState::getStreamDescriptor) .collect(Collectors.toSet()); @@ -491,11 +506,13 @@ protected void assertStateMessagesForNewTableSnapshotTest(final List streamsInSnapshotState = secondLastSateMessage.getGlobal().getStreamStates() + // The last message emitted should indicate that initial PK load has finished for both streams. + final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateMessages.get(6); + assertEquals(AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType()); + assertEquals(cdcStateMessage.getGlobal().getSharedState(), + stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState()); + streamsInSnapshotState.clear(); + streamsInSnapshotState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates() .stream() .map(AirbyteStreamState::getStreamDescriptor) .collect(Collectors.toSet()); @@ -503,26 +520,10 @@ protected void assertStateMessagesForNewTableSnapshotTest(final List { + stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates().forEach(s -> { final JsonNode streamState = s.getStreamState(); assertFalse(streamState.has(STATE_TYPE_KEY)); }); - - final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateMessages.get(6); - assertEquals(AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType()); - assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(), - stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState()); - final Set streamsInSyncCompletionState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates() - .stream() - .map(AirbyteStreamState::getStreamDescriptor) - .collect(Collectors.toSet()); - assertEquals(2, streamsInSnapshotState.size()); - assertTrue( - streamsInSyncCompletionState.contains( - new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomSchema()))); - assertTrue( - streamsInSyncCompletionState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(getDatabaseName()))); - assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData()); } @Test @@ -584,7 +585,12 @@ public void testCompositeIndexInitialLoad() throws Exception { assertExpectedRecords(new HashSet<>(MODEL_RECORDS.subList(4, 6)), recordMessages2); assertEquals(3, stateMessages2.size()); - assertStateTypes(stateMessages2, 0); + // In the second sync (WASS case), the first state message is emitted via debezium use case, which + // should still have the pk state encoded within. The second state message emitted will contain + // state from the initial + // sync and the last (3rd) state message will not have any pk state as the initial sync can now be + // considered complete. + assertStateTypes(stateMessages2, 1); } // Remove all timestamp related fields in shared state. We want to make sure other information will @@ -746,10 +752,10 @@ public void testTwoStreamSync() throws Exception { assertNotNull(global.getSharedState()); assertEquals(2, global.getStreamStates().size()); - if (i <= 3) { + if (i <= 4) { final StreamDescriptor finalFirstStreamInState = firstStreamInState; global.getStreamStates().forEach(c -> { - // First 4 state messages are primary_key state for the stream that didn't complete primary_key sync + // First 5 state messages are primary_key state for the stream that didn't complete primary_key sync // the first time if (c.getStreamDescriptor().equals(finalFirstStreamInState)) { assertFalse(c.getStreamState().has(STATE_TYPE_KEY)); @@ -759,7 +765,7 @@ public void testTwoStreamSync() throws Exception { } }); } else { - // last 2 state messages don't contain primary_key info cause primary_key sync should be complete + // last state messages doesn't contain primary_key info cause primary_key sync should be complete global.getStreamStates().forEach(c -> assertFalse(c.getStreamState().has(STATE_TYPE_KEY))); } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json index 66f0b3bdf647..29c00d0c7b7a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json @@ -214,6 +214,16 @@ "default": "Fail sync", "order": 3, "always_show": true + }, + "initial_load_timeout_hours": { + "type": "integer", + "title": "Initial Load Timeout in Hours (Advanced)", + "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.", + "default": 8, + "min": 4, + "max": 24, + "order": 4, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json index 78450b13aabd..5a9304326cdd 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json @@ -220,6 +220,16 @@ "default": "Fail sync", "order": 3, "always_show": true + }, + "initial_load_timeout_hours": { + "type": "integer", + "title": "Initial Load Timeout in Hours (Advanced)", + "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.", + "default": 8, + "min": 4, + "max": 24, + "order": 4, + "always_show": true } } }, diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index bb08f32ba977..fb20ff07b22d 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -233,7 +233,8 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.4.12 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. | +| 3.5.0 | 2024-07-11 | [38240](https://github.com/airbytehq/airbyte/pull/38240) | Implement WASS. | +| 3.4.12 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. | | 3.4.11 | 2024-06-26 | [40561](https://github.com/airbytehq/airbyte/pull/40561) | Support PlanetScale MySQL's per-query row limit. | | 3.4.10 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. | | 3.4.9 | 2024-06-11 | [39405](https://github.com/airbytehq/airbyte/pull/39405) | Adopt latest CDK. |