From 9b05e228dee815145ce70b9558d19dd7ef29c484 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 5 Jan 2023 12:51:05 -0500 Subject: [PATCH] Migrate airbyte-bootloader to Micronaut --- .env | 5 +- .env.dev | 4 + airbyte-bootloader/{readme.md => Readme.md} | 2 +- airbyte-bootloader/build.gradle | 48 ++- .../io/airbyte/bootloader/Application.java | 30 ++ .../io/airbyte/bootloader/Bootloader.java | 291 +++++++++++++ .../io/airbyte/bootloader/BootloaderApp.java | 405 ------------------ .../bootloader/ProtocolVersionChecker.java | 56 ++- .../io/airbyte/bootloader/SecretMigrator.java | 17 +- .../config/ApplicationBeanFactory.java | 74 ++++ .../config/DatabaseBeanFactory.java | 130 ++++++ .../config/SecretPersistenceBeanFactory.java | 110 +++++ .../src/main/resources/application.yml | 115 +++++ .../src/main/resources/micronaut-banner.txt | 8 + .../airbyte/bootloader/BootloaderAppTest.java | 358 ---------------- .../io/airbyte/bootloader/BootloaderTest.java | 400 +++++++++++++++++ .../ProtocolVersionCheckerTest.java | 83 ++-- .../config/SecretPersistenceBeanFactory.java | 13 + .../src/main/resources/application.yml | 3 + charts/airbyte/templates/env-configmap.yaml | 1 + docker-compose.yaml | 4 +- 21 files changed, 1329 insertions(+), 828 deletions(-) rename airbyte-bootloader/{readme.md => Readme.md} (77%) create mode 100644 airbyte-bootloader/src/main/java/io/airbyte/bootloader/Application.java create mode 100644 airbyte-bootloader/src/main/java/io/airbyte/bootloader/Bootloader.java delete mode 100644 airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java create mode 100644 airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/ApplicationBeanFactory.java create mode 100644 airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/DatabaseBeanFactory.java create mode 100644 airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/SecretPersistenceBeanFactory.java create mode 100644 airbyte-bootloader/src/main/resources/application.yml create mode 100644 airbyte-bootloader/src/main/resources/micronaut-banner.txt delete mode 100644 airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java create mode 100644 airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java diff --git a/.env b/.env index 32c5ba3c49525..eb6ccc10f1de1 100644 --- a/.env +++ b/.env @@ -39,6 +39,10 @@ BASIC_AUTH_PASSWORD=password BASIC_AUTH_PROXY_TIMEOUT=600 ### DATABASE ### +# Airbyte Bootloader environment variables +BOOTLOADER_MIGRATION_BASELINE_VERSION=0.29.0.001 +RUN_DATABASE_MIGRATION_ON_STARTUP=true + # Airbyte Internal Job Database, see https://docs.airbyte.io/operator-guides/configuring-airbyte-db DATABASE_USER=docker DATABASE_PASSWORD=docker @@ -115,4 +119,3 @@ OTEL_COLLECTOR_ENDPOINT="http://host.docker.internal:4317" USE_STREAM_CAPABLE_STATE=true AUTO_DETECT_SCHEMA=false - diff --git a/.env.dev b/.env.dev index 4f25503451fc3..e9f771a444dc3 100644 --- a/.env.dev +++ b/.env.dev @@ -30,6 +30,10 @@ WORKERS_MICRONAUT_ENVIRONMENTS=control-plane CRON_MICRONAUT_ENVIRONMENTS=control-plane AUTO_DETECT_SCHEMA=false +# Airbyte Bootloader environment variables +BOOTLOADER_MIGRATION_BASELINE_VERSION=0.29.0.001 +RUN_DATABASE_MIGRATION_ON_STARTUP=true + # Sentry SENTRY_DSN="" diff --git a/airbyte-bootloader/readme.md b/airbyte-bootloader/Readme.md similarity index 77% rename from airbyte-bootloader/readme.md rename to airbyte-bootloader/Readme.md index c27261073832f..831613918da99 100644 --- a/airbyte-bootloader/readme.md +++ b/airbyte-bootloader/Readme.md @@ -3,4 +3,4 @@ This application runs at start up for Airbyte. It is responsible for making sure that the environment is upgraded and in a good state. e.g. It makes sure the database has been migrated to the correct version. ## Entrypoint -* BootloaderApp.java - has the main method for running the bootloader. +* Application.java - has the main method for running the bootloader. diff --git a/airbyte-bootloader/build.gradle b/airbyte-bootloader/build.gradle index ac945a727c417..ad9c57c65dbe6 100644 --- a/airbyte-bootloader/build.gradle +++ b/airbyte-bootloader/build.gradle @@ -3,6 +3,21 @@ plugins { } dependencies { + annotationProcessor platform(libs.micronaut.bom) + annotationProcessor libs.bundles.micronaut.annotation.processor + + implementation platform(libs.micronaut.bom) + implementation libs.bundles.micronaut + + // Ensure that the versions defined in deps.toml are used + // instead of versions from transitive dependencies + implementation (libs.flyway.core) { + force = true + } + implementation (libs.jooq) { + force = true + } + implementation project(':airbyte-config:init') implementation project(':airbyte-config:config-models') implementation project(':airbyte-config:config-persistence') @@ -11,19 +26,42 @@ dependencies { implementation project(':airbyte-protocol:protocol-models') implementation project(':airbyte-persistence:job-persistence') - implementation libs.temporal.sdk - implementation libs.flyway.core + testAnnotationProcessor platform(libs.micronaut.bom) + testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor - testImplementation libs.platform.testcontainers.postgresql + testImplementation libs.bundles.micronaut.test + testImplementation libs.bundles.junit testImplementation libs.junit.jupiter.system.stubs + testImplementation libs.platform.testcontainers.postgresql } +mainClassName = 'io.airbyte.bootloader.Application' + application { - applicationName = "airbyte-bootloader" - mainClass = 'io.airbyte.bootloader.BootloaderApp' + applicationName = project.name + mainClass = mainClassName applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] } +Properties env = new Properties() +rootProject.file('.env.dev').withInputStream { env.load(it) } + +run { + // default for running on local machine. + env.each { entry -> + environment entry.getKey(), entry.getValue() + } + + environment 'AIRBYTE_ROLE', System.getenv('AIRBYTE_ROLE') + environment 'AIRBYTE_VERSION', env.VERSION + environment 'DATABASE_URL', 'jdbc:postgresql://localhost:5432/airbyte' +} + +test { + // Required to enable mocked beans + systemProperty("mockito.test.enabled", "true") +} + // produce reproducible archives // (see https://docs.gradle.org/current/userguide/working_with_files.html#sec:reproducible_archives) tasks.withType(AbstractArchiveTask) { diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/Application.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/Application.java new file mode 100644 index 0000000000000..d1bc3df9c3de9 --- /dev/null +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/Application.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader; + +import io.micronaut.context.ApplicationContext; +import io.micronaut.runtime.Micronaut; +import lombok.extern.slf4j.Slf4j; + +/** + * Main application entry point responsible for starting the server and invoking the bootstrapping + * of the Airbyte environment. + */ +@Slf4j +public class Application { + + public static void main(final String[] args) { + try { + final ApplicationContext applicationContext = Micronaut.run(Application.class, args); + final Bootloader bootloader = applicationContext.getBean(Bootloader.class); + bootloader.load(); + System.exit(0); + } catch (final Exception e) { + log.error("Unable to bootstrap Airbyte environment.", e); + System.exit(-1); + } + } + +} diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/Bootloader.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/Bootloader.java new file mode 100644 index 0000000000000..e1247c6754a2b --- /dev/null +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/Bootloader.java @@ -0,0 +1,291 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader; + +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.version.AirbyteProtocolVersionRange; +import io.airbyte.commons.version.AirbyteVersion; +import io.airbyte.config.Geography; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.init.ApplyDefinitionsHelper; +import io.airbyte.config.init.DefinitionsProvider; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.db.init.DatabaseInitializationException; +import io.airbyte.db.init.DatabaseInitializer; +import io.airbyte.db.instance.DatabaseMigrator; +import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.validation.json.JsonValidationException; +import io.micronaut.context.annotation.Value; +import jakarta.annotation.PostConstruct; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import java.io.IOException; +import java.util.Optional; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ensures that the databases are migrated to the appropriate level. + */ +@Singleton +public class Bootloader { + + private static final Logger LOGGER = LoggerFactory.getLogger(Bootloader.class); + private static final AirbyteVersion VERSION_BREAK = new AirbyteVersion("0.32.0-alpha"); + + private final boolean autoUpgradeConnectors; + private final ConfigRepository configRepository; + private final DatabaseMigrator configsDatabaseMigrator; + private final DatabaseInitializer configsDatabaseInitializer; + private final AirbyteVersion currentAirbyteVersion; + private final Optional definitionsProvider; + private final FeatureFlags featureFlags; + private final DatabaseInitializer jobsDatabaseInitializer; + private final DatabaseMigrator jobsDatabaseMigrator; + private final JobPersistence jobPersistence; + private Runnable postLoadExecution; + private final ProtocolVersionChecker protocolVersionChecker; + private final boolean runMigrationOnStartup; + private final SecretMigrator secretMigrator; + + public Bootloader( + @Value("${airbyte.protocol.auto-upgrade-connectors}") final boolean autoUpgradeConnectors, + final ConfigRepository configRepository, + @Named("configsDatabaseInitializer") final DatabaseInitializer configsDatabaseInitializer, + @Named("configsDatabaseMigrator") final DatabaseMigrator configsDatabaseMigrator, + final AirbyteVersion currentAirbyteVersion, + final Optional definitionsProvider, + final FeatureFlags featureFlags, + @Named("jobsDatabaseInitializer") final DatabaseInitializer jobsDatabaseInitializer, + @Named("jobsDatabaseMigrator") final DatabaseMigrator jobsDatabaseMigrator, + final JobPersistence jobPersistence, + final ProtocolVersionChecker protocolVersionChecker, + @Value("${airbyte.run-migration-on-startup}") final boolean runMigrationOnStartup, + final SecretMigrator secretMigrator) { + this(autoUpgradeConnectors, configRepository, configsDatabaseInitializer, configsDatabaseMigrator, currentAirbyteVersion, + definitionsProvider, featureFlags, jobsDatabaseInitializer, jobsDatabaseMigrator, jobPersistence, protocolVersionChecker, + runMigrationOnStartup, secretMigrator, null); + } + + @VisibleForTesting + Bootloader( + @Value("${airbyte.protocol.auto-upgrade-connectors}") final boolean autoUpgradeConnectors, + final ConfigRepository configRepository, + @Named("configsDatabaseInitializer") final DatabaseInitializer configsDatabaseInitializer, + @Named("configsDatabaseMigrator") final DatabaseMigrator configsDatabaseMigrator, + final AirbyteVersion currentAirbyteVersion, + final Optional definitionsProvider, + final FeatureFlags featureFlags, + @Named("jobsDatabaseInitializer") final DatabaseInitializer jobsDatabaseInitializer, + @Named("jobsDatabaseMigrator") final DatabaseMigrator jobsDatabaseMigrator, + final JobPersistence jobPersistence, + final ProtocolVersionChecker protocolVersionChecker, + @Value("${airbyte.run-migration-on-startup}") final boolean runMigrationOnStartup, + final SecretMigrator secretMigrator, + final Runnable postLoadExecution) { + this.autoUpgradeConnectors = autoUpgradeConnectors; + this.configRepository = configRepository; + this.configsDatabaseInitializer = configsDatabaseInitializer; + this.configsDatabaseMigrator = configsDatabaseMigrator; + this.currentAirbyteVersion = currentAirbyteVersion; + this.definitionsProvider = definitionsProvider; + this.featureFlags = featureFlags; + this.jobsDatabaseInitializer = jobsDatabaseInitializer; + this.jobsDatabaseMigrator = jobsDatabaseMigrator; + this.jobPersistence = jobPersistence; + this.protocolVersionChecker = protocolVersionChecker; + this.runMigrationOnStartup = runMigrationOnStartup; + this.secretMigrator = secretMigrator; + this.postLoadExecution = postLoadExecution; + } + + /** + * Ensures additional initialization is performed after all dependency injection has completed. + */ + @PostConstruct + public void afterInitialization() { + postLoadExecution = () -> { + try { + final ApplyDefinitionsHelper applyDefinitionsHelper = + new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get(), jobPersistence); + applyDefinitionsHelper.apply(); + + if (featureFlags.forceSecretMigration() || !jobPersistence.isSecretMigrated()) { + if (this.secretMigrator != null) { + this.secretMigrator.migrateSecrets(); + LOGGER.info("Secrets successfully migrated."); + } + } + LOGGER.info("Loaded seed data.."); + } catch (final IOException | JsonValidationException | ConfigNotFoundException e) { + throw new RuntimeException(e); + } + }; + } + + /** + * Performs all required bootstrapping for the Airbyte environment. This includes the following: + *
    + *
  • Initializes the databases
  • + *
  • Check database migration compatibility
  • + *
  • Check protocol version compatibility
  • + *
  • Migrate databases
  • + *
  • Create default workspace
  • + *
  • Create default deployment
  • + *
  • Perform post migration tasks
  • + *
+ * + * @throws Exception if unable to perform any of the bootstrap operations. + */ + public void load() throws Exception { + LOGGER.info("Initializing databases..."); + initializeDatabases(); + + LOGGER.info("Checking migration compatibility..."); + assertNonBreakingMigration(jobPersistence, currentAirbyteVersion); + + LOGGER.info("Checking protocol version constraints..."); + assertNonBreakingProtocolVersionConstraints(protocolVersionChecker, jobPersistence, autoUpgradeConnectors); + + LOGGER.info("Running database migrations..."); + runFlywayMigration(runMigrationOnStartup, configsDatabaseMigrator, jobsDatabaseMigrator); + + LOGGER.info("Creating workspace (if none exists)..."); + createWorkspaceIfNoneExists(configRepository); + + LOGGER.info("Creating deployment (if none exists)..."); + createDeploymentIfNoneExists(jobPersistence); + + final String airbyteVersion = currentAirbyteVersion.serialize(); + LOGGER.info("Setting Airbyte version to '{}'...", airbyteVersion); + jobPersistence.setVersion(airbyteVersion); + LOGGER.info("Set version to '{}'", airbyteVersion); + + if (postLoadExecution != null) { + postLoadExecution.run(); + LOGGER.info("Finished running post load Execution."); + } + + LOGGER.info("Finished bootstrapping Airbyte environment."); + } + + private void assertNonBreakingMigration(final JobPersistence jobPersistence, final AirbyteVersion airbyteVersion) + throws IOException { + // version in the database when the server main method is called. may be empty if this is the first + // time the server is started. + LOGGER.info("Checking illegal upgrade..."); + final Optional initialAirbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new); + if (!isLegalUpgrade(initialAirbyteDatabaseVersion.orElse(null), airbyteVersion)) { + final String attentionBanner = MoreResources.readResource("banner/attention-banner.txt"); + LOGGER.error(attentionBanner); + final String message = String.format( + "Cannot upgrade from version %s to version %s directly. First you must upgrade to version %s. After that upgrade is complete, you may upgrade to version %s", + initialAirbyteDatabaseVersion.get().serialize(), + airbyteVersion.serialize(), + VERSION_BREAK.serialize(), + airbyteVersion.serialize()); + + LOGGER.error(message); + throw new RuntimeException(message); + } + } + + private void assertNonBreakingProtocolVersionConstraints(final ProtocolVersionChecker protocolVersionChecker, + final JobPersistence jobPersistence, + final boolean autoUpgradeConnectors) + throws Exception { + final Optional newProtocolRange = protocolVersionChecker.validate(autoUpgradeConnectors); + if (newProtocolRange.isEmpty()) { + throw new RuntimeException( + "Aborting bootloader to avoid breaking existing connection after an upgrade. " + + "Please address airbyte protocol version support issues in the connectors before retrying."); + } + trackProtocolVersion(jobPersistence, newProtocolRange.get()); + } + + private void createDeploymentIfNoneExists(final JobPersistence jobPersistence) throws IOException { + final Optional deploymentOptional = jobPersistence.getDeployment(); + if (deploymentOptional.isPresent()) { + LOGGER.info("running deployment: {}", deploymentOptional.get()); + } else { + final UUID deploymentId = UUID.randomUUID(); + jobPersistence.setDeployment(deploymentId); + LOGGER.info("created deployment: {}", deploymentId); + } + } + + private void createWorkspaceIfNoneExists(final ConfigRepository configRepository) throws JsonValidationException, IOException { + if (!configRepository.listStandardWorkspaces(true).isEmpty()) { + LOGGER.info("workspace already exists for the deployment."); + return; + } + + final UUID workspaceId = UUID.randomUUID(); + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withCustomerId(UUID.randomUUID()) + .withName(workspaceId.toString()) + .withSlug(workspaceId.toString()) + .withInitialSetupComplete(false) + .withDisplaySetupWizard(true) + .withTombstone(false) + .withDefaultGeography(Geography.AUTO); + // NOTE: it's safe to use the NoSecrets version since we know that the user hasn't supplied any + // secrets yet. + configRepository.writeStandardWorkspaceNoSecrets(workspace); + } + + private void initializeDatabases() throws DatabaseInitializationException { + LOGGER.info("Initializing databases..."); + configsDatabaseInitializer.initialize(); + jobsDatabaseInitializer.initialize(); + LOGGER.info("Databases initialized."); + } + + @VisibleForTesting + boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) { + // means there was no previous version so upgrade even needs to happen. always legal. + if (airbyteDatabaseVersion == null) { + LOGGER.info("No previous Airbyte Version set."); + return true; + } + + LOGGER.info("Current Airbyte version: {}", airbyteDatabaseVersion); + LOGGER.info("Future Airbyte version: {}", airbyteVersion); + final var futureVersionIsAfterVersionBreak = airbyteVersion.greaterThan(VERSION_BREAK) || airbyteVersion.isDev(); + final var isUpgradingThroughVersionBreak = airbyteDatabaseVersion.lessThan(VERSION_BREAK) && futureVersionIsAfterVersionBreak; + return !isUpgradingThroughVersionBreak; + } + + private void runFlywayMigration(final boolean runDatabaseMigrationOnStartup, + final DatabaseMigrator configDbMigrator, + final DatabaseMigrator jobDbMigrator) { + LOGGER.info("Creating baseline for config database..."); + configDbMigrator.createBaseline(); + LOGGER.info("Creating baseline for job database..."); + jobDbMigrator.createBaseline(); + + if (runDatabaseMigrationOnStartup) { + LOGGER.info("Migrating configs database"); + configDbMigrator.migrate(); + LOGGER.info("Migrating jobs database"); + jobDbMigrator.migrate(); + } else { + LOGGER.info("Auto database migration is skipped"); + } + } + + private void trackProtocolVersion(final JobPersistence jobPersistence, final AirbyteProtocolVersionRange protocolVersionRange) + throws IOException { + jobPersistence.setAirbyteProtocolVersionMin(protocolVersionRange.min()); + jobPersistence.setAirbyteProtocolVersionMax(protocolVersionRange.max()); + LOGGER.info("AirbyteProtocol version support range [{}:{}]", protocolVersionRange.min().serialize(), protocolVersionRange.max().serialize()); + } + +} diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java deleted file mode 100644 index dd60a92159908..0000000000000 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.bootloader; - -import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.commons.features.FeatureFlags; -import io.airbyte.commons.lang.CloseableShutdownHook; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.commons.version.AirbyteProtocolVersionRange; -import io.airbyte.commons.version.AirbyteVersion; -import io.airbyte.config.Configs; -import io.airbyte.config.EnvConfigs; -import io.airbyte.config.Geography; -import io.airbyte.config.StandardWorkspace; -import io.airbyte.config.init.ApplyDefinitionsHelper; -import io.airbyte.config.init.DefinitionsProvider; -import io.airbyte.config.init.LocalDefinitionsProvider; -import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.SecretsRepositoryReader; -import io.airbyte.config.persistence.SecretsRepositoryWriter; -import io.airbyte.config.persistence.split_secrets.SecretPersistence; -import io.airbyte.config.persistence.split_secrets.SecretsHydrator; -import io.airbyte.db.Database; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DataSourceFactory; -import io.airbyte.db.factory.DatabaseCheckFactory; -import io.airbyte.db.factory.DatabaseDriver; -import io.airbyte.db.factory.FlywayFactory; -import io.airbyte.db.instance.DatabaseConstants; -import io.airbyte.db.instance.DatabaseMigrator; -import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; -import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; -import io.airbyte.persistence.job.DefaultJobPersistence; -import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.validation.json.JsonValidationException; -import java.io.IOException; -import java.util.Optional; -import java.util.UUID; -import javax.sql.DataSource; -import org.flywaydb.core.Flyway; -import org.jooq.DSLContext; -import org.jooq.SQLDialect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Just like how the Linux bootloader paves the way for the OS to start, this class is responsible - * for setting up the Airbyte environment so the rest of the Airbyte applications can safely start. - *

- * This includes: - *

- * - creating databases, if needed. - *

- * - ensuring all required database migrations are run. - *

- * - setting all required Airbyte metadata information. - */ -@SuppressWarnings("PMD.UnusedPrivateField") -public class BootloaderApp { - - private static final Logger LOGGER = LoggerFactory.getLogger(BootloaderApp.class); - private static final AirbyteVersion VERSION_BREAK = new AirbyteVersion("0.32.0-alpha"); - private static final String DRIVER_CLASS_NAME = DatabaseDriver.POSTGRESQL.getDriverClassName(); - - private final Configs configs; - private final Runnable postLoadExecution; - private final FeatureFlags featureFlags; - private final SecretMigrator secretMigrator; - private ConfigRepository configRepository; - private Optional definitionsProvider; - private Database configDatabase; - private Database jobDatabase; - private JobPersistence jobPersistence; - private final Flyway configsFlyway; - private final Flyway jobsFlyway; - private final DSLContext configsDslContext; - private final DSLContext jobsDslContext; - - // This controls how we check the protocol version compatibility - // True means that the connectors will be forcefully upgraded regardless of whether they are used in - // an active sync or not. - // This should be moved to a Configs, however, this behavior is currently forced through hooks that - // are passed as the postLoadExecution. - private final boolean autoUpgradeConnectors; - - /** - * This method is exposed for Airbyte Cloud consumption. This lets us override the seed loading - * logic and customise Cloud connector versions. Please check with the Platform team before making - * changes to this method. - * - * @param configs - * @param postLoadExecution - * @param featureFlags - * @param secretMigrator - * @param configsDslContext - */ - public BootloaderApp(final Configs configs, - final Runnable postLoadExecution, - final FeatureFlags featureFlags, - final SecretMigrator secretMigrator, - final DSLContext configsDslContext, - final DSLContext jobsDslContext, - final Flyway configsFlyway, - final Flyway jobsFlyway, - final Optional definitionsProvider, - final boolean autoUpgradeConnectors) { - this.configs = configs; - this.postLoadExecution = postLoadExecution; - this.featureFlags = featureFlags; - this.secretMigrator = secretMigrator; - this.configsDslContext = configsDslContext; - this.configsFlyway = configsFlyway; - this.jobsDslContext = jobsDslContext; - this.jobsFlyway = jobsFlyway; - this.definitionsProvider = definitionsProvider; - this.autoUpgradeConnectors = autoUpgradeConnectors; - - initPersistences(configsDslContext, jobsDslContext); - } - - // Temporary duplication of constructor, to remove once Cloud has been migrated to the one above. - @Deprecated(forRemoval = true) - public BootloaderApp(final Configs configs, - final Runnable postLoadExecution, - final FeatureFlags featureFlags, - final SecretMigrator secretMigrator, - final DSLContext configsDslContext, - final DSLContext jobsDslContext, - final Flyway configsFlyway, - final Flyway jobsFlyway) { - this.configs = configs; - this.postLoadExecution = postLoadExecution; - this.featureFlags = featureFlags; - this.secretMigrator = secretMigrator; - this.configsDslContext = configsDslContext; - this.configsFlyway = configsFlyway; - this.jobsDslContext = jobsDslContext; - this.jobsFlyway = jobsFlyway; - this.autoUpgradeConnectors = false; - - try { - this.definitionsProvider = Optional.of(getLocalDefinitionsProvider()); - } catch (final IOException e) { - LOGGER.error("Unable to initialize persistence.", e); - } - - initPersistences(configsDslContext, jobsDslContext); - } - - public BootloaderApp(final Configs configs, - final FeatureFlags featureFlags, - final SecretMigrator secretMigrator, - final DSLContext configsDslContext, - final DSLContext jobsDslContext, - final Flyway configsFlyway, - final Flyway jobsFlyway, - final DefinitionsProvider definitionsProvider, - final boolean autoUpgradeConnectors) { - this.configs = configs; - this.featureFlags = featureFlags; - this.secretMigrator = secretMigrator; - this.configsDslContext = configsDslContext; - this.configsFlyway = configsFlyway; - this.jobsDslContext = jobsDslContext; - this.jobsFlyway = jobsFlyway; - this.definitionsProvider = Optional.of(definitionsProvider); - this.autoUpgradeConnectors = autoUpgradeConnectors; - - initPersistences(configsDslContext, jobsDslContext); - - postLoadExecution = () -> { - try { - final ApplyDefinitionsHelper applyDefinitionsHelper = - new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get(), jobPersistence); - applyDefinitionsHelper.apply(); - - if (featureFlags.forceSecretMigration() || !jobPersistence.isSecretMigrated()) { - if (this.secretMigrator != null) { - this.secretMigrator.migrateSecrets(); - LOGGER.info("Secrets successfully migrated."); - } - } - LOGGER.info("Loaded seed data.."); - } catch (final IOException | JsonValidationException | ConfigNotFoundException e) { - throw new RuntimeException(e); - } - }; - } - - public void load() throws Exception { - LOGGER.info("Initializing databases..."); - DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext, - configs.getConfigsDatabaseInitializationTimeoutMs(), MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH)).initialize(); - - DatabaseCheckFactory.createJobsDatabaseInitializer(jobsDslContext, - configs.getJobsDatabaseInitializationTimeoutMs(), MoreResources.readResource(DatabaseConstants.JOBS_INITIAL_SCHEMA_PATH)).initialize(); - LOGGER.info("Databases initialized."); - - LOGGER.info("Setting up config database and default workspace..."); - final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); - final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion(); - assertNonBreakingMigration(jobPersistence, currAirbyteVersion); - - final ProtocolVersionChecker protocolVersionChecker = - new ProtocolVersionChecker(jobPersistence, configs, configRepository, definitionsProvider); - assertNonBreakingProtocolVersionConstraints(protocolVersionChecker, jobPersistence, autoUpgradeConnectors); - - // TODO Will be converted to an injected singleton during DI migration - final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); - final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); - - runFlywayMigration(configs, configDbMigrator, jobDbMigrator); - LOGGER.info("Ran Flyway migrations."); - - createWorkspaceIfNoneExists(configRepository); - LOGGER.info("Default workspace created."); - - createDeploymentIfNoneExists(jobPersistence); - LOGGER.info("Default deployment created."); - - jobPersistence.setVersion(currAirbyteVersion.serialize()); - LOGGER.info("Set version to {}", currAirbyteVersion); - - postLoadExecution.run(); - - LOGGER.info("Finished running post load Execution."); - - LOGGER.info("Finished bootstrapping Airbyte environment."); - } - - private static Database getConfigDatabase(final DSLContext dslContext) throws IOException { - return new Database(dslContext); - } - - static DefinitionsProvider getLocalDefinitionsProvider() throws IOException { - return new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); - } - - private static Database getJobDatabase(final DSLContext dslContext) throws IOException { - return new Database(dslContext); - } - - private static JobPersistence getJobPersistence(final Database jobDatabase) throws IOException { - return new DefaultJobPersistence(jobDatabase); - } - - private void initPersistences(final DSLContext configsDslContext, final DSLContext jobsDslContext) { - try { - configDatabase = getConfigDatabase(configsDslContext); - configRepository = new ConfigRepository(configDatabase); - jobDatabase = getJobDatabase(jobsDslContext); - jobPersistence = getJobPersistence(jobDatabase); - } catch (final IOException e) { - LOGGER.error("Unable to initialize persistence.", e); - } - } - - public static void main(final String[] args) throws Exception { - final Configs configs = new EnvConfigs(); - final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); - - // Manual configuration that will be replaced by Dependency Injection in the future - final DataSource configsDataSource = DataSourceFactory.create(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), - DRIVER_CLASS_NAME, configs.getConfigDatabaseUrl()); - final DataSource jobsDataSource = - DataSourceFactory.create(configs.getDatabaseUser(), configs.getDatabasePassword(), DRIVER_CLASS_NAME, configs.getDatabaseUrl()); - - try (final DSLContext configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); - final DSLContext jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { - - // TODO Will be converted to an injected singleton during DI migration - final Database configDatabase = getConfigDatabase(configsDslContext); - final ConfigRepository configRepository = new ConfigRepository(configDatabase); - final Database jobDatabase = getJobDatabase(jobsDslContext); - final JobPersistence jobPersistence = getJobPersistence(jobDatabase); - - final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configsDslContext, configs); - final Optional secretPersistence = SecretPersistence.getLongLived(configsDslContext, configs); - final SecretsRepositoryReader secretsRepositoryReader = new SecretsRepositoryReader(configRepository, secretsHydrator); - final SecretsRepositoryWriter secretsRepositoryWriter = new SecretsRepositoryWriter(configRepository, secretPersistence, Optional.empty()); - - final SecretMigrator secretMigrator = - new SecretMigrator(secretsRepositoryReader, secretsRepositoryWriter, configRepository, jobPersistence, secretPersistence); - final Flyway configsFlyway = FlywayFactory.create(configsDataSource, BootloaderApp.class.getSimpleName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, - ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); - final Flyway jobsFlyway = FlywayFactory.create(jobsDataSource, BootloaderApp.class.getSimpleName(), JobsDatabaseMigrator.DB_IDENTIFIER, - JobsDatabaseMigrator.MIGRATION_FILE_LOCATION); - - // Ensure that the database resources are closed on application shutdown - CloseableShutdownHook.registerRuntimeShutdownHook(configsDataSource, jobsDataSource, configsDslContext, jobsDslContext); - - final DefinitionsProvider definitionsProvider = getLocalDefinitionsProvider(); - - final var bootloader = - new BootloaderApp(configs, featureFlags, secretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, definitionsProvider, - false); - bootloader.load(); - } - } - - private static void createDeploymentIfNoneExists(final JobPersistence jobPersistence) throws IOException { - final Optional deploymentOptional = jobPersistence.getDeployment(); - if (deploymentOptional.isPresent()) { - LOGGER.info("running deployment: {}", deploymentOptional.get()); - } else { - final UUID deploymentId = UUID.randomUUID(); - jobPersistence.setDeployment(deploymentId); - LOGGER.info("created deployment: {}", deploymentId); - } - } - - private static void createWorkspaceIfNoneExists(final ConfigRepository configRepository) throws JsonValidationException, IOException { - if (!configRepository.listStandardWorkspaces(true).isEmpty()) { - LOGGER.info("workspace already exists for the deployment."); - return; - } - - final UUID workspaceId = UUID.randomUUID(); - final StandardWorkspace workspace = new StandardWorkspace() - .withWorkspaceId(workspaceId) - .withCustomerId(UUID.randomUUID()) - .withName(workspaceId.toString()) - .withSlug(workspaceId.toString()) - .withInitialSetupComplete(false) - .withDisplaySetupWizard(true) - .withTombstone(false) - .withDefaultGeography(Geography.AUTO); - // NOTE: it's safe to use the NoSecrets version since we know that the user hasn't supplied any - // secrets yet. - configRepository.writeStandardWorkspaceNoSecrets(workspace); - } - - private static void assertNonBreakingMigration(final JobPersistence jobPersistence, final AirbyteVersion airbyteVersion) - throws IOException { - // version in the database when the server main method is called. may be empty if this is the first - // time the server is started. - LOGGER.info("Checking illegal upgrade.."); - final Optional initialAirbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new); - if (!isLegalUpgrade(initialAirbyteDatabaseVersion.orElse(null), airbyteVersion)) { - final String attentionBanner = MoreResources.readResource("banner/attention-banner.txt"); - LOGGER.error(attentionBanner); - final String message = String.format( - "Cannot upgrade from version %s to version %s directly. First you must upgrade to version %s. After that upgrade is complete, you may upgrade to version %s", - initialAirbyteDatabaseVersion.get().serialize(), - airbyteVersion.serialize(), - VERSION_BREAK.serialize(), - airbyteVersion.serialize()); - - LOGGER.error(message); - throw new RuntimeException(message); - } - } - - private static void assertNonBreakingProtocolVersionConstraints(final ProtocolVersionChecker protocolVersionChecker, - final JobPersistence jobPersistence, - final boolean autoUpgradeConnectors) - throws Exception { - final Optional newProtocolRange = protocolVersionChecker.validate(autoUpgradeConnectors); - if (newProtocolRange.isEmpty()) { - throw new RuntimeException( - "Aborting bootloader to avoid breaking existing connection after an upgrade. " + - "Please address airbyte protocol version support issues in the connectors before retrying."); - } - trackProtocolVersion(jobPersistence, newProtocolRange.get()); - } - - private static void trackProtocolVersion(final JobPersistence jobPersistence, final AirbyteProtocolVersionRange protocolVersionRange) - throws IOException { - jobPersistence.setAirbyteProtocolVersionMin(protocolVersionRange.min()); - jobPersistence.setAirbyteProtocolVersionMax(protocolVersionRange.max()); - LOGGER.info("AirbyteProtocol version support range [{}:{}]", protocolVersionRange.min().serialize(), protocolVersionRange.max().serialize()); - } - - static boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) { - // means there was no previous version so upgrade even needs to happen. always legal. - if (airbyteDatabaseVersion == null) { - LOGGER.info("No previous Airbyte Version set.."); - return true; - } - - LOGGER.info("Current Airbyte version: {}", airbyteDatabaseVersion); - LOGGER.info("Future Airbyte version: {}", airbyteVersion); - final var futureVersionIsAfterVersionBreak = airbyteVersion.greaterThan(VERSION_BREAK) || airbyteVersion.isDev(); - final var isUpgradingThroughVersionBreak = airbyteDatabaseVersion.lessThan(VERSION_BREAK) && futureVersionIsAfterVersionBreak; - return !isUpgradingThroughVersionBreak; - } - - private static void runFlywayMigration(final Configs configs, final DatabaseMigrator configDbMigrator, final DatabaseMigrator jobDbMigrator) { - configDbMigrator.createBaseline(); - jobDbMigrator.createBaseline(); - - if (configs.runDatabaseMigrationOnStartup()) { - LOGGER.info("Migrating configs database"); - configDbMigrator.migrate(); - LOGGER.info("Migrating jobs database"); - jobDbMigrator.migrate(); - } else { - LOGGER.info("Auto database migration is skipped"); - } - } - -} diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/ProtocolVersionChecker.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/ProtocolVersionChecker.java index e5de49cd3ba0d..9c4db162a69d5 100644 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/ProtocolVersionChecker.java +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/ProtocolVersionChecker.java @@ -9,12 +9,12 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.commons.version.Version; import io.airbyte.config.ActorType; -import io.airbyte.config.Configs; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.init.DefinitionsProvider; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.JobPersistence; +import jakarta.inject.Singleton; import java.io.IOException; import java.util.HashSet; import java.util.Map; @@ -26,24 +26,34 @@ import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; +/** + * Validates that all connectors support the desired target Airbyte protocol version. + */ +@Singleton @Slf4j public class ProtocolVersionChecker { private final JobPersistence jobPersistence; - private final Configs configs; + private final AirbyteProtocolVersionRange airbyteProtocolTargetVersionRange; private final ConfigRepository configRepository; private final Optional definitionsProvider; - // Dependencies could be simplified once we break some pieces up: - // * JobPersistence for accessing the airbyte_metadata table. - // * Configs for getting the new Airbyte Protocol Range from the env vars. - // * ConfigRepository for accessing ActorDefinitions + /** + * Constructs a new protocol version checker that verifies all connectors are within the provided + * target protocol version range. + * + * @param jobPersistence A {@link JobPersistence} instance. + * @param airbyteProtocolTargetVersionRange The target Airbyte protocol version range. + * @param configRepository A {@link ConfigRepository} instance + * @param definitionsProvider An {@link Optional} that may contain a {@link DefinitionsProvider} + * instance. + */ public ProtocolVersionChecker(final JobPersistence jobPersistence, - final Configs configs, + final AirbyteProtocolVersionRange airbyteProtocolTargetVersionRange, final ConfigRepository configRepository, final Optional definitionsProvider) { this.jobPersistence = jobPersistence; - this.configs = configs; + this.airbyteProtocolTargetVersionRange = airbyteProtocolTargetVersionRange; this.configRepository = configRepository; this.definitionsProvider = definitionsProvider; } @@ -124,7 +134,7 @@ protected Optional getCurrentAirbyteVersion() throws IOException } protected AirbyteProtocolVersionRange getTargetProtocolVersionRange() { - return new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax()); + return airbyteProtocolTargetVersionRange; } protected Map> getConflictingActorDefinitions(final AirbyteProtocolVersionRange targetRange) throws IOException { @@ -165,17 +175,21 @@ protected Stream> getProtocolVersionsForActorDefinitions(fi return Stream.empty(); } - Stream> stream; - if (actorType == ActorType.SOURCE) { - stream = definitionsProvider.get().getSourceDefinitions() - .stream() - .map(def -> Map.entry(def.getSourceDefinitionId(), AirbyteProtocolVersion.getWithDefault(def.getSpec().getProtocolVersion()))); - } else { - stream = definitionsProvider.get().getDestinationDefinitions() - .stream() - .map(def -> Map.entry(def.getDestinationDefinitionId(), AirbyteProtocolVersion.getWithDefault(def.getSpec().getProtocolVersion()))); + return getActorVersions(actorType); + } + + private Stream> getActorVersions(final ActorType actorType) { + switch (actorType) { + case SOURCE: + return definitionsProvider.get().getSourceDefinitions() + .stream() + .map(def -> Map.entry(def.getSourceDefinitionId(), AirbyteProtocolVersion.getWithDefault(def.getSpec().getProtocolVersion()))); + case DESTINATION: + default: + return definitionsProvider.get().getDestinationDefinitions() + .stream() + .map(def -> Map.entry(def.getDestinationDefinitionId(), AirbyteProtocolVersion.getWithDefault(def.getSpec().getProtocolVersion()))); } - return stream; } private Stream formatActorDefinitionForLogging(final Set remainingDestConflicts, final Set remainingSourceConflicts) { @@ -186,7 +200,7 @@ private Stream formatActorDefinitionForLogging(final Set remaining sourceDef = configRepository.getStandardSourceDefinition(defId); return String.format("Source: %s: %s: protocol version: %s", sourceDef.getSourceDefinitionId(), sourceDef.getName(), sourceDef.getProtocolVersion()); - } catch (Exception e) { + } catch (final Exception e) { log.info("Failed to getStandardSourceDefinition for {}", defId, e); return String.format("Source: %s: Failed to fetch details...", defId); } @@ -196,7 +210,7 @@ private Stream formatActorDefinitionForLogging(final Set remaining final StandardDestinationDefinition destDef = configRepository.getStandardDestinationDefinition(defId); return String.format("Destination: %s: %s: protocol version: %s", destDef.getDestinationDefinitionId(), destDef.getName(), destDef.getProtocolVersion()); - } catch (Exception e) { + } catch (final Exception e) { log.info("Failed to getStandardDestinationDefinition for {}", defId, e); return String.format("Source: %s: Failed to fetch details...", defId); } diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java index 456e85d38944f..c32b42770c7f2 100644 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java @@ -19,6 +19,8 @@ import io.airbyte.persistence.job.JobPersistence; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.validation.json.JsonValidationException; +import jakarta.inject.Named; +import jakarta.inject.Singleton; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -26,11 +28,10 @@ import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; import lombok.Value; import lombok.extern.slf4j.Slf4j; -@AllArgsConstructor +@Singleton @Slf4j public class SecretMigrator { @@ -40,6 +41,18 @@ public class SecretMigrator { private final JobPersistence jobPersistence; private final Optional secretPersistence; + public SecretMigrator(final SecretsRepositoryReader secretsReader, + final SecretsRepositoryWriter secretsWriter, + final ConfigRepository configRepository, + final JobPersistence jobPersistence, + @Named("longLivedSecretPersistence") final Optional secretPersistence) { + this.secretsReader = secretsReader; + this.secretsWriter = secretsWriter; + this.configRepository = configRepository; + this.jobPersistence = jobPersistence; + this.secretPersistence = secretPersistence; + } + @Value static class ConnectorConfiguration { diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/ApplicationBeanFactory.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/ApplicationBeanFactory.java new file mode 100644 index 0000000000000..04a3a4fb4c6f2 --- /dev/null +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/ApplicationBeanFactory.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader.config; + +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.version.AirbyteProtocolVersionRange; +import io.airbyte.commons.version.AirbyteVersion; +import io.airbyte.commons.version.Version; +import io.airbyte.config.init.DefinitionsProvider; +import io.airbyte.config.init.LocalDefinitionsProvider; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.SecretsRepositoryReader; +import io.airbyte.config.persistence.SecretsRepositoryWriter; +import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; +import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import java.io.IOException; +import java.util.Optional; + +/** + * Micronaut bean factory for general application-related singletons. + */ +@Factory +@SuppressWarnings("PMD.AvoidDuplicateLiterals") +public class ApplicationBeanFactory { + + @Singleton + public AirbyteVersion airbyteVersion(@Value("${airbyte.version}") final String version) { + return new AirbyteVersion(version); + } + + @Singleton + public AirbyteProtocolVersionRange airbyteProtocolTargetVersionRange(@Value("${airbyte.protocol.target.range.min-version}") final String min, + @Value("${airbyte.protocol.target.range.max-version}") final String max) { + return new AirbyteProtocolVersionRange(new Version(min), new Version(max)); + } + + @Singleton + public DefinitionsProvider localDefinitionsProvider() throws IOException { + return new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); + } + + @Singleton + public FeatureFlags featureFlags() { + return new EnvVariableFeatureFlags(); + } + + @Singleton + public JsonSecretsProcessor jsonSecretsProcessor() { + return JsonSecretsProcessor.builder() + .copySecrets(false) + .build(); + } + + @Singleton + public SecretsRepositoryReader secretsRepositoryReader(final ConfigRepository configRepository, final SecretsHydrator secretsHydrator) { + return new SecretsRepositoryReader(configRepository, secretsHydrator); + } + + @Singleton + public SecretsRepositoryWriter secretsRepositoryWriter(final ConfigRepository configRepository, + @Named("longLivedSecretPersistence") final Optional longLivedSecretPersistence, + @Named("ephemeralSecretPersistence") final Optional ephemeralSecretPersistence) { + return new SecretsRepositoryWriter(configRepository, longLivedSecretPersistence, ephemeralSecretPersistence); + } + +} diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/DatabaseBeanFactory.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/DatabaseBeanFactory.java new file mode 100644 index 0000000000000..ba167a28f8894 --- /dev/null +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/DatabaseBeanFactory.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader.config; + +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.db.Database; +import io.airbyte.db.check.impl.JobsDatabaseAvailabilityCheck; +import io.airbyte.db.factory.DatabaseCheckFactory; +import io.airbyte.db.init.DatabaseInitializer; +import io.airbyte.db.instance.DatabaseConstants; +import io.airbyte.db.instance.DatabaseMigrator; +import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; +import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; +import io.airbyte.persistence.job.DefaultJobPersistence; +import io.airbyte.persistence.job.JobPersistence; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Value; +import io.micronaut.flyway.FlywayConfigurationProperties; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import java.io.IOException; +import javax.sql.DataSource; +import org.flywaydb.core.Flyway; +import org.jooq.DSLContext; + +/** + * Micronaut bean factory for database-related singletons. + */ +@Factory +@SuppressWarnings("PMD.AvoidDuplicateLiterals") +public class DatabaseBeanFactory { + + private static final String BASELINE_DESCRIPTION = "Baseline from file-based migration v1"; + private static final Boolean BASELINE_ON_MIGRATION = true; + private static final String INSTALLED_BY = "BootloaderApp"; + + @Singleton + @Named("configDatabase") + public Database configDatabase(@Named("config") final DSLContext dslContext) throws IOException { + return new Database(dslContext); + } + + @Singleton + @Named("jobsDatabase") + public Database jobsDatabase(@Named("jobs") final DSLContext dslContext) throws IOException { + return new Database(dslContext); + } + + @Singleton + @Named("configFlyway") + public Flyway configFlyway(@Named("config") final FlywayConfigurationProperties configFlywayConfigurationProperties, + @Named("config") final DataSource configDataSource, + @Value("${airbyte.bootloader.migration-baseline-version}") final String baselineVersion) { + return configFlywayConfigurationProperties.getFluentConfiguration() + .dataSource(configDataSource) + .baselineVersion(baselineVersion) + .baselineDescription(BASELINE_DESCRIPTION) + .baselineOnMigrate(BASELINE_ON_MIGRATION) + .installedBy(INSTALLED_BY) + .table(String.format("airbyte_%s_migrations", "configs")) + .load(); + } + + @Singleton + @Named("jobsFlyway") + public Flyway jobsFlyway(@Named("jobs") final FlywayConfigurationProperties jobsFlywayConfigurationProperties, + @Named("jobs") final DataSource jobsDataSource, + @Value("${airbyte.bootloader.migration-baseline-version}") final String baselineVersion) { + return jobsFlywayConfigurationProperties.getFluentConfiguration() + .dataSource(jobsDataSource) + .baselineVersion(baselineVersion) + .baselineDescription(BASELINE_DESCRIPTION) + .baselineOnMigrate(BASELINE_ON_MIGRATION) + .installedBy(INSTALLED_BY) + .table(String.format("airbyte_%s_migrations", "jobs")) + .load(); + } + + @Singleton + public ConfigRepository configRepository(@Named("configDatabase") final Database configDatabase) { + return new ConfigRepository(configDatabase); + } + + @Singleton + public JobPersistence jobPersistence(@Named("jobsDatabase") final Database jobDatabase) { + return new DefaultJobPersistence(jobDatabase); + } + + @Singleton + @Named("configsDatabaseInitializer") + public DatabaseInitializer configsDatabaseInitializer(@Named("config") final DSLContext configsDslContext, + @Value("${airbyte.flyway.configs.initialization-timeout-ms}") final Long configsDatabaseInitializationTimeoutMs) + throws IOException { + return DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext, + configsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH)); + } + + @Singleton + @Named("jobsDatabaseInitializer") + public DatabaseInitializer jobsDatabaseInitializer(@Named("jobs") final DSLContext jobsDslContext, + @Value("${airbyte.flyway.jobs.initialization-timeout-ms}") final Long jobsDatabaseInitializationTimeoutMs) + throws IOException { + return DatabaseCheckFactory.createJobsDatabaseInitializer(jobsDslContext, + jobsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.JOBS_INITIAL_SCHEMA_PATH)); + } + + @Singleton + @Named("jobsDatabaseAvailabilityCheck") + public JobsDatabaseAvailabilityCheck jobsDatabaseAvailabilityCheck(@Named("jobs") final DSLContext dslContext) { + return new JobsDatabaseAvailabilityCheck(dslContext, DatabaseConstants.DEFAULT_ASSERT_DATABASE_TIMEOUT_MS); + } + + @Singleton + @Named("configsDatabaseMigrator") + public DatabaseMigrator configsDatabaseMigrator(@Named("configDatabase") final Database configDatabase, + @Named("configFlyway") final Flyway configFlyway) { + return new ConfigsDatabaseMigrator(configDatabase, configFlyway); + } + + @Singleton + @Named("jobsDatabaseMigrator") + public DatabaseMigrator jobsDatabaseMigrator(@Named("jobsDatabase") final Database jobsDatabase, + @Named("jobsFlyway") final Flyway jobsFlyway) { + return new JobsDatabaseMigrator(jobsDatabase, jobsFlyway); + } + +} diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/SecretPersistenceBeanFactory.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/SecretPersistenceBeanFactory.java new file mode 100644 index 0000000000000..aef85a4599eac --- /dev/null +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/config/SecretPersistenceBeanFactory.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader.config; + +import io.airbyte.config.persistence.split_secrets.AWSSecretManagerPersistence; +import io.airbyte.config.persistence.split_secrets.GoogleSecretManagerPersistence; +import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence; +import io.airbyte.config.persistence.split_secrets.RealSecretsHydrator; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; +import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.airbyte.config.persistence.split_secrets.VaultSecretPersistence; +import io.airbyte.db.Database; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Named; +import jakarta.inject.Singleton; + +/** + * Micronaut bean factory for secret persistence-related singletons. + */ +@Factory +@SuppressWarnings("PMD.AvoidDuplicateLiterals") +public class SecretPersistenceBeanFactory { + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^(?!testing_config_db_table).*") + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^(?!google_secret_manager).*") + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^(?!vault).*") + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^(?!aws_secret_manager).*") + @Named("longLivedSecretPersistence") + public SecretPersistence defaultSecretPersistence(@Named("configDatabase") final Database configDatabase) { + return localTestingSecretPersistence(configDatabase); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^testing_config_db_table$") + @Named("longLivedSecretPersistence") + public SecretPersistence localTestingSecretPersistence(@Named("configDatabase") final Database configDatabase) { + return new LocalTestingSecretPersistence(configDatabase); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^testing_config_db_table$") + @Named("ephemeralSecretPersistence") + public SecretPersistence ephemeralLocalTestingSecretPersistence(@Named("configDatabase") final Database configDatabase) { + return new LocalTestingSecretPersistence(configDatabase); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^google_secret_manager$") + @Named("longLivedSecretPersistence") + public SecretPersistence googleSecretPersistence(@Value("${airbyte.secret.store.gcp.credentials}") final String credentials, + @Value("${airbyte.secret.store.gcp.project-id}") final String projectId) { + return GoogleSecretManagerPersistence.getLongLived(projectId, credentials); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^google_secret_manager$") + @Named("ephemeralSecretPersistence") + public SecretPersistence ephemeralGoogleSecretPersistence(@Value("${airbyte.secret.store.gcp.credentials}") final String credentials, + @Value("${airbyte.secret.store.gcp.project-id}") final String projectId) { + return GoogleSecretManagerPersistence.getEphemeral(projectId, credentials); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^vault$") + @Named("longLivedSecretPersistence") + public SecretPersistence vaultSecretPersistence(@Value("${airbyte.secret.store.vault.address}") final String address, + @Value("${airbyte.secret.store.vault.prefix}") final String prefix, + @Value("${airbyte.secret.store.vault.token}") final String token) { + return new VaultSecretPersistence(address, prefix, token); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^vault$") + @Named("ephemeralSecretPersistence") + public SecretPersistence ephemeralVaultSecretPersistence(@Value("${airbyte.secret.store.vault.address}") final String address, + @Value("${airbyte.secret.store.vault.prefix}") final String prefix, + @Value("${airbyte.secret.store.vault.token}") final String token) { + return new VaultSecretPersistence(address, prefix, token); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^aws_secret_manager$") + @Named("longLivedSecretPersistence") + public SecretPersistence longLivedAwsSecretPersistence(@Value("${airbyte.secret.store.aws.access-key}") final String awsAccessKey, + @Value("${airbyte.secret.store.aws.secret-key}") final String awsSecretKey) { + return new AWSSecretManagerPersistence(awsAccessKey, awsSecretKey); + } + + @Singleton + public SecretsHydrator secretsHydrator(@Named("longLivedSecretPersistence") final SecretPersistence secretPersistence) { + return new RealSecretsHydrator(secretPersistence); + } + +} diff --git a/airbyte-bootloader/src/main/resources/application.yml b/airbyte-bootloader/src/main/resources/application.yml new file mode 100644 index 0000000000000..d68c4b8c18989 --- /dev/null +++ b/airbyte-bootloader/src/main/resources/application.yml @@ -0,0 +1,115 @@ +micronaut: + application: + name: airbyte-bootloader + server: + port: 9002 + +airbyte: + bootloader: + migration-baseline-version: ${BOOTLOADER_MIGRATION_BASELINE_VERSION} + flyway: + configs: + initialization-timeout-ms: ${CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS:60000} + jobs: + initialization-timeout-ms: ${JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS:60000} + protocol: + auto-upgrade-connectors: ${AUTO_UPGRADE_CONNECTORS_PROTOCOL:false} + target: + range: + min-version: ${AIRBYTE_PROTOCOL_VERSION_MIN:0.0.0} + max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:0.3.0} + run-migration-on-startup: ${RUN_DATABASE_MIGRATION_ON_STARTUP:true} + secret: + persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE} + store: + aws: + access-key: ${AWS_ACCESS_KEY:} + secret-key: ${AWS_SECRET_ACCESS_KEY:} + gcp: + credentials: ${SECRET_STORE_GCP_CREDENTIALS:} + project-id: ${SECRET_STORE_GCP_PROJECT_ID:} + vault: + address: ${VAULT_ADDRESS:} + prefix: ${VAULT_PREFIX:} + token: ${VAULT_AUTH_TOKEN:} + version: ${AIRBYTE_VERSION} + +datasources: + config: + connection-test-query: SELECT 1 + connection-timeout: 30000 + idle-timeout: 600000 + maximum-pool-size: 5 + minimum-idle: 0 + url: ${DATABASE_URL} + driverClassName: org.postgresql.Driver + username: ${DATABASE_USER} + password: ${DATABASE_PASSWORD} + jobs: + connection-test-query: SELECT 1 + connection-timeout: 30000 + idle-timeout: 600000 + maximum-pool-size: 5 + minimum-idle: 0 + url: ${DATABASE_URL} + driverClassName: org.postgresql.Driver + username: ${DATABASE_USER} + password: ${DATABASE_PASSWORD} + +endpoints: + beans: + enabled: true + sensitive: false + env: + enabled: true + sensitive: false + health: + enabled: true + sensitive: false + info: + enabled: true + sensitive: true + loggers: + enabled: true + sensitive: true + refresh: + enabled: false + sensitive: true + routes: + enabled: true + sensitive: false + threaddump: + enabled: true + sensitive: true + +flyway: + enabled: true + datasources: + config: + enabled: false + locations: + - 'classpath:io/airbyte/db/instance/configs/migrations' + jobs: + enabled: false + locations: + - 'classpath:io/airbyte/db/instance/jobs/migrations' + +jpa: + default: + properties: + hibernate: + show_sql: true + +jooq: + datasources: + config: + jackson-converter-enabled: true + sql-dialect: POSTGRES + jobs: + jackson-converter-enabled: true + sql-dialect: POSTGRES + +logger: + levels: +# Uncomment to help resolve issues with conditional beans +# io.micronaut.context.condition: DEBUG \ No newline at end of file diff --git a/airbyte-bootloader/src/main/resources/micronaut-banner.txt b/airbyte-bootloader/src/main/resources/micronaut-banner.txt new file mode 100644 index 0000000000000..ec2646448f420 --- /dev/null +++ b/airbyte-bootloader/src/main/resources/micronaut-banner.txt @@ -0,0 +1,8 @@ + + ___ _ __ __ + / | (_)____/ /_ __ __/ /____ + / /| | / / ___/ __ \/ / / / __/ _ \ + / ___ |/ / / / /_/ / /_/ / /_/ __/ +/_/ |_/_/_/ /_.___/\__, /\__/\___/ + /____/ + : airbyte-bootloader : \ No newline at end of file diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java deleted file mode 100644 index 8f184141bd2e6..0000000000000 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.bootloader; - -import static io.airbyte.config.Configs.SecretPersistenceType.TESTING_CONFIG_DB_TABLE; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; - -import com.fasterxml.jackson.databind.ObjectMapper; -import io.airbyte.commons.features.FeatureFlags; -import io.airbyte.commons.version.AirbyteVersion; -import io.airbyte.commons.version.Version; -import io.airbyte.config.Configs; -import io.airbyte.config.Geography; -import io.airbyte.config.SourceConnection; -import io.airbyte.config.StandardWorkspace; -import io.airbyte.config.init.DefinitionsProvider; -import io.airbyte.config.init.LocalDefinitionsProvider; -import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.SecretsRepositoryReader; -import io.airbyte.config.persistence.SecretsRepositoryWriter; -import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence; -import io.airbyte.config.persistence.split_secrets.RealSecretsHydrator; -import io.airbyte.config.persistence.split_secrets.SecretPersistence; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DataSourceFactory; -import io.airbyte.db.factory.FlywayFactory; -import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; -import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; -import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; -import io.airbyte.db.instance.jobs.JobsDatabaseTestProvider; -import io.airbyte.persistence.job.DefaultJobPersistence; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.sql.DataSource; -import lombok.val; -import org.flywaydb.core.Flyway; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.testcontainers.containers.PostgreSQLContainer; -import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; -import uk.org.webcompere.systemstubs.jupiter.SystemStub; -import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; - -@SuppressWarnings("PMD.AvoidUsingHardCodedIP") -@ExtendWith(SystemStubsExtension.class) -class BootloaderAppTest { - - private PostgreSQLContainer container; - private DataSource configsDataSource; - private DataSource jobsDataSource; - private static final String DOCKER = "docker"; - private static final String PROTOCOL_VERSION_123 = "1.2.3"; - private static final String PROTOCOL_VERSION_124 = "1.2.4"; - private static final String VERSION_0330_ALPHA = "0.33.0-alpha"; - private static final String VERSION_0320_ALPHA = "0.32.0-alpha"; - private static final String VERSION_0321_ALPHA = "0.32.1-alpha"; - private static final String VERSION_0170_ALPHA = "0.17.0-alpha"; - - @BeforeEach - void setup() { - container = new PostgreSQLContainer<>("postgres:13-alpine") - .withDatabaseName("public") - .withUsername(DOCKER) - .withPassword(DOCKER); - container.start(); - - configsDataSource = - DataSourceFactory.create(container.getUsername(), container.getPassword(), container.getDriverClassName(), container.getJdbcUrl()); - jobsDataSource = - DataSourceFactory.create(container.getUsername(), container.getPassword(), container.getDriverClassName(), container.getJdbcUrl()); - } - - @AfterEach - void cleanup() throws Exception { - closeDataSource(configsDataSource); - closeDataSource(jobsDataSource); - container.stop(); - } - - @SystemStub - private EnvironmentVariables environmentVariables; - - @Test - void testBootloaderAppBlankDb() throws Exception { - val mockedConfigs = mock(Configs.class); - when(mockedConfigs.getConfigDatabaseUrl()).thenReturn(container.getJdbcUrl()); - when(mockedConfigs.getConfigDatabaseUser()).thenReturn(container.getUsername()); - when(mockedConfigs.getConfigDatabasePassword()).thenReturn(container.getPassword()); - when(mockedConfigs.getDatabaseUrl()).thenReturn(container.getJdbcUrl()); - when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername()); - when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword()); - when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(VERSION_0330_ALPHA)); - when(mockedConfigs.getAirbyteProtocolVersionMin()).thenReturn(new Version(PROTOCOL_VERSION_123)); - when(mockedConfigs.getAirbyteProtocolVersionMax()).thenReturn(new Version(PROTOCOL_VERSION_124)); - when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true); - when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L); - when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L); - - val mockedFeatureFlags = mock(FeatureFlags.class); - - val mockedSecretMigrator = mock(SecretMigrator.class); - - // Although we are able to inject mocked configs into the Bootloader, a particular migration in the - // configs database - // requires the env var to be set. Flyway prevents injection, so we dynamically set this instead. - environmentVariables.set("DATABASE_USER", DOCKER); - environmentVariables.set("DATABASE_PASSWORD", DOCKER); - environmentVariables.set("DATABASE_URL", container.getJdbcUrl()); - - try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); - val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { - - val configsFlyway = createConfigsFlyway(configsDataSource); - val jobsFlyway = createJobsFlyway(jobsDataSource); - - val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false); - val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false); - - val bootloader = - new BootloaderApp(mockedConfigs, mockedFeatureFlags, mockedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, - BootloaderApp.getLocalDefinitionsProvider(), false); - bootloader.load(); - - val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); - assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion()); - - val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); - // this line should change with every new migration - // to show that you meant to make a new migration to the prod database - assertEquals("0.40.23.002", configsMigrator.getLatestMigration().getVersion().getVersion()); - - val jobsPersistence = new DefaultJobPersistence(jobDatabase); - assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get()); - assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get()); - assertEquals(new Version(PROTOCOL_VERSION_124), jobsPersistence.getAirbyteProtocolVersionMax().get()); - - assertNotEquals(Optional.empty(), jobsPersistence.getDeployment().get()); - } - } - - @Test - void testBootloaderAppRunSecretMigration() throws Exception { - val mockedConfigs = mock(Configs.class); - when(mockedConfigs.getConfigDatabaseUrl()).thenReturn(container.getJdbcUrl()); - when(mockedConfigs.getConfigDatabaseUser()).thenReturn(container.getUsername()); - when(mockedConfigs.getConfigDatabasePassword()).thenReturn(container.getPassword()); - when(mockedConfigs.getDatabaseUrl()).thenReturn(container.getJdbcUrl()); - when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername()); - when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword()); - when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(VERSION_0330_ALPHA)); - when(mockedConfigs.getAirbyteProtocolVersionMin()).thenReturn(new Version(PROTOCOL_VERSION_123)); - when(mockedConfigs.getAirbyteProtocolVersionMax()).thenReturn(new Version(PROTOCOL_VERSION_123)); - when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true); - when(mockedConfigs.getSecretPersistenceType()).thenReturn(TESTING_CONFIG_DB_TABLE); - when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L); - when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L); - - val mockedFeatureFlags = mock(FeatureFlags.class); - - try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); - val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { - - val configsFlyway = createConfigsFlyway(configsDataSource); - val jobsFlyway = createJobsFlyway(jobsDataSource); - - val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false); - val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false); - - val configRepository = new ConfigRepository(configDatabase); - val jobsPersistence = new DefaultJobPersistence(jobDatabase); - - val secretsPersistence = SecretPersistence.getLongLived(configsDslContext, mockedConfigs); - final LocalTestingSecretPersistence localTestingSecretPersistence = new LocalTestingSecretPersistence(configDatabase); - - val secretsReader = new SecretsRepositoryReader(configRepository, new RealSecretsHydrator(localTestingSecretPersistence)); - val secretsWriter = new SecretsRepositoryWriter(configRepository, secretsPersistence, Optional.empty()); - - val spiedSecretMigrator = - spy(new SecretMigrator(secretsReader, secretsWriter, configRepository, jobsPersistence, secretsPersistence)); - - // Although we are able to inject mocked configs into the Bootloader, a particular migration in the - // configs database requires the env var to be set. Flyway prevents injection, so we dynamically set - // this instead. - environmentVariables.set("DATABASE_USER", DOCKER); - environmentVariables.set("DATABASE_PASSWORD", DOCKER); - environmentVariables.set("DATABASE_URL", container.getJdbcUrl()); - - // Bootstrap the database for the test - val initBootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags, null, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, - BootloaderApp.getLocalDefinitionsProvider(), false); - initBootloader.load(); - - final DefinitionsProvider localDefinitions = new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); - configRepository.seedActorDefinitions(localDefinitions.getSourceDefinitions(), localDefinitions.getDestinationDefinitions()); - - final String sourceSpecs = """ - { - "account_id": "1234567891234567", - "start_date": "2022-04-01T00:00:00Z", - "access_token": "nonhiddensecret", - "include_deleted": false, - "fetch_thumbnail_images": false - } - - """; - - final ObjectMapper mapper = new ObjectMapper(); - - final UUID workspaceId = UUID.randomUUID(); - configRepository.writeStandardWorkspaceNoSecrets(new StandardWorkspace() - .withWorkspaceId(workspaceId) - .withName("wName") - .withSlug("wSlug") - .withEmail("email@mail.com") - .withTombstone(false) - .withInitialSetupComplete(false) - .withDefaultGeography(Geography.AUTO)); - final UUID sourceId = UUID.randomUUID(); - configRepository.writeSourceConnectionNoSecrets(new SourceConnection() - .withSourceDefinitionId(UUID.fromString("e7778cfc-e97c-4458-9ecb-b4f2bba8946c")) // Facebook Marketing - .withSourceId(sourceId) - .withName("test source") - .withWorkspaceId(workspaceId) - .withTombstone(false) - .withConfiguration(mapper.readTree(sourceSpecs))); - - when(mockedFeatureFlags.forceSecretMigration()).thenReturn(false); - - // Perform secrets migration - var bootloader = - new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, - BootloaderApp.getLocalDefinitionsProvider(), false); - boolean isMigrated = jobsPersistence.isSecretMigrated(); - - assertFalse(isMigrated); - - bootloader.load(); - verify(spiedSecretMigrator).migrateSecrets(); - - final SourceConnection sourceConnection = configRepository.getSourceConnection(sourceId); - - assertFalse(sourceConnection.getConfiguration().toString().contains("nonhiddensecret")); - assertTrue(sourceConnection.getConfiguration().toString().contains("_secret")); - - isMigrated = jobsPersistence.isSecretMigrated(); - assertTrue(isMigrated); - - // Verify that the migration does not happen if it has already been performed - reset(spiedSecretMigrator); - // We need to re-create the bootloader because it is closing the persistence after running load - bootloader = - new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, - BootloaderApp.getLocalDefinitionsProvider(), false); - bootloader.load(); - verifyNoInteractions(spiedSecretMigrator); - - // Verify that the migration occurs if the force migration feature flag is enabled - reset(spiedSecretMigrator); - when(mockedFeatureFlags.forceSecretMigration()).thenReturn(true); - // We need to re-create the bootloader because it is closing the persistence after running load - bootloader = - new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, - BootloaderApp.getLocalDefinitionsProvider(), false); - bootloader.load(); - verify(spiedSecretMigrator).migrateSecrets(); - } - } - - @Test - void testIsLegalUpgradePredicate() { - // starting from no previous version is always legal. - assertTrue(BootloaderApp.isLegalUpgrade(null, new AirbyteVersion("0.17.1-alpha"))); - assertTrue(BootloaderApp.isLegalUpgrade(null, new AirbyteVersion(VERSION_0320_ALPHA))); - assertTrue(BootloaderApp.isLegalUpgrade(null, new AirbyteVersion(VERSION_0321_ALPHA))); - assertTrue(BootloaderApp.isLegalUpgrade(null, new AirbyteVersion("0.33.1-alpha"))); - // starting from a version that is pre-breaking migration cannot go past the breaking migration. - assertTrue(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion("0.17.1-alpha"))); - assertTrue(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion("0.18.0-alpha"))); - assertTrue(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion(VERSION_0320_ALPHA))); - assertFalse(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion(VERSION_0321_ALPHA))); - assertFalse(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion(VERSION_0330_ALPHA))); - // any migration starting at the breaking migration or after it can upgrade to anything. - assertTrue(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0320_ALPHA), new AirbyteVersion(VERSION_0321_ALPHA))); - assertTrue(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0320_ALPHA), new AirbyteVersion(VERSION_0330_ALPHA))); - assertTrue(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0321_ALPHA), new AirbyteVersion(VERSION_0321_ALPHA))); - assertTrue(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0321_ALPHA), new AirbyteVersion(VERSION_0330_ALPHA))); - assertTrue(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0330_ALPHA), new AirbyteVersion("0.33.1-alpha"))); - assertTrue(BootloaderApp.isLegalUpgrade(new AirbyteVersion(VERSION_0330_ALPHA), new AirbyteVersion("0.34.0-alpha"))); - } - - @Test - void testPostLoadExecutionExecutes() throws Exception { - final var testTriggered = new AtomicBoolean(); - val mockedConfigs = mock(Configs.class); - when(mockedConfigs.getConfigDatabaseUrl()).thenReturn(container.getJdbcUrl()); - when(mockedConfigs.getConfigDatabaseUser()).thenReturn(container.getUsername()); - when(mockedConfigs.getConfigDatabasePassword()).thenReturn(container.getPassword()); - when(mockedConfigs.getDatabaseUrl()).thenReturn(container.getJdbcUrl()); - when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername()); - when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword()); - when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(VERSION_0330_ALPHA)); - when(mockedConfigs.getAirbyteProtocolVersionMin()).thenReturn(new Version(PROTOCOL_VERSION_123)); - when(mockedConfigs.getAirbyteProtocolVersionMax()).thenReturn(new Version(PROTOCOL_VERSION_123)); - when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true); - when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L); - when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L); - - val mockedFeatureFlags = mock(FeatureFlags.class); - - val mockedSecretMigrator = mock(SecretMigrator.class); - - try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); - val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { - - val configsFlyway = createConfigsFlyway(configsDataSource); - val jobsFlyway = createJobsFlyway(jobsDataSource); - - new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false); - new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false); - - new BootloaderApp(mockedConfigs, () -> testTriggered.set(true), mockedFeatureFlags, mockedSecretMigrator, configsDslContext, jobsDslContext, - configsFlyway, jobsFlyway) - .load(); - - assertTrue(testTriggered.get()); - } - } - - private Flyway createConfigsFlyway(final DataSource dataSource) { - return FlywayFactory.create(dataSource, getClass().getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, - ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); - } - - private Flyway createJobsFlyway(final DataSource dataSource) { - return FlywayFactory.create(dataSource, getClass().getName(), JobsDatabaseMigrator.DB_IDENTIFIER, - JobsDatabaseMigrator.MIGRATION_FILE_LOCATION); - } - - private void closeDataSource(final DataSource dataSource) throws Exception { - DataSourceFactory.close(dataSource); - } - -} diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java new file mode 100644 index 0000000000000..f00660b2bb7ef --- /dev/null +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java @@ -0,0 +1,400 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader; + +import static io.airbyte.config.Configs.SecretPersistenceType.TESTING_CONFIG_DB_TABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.version.AirbyteProtocolVersionRange; +import io.airbyte.commons.version.AirbyteVersion; +import io.airbyte.commons.version.Version; +import io.airbyte.config.Configs; +import io.airbyte.config.Geography; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.init.DefinitionsProvider; +import io.airbyte.config.init.LocalDefinitionsProvider; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.SecretsRepositoryReader; +import io.airbyte.config.persistence.SecretsRepositoryWriter; +import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence; +import io.airbyte.config.persistence.split_secrets.RealSecretsHydrator; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DataSourceFactory; +import io.airbyte.db.factory.DatabaseCheckFactory; +import io.airbyte.db.factory.FlywayFactory; +import io.airbyte.db.instance.DatabaseConstants; +import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; +import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; +import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; +import io.airbyte.db.instance.jobs.JobsDatabaseTestProvider; +import io.airbyte.persistence.job.DefaultJobPersistence; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.sql.DataSource; +import lombok.val; +import org.flywaydb.core.Flyway; +import org.jooq.SQLDialect; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.containers.PostgreSQLContainer; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; + +/** + * Test suite for the {@link Bootloader} class. + */ +@SuppressWarnings("PMD.AvoidUsingHardCodedIP") +@ExtendWith(SystemStubsExtension.class) +class BootloaderTest { + + private PostgreSQLContainer container; + private DataSource configsDataSource; + private DataSource jobsDataSource; + private static final String DOCKER = "docker"; + private static final String PROTOCOL_VERSION_123 = "1.2.3"; + private static final String PROTOCOL_VERSION_124 = "1.2.4"; + private static final String VERSION_0330_ALPHA = "0.33.0-alpha"; + private static final String VERSION_0320_ALPHA = "0.32.0-alpha"; + private static final String VERSION_0321_ALPHA = "0.32.1-alpha"; + private static final String VERSION_0170_ALPHA = "0.17.0-alpha"; + + @BeforeEach + void setup() { + container = new PostgreSQLContainer<>("postgres:13-alpine") + .withDatabaseName("public") + .withUsername(DOCKER) + .withPassword(DOCKER); + container.start(); + + configsDataSource = + DataSourceFactory.create(container.getUsername(), container.getPassword(), container.getDriverClassName(), container.getJdbcUrl()); + jobsDataSource = + DataSourceFactory.create(container.getUsername(), container.getPassword(), container.getDriverClassName(), container.getJdbcUrl()); + } + + @AfterEach + void cleanup() throws Exception { + closeDataSource(configsDataSource); + closeDataSource(jobsDataSource); + container.stop(); + } + + @SystemStub + private EnvironmentVariables environmentVariables; + + @Test + void testBootloaderAppBlankDb() throws Exception { + val currentAirbyteVersion = new AirbyteVersion(VERSION_0330_ALPHA); + val airbyteProtocolRange = new AirbyteProtocolVersionRange(new Version(PROTOCOL_VERSION_123), new Version(PROTOCOL_VERSION_124)); + val mockedFeatureFlags = mock(FeatureFlags.class); + val runMigrationOnStartup = true; + val mockedSecretMigrator = mock(SecretMigrator.class); + + try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); + val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { + + val configsFlyway = createConfigsFlyway(configsDataSource); + val jobsFlyway = createJobsFlyway(jobsDataSource); + + val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false); + val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false); + val configRepository = new ConfigRepository(configDatabase); + val configsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); + val configDatabaseInitializer = DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext, + configsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH)); + val configsDatabaseMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); + final Optional definitionsProvider = + Optional.of(new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS)); + val jobsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); + val jobsDatabaseInitializer = DatabaseCheckFactory.createJobsDatabaseInitializer(jobsDslContext, + jobsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.JOBS_INITIAL_SCHEMA_PATH)); + val jobsDatabaseMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); + val jobsPersistence = new DefaultJobPersistence(jobDatabase); + val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider); + + val bootloader = + new Bootloader(false, configRepository, configDatabaseInitializer, configsDatabaseMigrator, currentAirbyteVersion, + definitionsProvider, mockedFeatureFlags, jobsDatabaseInitializer, jobsDatabaseMigrator, jobsPersistence, protocolVersionChecker, + runMigrationOnStartup, mockedSecretMigrator); + bootloader.afterInitialization(); + bootloader.load(); + + val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); + assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion()); + + val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); + // this line should change with every new migration + // to show that you meant to make a new migration to the prod database + assertEquals("0.40.23.002", configsMigrator.getLatestMigration().getVersion().getVersion()); + + assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get()); + assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get()); + assertEquals(new Version(PROTOCOL_VERSION_124), jobsPersistence.getAirbyteProtocolVersionMax().get()); + + assertNotEquals(Optional.empty(), jobsPersistence.getDeployment()); + } + } + + @Test + void testBootloaderAppRunSecretMigration() throws Exception { + val mockedConfigs = mock(Configs.class); + when(mockedConfigs.getSecretPersistenceType()).thenReturn(TESTING_CONFIG_DB_TABLE); + + val currentAirbyteVersion = new AirbyteVersion(VERSION_0330_ALPHA); + val airbyteProtocolRange = new AirbyteProtocolVersionRange(new Version(PROTOCOL_VERSION_123), new Version(PROTOCOL_VERSION_124)); + val mockedFeatureFlags = mock(FeatureFlags.class); + val runMigrationOnStartup = true; + + try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); + val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { + + val configsFlyway = createConfigsFlyway(configsDataSource); + val jobsFlyway = createJobsFlyway(jobsDataSource); + + val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false); + val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false); + val configRepository = new ConfigRepository(configDatabase); + val configsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); + val configDatabaseInitializer = DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext, + configsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH)); + val configsDatabaseMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); + final Optional definitionsProvider = + Optional.of(new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS)); + val jobsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); + val jobsDatabaseInitializer = DatabaseCheckFactory.createJobsDatabaseInitializer(jobsDslContext, + jobsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.JOBS_INITIAL_SCHEMA_PATH)); + val jobsDatabaseMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); + val jobsPersistence = new DefaultJobPersistence(jobDatabase); + val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider); + + val secretsPersistence = SecretPersistence.getLongLived(configsDslContext, mockedConfigs); + val localTestingSecretPersistence = new LocalTestingSecretPersistence(configDatabase); + + val secretsReader = new SecretsRepositoryReader(configRepository, new RealSecretsHydrator(localTestingSecretPersistence)); + val secretsWriter = new SecretsRepositoryWriter(configRepository, secretsPersistence, Optional.empty()); + + val spiedSecretMigrator = + spy(new SecretMigrator(secretsReader, secretsWriter, configRepository, jobsPersistence, secretsPersistence)); + + // Bootstrap the database for the test + val initBootloader = + new Bootloader(false, configRepository, configDatabaseInitializer, configsDatabaseMigrator, currentAirbyteVersion, + definitionsProvider, mockedFeatureFlags, jobsDatabaseInitializer, jobsDatabaseMigrator, jobsPersistence, protocolVersionChecker, + runMigrationOnStartup, null); + initBootloader.afterInitialization(); + initBootloader.load(); + + final DefinitionsProvider localDefinitions = new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); + configRepository.seedActorDefinitions(localDefinitions.getSourceDefinitions(), localDefinitions.getDestinationDefinitions()); + + final String sourceSpecs = """ + { + "account_id": "1234567891234567", + "start_date": "2022-04-01T00:00:00Z", + "access_token": "nonhiddensecret", + "include_deleted": false, + "fetch_thumbnail_images": false + } + + """; + + final ObjectMapper mapper = new ObjectMapper(); + + final UUID workspaceId = UUID.randomUUID(); + configRepository.writeStandardWorkspaceNoSecrets(new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withName("wName") + .withSlug("wSlug") + .withEmail("email@mail.com") + .withTombstone(false) + .withInitialSetupComplete(false) + .withDefaultGeography(Geography.AUTO)); + final UUID sourceId = UUID.randomUUID(); + configRepository.writeSourceConnectionNoSecrets(new SourceConnection() + .withSourceDefinitionId(UUID.fromString("e7778cfc-e97c-4458-9ecb-b4f2bba8946c")) // Facebook Marketing + .withSourceId(sourceId) + .withName("test source") + .withWorkspaceId(workspaceId) + .withTombstone(false) + .withConfiguration(mapper.readTree(sourceSpecs))); + + when(mockedFeatureFlags.forceSecretMigration()).thenReturn(false); + + // Perform secrets migration + var bootloader = + new Bootloader(false, configRepository, configDatabaseInitializer, configsDatabaseMigrator, currentAirbyteVersion, + definitionsProvider, mockedFeatureFlags, jobsDatabaseInitializer, jobsDatabaseMigrator, jobsPersistence, protocolVersionChecker, + runMigrationOnStartup, spiedSecretMigrator); + boolean isMigrated = jobsPersistence.isSecretMigrated(); + + assertFalse(isMigrated); + + bootloader.afterInitialization(); + bootloader.load(); + verify(spiedSecretMigrator).migrateSecrets(); + + final SourceConnection sourceConnection = configRepository.getSourceConnection(sourceId); + + assertFalse(sourceConnection.getConfiguration().toString().contains("nonhiddensecret")); + assertTrue(sourceConnection.getConfiguration().toString().contains("_secret")); + + isMigrated = jobsPersistence.isSecretMigrated(); + assertTrue(isMigrated); + + // Verify that the migration does not happen if it has already been performed + reset(spiedSecretMigrator); + // We need to re-create the bootloader because it is closing the persistence after running load + bootloader = + new Bootloader(false, configRepository, configDatabaseInitializer, configsDatabaseMigrator, currentAirbyteVersion, + definitionsProvider, mockedFeatureFlags, jobsDatabaseInitializer, jobsDatabaseMigrator, jobsPersistence, protocolVersionChecker, + runMigrationOnStartup, spiedSecretMigrator); + bootloader.afterInitialization(); + bootloader.load(); + verifyNoInteractions(spiedSecretMigrator); + + // Verify that the migration occurs if the force migration feature flag is enabled + reset(spiedSecretMigrator); + when(mockedFeatureFlags.forceSecretMigration()).thenReturn(true); + // We need to re-create the bootloader because it is closing the persistence after running load + bootloader = + new Bootloader(false, configRepository, configDatabaseInitializer, configsDatabaseMigrator, currentAirbyteVersion, + definitionsProvider, mockedFeatureFlags, jobsDatabaseInitializer, jobsDatabaseMigrator, jobsPersistence, protocolVersionChecker, + runMigrationOnStartup, spiedSecretMigrator); + bootloader.afterInitialization(); + bootloader.load(); + verify(spiedSecretMigrator).migrateSecrets(); + } + } + + // + @Test + void testIsLegalUpgradePredicate() throws Exception { + val currentAirbyteVersion = new AirbyteVersion(VERSION_0330_ALPHA); + val airbyteProtocolRange = new AirbyteProtocolVersionRange(new Version(PROTOCOL_VERSION_123), new Version(PROTOCOL_VERSION_124)); + val mockedFeatureFlags = mock(FeatureFlags.class); + val runMigrationOnStartup = true; + val mockedSecretMigrator = mock(SecretMigrator.class); + + try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); + val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { + + val configsFlyway = createConfigsFlyway(configsDataSource); + val jobsFlyway = createJobsFlyway(jobsDataSource); + + val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false); + val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false); + val configRepository = new ConfigRepository(configDatabase); + val configsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); + val configDatabaseInitializer = DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext, + configsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH)); + val configsDatabaseMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); + final Optional definitionsProvider = Optional.of( + new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS)); + val jobsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); + val jobsDatabaseInitializer = DatabaseCheckFactory.createJobsDatabaseInitializer(jobsDslContext, + jobsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.JOBS_INITIAL_SCHEMA_PATH)); + val jobsDatabaseMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); + val jobsPersistence = new DefaultJobPersistence(jobDatabase); + val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider); + + val bootloader = + new Bootloader(false, configRepository, configDatabaseInitializer, configsDatabaseMigrator, currentAirbyteVersion, + definitionsProvider, mockedFeatureFlags, jobsDatabaseInitializer, jobsDatabaseMigrator, jobsPersistence, protocolVersionChecker, + runMigrationOnStartup, mockedSecretMigrator); + + // starting from no previous version is always legal. + assertTrue(bootloader.isLegalUpgrade(null, new AirbyteVersion("0.17.1-alpha"))); + assertTrue(bootloader.isLegalUpgrade(null, new AirbyteVersion(VERSION_0320_ALPHA))); + assertTrue(bootloader.isLegalUpgrade(null, new AirbyteVersion(VERSION_0321_ALPHA))); + assertTrue(bootloader.isLegalUpgrade(null, new AirbyteVersion("0.33.1-alpha"))); + // starting from a version that is pre-breaking migration cannot go past the breaking migration. + assertTrue(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion("0.17.1-alpha"))); + assertTrue(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion("0.18.0-alpha"))); + assertTrue(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion(VERSION_0320_ALPHA))); + assertFalse(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion(VERSION_0321_ALPHA))); + assertFalse(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0170_ALPHA), new AirbyteVersion(VERSION_0330_ALPHA))); + // any migration starting at the breaking migration or after it can upgrade to anything. + assertTrue(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0320_ALPHA), new AirbyteVersion(VERSION_0321_ALPHA))); + assertTrue(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0320_ALPHA), new AirbyteVersion(VERSION_0330_ALPHA))); + assertTrue(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0321_ALPHA), new AirbyteVersion(VERSION_0321_ALPHA))); + assertTrue(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0321_ALPHA), new AirbyteVersion(VERSION_0330_ALPHA))); + assertTrue(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0330_ALPHA), new AirbyteVersion("0.33.1-alpha"))); + assertTrue(bootloader.isLegalUpgrade(new AirbyteVersion(VERSION_0330_ALPHA), new AirbyteVersion("0.34.0-alpha"))); + } + } + + @Test + void testPostLoadExecutionExecutes() throws Exception { + final var testTriggered = new AtomicBoolean(); + val currentAirbyteVersion = new AirbyteVersion(VERSION_0330_ALPHA); + val airbyteProtocolRange = new AirbyteProtocolVersionRange(new Version(PROTOCOL_VERSION_123), new Version(PROTOCOL_VERSION_124)); + val mockedFeatureFlags = mock(FeatureFlags.class); + val runMigrationOnStartup = true; + val mockedSecretMigrator = mock(SecretMigrator.class); + + try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); + val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { + + val configsFlyway = createConfigsFlyway(configsDataSource); + val jobsFlyway = createJobsFlyway(jobsDataSource); + + val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false); + val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false); + val configRepository = new ConfigRepository(configDatabase); + val configsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); + val configDatabaseInitializer = DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext, + configsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH)); + val configsDatabaseMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); + final Optional definitionsProvider = + Optional.of(new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS)); + val jobsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); + val jobsDatabaseInitializer = DatabaseCheckFactory.createJobsDatabaseInitializer(jobsDslContext, + jobsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.JOBS_INITIAL_SCHEMA_PATH)); + val jobsDatabaseMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); + val jobsPersistence = new DefaultJobPersistence(jobDatabase); + val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider); + val bootloader = + new Bootloader(false, configRepository, configDatabaseInitializer, configsDatabaseMigrator, currentAirbyteVersion, + definitionsProvider, mockedFeatureFlags, jobsDatabaseInitializer, jobsDatabaseMigrator, jobsPersistence, protocolVersionChecker, + runMigrationOnStartup, mockedSecretMigrator, () -> testTriggered.set(true)); + bootloader.load(); + assertTrue(testTriggered.get()); + } + } + + private Flyway createConfigsFlyway(final DataSource dataSource) { + return FlywayFactory.create(dataSource, getClass().getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, + ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); + } + + private Flyway createJobsFlyway(final DataSource dataSource) { + return FlywayFactory.create(dataSource, getClass().getName(), JobsDatabaseMigrator.DB_IDENTIFIER, + JobsDatabaseMigrator.MIGRATION_FILE_LOCATION); + } + + private void closeDataSource(final DataSource dataSource) throws Exception { + DataSourceFactory.close(dataSource); + } + +} diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/ProtocolVersionCheckerTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/ProtocolVersionCheckerTest.java index 59f290a480cf4..edc0a0896fb67 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/ProtocolVersionCheckerTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/ProtocolVersionCheckerTest.java @@ -5,13 +5,13 @@ package io.airbyte.bootloader; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.Version; import io.airbyte.config.ActorType; -import io.airbyte.config.Configs; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.init.DefinitionsProvider; @@ -32,11 +32,9 @@ class ProtocolVersionCheckerTest { - Configs configs; ConfigRepository configRepository; DefinitionsProvider definitionsProvider; JobPersistence jobPersistence; - ProtocolVersionChecker protocolVersionChecker; final Version V0_0_0 = new Version("0.0.0"); final Version V1_0_0 = new Version("1.0.0"); @@ -44,11 +42,9 @@ class ProtocolVersionCheckerTest { @BeforeEach void beforeEach() throws IOException { - configs = mock(Configs.class); configRepository = mock(ConfigRepository.class); definitionsProvider = mock(DefinitionsProvider.class); jobPersistence = mock(JobPersistence.class); - protocolVersionChecker = new ProtocolVersionChecker(jobPersistence, configs, configRepository, Optional.of(definitionsProvider)); when(jobPersistence.getVersion()).thenReturn(Optional.of("1.2.3")); } @@ -56,17 +52,23 @@ void beforeEach() throws IOException { @ParameterizedTest @ValueSource(booleans = {true, false}) void testFirstInstallCheck(final boolean supportAutoUpgrade) throws IOException { + final AirbyteProtocolVersionRange expectedRange = new AirbyteProtocolVersionRange(V0_0_0, V1_0_0); when(jobPersistence.getVersion()).thenReturn(Optional.empty()); - setTargetProtocolRangeRange(V0_0_0, V1_0_0); - - assertEquals(Optional.of(new AirbyteProtocolVersionRange(V0_0_0, V1_0_0)), protocolVersionChecker.validate(supportAutoUpgrade)); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, expectedRange, configRepository, Optional.of(definitionsProvider)); + final Optional supportedRange = protocolVersionChecker.validate(supportAutoUpgrade); + assertTrue(supportedRange.isPresent()); + assertEquals(expectedRange.max(), supportedRange.get().max()); + assertEquals(expectedRange.min(), supportedRange.get().min()); } @Test void testGetTargetRange() throws IOException { - setTargetProtocolRangeRange(V1_0_0, V2_0_0); - - assertEquals(new AirbyteProtocolVersionRange(V1_0_0, V2_0_0), protocolVersionChecker.getTargetProtocolVersionRange()); + final AirbyteProtocolVersionRange expectedRange = new AirbyteProtocolVersionRange(V1_0_0, V2_0_0); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, expectedRange, configRepository, Optional.of(definitionsProvider)); + assertEquals(expectedRange.max(), protocolVersionChecker.getTargetProtocolVersionRange().max()); + assertEquals(expectedRange.min(), protocolVersionChecker.getTargetProtocolVersionRange().min()); } @Test @@ -87,6 +89,8 @@ void testRetrievingCurrentConflicts() throws IOException { dest2, Map.entry(ActorType.DESTINATION, V0_0_0)); when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, targetRange, configRepository, Optional.of(definitionsProvider)); final Map> conflicts = protocolVersionChecker.getConflictingActorDefinitions(targetRange); final Map> expectedConflicts = Map.of( @@ -107,6 +111,8 @@ void testRetrievingCurrentConflictsWhenNoConflicts() throws IOException { dest1, Map.entry(ActorType.DESTINATION, V1_0_0)); when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, targetRange, configRepository, Optional.of(definitionsProvider)); final Map> conflicts = protocolVersionChecker.getConflictingActorDefinitions(targetRange); assertEquals(Map.of(), conflicts); @@ -127,6 +133,8 @@ void testProjectRemainingSourceConflicts() { Map.entry(upgradedSource, V1_0_0), Map.entry(notChangedSource, V0_0_0))); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, targetRange, configRepository, Optional.of(definitionsProvider)); final Set actualConflicts = protocolVersionChecker.projectRemainingConflictsAfterConnectorUpgrades(targetRange, initialConflicts, ActorType.SOURCE); @@ -148,6 +156,8 @@ void testProjectRemainingDestinationConflicts() { Map.entry(dest2, V1_0_0), Map.entry(dest3, V2_0_0))); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, targetRange, configRepository, Optional.of(definitionsProvider)); final Set actualConflicts = protocolVersionChecker.projectRemainingConflictsAfterConnectorUpgrades(targetRange, initialConflicts, ActorType.DESTINATION); @@ -158,18 +168,21 @@ void testProjectRemainingDestinationConflicts() { @ParameterizedTest @ValueSource(booleans = {true, false}) void testValidateSameRange(final boolean supportAutoUpgrade) throws Exception { - setCurrentProtocolRangeRange(V0_0_0, V2_0_0); - setTargetProtocolRangeRange(V0_0_0, V2_0_0); - - final Optional range = protocolVersionChecker.validate(supportAutoUpgrade); - assertEquals(Optional.of(new AirbyteProtocolVersionRange(V0_0_0, V2_0_0)), range); + final AirbyteProtocolVersionRange expectedRange = new AirbyteProtocolVersionRange(V0_0_0, V2_0_0); + setCurrentProtocolRangeRange(expectedRange.min(), expectedRange.max()); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, expectedRange, configRepository, Optional.of(definitionsProvider)); + + final Optional supportedRange = protocolVersionChecker.validate(supportAutoUpgrade); + assertTrue(supportedRange.isPresent()); + assertEquals(expectedRange.max(), supportedRange.get().max()); + assertEquals(expectedRange.min(), supportedRange.get().min()); } @ParameterizedTest @ValueSource(booleans = {true, false}) void testValidateAllConnectorsAreUpgraded(final boolean supportAutoUpgrade) throws Exception { setCurrentProtocolRangeRange(V0_0_0, V1_0_0); - setTargetProtocolRangeRange(V1_0_0, V2_0_0); final UUID source1 = UUID.randomUUID(); final UUID source2 = UUID.randomUUID(); @@ -178,6 +191,7 @@ void testValidateAllConnectorsAreUpgraded(final boolean supportAutoUpgrade) thro final UUID dest1 = UUID.randomUUID(); final UUID dest2 = UUID.randomUUID(); final UUID dest3 = UUID.randomUUID(); + final AirbyteProtocolVersionRange expectedVersionRange = new AirbyteProtocolVersionRange(V1_0_0, V2_0_0); final Map> initialActorDefinitions = Map.of( source1, Map.entry(ActorType.SOURCE, V0_0_0), @@ -199,13 +213,17 @@ void testValidateAllConnectorsAreUpgraded(final boolean supportAutoUpgrade) thro Map.entry(dest2, V1_0_0), Map.entry(dest3, V2_0_0))); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, expectedVersionRange, configRepository, Optional.of(definitionsProvider)); final Optional actualRange = protocolVersionChecker.validate(supportAutoUpgrade); // Without auto upgrade, we will fail the validation because it would require connector automatic // actor definition // upgrade for used sources/destinations. if (supportAutoUpgrade) { - assertEquals(Optional.of(new AirbyteProtocolVersionRange(V1_0_0, V2_0_0)), actualRange); + assertTrue(actualRange.isPresent()); + assertEquals(expectedVersionRange.max(), actualRange.get().max()); + assertEquals(expectedVersionRange.min(), actualRange.get().min()); } else { assertEquals(Optional.empty(), actualRange); } @@ -214,8 +232,8 @@ void testValidateAllConnectorsAreUpgraded(final boolean supportAutoUpgrade) thro @ParameterizedTest @ValueSource(booleans = {true, false}) void testValidateBadUpgradeMissingSource(final boolean supportAutoUpgrade) throws Exception { + final AirbyteProtocolVersionRange expectedVersionRange = new AirbyteProtocolVersionRange(V1_0_0, V2_0_0); setCurrentProtocolRangeRange(V0_0_0, V1_0_0); - setTargetProtocolRangeRange(V1_0_0, V2_0_0); final UUID source1 = UUID.randomUUID(); final UUID source2 = UUID.randomUUID(); @@ -236,6 +254,8 @@ void testValidateBadUpgradeMissingSource(final boolean supportAutoUpgrade) throw Map.entry(dest1, V1_0_0), Map.entry(dest2, V1_0_0))); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, expectedVersionRange, configRepository, Optional.of(definitionsProvider)); final Optional actualRange = protocolVersionChecker.validate(supportAutoUpgrade); assertEquals(Optional.empty(), actualRange); } @@ -243,8 +263,8 @@ void testValidateBadUpgradeMissingSource(final boolean supportAutoUpgrade) throw @ParameterizedTest @ValueSource(booleans = {true, false}) void testValidateBadUpgradeMissingDestination(final boolean supportAutoUpgrade) throws Exception { + final AirbyteProtocolVersionRange expectedVersionRange = new AirbyteProtocolVersionRange(V1_0_0, V2_0_0); setCurrentProtocolRangeRange(V0_0_0, V1_0_0); - setTargetProtocolRangeRange(V1_0_0, V2_0_0); final UUID source1 = UUID.randomUUID(); final UUID source2 = UUID.randomUUID(); @@ -265,6 +285,8 @@ void testValidateBadUpgradeMissingDestination(final boolean supportAutoUpgrade) Map.entry(dest1, V1_0_0), Map.entry(dest2, V0_0_0))); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, expectedVersionRange, configRepository, Optional.of(definitionsProvider)); final Optional actualRange = protocolVersionChecker.validate(supportAutoUpgrade); assertEquals(Optional.empty(), actualRange); } @@ -272,10 +294,10 @@ void testValidateBadUpgradeMissingDestination(final boolean supportAutoUpgrade) @ParameterizedTest @ValueSource(booleans = {true, false}) void testValidateFailsOnProtocolRangeChangeWithoutDefinitionsProvider(final boolean supportAutoUpgrade) throws Exception { - protocolVersionChecker = new ProtocolVersionChecker(jobPersistence, configs, configRepository, Optional.empty()); - + final AirbyteProtocolVersionRange expectedVersionRange = new AirbyteProtocolVersionRange(V1_0_0, V2_0_0); setCurrentProtocolRangeRange(V0_0_0, V1_0_0); - setTargetProtocolRangeRange(V1_0_0, V2_0_0); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, expectedVersionRange, configRepository, Optional.empty()); final UUID source1 = UUID.randomUUID(); final UUID dest1 = UUID.randomUUID(); @@ -292,10 +314,10 @@ void testValidateFailsOnProtocolRangeChangeWithoutDefinitionsProvider(final bool @ParameterizedTest @ValueSource(booleans = {true, false}) void testValidateSucceedsWhenNoProtocolRangeChangeWithoutDefinitionsProvider(final boolean supportAutoUpgrade) throws Exception { - protocolVersionChecker = new ProtocolVersionChecker(jobPersistence, configs, configRepository, Optional.empty()); - + final AirbyteProtocolVersionRange expectedVersionRange = new AirbyteProtocolVersionRange(V0_0_0, V2_0_0); setCurrentProtocolRangeRange(V0_0_0, V2_0_0); - setTargetProtocolRangeRange(V0_0_0, V2_0_0); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, expectedVersionRange, configRepository, Optional.empty()); final UUID source1 = UUID.randomUUID(); final UUID dest1 = UUID.randomUUID(); @@ -306,7 +328,9 @@ void testValidateSucceedsWhenNoProtocolRangeChangeWithoutDefinitionsProvider(fin when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); final Optional actualRange = protocolVersionChecker.validate(supportAutoUpgrade); - assertEquals(Optional.of(new AirbyteProtocolVersionRange(V0_0_0, V2_0_0)), actualRange); + assertTrue(actualRange.isPresent()); + assertEquals(expectedVersionRange.max(), actualRange.get().max()); + assertEquals(expectedVersionRange.min(), actualRange.get().min()); } private void setCurrentProtocolRangeRange(final Version min, final Version max) throws IOException { @@ -315,11 +339,6 @@ private void setCurrentProtocolRangeRange(final Version min, final Version max) when(jobPersistence.getAirbyteProtocolVersionMax()).thenReturn(Optional.of(max)); } - private void setTargetProtocolRangeRange(final Version min, final Version max) throws IOException { - when(configs.getAirbyteProtocolVersionMin()).thenReturn(min); - when(configs.getAirbyteProtocolVersionMax()).thenReturn(max); - } - private void setNewDestinationDefinitions(final List> defs) { final List destDefinitions = defs.stream() .map(e -> new StandardDestinationDefinition() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/SecretPersistenceBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/SecretPersistenceBeanFactory.java index 3f7868801afb9..4b625901606f2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/SecretPersistenceBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/SecretPersistenceBeanFactory.java @@ -5,6 +5,7 @@ package io.airbyte.workers.config; import io.airbyte.commons.temporal.config.WorkerMode; +import io.airbyte.config.persistence.split_secrets.AWSSecretManagerPersistence; import io.airbyte.config.persistence.split_secrets.GoogleSecretManagerPersistence; import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence; import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator; @@ -33,6 +34,8 @@ public class SecretPersistenceBeanFactory { pattern = "(?i)^(?!google_secret_manager).*") @Requires(property = "airbyte.secret.persistence", pattern = "(?i)^(?!vault).*") + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^(?!aws_secret_manager).*") @Requires(env = WorkerMode.CONTROL_PLANE) @Named("secretPersistence") public SecretPersistence defaultSecretPersistence(@Named("configDatabase") final Database configDatabase) { @@ -68,6 +71,16 @@ public SecretPersistence vaultSecretPersistence(@Value("${airbyte.secret.store.v return new VaultSecretPersistence(address, prefix, token); } + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^aws_secret_manager$") + @Requires(env = WorkerMode.CONTROL_PLANE) + @Named("secretPersistence") + public SecretPersistence awsSecretPersistence(@Value("${airbyte.secret.store.aws.access-key}") final String awsAccessKey, + @Value("${airbyte.secret.store.aws.secret-key}") final String awsSecretKey) { + return new AWSSecretManagerPersistence(awsAccessKey, awsSecretKey); + } + @Singleton @Requires(property = "airbyte.acceptance.test.enabled", value = "true") diff --git a/airbyte-workers/src/main/resources/application.yml b/airbyte-workers/src/main/resources/application.yml index 38452982765c5..b3c552ab798bc 100644 --- a/airbyte-workers/src/main/resources/application.yml +++ b/airbyte-workers/src/main/resources/application.yml @@ -184,6 +184,9 @@ airbyte: secret: persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE} store: + aws: + access-key: ${AWS_ACCESS_KEY:} + secret-key: ${AWS_SECRET_ACCESS_KEY:} gcp: credentials: ${SECRET_STORE_GCP_CREDENTIALS:} project-id: ${SECRET_STORE_GCP_PROJECT_ID:} diff --git a/charts/airbyte/templates/env-configmap.yaml b/charts/airbyte/templates/env-configmap.yaml index a219395275e45..117899483dcef 100644 --- a/charts/airbyte/templates/env-configmap.yaml +++ b/charts/airbyte/templates/env-configmap.yaml @@ -11,6 +11,7 @@ metadata: data: AIRBYTE_VERSION: {{ .Values.version | default .Chart.AppVersion }} API_URL: {{ .Values.webapp.api.url }} + BOOTLOADER_MIGRATION_BASELINE_VERSION: "0.29.0.001" CONNECTOR_BUILDER_API_URL: {{ .Values.webapp.connectorBuilderServer.url | quote }} CONFIG_ROOT: /configs CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION: "0.35.15.001" diff --git a/docker-compose.yaml b/docker-compose.yaml index 095f67bc94c67..2a6e0d61c132a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -24,9 +24,7 @@ services: container_name: airbyte-bootloader environment: - AIRBYTE_VERSION=${VERSION} - - CONFIG_DATABASE_PASSWORD=${CONFIG_DATABASE_PASSWORD:-} - - CONFIG_DATABASE_URL=${CONFIG_DATABASE_URL:-} - - CONFIG_DATABASE_USER=${CONFIG_DATABASE_USER:-} + - BOOTLOADER_MIGRATION_BASELINE_VERSION=${BOOTLOADER_MIGRATION_BASELINE_VERSION:-} - DATABASE_PASSWORD=${DATABASE_PASSWORD} - DATABASE_URL=${DATABASE_URL} - DATABASE_USER=${DATABASE_USER}