From da34befeea40bcc65fbf4b428ebf1bbb6ad2eb42 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Mon, 13 Sep 2021 17:37:53 +0200 Subject: [PATCH] Implement Google Analytics & Google Ads OAuth Flow (#5911) Co-authored-by: Sherif Nada --- airbyte-api/src/main/openapi/config.yaml | 110 ++++---- .../source/bigquery/BigQuerySource.java | 6 - airbyte-oauth/build.gradle | 6 +- .../io/airbyte/oauth/MoreOAuthParameters.java | 60 +++++ .../oauth/OAuthFlowImplementation.java | 12 +- .../oauth/OAuthImplementationFactory.java | 16 +- .../oauth/google/GoogleAdsOauthFlow.java | 48 ++++ .../google/GoogleAnalyticsOauthFlow.java | 35 +++ .../airbyte/oauth/google/GoogleOAuthFlow.java | 249 ++++++++++++++++++ .../GoogleOAuthFlowIntegrationTest.java | 186 +++++++++++++ .../oauth/google/GoogleOAuthFlowTest.java | 204 ++++++++++++++ airbyte-scheduler/persistence/build.gradle | 1 + .../job_factory/OAuthConfigSupplier.java | 50 ++-- .../job_factory/OAuthConfigSupplierTest.java | 100 ++++++- .../airbyte/server/handlers/OAuthHandler.java | 28 +- .../api/generated-api-html/index.html | 12 +- 16 files changed, 1032 insertions(+), 91 deletions(-) create mode 100644 airbyte-oauth/src/main/java/io/airbyte/oauth/MoreOAuthParameters.java create mode 100644 airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleAdsOauthFlow.java create mode 100644 airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleAnalyticsOauthFlow.java create mode 100644 airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleOAuthFlow.java create mode 100644 airbyte-oauth/src/test-integration/java/io.airbyte.oauth/GoogleOAuthFlowIntegrationTest.java create mode 100644 airbyte-oauth/src/test/java/io/airbyte/oauth/google/GoogleOAuthFlowTest.java diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index e6988e245a87..583a14909088 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -1184,6 +1184,29 @@ paths: $ref: "#/components/responses/NotFoundResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + /v1/source_oauths/oauth_params/create: + post: + tags: + - oauth + summary: > + Sets instancewide variables to be used for the oauth flow when creating this source. When set, these variables will be injected + into a connector's configuration before any interaction with the connector image itself. This enables running oauth flows with + consistent variables e.g: the company's Google Ads developer_token, client_id, and client_secret without the user having to know + about these variables. + operationId: setInstancewideSourceOauthParams + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/SetInstancewideSourceOauthParamsRequestBody" + required: true + responses: + "200": + description: Successful + "400": + $ref: "#/components/responses/ExceptionResponse" + "404": + $ref: "#/components/responses/NotFoundResponse" /v1/source_oauths/get_consent_url: post: tags: @@ -1276,6 +1299,29 @@ paths: $ref: "#/components/responses/NotFoundResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + /v1/destination_oauths/oauth_params/create: + post: + tags: + - oauth + summary: > + Sets instancewide variables to be used for the oauth flow when creating this destination. When set, these variables will be injected + into a connector's configuration before any interaction with the connector image itself. This enables running oauth flows with + consistent variables e.g: the company's Google Ads developer_token, client_id, and client_secret without the user having to know + about these variables. + operationId: setInstancewideDestinationOauthParams + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/SetInstancewideDestinationOauthParamsRequestBody" + required: true + responses: + "200": + description: Successful + "400": + $ref: "#/components/responses/ExceptionResponse" + "404": + $ref: "#/components/responses/NotFoundResponse" /v1/web_backend/connections/list: post: tags: @@ -1620,52 +1666,6 @@ paths: $ref: "#/components/schemas/ImportRead" "404": $ref: "#/components/responses/NotFoundResponse" - /v1/source_oauths/oauth_params/create: - post: - tags: - - oauth - summary: > - Sets instancewide variables to be used for the oauth flow when creating this source. When set, these variables will be injected - into a connector's configuration before any interaction with the connector image itself. This enables running oauth flows with - consistent variables e.g: the company's Google Ads developer_token, client_id, and client_secret without the user having to know - about these variables. - operationId: setInstancewideSourceOauthParams - requestBody: - content: - application/json: - schema: - $ref: "#/components/schemas/SetInstancewideSourceOauthParamsRequestBody" - required: true - responses: - "200": - description: Successful - "400": - $ref: "#/components/responses/ExceptionResponse" - "404": - $ref: "#/components/responses/NotFoundResponse" - /v1/destination_oauths/oauth_params/create: - post: - tags: - - oauth - summary: > - Sets instancewide variables to be used for the oauth flow when creating this destination. When set, these variables will be injected - into a connector's configuration before any interaction with the connector image itself. This enables running oauth flows with - consistent variables e.g: the company's Google Ads developer_token, client_id, and client_secret without the user having to know - about these variables. - operationId: setInstancewideDestinationOauthParams - requestBody: - content: - application/json: - schema: - $ref: "#/components/schemas/SetInstancewideDestinationOauthParamsRequestBody" - required: true - responses: - "200": - description: Successful - "400": - $ref: "#/components/responses/ExceptionResponse" - "404": - $ref: "#/components/responses/NotFoundResponse" components: securitySchemes: bearerAuth: @@ -2991,20 +2991,30 @@ components: type: object required: - sourceDefinitionId + - workspaceId + - redirectUrl properties: sourceDefinitionId: $ref: "#/components/schemas/SourceDefinitionId" workspaceId: $ref: "#/components/schemas/WorkspaceId" + redirectUrl: + description: The url to redirect to after getting the user consent + type: string DestinationOauthConsentRequest: type: object required: - destinationDefinitionId + - workspaceId + - redirectUrl properties: destinationDefinitionId: $ref: "#/components/schemas/DestinationDefinitionId" workspaceId: $ref: "#/components/schemas/WorkspaceId" + redirectUrl: + description: The url to redirect to after getting the user consent + type: string OAuthConsentRead: type: object required: @@ -3016,11 +3026,15 @@ components: type: object required: - sourceDefinitionId + - workspaceId properties: sourceDefinitionId: $ref: "#/components/schemas/SourceDefinitionId" workspaceId: $ref: "#/components/schemas/WorkspaceId" + redirectUrl: + description: When completing OAuth flow to gain an access token, some API sometimes requires to verify that the app re-send the redirectUrl that was used when consent was given. + type: string queryParams: description: The query parameters present in the redirect URL after a user granted consent e.g auth code type: object @@ -3029,11 +3043,15 @@ components: type: object required: - destinationDefinitionId + - workspaceId properties: destinationDefinitionId: $ref: "#/components/schemas/DestinationDefinitionId" workspaceId: $ref: "#/components/schemas/WorkspaceId" + redirectUrl: + description: When completing OAuth flow to gain an access token, some API sometimes requires to verify that the app re-send the redirectUrl that was used when consent was given. + type: string queryParams: description: The query parameters present in the redirect URL after a user granted consent e.g auth code type: object diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java index ecdab23bbc0b..8d53f39de3ae 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java +++ b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java @@ -140,12 +140,6 @@ protected List>> discoverInternal(Big return result; } - @Override - protected List>> discoverInternal(BigQueryDatabase database, String schema) throws Exception { - // todo to be added - return discoverInternal(database); - } - @Override protected Map> discoverPrimaryKeys(BigQueryDatabase database, List>> tableInfos) { return Collections.emptyMap(); diff --git a/airbyte-oauth/build.gradle b/airbyte-oauth/build.gradle index 19ac6c1facd0..a5b74ae07a95 100644 --- a/airbyte-oauth/build.gradle +++ b/airbyte-oauth/build.gradle @@ -1,7 +1,11 @@ plugins { id "java-library" + id 'airbyte-integration-test-java' } dependencies { - + implementation project(':airbyte-config:models') + implementation project(':airbyte-config:persistence') + implementation project(':airbyte-json-validation') + testImplementation project(':airbyte-oauth') } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/MoreOAuthParameters.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/MoreOAuthParameters.java new file mode 100644 index 000000000000..37c039826712 --- /dev/null +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/MoreOAuthParameters.java @@ -0,0 +1,60 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.oauth; + +import io.airbyte.config.DestinationOAuthParameter; +import io.airbyte.config.SourceOAuthParameter; +import java.util.Comparator; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Stream; + +public class MoreOAuthParameters { + + public static Optional getSourceOAuthParameter( + Stream stream, + UUID workspaceId, + UUID sourceDefinitionId) { + return stream + .filter(p -> sourceDefinitionId.equals(p.getSourceDefinitionId())) + .filter(p -> p.getWorkspaceId() == null || workspaceId.equals(p.getWorkspaceId())) + // we prefer params specific to a workspace before global ones (ie workspace is null) + .min(Comparator.comparing(SourceOAuthParameter::getWorkspaceId, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(SourceOAuthParameter::getOauthParameterId)); + } + + public static Optional getDestinationOAuthParameter( + Stream stream, + UUID workspaceId, + UUID destinationDefinitionId) { + return stream + .filter(p -> destinationDefinitionId.equals(p.getDestinationDefinitionId())) + .filter(p -> p.getWorkspaceId() == null || workspaceId.equals(p.getWorkspaceId())) + // we prefer params specific to a workspace before global ones (ie workspace is null) + .min(Comparator.comparing(DestinationOAuthParameter::getWorkspaceId, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(DestinationOAuthParameter::getOauthParameterId)); + } + +} diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthFlowImplementation.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthFlowImplementation.java index 1b114d563d3c..92cd0343fc96 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthFlowImplementation.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthFlowImplementation.java @@ -24,13 +24,21 @@ package io.airbyte.oauth; +import io.airbyte.config.persistence.ConfigNotFoundException; +import java.io.IOException; import java.util.Map; import java.util.UUID; public interface OAuthFlowImplementation { - String getConsentUrl(); + String getSourceConsentUrl(UUID workspaceId, UUID sourceDefinitionId, String redirectUrl) throws IOException, ConfigNotFoundException; - Map completeOAuth(UUID workspaceId, Map queryParams); + String getDestinationConsentUrl(UUID workspaceId, UUID destinationDefinitionId, String redirectUrl) throws IOException, ConfigNotFoundException; + + Map completeSourceOAuth(UUID workspaceId, UUID sourceDefinitionId, Map queryParams, String redirectUrl) + throws IOException, ConfigNotFoundException; + + Map completeDestinationOAuth(UUID workspaceId, UUID destinationDefinitionId, Map queryParams, String redirectUrl) + throws IOException, ConfigNotFoundException; } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java index 1354b201de3e..645b5296b243 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java @@ -25,15 +25,23 @@ package io.airbyte.oauth; import com.google.common.collect.ImmutableMap; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.oauth.google.GoogleAdsOauthFlow; +import io.airbyte.oauth.google.GoogleAnalyticsOauthFlow; import java.util.Map; public class OAuthImplementationFactory { - static final Map OAUTH_FLOW_MAPPING = - ImmutableMap.builder() - .build(); + private final Map OAUTH_FLOW_MAPPING; - public static OAuthFlowImplementation create(String imageName) { + public OAuthImplementationFactory(ConfigRepository configRepository) { + OAUTH_FLOW_MAPPING = ImmutableMap.builder() + .put("airbyte/source-google-analytics-v4", new GoogleAnalyticsOauthFlow(configRepository)) + .put("airbyte/source-google-ads", new GoogleAdsOauthFlow(configRepository)) + .build(); + } + + public OAuthFlowImplementation create(String imageName) { if (OAUTH_FLOW_MAPPING.containsKey(imageName)) { return OAUTH_FLOW_MAPPING.get(imageName); } else { diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleAdsOauthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleAdsOauthFlow.java new file mode 100644 index 000000000000..28c79a3f8f70 --- /dev/null +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleAdsOauthFlow.java @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.oauth.google; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.config.persistence.ConfigRepository; + +public class GoogleAdsOauthFlow extends GoogleOAuthFlow { + + public GoogleAdsOauthFlow(ConfigRepository configRepository) { + super(configRepository, "https://www.googleapis.com/auth/adwords"); + } + + @Override + protected String getClientIdUnsafe(JsonNode config) { + // the config object containing client ID and secret is nested inside the "credentials" object + return super.getClientIdUnsafe(config.get("credentials")); + } + + @Override + protected String getClientSecretUnsafe(JsonNode config) { + // the config object containing client ID and secret is nested inside the "credentials" object + return super.getClientSecretUnsafe(config.get("credentials")); + } + +} diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleAnalyticsOauthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleAnalyticsOauthFlow.java new file mode 100644 index 000000000000..7f7ba6ce9727 --- /dev/null +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleAnalyticsOauthFlow.java @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.oauth.google; + +import io.airbyte.config.persistence.ConfigRepository; + +public class GoogleAnalyticsOauthFlow extends GoogleOAuthFlow { + + public GoogleAnalyticsOauthFlow(ConfigRepository configRepository) { + super(configRepository, "https://www.googleapis.com/auth/analytics.readonly"); + } + +} diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleOAuthFlow.java new file mode 100644 index 000000000000..ad3a6c06b8d4 --- /dev/null +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/google/GoogleOAuthFlow.java @@ -0,0 +1,249 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.oauth.google; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.DestinationOAuthParameter; +import io.airbyte.config.SourceOAuthParameter; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.oauth.MoreOAuthParameters; +import io.airbyte.oauth.OAuthFlowImplementation; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.net.URI; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpClient.Version; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +/** + * Following docs from https://developers.google.com/identity/protocols/oauth2/web-server + */ +public class GoogleOAuthFlow implements OAuthFlowImplementation { + + private final HttpClient httpClient; + + private final static String CONSENT_URL = "https://accounts.google.com/o/oauth2/v2/auth"; + private final static String ACCESS_TOKEN_URL = "https://oauth2.googleapis.com/token"; + + private final String scope; + private final List googleQueryParameters; + + private final ConfigRepository configRepository; + + public GoogleOAuthFlow(ConfigRepository configRepository, String scope) { + this(configRepository, scope, HttpClient.newBuilder().version(Version.HTTP_1_1).build()); + } + + @VisibleForTesting + GoogleOAuthFlow(ConfigRepository configRepository, String scope, HttpClient httpClient) { + this.configRepository = configRepository; + this.httpClient = httpClient; + this.scope = UrlEncode(scope); + this.googleQueryParameters = List.of( + String.format("scope=%s", this.scope), + "access_type=offline", + "include_granted_scopes=true", + "response_type=code", + "prompt=consent"); + } + + @Override + public String getSourceConsentUrl(UUID workspaceId, UUID sourceDefinitionId, String redirectUrl) throws IOException, ConfigNotFoundException { + final JsonNode oAuthParamConfig = getSourceOAuthParamConfig(workspaceId, sourceDefinitionId); + return getConsentUrl(sourceDefinitionId, getClientIdUnsafe(oAuthParamConfig), redirectUrl); + } + + @Override + public String getDestinationConsentUrl(UUID workspaceId, UUID destinationDefinitionId, String redirectUrl) + throws IOException, ConfigNotFoundException { + final JsonNode oAuthParamConfig = getDestinationOAuthParamConfig(workspaceId, destinationDefinitionId); + return getConsentUrl(destinationDefinitionId, getClientIdUnsafe(oAuthParamConfig), redirectUrl); + } + + private String getConsentUrl(UUID definitionId, String clientId, String redirectUrl) { + final StringBuilder result = new StringBuilder(CONSENT_URL) + .append("?"); + for (String queryParameter : googleQueryParameters) { + result.append(queryParameter).append("&"); + } + return result + // TODO state should be randomly generated, and the 2nd step of oauth should verify its value + // matches the initially generated state value + .append("state=").append(definitionId.toString()).append("&") + .append("client_id=").append(clientId).append("&") + .append("redirect_uri=").append(redirectUrl) + .toString(); + } + + @Override + public Map completeSourceOAuth(UUID workspaceId, UUID sourceDefinitionId, Map queryParams, String redirectUrl) + throws IOException, ConfigNotFoundException { + if (queryParams.containsKey("code")) { + final String code = (String) queryParams.get("code"); + final JsonNode oAuthParamConfig = getSourceOAuthParamConfig(workspaceId, sourceDefinitionId); + final String clientId = getClientIdUnsafe(oAuthParamConfig); + final String clientSecret = getClientSecretUnsafe(oAuthParamConfig); + return completeOAuthFlow(clientId, clientSecret, code, redirectUrl); + } else { + throw new IOException("Undefined 'code' from consent redirected url."); + } + } + + @Override + public Map completeDestinationOAuth(UUID workspaceId, + UUID destinationDefinitionId, + Map queryParams, + String redirectUrl) + throws IOException, ConfigNotFoundException { + if (queryParams.containsKey("code")) { + final String code = (String) queryParams.get("code"); + final JsonNode oAuthParamConfig = getDestinationOAuthParamConfig(workspaceId, destinationDefinitionId); + final String clientId = getClientIdUnsafe(oAuthParamConfig); + final String clientSecret = getClientSecretUnsafe(oAuthParamConfig); + return completeOAuthFlow(clientId, clientSecret, code, redirectUrl); + } else { + throw new IOException("Undefined 'code' from consent redirected url."); + } + } + + private Map completeOAuthFlow(String clientId, String clientSecret, String code, String redirectUrl) throws IOException { + final ImmutableMap body = new Builder() + .put("client_id", clientId) + .put("client_secret", clientSecret) + .put("code", code) + .put("grant_type", "authorization_code") + .put("redirect_uri", redirectUrl) + .build(); + final HttpRequest request = HttpRequest.newBuilder() + .POST(HttpRequest.BodyPublishers.ofString(toUrlEncodedString(body))) + .uri(URI.create(ACCESS_TOKEN_URL)) + .header("Content-Type", "application/x-www-form-urlencoded") + .build(); + final HttpResponse response; + try { + response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + var data = Jsons.deserialize(response.body()); + if (data.has("refresh_token")) { + return Map.of("refresh_token", data.get("refresh_token").asText()); + } else { + // TODO This means the response from Google did not have a refresh token and is probably a + // programming error + // handle this better + throw new IOException(String.format("Missing 'refresh_token' in query params from %s. Response: %s", ACCESS_TOKEN_URL, data)); + } + } catch (InterruptedException e) { + throw new IOException("Failed to complete Google OAuth flow", e); + } + } + + private static String toUrlEncodedString(ImmutableMap body) { + final StringBuilder result = new StringBuilder(); + for (var entry : body.entrySet()) { + if (result.length() > 0) { + result.append("&"); + } + result.append(entry.getKey()).append("=").append(entry.getValue()); + } + return result.toString(); + } + + private JsonNode getSourceOAuthParamConfig(UUID workspaceId, UUID sourceDefinitionId) throws IOException, ConfigNotFoundException { + try { + final Optional param = MoreOAuthParameters.getSourceOAuthParameter( + configRepository.listSourceOAuthParam().stream(), workspaceId, sourceDefinitionId); + if (param.isPresent()) { + return param.get().getConfiguration(); + } else { + throw new ConfigNotFoundException(ConfigSchema.SOURCE_OAUTH_PARAM, "Undefined OAuth Parameter."); + } + } catch (JsonValidationException e) { + throw new IOException("Failed to load OAuth Parameters", e); + } + } + + private JsonNode getDestinationOAuthParamConfig(UUID workspaceId, UUID destinationDefinitionId) throws IOException, ConfigNotFoundException { + try { + final Optional param = MoreOAuthParameters.getDestinationOAuthParameter( + configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId); + if (param.isPresent()) { + return param.get().getConfiguration(); + } else { + throw new ConfigNotFoundException(ConfigSchema.DESTINATION_OAUTH_PARAM, "Undefined OAuth Parameter."); + } + } catch (JsonValidationException e) { + throw new IOException("Failed to load OAuth Parameters", e); + } + } + + private String UrlEncode(String s) { + try { + return URLEncoder.encode(s, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Throws an exception if the client ID cannot be extracted. Subclasses should override this to + * parse the config differently. + * + * @return + */ + protected String getClientIdUnsafe(JsonNode oauthConfig) { + if (oauthConfig.get("client_id") != null) { + return oauthConfig.get("client_id").asText(); + } else { + throw new IllegalArgumentException("Undefined parameter 'client_id' for Google OAuth Flow."); + } + } + + /** + * Throws an exception if the client secret cannot be extracted. Subclasses should override this to + * parse the config differently. + * + * @return + */ + protected String getClientSecretUnsafe(JsonNode oauthConfig) { + if (oauthConfig.get("client_secret") != null) { + return oauthConfig.get("client_secret").asText(); + } else { + throw new IllegalArgumentException("Undefined parameter 'client_secret' for Google OAuth Flow."); + } + } + +} diff --git a/airbyte-oauth/src/test-integration/java/io.airbyte.oauth/GoogleOAuthFlowIntegrationTest.java b/airbyte-oauth/src/test-integration/java/io.airbyte.oauth/GoogleOAuthFlowIntegrationTest.java new file mode 100644 index 000000000000..55ee5f99e91d --- /dev/null +++ b/airbyte-oauth/src/test-integration/java/io.airbyte.oauth/GoogleOAuthFlowIntegrationTest.java @@ -0,0 +1,186 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.oauth; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.SourceOAuthParameter; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.oauth.google.GoogleAnalyticsOauthFlow; +import io.airbyte.oauth.google.GoogleOAuthFlow; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GoogleOAuthFlowIntegrationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(GoogleOAuthFlowIntegrationTest.class); + private static final String REDIRECT_URL = "http://localhost/code"; + private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); + + private ConfigRepository configRepository; + private GoogleOAuthFlow googleOAuthFlow; + private HttpServer server; + private ServerHandler serverHandler; + + @BeforeEach + public void setup() throws IOException { + if (!Files.exists(CREDENTIALS_PATH)) { + throw new IllegalStateException( + "Must provide path to a oauth credentials file."); + } + configRepository = mock(ConfigRepository.class); + googleOAuthFlow = new GoogleAnalyticsOauthFlow(configRepository); + + server = HttpServer.create(new InetSocketAddress(80), 0); + server.setExecutor(null); // creates a default executor + server.start(); + serverHandler = new ServerHandler("code"); + server.createContext("/code", serverHandler); + } + + @AfterEach + void tearDown() { + server.stop(1); + } + + @Test + public void testFullGoogleOAuthFlow() throws InterruptedException, ConfigNotFoundException, IOException, JsonValidationException { + int limit = 20; + final UUID workspaceId = UUID.randomUUID(); + final UUID definitionId = UUID.randomUUID(); + final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH)); + final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString); + when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withSourceDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.jsonNode(ImmutableMap.builder() + .put("client_id", credentialsJson.get("client_id").asText()) + .put("client_secret", credentialsJson.get("client_secret").asText()) + .build())))); + final String url = googleOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL); + LOGGER.info("Waiting for user consent at: {}", url); + // TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing + // access... + while (!serverHandler.isSucceeded() && limit > 0) { + Thread.sleep(1000); + limit -= 1; + } + assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time"); + final Map params = googleOAuthFlow.completeSourceOAuth(workspaceId, definitionId, + Map.of("code", serverHandler.getParamValue()), REDIRECT_URL); + LOGGER.info("Response from completing OAuth Flow is: {}", params.toString()); + assertTrue(params.containsKey("refresh_token")); + assertTrue(params.get("refresh_token").toString().length() > 0); + } + + static class ServerHandler implements HttpHandler { + + final private String expectedParam; + private String paramValue; + private boolean succeeded; + + public ServerHandler(String expectedParam) { + this.expectedParam = expectedParam; + this.paramValue = ""; + this.succeeded = false; + } + + public boolean isSucceeded() { + return succeeded; + } + + public String getParamValue() { + return paramValue; + } + + @Override + public void handle(HttpExchange t) { + final String query = t.getRequestURI().getQuery(); + LOGGER.info("Received query: '{}'", query); + final Map data; + try { + data = deserialize(query); + final String response; + if (data != null && data.containsKey(expectedParam)) { + paramValue = data.get(expectedParam); + response = String.format("Successfully extracted %s:\n'%s'\nTest should be continuing the OAuth Flow to retrieve the refresh_token...", + expectedParam, paramValue); + LOGGER.info(response); + t.sendResponseHeaders(200, response.length()); + succeeded = true; + } else { + response = String.format("Unable to parse query params from redirected url: %s", query); + t.sendResponseHeaders(500, response.length()); + } + final OutputStream os = t.getResponseBody(); + os.write(response.getBytes()); + os.close(); + } catch (RuntimeException | IOException e) { + LOGGER.error("Failed to parse from body {}", query, e); + } + } + + private static Map deserialize(String query) { + if (query == null) { + return null; + } + final Map result = new HashMap<>(); + for (String param : query.split("&")) { + String[] entry = param.split("="); + if (entry.length > 1) { + result.put(entry[0], entry[1]); + } else { + result.put(entry[0], ""); + } + } + return result; + } + + } + +} diff --git a/airbyte-oauth/src/test/java/io/airbyte/oauth/google/GoogleOAuthFlowTest.java b/airbyte-oauth/src/test/java/io/airbyte/oauth/google/GoogleOAuthFlowTest.java new file mode 100644 index 000000000000..50d5319d5372 --- /dev/null +++ b/airbyte-oauth/src/test/java/io/airbyte/oauth/google/GoogleOAuthFlowTest.java @@ -0,0 +1,204 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.oauth.google; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.DestinationOAuthParameter; +import io.airbyte.config.SourceOAuthParameter; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.net.http.HttpClient; +import java.net.http.HttpResponse; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GoogleOAuthFlowTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(GoogleOAuthFlowTest.class); + private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); + private static final String REDIRECT_URL = "https%3A//airbyte.io"; + private static final String SCOPE = "https%3A//www.googleapis.com/auth/analytics.readonly"; + + private HttpClient httpClient; + private ConfigRepository configRepository; + private GoogleOAuthFlow googleOAuthFlow; + + private UUID workspaceId; + private UUID definitionId; + + @BeforeEach + public void setup() { + httpClient = mock(HttpClient.class); + configRepository = mock(ConfigRepository.class); + googleOAuthFlow = new GoogleOAuthFlow(configRepository, SCOPE, httpClient); + + workspaceId = UUID.randomUUID(); + definitionId = UUID.randomUUID(); + } + + @Test + public void testGetConsentUrlEmptyOAuthParameters() { + assertThrows(ConfigNotFoundException.class, () -> googleOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL)); + assertThrows(ConfigNotFoundException.class, () -> googleOAuthFlow.getDestinationConsentUrl(workspaceId, definitionId, REDIRECT_URL)); + } + + @Test + public void testGetConsentUrlIncompleteOAuthParameters() throws IOException, JsonValidationException { + when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withSourceDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.emptyObject()))); + when(configRepository.listDestinationOAuthParam()).thenReturn(List.of(new DestinationOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withDestinationDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.emptyObject()))); + assertThrows(IllegalArgumentException.class, () -> googleOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL)); + assertThrows(IllegalArgumentException.class, () -> googleOAuthFlow.getDestinationConsentUrl(workspaceId, definitionId, REDIRECT_URL)); + } + + @Test + public void testGetSourceConsentUrl() throws IOException, ConfigNotFoundException, JsonValidationException { + when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withSourceDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.jsonNode(ImmutableMap.builder() + .put("client_id", getClientId()) + .build())))); + final String actualSourceUrl = googleOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL); + final String expectedSourceUrl = String.format( + "https://accounts.google.com/o/oauth2/v2/auth?scope=%s&access_type=offline&include_granted_scopes=true&response_type=code&prompt=consent&state=%s&client_id=%s&redirect_uri=%s", + SCOPE, + definitionId, + getClientId(), + REDIRECT_URL); + LOGGER.info(expectedSourceUrl); + assertEquals(expectedSourceUrl, actualSourceUrl); + } + + @Test + public void testGetDestinationConsentUrl() throws IOException, ConfigNotFoundException, JsonValidationException { + when(configRepository.listDestinationOAuthParam()).thenReturn(List.of(new DestinationOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withDestinationDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.jsonNode(ImmutableMap.builder() + .put("client_id", getClientId()) + .build())))); + final String actualDestinationUrl = googleOAuthFlow.getDestinationConsentUrl(workspaceId, definitionId, REDIRECT_URL); + final String expectedDestinationUrl = String.format( + "https://accounts.google.com/o/oauth2/v2/auth?scope=%s&access_type=offline&include_granted_scopes=true&response_type=code&prompt=consent&state=%s&client_id=%s&redirect_uri=%s", + SCOPE, + definitionId, + getClientId(), + REDIRECT_URL); + LOGGER.info(expectedDestinationUrl); + assertEquals(expectedDestinationUrl, actualDestinationUrl); + } + + @Test + public void testCompleteOAuthMissingCode() throws IOException, ConfigNotFoundException, JsonValidationException { + when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withSourceDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.jsonNode(ImmutableMap.builder() + .put("client_id", getClientId()) + .put("client_secret", "test_client_secret") + .build())))); + final Map queryParams = Map.of(); + assertThrows(IOException.class, () -> googleOAuthFlow.completeSourceOAuth(workspaceId, definitionId, queryParams, REDIRECT_URL)); + } + + @Test + public void testCompleteSourceOAuth() throws IOException, ConfigNotFoundException, JsonValidationException, InterruptedException { + when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withSourceDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.jsonNode(ImmutableMap.builder() + .put("client_id", getClientId()) + .put("client_secret", "test_client_secret") + .build())))); + final String expectedQueryParams = Jsons.serialize(Map.of( + "refresh_token", "refresh_token_response")); + final HttpResponse response = mock(HttpResponse.class); + when(response.body()).thenReturn(expectedQueryParams); + when(httpClient.send(any(), any())).thenReturn(response); + final Map queryParams = Map.of("code", "test_code"); + final Map actualQueryParams = googleOAuthFlow.completeSourceOAuth(workspaceId, definitionId, queryParams, REDIRECT_URL); + assertEquals(expectedQueryParams, Jsons.serialize(actualQueryParams)); + } + + @Test + public void testCompleteDestinationOAuth() throws IOException, ConfigNotFoundException, JsonValidationException, InterruptedException { + when(configRepository.listDestinationOAuthParam()).thenReturn(List.of(new DestinationOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withDestinationDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.jsonNode(ImmutableMap.builder() + .put("client_id", getClientId()) + .put("client_secret", "test_client_secret") + .build())))); + final String expectedQueryParams = Jsons.serialize(Map.of( + "refresh_token", "refresh_token_response")); + final HttpResponse response = mock(HttpResponse.class); + when(response.body()).thenReturn(expectedQueryParams); + when(httpClient.send(any(), any())).thenReturn(response); + final Map queryParams = Map.of("code", "test_code"); + final Map actualQueryParams = googleOAuthFlow.completeDestinationOAuth(workspaceId, definitionId, queryParams, REDIRECT_URL); + assertEquals(expectedQueryParams, Jsons.serialize(actualQueryParams)); + } + + private String getClientId() throws IOException { + if (!Files.exists(CREDENTIALS_PATH)) { + return "test_client_id"; + } else { + final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH)); + final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString); + return credentialsJson.get("client_id").asText(); + } + } + +} diff --git a/airbyte-scheduler/persistence/build.gradle b/airbyte-scheduler/persistence/build.gradle index 042d37171223..6aef6f9ba65a 100644 --- a/airbyte-scheduler/persistence/build.gradle +++ b/airbyte-scheduler/persistence/build.gradle @@ -10,6 +10,7 @@ dependencies { implementation project(':airbyte-db:lib') implementation project(':airbyte-json-validation') implementation project(':airbyte-notification') + implementation project(':airbyte-oauth') implementation project(':airbyte-protocol:models') implementation project(':airbyte-scheduler:models') diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java index afe532ec0189..6f47d1f75a4a 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java @@ -24,16 +24,17 @@ package io.airbyte.scheduler.persistence.job_factory; +import static com.fasterxml.jackson.databind.node.JsonNodeType.OBJECT; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import io.airbyte.commons.json.Jsons; -import io.airbyte.config.DestinationOAuthParameter; -import io.airbyte.config.SourceOAuthParameter; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.oauth.MoreOAuthParameters; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; -import java.util.Comparator; import java.util.UUID; public class OAuthConfigSupplier { @@ -50,12 +51,9 @@ public OAuthConfigSupplier(ConfigRepository configRepository, boolean maskSecret public JsonNode injectSourceOAuthParameters(UUID sourceDefinitionId, UUID workspaceId, JsonNode sourceConnectorConfig) throws IOException { try { - configRepository.listSourceOAuthParam().stream() - .filter(p -> sourceDefinitionId.equals(p.getSourceDefinitionId())) - .filter(p -> p.getWorkspaceId() == null || workspaceId.equals(p.getWorkspaceId())) - // we prefer params specific to a workspace before global ones (ie workspace is null) - .min(Comparator.comparing(SourceOAuthParameter::getWorkspaceId, Comparator.nullsLast(Comparator.naturalOrder())) - .thenComparing(SourceOAuthParameter::getOauthParameterId)) + // TODO there will be cases where we shouldn't write oauth params. See + // https://github.com/airbytehq/airbyte/issues/5989 + MoreOAuthParameters.getSourceOAuthParameter(configRepository.listSourceOAuthParam().stream(), workspaceId, sourceDefinitionId) .ifPresent( sourceOAuthParameter -> injectJsonNode((ObjectNode) sourceConnectorConfig, (ObjectNode) sourceOAuthParameter.getConfiguration())); return sourceConnectorConfig; @@ -67,12 +65,7 @@ public JsonNode injectSourceOAuthParameters(UUID sourceDefinitionId, UUID worksp public JsonNode injectDestinationOAuthParameters(UUID destinationDefinitionId, UUID workspaceId, JsonNode destinationConnectorConfig) throws IOException { try { - configRepository.listDestinationOAuthParam().stream() - .filter(p -> destinationDefinitionId.equals(p.getDestinationDefinitionId())) - .filter(p -> p.getWorkspaceId() == null || workspaceId.equals(p.getWorkspaceId())) - // we prefer params specific to a workspace before global ones (ie workspace is null) - .min(Comparator.comparing(DestinationOAuthParameter::getWorkspaceId, Comparator.nullsLast(Comparator.naturalOrder())) - .thenComparing(DestinationOAuthParameter::getOauthParameterId)) + MoreOAuthParameters.getDestinationOAuthParameter(configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId) .ifPresent(destinationOAuthParameter -> injectJsonNode((ObjectNode) destinationConnectorConfig, (ObjectNode) destinationOAuthParameter.getConfiguration())); return destinationConnectorConfig; @@ -81,15 +74,32 @@ public JsonNode injectDestinationOAuthParameters(UUID destinationDefinitionId, U } } - private void injectJsonNode(ObjectNode config, ObjectNode fromConfig) { + @VisibleForTesting + void injectJsonNode(ObjectNode mainConfig, ObjectNode fromConfig) { + // TODO this method might make sense to have as a general utility in Jsons for (String key : Jsons.keys(fromConfig)) { - if (maskSecrets) { - config.set(key, Jsons.jsonNode(SECRET_MASK)); + if (fromConfig.get(key).getNodeType() == OBJECT) { + // nested objects are merged rather than overwrite the contents of the equivalent object in config + if (mainConfig.get(key) == null) { + injectJsonNode(mainConfig.putObject(key), (ObjectNode) fromConfig.get(key)); + } else if (mainConfig.get(key).getNodeType() == OBJECT) { + injectJsonNode((ObjectNode) mainConfig.get(key), (ObjectNode) fromConfig.get(key)); + } else { + throw new IllegalStateException("Can't merge an object node into a non-object node!"); + } } else { - if (!config.has(key) || isSecretMask(config.get(key).asText())) { - config.set(key, fromConfig.get(key)); + if (maskSecrets) { + // TODO secrets should be masked with the correct type + // https://github.com/airbytehq/airbyte/issues/5990 + // In the short-term this is not world-ending as all secret fields are currently strings + mainConfig.set(key, Jsons.jsonNode(SECRET_MASK)); + } else { + if (!mainConfig.has(key) || isSecretMask(mainConfig.get(key).asText())) { + mainConfig.set(key, fromConfig.get(key)); + } } } + } } diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplierTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplierTest.java index 1e87cea74538..e82ec1e7a328 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplierTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplierTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; @@ -40,6 +41,7 @@ import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; public class OAuthConfigSupplierTest { @@ -142,8 +144,8 @@ void testInjectMaskedOAuthParameters() throws JsonValidationException, IOExcepti assertEquals(expectedConfig, actualConfig); } - private JsonNode generateJsonConfig() { - return Jsons.jsonNode(ImmutableMap.builder() + private ObjectNode generateJsonConfig() { + return (ObjectNode) Jsons.jsonNode(ImmutableMap.builder() .put("apiSecret", "123") .put("client", "testing") .build()); @@ -156,4 +158,98 @@ private Map generateOAuthParameters() { .build(); } + private void maskAllValues(ObjectNode node) { + for (String key : Jsons.keys(node)) { + if (node.get(key).getNodeType() == JsonNodeType.OBJECT) { + maskAllValues((ObjectNode) node.get(key)); + } else { + node.set(key, Jsons.jsonNode(OAuthConfigSupplier.SECRET_MASK)); + } + } + } + + @Test + void testInjectUnnestedNode_Masked() { + OAuthConfigSupplier supplier = new OAuthConfigSupplier(configRepository, true); + ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters()); + ObjectNode maskedOauthParams = Jsons.clone(oauthParams); + maskAllValues(maskedOauthParams); + ObjectNode actual = generateJsonConfig(); + ObjectNode expected = Jsons.clone(actual); + expected.setAll(maskedOauthParams); + + supplier.injectJsonNode(actual, oauthParams); + assertEquals(expected, actual); + } + + @Test + void testInjectUnnestedNode_Unmasked() { + OAuthConfigSupplier supplier = new OAuthConfigSupplier(configRepository, false); + ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters()); + + ObjectNode actual = generateJsonConfig(); + ObjectNode expected = Jsons.clone(actual); + expected.setAll(oauthParams); + + supplier.injectJsonNode(actual, oauthParams); + + assertEquals(expected, actual); + } + + @Test + void testInjectNewNestedNode_Masked() { + OAuthConfigSupplier supplier = new OAuthConfigSupplier(configRepository, true); + ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters()); + ObjectNode maskedOauthParams = Jsons.clone(oauthParams); + maskAllValues(maskedOauthParams); + ObjectNode nestedConfig = (ObjectNode) Jsons.jsonNode(ImmutableMap.builder() + .put("oauth_credentials", oauthParams) + .build()); + + // nested node does not exist in actual object + ObjectNode actual = generateJsonConfig(); + ObjectNode expected = Jsons.clone(actual); + expected.putObject("oauth_credentials").setAll(maskedOauthParams); + + supplier.injectJsonNode(actual, nestedConfig); + assertEquals(expected, actual); + } + + @Test + @DisplayName("A nested config should be inserted with the same nesting structure") + void testInjectNewNestedNode_Unmasked() { + OAuthConfigSupplier supplier = new OAuthConfigSupplier(configRepository, false); + ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters()); + ObjectNode nestedConfig = (ObjectNode) Jsons.jsonNode(ImmutableMap.builder() + .put("oauth_credentials", oauthParams) + .build()); + + // nested node does not exist in actual object + ObjectNode actual = generateJsonConfig(); + ObjectNode expected = Jsons.clone(actual); + expected.putObject("oauth_credentials").setAll(oauthParams); + + supplier.injectJsonNode(actual, nestedConfig); + assertEquals(expected, actual); + } + + @Test + @DisplayName("A nested node which partially exists in the main config should be merged into the main config, not overwrite the whole nested object") + void testInjectedPartiallyExistingNestedNode_Unmasked() { + OAuthConfigSupplier supplier = new OAuthConfigSupplier(configRepository, false); + ObjectNode oauthParams = (ObjectNode) Jsons.jsonNode(generateOAuthParameters()); + ObjectNode nestedConfig = (ObjectNode) Jsons.jsonNode(ImmutableMap.builder() + .put("oauth_credentials", oauthParams) + .build()); + + // nested node partially exists in actual object + ObjectNode actual = generateJsonConfig(); + actual.putObject("oauth_credentials").put("irrelevant_field", "_"); + ObjectNode expected = Jsons.clone(actual); + ((ObjectNode) expected.get("oauth_credentials")).setAll(oauthParams); + + supplier.injectJsonNode(actual, nestedConfig); + assertEquals(expected, actual); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java index 597b8d43960d..c8e3bb155e18 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java @@ -48,35 +48,51 @@ public class OAuthHandler { private final ConfigRepository configRepository; + private final OAuthImplementationFactory oAuthImplementationFactory; public OAuthHandler(ConfigRepository configRepository) { this.configRepository = configRepository; + this.oAuthImplementationFactory = new OAuthImplementationFactory(configRepository); } public OAuthConsentRead getSourceOAuthConsent(SourceOauthConsentRequest sourceDefinitionIdRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(sourceDefinitionIdRequestBody.getSourceDefinitionId()); - return new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getConsentUrl()); + return new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getSourceConsentUrl( + sourceDefinitionIdRequestBody.getWorkspaceId(), + sourceDefinitionIdRequestBody.getSourceDefinitionId(), + sourceDefinitionIdRequestBody.getRedirectUrl())); } public OAuthConsentRead getDestinationOAuthConsent(DestinationOauthConsentRequest destinationDefinitionIdRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { final OAuthFlowImplementation oAuthFlowImplementation = getDestinationOAuthFlowImplementation(destinationDefinitionIdRequestBody.getDestinationDefinitionId()); - return new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getConsentUrl()); + return new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getDestinationConsentUrl( + destinationDefinitionIdRequestBody.getWorkspaceId(), + destinationDefinitionIdRequestBody.getDestinationDefinitionId(), + destinationDefinitionIdRequestBody.getRedirectUrl())); } public Map completeSourceOAuth(CompleteSourceOauthRequest oauthSourceRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(oauthSourceRequestBody.getSourceDefinitionId()); - return oAuthFlowImplementation.completeOAuth(oauthSourceRequestBody.getWorkspaceId(), oauthSourceRequestBody.getQueryParams()); + return oAuthFlowImplementation.completeSourceOAuth( + oauthSourceRequestBody.getWorkspaceId(), + oauthSourceRequestBody.getSourceDefinitionId(), + oauthSourceRequestBody.getQueryParams(), + oauthSourceRequestBody.getRedirectUrl()); } public Map completeDestinationOAuth(CompleteDestinationOAuthRequest oauthDestinationRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { final OAuthFlowImplementation oAuthFlowImplementation = getDestinationOAuthFlowImplementation(oauthDestinationRequestBody.getDestinationDefinitionId()); - return oAuthFlowImplementation.completeOAuth(oauthDestinationRequestBody.getWorkspaceId(), oauthDestinationRequestBody.getQueryParams()); + return oAuthFlowImplementation.completeDestinationOAuth( + oauthDestinationRequestBody.getWorkspaceId(), + oauthDestinationRequestBody.getDestinationDefinitionId(), + oauthDestinationRequestBody.getQueryParams(), + oauthDestinationRequestBody.getRedirectUrl()); } public void setDestinationInstancewideOauthParams(SetInstancewideDestinationOauthParamsRequestBody requestBody) @@ -100,14 +116,14 @@ private OAuthFlowImplementation getSourceOAuthFlowImplementation(UUID sourceDefi throws JsonValidationException, ConfigNotFoundException, IOException { final StandardSourceDefinition standardSourceDefinition = configRepository .getStandardSourceDefinition(sourceDefinitionId); - return OAuthImplementationFactory.create(standardSourceDefinition.getDockerRepository()); + return oAuthImplementationFactory.create(standardSourceDefinition.getDockerRepository()); } private OAuthFlowImplementation getDestinationOAuthFlowImplementation(UUID destinationDefinitionId) throws JsonValidationException, ConfigNotFoundException, IOException { final StandardDestinationDefinition standardDestinationDefinition = configRepository .getStandardDestinationDefinition(destinationDefinitionId); - return OAuthImplementationFactory.create(standardDestinationDefinition.getDockerRepository()); + return oAuthImplementationFactory.create(standardDestinationDefinition.getDockerRepository()); } } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 60e645113ced..882c56a17a4d 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -6197,7 +6197,8 @@

CompleteDestinationOAuthRequ
destinationDefinitionId
UUID format: uuid
-
workspaceId (optional)
UUID format: uuid
+
workspaceId
UUID format: uuid
+
redirectUrl (optional)
String When completing OAuth flow to gain an access token, some API sometimes requires to verify that the app re-send the redirectUrl that was used when consent was given.
queryParams (optional)
map[String, Object] The query parameters present in the redirect URL after a user granted consent e.g auth code
@@ -6206,7 +6207,8 @@

CompleteSourceOauthRequest
sourceDefinitionId
UUID format: uuid
-
workspaceId (optional)
UUID format: uuid
+
workspaceId
UUID format: uuid
+
redirectUrl (optional)
String When completing OAuth flow to gain an access token, some API sometimes requires to verify that the app re-send the redirectUrl that was used when consent was given.
queryParams (optional)
map[String, Object] The query parameters present in the redirect URL after a user granted consent e.g auth code
@@ -6434,7 +6436,8 @@

DestinationOauthConsentReques
destinationDefinitionId
UUID format: uuid
-
workspaceId (optional)
UUID format: uuid
+
workspaceId
UUID format: uuid
+
redirectUrl
String The url to redirect to after getting the user consent
@@ -6877,7 +6880,8 @@

SourceOauthConsentRequest -
sourceDefinitionId
UUID format: uuid
-
workspaceId (optional)
UUID format: uuid
+
workspaceId
UUID format: uuid
+
redirectUrl
String The url to redirect to after getting the user consent