Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate airbyte-bootloader to Micronaut #21073

Merged
merged 25 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3a02111
Migrate airbyte-bootloader to Micronaut
jdpgrailsdev Jan 5, 2023
8b66991
PR feedback
jdpgrailsdev Jan 5, 2023
71fd5e8
More PR feedback
jdpgrailsdev Jan 5, 2023
cdd12d9
Rename variable for clarity
jdpgrailsdev Jan 5, 2023
91f24b7
Add properties to support cloud
jdpgrailsdev Jan 5, 2023
24bf6ba
Formatting
jdpgrailsdev Jan 5, 2023
3979792
Use default values for env vars
jdpgrailsdev Jan 6, 2023
2f5387b
Re-organization to support cloud overrides
jdpgrailsdev Jan 9, 2023
629945b
Disable conditional logging
jdpgrailsdev Jan 9, 2023
92d34ef
More singleton cleanup
jdpgrailsdev Jan 9, 2023
6a89ce2
Merge branch 'master' into jonathan/airbyte-bootloader-micronaut
jdpgrailsdev Jan 9, 2023
4f8e837
Merge branch 'master' into jonathan/airbyte-bootloader-micronaut
jdpgrailsdev Jan 10, 2023
ca72ec8
Merge branch 'master' into jonathan/airbyte-bootloader-micronaut
jdpgrailsdev Jan 10, 2023
c73c59a
Merge branch 'master' into jonathan/airbyte-bootloader-micronaut
jdpgrailsdev Jan 10, 2023
8fa0f74
test: try CI without fluentbit
perangel Jan 10, 2023
f0bdb5f
Merge branch 'master' into jonathan/airbyte-bootloader-micronaut
jdpgrailsdev Jan 10, 2023
b8fbff0
Merge branch 'master' into jonathan/airbyte-bootloader-micronaut
jdpgrailsdev Jan 11, 2023
53dccb1
Revert "test: try CI without fluentbit"
perangel Jan 11, 2023
e4867aa
test: enable SSH on EC2 runner
perangel Jan 11, 2023
e8873ee
Revert "test: enable SSH on EC2 runner"
perangel Jan 11, 2023
6806d9c
Merge branch 'master' into jonathan/airbyte-bootloader-micronaut
jdpgrailsdev Jan 11, 2023
c22ebdd
Avoid early database connection on startup
jdpgrailsdev Jan 11, 2023
8992df0
Fix compile issues from refactor
jdpgrailsdev Jan 11, 2023
1c82df0
Formatting
jdpgrailsdev Jan 11, 2023
07c751c
Merge branch 'master' into jonathan/airbyte-bootloader-micronaut
jdpgrailsdev Jan 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
This application runs at start up for Airbyte. It is responsible for making sure that the environment is upgraded and in a good state. e.g. It makes sure the database has been migrated to the correct version.

## Entrypoint
* BootloaderApp.java - has the main method for running the bootloader.
* Application.java - has the main method for running the bootloader.
48 changes: 43 additions & 5 deletions airbyte-bootloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@ plugins {
}

dependencies {
annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut

// Ensure that the versions defined in deps.toml are used
// instead of versions from transitive dependencies
implementation (libs.flyway.core) {
force = true
}
implementation (libs.jooq) {
force = true
}

implementation project(':airbyte-config:init')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
Expand All @@ -11,19 +26,42 @@ dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-persistence:job-persistence')

implementation libs.temporal.sdk
implementation libs.flyway.core
testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, what happened to libs.flyway.core?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It got moved up in this block to ensure that we get the correct version.


testImplementation libs.platform.testcontainers.postgresql
testImplementation libs.bundles.micronaut.test
testImplementation libs.bundles.junit
testImplementation libs.junit.jupiter.system.stubs
testImplementation libs.platform.testcontainers.postgresql
}

mainClassName = 'io.airbyte.bootloader.Application'

application {
applicationName = "airbyte-bootloader"
mainClass = 'io.airbyte.bootloader.BootloaderApp'
applicationName = project.name
mainClass = mainClassName
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

Properties env = new Properties()
rootProject.file('.env.dev').withInputStream { env.load(it) }

run {
// default for running on local machine.
env.each { entry ->
environment entry.getKey(), entry.getValue()
}

environment 'AIRBYTE_ROLE', System.getenv('AIRBYTE_ROLE')
environment 'AIRBYTE_VERSION', env.VERSION
environment 'DATABASE_URL', 'jdbc:postgresql://localhost:5432/airbyte'
}

test {
// Required to enable mocked beans
systemProperty("mockito.test.enabled", "true")
}

// produce reproducible archives
// (see https://docs.gradle.org/current/userguide/working_with_files.html#sec:reproducible_archives)
tasks.withType(AbstractArchiveTask) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.bootloader;

import io.micronaut.context.ApplicationContext;
import io.micronaut.runtime.Micronaut;
import lombok.extern.slf4j.Slf4j;

/**
* Main application entry point responsible for starting the server and invoking the bootstrapping
* of the Airbyte environment.
*/
@Slf4j
public class Application {

public static void main(final String[] args) {
try {
final ApplicationContext applicationContext = Micronaut.run(Application.class, args);
final Bootloader bootloader = applicationContext.getBean(Bootloader.class);
bootloader.load();
System.exit(0);
} catch (final Exception e) {
log.error("Unable to bootstrap Airbyte environment.", e);
System.exit(-1);
}
}

}
244 changes: 244 additions & 0 deletions airbyte-bootloader/src/main/java/io/airbyte/bootloader/Bootloader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.bootloader;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Geography;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.init.DefinitionsProvider;
import io.airbyte.config.init.PostLoadExecutor;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.init.DatabaseInitializationException;
import io.airbyte.db.init.DatabaseInitializer;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.validation.json.JsonValidationException;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

/**
* Ensures that the databases are migrated to the appropriate level.
*/
@Singleton
@Slf4j
public class Bootloader {

private static final AirbyteVersion VERSION_BREAK = new AirbyteVersion("0.32.0-alpha");

private final boolean autoUpgradeConnectors;
private final ConfigRepository configRepository;
private final DatabaseMigrator configsDatabaseMigrator;
private final DatabaseInitializer configsDatabaseInitializer;
private final AirbyteVersion currentAirbyteVersion;
private final Optional<DefinitionsProvider> definitionsProvider;
private final FeatureFlags featureFlags;
private final DatabaseInitializer jobsDatabaseInitializer;
private final DatabaseMigrator jobsDatabaseMigrator;
private final JobPersistence jobPersistence;
private final PostLoadExecutor postLoadExecution;
private final ProtocolVersionChecker protocolVersionChecker;
private final boolean runMigrationOnStartup;
private final SecretMigrator secretMigrator;

public Bootloader(
@Value("${airbyte.bootloader.auto-upgrade-connectors}") final boolean autoUpgradeConnectors,
final ConfigRepository configRepository,
@Named("configsDatabaseInitializer") final DatabaseInitializer configsDatabaseInitializer,
@Named("configsDatabaseMigrator") final DatabaseMigrator configsDatabaseMigrator,
final AirbyteVersion currentAirbyteVersion,
final Optional<DefinitionsProvider> definitionsProvider,
final FeatureFlags featureFlags,
@Named("jobsDatabaseInitializer") final DatabaseInitializer jobsDatabaseInitializer,
@Named("jobsDatabaseMigrator") final DatabaseMigrator jobsDatabaseMigrator,
final JobPersistence jobPersistence,
final ProtocolVersionChecker protocolVersionChecker,
@Value("${airbyte.bootloader.run-migration-on-startup}") final boolean runMigrationOnStartup,
final SecretMigrator secretMigrator,
final PostLoadExecutor postLoadExecution) {
this.autoUpgradeConnectors = autoUpgradeConnectors;
this.configRepository = configRepository;
this.configsDatabaseInitializer = configsDatabaseInitializer;
this.configsDatabaseMigrator = configsDatabaseMigrator;
this.currentAirbyteVersion = currentAirbyteVersion;
this.definitionsProvider = definitionsProvider;
this.featureFlags = featureFlags;
this.jobsDatabaseInitializer = jobsDatabaseInitializer;
this.jobsDatabaseMigrator = jobsDatabaseMigrator;
this.jobPersistence = jobPersistence;
this.protocolVersionChecker = protocolVersionChecker;
this.runMigrationOnStartup = runMigrationOnStartup;
this.secretMigrator = secretMigrator;
this.postLoadExecution = postLoadExecution;
}

/**
* Performs all required bootstrapping for the Airbyte environment. This includes the following:
* <ul>
* <li>Initializes the databases</li>
* <li>Check database migration compatibility</li>
* <li>Check protocol version compatibility</li>
* <li>Migrate databases</li>
* <li>Create default workspace</li>
* <li>Create default deployment</li>
* <li>Perform post migration tasks</li>
* </ul>
*
* @throws Exception if unable to perform any of the bootstrap operations.
*/
public void load() throws Exception {
log.info("Initializing databases...");
initializeDatabases();

log.info("Checking migration compatibility...");
assertNonBreakingMigration(jobPersistence, currentAirbyteVersion);

log.info("Checking protocol version constraints...");
assertNonBreakingProtocolVersionConstraints(protocolVersionChecker, jobPersistence, autoUpgradeConnectors);

log.info("Running database migrations...");
runFlywayMigration(runMigrationOnStartup, configsDatabaseMigrator, jobsDatabaseMigrator);

log.info("Creating workspace (if none exists)...");
createWorkspaceIfNoneExists(configRepository);

log.info("Creating deployment (if none exists)...");
createDeploymentIfNoneExists(jobPersistence);

final String airbyteVersion = currentAirbyteVersion.serialize();
log.info("Setting Airbyte version to '{}'...", airbyteVersion);
jobPersistence.setVersion(airbyteVersion);
log.info("Set version to '{}'", airbyteVersion);

if (postLoadExecution != null) {
postLoadExecution.execute();
log.info("Finished running post load Execution.");
}

log.info("Finished bootstrapping Airbyte environment.");
}

private void assertNonBreakingMigration(final JobPersistence jobPersistence, final AirbyteVersion airbyteVersion)
throws IOException {
// version in the database when the server main method is called. may be empty if this is the first
// time the server is started.
log.info("Checking for illegal upgrade...");
final Optional<AirbyteVersion> initialAirbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
if (!isLegalUpgrade(initialAirbyteDatabaseVersion.orElse(null), airbyteVersion)) {
final String attentionBanner = MoreResources.readResource("banner/attention-banner.txt");
log.error(attentionBanner);
final String message = String.format(
"Cannot upgrade from version %s to version %s directly. First you must upgrade to version %s. After that upgrade is complete, you may upgrade to version %s",
initialAirbyteDatabaseVersion.get().serialize(),
airbyteVersion.serialize(),
VERSION_BREAK.serialize(),
airbyteVersion.serialize());

log.error(message);
throw new RuntimeException(message);
}
}

private void assertNonBreakingProtocolVersionConstraints(final ProtocolVersionChecker protocolVersionChecker,
final JobPersistence jobPersistence,
final boolean autoUpgradeConnectors)
throws Exception {
final Optional<AirbyteProtocolVersionRange> newProtocolRange = protocolVersionChecker.validate(autoUpgradeConnectors);
if (newProtocolRange.isEmpty()) {
throw new RuntimeException(
"Aborting bootloader to avoid breaking existing connection after an upgrade. " +
"Please address airbyte protocol version support issues in the connectors before retrying.");
}
trackProtocolVersion(jobPersistence, newProtocolRange.get());
}

private void createDeploymentIfNoneExists(final JobPersistence jobPersistence) throws IOException {
final Optional<UUID> deploymentOptional = jobPersistence.getDeployment();
if (deploymentOptional.isPresent()) {
log.info("Running deployment: {}", deploymentOptional.get());
} else {
final UUID deploymentId = UUID.randomUUID();
jobPersistence.setDeployment(deploymentId);
log.info("Created deployment: {}", deploymentId);
}
}

private void createWorkspaceIfNoneExists(final ConfigRepository configRepository) throws JsonValidationException, IOException {
if (!configRepository.listStandardWorkspaces(true).isEmpty()) {
log.info("Workspace already exists for the deployment.");
return;
}

final UUID workspaceId = UUID.randomUUID();
final StandardWorkspace workspace = new StandardWorkspace()
.withWorkspaceId(workspaceId)
.withCustomerId(UUID.randomUUID())
.withName(workspaceId.toString())
.withSlug(workspaceId.toString())
.withInitialSetupComplete(false)
.withDisplaySetupWizard(true)
.withTombstone(false)
.withDefaultGeography(Geography.AUTO);
// NOTE: it's safe to use the NoSecrets version since we know that the user hasn't supplied any
// secrets yet.
configRepository.writeStandardWorkspaceNoSecrets(workspace);
}

private void initializeDatabases() throws DatabaseInitializationException {
log.info("Initializing databases...");
configsDatabaseInitializer.initialize();
jobsDatabaseInitializer.initialize();
log.info("Databases initialized.");
}

@VisibleForTesting
boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) {
// means there was no previous version so upgrade even needs to happen. always legal.
if (airbyteDatabaseVersion == null) {
log.info("No previous Airbyte Version set.");
return true;
}

log.info("Current Airbyte version: {}", airbyteDatabaseVersion);
log.info("Future Airbyte version: {}", airbyteVersion);
final var futureVersionIsAfterVersionBreak = airbyteVersion.greaterThan(VERSION_BREAK) || airbyteVersion.isDev();
final var isUpgradingThroughVersionBreak = airbyteDatabaseVersion.lessThan(VERSION_BREAK) && futureVersionIsAfterVersionBreak;
return !isUpgradingThroughVersionBreak;
}

private void runFlywayMigration(final boolean runDatabaseMigrationOnStartup,
final DatabaseMigrator configDbMigrator,
final DatabaseMigrator jobDbMigrator) {
log.info("Creating baseline for config database...");
configDbMigrator.createBaseline();
log.info("Creating baseline for job database...");
jobDbMigrator.createBaseline();

if (runDatabaseMigrationOnStartup) {
log.info("Migrating configs database...");
configDbMigrator.migrate();
log.info("Migrating jobs database...");
jobDbMigrator.migrate();
} else {
log.info("Auto database migration has been skipped.");
}
}

private void trackProtocolVersion(final JobPersistence jobPersistence, final AirbyteProtocolVersionRange protocolVersionRange)
throws IOException {
jobPersistence.setAirbyteProtocolVersionMin(protocolVersionRange.min());
jobPersistence.setAirbyteProtocolVersionMax(protocolVersionRange.max());
log.info("AirbyteProtocol version support range: [{}:{}]", protocolVersionRange.min().serialize(), protocolVersionRange.max().serialize());
}

}
Loading