Skip to content

Commit

Permalink
airbyte-config: Implement RemoteDefinitionsProvider (airbytehq#16018)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored and robbinhan committed Sep 29, 2022
1 parent 5daa738 commit be523b3
Show file tree
Hide file tree
Showing 12 changed files with 14,978 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
import java.net.URI;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -71,6 +72,11 @@ public interface Configs {
*/
Path getWorkspaceRoot();

/**
* Defines the URL to pull the remote connector catalog from.
*/
URI getRemoteConnectorCatalogUrl();

// Docker Only

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig;
import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.net.URI;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -148,6 +149,8 @@ public class EnvConfigs implements Configs {
public static final String METRIC_CLIENT = "METRIC_CLIENT";
private static final String OTEL_COLLECTOR_ENDPOINT = "OTEL_COLLECTOR_ENDPOINT";

public static final String REMOTE_CONNECTOR_CATALOG_URL = "REMOTE_CONNECTOR_CATALOG_URL";

// job-type-specific overrides
public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS";
public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS";
Expand Down Expand Up @@ -323,6 +326,12 @@ public Path getWorkspaceRoot() {
return getPath(WORKSPACE_ROOT);
}

@Override
public URI getRemoteConnectorCatalogUrl() {
// Default to reuse the job database
return URI.create(getEnsureEnv(REMOTE_CONNECTOR_CATALOG_URL));
}

// Docker Only
@Override
public String getWorkspaceDockerMount() {
Expand Down
3 changes: 3 additions & 0 deletions airbyte-config/init/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-json-validation')

testImplementation project(':airbyte-test-utils')
testImplementation 'com.squareup.okhttp3:mockwebserver:4.9.1'
}

task copyScripts(type: Copy) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.init;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

public class DefinitionProviderToConfigPersistenceAdapter implements ConfigPersistence {

private final DefinitionsProvider definitionsProvider;
private static final String PERSISTENCE_READ_ONLY_ERROR_MSG = "The remote definitions are read only.";

public DefinitionProviderToConfigPersistenceAdapter(DefinitionsProvider definitionsProvider) {
this.definitionsProvider = definitionsProvider;
}

@Override
public <T> T getConfig(AirbyteConfig configType, String configId, Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException {
if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION && clazz == StandardSourceDefinition.class) {
return (T) definitionsProvider.getSourceDefinition(UUID.fromString(configId));
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION && clazz == StandardDestinationDefinition.class) {
return (T) definitionsProvider.getDestinationDefinition(UUID.fromString(configId));
} else {
throw new UnsupportedOperationException("The config type you passed does not match any existing model class.");
}
}

@Override
public <T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) throws JsonValidationException, IOException {
if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION && clazz == StandardSourceDefinition.class) {
return (List<T>) definitionsProvider.getSourceDefinitions();
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION && clazz == StandardDestinationDefinition.class) {
return (List<T>) definitionsProvider.getDestinationDefinitions();
} else {
throw new UnsupportedOperationException("The config type you passed does not match any existing model class.");
}
}

@Override
public <T> ConfigWithMetadata<T> getConfigWithMetadata(AirbyteConfig configType, String configId, Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException {
return null;
}

@Override
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(AirbyteConfig configType, Class<T> clazz)
throws JsonValidationException, IOException {
throw new UnsupportedOperationException("Definition provider doesn't support metadata");
}

@Override
public <T> void writeConfig(AirbyteConfig configType, String configId, T config) throws JsonValidationException, IOException {
throw new UnsupportedOperationException(PERSISTENCE_READ_ONLY_ERROR_MSG);

}

@Override
public <T> void writeConfigs(AirbyteConfig configType, Map<String, T> configs) throws IOException, JsonValidationException {
throw new UnsupportedOperationException(PERSISTENCE_READ_ONLY_ERROR_MSG);

}

@Override
public void deleteConfig(AirbyteConfig configType, String configId) throws ConfigNotFoundException, IOException {
throw new UnsupportedOperationException(PERSISTENCE_READ_ONLY_ERROR_MSG);

}

@Override
public void replaceAllConfigs(Map<AirbyteConfig, Stream<?>> configs, boolean dryRun) throws IOException {
throw new UnsupportedOperationException(PERSISTENCE_READ_ONLY_ERROR_MSG);
}

@Override
public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
Stream<JsonNode> jsonSourceDefinitions = definitionsProvider.getSourceDefinitions().stream().map(Jsons::jsonNode);
Stream<JsonNode> jsonDestinationDefinitions = definitionsProvider.getDestinationDefinitions().stream().map(Jsons::jsonNode);
return Map.of(SeedType.STANDARD_SOURCE_DEFINITION.name(), jsonSourceDefinitions, SeedType.STANDARD_DESTINATION_DEFINITION.name(),
jsonDestinationDefinitions);
}

@Override
public void loadData(ConfigPersistence seedPersistence) throws IOException {
throw new UnsupportedOperationException(PERSISTENCE_READ_ONLY_ERROR_MSG);

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.init;

import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import java.util.List;
import java.util.UUID;

public interface DefinitionsProvider {

StandardSourceDefinition getSourceDefinition(final UUID definitionId) throws ConfigNotFoundException;

List<StandardSourceDefinition> getSourceDefinitions();

StandardDestinationDefinition getDestinationDefinition(final UUID definitionId) throws ConfigNotFoundException;

List<StandardDestinationDefinition> getDestinationDefinitions();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.init;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonDefinitionsHelper {

public static JsonNode addMissingTombstoneField(final JsonNode definitionJson) {
final JsonNode currTombstone = definitionJson.get("tombstone");
if (currTombstone == null || currTombstone.isNull()) {
((ObjectNode) definitionJson).set("tombstone", BooleanNode.FALSE);
}
return definitionJson;
}

public static JsonNode addMissingPublicField(final JsonNode definitionJson) {
final JsonNode currPublic = definitionJson.get("public");
if (currPublic == null || currPublic.isNull()) {
// definitions loaded from seed yamls are by definition public
((ObjectNode) definitionJson).set("public", BooleanNode.TRUE);
}
return definitionJson;
}

public static JsonNode addMissingCustomField(final JsonNode definitionJson) {
final JsonNode currCustom = definitionJson.get("custom");
if (currCustom == null || currCustom.isNull()) {
// definitions loaded from seed yamls are by definition not custom
((ObjectNode) definitionJson).set("custom", BooleanNode.FALSE);
}
return definitionJson;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.init;

import static io.airbyte.config.init.JsonDefinitionsHelper.addMissingCustomField;
import static io.airbyte.config.init.JsonDefinitionsHelper.addMissingPublicField;
import static io.airbyte.config.init.JsonDefinitionsHelper.addMissingTombstoneField;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.Resources;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.yaml.Yamls;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

/**
* This provider contains all definitions according to the local yaml files.
*/
final public class LocalDefinitionsProvider implements DefinitionsProvider {

private Map<UUID, StandardSourceDefinition> sourceDefinitions;
private Map<UUID, StandardDestinationDefinition> destinationDefinitions;

// TODO inject via dependency injection framework
private final Class<?> seedResourceClass;

public LocalDefinitionsProvider(final Class<?> seedResourceClass) throws IOException {
this.seedResourceClass = seedResourceClass;

// TODO remove this call once dependency injection framework manages object creation
initialize();
}

// TODO will be called automatically by the dependency injection framework on object creation
public void initialize() throws IOException {
this.sourceDefinitions =
parseDefinitions(this.seedResourceClass, SeedType.STANDARD_SOURCE_DEFINITION.getResourcePath(), SeedType.SOURCE_SPEC.getResourcePath(),
SeedType.STANDARD_SOURCE_DEFINITION.getIdName(), SeedType.SOURCE_SPEC.getIdName(), StandardSourceDefinition.class);
this.destinationDefinitions = parseDefinitions(this.seedResourceClass, SeedType.STANDARD_DESTINATION_DEFINITION.getResourcePath(),
SeedType.DESTINATION_SPEC.getResourcePath(), SeedType.STANDARD_DESTINATION_DEFINITION.getIdName(), SeedType.DESTINATION_SPEC.getIdName(),
StandardDestinationDefinition.class);
}

@Override
public StandardSourceDefinition getSourceDefinition(final UUID definitionId) throws ConfigNotFoundException {
StandardSourceDefinition definition = this.sourceDefinitions.get(definitionId);
if (definition == null) {
throw new ConfigNotFoundException(SeedType.STANDARD_SOURCE_DEFINITION.name(), definitionId.toString());
}
return definition;
}

@Override
public List<StandardSourceDefinition> getSourceDefinitions() {
return new ArrayList<>(this.sourceDefinitions.values());
}

@Override
public StandardDestinationDefinition getDestinationDefinition(final UUID definitionId) throws ConfigNotFoundException {
StandardDestinationDefinition definition = this.destinationDefinitions.get(definitionId);
if (definition == null) {
throw new ConfigNotFoundException(SeedType.STANDARD_DESTINATION_DEFINITION.name(), definitionId.toString());
}
return definition;
}

@Override
public List<StandardDestinationDefinition> getDestinationDefinitions() {
return new ArrayList<>(this.destinationDefinitions.values());
}

@SuppressWarnings("UnstableApiUsage")
private static <T> Map<UUID, T> parseDefinitions(final Class<?> seedDefinitionsResourceClass,
final String definitionsYamlPath,
final String specYamlPath,
final String definitionIdField,
final String specIdField,
final Class<T> definitionModel)
throws IOException {
final Map<String, JsonNode> rawDefinitions = getJsonElements(seedDefinitionsResourceClass, definitionsYamlPath, definitionIdField);
final Map<String, JsonNode> rawSpecs = getJsonElements(seedDefinitionsResourceClass, specYamlPath, specIdField);

return rawDefinitions.entrySet().stream()
.collect(Collectors.toMap(e -> UUID.fromString(e.getKey()), e -> {
final JsonNode withMissingFields = addMissingFields(e.getValue());
final JsonNode withSpec = mergeSpecIntoDefinition(withMissingFields, rawSpecs);
return Jsons.object(withSpec, definitionModel);
}));

}

private static Map<String, JsonNode> getJsonElements(final Class<?> seedDefinitionsResourceClass, final String resourcePath, final String idName)
throws IOException {
final URL url = Resources.getResource(seedDefinitionsResourceClass, resourcePath);
final String yamlString = Resources.toString(url, StandardCharsets.UTF_8);
final JsonNode configList = Yamls.deserialize(yamlString);
return MoreIterators.toList(configList.elements()).stream().collect(Collectors.toMap(
json -> json.get(idName).asText(),
json -> json));
}

/**
* Merges the corresponding spec JSON into the definition JSON. This is necessary because specs are
* stored in a separate resource file from definitions.
*
* @param definitionJson JSON of connector definition that is missing a spec
* @param specConfigs map of docker image to JSON of docker image/connector spec pair
* @return JSON of connector definition including the connector spec
*/
private static JsonNode mergeSpecIntoDefinition(final JsonNode definitionJson, final Map<String, JsonNode> specConfigs) {
final String dockerImage = DockerUtils.getTaggedImageName(
definitionJson.get("dockerRepository").asText(),
definitionJson.get("dockerImageTag").asText());
final JsonNode specConfigJson = specConfigs.get(dockerImage);
if (specConfigJson == null || specConfigJson.get("spec") == null) {
throw new UnsupportedOperationException(String.format("There is no seed spec for docker image %s", dockerImage));
}
((ObjectNode) definitionJson).set("spec", specConfigJson.get("spec"));
return definitionJson;
}

private static JsonNode addMissingFields(JsonNode element) {
return addMissingPublicField(addMissingCustomField(addMissingTombstoneField(element)));
}

}
Loading

0 comments on commit be523b3

Please sign in to comment.