Skip to content

Commit

Permalink
Fix for acceptance test on discover workflow (#22595)
Browse files Browse the repository at this point in the history
* fix test

* remove unused var

* add converter into test

* use converters to convert client catalog to proto

* remove cdk related changes

* more cdk remove

* Minor format changes

* remove untrue comment

* Minor format changes

---------

Co-authored-by: Sergio Ropero <42538006+sergio-ropero@users.noreply.github.com>
Co-authored-by: Sergio Ropero <sergio@airbyte.io>
  • Loading branch information
3 people authored Feb 9, 2023
1 parent b9609ab commit 0ba609c
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@

package io.airbyte.workers.helper;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.SyncMode;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.text.Names;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.validation.json.JsonValidationException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand All @@ -17,6 +27,85 @@
*/
public class CatalogClientConverters {

/**
*
* @param catalog
* @return
*/
public static io.airbyte.protocol.models.AirbyteCatalog toAirbyteProtocol(final io.airbyte.api.client.model.generated.AirbyteCatalog catalog) {

io.airbyte.protocol.models.AirbyteCatalog protoCatalog =
new io.airbyte.protocol.models.AirbyteCatalog();
var airbyteStream = catalog.getStreams().stream().map(stream -> {
try {
return toConfiguredProtocol(stream.getStream(), stream.getConfig());
} catch (JsonValidationException e) {
return null;
}
}).collect(Collectors.toList());

protoCatalog.withStreams(airbyteStream);
return protoCatalog;
}

@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
private static io.airbyte.protocol.models.AirbyteStream toConfiguredProtocol(final io.airbyte.api.client.model.generated.AirbyteStream stream,
AirbyteStreamConfiguration config)
throws JsonValidationException {
if (config.getFieldSelectionEnabled() != null && config.getFieldSelectionEnabled()) {
// Validate the selected field paths.
if (config.getSelectedFields() == null) {
throw new JsonValidationException("Requested field selection but no selected fields provided");
}
final JsonNode properties = stream.getJsonSchema().findValue("properties");
if (properties == null || !properties.isObject()) {
throw new JsonValidationException("Requested field selection but no properties node found");
}
for (final var selectedFieldInfo : config.getSelectedFields()) {
if (selectedFieldInfo.getFieldPath() == null || selectedFieldInfo.getFieldPath().isEmpty()) {
throw new JsonValidationException("Selected field path cannot be empty");
}
if (selectedFieldInfo.getFieldPath().size() > 1) {
// TODO(mfsiega-airbyte): support nested fields.
throw new UnsupportedOperationException("Nested field selection not supported");
}
}
// Only include the selected fields.
// NOTE: we verified above that each selected field has at least one element in the field path.
final Set<String> selectedFieldNames =
config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toSet());
// TODO(mfsiega-airbyte): we only check the top level of the cursor/primary key fields because we
// don't support filtering nested fields yet.
if (config.getSyncMode().equals(SyncMode.INCREMENTAL) // INCREMENTAL sync mode, AND
&& !config.getCursorField().isEmpty() // There is a cursor configured, AND
&& !selectedFieldNames.contains(config.getCursorField().get(0))) { // The cursor isn't in the selected fields.
throw new JsonValidationException("Cursor field cannot be de-selected in INCREMENTAL syncs");
}
if (config.getDestinationSyncMode().equals(DestinationSyncMode.APPEND_DEDUP)) {
for (final List<String> primaryKeyComponent : config.getPrimaryKey()) {
if (!selectedFieldNames.contains(primaryKeyComponent.get(0))) {
throw new JsonValidationException("Primary key field cannot be de-selected in DEDUP mode");
}
}
}
for (final String selectedFieldName : selectedFieldNames) {
if (!properties.has(selectedFieldName)) {
throw new JsonValidationException(String.format("Requested selected field %s not found in JSON schema", selectedFieldName));
}
}
((ObjectNode) properties).retain(selectedFieldNames);
}
return new io.airbyte.protocol.models.AirbyteStream()
.withName(stream.getName())
.withJsonSchema(stream.getJsonSchema())
.withSupportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.protocol.models.SyncMode.class))
.withSourceDefinedCursor(stream.getSourceDefinedCursor())
.withDefaultCursorField(stream.getDefaultCursorField())
.withSourceDefinedPrimaryKey(
Optional.ofNullable(stream.getSourceDefinedPrimaryKey()).orElse(Collections.emptyList()))
.withNamespace(stream.getNamespace());
}

/**
* Converts a protocol AirbyteCatalog to an OpenAPI client versioned AirbyteCatalog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,10 @@ void testConvertToClientAPI() {
CatalogClientConverters.toAirbyteCatalogClientApi(BASIC_MODEL_CATALOG));
}

@Test
void testConvertToProtocol() {
assertEquals(BASIC_MODEL_CATALOG,
CatalogClientConverters.toAirbyteProtocol(EXPECTED_CLIENT_CATALOG));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.workers.general.DefaultCheckConnectionWorker;
import io.airbyte.workers.general.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.general.DefaultGetSpecWorker;
import io.airbyte.workers.helper.CatalogClientConverters;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.helper.EntrypointEnvChecker;
import io.airbyte.workers.internal.AirbyteSource;
Expand Down Expand Up @@ -124,12 +125,9 @@ public abstract class AbstractSourceConnectorTest {

private ConnectorConfigUpdater mConnectorConfigUpdater;

// This has to be using the protocol version of the platform in order to capture the arg
private final ArgumentCaptor<io.airbyte.protocol.models.AirbyteCatalog> lastPersistedCatalog =
ArgumentCaptor.forClass(io.airbyte.protocol.models.AirbyteCatalog.class);

protected AirbyteCatalog getLastPersistedCatalog() {
return convertProtocolObject(lastPersistedCatalog.getValue(), AirbyteCatalog.class);
return convertProtocolObject(
CatalogClientConverters.toAirbyteProtocol(discoverWriteRequest.getValue().getCatalog()), AirbyteCatalog.class);
}

private final ArgumentCaptor<SourceDiscoverSchemaWriteRequestBody> discoverWriteRequest =
Expand Down

0 comments on commit 0ba609c

Please sign in to comment.