diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/CatalogClientConverters.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/CatalogClientConverters.java
index c5f4b48be001..52cf8c1c2db1 100644
--- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/CatalogClientConverters.java
+++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/CatalogClientConverters.java
@@ -4,9 +4,19 @@
 
 package io.airbyte.workers.helper;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration;
+import io.airbyte.api.client.model.generated.DestinationSyncMode;
+import io.airbyte.api.client.model.generated.SyncMode;
 import io.airbyte.commons.enums.Enums;
 import io.airbyte.commons.text.Names;
 import io.airbyte.protocol.models.AirbyteStream;
+import io.airbyte.validation.json.JsonValidationException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -17,6 +27,109 @@
  */
 public class CatalogClientConverters {
 
+  /**
+   * Converts an OpenAPI client versioned AirbyteCatalog to a protocol AirbyteCatalog.
+   *
+   * @param catalog client-model catalog whose streams and stream configurations are converted
+   * @return protocol catalog holding one converted stream per stream of {@code catalog}
+   * @throws IllegalArgumentException if a stream's field selection fails validation
+   */
+  public static io.airbyte.protocol.models.AirbyteCatalog toAirbyteProtocol(final io.airbyte.api.client.model.generated.AirbyteCatalog catalog) {
+
+    final io.airbyte.protocol.models.AirbyteCatalog protoCatalog =
+        new io.airbyte.protocol.models.AirbyteCatalog();
+    final var airbyteStream = catalog.getStreams().stream().map(stream -> {
+      try {
+        return toConfiguredProtocol(stream.getStream(), stream.getConfig());
+      } catch (final JsonValidationException e) {
+        // Surface the failure instead of mapping the stream to null: a null element would end up in
+        // the returned catalog and hide the validation error from the caller.
+        throw new IllegalArgumentException(
+            String.format("Stream %s cannot be converted to the protocol model", stream.getStream().getName()), e);
+      }
+    }).collect(Collectors.toList());
+
+    protoCatalog.withStreams(airbyteStream);
+    return protoCatalog;
+  }
+
+  /**
+   * Converts one client-model stream to a protocol stream, applying the field selection described by
+   * {@code config} when it is enabled.
+   *
+   * <p>
+   * With field selection enabled, the {@code properties} node found in the stream's JSON schema is
+   * pruned via {@code ObjectNode.retain} so that only the selected top-level fields remain.
+   *
+   * @param stream client-model stream to convert
+   * @param config stream configuration; a {@code null} config is treated as field selection disabled
+   * @return protocol stream built from {@code stream}
+   * @throws JsonValidationException if selected fields or the properties node are missing, a field
+   *         path is empty, a selected field is not in the schema, or the cursor (INCREMENTAL) or a
+   *         primary key (APPEND_DEDUP) is de-selected
+   * @throws UnsupportedOperationException if a selected field path has more than one element
+   */
+  @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
+  private static io.airbyte.protocol.models.AirbyteStream toConfiguredProtocol(final io.airbyte.api.client.model.generated.AirbyteStream stream,
+                                                                               final AirbyteStreamConfiguration config)
+      throws JsonValidationException {
+    if (config != null && Boolean.TRUE.equals(config.getFieldSelectionEnabled())) {
+      // Validate the selected field paths.
+      if (config.getSelectedFields() == null) {
+        throw new JsonValidationException("Requested field selection but no selected fields provided");
+      }
+      // NOTE(review): findValue also searches child nodes; this presumably relies on the first match
+      // being the stream's top-level properties object -- confirm.
+      final JsonNode properties = stream.getJsonSchema().findValue("properties");
+      if (properties == null || !properties.isObject()) {
+        throw new JsonValidationException("Requested field selection but no properties node found");
+      }
+      for (final var selectedFieldInfo : config.getSelectedFields()) {
+        if (selectedFieldInfo.getFieldPath() == null || selectedFieldInfo.getFieldPath().isEmpty()) {
+          throw new JsonValidationException("Selected field path cannot be empty");
+        }
+        if (selectedFieldInfo.getFieldPath().size() > 1) {
+          // TODO(mfsiega-airbyte): support nested fields.
+          throw new UnsupportedOperationException("Nested field selection not supported");
+        }
+      }
+      // Only include the selected fields.
+      // NOTE: we verified above that each selected field has at least one element in the field path.
+      final Set<String> selectedFieldNames =
+          config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toSet());
+      // TODO(mfsiega-airbyte): we only check the top level of the cursor/primary key fields because we
+      // don't support filtering nested fields yet.
+      if (SyncMode.INCREMENTAL.equals(config.getSyncMode()) // INCREMENTAL sync mode, AND
+          && config.getCursorField() != null && !config.getCursorField().isEmpty() // There is a cursor configured, AND
+          && !selectedFieldNames.contains(config.getCursorField().get(0))) { // The cursor isn't in the selected fields.
+        throw new JsonValidationException("Cursor field cannot be de-selected in INCREMENTAL syncs");
+      }
+      // Constant-first equals and the null check keep unset modes/keys from raising a NullPointerException.
+      if (DestinationSyncMode.APPEND_DEDUP.equals(config.getDestinationSyncMode()) && config.getPrimaryKey() != null) {
+        for (final var primaryKeyComponent : config.getPrimaryKey()) {
+          if (!selectedFieldNames.contains(primaryKeyComponent.get(0))) {
+            throw new JsonValidationException("Primary key field cannot be de-selected in DEDUP mode");
+          }
+        }
+      }
+      for (final String selectedFieldName : selectedFieldNames) {
+        if (!properties.has(selectedFieldName)) {
+          throw new JsonValidationException(String.format("Requested selected field %s not found in JSON schema", selectedFieldName));
+        }
+      }
+      ((ObjectNode) properties).retain(selectedFieldNames);
+    }
+    return new io.airbyte.protocol.models.AirbyteStream()
+        .withName(stream.getName())
+        .withJsonSchema(stream.getJsonSchema())
+        .withSupportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.protocol.models.SyncMode.class))
+        .withSourceDefinedCursor(stream.getSourceDefinedCursor())
+        .withDefaultCursorField(stream.getDefaultCursorField())
+        .withSourceDefinedPrimaryKey(
+            Optional.ofNullable(stream.getSourceDefinedPrimaryKey()).orElse(Collections.emptyList()))
+        .withNamespace(stream.getNamespace());
+  }
+
   /**
    * Converts a protocol AirbyteCatalog to an OpenAPI client versioned AirbyteCatalog.
*/ diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/CatalogClientConvertersTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/CatalogClientConvertersTest.java index ae2c4ae22c99..b2a009c5e643 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/CatalogClientConvertersTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/CatalogClientConvertersTest.java @@ -65,4 +65,10 @@ void testConvertToClientAPI() { CatalogClientConverters.toAirbyteCatalogClientApi(BASIC_MODEL_CATALOG)); } + @Test + void testConvertToProtocol() { /* Reverse direction of testConvertToClientAPI: converting EXPECTED_CLIENT_CATALOG with toAirbyteProtocol must equal BASIC_MODEL_CATALOG. */ + assertEquals(BASIC_MODEL_CATALOG, + CatalogClientConverters.toAirbyteProtocol(EXPECTED_CLIENT_CATALOG)); + } + } diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java index 600e2f7cdcee..e245ec5df6df 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java @@ -37,6 +37,7 @@ import io.airbyte.workers.general.DefaultCheckConnectionWorker; import io.airbyte.workers.general.DefaultDiscoverCatalogWorker; import io.airbyte.workers.general.DefaultGetSpecWorker; +import io.airbyte.workers.helper.CatalogClientConverters; import io.airbyte.workers.helper.ConnectorConfigUpdater; import io.airbyte.workers.helper.EntrypointEnvChecker; import io.airbyte.workers.internal.AirbyteSource; @@ -124,12 +125,9 @@ public abstract class AbstractSourceConnectorTest { private ConnectorConfigUpdater mConnectorConfigUpdater; - // This has to be using the protocol version of the platform in order to capture the 
arg - private final ArgumentCaptor lastPersistedCatalog = - ArgumentCaptor.forClass(io.airbyte.protocol.models.AirbyteCatalog.class); - protected AirbyteCatalog getLastPersistedCatalog() { - return convertProtocolObject(lastPersistedCatalog.getValue(), AirbyteCatalog.class); + return convertProtocolObject( /* The catalog is read from the captured discoverWriteRequest and converted with toAirbyteProtocol before being passed to convertProtocolObject. */ + CatalogClientConverters.toAirbyteProtocol(discoverWriteRequest.getValue().getCatalog()), AirbyteCatalog.class); } private final ArgumentCaptor discoverWriteRequest =