Skip to content

Commit

Permalink
Bmoric/extract state api (#18980)
Browse files Browse the repository at this point in the history
* Extract Operation API

* Extract scheduler API

* Format

* extract source api

* Extract source definition api

* Add path

* Extract State API

* Add missing binder

* fix type
  • Loading branch information
benmoriceau authored Nov 7, 2022
1 parent b7b6507 commit a16ecd6
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 5 deletions.
7 changes: 7 additions & 0 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
Expand Down Expand Up @@ -61,6 +62,7 @@
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.handlers.SourceDefinitionsHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.server.handlers.StateHandler;
import io.airbyte.server.handlers.WorkspacesHandler;
import io.airbyte.server.scheduler.DefaultSynchronousSchedulerClient;
import io.airbyte.server.scheduler.EventRunner;
Expand Down Expand Up @@ -320,6 +322,10 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,

final OpenApiConfigHandler openApiConfigHandler = new OpenApiConfigHandler();

final StatePersistence statePersistence = new StatePersistence(configsDatabase);

final StateHandler stateHandler = new StateHandler(statePersistence);

LOGGER.info("Starting server...");

return apiFactory.create(
Expand Down Expand Up @@ -353,6 +359,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
schedulerHandler,
sourceHandler,
sourceDefinitionsHandler,
stateHandler,
workspacesHandler);
}

Expand Down
15 changes: 13 additions & 2 deletions airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.airbyte.server.apis.SchedulerApiController;
import io.airbyte.server.apis.SourceApiController;
import io.airbyte.server.apis.SourceDefinitionApiController;
import io.airbyte.server.apis.SourceOauthApiController;
import io.airbyte.server.apis.StateApiController;
import io.airbyte.server.apis.binders.AttemptApiBinder;
import io.airbyte.server.apis.binders.ConnectionApiBinder;
import io.airbyte.server.apis.binders.DbMigrationBinder;
Expand All @@ -48,6 +50,7 @@
import io.airbyte.server.apis.binders.SourceApiBinder;
import io.airbyte.server.apis.binders.SourceDefinitionApiBinder;
import io.airbyte.server.apis.binders.SourceOauthApiBinder;
import io.airbyte.server.apis.binders.StateApiBinder;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import io.airbyte.server.apis.factories.ConnectionApiFactory;
import io.airbyte.server.apis.factories.DbMigrationApiFactory;
Expand All @@ -65,6 +68,7 @@
import io.airbyte.server.apis.factories.SourceApiFactory;
import io.airbyte.server.apis.factories.SourceDefinitionApiFactory;
import io.airbyte.server.apis.factories.SourceOauthApiFactory;
import io.airbyte.server.apis.factories.StateApiFactory;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DbMigrationHandler;
Expand All @@ -79,6 +83,7 @@
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.handlers.SourceDefinitionsHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.server.handlers.StateHandler;
import io.airbyte.server.handlers.WorkspacesHandler;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
Expand Down Expand Up @@ -121,6 +126,7 @@ ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClien
final SchedulerHandler schedulerHandler,
final SourceHandler sourceHandler,
final SourceDefinitionsHandler sourceDefinitionsHandler,
final StateHandler stateHandler,
final WorkspacesHandler workspacesHandler);

class Api implements ServerFactory {
Expand Down Expand Up @@ -156,6 +162,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
final SchedulerHandler schedulerHandler,
final SourceHandler sourceHandler,
final SourceDefinitionsHandler sourceDefinitionsHandler,
final StateHandler stateHandler,
final WorkspacesHandler workspacesHandler) {
final Map<String, String> mdc = MDC.getCopyOfContextMap();

Expand Down Expand Up @@ -218,6 +225,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul

SourceDefinitionApiFactory.setValues(sourceDefinitionsHandler);

StateApiFactory.setValues(stateHandler);

// server configurations
final Set<Class<?>> componentClasses = Set.of(
ConfigurationApi.class,
Expand All @@ -237,7 +246,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
SchedulerApiController.class,
SourceApiController.class,
SourceDefinitionApiController.class,
SourceOauthApiFactory.class);
SourceOauthApiController.class,
StateApiController.class);

final Set<Object> components = Set.of(
new CorsFilter(),
Expand All @@ -258,7 +268,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
new SchedulerApiBinder(),
new SourceApiBinder(),
new SourceDefinitionApiBinder(),
new SourceOauthApiBinder());
new SourceOauthApiBinder(),
new StateApiBinder());

// construct server
return new ServerApp(airbyteVersion, componentClasses, components);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public void revokeSourceDefinitionFromWorkspace(final SourceDefinitionIdWithWork
}

@Override
public InternalOperationResult saveStats(SaveStatsRequestBody saveStatsRequestBody) {
public InternalOperationResult saveStats(final SaveStatsRequestBody saveStatsRequestBody) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -948,9 +948,13 @@ public OperationRead createOperation(final OperationCreate operationCreate) {
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link StateApiController}. Since the path of
* {@link StateApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionState createOrUpdateState(final ConnectionStateCreateOrUpdate connectionStateCreateOrUpdate) {
return ConfigurationApi.execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate));
throw new NotImplementedException();
}

/**
Expand Down Expand Up @@ -989,9 +993,13 @@ public OperationRead updateOperation(final OperationUpdate operationUpdate) {
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link StateApiController}. Since the path of
* {@link StateApiController} is more granular, it will override this implementation
*/
@Override
public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) {
return ConfigurationApi.execute(() -> stateHandler.getState(connectionIdRequestBody));
throw new NotImplementedException();
}

// SCHEDULER
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import io.airbyte.api.generated.StateApi;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateCreateOrUpdate;
import io.airbyte.server.handlers.StateHandler;
import javax.ws.rs.Path;
import lombok.AllArgsConstructor;

@Path("/v1/state")
@AllArgsConstructor
public class StateApiController implements StateApi {

private final StateHandler stateHandler;

@Override
public ConnectionState createOrUpdateState(final ConnectionStateCreateOrUpdate connectionStateCreateOrUpdate) {
return ConfigurationApi.execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate));
}

@Override
public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) {
return ConfigurationApi.execute(() -> stateHandler.getState(connectionIdRequestBody));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.binders;

import io.airbyte.server.apis.StateApiController;
import io.airbyte.server.apis.factories.StateApiFactory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;

public class StateApiBinder extends AbstractBinder {

@Override
protected void configure() {
bindFactory(StateApiFactory.class)
.to(StateApiController.class)
.in(RequestScoped.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.factories;

import io.airbyte.server.apis.StateApiController;
import io.airbyte.server.handlers.StateHandler;
import org.glassfish.hk2.api.Factory;

public class StateApiFactory implements Factory<StateApiController> {

private static StateHandler stateHandler;

public static void setValues(final StateHandler stateHandler) {
StateApiFactory.stateHandler = stateHandler;
}

@Override
public StateApiController provide() {
return new StateApiController(StateApiFactory.stateHandler);
}

@Override
public void dispose(final StateApiController instance) {
/* no op */
}

}

0 comments on commit a16ecd6

Please sign in to comment.