Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Databricks destination: introduce personal staging location #18557

Merged
merged 5 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public class S3ParquetWriter extends BaseS3Writer implements DestinationFileWrit
private final AvroRecordFactory avroRecordFactory;
private final Schema schema;
private final String outputFilename;
// object key = <path>/<output-filename>
private final String objectKey;
private final String gcsFileLocation;
// full file path = s3://<bucket>/<path>/<output-filename>
private final String fullFilePath;

public S3ParquetWriter(final S3DestinationConfig config,
final AmazonS3 s3Client,
Expand All @@ -61,14 +63,10 @@ public S3ParquetWriter(final S3DestinationConfig config,
.build());

objectKey = String.join("/", outputPrefix, outputFilename);
fullFilePath = String.format("s3a://%s/%s", config.getBucketName(), objectKey);
LOGGER.info("Full S3 path for stream '{}': {}", stream.getName(), fullFilePath);

LOGGER.info("Full S3 path for stream '{}': s3://{}/{}", stream.getName(), config.getBucketName(), objectKey);
gcsFileLocation = String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename);

final URI uri = new URI(
String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename));
final Path path = new Path(uri);

final Path path = new Path(new URI(fullFilePath));
final S3ParquetFormatConfig formatConfig = (S3ParquetFormatConfig) config.getFormatConfig();
final Configuration hadoopConfig = getHadoopConfig(config);
this.parquetWriter = AvroParquetWriter.<Record>builder(HadoopOutputFile.fromPath(path, hadoopConfig))
Expand Down Expand Up @@ -137,7 +135,7 @@ public String getOutputPath() {

@Override
public String getFileLocation() {
return gcsFileLocation;
return fullFilePath;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"databricks_username": "<user-email>",
"databricks_server_hostname": "abc-12345678-wxyz.cloud.databricks.com",
"databricks_personal_access_token": "dapi0123456789abcdefghij0123456789AB"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.databricks;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Request personal staging locations from Databricks metastore. The equivalent curl command is:
*
* <pre>
* curl --location --request POST \
* 'https://<server-host>/api/2.1/unity-catalog/temporary-stage-credentials' \
* --header 'Authorization: Bearer <personal-access-token>' \
* --form 'staging_url="stage://tmp/<username>/file.csv"' \
* --form 'operation="PUT"' \
* --form 'credential_type="PRESIGNED_URL"'
* </pre>
*/
public class DatabricksStagingLocationGetter {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStagingLocationGetter.class);
private static final String PERSONAL_STAGING_REQUEST_URL = "https://%s/api/2.1/unity-catalog/temporary-stage-credentials";
private static final String STAGING_URL = "stage://tmp/%s/%s";

private static final HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(10))
.build();

private static final Map<String, String> staticRequestParams = Map.of(
"operation", "PUT",
"credential_type", "PRESIGNED_URL");

private static final String PARAM_STAGING_URL = "staging_url";
private static final String RESPONSE_PRESIGNED_URL = "presigned_url";
private static final String RESPONSE_URL = "url";
private static final String RESPONSE_EXPIRATION = "expiration_time";

private final String username;
private final String serverHost;
private final String personalAccessToken;

public DatabricksStagingLocationGetter(final String username, final String serverHost, final String personalAccessToken) {
this.username = username;
this.serverHost = serverHost;
this.personalAccessToken = personalAccessToken;
}

/**
* @param filePath include path and filename: <path>/<filename>.
* @return the pre-signed URL for the file in the personal staging location on the metastore.
*/
public PreSignedUrl getPreSignedUrl(final String filePath) throws IOException, InterruptedException {
final String stagingUrl = String.format(STAGING_URL, username, filePath);
LOGGER.info("Requesting Databricks personal staging location for {}", stagingUrl);

final Map<String, String> requestBody = new HashMap<>(staticRequestParams);
requestBody.put(PARAM_STAGING_URL, stagingUrl);

final HttpRequest request = HttpRequest.newBuilder()
.POST(BodyPublishers.ofString(Jsons.serialize(requestBody)))
.uri(URI.create(String.format(PERSONAL_STAGING_REQUEST_URL, serverHost)))
.header("Authorization", "Bearer " + personalAccessToken)
.build();
final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

final JsonNode jsonResponse = Jsons.deserialize(response.body());
if (jsonResponse.has(RESPONSE_PRESIGNED_URL) && jsonResponse.get(RESPONSE_PRESIGNED_URL).has(RESPONSE_URL) && jsonResponse.get(
RESPONSE_PRESIGNED_URL).has(RESPONSE_EXPIRATION)) {
return new PreSignedUrl(
jsonResponse.get(RESPONSE_PRESIGNED_URL).get(RESPONSE_URL).asText(),
jsonResponse.get(RESPONSE_PRESIGNED_URL).get(RESPONSE_EXPIRATION).asLong());
} else {
final String message = String.format("Failed to get pre-signed URL for %s: %s", stagingUrl, jsonResponse);
LOGGER.error(message);
throw new RuntimeException(message);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.databricks;

public record PreSignedUrl(String url, long expirationTimeMillis) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.databricks;

import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import java.io.IOException;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;

public class DatabricksStagingLocationGetterTest {

private static final String SECRETS_CONFIG_JSON = "secrets/staging_config.json";

@Test
public void testGetStagingLocation() throws IOException, InterruptedException {
final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of(SECRETS_CONFIG_JSON)));
final DatabricksStagingLocationGetter stagingLocationGetter = new DatabricksStagingLocationGetter(
config.get("databricks_username").asText(),
config.get("databricks_server_hostname").asText(),
config.get("databricks_personal_access_token").asText());
final PreSignedUrl preSignedUrl = stagingLocationGetter.getPreSignedUrl(System.currentTimeMillis() + "/test.csv");
assertTrue(preSignedUrl.url().startsWith("https://"));
assertTrue(preSignedUrl.expirationTimeMillis() > 0);
}

}