diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java index 3104e3093e83..1a5bd5cc877d 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java @@ -40,8 +40,10 @@ public class S3ParquetWriter extends BaseS3Writer implements DestinationFileWrit private final AvroRecordFactory avroRecordFactory; private final Schema schema; private final String outputFilename; + // object key = / private final String objectKey; - private final String gcsFileLocation; + // full file path = s3://// + private final String fullFilePath; public S3ParquetWriter(final S3DestinationConfig config, final AmazonS3 s3Client, @@ -61,14 +63,10 @@ public S3ParquetWriter(final S3DestinationConfig config, .build()); objectKey = String.join("/", outputPrefix, outputFilename); + fullFilePath = String.format("s3a://%s/%s", config.getBucketName(), objectKey); + LOGGER.info("Full S3 path for stream '{}': {}", stream.getName(), fullFilePath); - LOGGER.info("Full S3 path for stream '{}': s3://{}/{}", stream.getName(), config.getBucketName(), objectKey); - gcsFileLocation = String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename); - - final URI uri = new URI( - String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename)); - final Path path = new Path(uri); - + final Path path = new Path(new URI(fullFilePath)); final S3ParquetFormatConfig formatConfig = (S3ParquetFormatConfig) config.getFormatConfig(); final Configuration hadoopConfig = getHadoopConfig(config); this.parquetWriter = AvroParquetWriter.builder(HadoopOutputFile.fromPath(path, hadoopConfig)) @@ -137,7 +135,7 @@ public String getOutputPath() { @Override public String getFileLocation() { - return gcsFileLocation; + return fullFilePath; } @Override diff --git a/airbyte-integrations/connectors/destination-databricks/sample_secrets/staging_config.json b/airbyte-integrations/connectors/destination-databricks/sample_secrets/staging_config.json new file mode 100644 index 000000000000..82528e383eb3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/sample_secrets/staging_config.json @@ -0,0 +1,5 @@ +{ + "databricks_username": "", + "databricks_server_hostname": "abc-12345678-wxyz.cloud.databricks.com", + "databricks_personal_access_token": "dapi0123456789abcdefghij0123456789AB" +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStagingLocationGetter.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStagingLocationGetter.java new file mode 100644 index 000000000000..5eb2868588f2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStagingLocationGetter.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Request personal staging locations from Databricks metastore. The equivalent curl command is: + * + *
+ * curl --location --request POST \
+ *   'https:///api/2.1/unity-catalog/temporary-stage-credentials' \
+ *   --header 'Authorization: Bearer ' \
+ *   --form 'staging_url="stage://tmp//file.csv"' \
+ *   --form 'operation="PUT"' \
+ *   --form 'credential_type="PRESIGNED_URL"'
+ * 
+ */ +public class DatabricksStagingLocationGetter { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStagingLocationGetter.class); + private static final String PERSONAL_STAGING_REQUEST_URL = "https://%s/api/2.1/unity-catalog/temporary-stage-credentials"; + private static final String STAGING_URL = "stage://tmp/%s/%s"; + + private static final HttpClient httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .connectTimeout(Duration.ofSeconds(10)) + .build(); + + private static final Map staticRequestParams = Map.of( + "operation", "PUT", + "credential_type", "PRESIGNED_URL"); + + private static final String PARAM_STAGING_URL = "staging_url"; + private static final String RESPONSE_PRESIGNED_URL = "presigned_url"; + private static final String RESPONSE_URL = "url"; + private static final String RESPONSE_EXPIRATION = "expiration_time"; + + private final String username; + private final String serverHost; + private final String personalAccessToken; + + public DatabricksStagingLocationGetter(final String username, final String serverHost, final String personalAccessToken) { + this.username = username; + this.serverHost = serverHost; + this.personalAccessToken = personalAccessToken; + } + + /** + * @param filePath include path and filename: /. + * @return the pre-signed URL for the file in the personal staging location on the metastore. + */ + public PreSignedUrl getPreSignedUrl(final String filePath) throws IOException, InterruptedException { + final String stagingUrl = String.format(STAGING_URL, username, filePath); + LOGGER.info("Requesting Databricks personal staging location for {}", stagingUrl); + + final Map requestBody = new HashMap<>(staticRequestParams); + requestBody.put(PARAM_STAGING_URL, stagingUrl); + + final HttpRequest request = HttpRequest.newBuilder() + .POST(BodyPublishers.ofString(Jsons.serialize(requestBody))) + .uri(URI.create(String.format(PERSONAL_STAGING_REQUEST_URL, serverHost))) + .header("Authorization", "Bearer " + personalAccessToken) + .build(); + final HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + + final JsonNode jsonResponse = Jsons.deserialize(response.body()); + if (jsonResponse.has(RESPONSE_PRESIGNED_URL) && jsonResponse.get(RESPONSE_PRESIGNED_URL).has(RESPONSE_URL) && jsonResponse.get( + RESPONSE_PRESIGNED_URL).has(RESPONSE_EXPIRATION)) { + return new PreSignedUrl( + jsonResponse.get(RESPONSE_PRESIGNED_URL).get(RESPONSE_URL).asText(), + jsonResponse.get(RESPONSE_PRESIGNED_URL).get(RESPONSE_EXPIRATION).asLong()); + } else { + final String message = String.format("Failed to get pre-signed URL for %s: %s", stagingUrl, jsonResponse); + LOGGER.error(message); + throw new RuntimeException(message); + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/PreSignedUrl.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/PreSignedUrl.java new file mode 100644 index 000000000000..f65e08301f83 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/PreSignedUrl.java @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.databricks; + +public record PreSignedUrl(String url, long expirationTimeMillis) { + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksStagingLocationGetterTest.java b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksStagingLocationGetterTest.java new file mode 100644 index 000000000000..7afd47493414 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksStagingLocationGetterTest.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.databricks; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.io.IOException; +import java.nio.file.Path; +import org.junit.jupiter.api.Test; + +public class DatabricksStagingLocationGetterTest { + + private static final String SECRETS_CONFIG_JSON = "secrets/staging_config.json"; + + @Test + public void testGetStagingLocation() throws IOException, InterruptedException { + final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of(SECRETS_CONFIG_JSON))); + final DatabricksStagingLocationGetter stagingLocationGetter = new DatabricksStagingLocationGetter( + config.get("databricks_username").asText(), + config.get("databricks_server_hostname").asText(), + config.get("databricks_personal_access_token").asText()); + final PreSignedUrl preSignedUrl = stagingLocationGetter.getPreSignedUrl(System.currentTimeMillis() + "/test.csv"); + assertTrue(preSignedUrl.url().startsWith("https://")); + assertTrue(preSignedUrl.expirationTimeMillis() > 0); + } + +}