Destination Redshift: handle empty s3 bucket_path (airbytehq#18434)
* make bucket_path required

* check for nonempty path

* not required

* handle null/empty path

* version bump + changelog

* fix comment

* auto-bump connector version

* format

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
2 people authored and jhammarstedt committed Oct 31, 2022
1 parent be89eb7 commit d717f9e
Showing 7 changed files with 75 additions and 7 deletions.
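Before the per-file diffs, here is a minimal, standalone sketch of the behavior this commit pins down: how the connection-test object key is derived from `bucket_path` once null/empty paths are handled. The class and method names below are illustrative only (they are not part of the change), the S3 calls themselves are omitted, and the keys shown in the comments are examples since the real suffix is a random UUID; the actual logic lives in `S3BaseChecks.attemptS3WriteAndDelete` in the Java diff further down.

```java
import java.util.UUID;

// Sketch only: mirrors the prefix handling added in S3BaseChecks.attemptS3WriteAndDelete.
public class BucketPathPrefixSketch {

  static String connectionTestKey(final String bucketPath) {
    final String prefix;
    if (bucketPath == null || bucketPath.isEmpty()) {
      // Null/empty bucket_path: the test object goes to the bucket root, and the
      // createBucketObjectIfNotExists call is skipped entirely.
      prefix = "";
    } else if (bucketPath.endsWith("/")) {
      prefix = bucketPath;
    } else {
      prefix = bucketPath + "/";
    }
    return prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
  }

  public static void main(final String[] args) {
    System.out.println(connectionTestKey("test/bucket/path")); // test/bucket/path/_airbyte_connection_test_<uuid>
    System.out.println(connectionTestKey(""));                 // _airbyte_connection_test_<uuid>
    System.out.println(connectionTestKey(null));               // _airbyte_connection_test_<uuid>
  }
}
```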
@@ -238,7 +238,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.50
dockerImageTag: 0.3.51
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
@@ -4400,7 +4400,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.50"
- dockerImage: "airbyte/destination-redshift:0.3.51"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
connectionSpecification:
@@ -9,6 +9,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import java.io.IOException;
import java.io.PrintWriter;
@@ -92,19 +93,35 @@ static void attemptS3WriteAndDelete(final S3StorageOperations storageOperations,
final S3DestinationConfig s3Config,
final String bucketPath,
final AmazonS3 s3) {
final var prefix = bucketPath.endsWith("/") ? bucketPath : bucketPath + "/";
final String prefix;
if (Strings.isNullOrEmpty(bucketPath)) {
prefix = "";
} else if (bucketPath.endsWith("/")) {
prefix = bucketPath;
} else {
prefix = bucketPath + "/";
}

final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
attemptWriteAndDeleteS3Object(storageOperations, s3Config, outputTableName, s3);
}

/**
* Runs some permissions checks: 1. Check whether the bucket exists; create it if not 2. Check
* whether s3://bucketName/bucketPath/ exists; create it (with empty contents) if not. (if
* bucketPath is null/empty-string, then skip this step) 3. Attempt to create and delete
* s3://bucketName/outputTableName 4. Attempt to list all objects in the bucket
*/
private static void attemptWriteAndDeleteS3Object(final S3StorageOperations storageOperations,
final S3DestinationConfig s3Config,
final String outputTableName,
final AmazonS3 s3) {
final var s3Bucket = s3Config.getBucketName();
final var bucketPath = s3Config.getBucketPath();

storageOperations.createBucketObjectIfNotExists(bucketPath);
if (!Strings.isNullOrEmpty(bucketPath)) {
storageOperations.createBucketObjectIfNotExists(bucketPath);
}
s3.putObject(s3Bucket, outputTableName, "check-content");
testIAMUserHasListObjectPermission(s3, s3Bucket);
s3.deleteObject(s3Bucket, outputTableName);
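The new Javadoc in the hunk above describes the four permission checks in prose. As a rough sequence sketch of those steps, using the same `AmazonS3` client types imported in this file: the helper name and standalone shape are assumptions, and the real method delegates bucket/path creation to `S3StorageOperations.createBucketObjectIfNotExists` (which, per its Javadoc below, also creates the bucket if needed).

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;

// Sketch of the check sequence described in the Javadoc above (simplified).
final class S3CheckSequenceSketch {

  static void runChecks(final AmazonS3 s3, final String bucket, final String bucketPath, final String testKey) {
    if (bucketPath != null && !bucketPath.isEmpty()) {
      // Steps 1-2: ensure s3://bucket/bucketPath/ exists; in the real code this is
      // createBucketObjectIfNotExists(bucketPath), skipped when bucketPath is null/empty.
      final String dir = bucketPath.endsWith("/") ? bucketPath : bucketPath + "/";
      if (!s3.doesObjectExist(bucket, dir)) {
        s3.putObject(bucket, dir, "");
      }
    }
    s3.putObject(bucket, testKey, "check-content");                                 // step 3: write the test object
    s3.listObjects(new ListObjectsRequest().withBucketName(bucket).withMaxKeys(1)); // step 4: verify list permission
    s3.deleteObject(bucket, testKey);                                               // step 3 cleanup: delete the test object
  }
}
```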
@@ -93,6 +93,11 @@ public String getBucketObjectPath(final String namespace, final String streamNam
.replaceAll("/+", "/"));
}

/**
* Create a directory object at the specified location. Creates the bucket if necessary.
*
* @param objectPath The directory to create. Must be a nonempty string.
*/
@Override
public void createBucketObjectIfNotExists(final String objectPath) {
final String bucket = s3Config.getBucketName();
@@ -8,6 +8,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@@ -25,6 +26,8 @@ public class S3BaseChecksTest {
@BeforeEach
public void setup() {
s3Client = mock(AmazonS3.class);
when(s3Client.doesObjectExist(anyString(), eq(""))).thenThrow(new IllegalArgumentException("Object path must not be empty"));
when(s3Client.putObject(anyString(), eq(""), anyString())).thenThrow(new IllegalArgumentException("Object path must not be empty"));
}

@Test
@@ -49,4 +52,46 @@ public void attemptWriteAndDeleteS3Object_should_createSpecificFiles() {
verify(s3Client).deleteObject(eq("test_bucket"), startsWith("test/bucket/path/_airbyte_connection_test_"));
}

@Test
public void attemptWriteAndDeleteS3Object_should_skipDirectoryCreateIfRootPath() {
S3DestinationConfig config = new S3DestinationConfig(
null,
"test_bucket",
"",
null,
null,
null,
null,
s3Client);
S3StorageOperations operations = new S3StorageOperations(new S3NameTransformer(), s3Client, config);

S3BaseChecks.attemptS3WriteAndDelete(operations, config, "");

verify(s3Client, never()).putObject("test_bucket", "", "");
verify(s3Client).putObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"), anyString());
verify(s3Client).listObjects(ArgumentMatchers.<ListObjectsRequest>argThat(request -> "test_bucket".equals(request.getBucketName())));
verify(s3Client).deleteObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"));
}

@Test
public void attemptWriteAndDeleteS3Object_should_skipDirectoryCreateIfNullPath() {
S3DestinationConfig config = new S3DestinationConfig(
null,
"test_bucket",
null,
null,
null,
null,
null,
s3Client);
S3StorageOperations operations = new S3StorageOperations(new S3NameTransformer(), s3Client, config);

S3BaseChecks.attemptS3WriteAndDelete(operations, config, null);

verify(s3Client, never()).putObject("test_bucket", "", "");
verify(s3Client).putObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"), anyString());
verify(s3Client).listObjects(ArgumentMatchers.<ListObjectsRequest>argThat(request -> "test_bucket".equals(request.getBucketName())));
verify(s3Client).deleteObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"));
}

}
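A note on the test setup shown above: the two stubs added to `setup()` make the mocked `AmazonS3` client throw whenever `""` is used as an object key. The intent (an assumption of these tests, not an SDK guarantee spelled out here) is that an empty key is never a valid write target, so any regression that rebuilt a key from an empty prefix the old way would fail fast instead of silently writing somewhere unexpected. A condensed Mockito sketch of the same guard, with an illustrative class name:

```java
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.amazonaws.services.s3.AmazonS3;

// Sketch: a mocked S3 client that rejects empty object keys, as setup() above configures.
final class EmptyKeyRejectingClientSketch {

  static AmazonS3 create() {
    final AmazonS3 s3 = mock(AmazonS3.class);
    when(s3.doesObjectExist(anyString(), eq("")))
        .thenThrow(new IllegalArgumentException("Object path must not be empty"));
    when(s3.putObject(anyString(), eq(""), anyString()))
        .thenThrow(new IllegalArgumentException("Object path must not be empty"));
    return s3;
  }
}
```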
@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.50
LABEL io.airbyte.version=0.3.51
LABEL io.airbyte.name=airbyte/destination-redshift
5 changes: 3 additions & 2 deletions docs/integrations/destinations/redshift.md
@@ -141,6 +141,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
| 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
| 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
| 0.3.48 | 2022-09-01 | | Added JDBC URL params |
@@ -151,8 +152,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c
| 0.3.43 | 2022-06-24 | [\#13690](https://github.com/airbytehq/airbyte/pull/13690) | Improved discovery for NOT SUPER column |
| 0.3.42 | 2022-06-21 | [\#14013](https://github.com/airbytehq/airbyte/pull/14013) | Add an option to use encryption with staging in Redshift Destination |
| 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager |
| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method. <br /> **PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. |
| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |
| 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type |
| 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check |
