Skip to content

Commit

Permalink
Extract the AttemptApi out of the ConfigurationApi (#18406)
Browse files Browse the repository at this point in the history
* Tmp

* Extract the Attempt API from the V1 API

* Add comments

* format

* Rename to Controller
  • Loading branch information
benmoriceau authored Oct 25, 2022
1 parent 25e7a37 commit 62eb6b6
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.db.Database;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.apis.AttemptApiController;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.server.apis.binders.AttemptApiBinder;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -82,9 +85,11 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
configsFlyway,
jobsFlyway);

AttemptApiFactory.setValues(jobPersistence, MDC.getCopyOfContextMap());

// server configurations
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder());
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class, AttemptApiController.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder(), new AttemptApiBinder());

// construct server
return new ServerApp(airbyteVersion, componentClasses, components);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import io.airbyte.api.generated.AttemptApi;
import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.handlers.AttemptHandler;
import javax.ws.rs.Path;

@Path("/v1/attempt/set_workflow_in_attempt")
public class AttemptApiController implements AttemptApi {

private final AttemptHandler attemptHandler;

public AttemptApiController(final JobPersistence jobPersistence) {
attemptHandler = new AttemptHandler(jobPersistence);
}

@Override
public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRequestBody requestBody) {
return ConfigurationApi.execute(() -> attemptHandler.setWorkflowInAttempt(requestBody));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.server.errors.BadObjectSchemaKnownException;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.server.handlers.DestinationDefinitionsHandler;
Expand Down Expand Up @@ -141,6 +140,7 @@
import java.nio.file.Path;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.flywaydb.core.Flyway;

@javax.ws.rs.Path("/v1")
Expand All @@ -164,7 +164,6 @@ public class ConfigurationApi implements io.airbyte.api.generated.V1Api {
private final OpenApiConfigHandler openApiConfigHandler;
private final DbMigrationHandler dbMigrationHandler;
private final OAuthHandler oAuthHandler;
private final AttemptHandler attemptHandler;
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final Path workspaceRoot;
Expand Down Expand Up @@ -249,7 +248,6 @@ public ConfigurationApi(final ConfigRepository configRepository,
logsHandler = new LogsHandler();
openApiConfigHandler = new OpenApiConfigHandler();
dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway);
attemptHandler = new AttemptHandler(jobPersistence);
}

// WORKSPACE
Expand Down Expand Up @@ -433,6 +431,15 @@ public void setInstancewideSourceOauthParams(final SetInstancewideSourceOauthPar
});
}

/**
* This implementation has been moved to {@link AttemptApiController}. Since the path of
* {@link AttemptApiController} is more granular, it will override this implementation
*/
@Override
public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRequestBody setWorkflowInAttemptRequestBody) {
throw new NotImplementedException();
}

// SOURCE IMPLEMENTATION

@Override
Expand Down Expand Up @@ -835,12 +842,8 @@ public WebBackendWorkspaceStateResult webBackendGetWorkspaceState(final WebBacke
return execute(() -> webBackendConnectionsHandler.getWorkspaceState(webBackendWorkspaceState));
}

@Override
public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRequestBody requestBody) {
return execute(() -> attemptHandler.setWorkflowInAttempt(requestBody));
}

private static <T> T execute(final HandlerCall<T> call) {
// TODO: Move to common when all the api are moved
static <T> T execute(final HandlerCall<T> call) {
try {
return call.call();
} catch (final ConfigNotFoundException e) {
Expand All @@ -854,7 +857,7 @@ private static <T> T execute(final HandlerCall<T> call) {
}
}

private interface HandlerCall<T> {
interface HandlerCall<T> {

T call() throws ConfigNotFoundException, IOException, JsonValidationException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.binders;

import io.airbyte.server.apis.AttemptApiController;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;

public class AttemptApiBinder extends AbstractBinder {

@Override
protected void configure() {
bindFactory(AttemptApiFactory.class)
.to(AttemptApiController.class)
.in(RequestScoped.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.factories;

import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.apis.AttemptApiController;
import java.util.Map;
import org.glassfish.hk2.api.Factory;
import org.slf4j.MDC;

public class AttemptApiFactory implements Factory<AttemptApiController> {

private static JobPersistence jobPersistence;
private static Map<String, String> mdc;

public static void setValues(final JobPersistence jobPersistence, final Map<String, String> mdc) {
AttemptApiFactory.jobPersistence = jobPersistence;
AttemptApiFactory.mdc = mdc;
}

@Override
public AttemptApiController provide() {
MDC.setContextMap(AttemptApiFactory.mdc);

return new AttemptApiController(jobPersistence);
}

@Override
public void dispose(final AttemptApiController instance) {
/* no op */
}

}

0 comments on commit 62eb6b6

Please sign in to comment.