From 58d052de24e0f082e3f5682352aff27ada6f3e7c Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Tue, 9 Jan 2024 17:05:57 -0800 Subject: [PATCH] Add count in state message for incremental syncs (#33005) Co-authored-by: xiaohansong --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../src/main/resources/version.properties | 2 +- .../DebeziumStateDecoratingIterator.java | 11 ++++++---- .../relationaldb/StateDecoratingIterator.java | 20 ++++++++++++++----- .../state/SourceStateIterator.java | 2 ++ .../StateDecoratingIteratorTest.java | 4 +++- .../integrations/debezium/CdcSourceTest.java | 2 ++ .../jdbc/test/JdbcSourceAcceptanceTest.java | 19 ++++++++++-------- .../connectors/source-mysql/build.gradle | 2 +- .../connectors/source-mysql/metadata.yaml | 2 +- .../MySqlInitialLoadGlobalStateManager.java | 3 ++- .../MySqlInitialLoadStateManager.java | 3 ++- .../source/mysql/CdcMysqlSourceTest.java | 15 ++++++++++++++ .../mysql/MySqlJdbcSourceAcceptanceTest.java | 8 +++++--- docs/integrations/sources/mysql.md | 3 ++- 15 files changed, 70 insertions(+), 27 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index ce700502cdea..6a26ce990535 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.11.4 | 2024-01-09 | [\#33305](https://github.com/airbytehq/airbyte/pull/33305) | Source stats in incremental syncs | | 0.11.3 | 2023-01-09 | [\#33658](https://github.com/airbytehq/airbyte/pull/33658) | Always fail when debezium fails, even if it happened during the setup phase. | | 0.11.2 | 2024-01-09 | [\#33969](https://github.com/airbytehq/airbyte/pull/33969) | Destination state stats implementation | | 0.11.1 | 2024-01-04 | [\#33727](https://github.com/airbytehq/airbyte/pull/33727) | SSH bastion heartbeats for Destinations | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 8b9cd41d8c02..630a01b8fa3b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.11.3 +version=0.11.4 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java index 6e98a6ca86ac..29569515e999 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java @@ -12,6 +12,7 @@ import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager.DebeziumConnectorType; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStateStats; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import java.time.Duration; import java.time.Instant; @@ -144,7 +145,7 @@ protected AirbyteMessage computeNext() { if (cdcStateHandler.isCdcCheckpointEnabled() && sendCheckpointMessage) { LOGGER.info("Sending CDC checkpoint state message."); - final AirbyteMessage stateMessage = createStateMessage(checkpointOffsetToSend); + final AirbyteMessage stateMessage = createStateMessage(checkpointOffsetToSend, recordsLastSync); previousCheckpointOffset.clear(); previousCheckpointOffset.putAll(checkpointOffsetToSend); resetCheckpointValues(); @@ -182,7 +183,7 @@ protected AirbyteMessage computeNext() { } isSyncFinished = true; - return createStateMessage(offsetManager.read()); + return createStateMessage(offsetManager.read(), recordsLastSync); } /** @@ -201,7 +202,7 @@ private void resetCheckpointValues() { * * @return {@link AirbyteStateMessage} which includes offset and schema history if used. */ - private AirbyteMessage createStateMessage(final Map offset) { + private AirbyteMessage createStateMessage(final Map offset, final long recordCount) { if (trackSchemaHistory && schemaHistoryManager == null) { throw new RuntimeException("Schema History Tracking is true but manager is not initialised"); } @@ -209,7 +210,9 @@ private AirbyteMessage createStateMessage(final Map offset) { throw new RuntimeException("Offset can not be null"); } - return cdcStateHandler.saveState(offset, schemaHistoryManager != null ? schemaHistoryManager.read() : null); + final AirbyteMessage message = cdcStateHandler.saveState(offset, schemaHistoryManager != null ? schemaHistoryManager.read() : null); + message.getState().withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); + return message; } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.java index 5c8f7d638ebb..667a0ceb8152 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.java @@ -11,6 +11,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStateStats; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import java.util.Iterator; import java.util.Objects; @@ -53,6 +54,8 @@ public class StateDecoratingIterator extends AbstractIterator im */ private final int stateEmissionFrequency; private int totalRecordCount = 0; + // In between each state message, recordCountInStateMessage will be reset to 0. + private int recordCountInStateMessage = 0; private boolean emitIntermediateState = false; private AirbyteMessage intermediateStateMessage = null; private boolean hasCaughtException = false; @@ -128,6 +131,7 @@ protected AirbyteMessage computeNext() { } totalRecordCount++; + recordCountInStateMessage++; // Use try-catch to catch Exception that could occur when connection to the database fails try { final AirbyteMessage message = messageIterator.next(); @@ -139,7 +143,7 @@ protected AirbyteMessage computeNext() { if (stateEmissionFrequency > 0 && !Objects.equals(currentMaxCursor, initialCursor) && messageIterator.hasNext()) { // Only create an intermediate state when it is not the first or last record message. // The last state message will be processed seperately. - intermediateStateMessage = createStateMessage(false, totalRecordCount); + intermediateStateMessage = createStateMessage(false, recordCountInStateMessage); } currentMaxCursor = cursorCandidate; currentMaxCursorRecordCount = 1L; @@ -164,7 +168,7 @@ protected AirbyteMessage computeNext() { return optionalIntermediateMessage.orElse(endOfData()); } } else if (!hasEmittedFinalState) { - return createStateMessage(true, totalRecordCount); + return createStateMessage(true, recordCountInStateMessage); } else { return endOfData(); } @@ -185,6 +189,7 @@ protected final Optional getIntermediateMessage() { if (emitIntermediateState && intermediateStateMessage != null) { final AirbyteMessage message = intermediateStateMessage; intermediateStateMessage = null; + recordCountInStateMessage = 0; emitIntermediateState = false; return Optional.of(message); } @@ -196,14 +201,15 @@ protected final Optional getIntermediateMessage() { * read up so far * * @param isFinalState marker for if the final state of the iterator has been reached - * @param totalRecordCount count of read messages + * @param recordCount count of read messages * @return AirbyteMessage which includes information on state of records read so far */ - public AirbyteMessage createStateMessage(final boolean isFinalState, final int totalRecordCount) { + public AirbyteMessage createStateMessage(final boolean isFinalState, final int recordCount) { final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, currentMaxCursor, currentMaxCursorRecordCount); final Optional cursorInfo = stateManager.getCursorInfo(pair); + // logging once every 100 messages to reduce log verbosity - if (totalRecordCount % 100 == 0) { + if (recordCount % 100 == 0) { LOGGER.info("State report for stream {} - original: {} = {} (count {}) -> latest: {} = {} (count {})", pair, cursorInfo.map(CursorInfo::getOriginalCursorField).orElse(null), @@ -213,6 +219,10 @@ public AirbyteMessage createStateMessage(final boolean isFinalState, final int t cursorInfo.map(CursorInfo::getCursor).orElse(null), cursorInfo.map(CursorInfo::getCursorRecordCount).orElse(null)); } + + if (stateMessage != null) { + stateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); + } if (isFinalState) { hasEmittedFinalState = true; if (stateManager.getCursor(pair).isEmpty()) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java index 3cea088be9c6..203244800b42 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java @@ -63,6 +63,8 @@ protected AirbyteMessage computeNext() { } else if (!hasEmittedFinalState) { hasEmittedFinalState = true; final AirbyteStateMessage finalStateMessage = sourceStateIteratorManager.createFinalStateMessage(); + finalStateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); + recordCount = 0L; return new AirbyteMessage() .withType(Type.STATE) .withState(finalStateMessage); diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIteratorTest.java index 8e6448b78d89..e2d64f849748 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -19,6 +19,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStateStats; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import java.sql.SQLException; import java.util.Collections; @@ -69,7 +70,8 @@ private static AirbyteMessage createStateMessage(final String recordValue) { return new AirbyteMessage() .withType(Type.STATE) .withState(new AirbyteStateMessage() - .withData(Jsons.jsonNode(ImmutableMap.of("cursor", recordValue)))); + .withData(Jsons.jsonNode(ImmutableMap.of("cursor", recordValue))) + .withSourceStats(new AirbyteStateStats().withRecordCount(1.0))); } private Iterator createExceptionIterator() { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java index a0ee71a226d0..d05d54254ebe 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java @@ -142,6 +142,8 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { protected abstract void assertExpectedStateMessages(final List stateMessages); + protected abstract void assertExpectedStateMessagesWithTotalCount(final List stateMessages, final long totalRecordCount); + @BeforeEach protected void setup() { testdb = createTestDatabase(); diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index dda24e0dd292..ae358d0f8e8d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -36,6 +36,7 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.v0.AirbyteStateStats; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.AirbyteStreamState; import io.airbyte.protocol.models.v0.CatalogHelpers; @@ -601,7 +602,7 @@ protected List getExpectedAirbyteMessagesSecondSync(final String .withCursorField(List.of(COL_ID)) .withCursor("5") .withCursorRecordCount(1L); - expectedMessages.addAll(createExpectedTestMessages(List.of(state))); + expectedMessages.addAll(createExpectedTestMessages(List.of(state), 2L)); return expectedMessages; } @@ -671,9 +672,9 @@ protected void testReadMultipleTablesIncrementally() throws Exception { .withCursorRecordCount(1L)); final List expectedMessagesFirstSync = new ArrayList<>(getTestMessages()); - expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams1.get(0), expectedStateStreams1)); + expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams1.get(0), expectedStateStreams1, 3L)); expectedMessagesFirstSync.addAll(secondStreamExpectedMessages); - expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams2.get(1), expectedStateStreams2)); + expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams2.get(1), expectedStateStreams2, 3L)); setEmittedAtToNull(actualMessagesFirstSync); @@ -854,7 +855,7 @@ protected void incrementalCursorCheck( final List expectedStreams = List.of(buildStreamState(airbyteStream, cursorField, endCursorValue)); final List expectedMessages = new ArrayList<>(expectedRecordMessages); - expectedMessages.addAll(createExpectedTestMessages(expectedStreams)); + expectedMessages.addAll(createExpectedTestMessages(expectedStreams, expectedRecordMessages.size())); assertEquals(expectedMessages.size(), actualMessages.size()); assertTrue(expectedMessages.containsAll(actualMessages)); @@ -934,7 +935,7 @@ protected List getTestMessages() { COL_UPDATED_AT, "2006-10-19"))))); } - protected List createExpectedTestMessages(final List states) { + protected List createExpectedTestMessages(final List states, final long numRecords) { return states.stream() .map(s -> new AirbyteMessage().withType(Type.STATE) .withState( @@ -942,7 +943,8 @@ protected List createExpectedTestMessages(final List legacyStates) { + protected AirbyteMessage createStateMessage(final DbStreamState dbStreamState, final List legacyStates, final long recordCount) { return new AirbyteMessage().withType(Type.STATE) .withState( new AirbyteStateMessage().withType(AirbyteStateType.STREAM) @@ -1070,7 +1072,8 @@ protected AirbyteMessage createStateMessage(final DbStreamState dbStreamState, f .withStreamDescriptor(new StreamDescriptor().withNamespace(dbStreamState.getStreamNamespace()) .withName(dbStreamState.getStreamName())) .withStreamState(Jsons.jsonNode(dbStreamState))) - .withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(legacyStates)))); + .withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(legacyStates))) + .withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount))); } protected List extractSpecificFieldFromCombinedMessages(final List messages, diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 77f896ef719d..77b7c8054490 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -7,7 +7,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.10.3' + cdkVersionRequired = '0.11.4' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 10430558ac71..caaf7458309b 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.1 + dockerImageTag: 3.3.2 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java index 3393e68ae124..e810d860e4c8 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java @@ -86,7 +86,8 @@ public void updatePrimaryKeyLoadState(final AirbyteStreamNameNamespacePair pair, } @Override - public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) { + public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, + final JsonNode streamStateForIncrementalRun) { streamsThatHaveCompletedSnapshot.add(pair); final List streamStates = new ArrayList<>(); streamsThatHaveCompletedSnapshot.forEach(stream -> { diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadStateManager.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadStateManager.java index 7bb6a7b846ae..be5cec573294 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadStateManager.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadStateManager.java @@ -25,7 +25,8 @@ public interface MySqlInitialLoadStateManager { void updatePrimaryKeyLoadState(final AirbyteStreamNameNamespacePair pair, final PrimaryKeyLoadStatus pkLoadStatus); // Returns the final state message for the initial sync. - AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun); + AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, + final JsonNode streamStateForIncrementalRun); // Returns the previous state emitted, represented as a {@link PrimaryKeyLoadStatus} associated with // the stream. diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index e918e29e3da1..643acf6d0f72 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -288,6 +288,15 @@ protected void assertExpectedStateMessages(final List state assertStateTypes(stateMessages, 4); } + @Override + protected void assertExpectedStateMessagesWithTotalCount(final List stateMessages, final long totalRecordCount) { + long actualRecordCount = 0L; + for (final AirbyteStateMessage message : stateMessages) { + actualRecordCount += message.getSourceStats().getRecordCount(); + } + assertEquals(actualRecordCount, totalRecordCount); + } + @Override protected void assertExpectedStateMessagesFromIncrementalSync(final List stateMessages) { assertEquals(1, stateMessages.size()); @@ -433,6 +442,7 @@ public void syncWouldWorkWithDBWithInvalidTimezone() throws Exception { assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages); assertExpectedStateMessages(stateMessages); + assertExpectedStateMessagesWithTotalCount(stateMessages, 6); } @Test @@ -451,6 +461,7 @@ public void testCompositeIndexInitialLoad() throws Exception { final List stateMessages1 = extractStateMessages(actualRecords1); assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages1); assertExpectedStateMessages(stateMessages1); + assertExpectedStateMessagesWithTotalCount(stateMessages1, 6); // Re-run the sync with state associated with record w/ id = 15 (second to last record). // We expect to read 2 records, since in the case of a composite PK we issue a >= query. @@ -514,6 +525,8 @@ public void testTwoStreamSync() throws Exception { final Set recordMessages1 = extractRecordMessages(actualRecords1); final List stateMessages1 = extractStateMessages(actualRecords1); assertEquals(13, stateMessages1.size()); + assertExpectedStateMessagesWithTotalCount(stateMessages1, 12); + JsonNode sharedState = null; StreamDescriptor firstStreamInState = null; for (int i = 0; i < stateMessages1.size(); i++) { @@ -582,6 +595,8 @@ public void testTwoStreamSync() throws Exception { final List stateMessages2 = extractStateMessages(actualRecords2); assertEquals(6, stateMessages2.size()); + // State was reset to the 7th; thus 5 remaining records were expected to be reloaded. + assertExpectedStateMessagesWithTotalCount(stateMessages2, 5); for (int i = 0; i < stateMessages2.size(); i++) { final AirbyteStateMessage stateMessage = stateMessages2.get(i); assertEquals(AirbyteStateType.GLOBAL, stateMessage.getType()); diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java index 50d81f0664e4..d6597cd2b023 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java @@ -38,6 +38,7 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.v0.AirbyteStateStats; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.AirbyteStreamState; import io.airbyte.protocol.models.v0.CatalogHelpers; @@ -402,7 +403,7 @@ protected List getExpectedAirbyteMessagesSecondSync(final String .withCursor("5") .withCursorRecordCount(1L); - expectedMessages.addAll(createExpectedTestMessages(List.of(state))); + expectedMessages.addAll(createExpectedTestMessages(List.of(state), 2L)); return expectedMessages; } @@ -477,14 +478,15 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { // Override from parent class as we're no longer including the legacy Data field. @Override - protected List createExpectedTestMessages(final List states) { + protected List createExpectedTestMessages(final List states, final long numRecords) { return states.stream() .map(s -> new AirbyteMessage().withType(Type.STATE) .withState( new AirbyteStateMessage().withType(AirbyteStateType.STREAM) .withStream(new AirbyteStreamState() .withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName())) - .withStreamState(Jsons.jsonNode(s))))) + .withStreamState(Jsons.jsonNode(s))) + .withSourceStats(new AirbyteStateStats().withRecordCount((double) numRecords)))) .collect( Collectors.toList()); } diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 903833d8c3b7..37dbc1bfe797 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,7 +223,8 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.3.1 | 2024-01-03 | [33312](https://github.com/airbytehq/airbyte/pull/33312) | Adding count stats in AirbyteStateMessage | +| 3.3.2 | 2024-01-08 | [33005](https://github.com/airbytehq/airbyte/pull/33005) | Adding count stats for incremental sync in AirbyteStateMessage +| 3.3.1 | 2024-01-03 | [33312](https://github.com/airbytehq/airbyte/pull/33312) | Adding count stats in AirbyteStateMessage | | 3.3.0 | 2023-12-19 | [33436](https://github.com/airbytehq/airbyte/pull/33436) | Remove LEGACY state flag | | 3.2.4 | 2023-12-12 | [33356](https://github.com/airbytehq/airbyte/pull/33210) | Support for better debugging tools.. | | 3.2.3 | 2023-12-08 | [33210](https://github.com/airbytehq/airbyte/pull/33210) | Update MySql driver property value for zero date handling. |