Skip to content

Commit

Permalink
Bmoric/extract connection api (#18409)
Browse files Browse the repository at this point in the history
* Tmp

* Extract the Attempt API from the V1 API

* Add comments

* Move Connection API out of configuration API

* format

* format

* Rename to Controller

* Rename to Controller

* Add values to the factory

* Change the constructor to use hadler instead of objects needed by the handler

* Update with new tags.

* Fix PMD errors

* Add explicit path to the controller
  • Loading branch information
benmoriceau authored and nataly committed Nov 3, 2022
1 parent b346fbd commit 2c027c4
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 46 deletions.
33 changes: 32 additions & 1 deletion airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.airbyte.persistence.job.DefaultJobPersistence;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WebUrlHelper;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.errorreporter.JobErrorReporter;
import io.airbyte.persistence.job.errorreporter.JobErrorReportingClient;
import io.airbyte.persistence.job.errorreporter.JobErrorReportingClientFactory;
Expand All @@ -53,7 +54,11 @@
import io.airbyte.server.errors.KnownExceptionMapper;
import io.airbyte.server.errors.NotFoundExceptionMapper;
import io.airbyte.server.errors.UncaughtExceptionMapper;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.scheduler.DefaultSynchronousSchedulerClient;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.TemporalEventRunner;
Expand Down Expand Up @@ -255,6 +260,28 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
// "major" version bump as it will no longer be needed.
migrateExistingConnectionsToTemporalScheduler(configRepository, jobPersistence, eventRunner);

final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

final AttemptHandler attemptHandler = new AttemptHandler(jobPersistence);

final ConnectionsHandler connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);

final OperationsHandler operationsHandler = new OperationsHandler(configRepository);

final SchedulerHandler schedulerHandler = new SchedulerHandler(
configRepository,
secretsRepositoryReader,
secretsRepositoryWriter,
syncSchedulerClient,
jobPersistence,
configs.getWorkerEnvironment(),
configs.getLogConfigs(),
eventRunner);

LOGGER.info("Starting server...");

return apiFactory.create(
Expand All @@ -273,7 +300,11 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
httpClient,
eventRunner,
configsFlyway,
jobsFlyway);
jobsFlyway,
attemptHandler,
connectionsHandler,
operationsHandler,
schedulerHandler);
}

@VisibleForTesting
Expand Down
66 changes: 45 additions & 21 deletions airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,46 @@
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.apis.AttemptApiController;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.server.apis.ConnectionApiController;
import io.airbyte.server.apis.binders.AttemptApiBinder;
import io.airbyte.server.apis.binders.ConnectionApiBinder;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import io.airbyte.server.apis.factories.ConnectionApiFactory;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import java.net.http.HttpClient;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import org.flywaydb.core.Flyway;
import org.slf4j.MDC;

public interface ServerFactory {

ServerRunnable create(SynchronousSchedulerClient cachingSchedulerClient,
ConfigRepository configRepository,
SecretsRepositoryReader secretsRepositoryReader,
SecretsRepositoryWriter secretsRepositoryWriter,
JobPersistence jobPersistence,
Database configsDatabase,
Database jobsDatabase,
TrackingClient trackingClient,
WorkerEnvironment workerEnvironment,
LogConfigs logConfigs,
AirbyteVersion airbyteVersion,
Path workspaceRoot,
HttpClient httpClient,
EventRunner eventRunner,
Flyway configsFlyway,
Flyway jobsFlyway);
ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClient,
final ConfigRepository configRepository,
final SecretsRepositoryReader secretsRepositoryReader,
final SecretsRepositoryWriter secretsRepositoryWriter,
final JobPersistence jobPersistence,
final Database configsDatabase,
final Database jobsDatabase,
final TrackingClient trackingClient,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final AirbyteVersion airbyteVersion,
final Path workspaceRoot,
final HttpClient httpClient,
final EventRunner eventRunner,
final Flyway configsFlyway,
final Flyway jobsFlyway,
final AttemptHandler attemptHandler,
final ConnectionsHandler connectionsHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler);

class Api implements ServerFactory {

Expand All @@ -63,7 +75,13 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
final HttpClient httpClient,
final EventRunner eventRunner,
final Flyway configsFlyway,
final Flyway jobsFlyway) {
final Flyway jobsFlyway,
final AttemptHandler attemptHandler,
final ConnectionsHandler connectionsHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler) {
final Map<String, String> mdc = MDC.getCopyOfContextMap();

// set static values for factory
ConfigurationApiFactory.setValues(
configRepository,
Expand All @@ -72,7 +90,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
jobPersistence,
synchronousSchedulerClient,
new StatePersistence(configsDatabase),
MDC.getCopyOfContextMap(),
mdc,
configsDatabase,
jobsDatabase,
trackingClient,
Expand All @@ -85,11 +103,17 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
configsFlyway,
jobsFlyway);

AttemptApiFactory.setValues(jobPersistence, MDC.getCopyOfContextMap());
AttemptApiFactory.setValues(attemptHandler, mdc);

ConnectionApiFactory.setValues(
connectionsHandler,
operationsHandler,
schedulerHandler,
mdc);

// server configurations
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class, AttemptApiController.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder(), new AttemptApiBinder());
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class, AttemptApiController.class, ConnectionApiController.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder(), new AttemptApiBinder(), new ConnectionApiBinder());

// construct server
return new ServerApp(airbyteVersion, componentClasses, components);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.airbyte.api.generated.AttemptApi;
import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.handlers.AttemptHandler;
import javax.ws.rs.Path;

Expand All @@ -16,8 +15,8 @@ public class AttemptApiController implements AttemptApi {

private final AttemptHandler attemptHandler;

public AttemptApiController(final JobPersistence jobPersistence) {
attemptHandler = new AttemptHandler(jobPersistence);
public AttemptApiController(final AttemptHandler attemptHandler) {
this.attemptHandler = attemptHandler;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,53 +653,85 @@ public CheckConnectionRead checkConnectionToDestinationForUpdate(final Destinati

// CONNECTION

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionRead createConnection(final ConnectionCreate connectionCreate) {
return execute(() -> connectionsHandler.createConnection(connectionCreate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate) {
return execute(() -> connectionsHandler.updateConnection(connectionUpdate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionReadList listConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionReadList listAllConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> connectionsHandler.listAllConnectionsForWorkspace(workspaceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionReadList searchConnections(final ConnectionSearch connectionSearch) {
return execute(() -> connectionsHandler.searchConnections(connectionSearch));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionRead getConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId()));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public void deleteConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
execute(() -> {
operationsHandler.deleteOperationsForConnection(connectionIdRequestBody);
connectionsHandler.deleteConnection(connectionIdRequestBody.getConnectionId());
return null;
});
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> schedulerHandler.syncConnection(connectionIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody));
throw new NotImplementedException();
}

// Operations
Expand All @@ -716,7 +748,7 @@ public OperationRead createOperation(final OperationCreate operationCreate) {

@Override
public ConnectionState createOrUpdateState(final ConnectionStateCreateOrUpdate connectionStateCreateOrUpdate) {
return execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate));
return ConfigurationApi.execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate));
}

@Override
Expand Down Expand Up @@ -744,7 +776,7 @@ public OperationRead updateOperation(final OperationUpdate operationUpdate) {

@Override
public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> stateHandler.getState(connectionIdRequestBody));
return ConfigurationApi.execute(() -> stateHandler.getState(connectionIdRequestBody));
}

// SCHEDULER
Expand Down Expand Up @@ -840,7 +872,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne

@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
return ConfigurationApi.execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
}

@Override
Expand Down
Loading

0 comments on commit 2c027c4

Please sign in to comment.