Skip to content

Commit

Permalink
Cleanup (#20459)
Browse files Browse the repository at this point in the history
* Cleanup

* More cleanup

* Disable in order to test cloud

* Restore missing files

* Fix test

* Format and fix pmd

* Add transactional

* Fix version

* Tentative

* Cleanup the cleanup

* Rm reference to the micronaut server

* format

* pmd

* more pmd

* fix build
  • Loading branch information
benmoriceau authored Dec 21, 2022
1 parent c0682a3 commit 5d13c4c
Show file tree
Hide file tree
Showing 56 changed files with 237 additions and 393 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.35.15.001
### AIRBYTE SERVICES ###
TEMPORAL_HOST=airbyte-temporal:7233
INTERNAL_API_HOST=airbyte-server:8001
INTERNAL_MICRONAUT_API_HOST=airbyte-server:8080
CONNECTOR_BUILDER_API_HOST=airbyte-connector-builder-server:80
WEBAPP_URL=http://localhost:8000/
# Although not present as an env var, required for webapp configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,21 @@ public class AirbyteApiClient {
private final AttemptApi attemptApi;
private final StateApi stateApi;

public AirbyteApiClient(final ApiClient apiClient, final ApiClient micronautApiClient) {
connectionApi = new ConnectionApi(micronautApiClient);
destinationDefinitionApi = new DestinationDefinitionApi(micronautApiClient);
destinationApi = new DestinationApi(micronautApiClient);
destinationSpecificationApi = new DestinationDefinitionSpecificationApi(micronautApiClient);
jobsApi = new JobsApi(micronautApiClient);
logsApi = new PatchedLogsApi(micronautApiClient);
operationApi = new OperationApi(micronautApiClient);
sourceDefinitionApi = new SourceDefinitionApi(micronautApiClient);
sourceApi = new SourceApi(micronautApiClient);
sourceDefinitionSpecificationApi = new SourceDefinitionSpecificationApi(micronautApiClient);
workspaceApi = new WorkspaceApi(micronautApiClient);
healthApi = new HealthApi(micronautApiClient);
attemptApi = new AttemptApi(micronautApiClient);
stateApi = new StateApi(micronautApiClient);
public AirbyteApiClient(final ApiClient apiClient) {
connectionApi = new ConnectionApi(apiClient);
destinationDefinitionApi = new DestinationDefinitionApi(apiClient);
destinationApi = new DestinationApi(apiClient);
destinationSpecificationApi = new DestinationDefinitionSpecificationApi(apiClient);
jobsApi = new JobsApi(apiClient);
logsApi = new PatchedLogsApi(apiClient);
operationApi = new OperationApi(apiClient);
sourceDefinitionApi = new SourceDefinitionApi(apiClient);
sourceApi = new SourceApi(apiClient);
sourceDefinitionSpecificationApi = new SourceDefinitionSpecificationApi(apiClient);
workspaceApi = new WorkspaceApi(apiClient);
healthApi = new HealthApi(apiClient);
attemptApi = new AttemptApi(apiClient);
stateApi = new StateApi(apiClient);
}

public ConnectionApi getConnectionApi() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ConfigRepository(final Database database) {
*/
public boolean healthCheck() {
try {
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch());
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch()).stream().count();
} catch (final Exception e) {
LOGGER.error("Health check error: ", e);
return false;
Expand Down Expand Up @@ -290,7 +290,8 @@ private Stream<StandardSourceDefinition> sourceDefQuery(final Optional<UUID> sou
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.source))
.and(sourceDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardSourceDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down Expand Up @@ -352,7 +353,8 @@ private Stream<StandardDestinationDefinition> destDefQuery(final Optional<UUID>
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.destination))
.and(destDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardDestinationDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private static Stream<Record4<UUID, String, ActorType, String>> getActorDefiniti
return ctx.select(ACTOR_DEFINITION.ID, ACTOR_DEFINITION.DOCKER_REPOSITORY, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.PROTOCOL_VERSION)
.from(ACTOR_DEFINITION)
.join(ACTOR).on(ACTOR.ACTOR_DEFINITION_ID.equal(ACTOR_DEFINITION.ID))
.fetchStream();
.fetch()
.stream();
}

static void writeStandardSourceDefinition(final List<StandardSourceDefinition> configs, final DSLContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ private Stream<StandardSyncIdsWithProtocolVersions> findDisabledSyncs(final DSLC
.where(
CONNECTION.UNSUPPORTED_PROTOCOL_VERSION.eq(true).and(
(actorType == ActorType.DESTINATION ? destDef : sourceDef).ID.eq(actorDefId)))
.fetchStream()
.fetch()
.stream()
.map(r -> new StandardSyncIdsWithProtocolVersions(
r.get(CONNECTION.ID),
r.get(sourceDef.ID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ private Set<StandardSyncProtocolVersionFlag> getProtocolVersionFlagForSyncs(fina
.select(CONNECTION.ID, CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)
.from(CONNECTION)
.where(CONNECTION.ID.in(standardSync.stream().map(StandardSync::getConnectionId).toList()))
.fetchStream())
.fetch())
.stream()
.map(r -> new StandardSyncProtocolVersionFlag(r.get(CONNECTION.ID), r.get(CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)))
.collect(Collectors.toSet());
}
Expand Down
1 change: 0 additions & 1 deletion airbyte-proxy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ RUN apt-get update -y && apt-get install -y apache2-utils && rm -rf /var/lib/apt
# This variable can be used to update the destination containers that Nginx proxies to.
ENV PROXY_PASS_WEB "http://airbyte-webapp:80"
ENV PROXY_PASS_API "http://airbyte-server:8001"
ENV PROXY_PASS_MICRONAUT_API "http://airbyte-server:8080"
ENV CONNECTOR_BUILDER_SERVER_API "http://airbyte-connector-builder-server:80"

# Nginx config file
Expand Down
68 changes: 0 additions & 68 deletions airbyte-proxy/nginx-auth.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,6 @@ http {
auth_basic off;
}
}

location ~ ^/api/v1/(health|openapi) {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

auth_basic "Welcome to Airbyte";
auth_basic_user_file /etc/nginx/.htpasswd;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";

error_page 401 /etc/nginx/401.html;
location ~ (401.html)$ {
alias /etc/nginx/$1;
auth_basic off;
}
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

auth_basic "Welcome to Airbyte";
auth_basic_user_file /etc/nginx/.htpasswd;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";

error_page 401 /etc/nginx/401.html;
location ~ (401.html)$ {
alias /etc/nginx/$1;
auth_basic off;
}
}
}

server {
Expand Down Expand Up @@ -89,40 +55,6 @@ http {
auth_basic off;
}
}

location ~ ^/api/v1/(health|openapi) {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

auth_basic "Welcome to Airbyte";
auth_basic_user_file /etc/nginx/.htpasswd;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";

error_page 401 /etc/nginx/401.html;
location ~ (401.html)$ {
alias /etc/nginx/$1;
auth_basic off;
}
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

auth_basic "Welcome to Airbyte";
auth_basic_user_file /etc/nginx/.htpasswd;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";

error_page 401 /etc/nginx/401.html;
location ~ (401.html)$ {
alias /etc/nginx/$1;
auth_basic off;
}
}
}

server {
Expand Down
32 changes: 0 additions & 32 deletions airbyte-proxy/nginx-no-auth.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,6 @@ http {
proxy_read_timeout ${BASIC_AUTH_PROXY_TIMEOUT};
send_timeout ${BASIC_AUTH_PROXY_TIMEOUT};
}

location ~ ^/api/v1/(health|openapi) {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}
}

server {
Expand All @@ -53,22 +37,6 @@ http {
proxy_read_timeout ${BASIC_AUTH_PROXY_TIMEOUT};
send_timeout ${BASIC_AUTH_PROXY_TIMEOUT};
}

location ~ ^/api/v1/(health|openapi) {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}
}

server {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-proxy/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ else
TEMPLATE_PATH="/etc/nginx/templates/nginx-auth.conf.template"
fi

envsubst '${PROXY_PASS_WEB} ${PROXY_PASS_API} ${PROXY_PASS_MICRONAUT_API} ${CONNECTOR_BUILDER_SERVER_API} ${PROXY_PASS_RESOLVER} ${BASIC_AUTH_PROXY_TIMEOUT}' < $TEMPLATE_PATH > /etc/nginx/nginx.conf
envsubst '${PROXY_PASS_WEB} ${PROXY_PASS_API} ${CONNECTOR_BUILDER_SERVER_API} ${PROXY_PASS_RESOLVER} ${BASIC_AUTH_PROXY_TIMEOUT}' < $TEMPLATE_PATH > /etc/nginx/nginx.conf

echo "starting nginx..."
nginx -v
Expand Down
4 changes: 2 additions & 2 deletions airbyte-proxy/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ VERSION="${VERSION:-dev}" # defaults to "dev", otherwise it is set by environmen
echo "testing with proxy container airbyte/proxy:$VERSION"

function start_container () {
CMD="docker run -d -p $PORT:8000 --env BASIC_AUTH_USERNAME=$1 --env BASIC_AUTH_PASSWORD=$2 --env BASIC_AUTH_PROXY_TIMEOUT=$3 --env PROXY_PASS_WEB=http://localhost --env PROXY_PASS_API=http://localhost --env PROXY_PASS_MICRONAUT_API=http://localhost --env CONNECTOR_BUILDER_SERVER_API=http://localhost --name $NAME airbyte/proxy:$VERSION"
CMD="docker run -d -p $PORT:8000 --env BASIC_AUTH_USERNAME=$1 --env BASIC_AUTH_PASSWORD=$2 --env BASIC_AUTH_PROXY_TIMEOUT=$3 --env PROXY_PASS_WEB=http://localhost --env PROXY_PASS_API=http://localhost --env CONNECTOR_BUILDER_SERVER_API=http://localhost --name $NAME airbyte/proxy:$VERSION"
echo $CMD
eval $CMD
wait_for_docker;
}

function start_container_with_proxy () {
CMD="docker run -d -p $PORT:8000 --env PROXY_PASS_WEB=$1 --env PROXY_PASS_API=$1 --env PROXY_PASS_MICRONAUT_API=$1 --name $NAME
CMD="docker run -d -p $PORT:8000 --env PROXY_PASS_WEB=$1 --env PROXY_PASS_API=$1 --name $NAME
airbyte/proxy:$VERSION"
echo $CMD
eval $CMD
Expand Down
75 changes: 0 additions & 75 deletions airbyte-server/src/main/java/io/airbyte/server/EventListener.java

This file was deleted.

18 changes: 15 additions & 3 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.commons.temporal.TemporalClient;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.TemporalWorkflowUtils;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.helpers.LogClientSingleton;
Expand Down Expand Up @@ -71,6 +72,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
Expand Down Expand Up @@ -280,8 +282,17 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
connectionsHandler,
envVariableFeatureFlags);

final DestinationDefinitionsHandler destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository, syncSchedulerClient,
destinationHandler);
final AirbyteProtocolVersionRange airbyteProtocolVersionRange = new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(),
configs.getAirbyteProtocolVersionMax());

final AirbyteGithubStore airbyteGithubStore = AirbyteGithubStore.production();

final DestinationDefinitionsHandler destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository,
() -> UUID.randomUUID(),
syncSchedulerClient,
airbyteGithubStore,
destinationHandler,
airbyteProtocolVersionRange);

final HealthCheckHandler healthCheckHandler = new HealthCheckHandler(configRepository);

Expand All @@ -295,7 +306,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
connectionsHandler);

final SourceDefinitionsHandler sourceDefinitionsHandler =
new SourceDefinitionsHandler(configRepository, syncSchedulerClient, sourceHandler, configs);
new SourceDefinitionsHandler(configRepository, () -> UUID.randomUUID(), syncSchedulerClient, airbyteGithubStore, sourceHandler,
airbyteProtocolVersionRange);

final JobHistoryHandler jobHistoryHandler = new JobHistoryHandler(
jobPersistence,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.micronaut.http.annotation.Post;
import java.io.File;

@Controller("/v1/logs")
@Controller("/api/v1/logs")
@Context
public class LogsApiController implements LogsApi {

Expand Down
Loading

0 comments on commit 5d13c4c

Please sign in to comment.