Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate only on incremental #14966

Merged
merged 8 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ public static AirbyteCatalog configuredCatalogToCatalog(final ConfiguredAirbyteC
.toList());
}

/**
* Converts a {@link ConfiguredAirbyteCatalog} into an {@link AirbyteCatalog}. This is possible
* because the latter is a subset of the former. It filters out the non incremental streams.
*
* @param configuredCatalog - catalog to convert
* @return - airbyte catalog
*/
public static AirbyteCatalog configuredCatalogToCatalogOnlyIncremental(final ConfiguredAirbyteCatalog configuredCatalog) {
Copy link
Contributor

@lmossman lmossman Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is a little bit weird to me, because an AirbyteCatalog with only the incremental-configured streams is not really useful anywhere - the only thing we ever really care about in that object is the stream descriptors of those streams.

What if instead, this PR just added the extractIncrementalStreamDescriptors() method which looked like this:

  public static List<StreamDescriptor> extractIncrementalStreamDescriptors(final ConfiguredAirbyteCatalog configuredCatalog) {
    return configuredCatalog.getStreams()
        .stream()
        .filter(configuredStream -> configuredStream.getSyncMode() == SyncMode.INCREMENTAL)
        .map(configuredStream -> extractDescriptor(configuredStream.getStream()))
        .toList();
  }

This way, this method pulls exactly what it needs out of the configured catalog without producing a strange intermediate catalog object that only contains some streams

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make senses, done.

return new AirbyteCatalog().withStreams(
configuredCatalog.getStreams()
.stream()
.filter(streamAndConfig -> streamAndConfig.getSyncMode() == SyncMode.INCREMENTAL)
.map(ConfiguredAirbyteStream::getStream)
.toList());
}

/**
* Extracts {@link StreamDescriptor} for a given {@link AirbyteStream}
*
Expand Down Expand Up @@ -113,6 +129,17 @@ public static List<StreamDescriptor> extractStreamDescriptors(final ConfiguredAi
return extractStreamDescriptors(configuredCatalogToCatalog(configuredCatalog));
}

/**
* Extracts {@link StreamDescriptor}s for each stream with an incremental {@link SyncMode} in a
* given {@link ConfiguredAirbyteCatalog}
*
* @param configuredCatalog catalog
* @return list of stream descriptors
*/
public static List<StreamDescriptor> extractIncrementalStreamDescriptors(final ConfiguredAirbyteCatalog configuredCatalog) {
return extractStreamDescriptors(configuredCatalogToCatalogOnlyIncremental(configuredCatalog));
}

/**
* Extracts {@link StreamDescriptor}s for each stream in a given {@link AirbyteCatalog}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,25 @@ void testGetCatalogDiff() throws IOException {
assertEquals(expectedDiff, actualDiff.stream().sorted(STREAM_TRANSFORM_COMPARATOR).toList());
}

@Test
void testConfiguredCatalogToCatalogOnlyIncremental() {
benmoriceau marked this conversation as resolved.
Show resolved Hide resolved
final ConfiguredAirbyteCatalog configuredCatalog = new ConfiguredAirbyteCatalog()
.withStreams(List.of(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(
new AirbyteStream()
.withName("one")),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
.withStream(
new AirbyteStream()
.withName("one"))));

final AirbyteCatalog catalog = CatalogHelpers.configuredCatalogToCatalogOnlyIncremental(configuredCatalog);

assertEquals(1, catalog.getStreams().size());
assertEquals("one", catalog.getStreams().get(0).getName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut
void validateStreamStates(final StateWrapper state, final ConfiguredAirbyteCatalog configuredCatalog) {
final List<StreamDescriptor> stateStreamDescriptors =
state.getStateMessages().stream().map(stateMessage -> stateMessage.getStream().getStreamDescriptor()).toList();
final List<StreamDescriptor> catalogStreamDescriptors = CatalogHelpers.extractStreamDescriptors(configuredCatalog);
final List<StreamDescriptor> catalogStreamDescriptors = CatalogHelpers.extractIncrementalStreamDescriptors(configuredCatalog);
catalogStreamDescriptors.forEach(streamDescriptor -> {
if (!stateStreamDescriptors.contains(streamDescriptor)) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -98,6 +99,7 @@ void testPersistWithInvalidStateDuringMigration() throws IOException {
void testPersistWithValidStateDuringMigration() throws IOException {
final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a").withNamespace("a1"));
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b"));
final ConfiguredAirbyteStream stream3 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")).withSyncMode(SyncMode.FULL_REFRESH);

final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
Expand All @@ -111,7 +113,7 @@ void testPersistWithValidStateDuringMigration() throws IOException {
final JsonNode jsonState = Jsons.jsonNode(List.of(stateMessage1, stateMessage2));
final State state = new State().withState(jsonState);

final ConfiguredAirbyteCatalog migrationConfiguredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream, stream2));
final ConfiguredAirbyteCatalog migrationConfiguredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream, stream2, stream3));
final StandardSyncOutput syncOutput = new StandardSyncOutput().withState(state);
Mockito.when(featureFlags.useStreamCapableState()).thenReturn(true);
Mockito.when(statePersistence.isMigration(Mockito.eq(CONNECTION_ID), Mockito.eq(StateType.STREAM), Mockito.any(Optional.class))).thenReturn(true);
Expand Down