From fb4dbb72bd7a5124181fe225e7eccc6e8fc22266 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 25 Oct 2022 08:40:24 -0700 Subject: [PATCH 1/8] make bucket_path required --- .../connectors/destination-redshift/src/main/resources/spec.json | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index 9028e01570de6..e409456013aab 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -81,6 +81,7 @@ "required": [ "method", "s3_bucket_name", + "s3_bucket_path", "s3_bucket_region", "access_key_id", "secret_access_key" From ddf3f1694890b2a0c36baef7940cf326a9bb5612 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 25 Oct 2022 14:04:32 -0700 Subject: [PATCH 2/8] check for nonempty path --- .../integrations/destination/s3/S3BaseChecks.java | 11 ++++++++++- .../destination/s3/S3StorageOperations.java | 5 +++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java index 1dc91a13ba8ea..e16ee1210d53e 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java @@ -101,13 +101,22 @@ static void attemptS3WriteAndDelete(final S3StorageOperations storageOperations, attemptWriteAndDeleteS3Object(storageOperations, s3Config, outputTableName, s3); } + /** + * Runs some permissions checks: + * 1. Check whether the bucket exists; create it if not + * 2. Check whether s3://bucketName://bucketPath/ exists; create it (with empty contents) if not + * 3. Attempt to create and delete s3://bucketName/outputTableName + * 4. Attempt to list all objects in the bucket + */ private static void attemptWriteAndDeleteS3Object(final S3StorageOperations storageOperations, final S3DestinationConfig s3Config, final String outputTableName, final AmazonS3 s3) { final var s3Bucket = s3Config.getBucketName(); - storageOperations.createBucketObjectIfNotExists(s3Bucket); + if (!s3Config.getBucketPath().isEmpty()) { + storageOperations.createBucketObjectIfNotExists(s3Config.getBucketPath()); + } s3.putObject(s3Bucket, outputTableName, "check-content"); testIAMUserHasListObjectPermission(s3, s3Bucket); s3.deleteObject(s3Bucket, outputTableName); diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java index a4d6370cc02d8..0c55b039e8a36 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java @@ -93,6 +93,11 @@ public String getBucketObjectPath(final String namespace, final String streamNam .replaceAll("/+", "/")); } + /** + * Create an object within this bucket with empty contents. Creates the bucket if necessary. + * + * @param objectPath The path to the object. Must be nonempty. + */ @Override public void createBucketObjectIfNotExists(final String objectPath) { final String bucket = s3Config.getBucketName(); From d1da00628bcd448fe96d82cfe737db1f1563ba33 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 25 Oct 2022 14:07:45 -0700 Subject: [PATCH 3/8] not required --- .../connectors/destination-redshift/src/main/resources/spec.json | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index e409456013aab..9028e01570de6 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -81,7 +81,6 @@ "required": [ "method", "s3_bucket_name", - "s3_bucket_path", "s3_bucket_region", "access_key_id", "secret_access_key" From 9e0ba3b3e6e908ff3481662d205c661d92853743 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 25 Oct 2022 16:07:25 -0700 Subject: [PATCH 4/8] handle null/empty path --- .../destination/s3/S3BaseChecks.java | 13 +++++- .../destination/s3/S3StorageOperations.java | 4 +- .../destination/s3/S3BaseChecksTest.java | 44 +++++++++++++++++++ 3 files changed, 57 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java index 9bbbc658ef06c..7eb1107044859 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java @@ -9,6 +9,7 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory; import java.io.IOException; import java.io.PrintWriter; @@ -92,7 +93,15 @@ static void attemptS3WriteAndDelete(final S3StorageOperations storageOperations, final S3DestinationConfig s3Config, final String bucketPath, final AmazonS3 s3) { - final var prefix = bucketPath.endsWith("/") ? bucketPath : bucketPath + "/"; + final String prefix; + if (Strings.isNullOrEmpty(bucketPath)) { + prefix = ""; + } else if (bucketPath.endsWith("/")) { + prefix = bucketPath; + } else { + prefix = bucketPath + "/"; + } + final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); attemptWriteAndDeleteS3Object(storageOperations, s3Config, outputTableName, s3); } @@ -111,7 +120,7 @@ private static void attemptWriteAndDeleteS3Object(final S3StorageOperations stor final var s3Bucket = s3Config.getBucketName(); final var bucketPath = s3Config.getBucketPath(); - if (!bucketPath.isEmpty()) { + if (!Strings.isNullOrEmpty(bucketPath)) { storageOperations.createBucketObjectIfNotExists(bucketPath); } s3.putObject(s3Bucket, outputTableName, "check-content"); diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java index c77b7fa239ee0..5994634351420 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java @@ -94,9 +94,9 @@ public String getBucketObjectPath(final String namespace, final String streamNam } /** - * Create an object within this bucket with empty contents. Creates the bucket if necessary. + * Create a directory object at the specified location. Creates the bucket if necessary. * - * @param objectPath The path to the object. Must be nonempty. + * @param objectPath The directory to create. Must be a nonempty string. */ @Override public void createBucketObjectIfNotExists(final String objectPath) { diff --git a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java index 69828a08e466b..16777e02c594d 100644 --- a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java +++ b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java @@ -8,6 +8,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.startsWith; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -25,6 +26,8 @@ public class S3BaseChecksTest { @BeforeEach public void setup() { s3Client = mock(AmazonS3.class); + when(s3Client.doesObjectExist(anyString(), eq(""))).thenThrow(new IllegalArgumentException("Object path must not be empty")); + when(s3Client.putObject(anyString(), eq(""), anyString())).thenThrow(new IllegalArgumentException("Object path must not be empty")); } @Test @@ -49,4 +52,45 @@ public void attemptWriteAndDeleteS3Object_should_createSpecificFiles() { verify(s3Client).deleteObject(eq("test_bucket"), startsWith("test/bucket/path/_airbyte_connection_test_")); } + @Test + public void attemptWriteAndDeleteS3Object_should_skipDirectoryCreateIfRootPath() { + S3DestinationConfig config = new S3DestinationConfig( + null, + "test_bucket", + "", + null, + null, + null, + null, + s3Client); + S3StorageOperations operations = new S3StorageOperations(new S3NameTransformer(), s3Client, config); + + S3BaseChecks.attemptS3WriteAndDelete(operations, config, ""); + + verify(s3Client, never()).putObject("test_bucket", "", ""); + verify(s3Client).putObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"), anyString()); + verify(s3Client).listObjects(ArgumentMatchers.argThat(request -> "test_bucket".equals(request.getBucketName()))); + verify(s3Client).deleteObject(eq("test_bucket"), startsWith("_airbyte_connection_test_")); + } + + @Test + public void attemptWriteAndDeleteS3Object_should_skipDirectoryCreateIfNullPath() { + S3DestinationConfig config = new S3DestinationConfig( + null, + "test_bucket", + null, + null, + null, + null, + null, + s3Client); + S3StorageOperations operations = new S3StorageOperations(new S3NameTransformer(), s3Client, config); + + S3BaseChecks.attemptS3WriteAndDelete(operations, config, null); + + verify(s3Client, never()).putObject("test_bucket", "", ""); + verify(s3Client).putObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"), anyString()); + verify(s3Client).listObjects(ArgumentMatchers.argThat(request -> "test_bucket".equals(request.getBucketName()))); + verify(s3Client).deleteObject(eq("test_bucket"), startsWith("_airbyte_connection_test_")); + } } From cfc3484e9356b138b1d24250f4f084d9448862e9 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 26 Oct 2022 07:59:22 -0700 Subject: [PATCH 5/8] version bump + changelog --- .../connectors/destination-redshift/Dockerfile | 2 +- docs/integrations/destinations/redshift.md | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index adbab7cd3ef60..cc49956af5b64 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.50 +LABEL io.airbyte.version=0.3.51 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index c7de6d268eb42..16953660f04a6 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -141,6 +141,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling | | 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | | 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) | | 0.3.48 | 2022-09-01 | | Added JDBC URL params | @@ -151,8 +152,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c | 0.3.43 | 2022-06-24 | [\#13690](https://github.com/airbytehq/airbyte/pull/13690) | Improved discovery for NOT SUPER column | | 0.3.42 | 2022-06-21 | [\#14013](https://github.com/airbytehq/airbyte/pull/14013) | Add an option to use encryption with staging in Redshift Destination | | 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager | -| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method.
**PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. | -| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. | +| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method.
**PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. | +| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. | | 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance | | 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type | | 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check | From b8b8c7108df4367a390d9fcc2c96c99275828e8e Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 26 Oct 2022 08:05:41 -0700 Subject: [PATCH 6/8] fix comment --- .../io/airbyte/integrations/destination/s3/S3BaseChecks.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java index 7eb1107044859..9a481f3e3eecc 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java @@ -109,7 +109,8 @@ static void attemptS3WriteAndDelete(final S3StorageOperations storageOperations, /** * Runs some permissions checks: * 1. Check whether the bucket exists; create it if not - * 2. Check whether s3://bucketName://bucketPath/ exists; create it (with empty contents) if not + * 2. Check whether s3://bucketName/bucketPath/ exists; create it (with empty contents) if not. + * (if bucketPath is null/empty-string, then skip this step) * 3. Attempt to create and delete s3://bucketName/outputTableName * 4. Attempt to list all objects in the bucket */ From 84e45393dd80ddbeb02ef1b6d7a05501cc9da8cd Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Wed, 26 Oct 2022 16:01:02 +0000 Subject: [PATCH 7/8] auto-bump connector version --- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../init/src/main/resources/seed/destination_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index eafa3ec2441fd..358cd78174dc6 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -238,7 +238,7 @@ - name: Redshift destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc dockerRepository: airbyte/destination-redshift - dockerImageTag: 0.3.50 + dockerImageTag: 0.3.51 documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift icon: redshift.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 1186afc086a8d..de3a3fcfa0b03 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -4400,7 +4400,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-redshift:0.3.50" +- dockerImage: "airbyte/destination-redshift:0.3.51" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift" connectionSpecification: From 87c0a338caae0c174fa1aef731c9178b18953761 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 26 Oct 2022 09:53:55 -0700 Subject: [PATCH 8/8] format --- .../integrations/destination/s3/S3BaseChecks.java | 10 ++++------ .../integrations/destination/s3/S3BaseChecksTest.java | 1 + 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java index 9a481f3e3eecc..b5f2037e842b7 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3BaseChecks.java @@ -107,12 +107,10 @@ static void attemptS3WriteAndDelete(final S3StorageOperations storageOperations, } /** - * Runs some permissions checks: - * 1. Check whether the bucket exists; create it if not - * 2. Check whether s3://bucketName/bucketPath/ exists; create it (with empty contents) if not. - * (if bucketPath is null/empty-string, then skip this step) - * 3. Attempt to create and delete s3://bucketName/outputTableName - * 4. Attempt to list all objects in the bucket + * Runs some permissions checks: 1. Check whether the bucket exists; create it if not 2. Check + * whether s3://bucketName/bucketPath/ exists; create it (with empty contents) if not. (if + * bucketPath is null/empty-string, then skip this step) 3. Attempt to create and delete + * s3://bucketName/outputTableName 4. Attempt to list all objects in the bucket */ private static void attemptWriteAndDeleteS3Object(final S3StorageOperations storageOperations, final S3DestinationConfig s3Config, diff --git a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java index 16777e02c594d..6f08faa683e30 100644 --- a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java +++ b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/S3BaseChecksTest.java @@ -93,4 +93,5 @@ public void attemptWriteAndDeleteS3Object_should_skipDirectoryCreateIfNullPath() verify(s3Client).listObjects(ArgumentMatchers.argThat(request -> "test_bucket".equals(request.getBucketName()))); verify(s3Client).deleteObject(eq("test_bucket"), startsWith("_airbyte_connection_test_")); } + }