diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfig.java b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfig.java index de07a554ae5a7..8136819c3d498 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfig.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfig.java @@ -48,4 +48,6 @@ public interface AirbyteConfig { */ File getConfigSchemaFile(); + boolean isSecret(); + } diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java b/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java index 2e485df97967e..5a0e0f57e5af1 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java @@ -35,44 +35,42 @@ public enum ConfigSchema implements AirbyteConfig { STANDARD_WORKSPACE("StandardWorkspace.yaml", StandardWorkspace.class, standardWorkspace -> standardWorkspace.getWorkspaceId().toString(), - "workspaceId"), + "workspaceId", false), // source STANDARD_SOURCE_DEFINITION("StandardSourceDefinition.yaml", StandardSourceDefinition.class, standardSourceDefinition -> standardSourceDefinition.getSourceDefinitionId().toString(), - "sourceDefinitionId"), + "sourceDefinitionId", false), SOURCE_CONNECTION("SourceConnection.yaml", SourceConnection.class, sourceConnection -> sourceConnection.getSourceId().toString(), - "sourceId"), + "sourceId", true), // destination STANDARD_DESTINATION_DEFINITION("StandardDestinationDefinition.yaml", StandardDestinationDefinition.class, standardDestinationDefinition -> standardDestinationDefinition.getDestinationDefinitionId().toString(), - "destinationDefinitionId"), + "destinationDefinitionId", false), DESTINATION_CONNECTION("DestinationConnection.yaml", DestinationConnection.class, destinationConnection -> destinationConnection.getDestinationId().toString(), - "destinationId"), + "destinationId", true), // sync STANDARD_SYNC("StandardSync.yaml", StandardSync.class, standardSync -> standardSync.getConnectionId().toString(), - "connectionId"), + "connectionId", false), STANDARD_SYNC_OPERATION("StandardSyncOperation.yaml", StandardSyncOperation.class, - standardSyncOperation -> standardSyncOperation.getOperationId().toString(), - "operationId"), - + standardSyncOperation -> standardSyncOperation.getOperationId().toString(), "operationId", false), SOURCE_OAUTH_PARAM("SourceOAuthParameter.yaml", SourceOAuthParameter.class, sourceOAuthParameter -> sourceOAuthParameter.getOauthParameterId().toString(), - "oauthParameterId"), + "oauthParameterId", false), DESTINATION_OAUTH_PARAM("DestinationOAuthParameter.yaml", DestinationOAuthParameter.class, destinationOAuthParameter -> destinationOAuthParameter.getOauthParameterId().toString(), - "oauthParameterId"), + "oauthParameterId", false), STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class), @@ -90,21 +88,25 @@ public enum ConfigSchema implements AirbyteConfig { private final Class className; private final Function extractId; private final String idFieldName; + private final boolean isSecret; ConfigSchema(final String schemaFilename, Class className, Function extractId, - String idFieldName) { + String idFieldName, + boolean isSecret) { this.schemaFilename = schemaFilename; this.className = className; this.extractId = extractId; this.idFieldName = idFieldName; + this.isSecret = isSecret; } ConfigSchema(final String schemaFilename, Class className) { this.schemaFilename = schemaFilename; this.className = className; + this.isSecret = false; this.extractId = object -> { throw new RuntimeException(className.getSimpleName() + " doesn't have an id"); }; @@ -120,6 +122,11 @@ public Class getClassName() { return (Class) className; } + @Override + public boolean isSecret() { + return isSecret; + } + @Override public String getId(T object) { if (getClassName().isInstance(object)) { diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java index ce611221ee518..073bc15b49a7d 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Stream; public interface ConfigPersistence { @@ -46,6 +47,10 @@ public interface ConfigPersistence { Map> dumpConfigs() throws IOException; - void loadData(ConfigPersistence seedPersistence) throws IOException; + void loadData(ConfigPersistence seedPersistence, Set connectorRepositoriesInUse) throws IOException; + + Set getRepositoriesFromDefinitionIds(Set usedConnectorDefinitionIds) throws IOException; + + Set listDefinitionIdsInUseByConnectors() throws IOException; } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index c29b84667b563..3e9cf084567a9 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -42,19 +42,31 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.stream.Stream; public class ConfigRepository { private final ConfigPersistence persistence; + private final ConfigPersistence secretsPersistence; + public ConfigRepository(final ConfigPersistence persistence, final ConfigPersistence secretsPersistence) { + this.persistence = persistence; + this.secretsPersistence = secretsPersistence; + } + + /** + * This should only be used for backward compatibility where a secret store isn't yet in use. + */ public ConfigRepository(final ConfigPersistence persistence) { this.persistence = persistence; + this.secretsPersistence = persistence; } public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final boolean includeTombstone) @@ -168,7 +180,7 @@ public void writeStandardDestinationDefinition(final StandardDestinationDefiniti } public SourceConnection getSourceConnection(final UUID sourceId) throws JsonValidationException, IOException, ConfigNotFoundException { - return persistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString(), SourceConnection.class); + return secretsPersistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString(), SourceConnection.class); } public void writeSourceConnection(final SourceConnection source, final ConnectorSpecification connectorSpecification) @@ -177,16 +189,16 @@ public void writeSourceConnection(final SourceConnection source, final Connector final JsonSchemaValidator validator = new JsonSchemaValidator(); validator.ensure(connectorSpecification.getConnectionSpecification(), source.getConfiguration()); - persistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source); + secretsPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source); } public List listSourceConnection() throws JsonValidationException, IOException { - return persistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); + return secretsPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); } public DestinationConnection getDestinationConnection(final UUID destinationId) throws JsonValidationException, IOException, ConfigNotFoundException { - return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class); + return secretsPersistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class); } public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification) @@ -195,11 +207,11 @@ public void writeDestinationConnection(final DestinationConnection destinationCo final JsonSchemaValidator validator = new JsonSchemaValidator(); validator.ensure(connectorSpecification.getConnectionSpecification(), destinationConnection.getConfiguration()); - persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection); + secretsPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection); } public List listDestinationConnection() throws JsonValidationException, IOException { - return persistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); + return secretsPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); } public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException { @@ -277,15 +289,58 @@ public List listDestinationOAuthParam() throws JsonVa } public void replaceAllConfigs(final Map> configs, final boolean dryRun) throws IOException { - persistence.replaceAllConfigs(configs, dryRun); + // if we're using a single persistence layer, this is the easy route. + if (persistence == secretsPersistence) { + persistence.replaceAllConfigs(configs, dryRun); + return; + } + + // Or if we're using secrets storage, split into sets by where they should be stored. + Map> secretConfigs = new LinkedHashMap>(); + Map> nonsecretConfigs = new LinkedHashMap>(); + for (AirbyteConfig configType : configs.keySet()) { + if (configType.isSecret()) { + secretConfigs.put(configType, configs.get(configType)); + } else { + nonsecretConfigs.put(configType, configs.get(configType)); + } + } + // And store each type in its own persistence store. + persistence.replaceAllConfigs(nonsecretConfigs, dryRun); + secretsPersistence.replaceAllConfigs(secretConfigs, dryRun); } public Map> dumpConfigs() throws IOException { - return persistence.dumpConfigs(); + Map> bothConfigTypes = persistence.dumpConfigs(); + // Secrets and nonsecrets might be stored separately, and if they are, go looking for the rest. + // No need to do that if they're in the same single store, because we don't need duplicates or + // double work. + if (secretsPersistence != persistence) { + bothConfigTypes.putAll(secretsPersistence.dumpConfigs()); + } + return bothConfigTypes; + } + + /** + * During migration, this runs through the list of connectors, determining their docker image and + * version, for purposes of skipping upgrading in-use definitions. + * + * @return + */ + public Set listDefinitionsInUseByConnectors() throws IOException { + // Once we have secrets coordinates as references instead of storing the whole config, this lookup + // will be simpler again. + Set definitionIds; + if (secretsPersistence != persistence) { + definitionIds = secretsPersistence.listDefinitionIdsInUseByConnectors(); + } else { + definitionIds = persistence.listDefinitionIdsInUseByConnectors(); + } + return persistence.getRepositoriesFromDefinitionIds(definitionIds); } - public void loadData(ConfigPersistence seedPersistence) throws IOException { - persistence.loadData(seedPersistence); + public void loadData(ConfigPersistence seedPersistence, Set connectorRepositoriesInUse) throws IOException { + persistence.loadData(seedPersistence, connectorRepositoriesInUse); } } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index 37e500eb797a9..261264bd35a51 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -100,13 +100,24 @@ public void migrateFileConfigs(Configs serverConfigs) throws IOException { } @Override - public void loadData(ConfigPersistence seedConfigPersistence) throws IOException { + public void loadData(ConfigPersistence seedConfigPersistence, Set connectorRepositoriesInUse) throws IOException { database.transaction(ctx -> { - updateConfigsFromSeed(ctx, seedConfigPersistence); + final boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS); + if (isInitialized) { + updateConfigsFromSeed(ctx, seedConfigPersistence, connectorRepositoriesInUse); + } else { + copyConfigsFromSeed(ctx, seedConfigPersistence); + } return null; }); } + /* + * public void loadData(ConfigPersistence seedConfigPersistence) throws IOException { + * database.transaction(ctx -> { updateConfigsFromSeed(ctx, seedConfigPersistence); return null; }); + * } + */ + public ValidatingConfigPersistence withValidation() { return new ValidatingConfigPersistence(this); } @@ -312,11 +323,10 @@ private ConnectorCounter(int newCount, int updateCount) { } @VisibleForTesting - void updateConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException { - LOGGER.info("Updating connector definitions from the seed if necessary..."); + void updateConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence, Set connectorRepositoriesInUse) throws SQLException { + LOGGER.info("Config database has been initialized; updating connector definitions from the seed if necessary..."); try { - Set connectorRepositoriesInUse = getConnectorRepositoriesInUse(ctx); LOGGER.info("Connectors in use: {}", connectorRepositoriesInUse); Map connectorRepositoryToInfoMap = getConnectorRepositoryToInfoMap(ctx); @@ -474,31 +484,50 @@ Map getConnectorRepositoryToInfoMap(DSLContext ctx) { * connectors can be added manually by users, and their config ids are not always the same * as those in the seed. */ - private Set getConnectorRepositoriesInUse(DSLContext ctx) { + public Set listDefinitionIdsInUseByConnectors() throws IOException { Set usedConnectorDefinitionIds = new HashSet<>(); - // query for used source definitions - usedConnectorDefinitionIds.addAll(ctx - .select(field("config_blob ->> 'sourceDefinitionId'", SQLDataType.VARCHAR)) - .from(AIRBYTE_CONFIGS) - .where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(ConfigSchema.SOURCE_CONNECTION.name())) - .fetch().stream() - .flatMap(row -> Stream.of(row.value1())) - .collect(Collectors.toSet())); - // query for used destination definitions - usedConnectorDefinitionIds.addAll(ctx - .select(field("config_blob ->> 'destinationDefinitionId'", SQLDataType.VARCHAR)) - .from(AIRBYTE_CONFIGS) - .where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(ConfigSchema.DESTINATION_CONNECTION.name())) - .fetch().stream() - .flatMap(row -> Stream.of(row.value1())) - .collect(Collectors.toSet())); + database.transaction(ctx -> { + // query for used source definitions + usedConnectorDefinitionIds.addAll(ctx + .select(field("config_blob ->> 'sourceDefinitionId'", SQLDataType.VARCHAR)) + .from(AIRBYTE_CONFIGS) + .where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(ConfigSchema.SOURCE_CONNECTION.name())) + .fetch().stream() + .flatMap(row -> Stream.of(row.value1())) + .collect(Collectors.toSet())); + // query for used destination definitions + usedConnectorDefinitionIds.addAll(ctx + .select(field("config_blob ->> 'destinationDefinitionId'", SQLDataType.VARCHAR)) + .from(AIRBYTE_CONFIGS) + .where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(ConfigSchema.DESTINATION_CONNECTION.name())) + .fetch().stream() + .flatMap(row -> Stream.of(row.value1())) + .collect(Collectors.toSet())); + return null; + }); + return usedConnectorDefinitionIds; + } + + public Set getRepositoriesFromDefinitionIds(Set usedConnectorDefinitionIds) throws IOException { + final Set reps = new HashSet(); + // reps.addAll(ctx.select(field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR)) + database.transaction(ctx -> { + System.out.println(ctx.select(field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR)) + .from(AIRBYTE_CONFIGS).fetch()); + return null; + }); + + database.transaction(ctx -> { + reps.addAll(ctx.select(field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR)) + .from(AIRBYTE_CONFIGS) + .where(AIRBYTE_CONFIGS.CONFIG_ID.in(usedConnectorDefinitionIds)) + .fetch().stream() + .map(Record1::value1) + .collect(Collectors.toSet())); + return null; + }); + return reps; - return ctx.select(field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR)) - .from(AIRBYTE_CONFIGS) - .where(AIRBYTE_CONFIGS.CONFIG_ID.in(usedConnectorDefinitionIds)) - .fetch().stream() - .map(Record1::value1) - .collect(Collectors.toSet()); } } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java index 9c95b4f8da204..11fb8651d1b7b 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java @@ -29,6 +29,9 @@ import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; import io.airbyte.config.AirbyteConfig; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.SourceConnection; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -36,8 +39,10 @@ import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -91,6 +96,33 @@ public T getConfig(final AirbyteConfig configType, final String configId, fi } } + @Override + public Set getRepositoriesFromDefinitionIds(Set usedConnectorDefinitionIds) throws IOException { + throw new UnsupportedOperationException("Migrate to a database persistence before trying to use the external secrets store."); + } + + @Override + public Set listDefinitionIdsInUseByConnectors() throws IOException { + Set definitionIds = new HashSet(); + try { + { + List sources = listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); + for (SourceConnection source : sources) { + definitionIds.add(source.getSourceDefinitionId().toString()); + } + } + { + List destinations = listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); + for (DestinationConnection dest : destinations) { + definitionIds.add(dest.getDestinationDefinitionId().toString()); + } + } + return definitionIds; + } catch (JsonValidationException e) { + throw new RuntimeException(e); + } + } + @Override public List listConfigs(AirbyteConfig configType, Class clazz) throws JsonValidationException, IOException { synchronized (lock) { @@ -200,7 +232,7 @@ public void replaceAllConfigs(Map> configs, boolean dry } @Override - public void loadData(ConfigPersistence seedPersistence) throws IOException { + public void loadData(ConfigPersistence seedPersistence, Set connectorRepositoriesInUse) throws IOException { throw new UnsupportedEncodingException("This method is not supported in this implementation"); } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/GoogleSecretsManager.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/GoogleSecretsManager.java index 3ec017db15a0a..d10e009c49ec2 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/GoogleSecretsManager.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/GoogleSecretsManager.java @@ -101,10 +101,7 @@ public static boolean existsSecret(String secretId) throws IOException { EnvConfigs envConfig = new EnvConfigs(); String projectId = envConfig.getSecretStoreGcpProjectId(); try (SecretManagerServiceClient client = getSecretManagerServiceClient()) { - System.out.println("Project ID: " + projectId); - System.out.println("Secret ID: " + secretId); SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest"); - System.out.println(secretVersionName); AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); return true; } catch (com.google.api.gax.rpc.NotFoundException e) { diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/GoogleSecretsManagerConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/GoogleSecretsManagerConfigPersistence.java index 389babafce29d..f0bf09e2c66d5 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/GoogleSecretsManagerConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/GoogleSecretsManagerConfigPersistence.java @@ -48,11 +48,7 @@ public String getVersion() { return "secrets-v1"; } - // @Override - public void loadData(ConfigPersistence seedPersistence) throws IOException { - loadData(seedPersistence, new HashSet()); - } - + @Override public void loadData(ConfigPersistence seedPersistence, Set configsInUse) throws IOException { // Don't need to do anything because the seed persistence only contains // non-secret configs, which we don't load into the secrets store. @@ -80,7 +76,7 @@ public Set listDefinitionIdsInUseByConnectors() { } } - // @Override + @Override public Set getRepositoriesFromDefinitionIds(Set usedConnectorDefinitionIds) throws IOException { throw new UnsupportedOperationException( "Secrets Manager does not store the list of definitions and thus cannot be used to look up docker repositories."); @@ -116,7 +112,6 @@ public List listConfigs(AirbyteConfig configType, Class clazz) throws @Override public void writeConfig(AirbyteConfig configType, String configId, T config) throws JsonValidationException, IOException { String keyName = generateKeyNameFromType(configType, configId); - System.out.println("keyname " + keyName); GoogleSecretsManager.saveSecret(keyName, Jsons.serialize(config)); } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java index 145e417626a04..0d38c3cde4812 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java @@ -27,11 +27,16 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.config.AirbyteConfig; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.SourceConnection; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Stream; // we force all interaction with disk storage to be effectively single threaded. @@ -66,6 +71,28 @@ public List listConfigs(AirbyteConfig configType, Class clazz) throws return configs; } + @Override + public Set listDefinitionIdsInUseByConnectors() throws IOException { + Set definitionIds = new HashSet(); + try { + { + List sources = listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); + for (SourceConnection source : sources) { + definitionIds.add(source.getSourceDefinitionId().toString()); + } + } + { + List destinations = listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); + for (DestinationConnection dest : destinations) { + definitionIds.add(dest.getDestinationDefinitionId().toString()); + } + } + return definitionIds; + } catch (JsonValidationException e) { + throw new RuntimeException(e); + } + } + @Override public void writeConfig(AirbyteConfig configType, String configId, T config) throws JsonValidationException, IOException { validateJson(Jsons.jsonNode(config), configType); @@ -89,8 +116,13 @@ public Map> dumpConfigs() throws IOException { } @Override - public void loadData(ConfigPersistence seedPersistence) throws IOException { - decoratedPersistence.loadData(seedPersistence); + public void loadData(ConfigPersistence seedPersistence, Set connectorRepositoriesInUse) throws IOException { + decoratedPersistence.loadData(seedPersistence, connectorRepositoriesInUse); + } + + @Override + public Set getRepositoriesFromDefinitionIds(Set usedConnectorDefinitionIds) throws IOException { + return decoratedPersistence.getRepositoriesFromDefinitionIds(usedConnectorDefinitionIds); } private void validateJson(T config, AirbyteConfig configType) throws JsonValidationException { diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java index 8bbff9c06215e..8c623d5c51ed9 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java @@ -32,13 +32,17 @@ import io.airbyte.commons.yaml.Yamls; import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.SourceConnection; import io.airbyte.config.init.SeedType; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -53,6 +57,7 @@ public class YamlSeedConfigPersistence implements ConfigPersistence { ConfigSchema.STANDARD_DESTINATION_DEFINITION, SeedType.STANDARD_DESTINATION_DEFINITION); private static final YamlSeedConfigPersistence INSTANCE; + static { try { INSTANCE = new YamlSeedConfigPersistence(); @@ -85,6 +90,12 @@ private static Map getConfigs(SeedType seedType) throws IOExce json -> json)); } + @Override + public Set getRepositoriesFromDefinitionIds(Set usedConnectorDefinitionIds) throws IOException { + throw new UnsupportedOperationException( + "This should never be needed as seed persistence isn't the source of truth for definitions in use by connectors."); + } + @Override public T getConfig(AirbyteConfig configType, String configId, Class clazz) throws ConfigNotFoundException, JsonValidationException, IOException { @@ -99,6 +110,24 @@ public T getConfig(AirbyteConfig configType, String configId, Class clazz return Jsons.object(config, clazz); } + @Override + public Set listDefinitionIdsInUseByConnectors() throws IOException { + Set definitionIds = new HashSet(); + { + List sources = listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); + for (SourceConnection source : sources) { + definitionIds.add(source.getSourceDefinitionId().toString()); + } + } + { + List destinations = listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); + for (DestinationConnection dest : destinations) { + definitionIds.add(dest.getDestinationDefinitionId().toString()); + } + } + return definitionIds; + } + @Override public List listConfigs(AirbyteConfig configType, Class clazz) { final Map configs = allSeedConfigs.get(CONFIG_SCHEMA_MAP.get(configType)); @@ -131,7 +160,7 @@ public Map> dumpConfigs() { } @Override - public void loadData(ConfigPersistence seedPersistence) throws IOException { + public void loadData(ConfigPersistence seedPersistence, Set connectorRepositoriesInUse) throws IOException { throw new UnsupportedOperationException("The seed config persistence is read only."); } diff --git a/airbyte-config/persistence/src/test-integration/java/io/airbyte/config/persistence/ConfigRepositoryIntegrationTest.java b/airbyte-config/persistence/src/test-integration/java/io/airbyte/config/persistence/ConfigRepositoryIntegrationTest.java new file mode 100644 index 0000000000000..f168bc955ed4f --- /dev/null +++ b/airbyte-config/persistence/src/test-integration/java/io/airbyte/config/persistence/ConfigRepositoryIntegrationTest.java @@ -0,0 +1,213 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.config.persistence; + +import static org.junit.jupiter.api.Assertions.*; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.AirbyteConfig; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSyncOperation; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.validation.json.JsonValidationException; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ConfigRepositoryIntegrationTest { + + private static final UUID WORKSPACE_ID = UUID.randomUUID(); + private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID(); + private static final UUID SOURCE_DEFINITION_ID2 = UUID.randomUUID(); + private static final UUID SOURCE_CONNECTION_ID = UUID.randomUUID(); + private static final UUID SOURCE_CONNECTION_ID2 = UUID.randomUUID(); + private static final UUID DEST_DEFINITION_ID = UUID.randomUUID(); + private static final UUID DESTINATION_CONNECTION_ID = UUID.randomUUID(); + + private ConfigPersistence configPersistence; + private ConfigPersistence secretsConfigPersistence; + private ConfigRepository configRepository; + + public static final UUID UUID_1 = new UUID(0, 1); + public static final StandardSourceDefinition SOURCE_DEFINITION_1 = new StandardSourceDefinition(); + + static { + SOURCE_DEFINITION_1.withSourceDefinitionId(UUID_1).withName("mysql"); + } + private static final SourceConnection SOURCE_CONNECTION = new SourceConnection() + .withSourceId(SOURCE_CONNECTION_ID) + .withSourceDefinitionId(SOURCE_DEFINITION_ID) + .withWorkspaceId(WORKSPACE_ID) + .withConfiguration(Jsons.deserialize("{\"somefield\":\"secretvalue\"}")) + .withName("source") + .withTombstone(false); + + public static final UUID UUID_2 = new UUID(0, 2); + public static final StandardSourceDefinition SOURCE_DEFINITION_2 = new StandardSourceDefinition(); + private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); + + static { + SOURCE_DEFINITION_2.withSourceDefinitionId(UUID_2).withName("mssql"); + } + private static final SourceConnection SOURCE_CONNECTION_2 = new SourceConnection() + .withSourceId(SOURCE_CONNECTION_ID2) + .withSourceDefinitionId(SOURCE_DEFINITION_ID2) + .withWorkspaceId(WORKSPACE_ID) + .withConfiguration(Jsons.deserialize("{\"somefield\":\"secretvalue\"}")) + .withName("source") + .withTombstone(false); + + public static final UUID UUID_3 = new UUID(0, 3); + public static final StandardDestinationDefinition DEST_3 = new StandardDestinationDefinition(); + static { + DEST_3.withDestinationDefinitionId(UUID_3).withName("postgresql"); + } + private static final DestinationConnection DESTINATION_CONNECTION = new DestinationConnection() + .withDestinationId(DESTINATION_CONNECTION_ID) + .withDestinationDefinitionId(DEST_DEFINITION_ID) + .withWorkspaceId(WORKSPACE_ID) + .withConfiguration(Jsons.deserialize("{\"somefield\":\"secretvalue\"}")) + .withName("dest") + .withTombstone(false); + + @BeforeEach + void setup() throws IOException { + Path p = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), this.getClass().getName()); + Files.createDirectories(new File(p.toAbsolutePath() + "/config").toPath()); + + configPersistence = new FileSystemConfigPersistence(p); + secretsConfigPersistence = new GoogleSecretsManagerConfigPersistence(); + configRepository = new ConfigRepository(configPersistence, secretsConfigPersistence); + } + + @AfterEach + void tearDown() throws IOException { + // Delete all the secrets we stored for testing in this temporary workspace. + String prefix = ((GoogleSecretsManagerConfigPersistence) secretsConfigPersistence).getVersion() + "-"; + List names = GoogleSecretsManager.listSecretsMatching(prefix); + for (String name : names) { + GoogleSecretsManager.deleteSecret(name); + } + } + + // Make sure we can add configs and then replace them with updated ones, and the updates take. + @Test + void replaceAllConfigsSeeUpdates() throws IOException, JsonValidationException, ConfigNotFoundException { + Map> map = new LinkedHashMap<>(); + map.put(ConfigSchema.SOURCE_CONNECTION, Stream.of(SOURCE_CONNECTION, SOURCE_CONNECTION_2)); + map.put(ConfigSchema.DESTINATION_CONNECTION, Stream.of(DESTINATION_CONNECTION)); + configRepository.replaceAllConfigs(map, true); + + map.put(ConfigSchema.SOURCE_CONNECTION, Stream.of(SOURCE_CONNECTION, SOURCE_CONNECTION_2)); + map.put(ConfigSchema.DESTINATION_CONNECTION, Stream.of(DESTINATION_CONNECTION)); + configRepository.replaceAllConfigs(map, false); + + assertNotNull(configRepository.getSourceConnection(SOURCE_CONNECTION_ID)); + assertNotNull(configRepository.getSourceConnection(SOURCE_CONNECTION_ID2)); + assertNotNull(configRepository.getDestinationConnection(DESTINATION_CONNECTION_ID)); + } + + @Test + void replaceAllConfigsEmptySet() throws IOException { + // Make sure we can call replaceAll on an empty set and it doesn't crash. + Map> map = new LinkedHashMap<>(); + configRepository.replaceAllConfigs(map, true); + configRepository.replaceAllConfigs(map, false); + } + + @Test + void replaceAllConfigsSecretsLocation() throws IOException { + // Make sure that secrets are written out to the correct secret store and don't show up in the wrong + // one. + + Map> map = new LinkedHashMap<>(); + map.put(ConfigSchema.SOURCE_CONNECTION, Stream.of(SOURCE_CONNECTION, SOURCE_CONNECTION_2)); + map.put(ConfigSchema.DESTINATION_CONNECTION, Stream.of(DESTINATION_CONNECTION)); + configRepository.replaceAllConfigs(map, false); + + Map> nonsecret = configPersistence.dumpConfigs(); + assert (nonsecret.size() == 0); + + Map> secret = secretsConfigPersistence.dumpConfigs(); + assert (secret.size() == 2); + } + + @Test + void dumpConfigs() throws IOException { + // 1. Make sure we can store some configs and then do a dump and they're all included. + // 2. Make sure we can see both of configs from a non-secret store and configs from a secret store. + Map> map = new LinkedHashMap<>(); + + // These should land in the secrets store, so they're the core of testing. + map.put(ConfigSchema.SOURCE_CONNECTION, Stream.of(SOURCE_CONNECTION, SOURCE_CONNECTION_2)); + map.put(ConfigSchema.DESTINATION_CONNECTION, Stream.of(DESTINATION_CONNECTION)); + + // We're checking serialization for all the rest of these, to make sure nothing explodes during + // storage and they are retained. + map.put(ConfigSchema.STANDARD_WORKSPACE, Stream.of(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withCustomerId(UUID.randomUUID()))); + map.put(ConfigSchema.STANDARD_SOURCE_DEFINITION, Stream.of(new StandardSourceDefinition().withSourceDefinitionId(UUID.randomUUID()))); + map.put(ConfigSchema.STANDARD_DESTINATION_DEFINITION, + Stream.of(new StandardDestinationDefinition().withDestinationDefinitionId(UUID.randomUUID()))); + map.put(ConfigSchema.STANDARD_SYNC, Stream.of(new StandardSync() + .withName("sync") + .withNamespaceDefinition(NamespaceDefinitionType.SOURCE) + .withNamespaceFormat(null) + .withPrefix("sync") + .withConnectionId(SOURCE_CONNECTION_ID) + .withSourceId(UUID.randomUUID()) + .withDestinationId(UUID.randomUUID()) + .withOperationIds(List.of(UUID.randomUUID())))); + map.put(ConfigSchema.STANDARD_SYNC_OPERATION, Stream.of(new StandardSyncOperation().withOperationId(UUID.randomUUID()))); + + // Now retrieve what we stored and make sure we got everything. + configRepository.replaceAllConfigs(map, false); + + Map> configs = configRepository.dumpConfigs(); + assertNotNull(configs.get(ConfigSchema.SOURCE_CONNECTION.name())); + assertNotNull(configs.get(ConfigSchema.DESTINATION_CONNECTION.name())); + assertNotNull(configs.get(ConfigSchema.STANDARD_WORKSPACE.name())); + assertNotNull(configs.get(ConfigSchema.STANDARD_SOURCE_DEFINITION.name())); + assertNotNull(configs.get(ConfigSchema.STANDARD_DESTINATION_DEFINITION.name())); + assertNotNull(configs.get(ConfigSchema.STANDARD_SYNC.name())); + assertNotNull(configs.get(ConfigSchema.STANDARD_SYNC_OPERATION.name())); + } + +} diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index f0136ffacf19e..4328feb55022d 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -46,7 +46,7 @@ class ConfigRepositoryTest { @BeforeEach void setup() { configPersistence = mock(ConfigPersistence.class); - configRepository = new ConfigRepository(configPersistence); + configRepository = new ConfigRepository(configPersistence, configPersistence); } @Test diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java index acbd68d04cb22..057b74793e8a1 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java @@ -26,13 +26,16 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; @@ -40,7 +43,11 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.stream.Stream; import org.jooq.DSLContext; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -52,7 +59,7 @@ import org.junit.jupiter.api.TestMethodOrder; /** - * Unit test for the {@link DatabaseConfigPersistence#loadData} method. + * Unit test for the {@link ConfigPersistence#loadData} method. */ @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class DatabaseConfigPersistenceLoadDataTest extends BaseDatabaseConfigPersistenceTest { @@ -79,20 +86,40 @@ public void resetPersistence() { @Test @Order(1) - @DisplayName("When database is empty, configs should be inserted") + @DisplayName("When database is empty, seed should be copied to the database") + public void testCopyConfigsToEmptyDatabase() throws Exception { + Map> initialSeeds = Map.of( + ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(), Stream.of(Jsons.jsonNode(DESTINATION_SNOWFLAKE)), + ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), Stream.of(Jsons.jsonNode(SOURCE_GITHUB))); + when(seedPersistence.dumpConfigs()).thenReturn(initialSeeds); + + Set dockerRepoIds = configPersistence.getRepositoriesFromDefinitionIds(configPersistence.listDefinitionIdsInUseByConnectors()); + configPersistence.loadData(seedPersistence, dockerRepoIds); + assertRecordCount(2); + assertHasSource(SOURCE_GITHUB); + assertHasDestination(DESTINATION_SNOWFLAKE); + verify(configPersistence, times(1)).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + verify(configPersistence, never()).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class), any(HashSet.class)); + } + + @Test + @Order(2) + @DisplayName("When database is not empty, configs should be updated") public void testUpdateConfigsInNonEmptyDatabase() throws Exception { when(seedPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) .thenReturn(Lists.newArrayList(SOURCE_GITHUB)); when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) .thenReturn(Lists.newArrayList(DESTINATION_S3, DESTINATION_SNOWFLAKE)); - configPersistence.loadData(seedPersistence); + Set dockerRepoIds = configPersistence.getRepositoriesFromDefinitionIds(configPersistence.listDefinitionIdsInUseByConnectors()); + configPersistence.loadData(seedPersistence, dockerRepoIds); // the new destination is added assertRecordCount(3); assertHasDestination(DESTINATION_SNOWFLAKE); - verify(configPersistence, times(1)).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + verify(configPersistence, never()).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + verify(configPersistence, times(1)).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class), any(HashSet.class)); } @Test @@ -121,7 +148,9 @@ public void testNoUpdateForUsedConnector() throws Exception { .withSourceDefinitionId(sourceGithubV2.getSourceDefinitionId()); configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, githubConnection.getSourceId().toString(), githubConnection); - configPersistence.loadData(seedPersistence); + Set dockerRepoIds = configPersistence.getRepositoriesFromDefinitionIds(configPersistence.listDefinitionIdsInUseByConnectors()); + configPersistence.loadData(seedPersistence, dockerRepoIds); + // s3 destination is not updated assertHasDestination(DESTINATION_S3); assertHasSource(SOURCE_GITHUB); @@ -138,7 +167,8 @@ public void testUpdateForUnusedConnector() throws Exception { when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) .thenReturn(Collections.singletonList(snowflakeV2)); - configPersistence.loadData(seedPersistence); + Set dockerRepoIds = configPersistence.getRepositoriesFromDefinitionIds(configPersistence.listDefinitionIdsInUseByConnectors()); + configPersistence.loadData(seedPersistence, dockerRepoIds); assertHasDestination(snowflakeV2); } diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceMigrateFileConfigsTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceMigrateFileConfigsTest.java index 9e4faebb8cbd6..5b934c085e4da 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceMigrateFileConfigsTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceMigrateFileConfigsTest.java @@ -37,6 +37,7 @@ import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; import java.util.UUID; import org.jooq.DSLContext; import org.junit.jupiter.api.AfterAll; @@ -86,7 +87,7 @@ public void testNewDeployment() throws Exception { assertRecordCount(0); verify(configPersistence, never()).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); - verify(configPersistence, never()).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + verify(configPersistence, never()).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class), any(HashSet.class)); } @Test @@ -115,7 +116,7 @@ public void testUpdateDeployment() throws Exception { assertHasSource(SOURCE_GITHUB); verify(configPersistence, never()).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); - verify(configPersistence, never()).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + verify(configPersistence, never()).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class), any(HashSet.class)); } private void prepareLocalFilePersistence() throws Exception { diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java index 91b61ef2a5e67..00294680a154d 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java @@ -50,9 +50,9 @@ import org.junit.jupiter.api.Test; /** - * See {@link DatabaseConfigPersistenceLoadDataTest}, - * {@link DatabaseConfigPersistenceMigrateFileConfigsTest}, and - * {@link DatabaseConfigPersistenceUpdateConnectorDefinitionsTest} for testing of specific methods. + * The {@link ConfigPersistence#loadData} method is tested in + * {@link DatabaseConfigPersistenceLoadDataTest}. See {@link DatabaseConfigPersistenceLoadDataTest} + * and {@link DatabaseConfigPersistenceMigrateFileConfigsTest} for testing of specific methods. */ public class DatabaseConfigPersistenceTest extends BaseDatabaseConfigPersistenceTest { diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 1aa0ece764752..7baf308b8cec2 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -36,9 +36,9 @@ import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; import io.airbyte.config.helpers.LogClientSingleton; -import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.DatabaseConfigPersistence; +import io.airbyte.config.persistence.GoogleSecretsManagerConfigPersistence; import io.airbyte.db.Database; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.db.instance.jobs.JobsDatabaseInstance; @@ -203,8 +203,9 @@ public static void main(String[] args) throws IOException, InterruptedException configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) .getInitialized(); - final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).withValidation(); - final ConfigRepository configRepository = new ConfigRepository(configPersistence); + final ConfigRepository configRepository = configs.getSecretStoreForConfigs() != null && configs.getSecretStoreForConfigs().equalsIgnoreCase("gcp") + ? new ConfigRepository(new DatabaseConfigPersistence(configDatabase).withValidation(), new GoogleSecretsManagerConfigPersistence()) + : new ConfigRepository(new DatabaseConfigPersistence(configDatabase).withValidation()); final JobCleaner jobCleaner = new JobCleaner( configs.getWorkspaceRetentionConfig(), workspaceRoot, diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java index 67bdbcc438779..bb44922a81469 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java @@ -40,6 +40,7 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.FileSystemConfigPersistence; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -103,7 +104,8 @@ class WorkspaceHelperTest { public void setup() throws IOException { tmpDir = Files.createTempDirectory("workspace_helper_test_" + RandomStringUtils.randomAlphabetic(5)); - configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir)); + ConfigPersistence filePersistence = new FileSystemConfigPersistence(tmpDir); + configRepository = new ConfigRepository(filePersistence, filePersistence); jobPersistence = mock(JobPersistence.class); workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java index 3039b399d55e8..ed32906966326 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java @@ -148,7 +148,8 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist // 4. Import Configs and update connector definitions importConfigsFromArchive(sourceRoot, false); - configRepository.loadData(seedPersistence); + + configRepository.loadData(seedPersistence, configRepository.listDefinitionsInUseByConnectors()); // 5. Set DB version LOGGER.info("Setting the DB Airbyte version to : " + targetVersion); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 516b6bbcf7571..bfc044b8c367a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -35,6 +35,7 @@ import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.DatabaseConfigPersistence; +import io.airbyte.config.persistence.GoogleSecretsManagerConfigPersistence; import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.db.Database; import io.airbyte.db.instance.DatabaseMigrator; @@ -181,8 +182,12 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex configs.getConfigDatabaseUrl()) .getAndInitialize(); final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase); + final ConfigRepository configRepository = configs.getSecretStoreForConfigs() != null && configs.getSecretStoreForConfigs().equalsIgnoreCase("gcp") + ? new ConfigRepository(new DatabaseConfigPersistence(configDatabase).withValidation(), + new GoogleSecretsManagerConfigPersistence()) + : new ConfigRepository(new DatabaseConfigPersistence(configDatabase).withValidation()); + configPersistence.migrateFileConfigs(configs); - final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation()); LOGGER.info("Creating Scheduler persistence..."); final Database jobDatabase = new JobsDatabaseInstance( @@ -242,7 +247,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex LOGGER.info("Starting server..."); runFlywayMigration(configs, configDatabase, jobDatabase); - configPersistence.loadData(YamlSeedConfigPersistence.get()); + configPersistence.loadData(YamlSeedConfigPersistence.get(), configRepository.listDefinitionsInUseByConnectors()); return apiFactory.create( schedulerJobClient, diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java index 500c369dbe784..d83da8eac8d97 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java @@ -42,12 +42,14 @@ import io.airbyte.commons.string.Strings; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.DatabaseConfigPersistence; +import io.airbyte.config.persistence.FileSystemConfigPersistence; import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.db.Database; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; @@ -63,6 +65,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -81,6 +84,7 @@ public class ArchiveHandlerTest { + private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); private static final Logger LOGGER = LoggerFactory.getLogger(ArchiveHandlerTest.class); private static final String VERSION = "0.6.8"; @@ -89,10 +93,13 @@ public class ArchiveHandlerTest { private Database database; private JobPersistence jobPersistence; private DatabaseConfigPersistence configPersistence; + private FileSystemConfigPersistence secretsPersistence; private ConfigPersistence seedPersistence; private ConfigRepository configRepository; private ArchiveHandler archiveHandler; + private EnvConfigs configs; + private ConnectorSpecification emptyConnectorSpec; private static class NoOpFileTtlManager extends FileTtlManager { @@ -120,19 +127,24 @@ public static void dbDown() { @BeforeEach public void setup() throws Exception { + Path p = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), this.getClass().getName()); + Files.createDirectories(new File(p.toAbsolutePath() + "/config").toPath()); + database = new JobsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); jobPersistence = new DefaultJobPersistence(database); database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); seedPersistence = YamlSeedConfigPersistence.get(); configPersistence = new DatabaseConfigPersistence(database); configPersistence.replaceAllConfigs(Collections.emptyMap(), false); - configPersistence.loadData(seedPersistence); - configRepository = new ConfigRepository(configPersistence); + configPersistence.loadData(seedPersistence, new HashSet()); + secretsPersistence = new FileSystemConfigPersistence(p); + configRepository = new ConfigRepository(configPersistence, secretsPersistence); + // configRepository = new ConfigRepository(configPersistence, configPersistence); jobPersistence.setVersion(VERSION); final SpecFetcher specFetcher = mock(SpecFetcher.class); - final ConnectorSpecification emptyConnectorSpec = mock(ConnectorSpecification.class); + emptyConnectorSpec = mock(ConnectorSpecification.class); when(emptyConnectorSpec.getConnectionSpecification()).thenReturn(Jsons.emptyObject()); when(specFetcher.execute(any())).thenReturn(emptyConnectorSpec); @@ -161,7 +173,7 @@ void testFullExportImportRoundTrip() throws Exception { File archive = archiveHandler.exportData(); // After deleting the configs, the dump becomes empty. - configPersistence.replaceAllConfigs(Collections.emptyMap(), false); + configRepository.replaceAllConfigs(Collections.emptyMap(), false); assertSameConfigDump(Collections.emptyMap(), configRepository.dumpConfigs()); // After importing the configs, the dump is restored. @@ -189,12 +201,12 @@ void testFullExportImportRoundTrip() throws Exception { .withTombstone(false); // Write source connection and an old source definition. - configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceConnection.getSourceId().toString(), sourceConnection); + configRepository.writeSourceConnection(sourceConnection, emptyConnectorSpec); configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceS3DefinitionId.toString(), sourceS3Definition); // Export, wipe, and import the configs. archive = archiveHandler.exportData(); - configPersistence.replaceAllConfigs(Collections.emptyMap(), false); + configRepository.replaceAllConfigs(Collections.emptyMap(), false); archiveHandler.importData(archive); // The version has not changed. @@ -223,11 +235,12 @@ void testLightWeightExportImportRoundTrip() throws Exception { FileUtils.copyFile(archive, secondArchive); // After deleting all the configs, the dump becomes empty. - configPersistence.replaceAllConfigs(Collections.emptyMap(), false); + configRepository.replaceAllConfigs(Collections.emptyMap(), false); + // configPersistence.replaceAllConfigs(Collections.emptyMap(), false); // JENNY assertSameConfigDump(Collections.emptyMap(), configRepository.dumpConfigs()); // Restore default seed data - configPersistence.loadData(seedPersistence); + configPersistence.loadData(seedPersistence, new HashSet()); assertSameConfigDump(seedPersistence.dumpConfigs(), configRepository.dumpConfigs()); setupWorkspaceData(workspaceId); @@ -280,8 +293,8 @@ void testLightWeightExportImportRoundTrip() throws Exception { // check that first workspace is unchanged even though modifications were made to second workspace // (that contains similar connections from importing the same archive) archive = archiveHandler.exportWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId)); - configPersistence.replaceAllConfigs(Collections.emptyMap(), false); - configPersistence.loadData(seedPersistence); + configRepository.replaceAllConfigs(Collections.emptyMap(), false); + configPersistence.loadData(seedPersistence, new HashSet()); setupWorkspaceData(workspaceId); uploadRead = archiveHandler.uploadArchiveResource(archive); assertEquals(UploadRead.StatusEnum.SUCCEEDED, uploadRead.getStatus()); @@ -303,21 +316,21 @@ private void setupTestData(UUID workspaceId) throws JsonValidationException, IOE // Fill up with some configurations setupWorkspaceData(workspaceId); final UUID sourceid = UUID.randomUUID(); - configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceid.toString(), new SourceConnection() + configRepository.writeSourceConnection(new SourceConnection() .withSourceId(sourceid) .withWorkspaceId(workspaceId) .withSourceDefinitionId(configRepository.listStandardSources().get(0).getSourceDefinitionId()) .withName("test-source") .withConfiguration(Jsons.emptyObject()) - .withTombstone(false)); + .withTombstone(false), emptyConnectorSpec); final UUID destinationId = UUID.randomUUID(); - configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), new DestinationConnection() + configRepository.writeDestinationConnection(new DestinationConnection() .withDestinationId(destinationId) .withWorkspaceId(workspaceId) .withDestinationDefinitionId(configRepository.listStandardDestinationDefinitions().get(0).getDestinationDefinitionId()) .withName("test-destination") .withConfiguration(Jsons.emptyObject()) - .withTombstone(false)); + .withTombstone(false), emptyConnectorSpec); } private void assertSameConfigDump(Map> expected, Map> actual) { diff --git a/tools/bin/acceptance_test_kube.sh b/tools/bin/acceptance_test_kube.sh index 9532af93616bc..1c8590058036d 100755 --- a/tools/bin/acceptance_test_kube.sh +++ b/tools/bin/acceptance_test_kube.sh @@ -17,10 +17,19 @@ kind load docker-image airbyte/db:dev --name chart-testing echo "Starting app..." +if [ -n "$CI" ]; then + # write out environment variables to the .env file in kube/overlays/dev-integration-test/.env + echo "SECRET_STORE_GCP_PROJECT_ID=${SECRET_STORE_GCP_PROJECT_ID}" >> kube/overlays/dev-integration-test/.env + echo "SECRET_STORE_FOR_CONFIGS=${SECRET_STORE_FOR_CONFIGS}" >> kube/overlays/dev-integration-test/.env + echo "SECRET_STORE_GCP_CREDENTIALS=${SECRET_STORE_GCP_CREDENTIALS}" >> kube/overlays/dev-integration-test/.env +fi + echo "Applying dev-integration-test manifests to kubernetes..." kubectl apply -k kube/overlays/dev-integration-test echo "Waiting for server and scheduler to be ready..." +# kubectl describe-pod and tail logs at some regular cadence +# test bash behavior of set -e with || and make sure we can see the describe pods output on failures. kubectl wait --for=condition=Available deployment/airbyte-server --timeout=300s || (kubectl describe pods && exit 1) kubectl wait --for=condition=Available deployment/airbyte-scheduler --timeout=300s || (kubectl describe pods && exit 1)