diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java index fb0bcd3dc761..8864c1b8130f 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java @@ -69,16 +69,21 @@ private static io.airbyte.protocol.models.AirbyteStream toProtocol(final Airbyte } } // Only include the selected fields. + // NOTE: we verified above that each selected field has at least one element in the field path. final Set selectedFieldNames = config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toSet()); // TODO(mfsiega-airbyte): we only check the top level of the cursor/primary key fields because we // don't support filtering nested fields yet. - if (!selectedFieldNames.contains(config.getCursorField().get(0)) && config.getSyncMode().equals(SyncMode.INCREMENTAL)) { + if (config.getSyncMode().equals(SyncMode.INCREMENTAL) // INCREMENTAL sync mode, AND + && !config.getCursorField().isEmpty() // There is a cursor configured, AND + && !selectedFieldNames.contains(config.getCursorField().get(0))) { // The cursor isn't in the selected fields. throw new JsonValidationException("Cursor field cannot be de-selected in INCREMENTAL syncs"); } - for (final List primaryKeyComponent : config.getPrimaryKey()) { - if (!selectedFieldNames.contains(primaryKeyComponent.get(0)) && config.getDestinationSyncMode().equals(DestinationSyncMode.APPEND_DEDUP)) { - throw new JsonValidationException("Primary key field cannot be de-selected in DEDUP mode"); + if (config.getDestinationSyncMode().equals(DestinationSyncMode.APPEND_DEDUP)) { + for (final List primaryKeyComponent : config.getPrimaryKey()) { + if (!selectedFieldNames.contains(primaryKeyComponent.get(0))) { + throw new JsonValidationException("Primary key field cannot be de-selected in DEDUP mode"); + } } } for (final String selectedFieldName : selectedFieldNames) { diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index 856323207859..9a2364a8cf45 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -221,20 +221,8 @@ void setUp() throws JsonValidationException, ConfigNotFoundException, IOExceptio @Nested class CreateConnection { - @Test - void testCreateConnection() throws JsonValidationException, ConfigNotFoundException, IOException { - - final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog(); - - // set a defaultGeography on the workspace as EU, but expect connection to be - // created AUTO because the ConnectionCreate geography takes precedence over the workspace - // defaultGeography. - final StandardWorkspace workspace = new StandardWorkspace() - .withWorkspaceId(workspaceId) - .withDefaultGeography(Geography.EU); - when(configRepository.getStandardWorkspaceNoSecrets(workspaceId, true)).thenReturn(workspace); - - final ConnectionCreate connectionCreate = new ConnectionCreate() + private ConnectionCreate buildConnectionCreateRequest(final StandardSync standardSync, final AirbyteCatalog catalog) { + return new ConnectionCreate() .sourceId(standardSync.getSourceId()) .destinationId(standardSync.getDestinationId()) .operationIds(standardSync.getOperationIds()) @@ -252,6 +240,22 @@ void testCreateConnection() throws JsonValidationException, ConfigNotFoundExcept .memoryLimit(standardSync.getResourceRequirements().getMemoryLimit())) .sourceCatalogId(standardSync.getSourceCatalogId()) .geography(ApiPojoConverters.toApiGeography(standardSync.getGeography())); + } + + @Test + void testCreateConnection() throws JsonValidationException, ConfigNotFoundException, IOException { + + final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog(); + + // set a defaultGeography on the workspace as EU, but expect connection to be + // created AUTO because the ConnectionCreate geography takes precedence over the workspace + // defaultGeography. + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withDefaultGeography(Geography.EU); + when(configRepository.getStandardWorkspaceNoSecrets(workspaceId, true)).thenReturn(workspace); + + final ConnectionCreate connectionCreate = buildConnectionCreateRequest(standardSync, catalog); final ConnectionRead actualConnectionRead = connectionsHandler.createConnection(connectionCreate); @@ -277,23 +281,7 @@ void testCreateConnectionUsesDefaultGeographyFromWorkspace() throws JsonValidati final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog(); // don't set a geography on the ConnectionCreate to force inheritance from workspace default - final ConnectionCreate connectionCreate = new ConnectionCreate() - .sourceId(standardSync.getSourceId()) - .destinationId(standardSync.getDestinationId()) - .operationIds(standardSync.getOperationIds()) - .name(PRESTO_TO_HUDI) - .namespaceDefinition(NamespaceDefinitionType.SOURCE) - .namespaceFormat(null) - .prefix(PRESTO_TO_HUDI_PREFIX) - .status(ConnectionStatus.ACTIVE) - .schedule(ConnectionHelpers.generateBasicConnectionSchedule()) - .syncCatalog(catalog) - .resourceRequirements(new io.airbyte.api.model.generated.ResourceRequirements() - .cpuRequest(standardSync.getResourceRequirements().getCpuRequest()) - .cpuLimit(standardSync.getResourceRequirements().getCpuLimit()) - .memoryRequest(standardSync.getResourceRequirements().getMemoryRequest()) - .memoryLimit(standardSync.getResourceRequirements().getMemoryLimit())) - .sourceCatalogId(standardSync.getSourceCatalogId()); + final ConnectionCreate connectionCreate = buildConnectionCreateRequest(standardSync, catalog).geography(null); // set the workspace default to EU final StandardWorkspace workspace = new StandardWorkspace() @@ -319,29 +307,12 @@ void testCreateConnectionWithSelectedFields() throws IOException, JsonValidation .withDefaultGeography(Geography.AUTO); when(configRepository.getStandardWorkspaceNoSecrets(workspaceId, true)).thenReturn(workspace); - final AirbyteCatalog catalog = ConnectionHelpers.generateApiCatalogWithTwoFields(); + final AirbyteCatalog catalogWithSelectedFields = ConnectionHelpers.generateApiCatalogWithTwoFields(); // Only select one of the two fields. - catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true) + catalogWithSelectedFields.getStreams().get(0).getConfig().fieldSelectionEnabled(true) .selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME))); - final ConnectionCreate connectionCreate = new ConnectionCreate() - .sourceId(standardSync.getSourceId()) - .destinationId(standardSync.getDestinationId()) - .operationIds(standardSync.getOperationIds()) - .name(PRESTO_TO_HUDI) - .namespaceDefinition(NamespaceDefinitionType.SOURCE) - .namespaceFormat(null) - .prefix(PRESTO_TO_HUDI_PREFIX) - .status(ConnectionStatus.ACTIVE) - .schedule(ConnectionHelpers.generateBasicConnectionSchedule()) - .syncCatalog(catalog) - .resourceRequirements(new io.airbyte.api.model.generated.ResourceRequirements() - .cpuRequest(standardSync.getResourceRequirements().getCpuRequest()) - .cpuLimit(standardSync.getResourceRequirements().getCpuLimit()) - .memoryRequest(standardSync.getResourceRequirements().getMemoryRequest()) - .memoryLimit(standardSync.getResourceRequirements().getMemoryLimit())) - .sourceCatalogId(standardSync.getSourceCatalogId()) - .geography(ApiPojoConverters.toApiGeography(standardSync.getGeography())); + final ConnectionCreate connectionCreate = buildConnectionCreateRequest(standardSync, catalogWithSelectedFields); final ConnectionRead actualConnectionRead = connectionsHandler.createConnection(connectionCreate); @@ -354,6 +325,35 @@ void testCreateConnectionWithSelectedFields() throws IOException, JsonValidation verify(configRepository).writeStandardSync(standardSync); } + @Test + void testCreateFullRefreshConnectionWithSelectedFields() throws IOException, JsonValidationException, ConfigNotFoundException { + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withDefaultGeography(Geography.AUTO); + when(configRepository.getStandardWorkspaceNoSecrets(workspaceId, true)).thenReturn(workspace); + + final AirbyteCatalog fullRefreshCatalogWithSelectedFields = ConnectionHelpers.generateApiCatalogWithTwoFields(); + fullRefreshCatalogWithSelectedFields.getStreams().get(0).getConfig() + .fieldSelectionEnabled(true) + .selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME))) + .cursorField(null) + .syncMode(SyncMode.FULL_REFRESH); + + final ConnectionCreate connectionCreate = buildConnectionCreateRequest(standardSync, fullRefreshCatalogWithSelectedFields); + + final ConnectionRead actualConnectionRead = connectionsHandler.createConnection(connectionCreate); + + final ConnectionRead expectedConnectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); + + assertEquals(expectedConnectionRead, actualConnectionRead); + + standardSync + .withFieldSelectionData(new FieldSelectionData().withAdditionalProperty("null/users-data0", true)) + .getCatalog().getStreams().get(0).withSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH).withCursorField(null); + + verify(configRepository).writeStandardSync(standardSync); + } + @Test void testFieldSelectionRemoveCursorFails() throws JsonValidationException, ConfigNotFoundException, IOException { // Test that if we try to de-select a field that's being used for the cursor, the request will fail. @@ -365,7 +365,8 @@ void testFieldSelectionRemoveCursorFails() throws JsonValidationException, Confi catalogForUpdate.getStreams().get(0).getConfig() .fieldSelectionEnabled(true) .selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME))) - .cursorField(List.of(SECOND_FIELD_NAME)); + .cursorField(List.of(SECOND_FIELD_NAME)) + .syncMode(SyncMode.INCREMENTAL); final ConnectionUpdate connectionUpdate = new ConnectionUpdate() .connectionId(standardSync.getConnectionId())