Skip to content

Commit

Permalink
Move 'updateNew' logic into 'update', and remove 'updateNew' (#15863)
Browse files Browse the repository at this point in the history
* save

* clean up more usages and remove withRefreshedCatalog

* make webapp use correct endpoint

* add back intercept

* fix acceptance test

* fix log

* remove 'new' from test name
  • Loading branch information
lmossman authored and rodireich committed Aug 25, 2022
1 parent 95a1de3 commit bc8a609
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 496 deletions.
23 changes: 0 additions & 23 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1975,27 +1975,6 @@ paths:
$ref: "#/components/schemas/WebBackendConnectionRead"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/connections/updateNew:
post:
operationId: webBackendUpdateConnectionNew
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/WebBackendConnectionUpdate"
required: true
responses:
"200":
content:
application/json:
schema:
$ref: "#/components/schemas/WebBackendConnectionRead"
description: Successful operation
"422":
$ref: "#/components/responses/InvalidInputResponse"
summary: Update a connection
tags:
- web_backend
/v1/web_backend/connections/search:
post:
tags:
Expand Down Expand Up @@ -3355,8 +3334,6 @@ components:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
$ref: "#/components/schemas/ResourceRequirements"
withRefreshedCatalog:
type: boolean
skipReset:
type: boolean
operations:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,11 +822,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
}

@Override
public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate) {
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnectionNew(webBackendConnectionUpdate));
}

@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,30 +362,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = updateOperations(webBackendConnectionUpdate);
final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds);

ConnectionRead connectionRead;
final boolean needReset = MoreBooleans.isTruthy(webBackendConnectionUpdate.getWithRefreshedCatalog());

connectionRead = connectionsHandler.updateConnection(connectionUpdate);

if (needReset) {
ManualOperationResult manualOperationResult = eventRunner.synchronousResetConnection(
webBackendConnectionUpdate.getConnectionId(),
// TODO (https://github.com/airbytehq/airbyte/issues/12741): change this to only get new/updated
// streams, instead of all
configRepository.getAllStreamsForConnection(webBackendConnectionUpdate.getConnectionId()));
verifyManualOperationResult(manualOperationResult);
manualOperationResult = eventRunner.startNewManualSync(webBackendConnectionUpdate.getConnectionId());
verifyManualOperationResult(manualOperationResult);
connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId());
}
return buildWebBackendConnectionRead(connectionRead);
}

public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate)
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = updateOperations(webBackendConnectionUpdate);
final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds);
final UUID connectionId = webBackendConnectionUpdate.getConnectionId();
final ConfiguredAirbyteCatalog existingConfiguredCatalog =
configRepository.getConfiguredCatalogForConnection(connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ void testToConnectionCreate() throws IOException {
assertEquals(expected, actual);
}

// TODO: remove withRefreshedCatalog param from this test when param is removed from code
@Test
void testToConnectionUpdate() throws IOException {
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
Expand Down Expand Up @@ -535,6 +534,14 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
.syncCatalog(expected.getSyncCatalog())
.sourceCatalogId(expected.getCatalogId());

when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId()))
.thenReturn(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog());

final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of());
when(connectionsHandler.getDiff(any(), any())).thenReturn(catalogDiff);
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(expected.getConnectionId());
when(stateHandler.getState(connectionIdRequestBody)).thenReturn(new ConnectionState().stateType(ConnectionStateType.LEGACY));

when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead().connectionId(expected.getConnectionId()));
when(connectionsHandler.updateConnection(any())).thenReturn(
Expand Down Expand Up @@ -576,48 +583,6 @@ void testUpdateConnectionWithOperations() throws JsonValidationException, Config
.syncCatalog(expected.getSyncCatalog())
.operations(List.of(operationCreateOrUpdate));

when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead()
.connectionId(expected.getConnectionId())
.operationIds(connectionRead.getOperationIds()));
when(connectionsHandler.updateConnection(any())).thenReturn(
new ConnectionRead()
.connectionId(expected.getConnectionId())
.sourceId(expected.getSourceId())
.destinationId(expected.getDestinationId())
.operationIds(connectionRead.getOperationIds())
.name(expected.getName())
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.syncCatalog(expected.getSyncCatalog())
.status(expected.getStatus())
.schedule(expected.getSchedule()));
when(operationsHandler.updateOperation(operationUpdate)).thenReturn(new OperationRead().operationId(operationUpdate.getOperationId()));
when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList);

final WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(connectionRead.getOperationIds(), actualConnectionRead.getOperationIds());
verify(operationsHandler, times(1)).updateOperation(operationUpdate);
}

@Test
void testUpdateConnectionWithOperationsNew() throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendOperationCreateOrUpdate operationCreateOrUpdate = new WebBackendOperationCreateOrUpdate()
.name("Test Operation")
.operationId(connectionRead.getOperationIds().get(0));
final OperationUpdate operationUpdate = WebBackendConnectionsHandler.toOperationUpdate(operationCreateOrUpdate);
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.connectionId(expected.getConnectionId())
.schedule(expected.getSchedule())
.status(expected.getStatus())
.syncCatalog(expected.getSyncCatalog())
.operations(List.of(operationCreateOrUpdate));

when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId()))
.thenReturn(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog());

Expand Down Expand Up @@ -646,62 +611,12 @@ void testUpdateConnectionWithOperationsNew() throws JsonValidationException, Con
when(operationsHandler.updateOperation(operationUpdate)).thenReturn(new OperationRead().operationId(operationUpdate.getOperationId()));
when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList);

final WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(connectionRead.getOperationIds(), actualConnectionRead.getOperationIds());
verify(operationsHandler, times(1)).updateOperation(operationUpdate);
}

// TODO: remove in favor of test below when update endpoint is switched to new endpoint
@Test
void testUpdateConnectionWithUpdatedSchema() throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.connectionId(expected.getConnectionId())
.schedule(expected.getSchedule())
.status(expected.getStatus())
.syncCatalog(expectedWithNewSchema.getSyncCatalog())
.withRefreshedCatalog(true);

when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList);
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead().connectionId(expected.getConnectionId()));
final ConnectionRead connectionRead = new ConnectionRead()
.connectionId(expected.getConnectionId())
.sourceId(expected.getSourceId())
.destinationId(expected.getDestinationId())
.name(expected.getName())
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.syncCatalog(expectedWithNewSchema.getSyncCatalog())
.status(expected.getStatus())
.schedule(expected.getSchedule());
when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead);
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(connectionRead);

final List<io.airbyte.protocol.models.StreamDescriptor> connectionStreams = List.of(ConnectionHelpers.STREAM_DESCRIPTOR);
when(configRepository.getAllStreamsForConnection(expected.getConnectionId())).thenReturn(connectionStreams);

final ManualOperationResult successfulResult = ManualOperationResult.builder().jobId(Optional.empty()).failingReason(Optional.empty()).build();
when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult);
when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId());
verify(schedulerHandler, times(0)).resetConnection(connectionId);
verify(schedulerHandler, times(0)).syncConnection(connectionId);
verify(connectionsHandler, times(1)).updateConnection(any());
final InOrder orderVerifier = inOrder(eventRunner);
orderVerifier.verify(eventRunner, times(1)).synchronousResetConnection(connectionId.getConnectionId(), connectionStreams);
orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId());
}

@Test
void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
Expand Down Expand Up @@ -750,7 +665,7 @@ void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationExceptio
when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult);
when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

Expand Down Expand Up @@ -816,7 +731,7 @@ void testUpdateConnectionWithUpdatedSchemaPerStream() throws JsonValidationExcep
when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult);
when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

Expand Down Expand Up @@ -870,7 +785,7 @@ void testUpdateConnectionNoStreamsToReset() throws JsonValidationException, Conf
when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead);
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(connectionRead);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

Expand Down Expand Up @@ -915,7 +830,7 @@ void testUpdateConnectionWithSkipReset() throws JsonValidationException, ConfigN
.schedule(expected.getSchedule());
when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,6 @@ public WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection
.sourceCatalogId(connection.getSourceCatalogId())
.status(connection.getStatus())
.prefix(connection.getPrefix())
.withRefreshedCatalog(true)
.skipReset(false);
}

Expand Down
Loading

0 comments on commit bc8a609

Please sign in to comment.