Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

airbyte-config: Implement RemoteDefinitionsProvider #16018

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b0e2572
Remote connector catalog config persistence
alafanechere Aug 26, 2022
81e60a6
clean
alafanechere Aug 29, 2022
e01ba75
pmd fix
alafanechere Aug 29, 2022
09dace6
format
alafanechere Aug 29, 2022
bcd6322
fix pmdTest
alafanechere Aug 29, 2022
369c6d0
fix test class name
alafanechere Aug 30, 2022
030979e
remove addition of public and custom fields
alafanechere Aug 30, 2022
f727a1c
Merge branch 'augustin/config/implement-remote-catalog-config-persist…
alafanechere Aug 30, 2022
84fb264
ref wip after review
alafanechere Aug 30, 2022
31aced6
use new interface
alafanechere Aug 31, 2022
5985d69
Merge branch 'master' into augustin/config/implement-remote-catalog-c…
alafanechere Aug 31, 2022
50ff130
create adapter
alafanechere Aug 31, 2022
23479ec
create adapter
alafanechere Aug 31, 2022
96aafd4
add LocalDefinitionsProvider
alafanechere Aug 31, 2022
ef76999
rename PatchingHelpers to JsonDefinitionsHelper
alafanechere Sep 1, 2022
7755a62
implement suggestions
alafanechere Sep 1, 2022
dff32bb
add tests for LocalDefintionProvider
alafanechere Sep 1, 2022
9ffc450
Merge branch 'master' into augustin/config/implement-remote-catalog-c…
alafanechere Sep 1, 2022
5a8c588
make getRemoteConnectorCatalogUrl return an URI
alafanechere Sep 1, 2022
a871a20
Merge branch 'augustin/config/implement-remote-catalog-config-persist…
alafanechere Sep 1, 2022
73b81a3
fix adapter name
alafanechere Sep 1, 2022
d395f5e
Merge branch 'master' into augustin/config/implement-remote-catalog-c…
alafanechere Sep 1, 2022
4e2a822
use CombinedConnectorCatalog
alafanechere Sep 1, 2022
853fc01
fix spotbug
alafanechere Sep 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
import java.net.URI;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -74,7 +75,7 @@ public interface Configs {
/**
* Defines the URL to pull the remote connector catalog from.
*/
String getRemoteConnectorCatalogUrl();
URI getRemoteConnectorCatalogUrl();

// Docker Only

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig;
import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.net.URI;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -326,9 +327,9 @@ public Path getWorkspaceRoot() {
}

@Override
public String getRemoteConnectorCatalogUrl() {
public URI getRemoteConnectorCatalogUrl() {
// Resolved through getEnsureEnv(REMOTE_CONNECTOR_CATALOG_URL); no fallback value is supplied here.
return getEnsureEnv(REMOTE_CONNECTOR_CATALOG_URL);
return URI.create(getEnsureEnv(REMOTE_CONNECTOR_CATALOG_URL));
}

// Docker Only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class PatchingHelpers {
public class JsonDefinitionsHelper {

public static JsonNode addMissingTombstoneField(final JsonNode definitionJson) {
final JsonNode currTombstone = definitionJson.get("tombstone");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package io.airbyte.config.init;

import static io.airbyte.config.init.PatchingHelpers.addMissingCustomField;
import static io.airbyte.config.init.PatchingHelpers.addMissingPublicField;
import static io.airbyte.config.init.PatchingHelpers.addMissingTombstoneField;
import static io.airbyte.config.init.JsonDefinitionsHelper.addMissingCustomField;
import static io.airbyte.config.init.JsonDefinitionsHelper.addMissingPublicField;
import static io.airbyte.config.init.JsonDefinitionsHelper.addMissingTombstoneField;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -28,8 +28,7 @@
import java.util.stream.Collectors;

/**
* This config persistence contains all seed definitions according to the yaml files. It is
* read-only.
* This provider contains all definitions according to the local yaml files.
*/
final public class LocalDefinitionsProvider implements DefinitionsProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@

package io.airbyte.config.init;

import static io.airbyte.config.init.PatchingHelpers.addMissingTombstoneField;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.config.CombinedConnectorCatalog;
alafanechere marked this conversation as resolved.
Show resolved Hide resolved
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand All @@ -17,62 +14,46 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This config persistence pulls the definition configurations from a remotely hosted catalog. It is
* read-only.
* This provider pulls the definitions from a remotely hosted catalog.
*/
final public class RemoteDefinitionsProvider implements DefinitionsProvider {

private Map<UUID, StandardSourceDefinition> sourceDefinitions;
private Map<UUID, StandardDestinationDefinition> destinationDefinitions;

private static final Logger LOGGER = LoggerFactory.getLogger(RemoteDefinitionsProvider.class);
private static final HttpClient httpClient = HttpClient.newHttpClient();
private final URI remoteDefinitionCatalogUrl;
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
private final Duration timeout;

public RemoteDefinitionsProvider(final String remoteDefinitionCatalogUrl) throws InterruptedException, IOException {
this.remoteDefinitionCatalogUrl = URI.create(remoteDefinitionCatalogUrl);
this.timeout = DEFAULT_TIMEOUT;
// TODO remove this call once dependency injection framework manages object creation
initialize();
public RemoteDefinitionsProvider(final URI remoteDefinitionCatalogUrl) throws InterruptedException, IOException {
this(remoteDefinitionCatalogUrl, DEFAULT_TIMEOUT);
}

public RemoteDefinitionsProvider(final String remoteDefinitionCatalogUrl, final Duration timeout) throws InterruptedException, IOException {
this.remoteDefinitionCatalogUrl = URI.create(remoteDefinitionCatalogUrl);
public RemoteDefinitionsProvider(final URI remoteDefinitionCatalogUrl, final Duration timeout) throws InterruptedException, IOException {
this.remoteDefinitionCatalogUrl = remoteDefinitionCatalogUrl;
this.timeout = timeout;
// TODO remove this call once dependency injection framework manages object creation
initialize();
}

// TODO will be called automatically by the dependency injection framework on object creation
public void initialize() throws InterruptedException, IOException {
try {
final JsonNode catalog = getRemoteDefinitionCatalog(this.remoteDefinitionCatalogUrl, this.timeout);
this.sourceDefinitions =
parseDefinitions(catalog, "sources", StandardSourceDefinition.class, SeedType.STANDARD_SOURCE_DEFINITION.getIdName());
this.destinationDefinitions =
parseDefinitions(catalog, "destinations", StandardDestinationDefinition.class, SeedType.STANDARD_DESTINATION_DEFINITION.getIdName());
} catch (final HttpTimeoutException e) {
LOGGER.warn(
"Unable to retrieve the remote definition catalog. Using the catalog bundled with Airbyte. This warning is expected if this Airbyte cluster does not have internet access.",
e);
this.sourceDefinitions = Collections.emptyMap();
this.destinationDefinitions = Collections.emptyMap();
}

final CombinedConnectorCatalog catalog = getRemoteDefinitionCatalog(this.remoteDefinitionCatalogUrl, this.timeout);
this.sourceDefinitions = catalog.getSources().stream().collect(Collectors.toMap(
StandardSourceDefinition::getSourceDefinitionId,
source -> source));
this.destinationDefinitions = catalog.getDestinations().stream().collect(Collectors.toMap(
StandardDestinationDefinition::getDestinationDefinitionId,
source -> source));
}

@Override
Expand Down Expand Up @@ -103,39 +84,15 @@ public List<StandardDestinationDefinition> getDestinationDefinitions() {
return new ArrayList<>(this.destinationDefinitions.values());
}

private static JsonNode getRemoteDefinitionCatalog(URI catalogUrl, Duration timeout) throws IOException, InterruptedException {
private static CombinedConnectorCatalog getRemoteDefinitionCatalog(URI catalogUrl, Duration timeout) throws IOException, InterruptedException {
final HttpRequest request = HttpRequest.newBuilder(catalogUrl).timeout(timeout).header("accept", "application/json").build();

final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() >= 400) {
throw new IOException(
"getRemoteDefinitionCatalog request ran into status code error: " + response.statusCode() + " with message: " + response.getClass());
}
try {
return Jsons.deserialize(response.body());
} catch (final RuntimeException e) {
throw new IOException("Could not deserialize JSON response: ", e);
}

}

private static <T> Map<UUID, T> parseDefinitions(final JsonNode catalog,
final String catalogKey,
final Class<T> outputModel,
final String definitionIdField)
throws IOException {
final JsonNode catalogElementNode = catalog.get(catalogKey);
if (catalogElementNode == null) {
throw new IOException("The catalog schema is invalid: Missing \"" + catalogKey + "\" key");
}
final List<JsonNode> catalogElements = MoreIterators.toList(catalogElementNode.elements());
return catalogElements.stream().collect(Collectors.toMap(
json -> UUID.fromString(json.get(definitionIdField).asText()),
json -> Jsons.object(addMissingFields(json), outputModel)));
}

private static JsonNode addMissingFields(JsonNode element) {
return addMissingTombstoneField(element);
return Jsons.deserialize(response.body(), CombinedConnectorCatalog.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class DefinitionProviderToConfigPersistenceAdapterTest {
class DefinitionsProviderToConfigPersistenceAdapterTest {

private static MockWebServer webServer;
private static MockResponse validCatalogResponse;
private static String catalogUrl;
private static URI catalogUrl;
private static JsonNode jsonCatalog;

@BeforeEach
void setup() throws IOException {
webServer = new MockWebServer();
catalogUrl = webServer.url("/connector_catalog.json").toString();
catalogUrl = URI.create(webServer.url("/connector_catalog.json").toString());

final URL testCatalog = Resources.getResource("connector_catalog.json");
final String jsonBody = Resources.toString(testCatalog, Charset.defaultCharset());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.init;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.io.Resources;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.yaml.Yamls;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
 * Exercises {@link LocalDefinitionsProvider} when it is constructed with {@code SeedType.class}.
 *
 * <p>
 * Single-definition lookups are checked field by field, unknown ids are expected to raise
 * {@link ConfigNotFoundException}, and the list getters are compared against the number of entries
 * found in the corresponding seed YAML resource.
 */
class LocalDefinitionsProviderTest {

  /** Provider under test; built once in {@link #setup()} and shared by every test method. */
  private static LocalDefinitionsProvider provider;

  @BeforeAll
  static void setup() throws IOException {
    provider = new LocalDefinitionsProvider(SeedType.class);
  }

  /** Looks up the Stripe source by id and verifies the fields of the returned definition. */
  @Test
  void testGetSourceDefinition() throws Exception {
    final String expectedDocsUrl = "https://docs.airbyte.io/integrations/sources/stripe";
    final UUID expectedId = UUID.fromString("e094cb9a-26de-4645-8761-65c0c425d1de");

    final StandardSourceDefinition actual = provider.getSourceDefinition(expectedId);

    assertEquals(expectedId, actual.getSourceDefinitionId());
    assertEquals("Stripe", actual.getName());
    assertEquals("airbyte/source-stripe", actual.getDockerRepository());
    assertEquals(expectedDocsUrl, actual.getDocumentationUrl());
    assertEquals("stripe.svg", actual.getIcon());
    assertEquals(URI.create(expectedDocsUrl), actual.getSpec().getDocumentationUrl());
    assertEquals(false, actual.getTombstone());
  }

  /** Looks up the S3 destination by id and verifies the fields of the returned definition. */
  @Test
  @SuppressWarnings({"PMD.AvoidDuplicateLiterals"})
  void testGetDestinationDefinition() throws Exception {
    final String expectedDocsUrl = "https://docs.airbyte.io/integrations/destinations/s3";
    final UUID expectedId = UUID.fromString("4816b78f-1489-44c1-9060-4b19d5fa9362");

    final StandardDestinationDefinition actual = provider.getDestinationDefinition(expectedId);

    assertEquals(expectedId, actual.getDestinationDefinitionId());
    assertEquals("S3", actual.getName());
    assertEquals("airbyte/destination-s3", actual.getDockerRepository());
    assertEquals(expectedDocsUrl, actual.getDocumentationUrl());
    assertEquals(URI.create(expectedDocsUrl), actual.getSpec().getDocumentationUrl());
    assertEquals(false, actual.getTombstone());
  }

  /** Both the source and the destination lookup are expected to throw for this id. */
  @Test
  void testGetInvalidDefinitionId() {
    final UUID unknownId = UUID.fromString("1a7c360c-1289-4b96-a171-2ac1c86fb7ca");

    assertThrows(ConfigNotFoundException.class, () -> provider.getSourceDefinition(unknownId));
    assertThrows(ConfigNotFoundException.class, () -> provider.getDestinationDefinition(unknownId));
  }

  /** The provider must return as many sources as there are entries in the source seed YAML. */
  @Test
  void testGetSourceDefinitions() throws IOException {
    final int expectedCount = countYamlEntries("/seed/source_definitions.yaml");

    final List<StandardSourceDefinition> actual = provider.getSourceDefinitions();

    assertEquals(expectedCount, actual.size());
  }

  /** The provider must return as many destinations as there are entries in the destination seed YAML. */
  @Test
  void testGetDestinationDefinitions() throws IOException {
    final int expectedCount = countYamlEntries("/seed/destination_definitions.yaml");

    final List<StandardDestinationDefinition> actual = provider.getDestinationDefinitions();

    assertEquals(expectedCount, actual.size());
  }

  /**
   * Counts the elements of the YAML document stored at the given resource path.
   *
   * @param resourcePath path resolved relative to {@link LocalDefinitionsProvider}'s class
   * @return number of elements under the document's root node
   * @throws IOException if the resource cannot be read
   */
  private static int countYamlEntries(final String resourcePath) throws IOException {
    final URL resource = Resources.getResource(LocalDefinitionsProvider.class, resourcePath);
    final JsonNode root = Yamls.deserialize(Resources.toString(resource, StandardCharsets.UTF_8));
    return MoreIterators.toList(root.elements()).size();
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.net.http.HttpTimeoutException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.List;
Expand All @@ -30,13 +31,13 @@ class RemoteDefinitionsProviderTest {

private static MockWebServer webServer;
private static MockResponse validCatalogResponse;
private static String catalogUrl;
private static URI catalogUrl;
private static JsonNode jsonCatalog;

@BeforeEach
void setup() throws IOException {
webServer = new MockWebServer();
catalogUrl = webServer.url("/connector_catalog.json").toString();
catalogUrl = URI.create(webServer.url("/connector_catalog.json").toString());

final URL testCatalog = Resources.getResource("connector_catalog.json");
final String jsonBody = Resources.toString(testCatalog, Charset.defaultCharset());
Expand All @@ -60,7 +61,6 @@ void testGetSourceDefinition() throws Exception {
assertEquals("https://docs.airbyte.io/integrations/sources/stripe", stripeSource.getDocumentationUrl());
assertEquals("stripe.svg", stripeSource.getIcon());
assertEquals(URI.create("https://docs.airbyte.io/integrations/sources/stripe"), stripeSource.getSpec().getDocumentationUrl());
assertEquals(false, stripeSource.getTombstone());
}

@Test
Expand All @@ -76,7 +76,6 @@ void testGetDestinationDefinition() throws Exception {
assertEquals("airbyte/destination-s3", s3Destination.getDockerRepository());
assertEquals("https://docs.airbyte.io/integrations/destinations/s3", s3Destination.getDocumentationUrl());
assertEquals(URI.create("https://docs.airbyte.io/integrations/destinations/s3"), s3Destination.getSpec().getDocumentationUrl());
assertEquals(false, s3Destination.getTombstone());
}

@Test
Expand Down Expand Up @@ -120,11 +119,11 @@ void testBadResponseStatus() {
}

@Test
void testTimeOut() throws Exception {
void testTimeOut() {
// No request enqueued -> Timeout
final RemoteDefinitionsProvider remoteDefinitionsProvider = new RemoteDefinitionsProvider(catalogUrl, Duration.ofSeconds(1));
assertEquals(0, remoteDefinitionsProvider.getSourceDefinitions().size());
assertEquals(0, remoteDefinitionsProvider.getDestinationDefinitions().size());
assertThrows(HttpTimeoutException.class, () -> {
new RemoteDefinitionsProvider(catalogUrl, Duration.ofSeconds(1));
});
}

@Test
Expand All @@ -134,19 +133,7 @@ void testNonJson() {
.addHeader("Cache-Control", "no-cache")
.setBody("not json");
webServer.enqueue(notJson);
assertThrows(IOException.class, () -> {
new RemoteDefinitionsProvider(catalogUrl, Duration.ofSeconds(1));
});
}

@Test
void testInvalidCatalogSchema() {
final MockResponse invalidSchema = new MockResponse().setResponseCode(200)
.addHeader("Content-Type", "application/json; charset=utf-8")
.addHeader("Cache-Control", "no-cache")
.setBody("{\"foo\":\"bar\"}");
webServer.enqueue(invalidSchema);
assertThrows(IOException.class, () -> {
assertThrows(RuntimeException.class, () -> {
new RemoteDefinitionsProvider(catalogUrl, Duration.ofSeconds(1));
});
}
Expand Down