Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[source-mysql/mssql] Fix state manager on determining non-resumable streams #45181

Merged
merged 8 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.11
dockerImageTag: 4.1.12
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan
// No special handling for resumable full refresh streams. We will report the cursor as it is.
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> completedNonResumableFullRefreshStreams;

public MssqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams,
final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo,
Expand Down Expand Up @@ -61,6 +62,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
this.resumableFullRefreshStreams = new HashSet<>();
this.nonResumableFullRefreshStreams = new HashSet<>();
this.completedNonResumableFullRefreshStreams = new HashSet<>();

catalog.getStreams().forEach(configuredAirbyteStream -> {
var pairInStream =
Expand All @@ -70,7 +72,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
this.streamsThatHaveCompletedSnapshot.add(pairInStream);
}
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) {
if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null
&& !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) {
this.resumableFullRefreshStreams.add(pairInStream);
} else {
this.nonResumableFullRefreshStreams.add(pairInStream);
Expand All @@ -94,6 +97,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
}
});

completedNonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
});

if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
Expand All @@ -119,10 +128,13 @@ private AirbyteStreamState getAirbyteStreamState(final AirbyteStreamNameNamespac

@Override
public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) {

final io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(
airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(
airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
streamsThatHaveCompletedSnapshot.add(pair);
} else if (nonResumableFullRefreshStreams.contains(pair)) {
completedNonResumableFullRefreshStreams.add(pair);
}
final List<AirbyteStreamState> streamStates = new ArrayList<>();
streamsThatHaveCompletedSnapshot.forEach(stream -> {
Expand All @@ -135,7 +147,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ocStatus)));
});

nonResumableFullRefreshStreams.forEach(stream -> {
completedNonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.7.1
dockerImageTag: 3.7.2
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateManager {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadGlobalStateManager.class);
protected StateManager stateManager;

// Only one global state is emitted, which is fanned out into many entries in the DB by platform. As
Expand All @@ -42,6 +45,7 @@ public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateMan

// non ResumableFullRefreshStreams do not have any state. We only report count for them.
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> completedNonResumableFullRefreshStreams;

private final boolean savedOffsetStillPresentOnServer;
private final ConfiguredAirbyteCatalog catalog;
Expand Down Expand Up @@ -69,6 +73,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
this.resumableFullRefreshStreams = new HashSet<>();
this.nonResumableFullRefreshStreams = new HashSet<>();
this.completedNonResumableFullRefreshStreams = new HashSet<>();

catalog.getStreams().forEach(configuredAirbyteStream -> {
var pairInStream =
Expand All @@ -78,7 +83,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
this.streamsThatHaveCompletedSnapshot.add(pairInStream);
}
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) {
if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null
&& !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) {
this.resumableFullRefreshStreams.add(pairInStream);
} else {
this.nonResumableFullRefreshStreams.add(pairInStream);
Expand Down Expand Up @@ -115,6 +121,13 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus))));
}
});

completedNonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
});

if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
Expand All @@ -129,10 +142,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb

@Override
public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) {
final AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
streamsThatHaveCompletedSnapshot.add(pair);
} else if (nonResumableFullRefreshStreams.contains(pair)) {
completedNonResumableFullRefreshStreams.add(pair);
}
final List<AirbyteStreamState> streamStates = new ArrayList<>();

Expand All @@ -146,7 +161,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus))));
});

nonResumableFullRefreshStreams.forEach(stream -> {
completedNonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.12 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. |
| 4.1.11 | 2024-09-04 | [45142](https://github.com/airbytehq/airbyte/pull/45142) | Fix incorrect datetimeoffset format in cursor state. |
| 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. |
| 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 3.7.2 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. |
| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 3.7.0 | 2024-08-13 | [44013](https://github.com/airbytehq/airbyte/pull/44013) | Upgrading to Debezium 2.7.1.Final |
| 3.6.9 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
| 3.6.8 | 2024-07-30 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
Expand Down
Loading