diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 411e5d959928..18090670eec3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -21,6 +21,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.SecretsRepositoryReader; import io.airbyte.config.persistence.SecretsRepositoryWriter; +import io.airbyte.config.persistence.StatePersistence; import io.airbyte.config.persistence.StreamResetPersistence; import io.airbyte.config.persistence.split_secrets.SecretPersistence; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; @@ -61,6 +62,7 @@ import io.airbyte.server.handlers.SchedulerHandler; import io.airbyte.server.handlers.SourceDefinitionsHandler; import io.airbyte.server.handlers.SourceHandler; +import io.airbyte.server.handlers.StateHandler; import io.airbyte.server.handlers.WorkspacesHandler; import io.airbyte.server.scheduler.DefaultSynchronousSchedulerClient; import io.airbyte.server.scheduler.EventRunner; @@ -320,6 +322,10 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final OpenApiConfigHandler openApiConfigHandler = new OpenApiConfigHandler(); + final StatePersistence statePersistence = new StatePersistence(configsDatabase); + + final StateHandler stateHandler = new StateHandler(statePersistence); + LOGGER.info("Starting server..."); return apiFactory.create( @@ -353,6 +359,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, schedulerHandler, sourceHandler, sourceDefinitionsHandler, + stateHandler, workspacesHandler); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java index 87bc02784a1f..48e6cac302ec 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java @@ -31,6 +31,8 @@ import io.airbyte.server.apis.SchedulerApiController; import io.airbyte.server.apis.SourceApiController; import io.airbyte.server.apis.SourceDefinitionApiController; +import io.airbyte.server.apis.SourceOauthApiController; +import io.airbyte.server.apis.StateApiController; import io.airbyte.server.apis.binders.AttemptApiBinder; import io.airbyte.server.apis.binders.ConnectionApiBinder; import io.airbyte.server.apis.binders.DbMigrationBinder; @@ -48,6 +50,7 @@ import io.airbyte.server.apis.binders.SourceApiBinder; import io.airbyte.server.apis.binders.SourceDefinitionApiBinder; import io.airbyte.server.apis.binders.SourceOauthApiBinder; +import io.airbyte.server.apis.binders.StateApiBinder; import io.airbyte.server.apis.factories.AttemptApiFactory; import io.airbyte.server.apis.factories.ConnectionApiFactory; import io.airbyte.server.apis.factories.DbMigrationApiFactory; @@ -65,6 +68,7 @@ import io.airbyte.server.apis.factories.SourceApiFactory; import io.airbyte.server.apis.factories.SourceDefinitionApiFactory; import io.airbyte.server.apis.factories.SourceOauthApiFactory; +import io.airbyte.server.apis.factories.StateApiFactory; import io.airbyte.server.handlers.AttemptHandler; import io.airbyte.server.handlers.ConnectionsHandler; import io.airbyte.server.handlers.DbMigrationHandler; @@ -79,6 +83,7 @@ import io.airbyte.server.handlers.SchedulerHandler; import io.airbyte.server.handlers.SourceDefinitionsHandler; import io.airbyte.server.handlers.SourceHandler; +import io.airbyte.server.handlers.StateHandler; import io.airbyte.server.handlers.WorkspacesHandler; import io.airbyte.server.scheduler.EventRunner; import io.airbyte.server.scheduler.SynchronousSchedulerClient; @@ -121,6 +126,7 @@ ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClien final SchedulerHandler schedulerHandler, final SourceHandler sourceHandler, final SourceDefinitionsHandler sourceDefinitionsHandler, + final StateHandler stateHandler, final WorkspacesHandler workspacesHandler); class Api implements ServerFactory { @@ -156,6 +162,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul final SchedulerHandler schedulerHandler, final SourceHandler sourceHandler, final SourceDefinitionsHandler sourceDefinitionsHandler, + final StateHandler stateHandler, final WorkspacesHandler workspacesHandler) { final Map mdc = MDC.getCopyOfContextMap(); @@ -218,6 +225,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul SourceDefinitionApiFactory.setValues(sourceDefinitionsHandler); + StateApiFactory.setValues(stateHandler); + // server configurations final Set> componentClasses = Set.of( ConfigurationApi.class, @@ -237,7 +246,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul SchedulerApiController.class, SourceApiController.class, SourceDefinitionApiController.class, - SourceOauthApiFactory.class); + SourceOauthApiController.class, + StateApiController.class); final Set components = Set.of( new CorsFilter(), @@ -258,7 +268,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul new SchedulerApiBinder(), new SourceApiBinder(), new SourceDefinitionApiBinder(), - new SourceOauthApiBinder()); + new SourceOauthApiBinder(), + new StateApiBinder()); // construct server return new ServerApp(airbyteVersion, componentClasses, components); diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 6fad1d1b5ea8..b49acd4da06c 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -416,7 +416,7 @@ public void revokeSourceDefinitionFromWorkspace(final SourceDefinitionIdWithWork } @Override - public InternalOperationResult saveStats(SaveStatsRequestBody saveStatsRequestBody) { + public InternalOperationResult saveStats(final SaveStatsRequestBody saveStatsRequestBody) { throw new UnsupportedOperationException(); } @@ -948,9 +948,13 @@ public OperationRead createOperation(final OperationCreate operationCreate) { throw new NotImplementedException(); } + /** + * This implementation has been moved to {@link StateApiController}. Since the path of + * {@link StateApiController} is more granular, it will override this implementation + */ @Override public ConnectionState createOrUpdateState(final ConnectionStateCreateOrUpdate connectionStateCreateOrUpdate) { - return ConfigurationApi.execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate)); + throw new NotImplementedException(); } /** @@ -989,9 +993,13 @@ public OperationRead updateOperation(final OperationUpdate operationUpdate) { throw new NotImplementedException(); } + /** + * This implementation has been moved to {@link StateApiController}. Since the path of + * {@link StateApiController} is more granular, it will override this implementation + */ @Override public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) { - return ConfigurationApi.execute(() -> stateHandler.getState(connectionIdRequestBody)); + throw new NotImplementedException(); } // SCHEDULER diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/StateApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/StateApiController.java new file mode 100644 index 000000000000..4a317db98696 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/StateApiController.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.apis; + +import io.airbyte.api.generated.StateApi; +import io.airbyte.api.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.model.generated.ConnectionState; +import io.airbyte.api.model.generated.ConnectionStateCreateOrUpdate; +import io.airbyte.server.handlers.StateHandler; +import javax.ws.rs.Path; +import lombok.AllArgsConstructor; + +@Path("/v1/state") +@AllArgsConstructor +public class StateApiController implements StateApi { + + private final StateHandler stateHandler; + + @Override + public ConnectionState createOrUpdateState(final ConnectionStateCreateOrUpdate connectionStateCreateOrUpdate) { + return ConfigurationApi.execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate)); + } + + @Override + public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) { + return ConfigurationApi.execute(() -> stateHandler.getState(connectionIdRequestBody)); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/binders/StateApiBinder.java b/airbyte-server/src/main/java/io/airbyte/server/apis/binders/StateApiBinder.java new file mode 100644 index 000000000000..65ab669528c0 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/binders/StateApiBinder.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.apis.binders; + +import io.airbyte.server.apis.StateApiController; +import io.airbyte.server.apis.factories.StateApiFactory; +import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.process.internal.RequestScoped; + +public class StateApiBinder extends AbstractBinder { + + @Override + protected void configure() { + bindFactory(StateApiFactory.class) + .to(StateApiController.class) + .in(RequestScoped.class); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/factories/StateApiFactory.java b/airbyte-server/src/main/java/io/airbyte/server/apis/factories/StateApiFactory.java new file mode 100644 index 000000000000..0498681d7629 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/factories/StateApiFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.apis.factories; + +import io.airbyte.server.apis.StateApiController; +import io.airbyte.server.handlers.StateHandler; +import org.glassfish.hk2.api.Factory; + +public class StateApiFactory implements Factory { + + private static StateHandler stateHandler; + + public static void setValues(final StateHandler stateHandler) { + StateApiFactory.stateHandler = stateHandler; + } + + @Override + public StateApiController provide() { + return new StateApiController(StateApiFactory.stateHandler); + } + + @Override + public void dispose(final StateApiController instance) { + /* no op */ + } + +}