Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Ability to save/load secrets through google secrets manager. #5791

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
33c410f
Ability to save/load secrets through google secrets manager.
airbyte-jenny Sep 1, 2021
562bcad
Stored secrets in secrets persistence and test that they're stored pr…
airbyte-jenny Sep 8, 2021
3ef3e97
Improving integration tests for secrets manager
airbyte-jenny Sep 9, 2021
47d3a54
Include secrets store config for integration tests
airbyte-jenny Sep 10, 2021
2212ec9
Merged master in
airbyte-jenny Sep 10, 2021
b9b9ccf
Resolved data type errors after merge
airbyte-jenny Sep 10, 2021
6c6292f
Merged master
airbyte-jenny Sep 10, 2021
2dc1632
Implement no-op loadData
airbyte-jenny Sep 10, 2021
42fc14f
Handle connector definition id lookups for secrets store
airbyte-jenny Sep 10, 2021
da844ee
Made unit tests work with secrets manager in place
airbyte-jenny Sep 13, 2021
28d7db3
Fixed NPE on secrets manager feature flag
airbyte-jenny Sep 13, 2021
9de2a35
Exposed definitionId to repository lookup for use by ConfigRepository
airbyte-jenny Sep 13, 2021
b7ddf1b
Whitespace
airbyte-jenny Sep 13, 2021
ba444de
Merge branch 'master' into airbyte-jenny/config-secrets-store
airbyte-jenny Sep 14, 2021
2eda5c8
Merged master
airbyte-jenny Sep 16, 2021
084e5e6
Matched test API to regular
airbyte-jenny Sep 16, 2021
ef3c6b8
Bug fix
airbyte-jenny Sep 17, 2021
e159602
Merged master
airbyte-jenny Sep 17, 2021
ae6dcab
Fix error
airbyte-jenny Sep 17, 2021
0ec0cac
Merged master
airbyte-jenny Sep 20, 2021
ea0aebf
Bug fix to match new API
airbyte-jenny Sep 20, 2021
2e62d78
Cleaning up merge conflict remainders and bugs
airbyte-jenny Sep 20, 2021
c68f66e
Merged master
airbyte-jenny Sep 20, 2021
b093836
Include gcp credentials for secrets store
airbyte-jenny Sep 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ public interface AirbyteConfig {
*/
File getConfigSchemaFile();

boolean isSecret();

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,44 +35,42 @@ public enum ConfigSchema implements AirbyteConfig {
STANDARD_WORKSPACE("StandardWorkspace.yaml",
StandardWorkspace.class,
standardWorkspace -> standardWorkspace.getWorkspaceId().toString(),
"workspaceId"),
"workspaceId", false),

// source
STANDARD_SOURCE_DEFINITION("StandardSourceDefinition.yaml",
StandardSourceDefinition.class,
standardSourceDefinition -> standardSourceDefinition.getSourceDefinitionId().toString(),
"sourceDefinitionId"),
"sourceDefinitionId", false),
SOURCE_CONNECTION("SourceConnection.yaml",
SourceConnection.class,
sourceConnection -> sourceConnection.getSourceId().toString(),
"sourceId"),
"sourceId", true),

// destination
STANDARD_DESTINATION_DEFINITION("StandardDestinationDefinition.yaml",
StandardDestinationDefinition.class,
standardDestinationDefinition -> standardDestinationDefinition.getDestinationDefinitionId().toString(),
"destinationDefinitionId"),
"destinationDefinitionId", false),
DESTINATION_CONNECTION("DestinationConnection.yaml",
DestinationConnection.class,
destinationConnection -> destinationConnection.getDestinationId().toString(),
"destinationId"),
"destinationId", true),

// sync
STANDARD_SYNC("StandardSync.yaml",
StandardSync.class,
standardSync -> standardSync.getConnectionId().toString(),
"connectionId"),
"connectionId", false),
STANDARD_SYNC_OPERATION("StandardSyncOperation.yaml",
StandardSyncOperation.class,
standardSyncOperation -> standardSyncOperation.getOperationId().toString(),
"operationId"),

standardSyncOperation -> standardSyncOperation.getOperationId().toString(), "operationId", false),
SOURCE_OAUTH_PARAM("SourceOAuthParameter.yaml", SourceOAuthParameter.class,
sourceOAuthParameter -> sourceOAuthParameter.getOauthParameterId().toString(),
"oauthParameterId"),
"oauthParameterId", false),
DESTINATION_OAUTH_PARAM("DestinationOAuthParameter.yaml", DestinationOAuthParameter.class,
destinationOAuthParameter -> destinationOAuthParameter.getOauthParameterId().toString(),
"oauthParameterId"),
"oauthParameterId", false),

STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class),

Expand All @@ -90,21 +88,25 @@ public enum ConfigSchema implements AirbyteConfig {
private final Class<?> className;
private final Function<?, String> extractId;
private final String idFieldName;
private final boolean isSecret;

<T> ConfigSchema(final String schemaFilename,
Class<T> className,
Function<T, String> extractId,
String idFieldName) {
String idFieldName,
boolean isSecret) {
this.schemaFilename = schemaFilename;
this.className = className;
this.extractId = extractId;
this.idFieldName = idFieldName;
this.isSecret = isSecret;
}

<T> ConfigSchema(final String schemaFilename,
Class<T> className) {
this.schemaFilename = schemaFilename;
this.className = className;
this.isSecret = false;
this.extractId = object -> {
throw new RuntimeException(className.getSimpleName() + " doesn't have an id");
};
Expand All @@ -120,6 +122,11 @@ public <T> Class<T> getClassName() {
return (Class<T>) className;
}

@Override
public boolean isSecret() {
return isSecret;
}

@Override
public <T> String getId(T object) {
if (getClassName().isInstance(object)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

public interface ConfigPersistence {
Expand All @@ -46,6 +47,10 @@ public interface ConfigPersistence {

Map<String, Stream<JsonNode>> dumpConfigs() throws IOException;

void loadData(ConfigPersistence seedPersistence) throws IOException;
void loadData(ConfigPersistence seedPersistence, Set<String> connectorRepositoriesInUse) throws IOException;

Set<String> getRepositoriesFromDefinitionIds(Set<String> usedConnectorDefinitionIds) throws IOException;

Set<String> listDefinitionIdsInUseByConnectors() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,31 @@
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;

public class ConfigRepository {

private final ConfigPersistence persistence;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick. Maybe rename this variable to something like routinePersistence or nonSecrectsPersistence to distinguish it from the newly added one?

private final ConfigPersistence secretsPersistence;

public ConfigRepository(final ConfigPersistence persistence, final ConfigPersistence secretsPersistence) {
this.persistence = persistence;
this.secretsPersistence = secretsPersistence;
}

/**
* This should only be used for backward compatibility where a secret store isn't yet in use.
*/
public ConfigRepository(final ConfigPersistence persistence) {
this.persistence = persistence;
this.secretsPersistence = persistence;
}

public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final boolean includeTombstone)
Expand Down Expand Up @@ -168,7 +180,7 @@ public void writeStandardDestinationDefinition(final StandardDestinationDefiniti
}

public SourceConnection getSourceConnection(final UUID sourceId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString(), SourceConnection.class);
return secretsPersistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString(), SourceConnection.class);
}

public void writeSourceConnection(final SourceConnection source, final ConnectorSpecification connectorSpecification)
Expand All @@ -177,16 +189,16 @@ public void writeSourceConnection(final SourceConnection source, final Connector
final JsonSchemaValidator validator = new JsonSchemaValidator();
validator.ensure(connectorSpecification.getConnectionSpecification(), source.getConfiguration());

persistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source);
secretsPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source);
}

public List<SourceConnection> listSourceConnection() throws JsonValidationException, IOException {
return persistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class);
return secretsPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class);
}

public DestinationConnection getDestinationConnection(final UUID destinationId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class);
return secretsPersistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class);
}

public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification)
Expand All @@ -195,11 +207,11 @@ public void writeDestinationConnection(final DestinationConnection destinationCo
final JsonSchemaValidator validator = new JsonSchemaValidator();
validator.ensure(connectorSpecification.getConnectionSpecification(), destinationConnection.getConfiguration());

persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection);
secretsPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection);
}

public List<DestinationConnection> listDestinationConnection() throws JsonValidationException, IOException {
return persistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class);
return secretsPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class);
}

public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException {
Expand Down Expand Up @@ -277,15 +289,58 @@ public List<DestinationOAuthParameter> listDestinationOAuthParam() throws JsonVa
}

public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final boolean dryRun) throws IOException {
persistence.replaceAllConfigs(configs, dryRun);
// if we're using a single persistence layer, this is the easy route.
if (persistence == secretsPersistence) {
persistence.replaceAllConfigs(configs, dryRun);
return;
}

// Or if we're using secrets storage, split into sets by where they should be stored.
Map<AirbyteConfig, Stream<?>> secretConfigs = new LinkedHashMap<AirbyteConfig, Stream<?>>();
Map<AirbyteConfig, Stream<?>> nonsecretConfigs = new LinkedHashMap<AirbyteConfig, Stream<?>>();
for (AirbyteConfig configType : configs.keySet()) {
if (configType.isSecret()) {
secretConfigs.put(configType, configs.get(configType));
} else {
nonsecretConfigs.put(configType, configs.get(configType));
}
}
// And store each type in its own persistence store.
persistence.replaceAllConfigs(nonsecretConfigs, dryRun);
secretsPersistence.replaceAllConfigs(secretConfigs, dryRun);
}

public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
return persistence.dumpConfigs();
Map<String, Stream<JsonNode>> bothConfigTypes = persistence.dumpConfigs();
// Secrets and nonsecrets might be stored separately, and if they are, go looking for the rest.
// No need to do that if they're in the same single store, because we don't need duplicates or
// double work.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick. This comment contains some pronouns that take a few seconds to understand: the rest, do that. Also the structure in replaceAllConfigs looks more clear than this one.

What about following the same structure as in replaceAllConfigs?

// If secrets and nonsecrets are stored in the same persistence,
// just return configs from one persistence becase we don't need duplicates or double work.
if (secretsPersistence == persistence) {
  return persistence.dumpConfigs();
}

// If secrets and nonsecrets are stored separately, return configs from both.
return MoreMaps.merge(persistence.dumpConfigs(), secretsPersistence.dumpConfigs());

if (secretsPersistence != persistence) {
bothConfigTypes.putAll(secretsPersistence.dumpConfigs());
}
return bothConfigTypes;
}

/**
* During migration, this runs through the list of connectors, determining their docker image and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migration is a loaded term. It's more like "connector definition update" instead of "migration".

* version, for purposes of skipping upgrading in-use definitions.
*
* @return
Copy link
Contributor

@tuliren tuliren Sep 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick. This @return doc is empty and can be removed.

*/
public Set<String> listDefinitionsInUseByConnectors() throws IOException {
// Once we have secrets coordinates as references instead of storing the whole config, this lookup
// will be simpler again.
Set<String> definitionIds;
if (secretsPersistence != persistence) {
definitionIds = secretsPersistence.listDefinitionIdsInUseByConnectors();
} else {
definitionIds = persistence.listDefinitionIdsInUseByConnectors();
}
return persistence.getRepositoriesFromDefinitionIds(definitionIds);
}

public void loadData(ConfigPersistence seedPersistence) throws IOException {
persistence.loadData(seedPersistence);
public void loadData(ConfigPersistence seedPersistence, Set<String> connectorRepositoriesInUse) throws IOException {
persistence.loadData(seedPersistence, connectorRepositoriesInUse);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,24 @@ public void migrateFileConfigs(Configs serverConfigs) throws IOException {
}

@Override
public void loadData(ConfigPersistence seedConfigPersistence) throws IOException {
public void loadData(ConfigPersistence seedConfigPersistence, Set<String> connectorRepositoriesInUse) throws IOException {
database.transaction(ctx -> {
updateConfigsFromSeed(ctx, seedConfigPersistence);
final boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS);
if (isInitialized) {
updateConfigsFromSeed(ctx, seedConfigPersistence, connectorRepositoriesInUse);
} else {
copyConfigsFromSeed(ctx, seedConfigPersistence);
}
Copy link
Contributor

@tuliren tuliren Sep 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server start logic has been changed. The loadData method only updates the connector definitions now. It will only be called after the database has been initialized.

This is necessary to fix a bug during server launch. The rationale is that we cannot insert the seed or update the connector definitions as part of the initialization process. When the server launches, it can be on an old version, and the seed or the latest definitions may not be compatible with the old data structure. The server will run the migrations first, and then we call loadData to update the connectors.

So adding back the copyConfigsFromSeed is no not necessary here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry that this change was added recently. Let me know if you want to discuss this in person for clarification.

Copy link
Contributor

@tuliren tuliren Sep 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More context. The only thing we do to "initialize" the persistence is copying old configs from the file system if the user is running an old Airbyte version:
https://github.com/airbytehq/airbyte/blob/master/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java#L187

We call loadData after file-based and database migrations here:
https://github.com/airbytehq/airbyte/blob/master/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java#L248

return null;
});
}

/*
* public void loadData(ConfigPersistence seedConfigPersistence) throws IOException {
* database.transaction(ctx -> { updateConfigsFromSeed(ctx, seedConfigPersistence); return null; });
* }
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be deleted.


public ValidatingConfigPersistence withValidation() {
return new ValidatingConfigPersistence(this);
}
Expand Down Expand Up @@ -312,11 +323,10 @@ private ConnectorCounter(int newCount, int updateCount) {
}

@VisibleForTesting
void updateConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException {
LOGGER.info("Updating connector definitions from the seed if necessary...");
void updateConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence, Set<String> connectorRepositoriesInUse) throws SQLException {
LOGGER.info("Config database has been initialized; updating connector definitions from the seed if necessary...");

try {
Set<String> connectorRepositoriesInUse = getConnectorRepositoriesInUse(ctx);
LOGGER.info("Connectors in use: {}", connectorRepositoriesInUse);

Map<String, ConnectorInfo> connectorRepositoryToInfoMap = getConnectorRepositoryToInfoMap(ctx);
Expand Down Expand Up @@ -474,31 +484,50 @@ Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(DSLContext ctx) {
* connectors can be added manually by users, and their config ids are not always the same
* as those in the seed.
*/
private Set<String> getConnectorRepositoriesInUse(DSLContext ctx) {
public Set<String> listDefinitionIdsInUseByConnectors() throws IOException {
Set<String> usedConnectorDefinitionIds = new HashSet<>();
// query for used source definitions
usedConnectorDefinitionIds.addAll(ctx
.select(field("config_blob ->> 'sourceDefinitionId'", SQLDataType.VARCHAR))
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(ConfigSchema.SOURCE_CONNECTION.name()))
.fetch().stream()
.flatMap(row -> Stream.of(row.value1()))
.collect(Collectors.toSet()));
// query for used destination definitions
usedConnectorDefinitionIds.addAll(ctx
.select(field("config_blob ->> 'destinationDefinitionId'", SQLDataType.VARCHAR))
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(ConfigSchema.DESTINATION_CONNECTION.name()))
.fetch().stream()
.flatMap(row -> Stream.of(row.value1()))
.collect(Collectors.toSet()));
database.transaction(ctx -> {
// query for used source definitions
usedConnectorDefinitionIds.addAll(ctx
.select(field("config_blob ->> 'sourceDefinitionId'", SQLDataType.VARCHAR))
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(ConfigSchema.SOURCE_CONNECTION.name()))
.fetch().stream()
.flatMap(row -> Stream.of(row.value1()))
.collect(Collectors.toSet()));
// query for used destination definitions
usedConnectorDefinitionIds.addAll(ctx
.select(field("config_blob ->> 'destinationDefinitionId'", SQLDataType.VARCHAR))
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(ConfigSchema.DESTINATION_CONNECTION.name()))
.fetch().stream()
.flatMap(row -> Stream.of(row.value1()))
.collect(Collectors.toSet()));
return null;
});
return usedConnectorDefinitionIds;
}

public Set<String> getRepositoriesFromDefinitionIds(Set<String> usedConnectorDefinitionIds) throws IOException {
final Set<String> reps = new HashSet<String>();
// reps.addAll(ctx.select(field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR))
database.transaction(ctx -> {
System.out.println(ctx.select(field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR))
.from(AIRBYTE_CONFIGS).fetch());
return null;
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for local debugging only, right? It's a printout statement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The local variable reps is unnecessary once the debugging code is removed. The following should be sufficient for this method:

return ctx.select(field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR))
        .from(AIRBYTE_CONFIGS)
        .where(AIRBYTE_CONFIGS.CONFIG_ID.in(usedConnectorDefinitionIds))
        .fetch().stream()
        .map(Record1::value1)
        .collect(Collectors.toSet());


database.transaction(ctx -> {
reps.addAll(ctx.select(field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR))
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_ID.in(usedConnectorDefinitionIds))
.fetch().stream()
.map(Record1::value1)
.collect(Collectors.toSet()));
return null;
});
return reps;

return ctx.select(field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR))
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_ID.in(usedConnectorDefinitionIds))
.fetch().stream()
.map(Record1::value1)
.collect(Collectors.toSet());
}

}
Loading