Skip to content

Commit

Permalink
Bmoric/convert webbackend micronaut (#20403)
Browse files Browse the repository at this point in the history
* Convert jobs to micronaut

* Nit

* Format

* Migrate the webbackend to micronaut

* Add missing Bean
  • Loading branch information
benmoriceau authored Dec 13, 2022
1 parent a8c11d2 commit 6a480ef
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 96 deletions.
4 changes: 2 additions & 2 deletions airbyte-proxy/nginx-auth.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ http {
}
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|workspaces)/.* {
location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
Expand Down Expand Up @@ -103,7 +103,7 @@ http {
}
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|workspaces)/.* {
location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
Expand Down
4 changes: 2 additions & 2 deletions airbyte-proxy/nginx-no-auth.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ http {
proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|workspaces)/.* {
location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
Expand Down Expand Up @@ -58,7 +58,7 @@ http {
proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|workspaces)/.* {
location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
Expand Down
17 changes: 2 additions & 15 deletions airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.db.Database;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.apis.LogsApiController;
import io.airbyte.server.apis.NotificationsApiController;
import io.airbyte.server.apis.WebBackendApiController;
import io.airbyte.server.apis.binders.WebBackendApiBinder;
import io.airbyte.server.apis.factories.WebBackendApiFactory;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DestinationDefinitionsHandler;
Expand All @@ -38,7 +33,7 @@
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import java.net.http.HttpClient;
import java.nio.file.Path;
import java.util.Set;
import java.util.HashSet;
import org.flywaydb.core.Flyway;

public interface ServerFactory {
Expand Down Expand Up @@ -112,16 +107,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
final WebBackendGeographiesHandler webBackendGeographiesHandler,
final WebBackendCheckUpdatesHandler webBackendCheckUpdatesHandler) {

WebBackendApiFactory.setValues(webBackendConnectionsHandler, webBackendGeographiesHandler, webBackendCheckUpdatesHandler);

final Set<Class<?>> componentClasses = Set.of(
WebBackendApiController.class);

final Set<Object> components = Set.of(
new WebBackendApiBinder());

// construct server
return new ServerApp(airbyteVersion, componentClasses, components);
return new ServerApp(airbyteVersion, new HashSet<>(), new HashSet<>());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,68 @@
import io.airbyte.server.handlers.WebBackendCheckUpdatesHandler;
import io.airbyte.server.handlers.WebBackendConnectionsHandler;
import io.airbyte.server.handlers.WebBackendGeographiesHandler;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import javax.ws.rs.Path;
import lombok.AllArgsConstructor;

@Path("/v1/web_backend")
@AllArgsConstructor
@Controller("/api/v1/web_backend")
public class WebBackendApiController implements WebBackendApi {

private final WebBackendConnectionsHandler webBackendConnectionsHandler;
private final WebBackendGeographiesHandler webBackendGeographiesHandler;
private final WebBackendCheckUpdatesHandler webBackendCheckUpdatesHandler;

public WebBackendApiController(final WebBackendConnectionsHandler webBackendConnectionsHandler,
final WebBackendGeographiesHandler webBackendGeographiesHandler,
final WebBackendCheckUpdatesHandler webBackendCheckUpdatesHandler) {
this.webBackendConnectionsHandler = webBackendConnectionsHandler;
this.webBackendGeographiesHandler = webBackendGeographiesHandler;
this.webBackendCheckUpdatesHandler = webBackendCheckUpdatesHandler;
}

@Post("/state/get_type")
@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
}

@Post("/check_updates")
@Override
public WebBackendCheckUpdatesRead webBackendCheckUpdates() {
return ApiHelper.execute(webBackendCheckUpdatesHandler::checkUpdates);
}

@Post("/connections/create")
@Override
public WebBackendConnectionRead webBackendCreateConnection(final WebBackendConnectionCreate webBackendConnectionCreate) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendCreateConnection(webBackendConnectionCreate));
}

@Post("/connections/get")
@Override
public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnectionRequestBody webBackendConnectionRequestBody) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendGetConnection(webBackendConnectionRequestBody));
}

@Post("/workspace/state")
@Override
public WebBackendWorkspaceStateResult webBackendGetWorkspaceState(final WebBackendWorkspaceState webBackendWorkspaceState) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.getWorkspaceState(webBackendWorkspaceState));
}

@Post("/connections/list")
@Override
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody));
}

@Post("/geographies/list")
@Override
public WebBackendGeographiesListResult webBackendListGeographies() {
return ApiHelper.execute(webBackendGeographiesHandler::listGeographiesOSS);
}

@Post("/connections/update")
@Override
public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConnectionUpdate webBackendConnectionUpdate) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.db.Database;
import io.airbyte.db.check.DatabaseMigrationCheck;
Expand Down Expand Up @@ -89,6 +90,13 @@ public JobPersistence jobPersistence(@Named("configDatabase") final Database job
return new DefaultJobPersistence(jobDatabase);
}

@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public StatePersistence statePersistence(@Named("configDatabase") final Database configDatabase) {
return new StatePersistence(configDatabase);
}


@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("configsDatabaseMigrationCheck")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,29 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.server.services.AirbyteGithubStore;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@AllArgsConstructor
@Slf4j
@Singleton
public class WebBackendCheckUpdatesHandler {

private static final int NO_CHANGES_FOUND = 0;

final ConfigRepository configRepository;
final AirbyteGithubStore githubStore;

public WebBackendCheckUpdatesHandler(final ConfigRepository configRepository, final AirbyteGithubStore githubStore) {
this.configRepository = configRepository;
this.githubStore = githubStore;
}

public WebBackendCheckUpdatesRead checkUpdates() {

final int destinationDiffCount = getDestinationDiffCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ProtocolConverters;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -66,16 +67,10 @@
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AllArgsConstructor
@Slf4j
@Singleton
public class WebBackendConnectionsHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(WebBackendConnectionsHandler.class);
private final ConnectionsHandler connectionsHandler;
private final StateHandler stateHandler;
private final SourceHandler sourceHandler;
Expand All @@ -87,6 +82,26 @@ public class WebBackendConnectionsHandler {
// todo (cgardens) - this handler should NOT have access to the db. only access via handler.
private final ConfigRepository configRepository;

public WebBackendConnectionsHandler(final ConnectionsHandler connectionsHandler,
final StateHandler stateHandler,
final SourceHandler sourceHandler,
final DestinationHandler destinationHandler,
final JobHistoryHandler jobHistoryHandler,
final SchedulerHandler schedulerHandler,
final OperationsHandler operationsHandler,
final EventRunner eventRunner,
final ConfigRepository configRepository) {
this.connectionsHandler = connectionsHandler;
this.stateHandler = stateHandler;
this.sourceHandler = sourceHandler;
this.destinationHandler = destinationHandler;
this.jobHistoryHandler = jobHistoryHandler;
this.schedulerHandler = schedulerHandler;
this.operationsHandler = operationsHandler;
this.eventRunner = eventRunner;
this.configRepository = configRepository;
}

public WebBackendWorkspaceStateResult getWorkspaceState(final WebBackendWorkspaceState webBackendWorkspaceState) throws IOException {
final var workspaceId = webBackendWorkspaceState.getWorkspaceId();
final var connectionCount = configRepository.countConnectionsForWorkspace(workspaceId);
Expand Down Expand Up @@ -375,7 +390,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff);
}

private AirbyteCatalog updateSchemaWithOriginalDiscoveredCatalog(AirbyteCatalog configuredCatalog, AirbyteCatalog originalDiscoveredCatalog) {
private AirbyteCatalog updateSchemaWithOriginalDiscoveredCatalog(final AirbyteCatalog configuredCatalog, final AirbyteCatalog originalDiscoveredCatalog) {
// We pass the original discovered catalog in as the "new" discovered catalog.
return updateSchemaWithRefreshedDiscoveredCatalog(configuredCatalog, originalDiscoveredCatalog, originalDiscoveredCatalog);
}
Expand Down Expand Up @@ -404,7 +419,7 @@ private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceI
*/
@VisibleForTesting
protected static AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured,
AirbyteCatalog originalDiscovered,
final AirbyteCatalog originalDiscovered,
final AirbyteCatalog discovered) {
/*
* We can't directly use s.getStream() as the key, because it contains a bunch of other fields, so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@

import io.airbyte.api.model.generated.Geography;
import io.airbyte.api.model.generated.WebBackendGeographiesListResult;
import jakarta.inject.Singleton;
import java.util.Arrays;
import java.util.Collections;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@AllArgsConstructor
@Slf4j
@Singleton
public class WebBackendGeographiesHandler {

public WebBackendGeographiesListResult listGeographiesOSS() {
Expand Down

0 comments on commit 6a480ef

Please sign in to comment.