Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup #20459

Merged
merged 16 commits into from
Dec 21, 2022
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.35.15.001
### AIRBYTE SERVICES ###
TEMPORAL_HOST=airbyte-temporal:7233
INTERNAL_API_HOST=airbyte-server:8001
INTERNAL_MICRONAUT_API_HOST=airbyte-server:8080
CONNECTOR_BUILDER_API_HOST=airbyte-connector-builder-server:80
WEBAPP_URL=http://localhost:8000/
# Although not present as an env var, required for webapp configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,21 @@ public class AirbyteApiClient {
private final AttemptApi attemptApi;
private final StateApi stateApi;

public AirbyteApiClient(final ApiClient apiClient, final ApiClient micronautApiClient) {
connectionApi = new ConnectionApi(micronautApiClient);
destinationDefinitionApi = new DestinationDefinitionApi(micronautApiClient);
destinationApi = new DestinationApi(micronautApiClient);
destinationSpecificationApi = new DestinationDefinitionSpecificationApi(micronautApiClient);
jobsApi = new JobsApi(micronautApiClient);
logsApi = new PatchedLogsApi(micronautApiClient);
operationApi = new OperationApi(micronautApiClient);
sourceDefinitionApi = new SourceDefinitionApi(micronautApiClient);
sourceApi = new SourceApi(micronautApiClient);
sourceDefinitionSpecificationApi = new SourceDefinitionSpecificationApi(micronautApiClient);
workspaceApi = new WorkspaceApi(micronautApiClient);
healthApi = new HealthApi(micronautApiClient);
attemptApi = new AttemptApi(micronautApiClient);
stateApi = new StateApi(micronautApiClient);
public AirbyteApiClient(final ApiClient apiClient) {
connectionApi = new ConnectionApi(apiClient);
destinationDefinitionApi = new DestinationDefinitionApi(apiClient);
destinationApi = new DestinationApi(apiClient);
destinationSpecificationApi = new DestinationDefinitionSpecificationApi(apiClient);
jobsApi = new JobsApi(apiClient);
logsApi = new PatchedLogsApi(apiClient);
operationApi = new OperationApi(apiClient);
sourceDefinitionApi = new SourceDefinitionApi(apiClient);
sourceApi = new SourceApi(apiClient);
sourceDefinitionSpecificationApi = new SourceDefinitionSpecificationApi(apiClient);
workspaceApi = new WorkspaceApi(apiClient);
healthApi = new HealthApi(apiClient);
attemptApi = new AttemptApi(apiClient);
stateApi = new StateApi(apiClient);
}

public ConnectionApi getConnectionApi() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ConfigRepository(final Database database) {
*/
public boolean healthCheck() {
try {
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch());
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch()).stream().count();
} catch (final Exception e) {
LOGGER.error("Health check error: ", e);
return false;
Expand Down Expand Up @@ -290,7 +290,8 @@ private Stream<StandardSourceDefinition> sourceDefQuery(final Optional<UUID> sou
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.source))
.and(sourceDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardSourceDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down Expand Up @@ -352,7 +353,8 @@ private Stream<StandardDestinationDefinition> destDefQuery(final Optional<UUID>
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.destination))
.and(destDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardDestinationDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private static Stream<Record4<UUID, String, ActorType, String>> getActorDefiniti
return ctx.select(ACTOR_DEFINITION.ID, ACTOR_DEFINITION.DOCKER_REPOSITORY, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.PROTOCOL_VERSION)
.from(ACTOR_DEFINITION)
.join(ACTOR).on(ACTOR.ACTOR_DEFINITION_ID.equal(ACTOR_DEFINITION.ID))
.fetchStream();
.fetch()
.stream();
}

static void writeStandardSourceDefinition(final List<StandardSourceDefinition> configs, final DSLContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ private Stream<StandardSyncIdsWithProtocolVersions> findDisabledSyncs(final DSLC
.where(
CONNECTION.UNSUPPORTED_PROTOCOL_VERSION.eq(true).and(
(actorType == ActorType.DESTINATION ? destDef : sourceDef).ID.eq(actorDefId)))
.fetchStream()
.fetch()
.stream()
.map(r -> new StandardSyncIdsWithProtocolVersions(
r.get(CONNECTION.ID),
r.get(sourceDef.ID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ private Set<StandardSyncProtocolVersionFlag> getProtocolVersionFlagForSyncs(fina
.select(CONNECTION.ID, CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)
.from(CONNECTION)
.where(CONNECTION.ID.in(standardSync.stream().map(StandardSync::getConnectionId).toList()))
.fetchStream())
.fetch())
.stream()
.map(r -> new StandardSyncProtocolVersionFlag(r.get(CONNECTION.ID), r.get(CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)))
.collect(Collectors.toSet());
}
Expand Down
1 change: 0 additions & 1 deletion airbyte-proxy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ RUN apt-get update -y && apt-get install -y apache2-utils && rm -rf /var/lib/apt
# This variable can be used to update the destination containers that Nginx proxies to.
ENV PROXY_PASS_WEB "http://airbyte-webapp:80"
ENV PROXY_PASS_API "http://airbyte-server:8001"
ENV PROXY_PASS_MICRONAUT_API "http://airbyte-server:8080"
ENV CONNECTOR_BUILDER_SERVER_API "http://airbyte-connector-builder-server:80"

# Nginx config file
Expand Down
68 changes: 0 additions & 68 deletions airbyte-proxy/nginx-auth.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,6 @@ http {
auth_basic off;
}
}

location ~ ^/api/v1/(health|openapi) {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

auth_basic "Welcome to Airbyte";
auth_basic_user_file /etc/nginx/.htpasswd;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";

error_page 401 /etc/nginx/401.html;
location ~ (401.html)$ {
alias /etc/nginx/$1;
auth_basic off;
}
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

auth_basic "Welcome to Airbyte";
auth_basic_user_file /etc/nginx/.htpasswd;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";

error_page 401 /etc/nginx/401.html;
location ~ (401.html)$ {
alias /etc/nginx/$1;
auth_basic off;
}
}
}

server {
Expand Down Expand Up @@ -89,40 +55,6 @@ http {
auth_basic off;
}
}

location ~ ^/api/v1/(health|openapi) {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

auth_basic "Welcome to Airbyte";
auth_basic_user_file /etc/nginx/.htpasswd;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";

error_page 401 /etc/nginx/401.html;
location ~ (401.html)$ {
alias /etc/nginx/$1;
auth_basic off;
}
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

auth_basic "Welcome to Airbyte";
auth_basic_user_file /etc/nginx/.htpasswd;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";

error_page 401 /etc/nginx/401.html;
location ~ (401.html)$ {
alias /etc/nginx/$1;
auth_basic off;
}
}
}

server {
Expand Down
32 changes: 0 additions & 32 deletions airbyte-proxy/nginx-no-auth.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,6 @@ http {
proxy_read_timeout ${BASIC_AUTH_PROXY_TIMEOUT};
send_timeout ${BASIC_AUTH_PROXY_TIMEOUT};
}

location ~ ^/api/v1/(health|openapi) {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}
}

server {
Expand All @@ -53,22 +37,6 @@ http {
proxy_read_timeout ${BASIC_AUTH_PROXY_TIMEOUT};
send_timeout ${BASIC_AUTH_PROXY_TIMEOUT};
}

location ~ ^/api/v1/(health|openapi) {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}

location ~ ^/api/v1/(connections|destinations|destination_definitions|destination_definition_specifications|destination_oauths|jobs|logs|notifications|operations|scheduler|source_oauths|sources|source_definitions|source_definition_specifications|state|web_backend|workspaces)/.* {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_pass "${PROXY_PASS_MICRONAUT_API}";
}
}

server {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-proxy/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ else
TEMPLATE_PATH="/etc/nginx/templates/nginx-auth.conf.template"
fi

envsubst '${PROXY_PASS_WEB} ${PROXY_PASS_API} ${PROXY_PASS_MICRONAUT_API} ${CONNECTOR_BUILDER_SERVER_API} ${PROXY_PASS_RESOLVER} ${BASIC_AUTH_PROXY_TIMEOUT}' < $TEMPLATE_PATH > /etc/nginx/nginx.conf
envsubst '${PROXY_PASS_WEB} ${PROXY_PASS_API} ${CONNECTOR_BUILDER_SERVER_API} ${PROXY_PASS_RESOLVER} ${BASIC_AUTH_PROXY_TIMEOUT}' < $TEMPLATE_PATH > /etc/nginx/nginx.conf

echo "starting nginx..."
nginx -v
Expand Down
4 changes: 2 additions & 2 deletions airbyte-proxy/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ VERSION="${VERSION:-dev}" # defaults to "dev", otherwise it is set by environmen
echo "testing with proxy container airbyte/proxy:$VERSION"

function start_container () {
CMD="docker run -d -p $PORT:8000 --env BASIC_AUTH_USERNAME=$1 --env BASIC_AUTH_PASSWORD=$2 --env BASIC_AUTH_PROXY_TIMEOUT=$3 --env PROXY_PASS_WEB=http://localhost --env PROXY_PASS_API=http://localhost --env PROXY_PASS_MICRONAUT_API=http://localhost --env CONNECTOR_BUILDER_SERVER_API=http://localhost --name $NAME airbyte/proxy:$VERSION"
CMD="docker run -d -p $PORT:8000 --env BASIC_AUTH_USERNAME=$1 --env BASIC_AUTH_PASSWORD=$2 --env BASIC_AUTH_PROXY_TIMEOUT=$3 --env PROXY_PASS_WEB=http://localhost --env PROXY_PASS_API=http://localhost --env CONNECTOR_BUILDER_SERVER_API=http://localhost --name $NAME airbyte/proxy:$VERSION"
echo $CMD
eval $CMD
wait_for_docker;
}

function start_container_with_proxy () {
CMD="docker run -d -p $PORT:8000 --env PROXY_PASS_WEB=$1 --env PROXY_PASS_API=$1 --env PROXY_PASS_MICRONAUT_API=$1 --name $NAME
CMD="docker run -d -p $PORT:8000 --env PROXY_PASS_WEB=$1 --env PROXY_PASS_API=$1 --name $NAME
airbyte/proxy:$VERSION"
echo $CMD
eval $CMD
Expand Down
75 changes: 0 additions & 75 deletions airbyte-server/src/main/java/io/airbyte/server/EventListener.java

This file was deleted.

18 changes: 15 additions & 3 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.commons.temporal.TemporalClient;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.TemporalWorkflowUtils;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.helpers.LogClientSingleton;
Expand Down Expand Up @@ -71,6 +72,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
Expand Down Expand Up @@ -280,8 +282,17 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
connectionsHandler,
envVariableFeatureFlags);

final DestinationDefinitionsHandler destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository, syncSchedulerClient,
destinationHandler);
final AirbyteProtocolVersionRange airbyteProtocolVersionRange = new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(),
configs.getAirbyteProtocolVersionMax());

final AirbyteGithubStore airbyteGithubStore = AirbyteGithubStore.production();

final DestinationDefinitionsHandler destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository,
() -> UUID.randomUUID(),
syncSchedulerClient,
airbyteGithubStore,
destinationHandler,
airbyteProtocolVersionRange);

final HealthCheckHandler healthCheckHandler = new HealthCheckHandler(configRepository);

Expand All @@ -295,7 +306,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
connectionsHandler);

final SourceDefinitionsHandler sourceDefinitionsHandler =
new SourceDefinitionsHandler(configRepository, syncSchedulerClient, sourceHandler, configs);
new SourceDefinitionsHandler(configRepository, () -> UUID.randomUUID(), syncSchedulerClient, airbyteGithubStore, sourceHandler,
airbyteProtocolVersionRange);

final JobHistoryHandler jobHistoryHandler = new JobHistoryHandler(
jobPersistence,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.micronaut.http.annotation.Post;
import java.io.File;

@Controller("/v1/logs")
@Controller("/api/v1/logs")
@Context
public class LogsApiController implements LogsApi {

Expand Down
Loading