Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for acceptance test on discover workflow #22595

Merged
merged 10 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,20 @@

package io.airbyte.workers.helper;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.SyncMode;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.text.Names;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.validation.json.JsonValidationException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand All @@ -17,6 +28,86 @@
*/
public class CatalogClientConverters {

/**
*
* @param catalog
* @return
*/
public static io.airbyte.protocol.models.AirbyteCatalog toAirbyteProtocol(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It definitely makes me feel uneasy to duplicate such complicated and hard-to-maintain code. It seems like it will be very likely that this code will need to change in both places, but it will only end up being changed in a single place because it isn't obvious that there are multiple implementations of nearly the exact same thing.

I don't really have a good answer for this right now, since the conversion logic isn't actually duplicated (in one case, they are server-side java models, and here they are client-side java models, so they're technically different types).

I think it would be ideal to add very apparent comments to this class AND to the other CatalogConverter class that the logic is duplicated for now, and we should discuss and try to prioritize an issue for coming up with a better approach for these super-complicated catalog conversion methods.

I don't want to block this PR, but I want to make sure we're doing everything we can to minimize the tech debt here and come up with a solid long term solution!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree - conversion between those three types with added logic for stream makes it really hard to maintain.

final io.airbyte.api.client.model.generated.AirbyteCatalog catalog) {

io.airbyte.protocol.models.AirbyteCatalog protoCatalog =
new io.airbyte.protocol.models.AirbyteCatalog();
var airbyteStream = catalog.getStreams().stream().map(stream -> {
try {
return toConfiguredProtocol(stream.getStream(), stream.getConfig());
} catch (JsonValidationException e) {
return null;
}
}).collect(Collectors.toList());

protoCatalog.withStreams(airbyteStream);
return protoCatalog;
}


@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
private static io.airbyte.protocol.models.AirbyteStream toConfiguredProtocol(final io.airbyte.api.client.model.generated.AirbyteStream stream, AirbyteStreamConfiguration config)
throws JsonValidationException {
if (config.getFieldSelectionEnabled() != null && config.getFieldSelectionEnabled()) {
// Validate the selected field paths.
if (config.getSelectedFields() == null) {
throw new JsonValidationException("Requested field selection but no selected fields provided");
}
final JsonNode properties = stream.getJsonSchema().findValue("properties");
if (properties == null || !properties.isObject()) {
throw new JsonValidationException("Requested field selection but no properties node found");
}
for (final var selectedFieldInfo : config.getSelectedFields()) {
if (selectedFieldInfo.getFieldPath() == null || selectedFieldInfo.getFieldPath().isEmpty()) {
throw new JsonValidationException("Selected field path cannot be empty");
}
if (selectedFieldInfo.getFieldPath().size() > 1) {
// TODO(mfsiega-airbyte): support nested fields.
throw new UnsupportedOperationException("Nested field selection not supported");
}
}
// Only include the selected fields.
// NOTE: we verified above that each selected field has at least one element in the field path.
final Set<String> selectedFieldNames =
config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toSet());
// TODO(mfsiega-airbyte): we only check the top level of the cursor/primary key fields because we
// don't support filtering nested fields yet.
if (config.getSyncMode().equals(SyncMode.INCREMENTAL) // INCREMENTAL sync mode, AND
&& !config.getCursorField().isEmpty() // There is a cursor configured, AND
&& !selectedFieldNames.contains(config.getCursorField().get(0))) { // The cursor isn't in the selected fields.
throw new JsonValidationException("Cursor field cannot be de-selected in INCREMENTAL syncs");
}
if (config.getDestinationSyncMode().equals(DestinationSyncMode.APPEND_DEDUP)) {
for (final List<String> primaryKeyComponent : config.getPrimaryKey()) {
if (!selectedFieldNames.contains(primaryKeyComponent.get(0))) {
throw new JsonValidationException("Primary key field cannot be de-selected in DEDUP mode");
}
}
}
for (final String selectedFieldName : selectedFieldNames) {
if (!properties.has(selectedFieldName)) {
throw new JsonValidationException(String.format("Requested selected field %s not found in JSON schema", selectedFieldName));
}
}
((ObjectNode) properties).retain(selectedFieldNames);
}
return new io.airbyte.protocol.models.AirbyteStream()
.withName(stream.getName())
.withJsonSchema(stream.getJsonSchema())
.withSupportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.protocol.models.SyncMode.class))
.withSourceDefinedCursor(stream.getSourceDefinedCursor())
.withDefaultCursorField(stream.getDefaultCursorField())
.withSourceDefinedPrimaryKey(
Optional.ofNullable(stream.getSourceDefinedPrimaryKey()).orElse(Collections.emptyList()))
.withNamespace(stream.getNamespace());
}

/**
* Converts a protocol AirbyteCatalog to an OpenAPI client versioned AirbyteCatalog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,10 @@ void testConvertToClientAPI() {
CatalogClientConverters.toAirbyteCatalogClientApi(BASIC_MODEL_CATALOG));
}

@Test
void testConvertToProtocol() {
assertEquals(BASIC_MODEL_CATALOG,
CatalogClientConverters.toAirbyteProtocol(EXPECTED_CLIENT_CATALOG));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.workers.general.DefaultCheckConnectionWorker;
import io.airbyte.workers.general.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.general.DefaultGetSpecWorker;
import io.airbyte.workers.helper.CatalogClientConverters;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.helper.EntrypointEnvChecker;
import io.airbyte.workers.internal.AirbyteSource;
Expand Down Expand Up @@ -125,11 +126,9 @@ public abstract class AbstractSourceConnectorTest {
private ConnectorConfigUpdater mConnectorConfigUpdater;

// This has to be using the protocol version of the platform in order to capture the arg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this comment makes sense to keep anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that meant to be AirbyteCatalog as it has to be using protocol version, but yeah definitely not used to capture the arg

private final ArgumentCaptor<io.airbyte.protocol.models.AirbyteCatalog> lastPersistedCatalog =
ArgumentCaptor.forClass(io.airbyte.protocol.models.AirbyteCatalog.class);

protected AirbyteCatalog getLastPersistedCatalog() {
return convertProtocolObject(lastPersistedCatalog.getValue(), AirbyteCatalog.class);
return convertProtocolObject(
CatalogClientConverters.toAirbyteProtocol(discoverWriteRequest.getValue().getCatalog()), AirbyteCatalog.class);
}

private final ArgumentCaptor<SourceDiscoverSchemaWriteRequestBody> discoverWriteRequest =
Expand Down