Skip to content

Commit

Permalink
Remove ConfigPersistence usage from SecretsMigrator (#18747)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Nov 1, 2022
1 parent 7b9a097 commit ebb9126
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
import io.airbyte.config.init.ApplyDefinitionsHelper;
import io.airbyte.config.init.DefinitionsProvider;
import io.airbyte.config.init.LocalDefinitionsProvider;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
Expand Down Expand Up @@ -68,7 +72,7 @@ public class BootloaderApp {
private final Runnable postLoadExecution;
private final FeatureFlags featureFlags;
private final SecretMigrator secretMigrator;
private ConfigPersistence configPersistence;
private ConfigRepository configRepository;
private DefinitionsProvider localDefinitionsProvider;
private Database configDatabase;
private Database jobDatabase;
Expand Down Expand Up @@ -128,9 +132,6 @@ public BootloaderApp(final Configs configs,

postLoadExecution = () -> {
try {
final ConfigRepository configRepository =
new ConfigRepository(configPersistence, configDatabase);

final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, localDefinitionsProvider);
applyDefinitionsHelper.apply();

Expand All @@ -141,7 +142,7 @@ public BootloaderApp(final Configs configs,
}
}
LOGGER.info("Loaded seed data..");
} catch (final IOException | JsonValidationException e) {
} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
throw new RuntimeException(e);
}
};
Expand Down Expand Up @@ -173,9 +174,6 @@ public void load() throws Exception {
runFlywayMigration(configs, configDbMigrator, jobDbMigrator);
LOGGER.info("Ran Flyway migrations.");

final ConfigRepository configRepository =
new ConfigRepository(configPersistence, configDatabase);

createWorkspaceIfNoneExists(configRepository);
LOGGER.info("Default workspace created.");

Expand Down Expand Up @@ -219,7 +217,7 @@ private static JobPersistence getJobPersistence(final Database jobDatabase) thro
private void initPersistences(final DSLContext configsDslContext, final DSLContext jobsDslContext) {
try {
configDatabase = getConfigDatabase(configsDslContext);
configPersistence = getConfigPersistence(configDatabase);
configRepository = new ConfigRepository(getConfigPersistence(configDatabase), configDatabase);
localDefinitionsProvider = getLocalDefinitionsProvider();
jobDatabase = getJobDatabase(jobsDslContext);
jobPersistence = getJobPersistence(jobDatabase);
Expand All @@ -244,10 +242,17 @@ public static void main(final String[] args) throws Exception {
// TODO Will be converted to an injected singleton during DI migration
final Database configDatabase = getConfigDatabase(configsDslContext);
final ConfigPersistence configPersistence = getConfigPersistence(configDatabase);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase);
final Database jobDatabase = getJobDatabase(jobsDslContext);
final JobPersistence jobPersistence = getJobPersistence(jobDatabase);

final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configsDslContext, configs);
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configsDslContext, configs);
final SecretsRepositoryReader secretsRepositoryReader = new SecretsRepositoryReader(configRepository, secretsHydrator);
final SecretsRepositoryWriter secretsRepositoryWriter = new SecretsRepositoryWriter(configRepository, secretPersistence, Optional.empty());

final SecretMigrator secretMigrator =
new SecretMigrator(configPersistence, jobPersistence, SecretPersistence.getLongLived(configsDslContext, configs));
new SecretMigrator(secretsRepositoryReader, secretsRepositoryWriter, configRepository, jobPersistence, secretPersistence);
final Flyway configsFlyway = FlywayFactory.create(configsDataSource, BootloaderApp.class.getSimpleName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
final Flyway jobsFlyway = FlywayFactory.create(jobsDataSource, BootloaderApp.class.getSimpleName(), JobsDatabaseMigrator.DB_IDENTIFIER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,27 @@

package io.airbyte.bootloader;

import static io.airbyte.config.persistence.split_secrets.SecretsHelpers.COORDINATE_FIELD;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.JsonPaths;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.split_secrets.SecretCoordinate;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Value;
Expand All @@ -37,7 +34,9 @@
@Slf4j
public class SecretMigrator {

private final ConfigPersistence configPersistence;
private final SecretsRepositoryReader secretsReader;
private final SecretsRepositoryWriter secretsWriter;
private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final Optional<SecretPersistence> secretPersistence;

Expand All @@ -55,34 +54,39 @@ static class ConnectorConfiguration {
* Then for all the secret that are stored in a plain text format, it will save the plain text in
* the secret manager and store the coordinate in the config DB.
*/
public void migrateSecrets() throws JsonValidationException, IOException {
public void migrateSecrets() throws JsonValidationException, IOException, ConfigNotFoundException {
if (secretPersistence.isEmpty()) {
log.info("No secret persistence is provided, the migration won't be run ");

return;
}
final List<StandardSourceDefinition> standardSourceDefinitions =
configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class);
final List<StandardSourceDefinition> standardSourceDefinitions = configRepository.listStandardSourceDefinitions(true);

final Map<UUID, JsonNode> definitionIdToSourceSpecs = standardSourceDefinitions
.stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId,
def -> def.getSpec().getConnectionSpecification()));
final Map<UUID, ConnectorSpecification> definitionIdToSourceSpecs = standardSourceDefinitions
.stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId, StandardSourceDefinition::getSpec));

final List<SourceConnection> sources = configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class);
final List<SourceConnection> sourcesWithoutSecrets = configRepository.listSourceConnection();
final List<SourceConnection> sourcesWithSecrets = new ArrayList<>();
for (final SourceConnection source : sourcesWithoutSecrets) {
final SourceConnection sourceWithSecrets = secretsReader.getSourceConnectionWithSecrets(source.getSourceId());
sourcesWithSecrets.add(sourceWithSecrets);
}

migrateSources(sources, definitionIdToSourceSpecs);
migrateSources(sourcesWithSecrets, definitionIdToSourceSpecs);

final List<StandardDestinationDefinition> standardDestinationDefinitions =
configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION,
StandardDestinationDefinition.class);
final List<StandardDestinationDefinition> standardDestinationDefinitions = configRepository.listStandardDestinationDefinitions(true);

final Map<UUID, JsonNode> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream()
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId,
def -> def.getSpec().getConnectionSpecification()));
final Map<UUID, ConnectorSpecification> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream()
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId, StandardDestinationDefinition::getSpec));

final List<DestinationConnection> destinations = configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class);
final List<DestinationConnection> destinationsWithoutSecrets = configRepository.listDestinationConnection();
final List<DestinationConnection> destinationsWithSecrets = new ArrayList<>();
for (final DestinationConnection destination : destinationsWithoutSecrets) {
final DestinationConnection destinationWithoutSecrets = secretsReader.getDestinationConnectionWithSecrets(destination.getDestinationId());
destinationsWithSecrets.add(destinationWithoutSecrets);
}

migrateDestinations(destinations, definitionIdToDestinationSpecs);
migrateDestinations(destinationsWithSecrets, definitionIdToDestinationSpecs);

jobPersistence.setSecretMigrationDone();
}
Expand All @@ -91,120 +95,46 @@ public void migrateSecrets() throws JsonValidationException, IOException {
* This is migrating the secrets for the source actors
*/
@VisibleForTesting
void migrateSources(final List<SourceConnection> sources, final Map<UUID, JsonNode> definitionIdToSourceSpecs)
void migrateSources(final List<SourceConnection> sources, final Map<UUID, ConnectorSpecification> definitionIdToSourceSpecs)
throws JsonValidationException, IOException {
log.info("Migrating Sources");
final List<SourceConnection> sourceConnections = sources.stream()
.map(source -> {
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration(
source.getWorkspaceId(),
source.getConfiguration(),
definitionIdToSourceSpecs.get(source.getSourceDefinitionId())),
() -> UUID.randomUUID());
source.setConfiguration(migratedConfig);
return source;
})
.toList();

for (final SourceConnection source : sourceConnections) {
configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source);
for (final SourceConnection source : sources) {
final Optional<ConnectorSpecification> specOptional = Optional.ofNullable(definitionIdToSourceSpecs.get(source.getSourceDefinitionId()));

if (specOptional.isPresent()) {
secretsWriter.writeSourceConnection(source, specOptional.get());
} else {
// if the spec can't be found, don't risk writing secrets to db. wipe out the configuration for the
// connector.
final SourceConnection sourceWithConfigRemoved = Jsons.clone(source);
sourceWithConfigRemoved.setConfiguration(Jsons.emptyObject());
secretsWriter.writeSourceConnection(sourceWithConfigRemoved, new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject()));
}
}
}

/**
* This is migrating the secrets for the destination actors
*/
@VisibleForTesting
void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, JsonNode> definitionIdToDestinationSpecs)
void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, ConnectorSpecification> definitionIdToDestinationSpecs)
throws JsonValidationException, IOException {
log.info("Migration Destinations");
for (final DestinationConnection destination : destinations) {
final Optional<ConnectorSpecification> specOptional =
Optional.ofNullable(definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId()));

final List<DestinationConnection> destinationConnections = destinations.stream().map(destination -> {
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration(
destination.getWorkspaceId(),
destination.getConfiguration(),
definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId())),
() -> UUID.randomUUID());
destination.setConfiguration(migratedConfig);
return destination;
})
.toList();
for (final DestinationConnection destination : destinationConnections) {
configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), destination);
}
}

/**
* This is a generic method to migrate an actor configuration It will extract the secret path form
* the provided spec and then replace them by coordinates in the actor configuration
*/
@VisibleForTesting
JsonNode migrateConfiguration(final ConnectorConfiguration connectorConfiguration, final Supplier<UUID> uuidProvider) {
if (connectorConfiguration.getSpec() == null) {
throw new IllegalStateException("No connector definition to match the connector");
}

final AtomicReference<JsonNode> connectorConfigurationJson = new AtomicReference<>(connectorConfiguration.getConfiguration());
final List<String> uniqSecretPaths = getSecretPath(connectorConfiguration.getSpec())
.stream()
.flatMap(secretPath -> getAllExplodedPath(connectorConfigurationJson.get(), secretPath).stream())
.toList();

final UUID workspaceId = connectorConfiguration.getWorkspace();
uniqSecretPaths.forEach(secretPath -> {
final Optional<JsonNode> secretValue = getValueForPath(connectorConfigurationJson.get(), secretPath);
if (secretValue.isEmpty()) {
throw new IllegalStateException("Missing secret for the path: " + secretPath);
}

// Only migrate plain text.
if (secretValue.get().isTextual()) {
final JsonNode stringSecretValue = secretValue.get();

final SecretCoordinate coordinate =
new SecretCoordinate(SecretsHelpers.getCoordinatorBase("airbyte_workspace_", workspaceId, uuidProvider), 1);
secretPersistence.get().write(coordinate, stringSecretValue.textValue());
connectorConfigurationJson.set(replaceAtJsonNode(connectorConfigurationJson.get(), secretPath,
Jsons.jsonNode(Map.of(COORDINATE_FIELD, coordinate.getFullCoordinate()))));
if (specOptional.isPresent()) {
secretsWriter.writeDestinationConnection(destination, specOptional.get());
} else {
log.error("Not migrating already migrated secrets");
// if the spec can't be found, don't risk writing secrets to db. wipe out the configuration for the
// connector.
final DestinationConnection destinationWithConfigRemoved = Jsons.clone(destination);
destinationWithConfigRemoved.setConfiguration(Jsons.emptyObject());
secretsWriter.writeDestinationConnection(destinationWithConfigRemoved,
new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject()));
}

});

return connectorConfigurationJson.get();
}

/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
JsonNode replaceAtJsonNode(final JsonNode connectorConfigurationJson, final String secretPath, final JsonNode replacement) {
return JsonPaths.replaceAtJsonNode(connectorConfigurationJson, secretPath, replacement);
}

/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
List<String> getSecretPath(final JsonNode specs) {
return SecretsHelpers.getSortedSecretPaths(specs);
}

/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
List<String> getAllExplodedPath(final JsonNode node, final String path) {
return JsonPaths.getPaths(node, path);
}

/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
Optional<JsonNode> getValueForPath(final JsonNode node, final String path) {
return JsonPaths.getSingleValue(node, path);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence;
import io.airbyte.config.persistence.split_secrets.RealSecretsHydrator;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
Expand Down Expand Up @@ -185,10 +189,17 @@ void testBootloaderAppRunSecretMigration() throws Exception {
val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false);

val configPersistence = new DatabaseConfigPersistence(configDatabase, jsonSecretsProcessor);
val configRepository = new ConfigRepository(configPersistence, configDatabase);
val jobsPersistence = new DefaultJobPersistence(jobDatabase);

val secretsPersistence = SecretPersistence.getLongLived(configsDslContext, mockedConfigs);
final LocalTestingSecretPersistence localTestingSecretPersistence = new LocalTestingSecretPersistence(configDatabase);

val secretsReader = new SecretsRepositoryReader(configRepository, new RealSecretsHydrator(localTestingSecretPersistence));
val secretsWriter = new SecretsRepositoryWriter(configRepository, secretsPersistence, Optional.empty());

val spiedSecretMigrator =
spy(new SecretMigrator(configPersistence, jobsPersistence, SecretPersistence.getLongLived(configsDslContext, mockedConfigs)));
spy(new SecretMigrator(secretsReader, secretsWriter, configRepository, jobsPersistence, secretsPersistence));

// Although we are able to inject mocked configs into the Bootloader, a particular migration in the
// configs database requires the env var to be set. Flyway prevents injection, so we dynamically set
Expand All @@ -202,7 +213,6 @@ void testBootloaderAppRunSecretMigration() throws Exception {
initBootloader.load();

final DefinitionsProvider localDefinitions = new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase);
final ConfigPersistence localConfigPersistence = new DefinitionProviderToConfigPersistenceAdapter(localDefinitions);
configRepository.loadDataNoSecrets(localConfigPersistence);

Expand Down
Loading

0 comments on commit ebb9126

Please sign in to comment.