From cddc2eaac8610e061f3f3ad8065fcd4aa92108e7 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 26 Oct 2022 11:04:10 -0700 Subject: [PATCH] Destination Redshift: handle empty s3 bucket_path (#18434) * make bucket_path required * check for nonempty path * not required * handle null/empty path * version bump + changelog * fix comment * auto-bump connector version * format Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 2 +- .../destination/s3/S3BaseChecks.java | 21 ++++++++- .../destination/s3/S3StorageOperations.java | 5 +++ .../destination/s3/S3BaseChecksTest.java | 45 +++++++++++++++++++ .../destination-redshift/Dockerfile | 2 +- docs/integrations/destinations/redshift.md | 5 ++- 7 files changed, 75 insertions(+), 7 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index eafa3ec2441f..358cd78174dc 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -238,7 +238,7 @@ - name: Redshift destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc dockerRepository: airbyte/destination-redshift - dockerImageTag: 0.3.50 + dockerImageTag: 0.3.51 documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift icon: redshift.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 1186afc086a8..de3a3fcfa0b0 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -4400,7 +4400,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-redshift:0.3.50" +- dockerImage: "airbyte/destination-redshift:0.3.51" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift" connectionSpecification: diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java index 26ae7718c9dd..b5f2037e842b 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java @@ -9,6 +9,7 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory; import java.io.IOException; import java.io.PrintWriter; @@ -92,11 +93,25 @@ static void attemptS3WriteAndDelete(final S3StorageOperations storageOperations, final S3DestinationConfig s3Config, final String bucketPath, final AmazonS3 s3) { - final var prefix = bucketPath.endsWith("/") ? bucketPath : bucketPath + "/"; + final String prefix; + if (Strings.isNullOrEmpty(bucketPath)) { + prefix = ""; + } else if (bucketPath.endsWith("/")) { + prefix = bucketPath; + } else { + prefix = bucketPath + "/"; + } + final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); attemptWriteAndDeleteS3Object(storageOperations, s3Config, outputTableName, s3); } + /** + * Runs some permissions checks: 1. Check whether the bucket exists; create it if not 2. Check + * whether s3://bucketName/bucketPath/ exists; create it (with empty contents) if not. (if + * bucketPath is null/empty-string, then skip this step) 3. Attempt to create and delete + * s3://bucketName/outputTableName 4. Attempt to list all objects in the bucket + */ private static void attemptWriteAndDeleteS3Object(final S3StorageOperations storageOperations, final S3DestinationConfig s3Config, final String outputTableName, @@ -104,7 +119,9 @@ private static void attemptWriteAndDeleteS3Object(final S3StorageOperations stor final var s3Bucket = s3Config.getBucketName(); final var bucketPath = s3Config.getBucketPath(); - storageOperations.createBucketObjectIfNotExists(bucketPath); + if (!Strings.isNullOrEmpty(bucketPath)) { + storageOperations.createBucketObjectIfNotExists(bucketPath); + } s3.putObject(s3Bucket, outputTableName, "check-content"); testIAMUserHasListObjectPermission(s3, s3Bucket); s3.deleteObject(s3Bucket, outputTableName); diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java index ef333e649bfd..599463435142 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java @@ -93,6 +93,11 @@ public String getBucketObjectPath(final String namespace, final String streamNam .replaceAll("/+", "/")); } + /** + * Create a directory object at the specified location. Creates the bucket if necessary. + * + * @param objectPath The directory to create. Must be a nonempty string. + */ @Override public void createBucketObjectIfNotExists(final String objectPath) { final String bucket = s3Config.getBucketName(); diff --git a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java index 69828a08e466..6f08faa683e3 100644 --- a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java +++ b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java @@ -8,6 +8,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.startsWith; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -25,6 +26,8 @@ public class S3BaseChecksTest { @BeforeEach public void setup() { s3Client = mock(AmazonS3.class); + when(s3Client.doesObjectExist(anyString(), eq(""))).thenThrow(new IllegalArgumentException("Object path must not be empty")); + when(s3Client.putObject(anyString(), eq(""), anyString())).thenThrow(new IllegalArgumentException("Object path must not be empty")); } @Test @@ -49,4 +52,46 @@ public void attemptWriteAndDeleteS3Object_should_createSpecificFiles() { verify(s3Client).deleteObject(eq("test_bucket"), startsWith("test/bucket/path/_airbyte_connection_test_")); } + @Test + public void attemptWriteAndDeleteS3Object_should_skipDirectoryCreateIfRootPath() { + S3DestinationConfig config = new S3DestinationConfig( + null, + "test_bucket", + "", + null, + null, + null, + null, + s3Client); + S3StorageOperations operations = new S3StorageOperations(new S3NameTransformer(), s3Client, config); + + S3BaseChecks.attemptS3WriteAndDelete(operations, config, ""); + + verify(s3Client, never()).putObject("test_bucket", "", ""); + verify(s3Client).putObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"), anyString()); + verify(s3Client).listObjects(ArgumentMatchers.argThat(request -> "test_bucket".equals(request.getBucketName()))); + verify(s3Client).deleteObject(eq("test_bucket"), startsWith("_airbyte_connection_test_")); + } + + @Test + public void attemptWriteAndDeleteS3Object_should_skipDirectoryCreateIfNullPath() { + S3DestinationConfig config = new S3DestinationConfig( + null, + "test_bucket", + null, + null, + null, + null, + null, + s3Client); + S3StorageOperations operations = new S3StorageOperations(new S3NameTransformer(), s3Client, config); + + S3BaseChecks.attemptS3WriteAndDelete(operations, config, null); + + verify(s3Client, never()).putObject("test_bucket", "", ""); + verify(s3Client).putObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"), anyString()); + verify(s3Client).listObjects(ArgumentMatchers.argThat(request -> "test_bucket".equals(request.getBucketName()))); + verify(s3Client).deleteObject(eq("test_bucket"), startsWith("_airbyte_connection_test_")); + } + } diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index adbab7cd3ef6..cc49956af5b6 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.50 +LABEL io.airbyte.version=0.3.51 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index c7de6d268eb4..16953660f04a 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -141,6 +141,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling | | 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | | 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) | | 0.3.48 | 2022-09-01 | | Added JDBC URL params | @@ -151,8 +152,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c | 0.3.43 | 2022-06-24 | [\#13690](https://github.com/airbytehq/airbyte/pull/13690) | Improved discovery for NOT SUPER column | | 0.3.42 | 2022-06-21 | [\#14013](https://github.com/airbytehq/airbyte/pull/14013) | Add an option to use encryption with staging in Redshift Destination | | 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager | -| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method.
**PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. | -| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. | +| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method.
**PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. | +| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. | | 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance | | 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type | | 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check |