Skip to content

Commit

Permalink
Only migrate active and disable connection (#9454)
Browse files Browse the repository at this point in the history
Filter out the deleted workflows.

Change a log to the right level.
  • Loading branch information
benmoriceau authored Jan 13, 2022
1 parent d992cce commit a495917
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void start() throws IOException {
// anymore.
cleanupZombies(jobPersistence, jobNotifier);

LOGGER.error("Start running the old scheduler");
LOGGER.info("Start running the old scheduler");
scheduleJobsPool.scheduleWithFixedDelay(
() -> {
MDC.setContextMap(mdc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.init.YamlSeedConfigPersistence;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -235,7 +236,9 @@ private static void migrateExistingConnection(final ConfigRepository configRepos
throws JsonValidationException, ConfigNotFoundException, IOException {
LOGGER.info("Start migration to the new scheduler...");
final Set<UUID> connectionIds =
configRepository.listStandardSyncs().stream().map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet());
configRepository.listStandardSyncs().stream()
.filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE)
.map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet());
temporalWorkerRunFactory.migrateSyncIfNeeded(connectionIds);
LOGGER.info("Done migrating to the new scheduler...");
}
Expand Down

0 comments on commit a495917

Please sign in to comment.