-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Databricks destination: introduce personal staging location (#18557)
* Update s3 parquet writer
  - Rename gcsFileLocation.
  - Merge local variables.
* Implement staging location getter
* Add integration test and fix post request
* Add comment and format code
- Loading branch information
Showing
5 changed files
with
148 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
5 changes: 5 additions & 0 deletions
5
airbyte-integrations/connectors/destination-databricks/sample_secrets/staging_config.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"databricks_username": "<user-email>", | ||
"databricks_server_hostname": "abc-12345678-wxyz.cloud.databricks.com", | ||
"databricks_personal_access_token": "dapi0123456789abcdefghij0123456789AB" | ||
} |
95 changes: 95 additions & 0 deletions
95
.../java/io/airbyte/integrations/destination/databricks/DatabricksStagingLocationGetter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.databricks; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.commons.json.Jsons; | ||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.net.http.HttpClient; | ||
import java.net.http.HttpRequest; | ||
import java.net.http.HttpRequest.BodyPublishers; | ||
import java.net.http.HttpResponse; | ||
import java.time.Duration; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* Request personal staging locations from Databricks metastore. The equivalent curl command is: | ||
* | ||
* <pre> | ||
* curl --location --request POST \ | ||
* 'https://<server-host>/api/2.1/unity-catalog/temporary-stage-credentials' \ | ||
* --header 'Authorization: Bearer <personal-access-token>' \ | ||
* --form 'staging_url="stage://tmp/<username>/file.csv"' \ | ||
* --form 'operation="PUT"' \ | ||
* --form 'credential_type="PRESIGNED_URL"' | ||
* </pre> | ||
*/ | ||
public class DatabricksStagingLocationGetter { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStagingLocationGetter.class); | ||
private static final String PERSONAL_STAGING_REQUEST_URL = "https://%s/api/2.1/unity-catalog/temporary-stage-credentials"; | ||
private static final String STAGING_URL = "stage://tmp/%s/%s"; | ||
|
||
private static final HttpClient httpClient = HttpClient.newBuilder() | ||
.version(HttpClient.Version.HTTP_2) | ||
.connectTimeout(Duration.ofSeconds(10)) | ||
.build(); | ||
|
||
private static final Map<String, String> staticRequestParams = Map.of( | ||
"operation", "PUT", | ||
"credential_type", "PRESIGNED_URL"); | ||
|
||
private static final String PARAM_STAGING_URL = "staging_url"; | ||
private static final String RESPONSE_PRESIGNED_URL = "presigned_url"; | ||
private static final String RESPONSE_URL = "url"; | ||
private static final String RESPONSE_EXPIRATION = "expiration_time"; | ||
|
||
private final String username; | ||
private final String serverHost; | ||
private final String personalAccessToken; | ||
|
||
public DatabricksStagingLocationGetter(final String username, final String serverHost, final String personalAccessToken) { | ||
this.username = username; | ||
this.serverHost = serverHost; | ||
this.personalAccessToken = personalAccessToken; | ||
} | ||
|
||
/** | ||
* @param filePath include path and filename: <path>/<filename>. | ||
* @return the pre-signed URL for the file in the personal staging location on the metastore. | ||
*/ | ||
public PreSignedUrl getPreSignedUrl(final String filePath) throws IOException, InterruptedException { | ||
final String stagingUrl = String.format(STAGING_URL, username, filePath); | ||
LOGGER.info("Requesting Databricks personal staging location for {}", stagingUrl); | ||
|
||
final Map<String, String> requestBody = new HashMap<>(staticRequestParams); | ||
requestBody.put(PARAM_STAGING_URL, stagingUrl); | ||
|
||
final HttpRequest request = HttpRequest.newBuilder() | ||
.POST(BodyPublishers.ofString(Jsons.serialize(requestBody))) | ||
.uri(URI.create(String.format(PERSONAL_STAGING_REQUEST_URL, serverHost))) | ||
.header("Authorization", "Bearer " + personalAccessToken) | ||
.build(); | ||
final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); | ||
|
||
final JsonNode jsonResponse = Jsons.deserialize(response.body()); | ||
if (jsonResponse.has(RESPONSE_PRESIGNED_URL) && jsonResponse.get(RESPONSE_PRESIGNED_URL).has(RESPONSE_URL) && jsonResponse.get( | ||
RESPONSE_PRESIGNED_URL).has(RESPONSE_EXPIRATION)) { | ||
return new PreSignedUrl( | ||
jsonResponse.get(RESPONSE_PRESIGNED_URL).get(RESPONSE_URL).asText(), | ||
jsonResponse.get(RESPONSE_PRESIGNED_URL).get(RESPONSE_EXPIRATION).asLong()); | ||
} else { | ||
final String message = String.format("Failed to get pre-signed URL for %s: %s", stagingUrl, jsonResponse); | ||
LOGGER.error(message); | ||
throw new RuntimeException(message); | ||
} | ||
|
||
} | ||
|
||
} |
9 changes: 9 additions & 0 deletions
9
...databricks/src/main/java/io/airbyte/integrations/destination/databricks/PreSignedUrl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.databricks; | ||
|
||
/**
 * A pre-signed URL together with the time at which it expires.
 *
 * @param url the pre-signed URL.
 * @param expirationTimeMillis expiration time of the URL, in milliseconds as the component name
 *        indicates. NOTE(review): presumably an epoch timestamp; confirm against the producer.
 */
public record PreSignedUrl(String url, long expirationTimeMillis) {

}
32 changes: 32 additions & 0 deletions
32
...a/io/airbyte/integrations/destination/databricks/DatabricksStagingLocationGetterTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.databricks; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.commons.io.IOs; | ||
import io.airbyte.commons.json.Jsons; | ||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import org.junit.jupiter.api.Test; | ||
|
||
public class DatabricksStagingLocationGetterTest { | ||
|
||
private static final String SECRETS_CONFIG_JSON = "secrets/staging_config.json"; | ||
|
||
@Test | ||
public void testGetStagingLocation() throws IOException, InterruptedException { | ||
final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of(SECRETS_CONFIG_JSON))); | ||
final DatabricksStagingLocationGetter stagingLocationGetter = new DatabricksStagingLocationGetter( | ||
config.get("databricks_username").asText(), | ||
config.get("databricks_server_hostname").asText(), | ||
config.get("databricks_personal_access_token").asText()); | ||
final PreSignedUrl preSignedUrl = stagingLocationGetter.getPreSignedUrl(System.currentTimeMillis() + "/test.csv"); | ||
assertTrue(preSignedUrl.url().startsWith("https://")); | ||
assertTrue(preSignedUrl.expirationTimeMillis() > 0); | ||
} | ||
|
||
} |