Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/extract connection api #18409

Merged
merged 18 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.db.Database;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.apis.AttemptApiImpl;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should discuss naming conventions on these classes, we (outside of this pr) seem to have a mix of DefaultInterface and InterfaceImpl. Personally I lean towards the Default prefix, but more I want consistency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget to change the base, sorry about that. @jdpgrailsdev had a similar comment and I renamed it to Controller

import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.server.apis.ConnectionApiImpl;
import io.airbyte.server.apis.binders.AttemptApiBinder;
import io.airbyte.server.apis.binders.ConnectionApiBinder;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -82,9 +87,11 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
configsFlyway,
jobsFlyway);

AttemptApiFactory.setValues(jobPersistence, MDC.getCopyOfContextMap());

// server configurations
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder());
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class, AttemptApiImpl.class, ConnectionApiImpl.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder(), new AttemptApiBinder(), new ConnectionApiBinder());

// construct server
return new ServerApp(airbyteVersion, componentClasses, components);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import io.airbyte.api.generated.AttemptApi;
import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.handlers.AttemptHandler;
import javax.ws.rs.Path;

@Path("/v1/attempt/set_workflow_in_attempt")
public class AttemptApiImpl implements AttemptApi {

private final AttemptHandler attemptHandler;

public AttemptApiImpl(final JobPersistence jobPersistence) {
attemptHandler = new AttemptHandler(jobPersistence);
}

@Override
public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRequestBody requestBody) {
return ConfigurationApi.execute(() -> attemptHandler.setWorkflowInAttempt(requestBody));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.server.errors.BadObjectSchemaKnownException;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.server.handlers.DestinationDefinitionsHandler;
Expand Down Expand Up @@ -141,6 +140,7 @@
import java.nio.file.Path;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.flywaydb.core.Flyway;

@javax.ws.rs.Path("/v1")
Expand All @@ -164,7 +164,6 @@ public class ConfigurationApi implements io.airbyte.api.generated.V1Api {
private final OpenApiConfigHandler openApiConfigHandler;
private final DbMigrationHandler dbMigrationHandler;
private final OAuthHandler oAuthHandler;
private final AttemptHandler attemptHandler;
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final Path workspaceRoot;
Expand Down Expand Up @@ -249,7 +248,6 @@ public ConfigurationApi(final ConfigRepository configRepository,
logsHandler = new LogsHandler();
openApiConfigHandler = new OpenApiConfigHandler();
dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway);
attemptHandler = new AttemptHandler(jobPersistence);
}

// WORKSPACE
Expand Down Expand Up @@ -433,6 +431,15 @@ public void setInstancewideSourceOauthParams(final SetInstancewideSourceOauthPar
});
}

/**
* This implementation has been moved to {@link AttemptApiImpl}. Since the path of
* {@link AttemptApiImpl} is more granular, it will override this implementation
*/
@Override
public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRequestBody setWorkflowInAttemptRequestBody) {
throw new NotImplementedException();
}

// SOURCE IMPLEMENTATION

@Override
Expand Down Expand Up @@ -645,53 +652,85 @@ public CheckConnectionRead checkConnectionToDestinationForUpdate(final Destinati

// CONNECTION

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public ConnectionRead createConnection(final ConnectionCreate connectionCreate) {
return execute(() -> connectionsHandler.createConnection(connectionCreate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate) {
return execute(() -> connectionsHandler.updateConnection(connectionUpdate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public ConnectionReadList listConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public ConnectionReadList listAllConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> connectionsHandler.listAllConnectionsForWorkspace(workspaceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public ConnectionReadList searchConnections(final ConnectionSearch connectionSearch) {
return execute(() -> connectionsHandler.searchConnections(connectionSearch));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public ConnectionRead getConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId()));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public void deleteConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
execute(() -> {
operationsHandler.deleteOperationsForConnection(connectionIdRequestBody);
connectionsHandler.deleteConnection(connectionIdRequestBody.getConnectionId());
return null;
});
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> schedulerHandler.syncConnection(connectionIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody));
throw new NotImplementedException();
}

// Operations
Expand All @@ -706,9 +745,13 @@ public OperationRead createOperation(final OperationCreate operationCreate) {
return execute(() -> operationsHandler.createOperation(operationCreate));
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public ConnectionState createOrUpdateState(final ConnectionStateCreateOrUpdate connectionStateCreateOrUpdate) {
return execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate));
throw new NotImplementedException();
}

@Override
Expand All @@ -734,9 +777,13 @@ public OperationRead updateOperation(final OperationUpdate operationUpdate) {
return execute(() -> operationsHandler.updateOperation(operationUpdate));
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> stateHandler.getState(connectionIdRequestBody));
throw new NotImplementedException();
}

// SCHEDULER
Expand Down Expand Up @@ -825,22 +872,22 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
}

/**
* This implementation has been moved to {@link ConnectionApiImpl}. Since the path of
* {@link ConnectionApiImpl} is more granular, it will override this implementation
*/
@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
throw new NotImplementedException();
}

@Override
public WebBackendWorkspaceStateResult webBackendGetWorkspaceState(final WebBackendWorkspaceState webBackendWorkspaceState) {
return execute(() -> webBackendConnectionsHandler.getWorkspaceState(webBackendWorkspaceState));
}

@Override
public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRequestBody requestBody) {
return execute(() -> attemptHandler.setWorkflowInAttempt(requestBody));
}

private static <T> T execute(final HandlerCall<T> call) {
// TODO: Move to common when all the api are moved
static <T> T execute(final HandlerCall<T> call) {
try {
return call.call();
} catch (final ConfigNotFoundException e) {
Expand All @@ -854,7 +901,7 @@ private static <T> T execute(final HandlerCall<T> call) {
}
}

private interface HandlerCall<T> {
interface HandlerCall<T> {

T call() throws ConfigNotFoundException, IOException, JsonValidationException;

Expand Down
Loading