Skip to content

Commit

Permalink
Databricks destination: introduce personal staging location (#18557)
Browse files Browse the repository at this point in the history
* Update s3 parquet writer

- Rename gcsFileLocation.
- Merge local variables.

* Implement staging location getter

* Add integration test and fix post request

* Add comment and format code
  • Loading branch information
tuliren authored and nataly committed Nov 3, 2022
1 parent 5dfe9d6 commit 137c8aa
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public class S3ParquetWriter extends BaseS3Writer implements DestinationFileWrit
private final AvroRecordFactory avroRecordFactory;
private final Schema schema;
private final String outputFilename;
// object key = <path>/<output-filename>
private final String objectKey;
private final String gcsFileLocation;
// full file path = s3://<bucket>/<path>/<output-filename>
private final String fullFilePath;

public S3ParquetWriter(final S3DestinationConfig config,
final AmazonS3 s3Client,
Expand All @@ -61,14 +63,10 @@ public S3ParquetWriter(final S3DestinationConfig config,
.build());

objectKey = String.join("/", outputPrefix, outputFilename);
fullFilePath = String.format("s3a://%s/%s", config.getBucketName(), objectKey);
LOGGER.info("Full S3 path for stream '{}': {}", stream.getName(), fullFilePath);

LOGGER.info("Full S3 path for stream '{}': s3://{}/{}", stream.getName(), config.getBucketName(), objectKey);
gcsFileLocation = String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename);

final URI uri = new URI(
String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename));
final Path path = new Path(uri);

final Path path = new Path(new URI(fullFilePath));
final S3ParquetFormatConfig formatConfig = (S3ParquetFormatConfig) config.getFormatConfig();
final Configuration hadoopConfig = getHadoopConfig(config);
this.parquetWriter = AvroParquetWriter.<Record>builder(HadoopOutputFile.fromPath(path, hadoopConfig))
Expand Down Expand Up @@ -137,7 +135,7 @@ public String getOutputPath() {

@Override
public String getFileLocation() {
return gcsFileLocation;
return fullFilePath;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"databricks_username": "<user-email>",
"databricks_server_hostname": "abc-12345678-wxyz.cloud.databricks.com",
"databricks_personal_access_token": "dapi0123456789abcdefghij0123456789AB"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.databricks;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Request personal staging locations from Databricks metastore. The equivalent curl command is:
*
* <pre>
* curl --location --request POST \
* 'https://<server-host>/api/2.1/unity-catalog/temporary-stage-credentials' \
* --header 'Authorization: Bearer <personal-access-token>' \
* --form 'staging_url="stage://tmp/<username>/file.csv"' \
* --form 'operation="PUT"' \
* --form 'credential_type="PRESIGNED_URL"'
* </pre>
*/
public class DatabricksStagingLocationGetter {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStagingLocationGetter.class);
private static final String PERSONAL_STAGING_REQUEST_URL = "https://%s/api/2.1/unity-catalog/temporary-stage-credentials";
private static final String STAGING_URL = "stage://tmp/%s/%s";

private static final HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(10))
.build();

private static final Map<String, String> staticRequestParams = Map.of(
"operation", "PUT",
"credential_type", "PRESIGNED_URL");

private static final String PARAM_STAGING_URL = "staging_url";
private static final String RESPONSE_PRESIGNED_URL = "presigned_url";
private static final String RESPONSE_URL = "url";
private static final String RESPONSE_EXPIRATION = "expiration_time";

private final String username;
private final String serverHost;
private final String personalAccessToken;

public DatabricksStagingLocationGetter(final String username, final String serverHost, final String personalAccessToken) {
this.username = username;
this.serverHost = serverHost;
this.personalAccessToken = personalAccessToken;
}

/**
* @param filePath include path and filename: <path>/<filename>.
* @return the pre-signed URL for the file in the personal staging location on the metastore.
*/
public PreSignedUrl getPreSignedUrl(final String filePath) throws IOException, InterruptedException {
final String stagingUrl = String.format(STAGING_URL, username, filePath);
LOGGER.info("Requesting Databricks personal staging location for {}", stagingUrl);

final Map<String, String> requestBody = new HashMap<>(staticRequestParams);
requestBody.put(PARAM_STAGING_URL, stagingUrl);

final HttpRequest request = HttpRequest.newBuilder()
.POST(BodyPublishers.ofString(Jsons.serialize(requestBody)))
.uri(URI.create(String.format(PERSONAL_STAGING_REQUEST_URL, serverHost)))
.header("Authorization", "Bearer " + personalAccessToken)
.build();
final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

final JsonNode jsonResponse = Jsons.deserialize(response.body());
if (jsonResponse.has(RESPONSE_PRESIGNED_URL) && jsonResponse.get(RESPONSE_PRESIGNED_URL).has(RESPONSE_URL) && jsonResponse.get(
RESPONSE_PRESIGNED_URL).has(RESPONSE_EXPIRATION)) {
return new PreSignedUrl(
jsonResponse.get(RESPONSE_PRESIGNED_URL).get(RESPONSE_URL).asText(),
jsonResponse.get(RESPONSE_PRESIGNED_URL).get(RESPONSE_EXPIRATION).asLong());
} else {
final String message = String.format("Failed to get pre-signed URL for %s: %s", stagingUrl, jsonResponse);
LOGGER.error(message);
throw new RuntimeException(message);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.databricks;

public record PreSignedUrl(String url, long expirationTimeMillis) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.databricks;

import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import java.io.IOException;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;

public class DatabricksStagingLocationGetterTest {

private static final String SECRETS_CONFIG_JSON = "secrets/staging_config.json";

@Test
public void testGetStagingLocation() throws IOException, InterruptedException {
final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of(SECRETS_CONFIG_JSON)));
final DatabricksStagingLocationGetter stagingLocationGetter = new DatabricksStagingLocationGetter(
config.get("databricks_username").asText(),
config.get("databricks_server_hostname").asText(),
config.get("databricks_personal_access_token").asText());
final PreSignedUrl preSignedUrl = stagingLocationGetter.getPreSignedUrl(System.currentTimeMillis() + "/test.csv");
assertTrue(preSignedUrl.url().startsWith("https://"));
assertTrue(preSignedUrl.expirationTimeMillis() > 0);
}

}

0 comments on commit 137c8aa

Please sign in to comment.