Skip to content

Commit

Permalink
Add workspace state web backend API
Browse files Browse the repository at this point in the history
  • Loading branch information
timroes committed Feb 4, 2022
1 parent f0546ed commit 89720b1
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 22 deletions.
42 changes: 42 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,28 @@ paths:
$ref: "#/components/schemas/WebBackendConnectionReadList"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/workspace/state:
post:
tags:
- web_backend
summary: Returns the current state of a workspace
operationId: webBackendGetWorkspaceState
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/WebBackendWorkspaceState"
responses:
"200":
description: Successfull operation
content:
application/json:
schema:
$ref: "#/components/schemas/WebBackendWorkspaceStateResult"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/jobs/list:
post:
tags:
Expand Down Expand Up @@ -2040,6 +2062,26 @@ components:
properties:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
WebBackendWorkspaceState:
type: object
required:
- workspaceId
properties:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
WebBackendWorkspaceStateResult:
type: object
required:
- hasConnections
- hasSources
- hasDestinations
properties:
hasConnections:
type: boolean
hasSources:
type: boolean
hasDestinations:
type: boolean
# SLUG
SlugRequestBody:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,25 @@ public void updateConnectionState(final UUID connectionId, final State state) th
}
}

public int countConnectionsForWorkspace(final UUID workspaceId) throws JsonValidationException, ConfigNotFoundException, IOException {
// TODO: Missing implementation
return 0;
}

public long countSourcesForWorkspace(final UUID workspaceId) throws JsonValidationException, IOException {
// TODO: Should use a dedicated count query in the persistence layer
return this.listSourceConnection().stream()
.filter(source -> Objects.equals(source.getWorkspaceId(), workspaceId))
.count();
}

public long countDestinationsForWorkspace(final UUID workspaceId) throws JsonValidationException, IOException {
// TODO: Should use a dedicated count query in the persistence layer
return this.listDestinationConnection().stream()
.filter(dest -> Objects.equals(dest.getWorkspaceId(), workspaceId) && !dest.getTombstone())
.count();
}

/**
* Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the
* string/jsonnode into the AirbyteConfig, Stream<Object<AirbyteConfig.getClassName()>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
import io.airbyte.api.model.WebBackendConnectionRequestBody;
import io.airbyte.api.model.WebBackendConnectionSearch;
import io.airbyte.api.model.WebBackendConnectionUpdate;
import io.airbyte.api.model.WebBackendWorkspaceState;
import io.airbyte.api.model.WebBackendWorkspaceStateResult;
import io.airbyte.api.model.WorkspaceCreate;
import io.airbyte.api.model.WorkspaceGiveFeedback;
import io.airbyte.api.model.WorkspaceIdRequestBody;
Expand Down Expand Up @@ -114,7 +116,7 @@
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.handlers.SourceDefinitionsHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.server.handlers.WebBackendConnectionsHandler;
import io.airbyte.server.handlers.WebBackendHandler;
import io.airbyte.server.handlers.WorkspacesHandler;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
Expand All @@ -140,7 +142,7 @@ public class ConfigurationApi implements io.airbyte.api.V1Api {
private final OperationsHandler operationsHandler;
private final SchedulerHandler schedulerHandler;
private final JobHistoryHandler jobHistoryHandler;
private final WebBackendConnectionsHandler webBackendConnectionsHandler;
private final WebBackendHandler webBackendHandler;
private final HealthCheckHandler healthCheckHandler;
private final ArchiveHandler archiveHandler;
private final LogsHandler logsHandler;
Expand Down Expand Up @@ -210,15 +212,16 @@ public ConfigurationApi(final ConfigRepository configRepository,
jobHistoryHandler = new JobHistoryHandler(jobPersistence, workerEnvironment, logConfigs, connectionsHandler, sourceHandler,
sourceDefinitionsHandler, destinationHandler, destinationDefinitionsHandler, airbyteVersion);
oAuthHandler = new OAuthHandler(configRepository, httpClient, trackingClient);
webBackendConnectionsHandler = new WebBackendConnectionsHandler(
webBackendHandler = new WebBackendHandler(
connectionsHandler,
sourceHandler,
destinationHandler,
jobHistoryHandler,
schedulerHandler,
operationsHandler,
featureFlags,
temporalWorkerRunFactory);
temporalWorkerRunFactory,
configRepository);
healthCheckHandler = new HealthCheckHandler();
archiveHandler = new ArchiveHandler(
airbyteVersion,
Expand Down Expand Up @@ -677,32 +680,37 @@ public HealthCheckRead getHealthCheck() {

@Override
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody));
return execute(() -> webBackendHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody));
}

@Override
public WebBackendConnectionReadList webBackendListAllConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.webBackendListAllConnectionsForWorkspace(workspaceIdRequestBody));
return execute(() -> webBackendHandler.webBackendListAllConnectionsForWorkspace(workspaceIdRequestBody));
}

@Override
public WebBackendConnectionReadList webBackendSearchConnections(final WebBackendConnectionSearch webBackendConnectionSearch) {
return execute(() -> webBackendConnectionsHandler.webBackendSearchConnections(webBackendConnectionSearch));
return execute(() -> webBackendHandler.webBackendSearchConnections(webBackendConnectionSearch));
}

@Override
public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnectionRequestBody webBackendConnectionRequestBody) {
return execute(() -> webBackendConnectionsHandler.webBackendGetConnection(webBackendConnectionRequestBody));
return execute(() -> webBackendHandler.webBackendGetConnection(webBackendConnectionRequestBody));
}

@Override
public WebBackendConnectionRead webBackendCreateConnection(final WebBackendConnectionCreate webBackendConnectionCreate) {
return execute(() -> webBackendConnectionsHandler.webBackendCreateConnection(webBackendConnectionCreate));
return execute(() -> webBackendHandler.webBackendCreateConnection(webBackendConnectionCreate));
}

@Override
public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConnectionUpdate webBackendConnectionUpdate) {
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
return execute(() -> webBackendHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
}

@Override
public WebBackendWorkspaceStateResult webBackendGetWorkspaceState(final WebBackendWorkspaceState webBackendWorkspaceState) {
return execute(() -> webBackendHandler.getWorkspaceState((webBackendWorkspaceState)));
}

// ARCHIVES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
import io.airbyte.api.model.WebBackendConnectionSearch;
import io.airbyte.api.model.WebBackendConnectionUpdate;
import io.airbyte.api.model.WebBackendOperationCreateOrUpdate;
import io.airbyte.api.model.WebBackendWorkspaceState;
import io.airbyte.api.model.WebBackendWorkspaceStateResult;
import io.airbyte.api.model.WorkspaceIdRequestBody;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import java.io.IOException;
Expand All @@ -59,7 +62,7 @@

@AllArgsConstructor
@Slf4j
public class WebBackendConnectionsHandler {
public class WebBackendHandler {

private static final Set<JobStatus> TERMINAL_STATUSES = Sets.newHashSet(JobStatus.FAILED, JobStatus.SUCCEEDED, JobStatus.CANCELLED);

Expand All @@ -71,6 +74,22 @@ public class WebBackendConnectionsHandler {
private final OperationsHandler operationsHandler;
private final FeatureFlags featureFlags;
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
private final ConfigRepository configRepository;

public WebBackendWorkspaceStateResult getWorkspaceState(final WebBackendWorkspaceState webBackendWorkspaceState)
throws JsonValidationException, ConfigNotFoundException, IOException {
final var workspaceId = webBackendWorkspaceState.getWorkspaceId();
// final var connectionCount = configRepository.countConnectionsForWorkspace(workspaceId);
final var connectionCount =
connectionsHandler.listConnectionsForWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId)).getConnections().size();
final var destinationCount = configRepository.countDestinationsForWorkspace(workspaceId);
final var sourceCount = configRepository.countSourcesForWorkspace(workspaceId);

return new WebBackendWorkspaceStateResult()
.hasConnections(connectionCount > 0)
.hasDestinations(destinationCount > 0)
.hasSources(sourceCount > 0);
}

public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
Expand All @@ -85,12 +86,12 @@
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

class WebBackendConnectionsHandlerTest {
class WebBackendHandlerTest {

private ConnectionsHandler connectionsHandler;
private OperationsHandler operationsHandler;
private SchedulerHandler schedulerHandler;
private WebBackendConnectionsHandler wbHandler;
private WebBackendHandler wbHandler;

private SourceRead sourceRead;
private ConnectionRead connectionRead;
Expand All @@ -107,11 +108,12 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
final SourceHandler sourceHandler = mock(SourceHandler.class);
final DestinationHandler destinationHandler = mock(DestinationHandler.class);
final JobHistoryHandler jobHistoryHandler = mock(JobHistoryHandler.class);
final ConfigRepository configRepository = mock(ConfigRepository.class);
schedulerHandler = mock(SchedulerHandler.class);
featureFlags = mock(FeatureFlags.class);
temporalWorkerRunFactory = mock(TemporalWorkerRunFactory.class);
wbHandler = new WebBackendConnectionsHandler(connectionsHandler, sourceHandler, destinationHandler, jobHistoryHandler, schedulerHandler,
operationsHandler, featureFlags, temporalWorkerRunFactory);
wbHandler = new WebBackendHandler(connectionsHandler, sourceHandler, destinationHandler, jobHistoryHandler, schedulerHandler,
operationsHandler, featureFlags, temporalWorkerRunFactory, configRepository);

final StandardSourceDefinition standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition();
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
Expand Down Expand Up @@ -347,7 +349,7 @@ public void testToConnectionCreate() throws IOException {
.schedule(schedule)
.syncCatalog(catalog);

final ConnectionCreate actual = WebBackendConnectionsHandler.toConnectionCreate(input, operationIds);
final ConnectionCreate actual = WebBackendHandler.toConnectionCreate(input, operationIds);

assertEquals(expected, actual);
}
Expand Down Expand Up @@ -386,7 +388,7 @@ public void testToConnectionUpdate() throws IOException {
.schedule(schedule)
.syncCatalog(catalog);

final ConnectionUpdate actual = WebBackendConnectionsHandler.toConnectionUpdate(input, operationIds);
final ConnectionUpdate actual = WebBackendHandler.toConnectionUpdate(input, operationIds);

assertEquals(expected, actual);
}
Expand Down Expand Up @@ -470,7 +472,7 @@ void testUpdateConnectionWithOperations() throws JsonValidationException, Config
final WebBackendOperationCreateOrUpdate operationCreateOrUpdate = new WebBackendOperationCreateOrUpdate()
.name("Test Operation")
.operationId(connectionRead.getOperationIds().get(0));
final OperationUpdate operationUpdate = WebBackendConnectionsHandler.toOperationUpdate(operationCreateOrUpdate);
final OperationUpdate operationUpdate = WebBackendHandler.toOperationUpdate(operationCreateOrUpdate);
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
Expand Down Expand Up @@ -579,7 +581,7 @@ void testUpdateConnectionWithUpdatedSchemaNewScheduler() throws JsonValidationEx
final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(connectionRead.getConnectionId());
verify(schedulerHandler, times(0)).resetConnection(connectionId);
verify(schedulerHandler, times(0)).syncConnection(connectionId);
InOrder orderVerifier = inOrder(temporalWorkerRunFactory);
final InOrder orderVerifier = inOrder(temporalWorkerRunFactory);
orderVerifier.verify(temporalWorkerRunFactory, times(1)).synchronousResetConnection(connectionId.getConnectionId());
orderVerifier.verify(temporalWorkerRunFactory, times(1)).startNewManualSync(connectionId.getConnectionId());
}
Expand Down Expand Up @@ -611,7 +613,7 @@ public void testUpdateSchemaWithDiscoveryFromEmpty() {
.primaryKey(Collections.emptyList())
.aliasName("stream1");

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);
final AirbyteCatalog actual = WebBackendHandler.updateSchemaWithDiscovery(original, discovered);

assertEquals(expected, actual);
}
Expand Down Expand Up @@ -660,7 +662,7 @@ public void testUpdateSchemaWithDiscoveryResetStream() {
.primaryKey(Collections.emptyList())
.aliasName("stream1");

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);
final AirbyteCatalog actual = WebBackendHandler.updateSchemaWithDiscovery(original, discovered);

assertEquals(expected, actual);
}
Expand Down Expand Up @@ -735,7 +737,7 @@ public void testUpdateSchemaWithDiscoveryMergeNewStream() {
.aliasName("stream2");
expected.getStreams().add(expectedNewStream);

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);
final AirbyteCatalog actual = WebBackendHandler.updateSchemaWithDiscovery(original, discovered);

assertEquals(expected, actual);
}
Expand Down
Loading

0 comments on commit 89720b1

Please sign in to comment.