diff --git a/.env b/.env
index 080af529b14e7..db1a7075928e4 100644
--- a/.env
+++ b/.env
@@ -94,7 +94,6 @@ WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=
 
 ### FEATURE FLAGS ###
 AUTO_DISABLE_FAILING_CONNECTIONS=false
-EXPOSE_SECRETS_IN_EXPORT=false
 FORCE_MIGRATE_SECRET_STORE=false
 
 ### MONITORING FLAGS ###
diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java
index 63f60dae025d6..5f71b0f725771 100644
--- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java
+++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java
@@ -191,7 +191,6 @@ private static Database getConfigDatabase(final DSLContext dslContext) throws IO
 
   private static ConfigPersistence getConfigPersistence(final Database configDatabase) throws IOException {
     final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
-        .maskSecrets(true)
         .copySecrets(true)
         .build();
 
diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java
index e9e122a6c1327..c5f89eef85505 100644
--- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java
+++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java
@@ -170,7 +170,6 @@ void testBootloaderAppRunSecretMigration() throws Exception {
 
     final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
         .copySecrets(true)
-        .maskSecrets(true)
         .build();
 
     try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES);
diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java
index f97598dbacb7f..81eb74b4d6426 100644
--- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java
+++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java
@@ -21,11 +21,6 @@ public boolean autoDisablesFailingConnections() {
     return Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"));
   }
 
-  @Override
-  public boolean exposeSecretsInExport() {
-    return Boolean.parseBoolean(System.getenv("EXPOSE_SECRETS_IN_EXPORT"));
-  }
-
   @Override
   public boolean forceSecretMigration() {
     return Boolean.parseBoolean(System.getenv("FORCE_MIGRATE_SECRET_STORE"));
diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java
index 0757f025f9369..eb3b765ceef80 100644
--- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java
+++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java
@@ -12,8 +12,6 @@ public interface FeatureFlags {
 
   boolean autoDisablesFailingConnections();
 
-  boolean exposeSecretsInExport();
-
   boolean forceSecretMigration();
 
   boolean useStreamCapableState();
diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessor.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessor.java
index f97ce914bb1a4..f89cc8966df35 100644
--- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessor.java
+++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessor.java
@@ -27,10 +27,7 @@ public class JsonSecretsProcessor {
 
   @Builder.Default
-  final private boolean maskSecrets = true;
-
-  @Builder.Default
-  final private boolean copySecrets = true;
+  final private Boolean copySecrets = false;
 
   protected static final JsonSchemaValidator VALIDATOR = new JsonSchemaValidator();
 
@@ -54,18 +51,14 @@ public class JsonSecretsProcessor {
    * @param obj Object containing potentially secret fields
    */
   public JsonNode prepareSecretsForOutput(final JsonNode obj, final JsonNode schema) {
-    if (maskSecrets) {
-      // todo (cgardens) this is not safe. should throw.
-      // if schema is an object and has a properties field
-      if (!isValidJsonSchema(schema)) {
-        log.error("The schema is not valid, the secret can't be hidden");
-        return obj;
-      }
-
-      return maskAllSecrets(obj, schema);
+    // todo (cgardens) this is not safe. should throw.
+    // if schema is an object and has a properties field
+    if (!isValidJsonSchema(schema)) {
+      log.error("The schema is not valid, the secret can't be hidden");
+      return obj;
     }
 
-    return obj;
+    return maskAllSecrets(obj, schema);
   }
 
   /**
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessorTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessorTest.java
index 8302226800a1c..a522e4db4b554 100644
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessorTest.java
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessorTest.java
@@ -198,7 +198,6 @@ class JsonSecretsProcessorTest {
   public void setup() {
     processor = JsonSecretsProcessor.builder()
         .copySecrets(true)
-        .maskSecrets(true)
         .build();
   }
 
@@ -502,7 +501,6 @@ class NoOpTest {
   public void setup() {
     processor = JsonSecretsProcessor.builder()
        .copySecrets(false)
-        .maskSecrets(false)
        .build();
   }
 
@@ -568,7 +566,7 @@ void testSecretScenario(final String folder, final boolean partial) throws IOExc
     final InputStream inputIs = getClass().getClassLoader().getResourceAsStream(inputFilePath);
     final JsonNode input = objectMapper.readTree(inputIs);
 
-    final String expectedFilePath = folder + (partial ? "/partial_config.json" : "/full_config.json");
+    final String expectedFilePath = folder + "/expected.json";
     final InputStream expectedIs = getClass().getClassLoader().getResourceAsStream(expectedFilePath);
     final JsonNode expected = objectMapper.readTree(expectedIs);
 
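Note: with the maskSecrets flag removed, prepareSecretsForOutput masks unconditionally, and copySecrets is the only remaining builder option (now defaulting to false). A minimal sketch of the resulting usage, assuming the Jsons helper from airbyte-commons; the schema and config literals are invented for illustration:

    // Masking is no longer opt-in; only secret *copying* is configurable.
    final JsonSecretsProcessor processor = JsonSecretsProcessor.builder()
        .copySecrets(true)
        .build();

    // Any property tagged "airbyte_secret": true in the connector spec
    // comes back obfuscated from prepareSecretsForOutput.
    final JsonNode schema = Jsons.deserialize(
        "{\"type\": \"object\", \"properties\": {\"api_key\": {\"type\": \"string\", \"airbyte_secret\": true}}}");
    final JsonNode config = Jsons.deserialize("{\"api_key\": \"super-secret\"}");
    final JsonNode masked = processor.prepareSecretsForOutput(config, schema);
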
diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java
deleted file mode 100644
index b9e6a0ab09161..0000000000000
--- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.server;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import io.airbyte.commons.io.Archives;
-import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.yaml.Yamls;
-import io.airbyte.config.ConfigSchema;
-import io.airbyte.config.DestinationConnection;
-import io.airbyte.config.SourceConnection;
-import io.airbyte.config.StandardDestinationDefinition;
-import io.airbyte.config.StandardSourceDefinition;
-import io.airbyte.config.StandardSync;
-import io.airbyte.config.persistence.ConfigNotFoundException;
-import io.airbyte.config.persistence.ConfigRepository;
-import io.airbyte.config.persistence.SecretsRepositoryReader;
-import io.airbyte.persistence.job.JobPersistence;
-import io.airbyte.persistence.job.WorkspaceHelper;
-import io.airbyte.validation.json.JsonValidationException;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.commons.io.FileUtils;
-
-// TODO: Write a test case which compares the output dump with the output of ArchiveHandler export
-// for the same data
-
-/**
- * This class acts like export method of ArchiveHandler but the difference is 1. It takes a full
- * dump of whatever is available in the config directory without any schema validation. We dont want
- * schema validation because in case of automatic migration, the code that is going to do the schema
- * validation is from new version but the data in the config files is old. Thus schema validation
- * would fail. 2. Unlike ArchiveHandler, this doesn't take the dump of specific files but looks at
- * the config directory and takes the full dump of whatever is available
- */
-public class ConfigDumpExporter {
-
-  private static final String ARCHIVE_FILE_NAME = "airbyte_config_dump";
-  private static final String CONFIG_FOLDER_NAME = "airbyte_config";
-  private static final String VERSION_FILE_NAME = "VERSION";
-  private final ConfigRepository configRepository;
-  private final SecretsRepositoryReader secretsRepositoryReader;
-  private final JobPersistence jobPersistence;
-  private final WorkspaceHelper workspaceHelper;
-
-  public ConfigDumpExporter(final ConfigRepository configRepository,
-                            final SecretsRepositoryReader secretsRepositoryReader,
-                            final JobPersistence jobPersistence,
-                            final WorkspaceHelper workspaceHelper) {
-    this.configRepository = configRepository;
-    this.secretsRepositoryReader = secretsRepositoryReader;
-    this.jobPersistence = jobPersistence;
-    this.workspaceHelper = workspaceHelper;
-  }
-
-  public File dump() {
-    try {
-      final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), ARCHIVE_FILE_NAME);
-      final File dump = Files.createTempFile(ARCHIVE_FILE_NAME, ".tar.gz").toFile();
-      exportVersionFile(tempFolder);
-      dumpConfigsDatabase(tempFolder);
-
-      Archives.createArchive(tempFolder, dump.toPath());
-      return dump;
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void exportVersionFile(final Path tempFolder) throws IOException {
-    final String version = jobPersistence.getVersion().orElseThrow();
-    final File versionFile = Files.createFile(tempFolder.resolve(VERSION_FILE_NAME)).toFile();
-    FileUtils.writeStringToFile(versionFile, version, Charset.defaultCharset());
-  }
-
-  private void dumpConfigsDatabase(final Path parentFolder) throws IOException {
-    for (final Map.Entry<String, Stream<JsonNode>> configEntry : secretsRepositoryReader.dumpConfigsWithSecrets().entrySet()) {
-      writeConfigsToArchive(parentFolder, configEntry.getKey(), configEntry.getValue());
-    }
-  }
-
-  private static void writeConfigsToArchive(final Path storageRoot,
-                                            final String schemaType,
-                                            final Stream<JsonNode> configs)
-      throws IOException {
-    writeConfigsToArchive(storageRoot, schemaType, configs.collect(Collectors.toList()));
-  }
-
-  private static void writeConfigsToArchive(final Path storageRoot,
-                                            final String schemaType,
-                                            final List<JsonNode> configList)
-      throws IOException {
-    final Path configPath = buildConfigPath(storageRoot, schemaType);
-    Files.createDirectories(configPath.getParent());
-    if (!configList.isEmpty()) {
-      final List<JsonNode> sortedConfigs = configList.stream()
-          .sorted(Comparator.comparing(JsonNode::toString)).collect(
-              Collectors.toList());
-      Files.writeString(configPath, Yamls.serialize(sortedConfigs));
-    } else {
-      // Create empty file
-      Files.createFile(configPath);
-    }
-  }
-
-  private static Path buildConfigPath(final Path storageRoot, final String schemaType) {
-    return storageRoot.resolve(CONFIG_FOLDER_NAME)
-        .resolve(String.format("%s.yaml", schemaType));
-  }
-
-  public File exportWorkspace(final UUID workspaceId) throws JsonValidationException, IOException, ConfigNotFoundException {
-    final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), ARCHIVE_FILE_NAME);
-    final File dump = Files.createTempFile(ARCHIVE_FILE_NAME, ".tar.gz").toFile();
-    exportVersionFile(tempFolder);
-    exportConfigsDatabase(tempFolder, workspaceId);
-
-    Archives.createArchive(tempFolder, dump.toPath());
-    return dump;
-  }
-
-  private void exportConfigsDatabase(final Path parentFolder, final UUID workspaceId)
-      throws IOException, JsonValidationException, ConfigNotFoundException {
-    final Collection<SourceConnection> sourceConnections = writeConfigsToArchive(
-        parentFolder,
-        ConfigSchema.SOURCE_CONNECTION.name(),
-        secretsRepositoryReader::listSourceConnectionWithSecrets,
-        (sourceConnection) -> workspaceId.equals(sourceConnection.getWorkspaceId()));
-    writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
-        () -> listSourceDefinition(sourceConnections),
-        (config) -> true);
-
-    final Collection<DestinationConnection> destinationConnections = writeConfigsToArchive(
-        parentFolder,
-        ConfigSchema.DESTINATION_CONNECTION.name(),
-        secretsRepositoryReader::listDestinationConnectionWithSecrets,
-        (destinationConnection) -> workspaceId.equals(destinationConnection.getWorkspaceId()));
-    writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(),
-        () -> listDestinationDefinition(destinationConnections),
-        (config) -> true);
-
-    writeConfigsToArchive(
-        parentFolder,
-        ConfigSchema.STANDARD_SYNC_OPERATION.name(),
-        configRepository::listStandardSyncOperations,
-        (operation) -> workspaceId.equals(operation.getWorkspaceId()));
-
-    final List<StandardSync> standardSyncs = new ArrayList<>();
-    for (final StandardSync standardSync : configRepository.listStandardSyncs()) {
-      if (workspaceHelper != null &&
-          workspaceId.equals(workspaceHelper.getWorkspaceForConnection(standardSync.getSourceId(), standardSync.getDestinationId()))) {
-        standardSyncs.add(standardSync);
-      }
-    }
-    writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_SYNC.name(), standardSyncs.stream().map(Jsons::jsonNode));
-  }
-
-  private <T> Collection<T> writeConfigsToArchive(final Path parentFolder,
-                                                  final String configSchemaName,
-                                                  final ListConfigCall<T> listConfigCall,
-                                                  final Function<T, Boolean> filterConfigCall)
-      throws JsonValidationException, ConfigNotFoundException, IOException {
-    final Collection<T> configs = listConfigCall.apply().stream().filter(filterConfigCall::apply).collect(Collectors.toList());
-    writeConfigsToArchive(parentFolder, configSchemaName, configs.stream().map(Jsons::jsonNode));
-    return configs;
-  }
-
-  private Collection<StandardSourceDefinition> listSourceDefinition(final Collection<SourceConnection> sourceConnections)
-      throws JsonValidationException, ConfigNotFoundException, IOException {
-    final Map<UUID, StandardSourceDefinition> sourceDefinitionMap = new HashMap<>();
-    for (final SourceConnection sourceConnection : sourceConnections) {
-      if (!sourceDefinitionMap.containsKey(sourceConnection.getSourceDefinitionId())) {
-        sourceDefinitionMap
-            .put(sourceConnection.getSourceDefinitionId(),
-                configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId()));
-      }
-    }
-    return sourceDefinitionMap.values();
-  }
-
-  private Collection<StandardDestinationDefinition> listDestinationDefinition(final Collection<DestinationConnection> destinationConnections)
-      throws JsonValidationException, ConfigNotFoundException, IOException {
-    final Map<UUID, StandardDestinationDefinition> destinationDefinitionMap = new HashMap<>();
-    for (final DestinationConnection destinationConnection : destinationConnections) {
-      if (!destinationDefinitionMap.containsKey(destinationConnection.getDestinationDefinitionId())) {
-        destinationDefinitionMap
-            .put(destinationConnection.getDestinationDefinitionId(),
-                configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()));
-      }
-    }
-    return destinationDefinitionMap.values();
-  }
-
-  /**
-   * List all configurations of type @param <T> that already exists
-   */
-  public interface ListConfigCall<T> {
-
-    Collection<T> apply() throws IOException, JsonValidationException, ConfigNotFoundException;
-
-  }
-
-}
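For reference, the archive this exporter produced was a gzipped tarball containing a VERSION file and one YAML file per config type. A sketch of the layout implied by exportVersionFile and buildConfigPath above (the concrete file names follow ConfigSchema constants and are shown illustratively):

    airbyte_config_dump-*.tar.gz
    |-- VERSION                       (Airbyte version string)
    `-- airbyte_config/
        |-- SOURCE_CONNECTION.yaml
        |-- DESTINATION_CONNECTION.yaml
        |-- STANDARD_SYNC.yaml
        `-- ...                       (one <ConfigSchema>.yaml per dumped type)
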
diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java
deleted file mode 100644
index a3c6dbcfa1a89..0000000000000
--- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.server;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Streams;
-import io.airbyte.analytics.TrackingClientSingleton;
-import io.airbyte.api.model.generated.UploadRead;
-import io.airbyte.commons.enums.Enums;
-import io.airbyte.commons.io.Archives;
-import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.version.AirbyteVersion;
-import io.airbyte.commons.yaml.Yamls;
-import io.airbyte.config.AirbyteConfig;
-import io.airbyte.config.ConfigSchema;
-import io.airbyte.config.DestinationConnection;
-import io.airbyte.config.SourceConnection;
-import io.airbyte.config.StandardDestinationDefinition;
-import io.airbyte.config.StandardSourceDefinition;
-import io.airbyte.config.StandardSync;
-import io.airbyte.config.StandardSyncOperation;
-import io.airbyte.config.persistence.ConfigNotFoundException;
-import io.airbyte.config.persistence.ConfigPersistence;
-import io.airbyte.config.persistence.ConfigRepository;
-import io.airbyte.config.persistence.SecretsRepositoryWriter;
-import io.airbyte.persistence.job.DefaultJobPersistence;
-import io.airbyte.persistence.job.JobPersistence;
-import io.airbyte.persistence.job.WorkspaceHelper;
-import io.airbyte.server.errors.IdNotFoundKnownException;
-import io.airbyte.validation.json.JsonSchemaValidator;
-import io.airbyte.validation.json.JsonValidationException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.Spliterator;
-import java.util.Spliterators;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("PMD.AvoidReassigningLoopVariables")
-public class ConfigDumpImporter {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDumpImporter.class);
-  private static final String CONFIG_FOLDER_NAME = "airbyte_config";
-  private static final String VERSION_FILE_NAME = "VERSION";
-  private static final Path TMP_AIRBYTE_STAGED_RESOURCES = Path.of("/tmp/airbyte_staged_resources");
-
-  private final ConfigRepository configRepository;
-  private final SecretsRepositoryWriter secretsRepositoryWriter;
-  private final WorkspaceHelper workspaceHelper;
-  private final JsonSchemaValidator jsonSchemaValidator;
-  private final JobPersistence jobPersistence;
-  private final boolean importDefinitions;
-
-  public ConfigDumpImporter(final ConfigRepository configRepository,
-                            final SecretsRepositoryWriter secretsRepositoryWriter,
-                            final JobPersistence jobPersistence,
-                            final WorkspaceHelper workspaceHelper,
-                            final boolean importDefinitions) {
-    this(configRepository, secretsRepositoryWriter, jobPersistence, workspaceHelper, new JsonSchemaValidator(), importDefinitions);
-  }
-
-  @VisibleForTesting
-  public ConfigDumpImporter(final ConfigRepository configRepository,
-                            final SecretsRepositoryWriter secretsRepositoryWriter,
-                            final JobPersistence jobPersistence,
-                            final WorkspaceHelper workspaceHelper,
-                            final JsonSchemaValidator jsonSchemaValidator,
-                            final boolean importDefinitions) {
-    this.jsonSchemaValidator = jsonSchemaValidator;
-    this.jobPersistence = jobPersistence;
-    this.configRepository = configRepository;
-    this.secretsRepositoryWriter = secretsRepositoryWriter;
-    this.workspaceHelper = workspaceHelper;
-    this.importDefinitions = importDefinitions;
-  }
-
-  /**
-   * Re-initialize the staged resource folder that contains uploaded artifacts when importing
-   * workspaces. This is because they need to be done in two steps (two API endpoints), upload
-   * resource first then import. When server starts, we flush the content of this folder, deleting
-   * previously staged resources that were not imported yet.
-   */
-  public static void initStagedResourceFolder() {
-    try {
-      final File stagedResourceRoot = TMP_AIRBYTE_STAGED_RESOURCES.toFile();
-      if (stagedResourceRoot.exists()) {
-        FileUtils.forceDelete(stagedResourceRoot);
-      }
-      FileUtils.forceMkdir(stagedResourceRoot);
-      FileUtils.forceDeleteOnExit(stagedResourceRoot);
-    } catch (final IOException e) {
-      throw new RuntimeException("Failed to create staging resource folder", e);
-    }
-  }
-
-  public void importDataWithSeed(final AirbyteVersion targetVersion, final File archive, final ConfigPersistence seedPersistence)
-      throws IOException, JsonValidationException {
-    final Path sourceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive");
-    try {
-      // 1. Unzip source
-      Archives.extractArchive(archive.toPath(), sourceRoot);
-
-      // 2. dry run
-      try {
-        checkImport(targetVersion, sourceRoot);
-        importConfigsFromArchive(sourceRoot, true);
-      } catch (final Exception e) {
-        LOGGER.error("Dry run failed.", e);
-        throw e;
-      }
-
-      // 4. Import Configs and update connector definitions
-      importConfigsFromArchive(sourceRoot, false);
-      configRepository.loadDataNoSecrets(seedPersistence);
-
-      // 5. Set DB version
-      LOGGER.info("Setting the DB Airbyte version to : " + targetVersion);
-      jobPersistence.setVersion(targetVersion.serialize());
-
-      // 6. check db version
-      checkDBVersion(targetVersion);
-    } finally {
-      FileUtils.deleteDirectory(sourceRoot.toFile());
-      FileUtils.deleteQuietly(archive);
-    }
-
-    // identify this instance as the new customer id.
-    configRepository.listStandardWorkspaces(true).forEach(workspace -> TrackingClientSingleton.get().identify(workspace.getWorkspaceId()));
-  }
-
-  private void checkImport(final AirbyteVersion targetVersion, final Path tempFolder) throws IOException {
-    final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME);
-    final AirbyteVersion importVersion = new AirbyteVersion(Files
-        .readString(versionFile, Charset.defaultCharset())
-        .replace("\n", "")
-        .strip());
-    LOGGER.info(String.format("Checking Airbyte Version to import %s", importVersion));
-    if (!AirbyteVersion.isCompatible(targetVersion, importVersion)) {
-      throw new IOException(String
-          .format("Imported VERSION (%s) is incompatible with current Airbyte version (%s).\n" +
-              "Please upgrade your Airbyte Archive, see more at https://docs.airbyte.com/operator-guides/upgrading-airbyte\n",
-              importVersion, targetVersion));
-    }
-  }
-
-  // Config
-  private List<String> listDirectories(final Path sourceRoot) throws IOException {
-    try (final Stream<Path> files = Files.list(sourceRoot.resolve(CONFIG_FOLDER_NAME))) {
-      return files.map(c -> c.getFileName().toString())
-          .collect(Collectors.toList());
-    }
-  }
-
-  private void importConfigsFromArchive(final Path sourceRoot, final boolean dryRun) throws IOException {
-    final List<String> directories = listDirectories(sourceRoot);
-    final Map<AirbyteConfig, Stream<?>> data = new LinkedHashMap<>();
-
-    for (final String directory : directories) {
-      final Optional<ConfigSchema> configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class);
-
-      if (configSchemaOptional.isEmpty()) {
-        continue;
-      }
-
-      final ConfigSchema configSchema = configSchemaOptional.get();
-      data.put(configSchema, readConfigsFromArchive(sourceRoot, configSchema));
-    }
-    secretsRepositoryWriter.replaceAllConfigs(data, dryRun);
-  }
-
-  private <T> Stream<T> readConfigsFromArchive(final Path storageRoot, final ConfigSchema schemaType)
-      throws IOException {
-
-    final Path configPath = buildConfigPath(storageRoot, schemaType);
-    if (configPath.toFile().exists()) {
-      final String configStr = Files.readString(configPath);
-      final JsonNode node = Yamls.deserialize(configStr);
-      return StreamSupport
-          .stream(Spliterators.spliteratorUnknownSize(node.elements(), Spliterator.ORDERED), false)
-          .map(element -> {
-            final T config = Jsons.object(element, schemaType.getClassName());
-            try {
-              validateJson(config, schemaType);
-              return config;
-            } catch (final JsonValidationException e) {
-              throw new RuntimeException(e);
-            }
-          });
-
-    } else {
-      throw new FileNotFoundException(
-          String.format("Airbyte Configuration %s was not found in the archive", schemaType));
-    }
-  }
-
-  private <T> void validateJson(final T config, final ConfigSchema configType) throws JsonValidationException {
-    final JsonNode schema = JsonSchemaValidator.getSchema(configType.getConfigSchemaFile());
-    jsonSchemaValidator.ensure(schema, Jsons.jsonNode(config));
-  }
-
-  protected static Path buildConfigPath(final Path storageRoot, final ConfigSchema schemaType) {
-    return storageRoot.resolve(CONFIG_FOLDER_NAME)
-        .resolve(String.format("%s.yaml", schemaType.name()));
-  }
-
-  /**
-   * The deployment concept is specific to the environment that Airbyte is running in (not the data
-   * being imported). Thus, if there is a deployment in the imported data, we filter it out. In
-   * addition, before running the import, we look up the current deployment id, and make sure that
-   * that id is inserted when we run the import.
-   *
-   * @param postgresPersistence - database that we are importing into.
-   * @param metadataTableStream - stream of records to be imported into the metadata table.
-   * @return modified stream with old deployment id removed and correct deployment id inserted.
-   * @throws IOException - you never know when you IO.
-   */
-  static Stream<JsonNode> replaceDeploymentMetadata(final JobPersistence postgresPersistence,
-                                                    final Stream<JsonNode> metadataTableStream)
-      throws IOException {
-    // filter out the deployment record from the import data, if it exists.
-    Stream<JsonNode> stream = metadataTableStream
-        .filter(record -> !DefaultJobPersistence.DEPLOYMENT_ID_KEY.equals(record.get(DefaultJobPersistence.METADATA_KEY_COL).asText()));
-
-    // insert the current deployment id, if it exists.
-    final Optional<UUID> deploymentOptional = postgresPersistence.getDeployment();
-    if (deploymentOptional.isPresent()) {
-      final JsonNode deploymentRecord = Jsons.jsonNode(ImmutableMap.builder()
-          .put(DefaultJobPersistence.METADATA_KEY_COL, DefaultJobPersistence.DEPLOYMENT_ID_KEY)
-          .put(DefaultJobPersistence.METADATA_VAL_COL, deploymentOptional.get().toString())
-          .build());
-      stream = Streams.concat(stream, Stream.of(deploymentRecord));
-    }
-    return stream;
-  }
-
-  private void checkDBVersion(final AirbyteVersion airbyteVersion) throws IOException {
-    final Optional<AirbyteVersion> airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
-    airbyteDatabaseVersion
-        .ifPresent(dbVersion -> AirbyteVersion.assertIsCompatible(airbyteVersion, dbVersion));
-  }
-
-  public UploadRead uploadArchiveResource(final File archive) {
-    try {
-      final UUID resourceId = UUID.randomUUID();
-      FileUtils.moveFile(archive, TMP_AIRBYTE_STAGED_RESOURCES.resolve(resourceId.toString()).toFile());
-      return new UploadRead()
-          .status(UploadRead.StatusEnum.SUCCEEDED)
-          .resourceId(resourceId);
-    } catch (final IOException e) {
-      LOGGER.error("Failed to upload archive resource", e);
-      return new UploadRead().status(UploadRead.StatusEnum.FAILED);
-    }
-  }
-
-  public File getArchiveResource(final UUID resourceId) {
-    final File archive = TMP_AIRBYTE_STAGED_RESOURCES.resolve(resourceId.toString()).toFile();
-    if (!archive.exists()) {
-      throw new IdNotFoundKnownException("Archive Resource not found", resourceId.toString());
-    }
-    return archive;
-  }
-
-  public void deleteArchiveResource(final UUID resourceId) {
-    final File archive = getArchiveResource(resourceId);
-    FileUtils.deleteQuietly(archive);
-  }
-
-  public void importIntoWorkspace(final AirbyteVersion targetVersion, final UUID workspaceId, final File archive)
-      throws IOException, JsonValidationException, ConfigNotFoundException {
-    final Path sourceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive");
-    try {
-      // 1. Unzip source
-      Archives.extractArchive(archive.toPath(), sourceRoot);
-
-      // TODO: Auto-migrate archive?
-
-      // 2. dry run
-      try {
-        checkImport(targetVersion, sourceRoot);
-        importConfigsIntoWorkspace(sourceRoot, workspaceId, true);
-      } catch (final Exception e) {
-        LOGGER.error("Dry run failed.", e);
-        throw e;
-      }
-
-      // 3. import configs
-      importConfigsIntoWorkspace(sourceRoot, workspaceId, false);
-    } finally {
-      FileUtils.deleteDirectory(sourceRoot.toFile());
-    }
-  }
-
-  private void importConfigsIntoWorkspace(final Path sourceRoot, final UUID workspaceId, final boolean dryRun)
-      throws IOException, JsonValidationException, ConfigNotFoundException {
-    // Keep maps of any re-assigned ids
-    final Map<UUID, UUID> sourceIdMap = new HashMap<>();
-    final Map<UUID, UUID> destinationIdMap = new HashMap<>();
-    final Map<UUID, UUID> operationIdMap = new HashMap<>();
-
-    final List<String> directories = listDirectories(sourceRoot);
-    // We sort the directories because we want to process SOURCE_CONNECTION after
-    // STANDARD_SOURCE_DEFINITION and DESTINATION_CONNECTION after STANDARD_DESTINATION_DEFINITION
-    // so that we can identify which connectors should not be imported because the definitions are not
-    // existing
-    directories.sort(Comparator.reverseOrder());
-    Stream<?> standardSyncs = null;
-
-    for (final String directory : directories) {
-      final Optional<ConfigSchema> configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class);
-
-      if (configSchemaOptional.isEmpty()) {
-        continue;
-      }
-      final ConfigSchema configSchema = configSchemaOptional.get();
-      final Stream<?> configs = readConfigsFromArchive(sourceRoot, configSchema);
-
-      if (dryRun) {
-        continue;
-      }
-
-      switch (configSchema) {
-        case STANDARD_SOURCE_DEFINITION -> {
-          if (canImportDefinitions()) {
-            importSourceDefinitionIntoWorkspace(configs);
-          }
-        }
-        case SOURCE_CONNECTION -> sourceIdMap.putAll(importIntoWorkspace(
-            ConfigSchema.SOURCE_CONNECTION,
-            configs.map(c -> (SourceConnection) c),
-            configRepository::listSourceConnection,
-            (sourceConnection) -> !workspaceId.equals(sourceConnection.getWorkspaceId()),
-            (sourceConnection, sourceId) -> {
-              sourceConnection.setSourceId(sourceId);
-              sourceConnection.setWorkspaceId(workspaceId);
-              return sourceConnection;
-            },
-            (sourceConnection) -> {
-              // make sure connector definition exists
-              try {
-                final StandardSourceDefinition sourceDefinition =
-                    configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId());
-                if (sourceDefinition == null) {
-                  return;
-                }
-                if (sourceDefinition.getTombstone() != null && sourceDefinition.getTombstone()) {
-                  return;
-                }
-                secretsRepositoryWriter.writeSourceConnection(sourceConnection, sourceDefinition.getSpec());
-              } catch (final ConfigNotFoundException e) {
-                return;
-              }
-            }));
-        case STANDARD_DESTINATION_DEFINITION -> {
-          if (canImportDefinitions()) {
-            importDestinationDefinitionIntoWorkspace(configs);
-          }
-        }
-        case DESTINATION_CONNECTION -> destinationIdMap.putAll(importIntoWorkspace(
-            ConfigSchema.DESTINATION_CONNECTION,
-            configs.map(c -> (DestinationConnection) c),
-            configRepository::listDestinationConnection,
-            (destinationConnection) -> !workspaceId.equals(destinationConnection.getWorkspaceId()),
-            (destinationConnection, destinationId) -> {
-              destinationConnection.setDestinationId(destinationId);
-              destinationConnection.setWorkspaceId(workspaceId);
-              return destinationConnection;
-            },
-            (destinationConnection) -> {
-              // make sure connector definition exists
-              try {
-                final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(
-                    destinationConnection.getDestinationDefinitionId());
-                if (destinationDefinition == null) {
-                  return;
-                }
-                if (destinationDefinition.getTombstone() != null && destinationDefinition.getTombstone()) {
-                  return;
-                }
-                secretsRepositoryWriter.writeDestinationConnection(destinationConnection, destinationDefinition.getSpec());
-              } catch (final ConfigNotFoundException e) {
-                return;
-              }
-            }));
-        case STANDARD_SYNC -> standardSyncs = configs;
-        case STANDARD_SYNC_OPERATION -> operationIdMap.putAll(importIntoWorkspace(
-            ConfigSchema.STANDARD_SYNC_OPERATION,
-            configs.map(c -> (StandardSyncOperation) c),
-            configRepository::listStandardSyncOperations,
-            (operation) -> !workspaceId.equals(operation.getWorkspaceId()),
-            (operation, operationId) -> {
-              operation.setOperationId(operationId);
-              operation.setWorkspaceId(workspaceId);
-              return operation;
-            },
-            configRepository::writeStandardSyncOperation));
-        default -> {}
-      }
-    }
-
-    if (standardSyncs != null) {
-      // we import connections (standard sync) last to update reference to modified ids
-      importIntoWorkspace(
-          ConfigSchema.STANDARD_SYNC,
-          standardSyncs.map(c -> (StandardSync) c),
-          configRepository::listStandardSyncs,
-          (standardSync) -> {
-            try {
-              return !workspaceId.equals(workspaceHelper.getWorkspaceForConnection(standardSync.getSourceId(), standardSync.getDestinationId()));
-            } catch (final JsonValidationException | ConfigNotFoundException e) {
-              return true;
-            }
-          },
-          (standardSync, connectionId) -> {
-            standardSync.setConnectionId(connectionId);
-            standardSync.setSourceId(sourceIdMap.get(standardSync.getSourceId()));
-            standardSync.setDestinationId(destinationIdMap.get(standardSync.getDestinationId()));
-            standardSync.setOperationIds(standardSync.getOperationIds()
-                .stream()
-                .map(operationIdMap::get)
-                .collect(Collectors.toList()));
-            return standardSync;
-          },
-          (standardSync) -> {
-            // make sure connectors definition exists
-            try {
-              if (configRepository.getSourceConnection(standardSync.getSourceId()) == null ||
-                  configRepository.getDestinationConnection(standardSync.getDestinationId()) == null) {
-                return;
-              }
-              for (final UUID operationId : standardSync.getOperationIds()) {
-                if (configRepository.getStandardSyncOperation(operationId) == null) {
-                  return;
-                }
-              }
-            } catch (final ConfigNotFoundException e) {
-              return;
-            }
-            configRepository.writeStandardSync(standardSync);
-          });
-    }
-  }
-
-  /**
-   * Method that @return if this importer will import standard connector definitions or not
-   */
-  public boolean canImportDefinitions() {
-    return importDefinitions;
-  }
-
-  protected void importSourceDefinitionIntoWorkspace(final Stream<?> configs)
-      throws JsonValidationException, ConfigNotFoundException, IOException {
-    importIntoWorkspace(
-        ConfigSchema.STANDARD_SOURCE_DEFINITION,
-        configs.map(c -> (StandardSourceDefinition) c),
-        () -> configRepository.listStandardSourceDefinitions(false),
-        (config) -> true,
-        (config, id) -> {
-          if (id.equals(config.getSourceDefinitionId())) {
-            return config;
-          } else {
-            // a newId has been generated for this definition as it is in conflict with an existing one
-            // here we return null, so we don't do anything to the old definition
-            return null;
-          }
-        },
-        (config) -> {
-          if (config != null) {
-            configRepository.writeStandardSourceDefinition(config);
-          }
-        });
-  }
-
-  protected void importDestinationDefinitionIntoWorkspace(final Stream<?> configs)
-      throws JsonValidationException, ConfigNotFoundException, IOException {
-    importIntoWorkspace(
-        ConfigSchema.STANDARD_DESTINATION_DEFINITION,
-        configs.map(c -> (StandardDestinationDefinition) c),
-        () -> configRepository.listStandardDestinationDefinitions(false),
-        (config) -> true,
-        (config, id) -> {
-          if (id.equals(config.getDestinationDefinitionId())) {
-            return config;
-          } else {
-            // a newId has been generated for this definition as it is in conflict with an existing one
-            // here we return null, so we don't do anything to the old definition
-            return null;
-          }
-        },
-        (config) -> {
-          if (config != null) {
-            configRepository.writeStandardDestinationDefinition(config);
-          }
-        });
-  }
-
-  private <T> Map<UUID, UUID> importIntoWorkspace(final ConfigSchema configSchema,
-                                                  final Stream<T> configs,
-                                                  final ListConfigCall<T> listConfigCall,
-                                                  final Function<T, Boolean> filterConfigCall,
-                                                  final MutateConfigCall<T> mutateConfig,
-                                                  final PersistConfigCall<T> persistConfig)
-      throws JsonValidationException, ConfigNotFoundException, IOException {
-    final Map<UUID, UUID> idsMap = new HashMap<>();
-    // To detect conflicts, we retrieve ids already in use by others for this ConfigSchema (ids from the
-    // current workspace can be safely updated)
-    final Set<UUID> idsInUse = listConfigCall.apply()
-        .stream()
-        .filter(filterConfigCall::apply)
-        .map(configSchema::getId)
-        .map(UUID::fromString)
-        .collect(Collectors.toSet());
-    for (T config : configs.collect(Collectors.toList())) {
-      final UUID configId = UUID.fromString(configSchema.getId(config));
-      final UUID configIdToPersist = idsInUse.contains(configId) ? UUID.randomUUID() : configId;
-      config = mutateConfig.apply(config, configIdToPersist);
-      if (config != null) {
-        idsMap.put(configId, UUID.fromString(configSchema.getId(config)));
-        persistConfig.apply(config);
-      } else {
-        idsMap.put(configId, configId);
-      }
-    }
-    return idsMap;
-  }
-
-  /**
-   * List all configurations of type @param <T> that already exists (we'll be using this to know
-   * which ids are already in use)
-   */
-  public interface ListConfigCall<T> {
-
-    Collection<T> apply() throws IOException, JsonValidationException, ConfigNotFoundException;
-
-  }
-
-  /**
-   * Apply some modifications to the configuration with new ids
-   */
-  public interface MutateConfigCall<T> {
-
-    T apply(T config, UUID newId) throws IOException, JsonValidationException, ConfigNotFoundException;
-
-  }
-
-  /**
-   * Persist the configuration
-   */
-  public interface PersistConfigCall<T> {
-
-    void apply(T config) throws JsonValidationException, IOException;
-
-  }
-
-}
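The heart of the removed importer is the id-remapping rule in the generic importIntoWorkspace: ids already claimed by another workspace are re-minted, and the old-to-new mapping is replayed onto later configs (sources and destinations first, then operations, then syncs). Distilled into a short sketch, with idsInUse, idsMap, and configSchema as in the deleted code above:

    // If the incoming id collides with one owned by another workspace,
    // persist under a fresh UUID and remember the mapping.
    final UUID configId = UUID.fromString(configSchema.getId(config));
    final UUID configIdToPersist = idsInUse.contains(configId) ? UUID.randomUUID() : configId;
    idsMap.put(configId, configIdToPersist);

    // Later, StandardSync references are rewritten through the recorded maps:
    standardSync.setSourceId(sourceIdMap.get(standardSync.getSourceId()));
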
diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java
index bc2b2703ae8f5..4ed1af3866126 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java
@@ -5,11 +5,9 @@
 package io.airbyte.server;
 
 import io.airbyte.analytics.TrackingClient;
-import io.airbyte.commons.io.FileTtlManager;
 import io.airbyte.commons.version.AirbyteVersion;
 import io.airbyte.config.Configs.WorkerEnvironment;
 import io.airbyte.config.helpers.LogConfigs;
-import io.airbyte.config.persistence.ConfigPersistence;
 import io.airbyte.config.persistence.ConfigRepository;
 import io.airbyte.config.persistence.SecretsRepositoryReader;
 import io.airbyte.config.persistence.SecretsRepositoryWriter;
@@ -30,11 +28,9 @@ public class ConfigurationApiFactory implements Factory<ConfigurationApi> {
 
   private static ConfigRepository configRepository;
   private static JobPersistence jobPersistence;
-  private static ConfigPersistence seed;
   private static SecretsRepositoryReader secretsRepositoryReader;
   private static SecretsRepositoryWriter secretsRepositoryWriter;
   private static SynchronousSchedulerClient synchronousSchedulerClient;
-  private static FileTtlManager archiveTtlManager;
   private static StatePersistence statePersistence;
   private static Map<String, String> mdc;
   private static Database configsDatabase;
@@ -54,9 +50,7 @@ public static void setValues(
       final SecretsRepositoryReader secretsRepositoryReader,
       final SecretsRepositoryWriter secretsRepositoryWriter,
       final JobPersistence jobPersistence,
-      final ConfigPersistence seed,
      final SynchronousSchedulerClient synchronousSchedulerClient,
-      final FileTtlManager archiveTtlManager,
      final StatePersistence statePersistence,
      final Map<String, String> mdc,
      final Database configsDatabase,
@@ -72,11 +66,9 @@ public static void setValues(
       final Flyway jobsFlyway) {
     ConfigurationApiFactory.configRepository = configRepository;
     ConfigurationApiFactory.jobPersistence = jobPersistence;
-    ConfigurationApiFactory.seed = seed;
     ConfigurationApiFactory.secretsRepositoryReader = secretsRepositoryReader;
     ConfigurationApiFactory.secretsRepositoryWriter = secretsRepositoryWriter;
     ConfigurationApiFactory.synchronousSchedulerClient = synchronousSchedulerClient;
-    ConfigurationApiFactory.archiveTtlManager = archiveTtlManager;
     ConfigurationApiFactory.mdc = mdc;
     ConfigurationApiFactory.configsDatabase = configsDatabase;
     ConfigurationApiFactory.jobsDatabase = jobsDatabase;
@@ -99,11 +91,9 @@ public ConfigurationApi provide() {
     return new ConfigurationApi(
         ConfigurationApiFactory.configRepository,
         ConfigurationApiFactory.jobPersistence,
-        ConfigurationApiFactory.seed,
         ConfigurationApiFactory.secretsRepositoryReader,
         ConfigurationApiFactory.secretsRepositoryWriter,
         ConfigurationApiFactory.synchronousSchedulerClient,
-        ConfigurationApiFactory.archiveTtlManager,
         ConfigurationApiFactory.configsDatabase,
         ConfigurationApiFactory.jobsDatabase,
         ConfigurationApiFactory.statePersistence,
diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
index b304e2c008bbb..c50b443e203da 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
@@ -8,8 +8,6 @@
 import io.airbyte.analytics.Deployment;
 import io.airbyte.analytics.TrackingClient;
 import io.airbyte.analytics.TrackingClientSingleton;
-import io.airbyte.commons.features.EnvVariableFeatureFlags;
-import io.airbyte.commons.features.FeatureFlags;
 import io.airbyte.commons.lang.CloseableShutdownHook;
 import io.airbyte.commons.resources.MoreResources;
 import io.airbyte.commons.temporal.TemporalUtils;
@@ -17,9 +15,9 @@
 import io.airbyte.commons.version.AirbyteVersion;
 import io.airbyte.config.Configs;
 import io.airbyte.config.EnvConfigs;
+import io.airbyte.config.StandardSync;
 import io.airbyte.config.StandardSync.Status;
 import io.airbyte.config.helpers.LogClientSingleton;
-import io.airbyte.config.init.YamlSeedConfigPersistence;
 import io.airbyte.config.persistence.ConfigNotFoundException;
 import io.airbyte.config.persistence.ConfigPersistence;
 import io.airbyte.config.persistence.ConfigRepository;
@@ -167,7 +165,6 @@ private static void assertDatabasesReady(final Configs configs,
   }
 
   public static ServerRunnable getServer(final ServerFactory apiFactory,
-                                         final ConfigPersistence seed,
                                          final Configs configs,
                                          final DSLContext configsDslContext,
                                          final Flyway configsFlyway,
@@ -182,14 +179,9 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
     LOGGER.info("Checking databases..");
     assertDatabasesReady(configs, configsDslContext, configsFlyway, jobsDslContext, jobsFlyway);
 
-    LOGGER.info("Creating Staged Resource folder...");
-    ConfigDumpImporter.initStagedResourceFolder();
-
     LOGGER.info("Creating config repository...");
     final Database configsDatabase = new Database(configsDslContext);
-    final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
     final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
-        .maskSecrets(!featureFlags.exposeSecretsInExport())
         .copySecrets(false)
         .build();
     final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configsDatabase, jsonSecretsProcessor);
@@ -271,7 +263,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
         secretsRepositoryReader,
         secretsRepositoryWriter,
         jobPersistence,
-        seed,
        configsDatabase,
        jobsDatabase,
        trackingClient,
@@ -300,13 +291,14 @@ static void migrateExistingConnectionsToTemporalScheduler(final ConfigRepository
     final Set<UUID> connectionIds = configRepository.listStandardSyncs().stream()
         .filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE)
-        .map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet());
+        .map(StandardSync::getConnectionId)
+        .collect(Collectors.toSet());
     eventRunner.migrateSyncIfNeeded(connectionIds);
     jobPersistence.setSchedulerMigrationDone();
     LOGGER.info("Done migrating to the new scheduler...");
   }
 
-  public static void main(final String[] args) throws Exception {
+  public static void main(final String[] args) {
     try {
       final Configs configs = new EnvConfigs();
@@ -327,10 +319,8 @@ public static void main(final String[] args) throws Exception {
           ConfigsDatabaseMigrator.DB_IDENTIFIER, ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
       final Flyway jobsFlyway = FlywayFactory.create(jobsDataSource, DbMigrationHandler.class.getSimpleName(), JobsDatabaseMigrator.DB_IDENTIFIER,
           JobsDatabaseMigrator.MIGRATION_FILE_LOCATION);
-      final ConfigPersistence yamlSeedConfigPersistence =
-          new YamlSeedConfigPersistence(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
 
-      getServer(new ServerFactory.Api(), yamlSeedConfigPersistence, configs, configsDslContext, configsFlyway, jobsDslContext, jobsFlyway).start();
+      getServer(new ServerFactory.Api(), configs, configsDslContext, configsFlyway, jobsDslContext, jobsFlyway).start();
      }
    } catch (final Throwable e) {
      LOGGER.error("Server failed", e);
diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
index aa0af731c63ed..a86486d53961e 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
@@ -5,11 +5,9 @@
 package io.airbyte.server;
 
 import io.airbyte.analytics.TrackingClient;
-import io.airbyte.commons.io.FileTtlManager;
 import io.airbyte.commons.version.AirbyteVersion;
 import io.airbyte.config.Configs.WorkerEnvironment;
 import io.airbyte.config.helpers.LogConfigs;
-import io.airbyte.config.persistence.ConfigPersistence;
 import io.airbyte.config.persistence.ConfigRepository;
 import io.airbyte.config.persistence.SecretsRepositoryReader;
 import io.airbyte.config.persistence.SecretsRepositoryWriter;
@@ -22,7 +20,6 @@
 import java.net.http.HttpClient;
 import java.nio.file.Path;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.flywaydb.core.Flyway;
 import org.slf4j.MDC;
 
@@ -33,7 +30,6 @@
   ServerRunnable create(SynchronousSchedulerClient cachingSchedulerClient,
                         ConfigRepository configRepository,
                         SecretsRepositoryReader secretsRepositoryReader,
                        SecretsRepositoryWriter secretsRepositoryWriter,
                        JobPersistence jobPersistence,
-                        ConfigPersistence seed,
                        Database configsDatabase,
                        Database jobsDatabase,
                        TrackingClient trackingClient,
@@ -54,7 +50,6 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
                                final SecretsRepositoryReader secretsRepositoryReader,
                                final SecretsRepositoryWriter secretsRepositoryWriter,
                                final JobPersistence jobPersistence,
-                               final ConfigPersistence seed,
                                final Database configsDatabase,
                                final Database jobsDatabase,
                                final TrackingClient trackingClient,
@@ -72,9 +67,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
         secretsRepositoryReader,
         secretsRepositoryWriter,
         jobPersistence,
-        seed,
        synchronousSchedulerClient,
-        new FileTtlManager(10, TimeUnit.MINUTES, 10),
        new StatePersistence(configsDatabase),
        MDC.getCopyOfContextMap(),
        configsDatabase,
diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java
index 09b68f029a249..ba39d020aaaab 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java
@@ -99,12 +99,10 @@
 import io.airbyte.api.model.generated.WorkspaceReadList;
 import io.airbyte.api.model.generated.WorkspaceUpdate;
 import io.airbyte.api.model.generated.WorkspaceUpdateName;
-import io.airbyte.commons.io.FileTtlManager;
 import io.airbyte.commons.version.AirbyteVersion;
 import io.airbyte.config.Configs.WorkerEnvironment;
 import io.airbyte.config.helpers.LogConfigs;
 import io.airbyte.config.persistence.ConfigNotFoundException;
-import io.airbyte.config.persistence.ConfigPersistence;
 import io.airbyte.config.persistence.ConfigRepository;
 import io.airbyte.config.persistence.SecretsRepositoryReader;
 import io.airbyte.config.persistence.SecretsRepositoryWriter;
@@ -114,7 +112,6 @@
 import io.airbyte.persistence.job.WorkspaceHelper;
 import io.airbyte.server.errors.BadObjectSchemaKnownException;
 import io.airbyte.server.errors.IdNotFoundKnownException;
-import io.airbyte.server.handlers.ArchiveHandler;
 import io.airbyte.server.handlers.AttemptHandler;
 import io.airbyte.server.handlers.ConnectionsHandler;
 import io.airbyte.server.handlers.DbMigrationHandler;
@@ -160,7 +157,6 @@ public class ConfigurationApi implements io.airbyte.api.generated.V1Api {
   private final JobHistoryHandler jobHistoryHandler;
   private final WebBackendConnectionsHandler webBackendConnectionsHandler;
   private final HealthCheckHandler healthCheckHandler;
-  private final ArchiveHandler archiveHandler;
   private final LogsHandler logsHandler;
   private final OpenApiConfigHandler openApiConfigHandler;
   private final DbMigrationHandler dbMigrationHandler;
@@ -172,11 +168,9 @@ public class ConfigurationApi implements io.airbyte.api.generated.V1Api {
 
   public ConfigurationApi(final ConfigRepository configRepository,
                           final JobPersistence jobPersistence,
-                          final ConfigPersistence seed,
                           final SecretsRepositoryReader secretsRepositoryReader,
                           final SecretsRepositoryWriter secretsRepositoryWriter,
                           final SynchronousSchedulerClient synchronousSchedulerClient,
-                          final FileTtlManager archiveTtlManager,
                          final Database configsDatabase,
                          final Database jobsDatabase,
                          final StatePersistence statePersistence,
@@ -243,16 +237,6 @@ public ConfigurationApi(final ConfigRepository configRepository,
         eventRunner,
         configRepository);
     healthCheckHandler = new HealthCheckHandler(configRepository);
-    archiveHandler = new ArchiveHandler(
-        airbyteVersion,
-        configRepository,
-        secretsRepositoryReader,
-        secretsRepositoryWriter,
-        jobPersistence,
-        seed,
-        workspaceHelper,
-        archiveTtlManager,
-        true);
     logsHandler = new LogsHandler();
     openApiConfigHandler = new OpenApiConfigHandler();
     dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway);
@@ -842,10 +826,6 @@ public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRe
     return execute(() -> attemptHandler.setWorkflowInAttempt(requestBody));
   }
 
-  public boolean canImportDefinitions() {
-    return archiveHandler.canImportDefinitions();
-  }
-
   private static <T> T execute(final HandlerCall<T> call) {
     try {
       return call.call();
diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigurationUpdate.java b/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigurationUpdate.java
index 18888169e2c55..f4ac339423cd1 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigurationUpdate.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigurationUpdate.java
@@ -27,7 +27,6 @@ public class ConfigurationUpdate {
   public ConfigurationUpdate(final ConfigRepository configRepository, final SecretsRepositoryReader secretsRepositoryReader) {
     this(configRepository, secretsRepositoryReader, JsonSecretsProcessor.builder()
-        .maskSecrets(true)
         .copySecrets(true)
        .build());
   }
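One consequence worth noting: because copySecrets now defaults to false, update paths such as ConfigurationUpdate (and the handlers below) must opt in explicitly, while a bare builder yields a mask-only processor. A sketch of the two configurations, assuming the defaults shown in the JsonSecretsProcessor hunk earlier:

    // Mask-only (the new default): secrets are obfuscated on output, never copied.
    final JsonSecretsProcessor readProcessor = JsonSecretsProcessor.builder().build();

    // Update path: explicitly copy secrets from the persisted config into the
    // incoming partial update before persisting it again.
    final JsonSecretsProcessor updateProcessor = JsonSecretsProcessor.builder()
        .copySecrets(true)
        .build();
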
diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java
deleted file mode 100644
index adfb4dbfd852d..0000000000000
--- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.server.handlers;
-
-import io.airbyte.api.model.generated.ImportRead;
-import io.airbyte.api.model.generated.ImportRead.StatusEnum;
-import io.airbyte.api.model.generated.ImportRequestBody;
-import io.airbyte.api.model.generated.UploadRead;
-import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
-import io.airbyte.commons.io.FileTtlManager;
-import io.airbyte.commons.version.AirbyteVersion;
-import io.airbyte.config.persistence.ConfigNotFoundException;
-import io.airbyte.config.persistence.ConfigPersistence;
-import io.airbyte.config.persistence.ConfigRepository;
-import io.airbyte.config.persistence.SecretsRepositoryReader;
-import io.airbyte.config.persistence.SecretsRepositoryWriter;
-import io.airbyte.persistence.job.JobPersistence;
-import io.airbyte.persistence.job.WorkspaceHelper;
-import io.airbyte.server.ConfigDumpExporter;
-import io.airbyte.server.ConfigDumpImporter;
-import io.airbyte.server.errors.InternalServerKnownException;
-import io.airbyte.validation.json.JsonValidationException;
-import java.io.File;
-import java.io.IOException;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ArchiveHandler {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(ArchiveHandler.class);
-
-  private final AirbyteVersion version;
-  private final ConfigDumpExporter configDumpExporter;
-  private final ConfigDumpImporter configDumpImporter;
-  private final ConfigPersistence seed;
-  private final FileTtlManager fileTtlManager;
-
-  public ArchiveHandler(final AirbyteVersion version,
-                        final ConfigRepository configRepository,
-                        final SecretsRepositoryReader secretsRepositoryReader,
-                        final SecretsRepositoryWriter secretsRepositoryWriter,
-                        final JobPersistence jobPersistence,
-                        final ConfigPersistence seed,
-                        final WorkspaceHelper workspaceHelper,
-                        final FileTtlManager fileTtlManager,
-                        final boolean importDefinitions) {
-    this(
-        version,
-        fileTtlManager,
-        new ConfigDumpExporter(configRepository, secretsRepositoryReader, jobPersistence, workspaceHelper),
-        new ConfigDumpImporter(configRepository, secretsRepositoryWriter, jobPersistence, workspaceHelper, importDefinitions),
-        seed);
-  }
-
-  public ArchiveHandler(final AirbyteVersion version,
-                        final FileTtlManager fileTtlManager,
-                        final ConfigDumpExporter configDumpExporter,
-                        final ConfigDumpImporter configDumpImporter,
-                        final ConfigPersistence seed) {
-    this.version = version;
-    this.configDumpExporter = configDumpExporter;
-    this.configDumpImporter = configDumpImporter;
-    this.seed = seed;
-    this.fileTtlManager = fileTtlManager;
-  }
-
-  /**
-   * Creates an archive tarball file using Gzip compression of internal Airbyte Data
-   *
-   * @return that tarball File.
-   */
-  public File exportData() {
-    final File archive = configDumpExporter.dump();
-    fileTtlManager.register(archive.toPath());
-    return archive;
-  }
-
-  /**
-   * Creates an archive tarball file using Gzip compression of only configurations tied to
-   *
-   * @param workspaceIdRequestBody which is the target workspace to export
-   * @return that lightweight tarball file
-   */
-  public File exportWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
-    final File archive;
-    try {
-      archive = configDumpExporter.exportWorkspace(workspaceIdRequestBody.getWorkspaceId());
-      fileTtlManager.register(archive.toPath());
-      return archive;
-    } catch (final JsonValidationException | IOException | ConfigNotFoundException e) {
-      throw new InternalServerKnownException(String.format("Failed to export Workspace configuration due to: %s", e.getMessage()), e);
-    }
-  }
-
-  /**
-   * Extract internal Airbyte data from the @param archive tarball file (using Gzip compression) as
-   * produced by {@link #exportData()}. Note that the provided archived file will be deleted.
-   *
-   * @return a status object describing if import was successful or not.
-   */
-  public ImportRead importData(final File archive) {
-    try {
-      return importInternal(() -> configDumpImporter.importDataWithSeed(version, archive, seed));
-    } finally {
-      FileUtils.deleteQuietly(archive);
-    }
-  }
-
-  public UploadRead uploadArchiveResource(final File archive) {
-    return configDumpImporter.uploadArchiveResource(archive);
-  }
-
-  /**
-   * Extract Airbyte configuration data from the archive tarball file (using Gzip compression) as
-   * produced by {@link #exportWorkspace(WorkspaceIdRequestBody)}. The configurations from the tarball
-   * may get mutated to be safely included into the current workspace. (the exact same tarball could
-   * be imported into 2 different workspaces) Note that the provided archived file will be deleted.
-   *
-   * @return a status object describing if import was successful or not.
-   */
-  public ImportRead importIntoWorkspace(final ImportRequestBody importRequestBody) {
-    final File archive = configDumpImporter.getArchiveResource(importRequestBody.getResourceId());
-    try {
-      return importInternal(
-          () -> configDumpImporter.importIntoWorkspace(version, importRequestBody.getWorkspaceId(), archive));
-    } finally {
-      configDumpImporter.deleteArchiveResource(importRequestBody.getResourceId());
-    }
-  }
-
-  private ImportRead importInternal(final importCall importCall) {
-    ImportRead result;
-    try {
-      importCall.importData();
-      result = new ImportRead().status(StatusEnum.SUCCEEDED);
-    } catch (final Exception e) {
-      LOGGER.error("Import failed", e);
-      result = new ImportRead().status(StatusEnum.FAILED).reason(e.getMessage());
-    }
-
-    return result;
-  }
-
-  public interface importCall {
-
-    void importData() throws IOException, JsonValidationException, ConfigNotFoundException;
-
-  }
-
-  public boolean canImportDefinitions() {
-    return configDumpImporter.canImportDefinitions();
-  }
-
-}
diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java
index 1d4ecdf2c2def..27ec317b83c57 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java
@@ -58,7 +58,7 @@ public class DestinationHandler {
     this.configRepository = configRepository;
     this.secretsRepositoryReader = secretsRepositoryReader;
     this.secretsRepositoryWriter = secretsRepositoryWriter;
-    this.validator = integrationSchemaValidation;
+    validator = integrationSchemaValidation;
     this.connectionsHandler = connectionsHandler;
     this.uuidGenerator = uuidGenerator;
     this.configurationUpdate = configurationUpdate;
@@ -78,7 +78,6 @@ public DestinationHandler(final ConfigRepository configRepository,
         connectionsHandler,
         UUID::randomUUID,
         JsonSecretsProcessor.builder()
-            .maskSecrets(true)
            .copySecrets(true)
            .build(),
        new ConfigurationUpdate(configRepository, secretsRepositoryReader));
diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java
index 0990887d68eed..57c751e3e589c 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java
@@ -56,7 +56,7 @@ public class SourceHandler {
     this.configRepository = configRepository;
     this.secretsRepositoryReader = secretsRepositoryReader;
     this.secretsRepositoryWriter = secretsRepositoryWriter;
-    this.validator = integrationSchemaValidation;
+    validator = integrationSchemaValidation;
     this.connectionsHandler = connectionsHandler;
     this.uuidGenerator = uuidGenerator;
     this.configurationUpdate = configurationUpdate;
@@ -76,7 +76,6 @@ public SourceHandler(final ConfigRepository configRepository,
         connectionsHandler,
         UUID::randomUUID,
         JsonSecretsProcessor.builder()
-            .maskSecrets(true)
            .copySecrets(true)
            .build(),
        new ConfigurationUpdate(configRepository, secretsRepositoryReader));
diff --git a/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java b/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java
deleted file mode 100644
index 3f86267159ae5..0000000000000
--- a/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.server;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.AdditionalMatchers.not;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.version.AirbyteVersion;
-import io.airbyte.config.DestinationConnection;
-import io.airbyte.config.SourceConnection;
-import io.airbyte.config.StandardDestinationDefinition;
-import io.airbyte.config.StandardSourceDefinition;
-import io.airbyte.config.StandardSync;
-import io.airbyte.config.StandardSync.Status;
-import io.airbyte.config.StandardSyncOperation;
-import io.airbyte.config.StandardSyncOperation.OperatorType;
-import io.airbyte.config.persistence.ConfigNotFoundException;
-import io.airbyte.config.persistence.ConfigRepository;
-import io.airbyte.config.persistence.SecretsRepositoryReader;
-import io.airbyte.config.persistence.SecretsRepositoryWriter;
-import io.airbyte.persistence.job.DefaultJobPersistence;
-import io.airbyte.persistence.job.JobPersistence;
-import io.airbyte.persistence.job.WorkspaceHelper;
-import io.airbyte.protocol.models.ConnectorSpecification;
-import io.airbyte.validation.json.JsonSchemaValidator;
-import io.airbyte.validation.json.JsonValidationException;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-class ConfigDumpImporterTest {
-
-  static final AirbyteVersion TEST_VERSION = new AirbyteVersion("0.0.1-test-version");
-
-  private ConfigRepository configRepository;
-  private SecretsRepositoryReader secretsRepositoryReader;
-  private SecretsRepositoryWriter secretsRepositoryWriter;
-  private ConfigDumpImporter configDumpImporter;
-  private ConfigDumpExporter configDumpExporter;
-
-  private UUID workspaceId;
-  private SourceConnection sourceConnection;
-  private DestinationConnection destinationConnection;
-  private StandardSyncOperation operation;
-  private StandardSync connection;
-  private ConnectorSpecification emptyConnectorSpec;
-
-  @BeforeEach
-  void setup() throws IOException, JsonValidationException, ConfigNotFoundException {
-    configRepository = mock(ConfigRepository.class);
-    secretsRepositoryReader = mock(SecretsRepositoryReader.class);
-    secretsRepositoryWriter = mock(SecretsRepositoryWriter.class);
-    final JobPersistence jobPersistence = mock(JobPersistence.class);
-    final WorkspaceHelper workspaceHelper = mock(WorkspaceHelper.class);
-
-    emptyConnectorSpec = new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject());
-
-    configDumpImporter = new ConfigDumpImporter(
-        configRepository,
-        secretsRepositoryWriter,
-        jobPersistence,
-        workspaceHelper,
-        mock(JsonSchemaValidator.class),
-        true);
-
-    configDumpExporter = new ConfigDumpExporter(configRepository, secretsRepositoryReader, jobPersistence, workspaceHelper);
-
-    workspaceId = UUID.randomUUID();
-    when(jobPersistence.getVersion()).thenReturn(Optional.of(TEST_VERSION.serialize()));
-
-    final StandardSourceDefinition standardSourceDefinition = new StandardSourceDefinition()
-        .withSourceDefinitionId(UUID.randomUUID())
.withName("test-standard-source") - .withDockerRepository("test") - .withDocumentationUrl("http://doc") - .withIcon("hello") - .withDockerImageTag("dev") - .withSpec(emptyConnectorSpec); - sourceConnection = new SourceConnection() - .withSourceId(UUID.randomUUID()) - .withSourceDefinitionId(standardSourceDefinition.getSourceDefinitionId()) - .withConfiguration(Jsons.emptyObject()) - .withName("test-source") - .withTombstone(false) - .withWorkspaceId(workspaceId); - when(configRepository.listStandardSourceDefinitions(false)) - .thenReturn(List.of(standardSourceDefinition)); - when(configRepository.getStandardSourceDefinition(standardSourceDefinition.getSourceDefinitionId())) - .thenReturn(standardSourceDefinition); - when(configRepository.getSourceConnection(any())) - .thenReturn(sourceConnection); - - final StandardDestinationDefinition standardDestinationDefinition = new StandardDestinationDefinition() - .withDestinationDefinitionId(UUID.randomUUID()) - .withName("test-standard-destination") - .withDockerRepository("test") - .withDocumentationUrl("http://doc") - .withIcon("hello") - .withDockerImageTag("dev") - .withSpec(emptyConnectorSpec); - destinationConnection = new DestinationConnection() - .withDestinationId(UUID.randomUUID()) - .withDestinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId()) - .withConfiguration(Jsons.emptyObject()) - .withName("test-source") - .withTombstone(false) - .withWorkspaceId(workspaceId); - when(configRepository.listStandardDestinationDefinitions(false)) - .thenReturn(List.of(standardDestinationDefinition)); - when(configRepository.getStandardDestinationDefinition(standardDestinationDefinition.getDestinationDefinitionId())) - .thenReturn(standardDestinationDefinition); - when(configRepository.getDestinationConnection(any())) - .thenReturn(destinationConnection); - - operation = new StandardSyncOperation() - .withOperationId(UUID.randomUUID()) - .withName("test-operation") - .withWorkspaceId(workspaceId) - .withTombstone(false) - .withOperatorType(OperatorType.DBT); - when(configRepository.getStandardSyncOperation(any())) - .thenReturn(operation); - - connection = new StandardSync() - .withConnectionId(UUID.randomUUID()) - .withSourceId(sourceConnection.getSourceId()) - .withDestinationId(destinationConnection.getDestinationId()) - .withOperationIds(List.of(operation.getOperationId())) - .withName("test-sync") - .withStatus(Status.ACTIVE); - - when(workspaceHelper.getWorkspaceForConnection(sourceConnection.getSourceId(), destinationConnection.getDestinationId())) - .thenReturn(workspaceId); - } - - @Test - void testImportIntoWorkspaceWithConflicts() throws JsonValidationException, ConfigNotFoundException, IOException { - when(secretsRepositoryReader.listSourceConnectionWithSecrets()) - .thenReturn(List.of(sourceConnection, - new SourceConnection() - .withSourceId(UUID.randomUUID()) - .withWorkspaceId(UUID.randomUUID()))); - when(secretsRepositoryReader.listDestinationConnectionWithSecrets()) - .thenReturn(List.of(destinationConnection, - new DestinationConnection() - .withDestinationId(UUID.randomUUID()) - .withWorkspaceId(UUID.randomUUID()))); - when(configRepository.listStandardSyncOperations()) - .thenReturn(List.of(operation, - new StandardSyncOperation() - .withOperationId(UUID.randomUUID()) - .withWorkspaceId(UUID.randomUUID()))); - when(configRepository.listStandardSyncs()) - .thenReturn(List.of(connection)); - final File archive = configDumpExporter.exportWorkspace(workspaceId); - - final UUID newWorkspaceId = 
-    configDumpImporter.importIntoWorkspace(TEST_VERSION, newWorkspaceId, archive);
-
-    verify(secretsRepositoryWriter)
-        .writeSourceConnection(
-            Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId).withSourceId(not(eq(sourceConnection.getSourceId()))),
-            eq(emptyConnectorSpec));
-    verify(secretsRepositoryWriter).writeDestinationConnection(
-        Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))),
-        eq(emptyConnectorSpec));
-    verify(configRepository)
-        .writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId).withOperationId(not(eq(operation.getOperationId()))));
-    verify(configRepository).writeStandardSync(Jsons.clone(connection).withConnectionId(not(eq(connection.getConnectionId()))));
-  }
-
-  @Test
-  void testImportIntoWorkspaceWithoutConflicts() throws JsonValidationException, ConfigNotFoundException, IOException {
-    when(secretsRepositoryReader.listSourceConnectionWithSecrets())
-        // First called for export
-        .thenReturn(List.of(sourceConnection,
-            new SourceConnection()
-                .withSourceId(UUID.randomUUID())
-                .withWorkspaceId(UUID.randomUUID())))
-        // then called for import
-        .thenReturn(List.of(new SourceConnection()
-            .withSourceId(UUID.randomUUID())
-            .withWorkspaceId(UUID.randomUUID())));
-    when(secretsRepositoryReader.listDestinationConnectionWithSecrets())
-        // First called for export
-        .thenReturn(List.of(destinationConnection,
-            new DestinationConnection()
-                .withDestinationId(UUID.randomUUID())
-                .withWorkspaceId(UUID.randomUUID())))
-        // then called for import
-        .thenReturn(List.of(new DestinationConnection()
-            .withDestinationId(UUID.randomUUID())
-            .withWorkspaceId(UUID.randomUUID())));
-    when(configRepository.listStandardSyncOperations())
-        // First called for export
-        .thenReturn(List.of(operation,
-            new StandardSyncOperation()
-                .withOperationId(UUID.randomUUID())
-                .withWorkspaceId(UUID.randomUUID())))
-        // then called for import
-        .thenReturn(List.of(new StandardSyncOperation()
-            .withOperationId(UUID.randomUUID())
-            .withWorkspaceId(UUID.randomUUID())));
-    when(configRepository.listStandardSyncs())
-        // First called for export
-        .thenReturn(List.of(connection))
-        // then called for import
-        .thenReturn(List.of());
-    final File archive = configDumpExporter.exportWorkspace(workspaceId);
-
-    final UUID newWorkspaceId = UUID.randomUUID();
-    configDumpImporter.importIntoWorkspace(TEST_VERSION, newWorkspaceId, archive);
-
-    verify(secretsRepositoryWriter)
-        .writeSourceConnection(Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec);
-    verify(secretsRepositoryWriter)
-        .writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec);
-    verify(configRepository).writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId));
-    verify(configRepository).writeStandardSync(connection);
-  }
-
-  @Test
-  void testReplaceDeploymentMetadata() throws Exception {
-    final UUID oldDeploymentUuid = UUID.randomUUID();
-    final UUID newDeploymentUuid = UUID.randomUUID();
-
-    final JsonNode airbyteVersion = Jsons.deserialize("{\"key\":\"airbyte_version\",\"value\":\"dev\"}");
-    final JsonNode serverUuid = Jsons.deserialize("{\"key\":\"server_uuid\",\"value\":\"e895a584-7dbf-48ce-ace6-0bc9ea570c34\"}");
-    final JsonNode date = Jsons.deserialize("{\"key\":\"date\",\"value\":\"1956-08-17\"}");
-    final JsonNode oldDeploymentId = Jsons.deserialize(
String.format("{\"key\":\"%s\",\"value\":\"%s\"}", DefaultJobPersistence.DEPLOYMENT_ID_KEY, oldDeploymentUuid)); - final JsonNode newDeploymentId = Jsons.deserialize( - String.format("{\"key\":\"%s\",\"value\":\"%s\"}", DefaultJobPersistence.DEPLOYMENT_ID_KEY, newDeploymentUuid)); - - final JobPersistence jobPersistence = mock(JobPersistence.class); - - // when new deployment id does not exist, the old deployment id is removed - when(jobPersistence.getDeployment()).thenReturn(Optional.empty()); - final Stream inputStream1 = Stream.of(airbyteVersion, serverUuid, date, oldDeploymentId); - final Stream outputStream1 = ConfigDumpImporter.replaceDeploymentMetadata(jobPersistence, inputStream1); - final Stream expectedStream1 = Stream.of(airbyteVersion, serverUuid, date); - assertEquals(expectedStream1.collect(Collectors.toList()), outputStream1.collect(Collectors.toList())); - - // when new deployment id exists, the old deployment id is replaced with the new one - when(jobPersistence.getDeployment()).thenReturn(Optional.of(newDeploymentUuid)); - final Stream inputStream2 = Stream.of(airbyteVersion, serverUuid, date, oldDeploymentId); - final Stream outputStream2 = ConfigDumpImporter.replaceDeploymentMetadata(jobPersistence, inputStream2); - final Stream expectedStream2 = Stream.of(airbyteVersion, serverUuid, date, newDeploymentId); - assertEquals(expectedStream2.collect(Collectors.toList()), outputStream2.collect(Collectors.toList())); - } - -} diff --git a/airbyte-server/src/test/java/io/airbyte/server/apis/ConfigurationApiTest.java b/airbyte-server/src/test/java/io/airbyte/server/apis/ConfigurationApiTest.java index af371faf2ffd2..0bba057a25aaa 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/apis/ConfigurationApiTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/apis/ConfigurationApiTest.java @@ -4,17 +4,15 @@ package io.airbyte.server.apis; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import io.airbyte.analytics.TrackingClient; -import io.airbyte.commons.io.FileTtlManager; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.helpers.LogConfigs; -import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.SecretsRepositoryReader; import io.airbyte.config.persistence.SecretsRepositoryWriter; @@ -39,11 +37,9 @@ void testImportDefinitions() { final ConfigurationApi configurationApi = new ConfigurationApi( mock(ConfigRepository.class), mock(JobPersistence.class), - mock(ConfigPersistence.class), mock(SecretsRepositoryReader.class), mock(SecretsRepositoryWriter.class), mock(SynchronousSchedulerClient.class), - mock(FileTtlManager.class), mock(Database.class), mock(Database.class), mock(StatePersistence.class), @@ -56,7 +52,8 @@ void testImportDefinitions() { mock(EventRunner.class), mock(Flyway.class), mock(Flyway.class)); - assertTrue(configurationApi.canImportDefinitions()); + + assertFalse(configurationApi.getHealthCheck().getAvailable()); } } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java deleted file mode 100644 index e68c6aa7ce6ca..0000000000000 --- 
diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java
deleted file mode 100644
index e68c6aa7ce6ca..0000000000000
--- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.server.handlers;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.airbyte.api.model.generated.ImportRead;
-import io.airbyte.api.model.generated.ImportRead.StatusEnum;
-import io.airbyte.api.model.generated.ImportRequestBody;
-import io.airbyte.api.model.generated.UploadRead;
-import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
-import io.airbyte.commons.io.FileTtlManager;
-import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.string.Strings;
-import io.airbyte.commons.version.AirbyteVersion;
-import io.airbyte.config.ActorCatalog;
-import io.airbyte.config.ConfigSchema;
-import io.airbyte.config.DestinationConnection;
-import io.airbyte.config.Notification;
-import io.airbyte.config.Notification.NotificationType;
-import io.airbyte.config.SlackNotificationConfiguration;
-import io.airbyte.config.SourceConnection;
-import io.airbyte.config.StandardDestinationDefinition;
-import io.airbyte.config.StandardSourceDefinition;
-import io.airbyte.config.StandardSourceDefinition.SourceType;
-import io.airbyte.config.StandardSync;
-import io.airbyte.config.StandardWorkspace;
-import io.airbyte.config.init.YamlSeedConfigPersistence;
-import io.airbyte.config.persistence.ConfigPersistence;
-import io.airbyte.config.persistence.ConfigRepository;
-import io.airbyte.config.persistence.DatabaseConfigPersistence;
-import io.airbyte.config.persistence.SecretsRepositoryReader;
-import io.airbyte.config.persistence.SecretsRepositoryWriter;
-import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
-import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
-import io.airbyte.db.Database;
-import io.airbyte.db.factory.DSLContextFactory;
-import io.airbyte.db.factory.DataSourceFactory;
-import io.airbyte.db.instance.test.TestDatabaseProviders;
-import io.airbyte.persistence.job.DefaultJobPersistence;
-import io.airbyte.persistence.job.JobPersistence;
-import io.airbyte.persistence.job.WorkspaceHelper;
-import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
-import io.airbyte.protocol.models.ConnectorSpecification;
-import io.airbyte.test.utils.DatabaseConnectionHelper;
-import io.airbyte.validation.json.JsonValidationException;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.sql.DataSource;
-import org.jooq.DSLContext;
-import org.jooq.SQLDialect;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.PostgreSQLContainer;
-import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
-
-class ArchiveHandlerTest {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(ArchiveHandlerTest.class);
-
-  private static final AirbyteVersion VERSION = new AirbyteVersion("0.6.8");
-  private static PostgreSQLContainer<?> container;
-
-  private DataSource dataSource;
-  private DSLContext dslContext;
-  private Database jobDatabase;
-  private Database configDatabase;
-  private JobPersistence jobPersistence;
-  private SecretsRepositoryReader secretsRepositoryReader;
-  private SecretsRepositoryWriter secretsRepositoryWriter;
-  private ConfigPersistence configPersistence;
-  private ConfigPersistence seedPersistence;
-  private JsonSecretsProcessor jsonSecretsProcessor;
-  private ConfigRepository configRepository;
-  private ArchiveHandler archiveHandler;
-  private WorkspaceHelper workspaceHelper;
-
-  private static class NoOpFileTtlManager extends FileTtlManager {
-
-    public NoOpFileTtlManager() {
-      super(1L, TimeUnit.MINUTES, 1L);
-    }
-
-    @Override
-    public void register(final Path path) {}
-
-  }
-
-  @BeforeAll
-  static void dbSetup() {
-    container = new PostgreSQLContainer<>("postgres:13-alpine")
-        .withDatabaseName("airbyte")
-        .withUsername("docker")
-        .withPassword("docker");
-    container.start();
-  }
-
-  @AfterAll
-  static void dbDown() {
-    container.close();
-  }
-
-  @BeforeEach
-  void setup() throws Exception {
-    dataSource = DatabaseConnectionHelper.createDataSource(container);
-    dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
-    final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(dataSource, dslContext);
-    jobDatabase = databaseProviders.createNewJobsDatabase();
-    configDatabase = databaseProviders.createNewConfigsDatabase();
-    jobPersistence = new DefaultJobPersistence(jobDatabase);
-    seedPersistence = new YamlSeedConfigPersistence(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
-    jsonSecretsProcessor = JsonSecretsProcessor.builder()
-        .maskSecrets(false)
-        .copySecrets(false)
-        .build();
-    configPersistence = new DatabaseConfigPersistence(jobDatabase, jsonSecretsProcessor);
-    configPersistence.replaceAllConfigs(Collections.emptyMap(), false);
-    configPersistence.loadData(seedPersistence);
-    configRepository = new ConfigRepository(configPersistence, configDatabase);
-    secretsRepositoryReader = new SecretsRepositoryReader(configRepository, new NoOpSecretsHydrator());
-    secretsRepositoryWriter = new SecretsRepositoryWriter(configRepository, Optional.empty(), Optional.empty());
-
-    jobPersistence.setVersion(VERSION.serialize());
-
-    workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);
-
-    archiveHandler = new ArchiveHandler(
-        VERSION,
-        configRepository,
-        secretsRepositoryReader,
-        secretsRepositoryWriter,
-        jobPersistence,
-        seedPersistence,
-        workspaceHelper,
-        new NoOpFileTtlManager(),
-        true);
-  }
-
-  @AfterEach
-  void tearDown() throws Exception {
-    dslContext.close();
-    DataSourceFactory.close(dataSource);
-  }
-
-  /**
-   * After exporting and importing, the configs should remain the same.
-   */
-  @Test
-  void testFullExportImportRoundTrip() throws Exception {
-    assertSameConfigDump(seedPersistence.dumpConfigs(), secretsRepositoryReader.dumpConfigsWithSecrets());
-
-    // Export the configs.
-    File archive = archiveHandler.exportData();
-
-    // After deleting the configs, the dump becomes empty.
-    configPersistence.replaceAllConfigs(Collections.emptyMap(), false);
-    assertSameConfigDump(Collections.emptyMap(), secretsRepositoryReader.dumpConfigsWithSecrets());
-
-    // After importing the configs, the dump is restored.
-    assertTrue(archive.exists());
-    final ImportRead importResult = archiveHandler.importData(archive);
-    assertFalse(archive.exists());
-    assertEquals(StatusEnum.SUCCEEDED, importResult.getStatus());
-    assertSameConfigDump(seedPersistence.dumpConfigs(), secretsRepositoryReader.dumpConfigsWithSecrets());
-
-    // When a connector definition is in use, it will not be updated.
-    final UUID sourceS3DefinitionId = UUID.fromString("69589781-7828-43c5-9f63-8925b1c1ccc2");
-    final String sourceS3DefinitionVersion = "0.0.0";
-    final StandardSourceDefinition sourceS3Definition = seedPersistence.getConfig(
-        ConfigSchema.STANDARD_SOURCE_DEFINITION,
-        sourceS3DefinitionId.toString(),
-        StandardSourceDefinition.class)
-        // This source definition is on an old version
-        .withDockerImageTag(sourceS3DefinitionVersion)
-        .withTombstone(false);
-    final Notification notification = new Notification()
-        .withNotificationType(NotificationType.SLACK)
-        .withSendOnFailure(true)
-        .withSendOnSuccess(true)
-        .withSlackConfiguration(new SlackNotificationConfiguration().withWebhook("webhook-url"));
-    final StandardWorkspace workspace = new StandardWorkspace()
-        .withWorkspaceId(UUID.randomUUID())
-        .withCustomerId(UUID.randomUUID())
-        .withName("test-workspace")
-        .withSlug("random-string")
-        .withEmail("abc@xyz.com")
-        .withInitialSetupComplete(true)
-        .withAnonymousDataCollection(true)
-        .withNews(true)
-        .withSecurityUpdates(true)
-        .withDisplaySetupWizard(true)
-        .withTombstone(false)
-        .withNotifications(Collections.singletonList(notification))
-        .withFirstCompletedSync(true)
-        .withFeedbackDone(true);
-    final SourceConnection source = new SourceConnection()
-        .withSourceDefinitionId(sourceS3DefinitionId)
-        .withSourceId(UUID.randomUUID())
-        .withWorkspaceId(workspace.getWorkspaceId())
-        .withName("Test source")
-        .withConfiguration(Jsons.deserialize("{}"))
-        .withTombstone(false);
-
-    final StandardDestinationDefinition DESTINATION_S3 = new StandardDestinationDefinition()
-        .withName("S3")
-        .withDestinationDefinitionId(UUID.fromString("4816b78f-1489-44c1-9060-4b19d5fa9362"))
-        .withDockerRepository("airbyte/destination-s3")
-        .withDockerImageTag("0.1.12")
-        .withSpec(sourceS3Definition.getSpec())
-        .withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/s3")
-        .withTombstone(false);
-
-    final DestinationConnection destination = new DestinationConnection()
-        .withName("Destination")
-        .withDestinationId(UUID.randomUUID())
-        .withDestinationDefinitionId(DESTINATION_S3.getDestinationDefinitionId())
-        .withConfiguration(Jsons.deserialize("{}"))
-        .withWorkspaceId(workspace.getWorkspaceId());
-
-    final ActorCatalog actorCatalog = new ActorCatalog()
-        .withId(UUID.randomUUID())
-        .withCatalog(Jsons.deserialize("{}"))
-        .withCatalogHash("");
-
-    final StandardSync sync = new StandardSync()
-        .withName("Connection")
-        .withConnectionId(UUID.randomUUID())
-        .withSourceId(source.getSourceId())
-        .withDestinationId(destination.getDestinationId())
-        .withCatalog(new ConfiguredAirbyteCatalog().withStreams(List.of()))
-        .withSourceCatalogId(actorCatalog.getId())
-        .withManual(true);
-
-    // Write source connection and an old source definition.
-    configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspace.getWorkspaceId().toString(), workspace);
-    configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceS3DefinitionId.toString(), sourceS3Definition);
-    configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source);
-
-    configPersistence.writeConfig(ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString(), actorCatalog);
-    configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, DESTINATION_S3.getDestinationDefinitionId().toString(),
-        DESTINATION_S3);
-    configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), destination);
-
-    configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, sync.getConnectionId().toString(), sync);
-
-    // Export, wipe, and import the configs.
-    archive = archiveHandler.exportData();
-    configPersistence.replaceAllConfigs(Collections.emptyMap(), false);
-    archiveHandler.importData(archive);
-
-    // The version has not changed.
-    final StandardSourceDefinition actualS3Definition = configPersistence.getConfig(
-        ConfigSchema.STANDARD_SOURCE_DEFINITION,
-        sourceS3DefinitionId.toString(),
-        StandardSourceDefinition.class);
-    assertEquals(sourceS3DefinitionVersion, actualS3Definition.getDockerImageTag());
-  }
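The round-trip test above encodes an import rule worth calling out: a connector definition that existing sources or destinations still reference keeps its docker image tag, even if the archive carries a newer one. The mechanism below is a guess at the shape of that guard, not the deleted importer's actual code; every identifier is illustrative:

    import java.util.Set;
    import java.util.UUID;
    import io.airbyte.config.ConfigSchema;
    import io.airbyte.config.StandardSourceDefinition;
    import io.airbyte.config.persistence.ConfigPersistence;

    final class DefinitionImportSketch {

      // Skip definitions that existing actors still reference, so their docker
      // image tag survives the wipe-and-import cycle exercised above.
      static void importSourceDefinition(final StandardSourceDefinition incoming,
                                         final Set<UUID> definitionIdsInUse,
                                         final ConfigPersistence persistence) throws Exception {
        if (definitionIdsInUse.contains(incoming.getSourceDefinitionId())) {
          return;
        }
        persistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION,
            incoming.getSourceDefinitionId().toString(), incoming);
      }

    }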
-
-  @Test
-  void testLightWeightExportImportRoundTrip() throws Exception {
-    assertSameConfigDump(seedPersistence.dumpConfigs(), secretsRepositoryReader.dumpConfigsWithSecrets());
-
-    // Insert some workspace data
-    final UUID workspaceId = UUID.randomUUID();
-    setupTestData(workspaceId);
-    final Map<String, Stream<JsonNode>> workspaceDump = secretsRepositoryReader.dumpConfigsWithSecrets();
-
-    // Insert some other workspace data
-    setupTestData(UUID.randomUUID());
-
-    // Export the first workspace configs
-    File archive = archiveHandler.exportWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId));
-    final File secondArchive = Files.createTempFile("tests", "archive").toFile();
-    FileUtils.copyFile(archive, secondArchive);
-
-    // After deleting all the configs, the dump becomes empty.
-    configPersistence.replaceAllConfigs(Collections.emptyMap(), false);
-    assertSameConfigDump(Collections.emptyMap(), secretsRepositoryReader.dumpConfigsWithSecrets());
-
-    // Restore default seed data
-    configPersistence.loadData(seedPersistence);
-    assertSameConfigDump(seedPersistence.dumpConfigs(), secretsRepositoryReader.dumpConfigsWithSecrets());
-
-    setupWorkspaceData(workspaceId);
-
-    // After importing the configs, the first workspace is restored.
-    assertTrue(archive.exists());
-    UploadRead uploadRead = archiveHandler.uploadArchiveResource(archive);
-    assertFalse(archive.exists());
-    assertEquals(UploadRead.StatusEnum.SUCCEEDED, uploadRead.getStatus());
-    ImportRead importResult = archiveHandler.importIntoWorkspace(new ImportRequestBody()
-        .resourceId(uploadRead.getResourceId())
-        .workspaceId(workspaceId));
-    assertEquals(StatusEnum.SUCCEEDED, importResult.getStatus());
-    assertSameConfigDump(workspaceDump, secretsRepositoryReader.dumpConfigsWithSecrets());
-
-    // we modify first workspace
-    setupTestData(workspaceId);
-    final Map<String, Stream<JsonNode>> secondWorkspaceDump = secretsRepositoryReader.dumpConfigsWithSecrets();
-
-    final UUID secondWorkspaceId = UUID.randomUUID();
-    setupWorkspaceData(secondWorkspaceId);
-
-    // the archive is imported again into another workspace
-    final UploadRead secondUploadRead = archiveHandler.uploadArchiveResource(secondArchive);
-    assertEquals(UploadRead.StatusEnum.SUCCEEDED, secondUploadRead.getStatus());
-    final ImportRead secondImportResult = archiveHandler.importIntoWorkspace(new ImportRequestBody()
-        .resourceId(secondUploadRead.getResourceId())
-        .workspaceId(secondWorkspaceId));
-    assertEquals(StatusEnum.SUCCEEDED, secondImportResult.getStatus());
-
-    final UUID secondSourceId = secretsRepositoryReader.listSourceConnectionWithSecrets()
-        .stream()
-        .filter(sourceConnection -> secondWorkspaceId.equals(sourceConnection.getWorkspaceId()))
-        .map(SourceConnection::getSourceId)
-        .collect(Collectors.toList()).get(0);
-
-    final StandardSourceDefinition standardSourceDefinition = new StandardSourceDefinition()
-        .withSourceDefinitionId(UUID.randomUUID())
-        .withSourceType(SourceType.API)
-        .withName("random-source-1")
-        .withDockerImageTag("tag-1")
-        .withDockerRepository("repository-1")
-        .withDocumentationUrl("documentation-url-1")
-        .withIcon("icon-1")
-        .withSpec(new ConnectorSpecification())
-        .withTombstone(false);
-
-    final SourceConnection sourceConnection = new SourceConnection()
-        .withWorkspaceId(secondWorkspaceId)
-        .withSourceId(secondSourceId)
-        .withName("Some new names")
-        .withSourceDefinitionId(standardSourceDefinition.getSourceDefinitionId())
-        .withTombstone(false)
-        .withConfiguration(Jsons.emptyObject());
-
-    final ConnectorSpecification emptyConnectorSpec = mock(ConnectorSpecification.class);
-    when(emptyConnectorSpec.getConnectionSpecification()).thenReturn(Jsons.emptyObject());
-
-    configRepository.writeStandardSourceDefinition(standardSourceDefinition);
-    secretsRepositoryWriter.writeSourceConnection(sourceConnection, emptyConnectorSpec);
-
-    // check that first workspace is unchanged even though modifications were made to second workspace
-    // (that contains similar connections from importing the same archive)
-    archive = archiveHandler.exportWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId));
-    configPersistence.replaceAllConfigs(Collections.emptyMap(), false);
-    configPersistence.loadData(seedPersistence);
-    setupWorkspaceData(workspaceId);
-    uploadRead = archiveHandler.uploadArchiveResource(archive);
-    assertEquals(UploadRead.StatusEnum.SUCCEEDED, uploadRead.getStatus());
-    importResult = archiveHandler.importIntoWorkspace(new ImportRequestBody()
-        .resourceId(uploadRead.getResourceId())
-        .workspaceId(workspaceId));
-    assertEquals(StatusEnum.SUCCEEDED, importResult.getStatus());
-    assertSameConfigDump(secondWorkspaceDump, secretsRepositoryReader.dumpConfigsWithSecrets());
-  }
-
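The lightweight round trip above, together with the conflict test in ConfigDumpImporterTest, documents the mutation promised by the ArchiveHandler javadoc: on import into a workspace, each configuration is re-homed to the target workspace, and any id that would collide is replaced with a fresh one so the same tarball can be imported twice. A sketch of that remapping for sources; the helper name and the Predicate-based collision check are assumptions, only Jsons.clone and the SourceConnection setters come from this patch:

    import java.util.UUID;
    import java.util.function.Predicate;
    import io.airbyte.commons.json.Jsons;
    import io.airbyte.config.SourceConnection;

    final class WorkspaceImportSketch {

      // Re-home the connection, and mint a fresh id when the imported one clashes.
      static SourceConnection prepareForImport(final SourceConnection imported,
                                               final UUID targetWorkspaceId,
                                               final Predicate<UUID> sourceIdAlreadyTaken) {
        final SourceConnection copy = Jsons.clone(imported).withWorkspaceId(targetWorkspaceId);
        if (sourceIdAlreadyTaken.test(copy.getSourceId())) {
          copy.withSourceId(UUID.randomUUID());
        }
        return copy;
      }

    }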
-  private void setupWorkspaceData(final UUID workspaceId) throws IOException, JsonValidationException {
-    configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), new StandardWorkspace()
-        .withWorkspaceId(workspaceId)
-        .withName("test-workspace")
-        .withSlug(workspaceId.toString())
-        .withInitialSetupComplete(false)
-        .withTombstone(false));
-  }
-
-  private void setupTestData(final UUID workspaceId) throws JsonValidationException, IOException {
-    // Fill up with some configurations
-    setupWorkspaceData(workspaceId);
-    final UUID sourceid = UUID.randomUUID();
-    configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceid.toString(), new SourceConnection()
-        .withSourceId(sourceid)
-        .withWorkspaceId(workspaceId)
-        .withSourceDefinitionId(UUID.fromString("ef69ef6e-aa7f-4af1-a01d-ef775033524e")) // GitHub source definition
-        .withName("test-source")
-        .withConfiguration(Jsons.jsonNode(ImmutableMap.of("start_date", "2021-03-01T00:00:00Z", "repository", "airbytehq/airbyte")))
-        .withTombstone(false));
-    final UUID destinationId = UUID.randomUUID();
-    configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), new DestinationConnection()
-        .withDestinationId(destinationId)
-        .withWorkspaceId(workspaceId)
-        .withDestinationDefinitionId(UUID.fromString("079d5540-f236-4294-ba7c-ade8fd918496")) // BigQuery destination definition
-        .withName("test-destination")
-        .withConfiguration(Jsons.jsonNode(ImmutableMap.of("project_id", "project", "dataset_id", "dataset")))
-        .withTombstone(false));
-  }
-
-  private void assertSameConfigDump(final Map<String, Stream<JsonNode>> expected, final Map<String, Stream<JsonNode>> actual) {
-    assertEquals(expected.keySet(), actual.keySet(),
-        String.format("The expected (%s) vs actual (%s) streams do not match", expected.size(), actual.size()));
-    for (final String stream : expected.keySet()) {
-      LOGGER.info("Checking stream {}", stream);
-      // assertEquals cannot correctly check the equality of two maps with stream values,
-      // so streams are converted to sets before being compared.
-      final Set<JsonNode> expectedRecords = expected.get(stream).collect(Collectors.toSet());
-      final Set<JsonNode> actualRecords = actual.get(stream).collect(Collectors.toSet());
-      for (final var expectedRecord : expectedRecords) {
-        assertTrue(
-            backwardCompatibleContains(actualRecords, expectedRecord),
-            String.format(
-                "\n Expected record was not found:\n%s\n Actual records were:\n%s\n",
-                expectedRecord,
-                Strings.join(actualRecords, "\n")));
-      }
-      assertEquals(expectedRecords.size(), actualRecords.size(),
-          String.format(
-              "The expected vs actual records do not match:\n expected records:\n%s\n actual records\n%s\n",
-              Strings.join(expectedRecords, "\n"),
-              Strings.join(actualRecords, "\n")));
-    }
-  }
-
-  /*
-   * The protocol version is currently optional and defaults to 0.2.0. Because today we need to
-   * support connectors without a protocol version in the spec, we add a secondary check with the
-   * default protocol version.
-   */
-  private boolean backwardCompatibleContains(final Set<JsonNode> actualRecords, final JsonNode expectedRecord) {
-    return actualRecords.contains(expectedRecord) ||
-        (!expectedRecord.has("protocolVersion") && actualRecords.contains(cloneWithDefaultVersion(expectedRecord)));
-  }
-
-  private JsonNode cloneWithDefaultVersion(final JsonNode json) {
-    final ObjectNode clonedJson = json.deepCopy();
-    clonedJson.put("protocolVersion", "0.2.0");
-    return clonedJson;
-  }
-
-}
diff --git a/airbyte-webapp/src/test-utils/mock-data/mockDestinationDefinition.json b/airbyte-webapp/src/test-utils/mock-data/mockDestinationDefinition.json
index 40a038b7bc6a0..281bef4449645 100644
--- a/airbyte-webapp/src/test-utils/mock-data/mockDestinationDefinition.json
+++ b/airbyte-webapp/src/test-utils/mock-data/mockDestinationDefinition.json
@@ -146,7 +146,12 @@
     },
     {
       "title": "verify-full",
-      "required": ["mode", "ca_certificate", "client_certificate", "client_key"],
+      "required": [
+        "mode",
+        "ca_certificate",
+        "client_certificate",
+        "client_key"
+      ],
       "properties": {
         "mode": {
           "enum": ["verify-full"],
@@ -218,7 +223,13 @@
     },
     {
       "title": "SSH Key Authentication",
-      "required": ["tunnel_method", "tunnel_host", "tunnel_port", "tunnel_user", "ssh_key"],
+      "required": [
+        "tunnel_method",
+        "tunnel_host",
+        "tunnel_port",
+        "tunnel_user",
+        "ssh_key"
+      ],
       "properties": {
         "ssh_key": {
           "type": "string",
@@ -260,7 +271,13 @@
     },
     {
       "title": "Password Authentication",
-      "required": ["tunnel_method", "tunnel_host", "tunnel_port", "tunnel_user", "tunnel_user_password"],
+      "required": [
+        "tunnel_method",
+        "tunnel_host",
+        "tunnel_port",
+        "tunnel_user",
+        "tunnel_user_password"
+      ],
       "properties": {
         "tunnel_host": {
           "type": "string",
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java
index c907c93bd875a..411a0c97bece8 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java
@@ -126,7 +126,6 @@ public JobTracker jobTracker(
   @Requires(env = WorkerMode.CONTROL_PLANE)
   public JsonSecretsProcessor jsonSecretsProcessor(final FeatureFlags featureFlags) {
     return JsonSecretsProcessor.builder()
-        .maskSecrets(!featureFlags.exposeSecretsInExport())
         .copySecrets(false)
         .build();
   }
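With EXPOSE_SECRETS_IN_EXPORT removed, the worker bean above no longer reads any feature flag; its FeatureFlags parameter survives only as an unused argument. A presumably equivalent follow-up, shown as a sketch rather than as part of this patch, would drop the parameter entirely:

    @Requires(env = WorkerMode.CONTROL_PLANE)
    public JsonSecretsProcessor jsonSecretsProcessor() {
      // No flag consulted: workers always get an always-masking, non-copying processor.
      return JsonSecretsProcessor.builder()
          .copySecrets(false)
          .build();
    }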