Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/extract connection api #18409

Merged
merged 18 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.airbyte.persistence.job.DefaultJobPersistence;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WebUrlHelper;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.errorreporter.JobErrorReporter;
import io.airbyte.persistence.job.errorreporter.JobErrorReportingClient;
import io.airbyte.persistence.job.errorreporter.JobErrorReportingClientFactory;
Expand All @@ -53,7 +54,11 @@
import io.airbyte.server.errors.KnownExceptionMapper;
import io.airbyte.server.errors.NotFoundExceptionMapper;
import io.airbyte.server.errors.UncaughtExceptionMapper;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.scheduler.DefaultSynchronousSchedulerClient;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.TemporalEventRunner;
Expand Down Expand Up @@ -255,6 +260,28 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
// "major" version bump as it will no longer be needed.
migrateExistingConnectionsToTemporalScheduler(configRepository, jobPersistence, eventRunner);

final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

final AttemptHandler attemptHandler = new AttemptHandler(jobPersistence);

final ConnectionsHandler connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);

final OperationsHandler operationsHandler = new OperationsHandler(configRepository);

final SchedulerHandler schedulerHandler = new SchedulerHandler(
configRepository,
secretsRepositoryReader,
secretsRepositoryWriter,
syncSchedulerClient,
jobPersistence,
configs.getWorkerEnvironment(),
configs.getLogConfigs(),
eventRunner);

LOGGER.info("Starting server...");

return apiFactory.create(
Expand All @@ -273,7 +300,11 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
httpClient,
eventRunner,
configsFlyway,
jobsFlyway);
jobsFlyway,
attemptHandler,
connectionsHandler,
operationsHandler,
schedulerHandler);
}

@VisibleForTesting
Expand Down
66 changes: 45 additions & 21 deletions airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,46 @@
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.apis.AttemptApiController;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.server.apis.ConnectionApiController;
import io.airbyte.server.apis.binders.AttemptApiBinder;
import io.airbyte.server.apis.binders.ConnectionApiBinder;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import io.airbyte.server.apis.factories.ConnectionApiFactory;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import java.net.http.HttpClient;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import org.flywaydb.core.Flyway;
import org.slf4j.MDC;

public interface ServerFactory {

ServerRunnable create(SynchronousSchedulerClient cachingSchedulerClient,
ConfigRepository configRepository,
SecretsRepositoryReader secretsRepositoryReader,
SecretsRepositoryWriter secretsRepositoryWriter,
JobPersistence jobPersistence,
Database configsDatabase,
Database jobsDatabase,
TrackingClient trackingClient,
WorkerEnvironment workerEnvironment,
LogConfigs logConfigs,
AirbyteVersion airbyteVersion,
Path workspaceRoot,
HttpClient httpClient,
EventRunner eventRunner,
Flyway configsFlyway,
Flyway jobsFlyway);
ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClient,
final ConfigRepository configRepository,
final SecretsRepositoryReader secretsRepositoryReader,
final SecretsRepositoryWriter secretsRepositoryWriter,
final JobPersistence jobPersistence,
final Database configsDatabase,
final Database jobsDatabase,
final TrackingClient trackingClient,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final AirbyteVersion airbyteVersion,
final Path workspaceRoot,
final HttpClient httpClient,
final EventRunner eventRunner,
final Flyway configsFlyway,
final Flyway jobsFlyway,
final AttemptHandler attemptHandler,
final ConnectionsHandler connectionsHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler);

class Api implements ServerFactory {

Expand All @@ -63,7 +75,13 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
final HttpClient httpClient,
final EventRunner eventRunner,
final Flyway configsFlyway,
final Flyway jobsFlyway) {
final Flyway jobsFlyway,
final AttemptHandler attemptHandler,
final ConnectionsHandler connectionsHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler) {
final Map<String, String> mdc = MDC.getCopyOfContextMap();

// set static values for factory
ConfigurationApiFactory.setValues(
configRepository,
Expand All @@ -72,7 +90,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
jobPersistence,
synchronousSchedulerClient,
new StatePersistence(configsDatabase),
MDC.getCopyOfContextMap(),
mdc,
configsDatabase,
jobsDatabase,
trackingClient,
Expand All @@ -85,11 +103,17 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
configsFlyway,
jobsFlyway);

AttemptApiFactory.setValues(jobPersistence, MDC.getCopyOfContextMap());
AttemptApiFactory.setValues(attemptHandler, mdc);

ConnectionApiFactory.setValues(
connectionsHandler,
operationsHandler,
schedulerHandler,
mdc);

// server configurations
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class, AttemptApiController.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder(), new AttemptApiBinder());
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class, AttemptApiController.class, ConnectionApiController.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder(), new AttemptApiBinder(), new ConnectionApiBinder());

// construct server
return new ServerApp(airbyteVersion, componentClasses, components);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.airbyte.api.generated.AttemptApi;
import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.handlers.AttemptHandler;
import javax.ws.rs.Path;

Expand All @@ -16,8 +15,8 @@ public class AttemptApiController implements AttemptApi {

private final AttemptHandler attemptHandler;

public AttemptApiController(final JobPersistence jobPersistence) {
attemptHandler = new AttemptHandler(jobPersistence);
public AttemptApiController(final AttemptHandler attemptHandler) {
this.attemptHandler = attemptHandler;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,53 +653,85 @@ public CheckConnectionRead checkConnectionToDestinationForUpdate(final Destinati

// CONNECTION

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionRead createConnection(final ConnectionCreate connectionCreate) {
return execute(() -> connectionsHandler.createConnection(connectionCreate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate) {
return execute(() -> connectionsHandler.updateConnection(connectionUpdate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionReadList listConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionReadList listAllConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> connectionsHandler.listAllConnectionsForWorkspace(workspaceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionReadList searchConnections(final ConnectionSearch connectionSearch) {
return execute(() -> connectionsHandler.searchConnections(connectionSearch));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionRead getConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId()));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public void deleteConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
execute(() -> {
operationsHandler.deleteOperationsForConnection(connectionIdRequestBody);
connectionsHandler.deleteConnection(connectionIdRequestBody.getConnectionId());
return null;
});
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> schedulerHandler.syncConnection(connectionIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiController}. Since the path of
* {@link ConnectionApiController} is more granular, it will override this implementation
*/
@Override
public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody));
throw new NotImplementedException();
}

// Operations
Expand All @@ -716,7 +748,7 @@ public OperationRead createOperation(final OperationCreate operationCreate) {

@Override
public ConnectionState createOrUpdateState(final ConnectionStateCreateOrUpdate connectionStateCreateOrUpdate) {
return execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate));
return ConfigurationApi.execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate));
}

@Override
Expand Down Expand Up @@ -744,7 +776,7 @@ public OperationRead updateOperation(final OperationUpdate operationUpdate) {

@Override
public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> stateHandler.getState(connectionIdRequestBody));
return ConfigurationApi.execute(() -> stateHandler.getState(connectionIdRequestBody));
}

// SCHEDULER
Expand Down Expand Up @@ -840,7 +872,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne

@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
return ConfigurationApi.execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
}

@Override
Expand Down
Loading