diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index 186e91b9e44e..05203ea7035a 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 4.1.12 + dockerImageTag: 4.1.13 dockerRepository: airbyte/source-mssql documentationUrl: https://docs.airbyte.com/integrations/sources/mssql githubIssueLabel: source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java index da55b7b40c56..4fa63c266c6e 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java @@ -30,6 +30,7 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan // No special handling for resumable full refresh streams. We will report the cursor as it is. private Set resumableFullRefreshStreams; private Set nonResumableFullRefreshStreams; + private Set completedNonResumableFullRefreshStreams; public MssqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams, final Map pairToOrderedColInfo, @@ -61,6 +62,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams, this.streamsThatHaveCompletedSnapshot = new HashSet<>(); this.resumableFullRefreshStreams = new HashSet<>(); this.nonResumableFullRefreshStreams = new HashSet<>(); + this.completedNonResumableFullRefreshStreams = new HashSet<>(); catalog.getStreams().forEach(configuredAirbyteStream -> { var pairInStream = @@ -70,7 +72,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams, this.streamsThatHaveCompletedSnapshot.add(pairInStream); } if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { - if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) { + if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null + && !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) { this.resumableFullRefreshStreams.add(pairInStream); } else { this.nonResumableFullRefreshStreams.add(pairInStream); @@ -94,6 +97,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb } }); + completedNonResumableFullRefreshStreams.forEach(stream -> { + streamStates.add(new AirbyteStreamState() + .withStreamDescriptor( + new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace()))); + }); + if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); @@ -119,10 +128,13 @@ private AirbyteStreamState getAirbyteStreamState(final AirbyteStreamNameNamespac @Override public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) { + + final io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair( + airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { - io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair( - airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); streamsThatHaveCompletedSnapshot.add(pair); + } else if (nonResumableFullRefreshStreams.contains(pair)) { + completedNonResumableFullRefreshStreams.add(pair); } final List streamStates = new ArrayList<>(); streamsThatHaveCompletedSnapshot.forEach(stream -> { @@ -135,7 +147,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ocStatus))); }); - nonResumableFullRefreshStreams.forEach(stream -> { + completedNonResumableFullRefreshStreams.forEach(stream -> { streamStates.add(new AirbyteStreamState() .withStreamDescriptor( new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace()))); diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 11f1a73bdd89..13d8d6a89561 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.7.1 + dockerImageTag: 3.7.2 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java index 5f2fa13144df..9b3de8d4047e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java @@ -27,9 +27,12 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateManager { + private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadGlobalStateManager.class); protected StateManager stateManager; // Only one global state is emitted, which is fanned out into many entries in the DB by platform. As @@ -42,6 +45,7 @@ public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateMan // non ResumableFullRefreshStreams do not have any state. We only report count for them. private Set nonResumableFullRefreshStreams; + private Set completedNonResumableFullRefreshStreams; private final boolean savedOffsetStillPresentOnServer; private final ConfiguredAirbyteCatalog catalog; @@ -69,6 +73,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams, this.streamsThatHaveCompletedSnapshot = new HashSet<>(); this.resumableFullRefreshStreams = new HashSet<>(); this.nonResumableFullRefreshStreams = new HashSet<>(); + this.completedNonResumableFullRefreshStreams = new HashSet<>(); catalog.getStreams().forEach(configuredAirbyteStream -> { var pairInStream = @@ -78,7 +83,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams, this.streamsThatHaveCompletedSnapshot.add(pairInStream); } if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { - if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) { + if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null + && !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) { this.resumableFullRefreshStreams.add(pairInStream); } else { this.nonResumableFullRefreshStreams.add(pairInStream); @@ -115,6 +121,13 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus)))); } }); + + completedNonResumableFullRefreshStreams.forEach(stream -> { + streamStates.add(new AirbyteStreamState() + .withStreamDescriptor( + new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace()))); + }); + if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); @@ -129,10 +142,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb @Override public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) { + final AirbyteStreamNameNamespacePair pair = + new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { - AirbyteStreamNameNamespacePair pair = - new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); streamsThatHaveCompletedSnapshot.add(pair); + } else if (nonResumableFullRefreshStreams.contains(pair)) { + completedNonResumableFullRefreshStreams.add(pair); } final List streamStates = new ArrayList<>(); @@ -146,7 +161,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus)))); }); - nonResumableFullRefreshStreams.forEach(stream -> { + completedNonResumableFullRefreshStreams.forEach(stream -> { streamStates.add(new AirbyteStreamState() .withStreamDescriptor( new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace()))); diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 007dc587e2c5..847204d28cc5 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 4.1.13 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. | | 4.1.12 | 2024-09-10 | [45368](https://github.com/airbytehq/airbyte/pull/45368) | Remove excessive debezium logging. | | 4.1.11 | 2024-09-04 | [45142](https://github.com/airbytehq/airbyte/pull/45142) | Fix incorrect datetimeoffset format in cursor state. | | 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. | diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 97c1f1a22df8..5b398ae5a2f8 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -233,7 +233,8 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. | +| 3.7.2 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. | +| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. | | 3.7.0 | 2024-08-13 | [44013](https://github.com/airbytehq/airbyte/pull/44013) | Upgrading to Debezium 2.7.1.Final | | 3.6.9 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. | | 3.6.8 | 2024-07-30 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |