Skip to content

Commit

Permalink
Separate usage of ConnectorRegistry types and internal platform types…
Browse files Browse the repository at this point in the history
… (#5849)

Co-authored-by: Ben Church <ben@airbyte.io>
  • Loading branch information
pedroslopez and bnchrch committed Apr 14, 2023
1 parent b30b3d4 commit 55cf1b5
Show file tree
Hide file tree
Showing 27 changed files with 668 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airbyte.config.Geography;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.helpers.ConnectorRegistryConverters;
import io.airbyte.config.init.ApplyDefinitionsHelper;
import io.airbyte.config.init.CdkVersionProvider;
import io.airbyte.config.init.DeclarativeSourceUpdater;
Expand Down Expand Up @@ -230,7 +231,9 @@ void testBootloaderAppRunSecretMigration() throws Exception {
initBootloader.load();

final DefinitionsProvider localDefinitions = new LocalDefinitionsProvider();
configRepository.seedActorDefinitions(localDefinitions.getSourceDefinitions(), localDefinitions.getDestinationDefinitions());
configRepository.seedActorDefinitions(
localDefinitions.getSourceDefinitions().stream().map(ConnectorRegistryConverters::toStandardSourceDefinition).toList(),
localDefinitions.getDestinationDefinitions().stream().map(ConnectorRegistryConverters::toStandardDestinationDefinition).toList());

final String sourceSpecs = """
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorType;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.ConnectorRegistryDestinationDefinition;
import io.airbyte.config.ConnectorRegistrySourceDefinition;
import io.airbyte.config.init.DefinitionsProvider;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
Expand Down Expand Up @@ -340,17 +340,17 @@ private void setCurrentProtocolRangeRange(final Version min, final Version max)
}

private void setNewDestinationDefinitions(final List<Entry<UUID, Version>> defs) {
final List<StandardDestinationDefinition> destDefinitions = defs.stream()
.map(e -> new StandardDestinationDefinition()
final List<ConnectorRegistryDestinationDefinition> destDefinitions = defs.stream()
.map(e -> new ConnectorRegistryDestinationDefinition()
.withDestinationDefinitionId(e.getKey())
.withSpec(new ConnectorSpecification().withProtocolVersion(e.getValue().serialize())))
.toList();
when(definitionsProvider.getDestinationDefinitions()).thenReturn(destDefinitions);
}

private void setNewSourceDefinitions(final List<Entry<UUID, Version>> defs) {
final List<StandardSourceDefinition> sourceDefinitions = defs.stream()
.map(e -> new StandardSourceDefinition()
final List<ConnectorRegistrySourceDefinition> sourceDefinitions = defs.stream()
.map(e -> new ConnectorRegistrySourceDefinition()
.withSourceDefinitionId(e.getKey())
.withSpec(new ConnectorSpecification().withProtocolVersion(e.getValue().serialize())))
.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.helpers.ConnectorRegistryConverters;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
Expand Down Expand Up @@ -150,7 +151,7 @@ public DestinationDefinitionReadList listLatestDestinationDefinitions() {
}

private List<StandardDestinationDefinition> getLatestDestinations() {
return remoteOssCatalog.getDestinationDefinitions();
return remoteOssCatalog.getDestinationDefinitions().stream().map(ConnectorRegistryConverters::toStandardDestinationDefinition).toList();
}

public DestinationDefinitionReadList listDestinationDefinitionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.helpers.ConnectorRegistryConverters;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
Expand Down Expand Up @@ -156,7 +157,7 @@ public SourceDefinitionReadList listLatestSourceDefinitions() {
}

private List<StandardSourceDefinition> getLatestSources() {
return remoteOssCatalog.getSourceDefinitions();
return remoteOssCatalog.getSourceDefinitions().stream().map(ConnectorRegistryConverters::toStandardSourceDefinition).toList();
}

public SourceDefinitionReadList listSourceDefinitionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import io.airbyte.api.model.generated.WebBackendCheckUpdatesRead;
import io.airbyte.commons.server.services.AirbyteRemoteOssCatalog;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.ConnectorRegistryDestinationDefinition;
import io.airbyte.config.ConnectorRegistrySourceDefinition;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
import java.io.IOException;
Expand Down Expand Up @@ -68,7 +68,8 @@ private int getDestinationDiffCount() {

newActorDefToDockerImageTag = remoteOssCatalog.getDestinationDefinitions()
.stream()
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId, StandardDestinationDefinition::getDockerImageTag));
.collect(Collectors.toMap(ConnectorRegistryDestinationDefinition::getDestinationDefinitionId,
ConnectorRegistryDestinationDefinition::getDockerImageTag));

return getDiffCount(currentActorDefToDockerImageTag, newActorDefToDockerImageTag);
}
Expand All @@ -89,7 +90,7 @@ private int getSourceDiffCount() {

newActorDefToDockerImageTag = remoteOssCatalog.getSourceDefinitions()
.stream()
.collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId, StandardSourceDefinition::getDockerImageTag));
.collect(Collectors.toMap(ConnectorRegistrySourceDefinition::getSourceDefinitionId, ConnectorRegistrySourceDefinition::getDockerImageTag));

return getDiffCount(currentActorDefToDockerImageTag, newActorDefToDockerImageTag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
package io.airbyte.commons.server.services;

import io.airbyte.commons.constants.AirbyteCatalogConstants;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.ConnectorRegistryDestinationDefinition;
import io.airbyte.config.ConnectorRegistrySourceDefinition;
import io.airbyte.config.init.RemoteDefinitionsProvider;
import io.airbyte.config.specs.CombinedConnectorCatalogDownloader;
import jakarta.inject.Singleton;
import java.util.Collections;
import java.util.List;
Expand All @@ -25,7 +24,7 @@
public class AirbyteRemoteOssCatalog extends RemoteDefinitionsProvider {

private static final long TIMEOUT = 30000;
private static final Logger LOGGER = LoggerFactory.getLogger(CombinedConnectorCatalogDownloader.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteRemoteOssCatalog.class);

public AirbyteRemoteOssCatalog() {
this(AirbyteCatalogConstants.REMOTE_OSS_CATALOG_URL);
Expand All @@ -38,25 +37,25 @@ public AirbyteRemoteOssCatalog(final String remoteCatalogUrl) {

@Override
@SuppressWarnings("PMD.AvoidCatchingThrowable")
public List<StandardDestinationDefinition> getDestinationDefinitions() {
public List<ConnectorRegistryDestinationDefinition> getDestinationDefinitions() {
try {
return super.getDestinationDefinitions();
} catch (final Throwable e) {
LOGGER.warn(
"Unable to retrieve latest Destination list from Remote Catalog. This warning is expected if this cluster does not have internet access.",
"Unable to retrieve latest Destination list from Remote Registry. This warning is expected if this cluster does not have internet access.",
e);
return Collections.emptyList();
}
}

@Override
@SuppressWarnings("PMD.AvoidCatchingThrowable")
public List<StandardSourceDefinition> getSourceDefinitions() {
public List<ConnectorRegistrySourceDefinition> getSourceDefinitions() {
try {
return super.getSourceDefinitions();
} catch (final Throwable e) {
LOGGER.warn(
"Unable to retrieve latest Source list from Remote Catalog. This warning is expected if this cluster does not have internet access.",
"Unable to retrieve latest Source list from Remote Registry. This warning is expected if this cluster does not have internet access.",
e);
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.ActorType;
import io.airbyte.config.ConnectorRegistryDestinationDefinition;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.NormalizationDestinationDefinitionConfig;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.helpers.ConnectorRegistryConverters;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
Expand Down Expand Up @@ -653,14 +655,28 @@ class listLatest {
@Test
@DisplayName("should return the latest list")
void testCorrect() throws InterruptedException {
final StandardDestinationDefinition destinationDefinition = generateDestinationDefinition();
when(remoteOssCatalog.getDestinationDefinitions()).thenReturn(Collections.singletonList(destinationDefinition));
final ConnectorRegistryDestinationDefinition registryDestinationDefinition = new ConnectorRegistryDestinationDefinition()
.withDestinationDefinitionId(UUID.randomUUID())
.withName("some-destination")
.withDocumentationUrl("https://airbyte.com")
.withDockerRepository("dockerrepo")
.withDockerImageTag("1.2.4")
.withIcon("dest.svg")
.withSpec(new ConnectorSpecification().withConnectionSpecification(
Jsons.jsonNode(ImmutableMap.of("key", "val"))))
.withTombstone(false)
.withProtocolVersion("0.2.2")
.withReleaseStage(io.airbyte.config.ReleaseStage.ALPHA)
.withReleaseDate(TODAY_DATE_STRING)
.withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(new ResourceRequirements().withCpuRequest("2")));
when(remoteOssCatalog.getDestinationDefinitions()).thenReturn(Collections.singletonList(registryDestinationDefinition));

final var destinationDefinitionReadList = destinationDefinitionsHandler.listLatestDestinationDefinitions().getDestinationDefinitions();
assertEquals(1, destinationDefinitionReadList.size());

final var destinationDefinitionRead = destinationDefinitionReadList.get(0);
assertEquals(DestinationDefinitionsHandler.buildDestinationDefinitionRead(destinationDefinition), destinationDefinitionRead);
assertEquals(DestinationDefinitionsHandler.buildDestinationDefinitionRead(
ConnectorRegistryConverters.toStandardDestinationDefinition(registryDestinationDefinition)), destinationDefinitionRead);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.ActorType;
import io.airbyte.config.ConnectorRegistrySourceDefinition;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.helpers.ConnectorRegistryConverters;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
Expand Down Expand Up @@ -593,15 +595,29 @@ class listLatest {

@Test
@DisplayName("should return the latest list")
void testCorrect() throws IOException, InterruptedException {
final StandardSourceDefinition sourceDefinition = generateSourceDefinition();
when(githubStore.getSourceDefinitions()).thenReturn(Collections.singletonList(sourceDefinition));
void testCorrect() {
final ConnectorRegistrySourceDefinition registrySourceDefinition = new ConnectorRegistrySourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withName("some-source")
.withDocumentationUrl("https://airbyte.com")
.withDockerRepository("dockerrepo")
.withDockerImageTag("1.2.4")
.withIcon("source.svg")
.withSpec(new ConnectorSpecification().withConnectionSpecification(
Jsons.jsonNode(ImmutableMap.of("key", "val"))))
.withTombstone(false)
.withReleaseStage(io.airbyte.config.ReleaseStage.ALPHA)
.withReleaseDate(TODAY_DATE_STRING)
.withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(new ResourceRequirements().withCpuRequest("2")));
when(githubStore.getSourceDefinitions()).thenReturn(Collections.singletonList(registrySourceDefinition));

final var sourceDefinitionReadList = sourceDefinitionsHandler.listLatestSourceDefinitions().getSourceDefinitions();
assertEquals(1, sourceDefinitionReadList.size());

final var sourceDefinitionRead = sourceDefinitionReadList.get(0);
assertEquals(SourceDefinitionsHandler.buildSourceDefinitionRead(sourceDefinition), sourceDefinitionRead);
assertEquals(
SourceDefinitionsHandler.buildSourceDefinitionRead(ConnectorRegistryConverters.toStandardSourceDefinition(registrySourceDefinition)),
sourceDefinitionRead);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import io.airbyte.api.model.generated.WebBackendCheckUpdatesRead;
import io.airbyte.commons.server.services.AirbyteRemoteOssCatalog;
import io.airbyte.config.ConnectorRegistryDestinationDefinition;
import io.airbyte.config.ConnectorRegistrySourceDefinition;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigRepository;
Expand Down Expand Up @@ -147,12 +149,18 @@ private void setMocks(final List<Entry<UUID, String>> currentSources,
when(configRepository.listStandardSourceDefinitions(INCLUDE_TOMBSTONE))
.thenReturn(currentSources.stream().map(this::createSourceDef).toList());
when(remoteOssCatalog.getSourceDefinitions())
.thenReturn(latestSources.stream().map(this::createSourceDef).toList());
.thenReturn(latestSources.stream().map(this::createRegistrySourceDef).toList());

when(configRepository.listStandardDestinationDefinitions(INCLUDE_TOMBSTONE))
.thenReturn(currentDestinations.stream().map(this::createDestinationDef).toList());
when(remoteOssCatalog.getDestinationDefinitions())
.thenReturn(latestDestinations.stream().map(this::createDestinationDef).toList());
.thenReturn(latestDestinations.stream().map(this::createRegistryDestinationDef).toList());
}

private ConnectorRegistryDestinationDefinition createRegistryDestinationDef(final Entry<UUID, String> idImageTagEntry) {
return new ConnectorRegistryDestinationDefinition()
.withDestinationDefinitionId(idImageTagEntry.getKey())
.withDockerImageTag(idImageTagEntry.getValue());
}

private StandardDestinationDefinition createDestinationDef(final Entry<UUID, String> idImageTagEntry) {
Expand All @@ -161,6 +169,12 @@ private StandardDestinationDefinition createDestinationDef(final Entry<UUID, Str
.withDockerImageTag(idImageTagEntry.getValue());
}

private ConnectorRegistrySourceDefinition createRegistrySourceDef(final Entry<UUID, String> idImageTagEntry) {
return new ConnectorRegistrySourceDefinition()
.withSourceDefinitionId(idImageTagEntry.getKey())
.withDockerImageTag(idImageTagEntry.getValue());
}

private StandardSourceDefinition createSourceDef(final Entry<UUID, String> idImageTagEntry) {
return new StandardSourceDefinition()
.withSourceDefinitionId(idImageTagEntry.getKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.io.Resources;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.ConnectorRegistryDestinationDefinition;
import io.airbyte.config.ConnectorRegistrySourceDefinition;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -52,7 +52,7 @@ void testReceivedData() throws Exception {

final AirbyteRemoteOssCatalog remoteDefinitionsProvider = new AirbyteRemoteOssCatalog(catalogUrl);
final UUID stripeSourceId = UUID.fromString("e094cb9a-26de-4645-8761-65c0c425d1de");
final StandardSourceDefinition stripeSource = remoteDefinitionsProvider.getSourceDefinition(stripeSourceId);
final ConnectorRegistrySourceDefinition stripeSource = remoteDefinitionsProvider.getSourceDefinition(stripeSourceId);
assertEquals(stripeSourceId, stripeSource.getSourceDefinitionId());
assertEquals("Stripe", stripeSource.getName());
assertEquals("airbyte/source-stripe", stripeSource.getDockerRepository());
Expand All @@ -73,10 +73,10 @@ void testNoCache() throws Exception {

assertEquals(0, webServer.getRequestCount());

remoteDefinitionsProvider.getRemoteDefinitionCatalog();
remoteDefinitionsProvider.getRemoteConnectorRegistry();
assertEquals(1, webServer.getRequestCount());

remoteDefinitionsProvider.getRemoteDefinitionCatalog();
remoteDefinitionsProvider.getRemoteConnectorRegistry();
assertEquals(2, webServer.getRequestCount());
}

Expand All @@ -87,10 +87,10 @@ void testHandlesNotAvailable() throws Exception {
webServer.enqueue(invalidCatalogResponse);

final AirbyteRemoteOssCatalog remoteDefinitionsProvider = new AirbyteRemoteOssCatalog(catalogUrl);
final List<StandardDestinationDefinition> definitions = remoteDefinitionsProvider.getDestinationDefinitions();
final List<ConnectorRegistryDestinationDefinition> definitions = remoteDefinitionsProvider.getDestinationDefinitions();
assertEquals(0, definitions.size());

final List<StandardSourceDefinition> sources = remoteDefinitionsProvider.getSourceDefinitions();
final List<ConnectorRegistrySourceDefinition> sources = remoteDefinitionsProvider.getSourceDefinitions();
assertEquals(0, sources.size());

}
Expand Down
Loading

0 comments on commit 55cf1b5

Please sign in to comment.