Skip to content

Commit

Permalink
Public api update sources (#6537)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonsSpaghetti committed May 22, 2023
1 parent 02a7d2b commit 15d6505
Show file tree
Hide file tree
Showing 8 changed files with 368 additions and 18 deletions.
40 changes: 40 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,29 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/sources/partial_update:
post:
tags:
- source
summary: Partially update a source
operationId: partialUpdateSource
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/PartialSourceUpdate"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/SourceRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/sources/list:
post:
tags:
Expand Down Expand Up @@ -3439,6 +3462,20 @@ components:
type: boolean
notifySchemaChange:
type: boolean
PartialSourceUpdate:
type: object
required:
- sourceId
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
connectionConfiguration:
$ref: "#/components/schemas/SourceConfiguration"
name:
type: string
secretId:
example: "airbyte_oauth_workspace_0509f049-d671-48cb-8105-0a23d47e6db6_secret_e0d38206-034e-4d75-9d21-da5a99b02826_v1"
type: string
SourceUpdate:
type: object
required:
Expand All @@ -3452,6 +3489,9 @@ components:
$ref: "#/components/schemas/SourceConfiguration"
name:
type: string
secretId:
example: "airbyte_oauth_workspace_0509f049-d671-48cb-8105-0a23d47e6db6_secret_e0d38206-034e-4d75-9d21-da5a99b02826_v1"
type: string
SourceRead:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.validation.json.JsonValidationException;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;

/**
Expand Down Expand Up @@ -82,6 +83,30 @@ public SourceConnection source(final UUID sourceId, final String sourceName, fin
return Jsons.clone(persistedSource).withConfiguration(updatedConfiguration);
}

/**
* Partially update the configuration object for a source.
*
* @param sourceId source id
* @param sourceName name of source
* @param newConfiguration new configuration
* @return updated source configuration
* @throws ConfigNotFoundException thrown if the source does not exist
* @throws IOException thrown if exception while interacting with the db
* @throws JsonValidationException thrown if newConfiguration is invalid json
*/
public SourceConnection partialSource(final UUID sourceId, final String sourceName, final JsonNode newConfiguration)
throws ConfigNotFoundException, IOException, JsonValidationException {
// get existing source
final SourceConnection persistedSource = secretsRepositoryReader.getSourceConnectionWithSecrets(sourceId);
persistedSource.setName(Optional.ofNullable(sourceName).orElse(persistedSource.getName()));

// Merge update configuration into the persisted configuration
JsonNode mergeConfiguration = Optional.ofNullable(newConfiguration).orElse(persistedSource.getConfiguration());
final JsonNode updatedConfiguration = Jsons.mergeNodes(persistedSource.getConfiguration(), mergeConfiguration);

return Jsons.clone(persistedSource).withConfiguration(updatedConfiguration);
}

/**
* Update the configuration object for a destination.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.DiscoverCatalogResult;
import io.airbyte.api.model.generated.ListResourcesForWorkspacesRequestBody;
import io.airbyte.api.model.generated.PartialSourceUpdate;
import io.airbyte.api.model.generated.SourceCloneConfiguration;
import io.airbyte.api.model.generated.SourceCloneRequestBody;
import io.airbyte.api.model.generated.SourceCreate;
Expand Down Expand Up @@ -129,6 +130,23 @@ public SourceRead createSourceWithOptionalSecret(final SourceCreate sourceCreate
return createSource(sourceCreate);
}

public SourceRead updateSourceWithOptionalSecret(final PartialSourceUpdate partialSourceUpdate)
throws JsonValidationException, ConfigNotFoundException, IOException {
final ConnectorSpecification spec = getSpecFromSourceId(partialSourceUpdate.getSourceId());
if (partialSourceUpdate.getSecretId() != null && !partialSourceUpdate.getSecretId().isBlank()) {
final JsonNode hydratedSecret = hydrateOAuthResponseSecret(partialSourceUpdate.getSecretId());
// add OAuth Response data to connection configuration
partialSourceUpdate.setConnectionConfiguration(
OAuthSecretHelper.setSecretsInConnectionConfiguration(spec, hydratedSecret,
Optional.ofNullable(partialSourceUpdate.getConnectionConfiguration()).orElse(Jsons.emptyObject())));
} else {
// We aren't using a secret to update the source so no server provided credentials should have been
// passed in.
OAuthSecretHelper.validateNoSecretsInConfiguration(spec, partialSourceUpdate.getConnectionConfiguration());
}
return partialUpdateSource(partialSourceUpdate);
}

@VisibleForTesting
public SourceRead createSource(final SourceCreate sourceCreate)
throws ConfigNotFoundException, IOException, JsonValidationException {
Expand All @@ -152,6 +170,30 @@ public SourceRead createSource(final SourceCreate sourceCreate)
return buildSourceRead(configRepository.getSourceConnection(sourceId), spec);
}

public SourceRead partialUpdateSource(final PartialSourceUpdate partialSourceUpdate)
throws ConfigNotFoundException, IOException, JsonValidationException {

final UUID sourceId = partialSourceUpdate.getSourceId();
final SourceConnection updatedSource = configurationUpdate
.partialSource(sourceId, partialSourceUpdate.getName(),
partialSourceUpdate.getConnectionConfiguration());
final ConnectorSpecification spec = getSpecFromSourceId(sourceId);
validateSource(spec, updatedSource.getConfiguration());

// persist
persistSourceConnection(
updatedSource.getName(),
updatedSource.getSourceDefinitionId(),
updatedSource.getWorkspaceId(),
updatedSource.getSourceId(),
updatedSource.getTombstone(),
updatedSource.getConfiguration(),
spec);

// read configuration from db
return buildSourceRead(configRepository.getSourceConnection(sourceId), spec);
}

public SourceRead updateSource(final SourceUpdate sourceUpdate)
throws ConfigNotFoundException, IOException, JsonValidationException {

Expand Down Expand Up @@ -431,4 +473,18 @@ JsonNode hydrateOAuthResponseSecret(final String secretId) {
return Jsons.jsonNode(completeOAuthResponse.getAuthPayload());
}

@VisibleForTesting
JsonNode hydrateConnectionConfiguration(final UUID sourceDefinitionId,
final UUID workspaceId,
final String secretId,
JsonNode dehydratedConnectionConfiguration)
throws JsonValidationException, ConfigNotFoundException, IOException {
final JsonNode hydratedSecret = hydrateOAuthResponseSecret(secretId);
final ConnectorSpecification spec =
getSpecFromSourceDefinitionIdForWorkspace(sourceDefinitionId, workspaceId);
// add OAuth Response data to connection configuration

return OAuthSecretHelper.setSecretsInConnectionConfiguration(spec, hydratedSecret, dehydratedConnectionConfiguration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.errors.BadObjectSchemaKnownException;
import io.airbyte.oauth.MoreOAuthParameters;
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonValidationException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import lombok.extern.slf4j.Slf4j;

/**
* Holds helpers to handle OAuth secrets.
*/
@Slf4j
public class OAuthSecretHelper {

/**
Expand All @@ -34,20 +38,17 @@ public static JsonNode setSecretsInConnectionConfiguration(final ConnectorSpecif

// Get the paths from advancedAuth that we need
final ObjectNode newConnectionConfiguration = connectionConfiguration.deepCopy();
if (OAuthConfigSupplier.hasOAuthConfigSpecification(spec)) {
final Map<String, List<String>> oAuthPaths = getOAuthConfigPaths(spec);
for (final Entry<String, List<String>> entry : oAuthPaths.entrySet()) {
// Key where we need to stuff things
final String key = entry.getKey();
final List<String> jsonPathList = entry.getValue();
final Map<String, List<String>> oauthPaths = getOAuthConfigPaths(spec);
final JsonNode flattenedSecret = MoreOAuthParameters.flattenOAuthConfig(hydratedSecret);

Jsons.setNestedValue(newConnectionConfiguration, jsonPathList, hydratedSecret.get(key));
}
return newConnectionConfiguration;
} else {
// Just merge, complete_oauth handled setting rootNode for us.
return Jsons.mergeNodes(connectionConfiguration, hydratedSecret);
for (final Entry<String, List<String>> entry : oauthPaths.entrySet()) {
// Key where we need to stuff things
final String key = entry.getKey();
final List<String> jsonPathList = entry.getValue();

Jsons.setNestedValue(newConnectionConfiguration, jsonPathList, flattenedSecret.get(key));
}
return newConnectionConfiguration;
}

/**
Expand All @@ -56,23 +57,115 @@ public static JsonNode setSecretsInConnectionConfiguration(final ConnectorSpecif
* path_in_connector_config i.e. { client_id: ['credentials', 'client_id']}
*/
@VisibleForTesting
public static Map<String, List<String>> getOAuthConfigPaths(final ConnectorSpecification connectorSpecification) throws JsonValidationException {
public static Map<String, List<String>> getAdvancedAuthOAuthPaths(final ConnectorSpecification connectorSpecification, boolean includeOutputPaths)
throws JsonValidationException {
if (OAuthConfigSupplier.hasOAuthConfigSpecification(connectorSpecification)) {
final JsonNode completeOAuthOutputSpecification =
connectorSpecification.getAdvancedAuth().getOauthConfigSpecification().getCompleteOauthOutputSpecification();
final JsonNode completeOAuthServerOutputSpecification =
connectorSpecification.getAdvancedAuth().getOauthConfigSpecification().getCompleteOauthServerOutputSpecification();

// Merge all the mappings into one map
Map<String, List<String>> result = new HashMap<>(OAuthPathExtractor.extractOauthConfigurationPaths(completeOAuthOutputSpecification));
result.putAll(OAuthPathExtractor.extractOauthConfigurationPaths(completeOAuthServerOutputSpecification));
Map<String, List<String>> result = new HashMap<>(OAuthPathExtractor.extractOauthConfigurationPaths(completeOAuthServerOutputSpecification));
if (includeOutputPaths) {
result.putAll(OAuthPathExtractor.extractOauthConfigurationPaths(completeOAuthOutputSpecification));
}
return result;
} else {
throw new JsonValidationException(
String.format("Error parsing advancedAuth - see [%s]", connectorSpecification.getDocumentationUrl()));
}
}

private static Map<String, List<String>> getLegacyOAuthConfigPaths(ConnectorSpecification spec, boolean includeOutputPaths) {
if (!includeOutputPaths) {
return getLegacyOAuthConfigPathsImpl(spec, false);
} else {
return getLegacyOAuthConfigPathsImpl(spec, true);
}
}

/**
* Get the OAuth input paths from the legacy OAuth specifications.
*
* @param spec - a connector spec with legacy OAuth data
*
* @return Map where the key = the property and the value = the path to the property in list form.
*/
private static Map<String, List<String>> getLegacyOAuthConfigPathsImpl(ConnectorSpecification spec, boolean includeOutputPaths) {

final List<String> pathList = new java.util.ArrayList<>();

if (spec.getAuthSpecification().getOauth2Specification().getRootObject().size() > 0) {
// If rootObject isn't empty, we need the first value which is the key where we'll need to set our
// OAuth constants.
// i.e. rootObject = ["credentials", 0], we know our OAuth creds will go in
// connectionConfiguration.credentials
pathList.add(spec.getAuthSpecification().getOauth2Specification().getRootObject().get(0).toString());
}

final Map<String, List<String>> keyToPaths = new HashMap<>(Collections.emptyMap());

// Aggregate all parameters we need
List<List<String>> oauthFlowParameters = spec.getAuthSpecification().getOauth2Specification().getOauthFlowInitParameters();
if (includeOutputPaths) {
oauthFlowParameters.addAll(spec.getAuthSpecification().getOauth2Specification().getOauthFlowOutputParameters());
}

for (List<String> pathKeys : oauthFlowParameters) {
// For a given auth specification, we can have multiple path keys i.e.
// oauthFlowInitParameters:
// - ["client_id"]
// - ["client_secret"]
// For each, we should add them to the pathList so we can validate they aren't set in the config.
log.debug("PATH_KEYS: {}", pathKeys);
List<String> copiedPathList = new java.util.ArrayList<>(pathList);
copiedPathList.addAll(pathKeys);
log.debug("COPIED_PATHS: {}", copiedPathList);

// I don't know if there are specs where this could be true, but no point in setting those anyway
// so doing this for safety.
if (!copiedPathList.isEmpty()) {
keyToPaths.put(copiedPathList.get(copiedPathList.size() - 1), copiedPathList);
}
}
return keyToPaths;
}

/**
* Standardizes out the return format for getting config paths whether it's a legacy OAuth spec or
* an advanced_auth one. Returns all output paths, used for setting secrets.
*
* @param spec - connector specification to get paths for
* @return Map where the key = the property and the value = the path to the property in list form.
*/
public static Map<String, List<String>> getOAuthConfigPaths(ConnectorSpecification spec) throws JsonValidationException {
if (OAuthConfigSupplier.hasOAuthConfigSpecification(spec)) {
return getAdvancedAuthOAuthPaths(spec, true);
} else if (OAuthConfigSupplier.hasLegacyOAuthConfigSpecification(spec)) {
return getLegacyOAuthConfigPaths(spec, true);
} else {
throw new IllegalStateException("No OAuth data in specification");
}
}

/**
* Like getOAuthConfigPaths but does not include the server output paths in case users need to
* change them independently. Used for validation.
*
* @param spec - connector specification to get paths for
* @return Map where the key = the property and the value = the path to the property in list form.
*/
public static Map<String, List<String>> getOAuthInputPaths(ConnectorSpecification spec) throws JsonValidationException {
if (OAuthConfigSupplier.hasOAuthConfigSpecification(spec)) {
return getAdvancedAuthOAuthPaths(spec, false);
} else if (OAuthConfigSupplier.hasLegacyOAuthConfigSpecification(spec)) {
return getLegacyOAuthConfigPaths(spec, false);
} else {
throw new IllegalStateException("No OAuth data in specification");
}
}

/**
* Get OAuth secret paths but only for the completeOauthServerOutput portion of the connector
* specification.
Expand Down Expand Up @@ -153,4 +246,33 @@ private static List<String> alternatingList(final String property,
return result;
}

/**
* Throws an exception if any property is set in the given configuration which corresponds to an
* airbyte_secret field which might be injected by the server in the spec.
*/
public static void validateNoSecretsInConfiguration(final ConnectorSpecification spec,
final JsonNode connectionConfiguration)
throws JsonValidationException {
if (OAuthConfigSupplier.hasOAuthConfigSpecification(spec) || OAuthConfigSupplier.hasLegacyOAuthConfigSpecification((spec))) {
Map<String, List<String>> oauthPaths = getOAuthInputPaths(spec);
for (final Entry<String, List<String>> entry : oauthPaths.entrySet()) {
final String key = entry.getKey();
final List<String> jsonPathList = entry.getValue();

throwIfKeyExistsInConfig(connectionConfiguration, key, jsonPathList);
}
}
}

private static void throwIfKeyExistsInConfig(JsonNode connectionConfiguration, String key, List<String> jsonPathList) {
if (Jsons.navigateTo(connectionConfiguration, jsonPathList) != null) {
// The API referenced by this message is a Cloud feature and not yet available in the open source
// project but will be added.
String errorMessage = String.format(
"Cannot set key '%s', please create an OAuth credentials override instead - https://reference.airbyte.com/reference/workspaceoauthcredentials",
key);
throw new BadObjectSchemaKnownException(errorMessage);
}
}

}
Loading

0 comments on commit 15d6505

Please sign in to comment.