diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index c13f98405731..6590140440ac 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -1975,27 +1975,6 @@ paths: $ref: "#/components/schemas/WebBackendConnectionRead" "422": $ref: "#/components/responses/InvalidInputResponse" - /v1/web_backend/connections/updateNew: - post: - operationId: webBackendUpdateConnectionNew - requestBody: - content: - application/json: - schema: - $ref: "#/components/schemas/WebBackendConnectionUpdate" - required: true - responses: - "200": - content: - application/json: - schema: - $ref: "#/components/schemas/WebBackendConnectionRead" - description: Successful operation - "422": - $ref: "#/components/responses/InvalidInputResponse" - summary: Update a connection - tags: - - web_backend /v1/web_backend/connections/search: post: tags: @@ -3355,8 +3334,6 @@ components: $ref: "#/components/schemas/ConnectionStatus" resourceRequirements: $ref: "#/components/schemas/ResourceRequirements" - withRefreshedCatalog: - type: boolean skipReset: type: boolean operations: diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index b19756aed062..cf6d33b3e58e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -822,11 +822,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate)); } - @Override - public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate) { - return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnectionNew(webBackendConnectionUpdate)); - } - @Override public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) { return execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 57632ca9eb9e..1ab30300bd4a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -362,30 +362,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne throws ConfigNotFoundException, IOException, JsonValidationException { final List operationIds = updateOperations(webBackendConnectionUpdate); final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds); - - ConnectionRead connectionRead; - final boolean needReset = MoreBooleans.isTruthy(webBackendConnectionUpdate.getWithRefreshedCatalog()); - - connectionRead = connectionsHandler.updateConnection(connectionUpdate); - - if (needReset) { - ManualOperationResult manualOperationResult = eventRunner.synchronousResetConnection( - webBackendConnectionUpdate.getConnectionId(), - // TODO (https://github.com/airbytehq/airbyte/issues/12741): change this to only get new/updated - // streams, instead of all - configRepository.getAllStreamsForConnection(webBackendConnectionUpdate.getConnectionId())); - verifyManualOperationResult(manualOperationResult); - manualOperationResult = eventRunner.startNewManualSync(webBackendConnectionUpdate.getConnectionId()); - verifyManualOperationResult(manualOperationResult); - connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId()); - } - return buildWebBackendConnectionRead(connectionRead); - } - - public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate) - throws ConfigNotFoundException, IOException, JsonValidationException { - final List operationIds = updateOperations(webBackendConnectionUpdate); - final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds); final UUID connectionId = webBackendConnectionUpdate.getConnectionId(); final ConfiguredAirbyteCatalog existingConfiguredCatalog = configRepository.getConfiguredCatalogForConnection(connectionId); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 102486f1d773..264b263dd0e5 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -440,7 +440,6 @@ void testToConnectionCreate() throws IOException { assertEquals(expected, actual); } - // TODO: remove withRefreshedCatalog param from this test when param is removed from code @Test void testToConnectionUpdate() throws IOException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); @@ -535,6 +534,14 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept .syncCatalog(expected.getSyncCatalog()) .sourceCatalogId(expected.getCatalogId()); + when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId())) + .thenReturn(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog()); + + final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of()); + when(connectionsHandler.getDiff(any(), any())).thenReturn(catalogDiff); + final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(expected.getConnectionId()); + when(stateHandler.getState(connectionIdRequestBody)).thenReturn(new ConnectionState().stateType(ConnectionStateType.LEGACY)); + when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn( new ConnectionRead().connectionId(expected.getConnectionId())); when(connectionsHandler.updateConnection(any())).thenReturn( @@ -576,48 +583,6 @@ void testUpdateConnectionWithOperations() throws JsonValidationException, Config .syncCatalog(expected.getSyncCatalog()) .operations(List.of(operationCreateOrUpdate)); - when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn( - new ConnectionRead() - .connectionId(expected.getConnectionId()) - .operationIds(connectionRead.getOperationIds())); - when(connectionsHandler.updateConnection(any())).thenReturn( - new ConnectionRead() - .connectionId(expected.getConnectionId()) - .sourceId(expected.getSourceId()) - .destinationId(expected.getDestinationId()) - .operationIds(connectionRead.getOperationIds()) - .name(expected.getName()) - .namespaceDefinition(expected.getNamespaceDefinition()) - .namespaceFormat(expected.getNamespaceFormat()) - .prefix(expected.getPrefix()) - .syncCatalog(expected.getSyncCatalog()) - .status(expected.getStatus()) - .schedule(expected.getSchedule())); - when(operationsHandler.updateOperation(operationUpdate)).thenReturn(new OperationRead().operationId(operationUpdate.getOperationId())); - when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList); - - final WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnection(updateBody); - - assertEquals(connectionRead.getOperationIds(), actualConnectionRead.getOperationIds()); - verify(operationsHandler, times(1)).updateOperation(operationUpdate); - } - - @Test - void testUpdateConnectionWithOperationsNew() throws JsonValidationException, ConfigNotFoundException, IOException { - final WebBackendOperationCreateOrUpdate operationCreateOrUpdate = new WebBackendOperationCreateOrUpdate() - .name("Test Operation") - .operationId(connectionRead.getOperationIds().get(0)); - final OperationUpdate operationUpdate = WebBackendConnectionsHandler.toOperationUpdate(operationCreateOrUpdate); - final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate() - .namespaceDefinition(expected.getNamespaceDefinition()) - .namespaceFormat(expected.getNamespaceFormat()) - .prefix(expected.getPrefix()) - .connectionId(expected.getConnectionId()) - .schedule(expected.getSchedule()) - .status(expected.getStatus()) - .syncCatalog(expected.getSyncCatalog()) - .operations(List.of(operationCreateOrUpdate)); - when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId())) .thenReturn(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog()); @@ -646,62 +611,12 @@ void testUpdateConnectionWithOperationsNew() throws JsonValidationException, Con when(operationsHandler.updateOperation(operationUpdate)).thenReturn(new OperationRead().operationId(operationUpdate.getOperationId())); when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList); - final WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnectionNew(updateBody); + final WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnection(updateBody); assertEquals(connectionRead.getOperationIds(), actualConnectionRead.getOperationIds()); verify(operationsHandler, times(1)).updateOperation(operationUpdate); } - // TODO: remove in favor of test below when update endpoint is switched to new endpoint - @Test - void testUpdateConnectionWithUpdatedSchema() throws JsonValidationException, ConfigNotFoundException, IOException { - final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate() - .namespaceDefinition(expected.getNamespaceDefinition()) - .namespaceFormat(expected.getNamespaceFormat()) - .prefix(expected.getPrefix()) - .connectionId(expected.getConnectionId()) - .schedule(expected.getSchedule()) - .status(expected.getStatus()) - .syncCatalog(expectedWithNewSchema.getSyncCatalog()) - .withRefreshedCatalog(true); - - when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList); - when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn( - new ConnectionRead().connectionId(expected.getConnectionId())); - final ConnectionRead connectionRead = new ConnectionRead() - .connectionId(expected.getConnectionId()) - .sourceId(expected.getSourceId()) - .destinationId(expected.getDestinationId()) - .name(expected.getName()) - .namespaceDefinition(expected.getNamespaceDefinition()) - .namespaceFormat(expected.getNamespaceFormat()) - .prefix(expected.getPrefix()) - .syncCatalog(expectedWithNewSchema.getSyncCatalog()) - .status(expected.getStatus()) - .schedule(expected.getSchedule()); - when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead); - when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(connectionRead); - - final List connectionStreams = List.of(ConnectionHelpers.STREAM_DESCRIPTOR); - when(configRepository.getAllStreamsForConnection(expected.getConnectionId())).thenReturn(connectionStreams); - - final ManualOperationResult successfulResult = ManualOperationResult.builder().jobId(Optional.empty()).failingReason(Optional.empty()).build(); - when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult); - when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult); - - final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody); - - assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog()); - - final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId()); - verify(schedulerHandler, times(0)).resetConnection(connectionId); - verify(schedulerHandler, times(0)).syncConnection(connectionId); - verify(connectionsHandler, times(1)).updateConnection(any()); - final InOrder orderVerifier = inOrder(eventRunner); - orderVerifier.verify(eventRunner, times(1)).synchronousResetConnection(connectionId.getConnectionId(), connectionStreams); - orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId()); - } - @Test void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationException, ConfigNotFoundException, IOException { final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate() @@ -750,7 +665,7 @@ void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationExceptio when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult); when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult); - final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody); + final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody); assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog()); @@ -816,7 +731,7 @@ void testUpdateConnectionWithUpdatedSchemaPerStream() throws JsonValidationExcep when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult); when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult); - final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody); + final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody); assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog()); @@ -870,7 +785,7 @@ void testUpdateConnectionNoStreamsToReset() throws JsonValidationException, Conf when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead); when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(connectionRead); - final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody); + final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody); assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog()); @@ -915,7 +830,7 @@ void testUpdateConnectionWithSkipReset() throws JsonValidationException, ConfigN .schedule(expected.getSchedule()); when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead); - final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody); + final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody); assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog()); diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index c85f034e1a11..4edc4ba508d6 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -876,7 +876,6 @@ public WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection .sourceCatalogId(connection.getSourceCatalogId()) .status(connection.getStatus()) .prefix(connection.getPrefix()) - .withRefreshedCatalog(true) .skipReset(false); } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 4e31dd6de047..f4446bf06531 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -934,7 +934,7 @@ void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo testInfo) } @Test - void testResetAllWhenSchemaIsModified() throws Exception { + void testResetAllWhenSchemaIsModifiedForLegacySource() throws Exception { final String sourceTable1 = "test_table1"; final String sourceTable2 = "test_table2"; final String sourceTable3 = "test_table3"; @@ -954,114 +954,131 @@ void testResetAllWhenSchemaIsModified() throws Exception { return null; }); - final UUID sourceId = testHarness.createPostgresSource().getSourceId(); + final SourceRead source = testHarness.createPostgresSource(); + final UUID sourceId = source.getSourceId(); + final UUID sourceDefinitionId = source.getSourceDefinitionId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); final UUID destinationId = testHarness.createPostgresDestination().getDestinationId(); final OperationRead operation = testHarness.createOperation(); final String name = "test_reset_when_schema_is_modified_" + UUID.randomUUID(); - LOGGER.info("Discovered catalog: {}", catalog); - - final ConnectionRead connection = - testHarness.createConnection(name, sourceId, destinationId, List.of(operation.getOperationId()), catalog, null); - LOGGER.info("Created Connection: {}", connection); - - sourceDb.query(ctx -> { - prettyPrintTables(ctx, sourceTable1, sourceTable2); - return null; - }); - - // Run initial sync - LOGGER.info("Running initial sync"); - final JobInfoRead syncRead = - apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); - waitForSuccessfulJob(apiClient.getJobsApi(), syncRead.getJob()); - - // Some inspection for debug - destDb.query(ctx -> { - prettyPrintTables(ctx, outputPrefix + sourceTable1, outputPrefix + sourceTable2); - return null; - }); - final ConnectionState initSyncState = - apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); - LOGGER.info("ConnectionState after the initial sync: " + initSyncState.toString()); - - testHarness.assertSourceAndDestinationDbInSync(false); - - // Patch some data in the source - LOGGER.info("Modifying source tables"); - sourceDb.query(ctx -> { - // Adding a new rows to make sure we sync more data. - ctx.insertInto(DSL.table(sourceTable1)).columns(DSL.field(NAME)).values("alice").execute(); - ctx.insertInto(DSL.table(sourceTable2)).columns(DSL.field(VALUE)).values("v3").execute(); - - // The removed rows should no longer be in the destination since we expect a full reset - ctx.deleteFrom(DSL.table(sourceTable1)).where(DSL.field(NAME).eq("john")).execute(); - ctx.deleteFrom(DSL.table(sourceTable2)).where(DSL.field(VALUE).eq("v2")).execute(); - - // Adding a new table to trigger reset from the update connection API - ctx.createTableIfNotExists(sourceTable3).columns(DSL.field(LOCATION, SQLDataType.VARCHAR)).execute(); - ctx.truncate(sourceTable3).execute(); - ctx.insertInto(DSL.table(sourceTable3)).columns(DSL.field(LOCATION)).values("home").execute(); - ctx.insertInto(DSL.table(sourceTable3)).columns(DSL.field(LOCATION)).values("work").execute(); - ctx.insertInto(DSL.table(sourceTable3)).columns(DSL.field(LOCATION)).values("space").execute(); - return null; - }); - - final AirbyteCatalog updatedCatalog = testHarness.discoverSourceSchemaWithoutCache(sourceId); - LOGGER.info("Discovered updated catalog: {}", updatedCatalog); - - // Update with refreshed catalog - LOGGER.info("Submit the update request"); - final WebBackendConnectionUpdate update = new WebBackendConnectionUpdate() - .name(connection.getName()) - .connectionId(connection.getConnectionId()) - .namespaceDefinition(connection.getNamespaceDefinition()) - .namespaceFormat(connection.getNamespaceFormat()) - .prefix(connection.getPrefix()) - .operations(List.of( - new WebBackendOperationCreateOrUpdate() - .name(operation.getName()) - .operationId(operation.getOperationId()) - .workspaceId(operation.getWorkspaceId()) - .operatorConfiguration(operation.getOperatorConfiguration()))) - .syncCatalog(updatedCatalog) - .schedule(connection.getSchedule()) - .sourceCatalogId(connection.getSourceCatalogId()) - .status(connection.getStatus()) - .resourceRequirements(connection.getResourceRequirements()) - .withRefreshedCatalog(true); - webBackendApi.webBackendUpdateConnection(update); - - LOGGER.info("Inspecting Destination DB after the update request, tables should be empty"); - destDb.query(ctx -> { - prettyPrintTables(ctx, outputPrefix + sourceTable1, outputPrefix + sourceTable2); - return null; - }); - final ConnectionState postResetState = - apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); - LOGGER.info("ConnectionState after the update request: {}", postResetState.toString()); - - // Wait until the sync from the UpdateConnection is finished - final JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connection.getConnectionId()); - LOGGER.info("Generated SyncJob config: {}", syncFromTheUpdate.toString()); - waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); - - final ConnectionState postUpdateState = - apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); - LOGGER.info("ConnectionState after the final sync: {}", postUpdateState.toString()); - - LOGGER.info("Inspecting DBs After the final sync"); - sourceDb.query(ctx -> { - prettyPrintTables(ctx, sourceTable1, sourceTable2, sourceTable3); - return null; - }); - destDb.query(ctx -> { - prettyPrintTables(ctx, outputPrefix + sourceTable1, outputPrefix + sourceTable2, outputPrefix + sourceTable3); - return null; - }); + // Fetch the current/most recent source definition version + final SourceDefinitionRead sourceDefinitionRead = + apiClient.getSourceDefinitionApi().getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId)); + final String currentSourceDefinitionVersion = sourceDefinitionRead.getDockerImageTag(); - testHarness.assertSourceAndDestinationDbInSync(false); + try { + // Set the source to a version that does not support per-stream state + LOGGER.info("Setting source connector to pre-per-stream state version {}...", + AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); + + LOGGER.info("Discovered catalog: {}", catalog); + + final ConnectionRead connection = + testHarness.createConnection(name, sourceId, destinationId, List.of(operation.getOperationId()), catalog, null); + LOGGER.info("Created Connection: {}", connection); + + sourceDb.query(ctx -> { + prettyPrintTables(ctx, sourceTable1, sourceTable2); + return null; + }); + + // Run initial sync + LOGGER.info("Running initial sync"); + final JobInfoRead syncRead = + apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); + waitForSuccessfulJob(apiClient.getJobsApi(), syncRead.getJob()); + + // Some inspection for debug + destDb.query(ctx -> { + prettyPrintTables(ctx, outputPrefix + sourceTable1, outputPrefix + sourceTable2); + return null; + }); + final ConnectionState initSyncState = + apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); + LOGGER.info("ConnectionState after the initial sync: " + initSyncState.toString()); + + testHarness.assertSourceAndDestinationDbInSync(false); + + // Patch some data in the source + LOGGER.info("Modifying source tables"); + sourceDb.query(ctx -> { + // Adding a new rows to make sure we sync more data. + ctx.insertInto(DSL.table(sourceTable1)).columns(DSL.field(NAME)).values("alice").execute(); + ctx.insertInto(DSL.table(sourceTable2)).columns(DSL.field(VALUE)).values("v3").execute(); + + // The removed rows should no longer be in the destination since we expect a full reset + ctx.deleteFrom(DSL.table(sourceTable1)).where(DSL.field(NAME).eq("john")).execute(); + ctx.deleteFrom(DSL.table(sourceTable2)).where(DSL.field(VALUE).eq("v2")).execute(); + + // Adding a new table to trigger reset from the update connection API + ctx.createTableIfNotExists(sourceTable3).columns(DSL.field(LOCATION, SQLDataType.VARCHAR)).execute(); + ctx.truncate(sourceTable3).execute(); + ctx.insertInto(DSL.table(sourceTable3)).columns(DSL.field(LOCATION)).values("home").execute(); + ctx.insertInto(DSL.table(sourceTable3)).columns(DSL.field(LOCATION)).values("work").execute(); + ctx.insertInto(DSL.table(sourceTable3)).columns(DSL.field(LOCATION)).values("space").execute(); + return null; + }); + + final AirbyteCatalog updatedCatalog = testHarness.discoverSourceSchemaWithoutCache(sourceId); + LOGGER.info("Discovered updated catalog: {}", updatedCatalog); + + // Update with refreshed catalog + LOGGER.info("Submit the update request"); + final WebBackendConnectionUpdate update = new WebBackendConnectionUpdate() + .name(connection.getName()) + .connectionId(connection.getConnectionId()) + .namespaceDefinition(connection.getNamespaceDefinition()) + .namespaceFormat(connection.getNamespaceFormat()) + .prefix(connection.getPrefix()) + .operations(List.of( + new WebBackendOperationCreateOrUpdate() + .name(operation.getName()) + .operationId(operation.getOperationId()) + .workspaceId(operation.getWorkspaceId()) + .operatorConfiguration(operation.getOperatorConfiguration()))) + .syncCatalog(updatedCatalog) + .schedule(connection.getSchedule()) + .sourceCatalogId(connection.getSourceCatalogId()) + .status(connection.getStatus()) + .resourceRequirements(connection.getResourceRequirements()); + webBackendApi.webBackendUpdateConnection(update); + + LOGGER.info("Inspecting Destination DB after the update request, tables should be empty"); + destDb.query(ctx -> { + prettyPrintTables(ctx, outputPrefix + sourceTable1, outputPrefix + sourceTable2); + return null; + }); + final ConnectionState postResetState = + apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); + LOGGER.info("ConnectionState after the update request: {}", postResetState.toString()); + + // Wait until the sync from the UpdateConnection is finished + final JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connection.getConnectionId()); + LOGGER.info("Generated SyncJob config: {}", syncFromTheUpdate.toString()); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + final ConnectionState postUpdateState = + apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); + LOGGER.info("ConnectionState after the final sync: {}", postUpdateState.toString()); + + LOGGER.info("Inspecting DBs After the final sync"); + sourceDb.query(ctx -> { + prettyPrintTables(ctx, sourceTable1, sourceTable2, sourceTable3); + return null; + }); + destDb.query(ctx -> { + prettyPrintTables(ctx, outputPrefix + sourceTable1, outputPrefix + sourceTable2, outputPrefix + sourceTable3); + return null; + }); + + testHarness.assertSourceAndDestinationDbInSync(false); + } finally { + // Set source back to version it was set to at beginning of test + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefinitionVersion); + LOGGER.info("Set source connector back to per-stream state supported version {}.", currentSourceDefinitionVersion); + } } private void prettyPrintTables(final DSLContext ctx, final String... tables) { @@ -1236,7 +1253,7 @@ void testPartialResetResetAllWhenSchemaIsModified(final TestInfo testInfo) throw // Update with refreshed catalog AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchemaWithoutCache(sourceId); WebBackendConnectionUpdate update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); - webBackendApi.webBackendUpdateConnectionNew(update); + webBackendApi.webBackendUpdateConnection(update); // Wait until the sync from the UpdateConnection is finished JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); @@ -1265,7 +1282,7 @@ void testPartialResetResetAllWhenSchemaIsModified(final TestInfo testInfo) throw sourceId = testHarness.createPostgresSource().getSourceId(); refreshedCatalog = testHarness.discoverSourceSchema(sourceId); update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); - webBackendApi.webBackendUpdateConnectionNew(update); + webBackendApi.webBackendUpdateConnection(update); syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); @@ -1296,7 +1313,7 @@ void testPartialResetResetAllWhenSchemaIsModified(final TestInfo testInfo) throw sourceId = testHarness.createPostgresSource().getSourceId(); refreshedCatalog = testHarness.discoverSourceSchema(sourceId); update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); - webBackendApi.webBackendUpdateConnectionNew(update); + webBackendApi.webBackendUpdateConnection(update); syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index d988bfd8016c..bb2bd5c349d8 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -345,7 +345,7 @@ void testPartialResetFromSchemaUpdate(final TestInfo testInfo) throws Exception final AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Refreshed catalog: {}", refreshedCatalog); final WebBackendConnectionUpdate update = testHarness.getUpdateInput(connectionRead, refreshedCatalog, operationRead); - webBackendApi.webBackendUpdateConnectionNew(update); + webBackendApi.webBackendUpdateConnection(update); LOGGER.info("Waiting for sync job after update to complete"); final JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); @@ -392,7 +392,7 @@ void testPartialResetFromStreamSelection(final TestInfo testInfo) throws Excepti catalog.setStreams(updatedStreams); LOGGER.info("Updated catalog: {}", catalog); WebBackendConnectionUpdate update = testHarness.getUpdateInput(connectionRead, catalog, operationRead); - webBackendApi.webBackendUpdateConnectionNew(update); + webBackendApi.webBackendUpdateConnection(update); LOGGER.info("Waiting for sync job after update to start"); JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); @@ -407,7 +407,7 @@ void testPartialResetFromStreamSelection(final TestInfo testInfo) throws Excepti catalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Updated catalog: {}", catalog); update = testHarness.getUpdateInput(connectionRead, catalog, operationRead); - webBackendApi.webBackendUpdateConnectionNew(update); + webBackendApi.webBackendUpdateConnection(update); LOGGER.info("Waiting for sync job after update to start"); syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); diff --git a/airbyte-webapp-e2e-tests/cypress/integration/connection.spec.ts b/airbyte-webapp-e2e-tests/cypress/integration/connection.spec.ts index 9050fed19b91..ec2df3dab4be 100644 --- a/airbyte-webapp-e2e-tests/cypress/integration/connection.spec.ts +++ b/airbyte-webapp-e2e-tests/cypress/integration/connection.spec.ts @@ -17,7 +17,7 @@ describe("Connection main actions", () => { }); it("Update connection", () => { - cy.intercept("/api/v1/web_backend/connections/updateNew").as("updateConnection"); + cy.intercept("/api/v1/web_backend/connections/update").as("updateConnection"); createTestConnection("Test update connection source cypress", "Test update connection destination cypress"); diff --git a/airbyte-webapp/src/core/domain/connection/WebBackendConnectionService.ts b/airbyte-webapp/src/core/domain/connection/WebBackendConnectionService.ts index a8702ce251a7..bfd3f6c9b050 100644 --- a/airbyte-webapp/src/core/domain/connection/WebBackendConnectionService.ts +++ b/airbyte-webapp/src/core/domain/connection/WebBackendConnectionService.ts @@ -4,7 +4,7 @@ import { webBackendCreateConnection, webBackendGetConnection, webBackendListConnectionsForWorkspace, - webBackendUpdateConnectionNew as webBackendUpdateConnection, + webBackendUpdateConnection, } from "../../request/AirbyteClient"; import { AirbyteRequestService } from "../../request/AirbyteRequestService"; diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 1096c65e61ef..7aea937a743f 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -369,7 +369,6 @@

WebBackend

  • post /v1/web_backend/connections/list
  • post /v1/web_backend/connections/search
  • post /v1/web_backend/connections/update
  • -
  • post /v1/web_backend/connections/updateNew
  • Workspace

      @@ -9858,215 +9857,6 @@

      422

      InvalidInputExceptionInfo
      -
      -
      - Up -
      post /v1/web_backend/connections/updateNew
      -
      Update a connection (webBackendUpdateConnectionNew)
      -
      - - -

      Consumes

      - This API call consumes the following media types via the Content-Type request header: -
        -
      • application/json
      • -
      - -

      Request body

      -
      -
      WebBackendConnectionUpdate WebBackendConnectionUpdate (required)
      - -
      Body Parameter
      - -
      - - - - -

      Return type

      - - - - -

      Example data

      -
      Content-Type: application/json
      -
      {
      -  "sourceId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -  "latestSyncJobCreatedAt" : 0,
      -  "prefix" : "prefix",
      -  "destination" : {
      -    "connectionConfiguration" : {
      -      "user" : "charles"
      -    },
      -    "destinationName" : "destinationName",
      -    "name" : "name",
      -    "destinationDefinitionId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -    "destinationId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -    "workspaceId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91"
      -  },
      -  "isSyncing" : true,
      -  "source" : {
      -    "sourceId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -    "connectionConfiguration" : {
      -      "user" : "charles"
      -    },
      -    "name" : "name",
      -    "sourceDefinitionId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -    "sourceName" : "sourceName",
      -    "workspaceId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91"
      -  },
      -  "destinationId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -  "catalogDiff" : {
      -    "transforms" : [ {
      -      "streamDescriptor" : {
      -        "name" : "name",
      -        "namespace" : "namespace"
      -      },
      -      "transformType" : "add_stream",
      -      "updateStream" : [ {
      -        "updateFieldSchema" : { },
      -        "fieldName" : [ "fieldName", "fieldName" ],
      -        "addField" : { },
      -        "transformType" : "add_field",
      -        "removeField" : { }
      -      }, {
      -        "updateFieldSchema" : { },
      -        "fieldName" : [ "fieldName", "fieldName" ],
      -        "addField" : { },
      -        "transformType" : "add_field",
      -        "removeField" : { }
      -      } ]
      -    }, {
      -      "streamDescriptor" : {
      -        "name" : "name",
      -        "namespace" : "namespace"
      -      },
      -      "transformType" : "add_stream",
      -      "updateStream" : [ {
      -        "updateFieldSchema" : { },
      -        "fieldName" : [ "fieldName", "fieldName" ],
      -        "addField" : { },
      -        "transformType" : "add_field",
      -        "removeField" : { }
      -      }, {
      -        "updateFieldSchema" : { },
      -        "fieldName" : [ "fieldName", "fieldName" ],
      -        "addField" : { },
      -        "transformType" : "add_field",
      -        "removeField" : { }
      -      } ]
      -    } ]
      -  },
      -  "resourceRequirements" : {
      -    "cpu_limit" : "cpu_limit",
      -    "memory_request" : "memory_request",
      -    "memory_limit" : "memory_limit",
      -    "cpu_request" : "cpu_request"
      -  },
      -  "schedule" : {
      -    "units" : 0,
      -    "timeUnit" : "minutes"
      -  },
      -  "operations" : [ {
      -    "name" : "name",
      -    "operationId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -    "operatorConfiguration" : {
      -      "normalization" : {
      -        "option" : "basic"
      -      },
      -      "dbt" : {
      -        "gitRepoBranch" : "gitRepoBranch",
      -        "dockerImage" : "dockerImage",
      -        "dbtArguments" : "dbtArguments",
      -        "gitRepoUrl" : "gitRepoUrl"
      -      }
      -    },
      -    "workspaceId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91"
      -  }, {
      -    "name" : "name",
      -    "operationId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -    "operatorConfiguration" : {
      -      "normalization" : {
      -        "option" : "basic"
      -      },
      -      "dbt" : {
      -        "gitRepoBranch" : "gitRepoBranch",
      -        "dockerImage" : "dockerImage",
      -        "dbtArguments" : "dbtArguments",
      -        "gitRepoUrl" : "gitRepoUrl"
      -      }
      -    },
      -    "workspaceId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91"
      -  } ],
      -  "catalogId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -  "name" : "name",
      -  "syncCatalog" : {
      -    "streams" : [ {
      -      "stream" : {
      -        "sourceDefinedPrimaryKey" : [ [ "sourceDefinedPrimaryKey", "sourceDefinedPrimaryKey" ], [ "sourceDefinedPrimaryKey", "sourceDefinedPrimaryKey" ] ],
      -        "supportedSyncModes" : [ null, null ],
      -        "sourceDefinedCursor" : true,
      -        "name" : "name",
      -        "namespace" : "namespace",
      -        "defaultCursorField" : [ "defaultCursorField", "defaultCursorField" ]
      -      },
      -      "config" : {
      -        "aliasName" : "aliasName",
      -        "cursorField" : [ "cursorField", "cursorField" ],
      -        "selected" : true,
      -        "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ]
      -      }
      -    }, {
      -      "stream" : {
      -        "sourceDefinedPrimaryKey" : [ [ "sourceDefinedPrimaryKey", "sourceDefinedPrimaryKey" ], [ "sourceDefinedPrimaryKey", "sourceDefinedPrimaryKey" ] ],
      -        "supportedSyncModes" : [ null, null ],
      -        "sourceDefinedCursor" : true,
      -        "name" : "name",
      -        "namespace" : "namespace",
      -        "defaultCursorField" : [ "defaultCursorField", "defaultCursorField" ]
      -      },
      -      "config" : {
      -        "aliasName" : "aliasName",
      -        "cursorField" : [ "cursorField", "cursorField" ],
      -        "selected" : true,
      -        "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ]
      -      }
      -    } ]
      -  },
      -  "connectionId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
      -  "namespaceFormat" : "${SOURCE_NAMESPACE}",
      -  "operationIds" : [ null, null ],
      -  "scheduleData" : {
      -    "cron" : {
      -      "cronExpression" : "cronExpression",
      -      "cronTimeZone" : "cronTimeZone"
      -    },
      -    "basicSchedule" : {
      -      "units" : 6,
      -      "timeUnit" : "minutes"
      -    }
      -  }
      -}
      - -

      Produces

      - This API call produces the following media types according to the Accept request header; - the media type will be conveyed by the Content-Type response header. -
        -
      • application/json
      • -
      - -

      Responses

      -

      200

      - Successful operation - WebBackendConnectionRead -

      422

      - Input failed validation - InvalidInputExceptionInfo -
      -

      Workspace

      @@ -12243,7 +12033,6 @@

      WebBackendConnectionUpdate
      scheduleData (optional)
      status
      resourceRequirements (optional)
      -
      withRefreshedCatalog (optional)
      skipReset (optional)
      operations (optional)
      sourceCatalogId (optional)
      UUID format: uuid
      diff --git a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml index 4769414af322..89947e53fe5c 100644 --- a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml +++ b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml @@ -1260,27 +1260,6 @@ paths: summary: Update a connection tags: - web_backend - /v1/web_backend/connections/updateNew: - post: - operationId: webBackendUpdateConnectionNew - requestBody: - content: - application/json: - schema: - $ref: "#/components/schemas/WebBackendConnectionUpdate" - required: true - responses: - "200": - content: - application/json: - schema: - $ref: "#/components/schemas/WebBackendConnectionRead" - description: Successful operation - "422": - $ref: "#/components/responses/InvalidInputResponse" - summary: Update a connection - tags: - - web_backend /v1/web_backend/destinations/recreate: post: operationId: webBackendRecreateDestination @@ -2760,8 +2739,6 @@ components: $ref: "#/components/schemas/ConnectionStatus" syncCatalog: $ref: "#/components/schemas/AirbyteCatalog" - withRefreshedCatalog: - type: boolean skipReset: type: boolean required: