Skip to content

Commit

Permalink
Bmoric/extract db migration api (airbytehq#18459)
Browse files Browse the repository at this point in the history
* Tmp

* Extract the Attempt API from the V1 API

* Add comments

* Move Connection API out of configuration API

* format

* format

* Rename to Controller

* Rename to Controller

* Add values to the factory

* Change the constructor to use hadler instead of objects needed by the handler

* Update with new tags.

* tmp

* Fix PMD errors

* Extract DB migrator

* Add something that I forgot

* restore destination factory initialization

* format
  • Loading branch information
benmoriceau authored and jhammarstedt committed Oct 31, 2022
1 parent c155054 commit 87d22f6
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@ public class ConfigurationApiFactory implements Factory<ConfigurationApi> {
private static SynchronousSchedulerClient synchronousSchedulerClient;
private static StatePersistence statePersistence;
private static Map<String, String> mdc;
private static Database configsDatabase;
private static Database jobsDatabase;
private static TrackingClient trackingClient;
private static WorkerEnvironment workerEnvironment;
private static LogConfigs logConfigs;
private static Path workspaceRoot;
private static AirbyteVersion airbyteVersion;
private static HttpClient httpClient;
private static EventRunner eventRunner;
private static Flyway configsFlyway;
private static Flyway jobsFlyway;

public static void setValues(
final ConfigRepository configRepository,
Expand All @@ -70,17 +66,13 @@ public static void setValues(
ConfigurationApiFactory.secretsRepositoryWriter = secretsRepositoryWriter;
ConfigurationApiFactory.synchronousSchedulerClient = synchronousSchedulerClient;
ConfigurationApiFactory.mdc = mdc;
ConfigurationApiFactory.configsDatabase = configsDatabase;
ConfigurationApiFactory.jobsDatabase = jobsDatabase;
ConfigurationApiFactory.trackingClient = trackingClient;
ConfigurationApiFactory.workerEnvironment = workerEnvironment;
ConfigurationApiFactory.logConfigs = logConfigs;
ConfigurationApiFactory.workspaceRoot = workspaceRoot;
ConfigurationApiFactory.airbyteVersion = airbyteVersion;
ConfigurationApiFactory.httpClient = httpClient;
ConfigurationApiFactory.eventRunner = eventRunner;
ConfigurationApiFactory.configsFlyway = configsFlyway;
ConfigurationApiFactory.jobsFlyway = jobsFlyway;
ConfigurationApiFactory.statePersistence = statePersistence;
}

Expand All @@ -94,18 +86,14 @@ public ConfigurationApi provide() {
ConfigurationApiFactory.secretsRepositoryReader,
ConfigurationApiFactory.secretsRepositoryWriter,
ConfigurationApiFactory.synchronousSchedulerClient,
ConfigurationApiFactory.configsDatabase,
ConfigurationApiFactory.jobsDatabase,
ConfigurationApiFactory.statePersistence,
ConfigurationApiFactory.trackingClient,
ConfigurationApiFactory.workerEnvironment,
ConfigurationApiFactory.logConfigs,
ConfigurationApiFactory.airbyteVersion,
ConfigurationApiFactory.workspaceRoot,
ConfigurationApiFactory.httpClient,
ConfigurationApiFactory.eventRunner,
ConfigurationApiFactory.configsFlyway,
ConfigurationApiFactory.jobsFlyway);
ConfigurationApiFactory.eventRunner);
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
configs.getLogConfigs(),
eventRunner);

final DbMigrationHandler dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway);

LOGGER.info("Starting server...");

return apiFactory.create(
Expand All @@ -314,6 +316,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
jobsFlyway,
attemptHandler,
connectionsHandler,
dbMigrationHandler,
destinationHandler,
operationsHandler,
schedulerHandler);
Expand Down
12 changes: 10 additions & 2 deletions airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@
import io.airbyte.server.apis.AttemptApiController;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.server.apis.ConnectionApiController;
import io.airbyte.server.apis.DbMigrationApiController;
import io.airbyte.server.apis.DestinationApiController;
import io.airbyte.server.apis.binders.AttemptApiBinder;
import io.airbyte.server.apis.binders.ConnectionApiBinder;
import io.airbyte.server.apis.binders.DbMigrationBinder;
import io.airbyte.server.apis.binders.DestinationApiBinder;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import io.airbyte.server.apis.factories.ConnectionApiFactory;
import io.airbyte.server.apis.factories.DbMigrationApiFactory;
import io.airbyte.server.apis.factories.DestinationApiFactory;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
Expand Down Expand Up @@ -58,6 +62,7 @@ ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClien
final Flyway jobsFlyway,
final AttemptHandler attemptHandler,
final ConnectionsHandler connectionsHandler,
final DbMigrationHandler dbMigrationHandler,
final DestinationHandler destinationApiHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler);
Expand All @@ -83,6 +88,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
final Flyway jobsFlyway,
final AttemptHandler attemptHandler,
final ConnectionsHandler connectionsHandler,
final DbMigrationHandler dbMigrationHandler,
final DestinationHandler destinationApiHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler) {
Expand Down Expand Up @@ -117,13 +123,15 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
schedulerHandler,
mdc);

DbMigrationApiFactory.setValues(dbMigrationHandler, mdc);

DestinationApiFactory.setValues(destinationApiHandler, schedulerHandler, mdc);

// server configurations
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class, AttemptApiController.class, ConnectionApiController.class,
DestinationApiController.class);
DbMigrationApiController.class, DestinationApiController.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder(), new AttemptApiBinder(), new ConnectionApiBinder(),
new DestinationApiBinder());
new DbMigrationBinder(), new DestinationApiBinder());

// construct server
return new ServerApp(airbyteVersion, componentClasses, components);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,11 @@
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.db.Database;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.server.errors.BadObjectSchemaKnownException;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.server.handlers.DestinationDefinitionsHandler;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.HealthCheckHandler;
Expand All @@ -142,7 +140,6 @@
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.flywaydb.core.Flyway;

@javax.ws.rs.Path("/v1")
@Slf4j
Expand All @@ -163,7 +160,6 @@ public class ConfigurationApi implements io.airbyte.api.generated.V1Api {
private final HealthCheckHandler healthCheckHandler;
private final LogsHandler logsHandler;
private final OpenApiConfigHandler openApiConfigHandler;
private final DbMigrationHandler dbMigrationHandler;
private final OAuthHandler oAuthHandler;
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
Expand All @@ -175,18 +171,14 @@ public ConfigurationApi(final ConfigRepository configRepository,
final SecretsRepositoryWriter secretsRepositoryWriter,

final SynchronousSchedulerClient synchronousSchedulerClient,
final Database configsDatabase,
final Database jobsDatabase,
final StatePersistence statePersistence,
final TrackingClient trackingClient,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final AirbyteVersion airbyteVersion,
final Path workspaceRoot,
final HttpClient httpClient,
final EventRunner eventRunner,
final Flyway configsFlyway,
final Flyway jobsFlyway) {
final EventRunner eventRunner) {
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.workspaceRoot = workspaceRoot;
Expand Down Expand Up @@ -249,7 +241,6 @@ public ConfigurationApi(final ConfigRepository configRepository,
healthCheckHandler = new HealthCheckHandler(configRepository);
logsHandler = new LogsHandler();
openApiConfigHandler = new OpenApiConfigHandler();
dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway);
}

// WORKSPACE
Expand Down Expand Up @@ -499,14 +490,22 @@ public SourceDiscoverSchemaRead discoverSchemaForSource(final SourceDiscoverSche

// DB MIGRATION

/**
* This implementation has been moved to {@link DbMigrationApiController}. Since the path of
* {@link DbMigrationApiController} is more granular, it will override this implementation
*/
@Override
public DbMigrationReadList listMigrations(final DbMigrationRequestBody request) {
return execute(() -> dbMigrationHandler.list(request));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link DbMigrationApiController}. Since the path of
* {@link DbMigrationApiController} is more granular, it will override this implementation
*/
@Override
public DbMigrationExecutionRead executeMigrations(final DbMigrationRequestBody request) {
return execute(() -> dbMigrationHandler.migrate(request));
throw new NotImplementedException();
}

// DESTINATION
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import io.airbyte.api.generated.DbMigrationApi;
import io.airbyte.api.model.generated.DbMigrationExecutionRead;
import io.airbyte.api.model.generated.DbMigrationReadList;
import io.airbyte.api.model.generated.DbMigrationRequestBody;
import io.airbyte.server.handlers.DbMigrationHandler;
import javax.ws.rs.Path;

@Path("/v1/db_migrations")
public class DbMigrationApiController implements DbMigrationApi {

private final DbMigrationHandler dbMigrationHandler;

public DbMigrationApiController(final DbMigrationHandler dbMigrationHandler) {
this.dbMigrationHandler = dbMigrationHandler;
}

@Override
public DbMigrationExecutionRead executeMigrations(final DbMigrationRequestBody dbMigrationRequestBody) {
return ConfigurationApi.execute(() -> dbMigrationHandler.migrate(dbMigrationRequestBody));
}

@Override
public DbMigrationReadList listMigrations(final DbMigrationRequestBody dbMigrationRequestBody) {
return ConfigurationApi.execute(() -> dbMigrationHandler.list(dbMigrationRequestBody));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.binders;

import io.airbyte.server.apis.DbMigrationApiController;
import io.airbyte.server.apis.factories.DbMigrationApiFactory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;

public class DbMigrationBinder extends AbstractBinder {

@Override
protected void configure() {
bindFactory(DbMigrationApiFactory.class)
.to(DbMigrationApiController.class)
.in(RequestScoped.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.factories;

import io.airbyte.server.apis.DbMigrationApiController;
import io.airbyte.server.handlers.DbMigrationHandler;
import java.util.Map;
import org.glassfish.hk2.api.Factory;
import org.slf4j.MDC;

public class DbMigrationApiFactory implements Factory<DbMigrationApiController> {

private static DbMigrationHandler dbMigrationHandler;
private static Map<String, String> mdc;

public static void setValues(final DbMigrationHandler dbMigrationHandler, final Map<String, String> mdc) {
DbMigrationApiFactory.dbMigrationHandler = dbMigrationHandler;
DbMigrationApiFactory.mdc = mdc;
}

@Override
public DbMigrationApiController provide() {
MDC.setContextMap(DbMigrationApiFactory.mdc);

return new DbMigrationApiController(dbMigrationHandler);
}

@Override
public void dispose(final DbMigrationApiController instance) {
/* no op */
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.db.Database;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import java.net.http.HttpClient;
import java.nio.file.Path;
import org.flywaydb.core.Flyway;
import org.junit.jupiter.api.Test;

class ConfigurationApiTest {
Expand All @@ -40,18 +38,14 @@ void testImportDefinitions() {
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SynchronousSchedulerClient.class),
mock(Database.class),
mock(Database.class),
mock(StatePersistence.class),
mock(TrackingClient.class),
WorkerEnvironment.DOCKER,
LogConfigs.EMPTY,
new AirbyteVersion("0.1.0-alpha"),
Path.of(""),
mock(HttpClient.class),
mock(EventRunner.class),
mock(Flyway.class),
mock(Flyway.class));
mock(EventRunner.class));

assertFalse(configurationApi.getHealthCheck().getAvailable());
}
Expand Down

0 comments on commit 87d22f6

Please sign in to comment.