Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Redshift: handle empty s3 bucket_path #18434

Merged
merged 11 commits into from
Oct 26, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.50
dockerImageTag: 0.3.51
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4400,7 +4400,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.50"
- dockerImage: "airbyte/destination-redshift:0.3.51"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import java.io.IOException;
import java.io.PrintWriter;
Expand Down Expand Up @@ -92,19 +93,35 @@ static void attemptS3WriteAndDelete(final S3StorageOperations storageOperations,
final S3DestinationConfig s3Config,
final String bucketPath,
final AmazonS3 s3) {
final var prefix = bucketPath.endsWith("/") ? bucketPath : bucketPath + "/";
final String prefix;
if (Strings.isNullOrEmpty(bucketPath)) {
prefix = "";
} else if (bucketPath.endsWith("/")) {
prefix = bucketPath;
} else {
prefix = bucketPath + "/";
}

final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
attemptWriteAndDeleteS3Object(storageOperations, s3Config, outputTableName, s3);
}

/**
* Runs some permissions checks: 1. Check whether the bucket exists; create it if not 2. Check
* whether s3://bucketName/bucketPath/ exists; create it (with empty contents) if not. (if
* bucketPath is null/empty-string, then skip this step) 3. Attempt to create and delete
* s3://bucketName/outputTableName 4. Attempt to list all objects in the bucket
*/
private static void attemptWriteAndDeleteS3Object(final S3StorageOperations storageOperations,
final S3DestinationConfig s3Config,
final String outputTableName,
final AmazonS3 s3) {
final var s3Bucket = s3Config.getBucketName();
final var bucketPath = s3Config.getBucketPath();

storageOperations.createBucketObjectIfNotExists(bucketPath);
if (!Strings.isNullOrEmpty(bucketPath)) {
storageOperations.createBucketObjectIfNotExists(bucketPath);
}
s3.putObject(s3Bucket, outputTableName, "check-content");
testIAMUserHasListObjectPermission(s3, s3Bucket);
s3.deleteObject(s3Bucket, outputTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public String getBucketObjectPath(final String namespace, final String streamNam
.replaceAll("/+", "/"));
}

/**
* Create a directory object at the specified location. Creates the bucket if necessary.
*
* @param objectPath The directory to create. Must be a nonempty string.
*/
@Override
public void createBucketObjectIfNotExists(final String objectPath) {
final String bucket = s3Config.getBucketName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -25,6 +26,8 @@ public class S3BaseChecksTest {
@BeforeEach
public void setup() {
s3Client = mock(AmazonS3.class);
when(s3Client.doesObjectExist(anyString(), eq(""))).thenThrow(new IllegalArgumentException("Object path must not be empty"));
when(s3Client.putObject(anyString(), eq(""), anyString())).thenThrow(new IllegalArgumentException("Object path must not be empty"));
}

@Test
Expand All @@ -49,4 +52,46 @@ public void attemptWriteAndDeleteS3Object_should_createSpecificFiles() {
verify(s3Client).deleteObject(eq("test_bucket"), startsWith("test/bucket/path/_airbyte_connection_test_"));
}

@Test
public void attemptWriteAndDeleteS3Object_should_skipDirectoryCreateIfRootPath() {
S3DestinationConfig config = new S3DestinationConfig(
null,
"test_bucket",
"",
null,
null,
null,
null,
s3Client);
S3StorageOperations operations = new S3StorageOperations(new S3NameTransformer(), s3Client, config);

S3BaseChecks.attemptS3WriteAndDelete(operations, config, "");

verify(s3Client, never()).putObject("test_bucket", "", "");
verify(s3Client).putObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"), anyString());
verify(s3Client).listObjects(ArgumentMatchers.<ListObjectsRequest>argThat(request -> "test_bucket".equals(request.getBucketName())));
verify(s3Client).deleteObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"));
}

@Test
public void attemptWriteAndDeleteS3Object_should_skipDirectoryCreateIfNullPath() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall, just noticed that there's a seemingly random _ in the method name and in the existing methods (so presume you're keeping with what's already existing). Minor nit but to keep test naming conventions, would it make more sense to have this consistent with camelCase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol this is actually my preferred testcase naming convention - imo the testWhateverMethod style is harder to read and doesn't provide much info about what the testcase is intended to do

iirc there's a few of these scattered through the codebase (which is where I picked it up from). Not strongly opinionated on this but mildly prefer to spread this style further.

S3DestinationConfig config = new S3DestinationConfig(
null,
"test_bucket",
null,
null,
null,
null,
null,
s3Client);
S3StorageOperations operations = new S3StorageOperations(new S3NameTransformer(), s3Client, config);

S3BaseChecks.attemptS3WriteAndDelete(operations, config, null);

verify(s3Client, never()).putObject("test_bucket", "", "");
verify(s3Client).putObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"), anyString());
verify(s3Client).listObjects(ArgumentMatchers.<ListObjectsRequest>argThat(request -> "test_bucket".equals(request.getBucketName())));
verify(s3Client).deleteObject(eq("test_bucket"), startsWith("_airbyte_connection_test_"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.50
LABEL io.airbyte.version=0.3.51
LABEL io.airbyte.name=airbyte/destination-redshift
5 changes: 3 additions & 2 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
| 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
| 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
| 0.3.48 | 2022-09-01 | | Added JDBC URL params |
Expand All @@ -151,8 +152,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c
| 0.3.43 | 2022-06-24 | [\#13690](https://github.com/airbytehq/airbyte/pull/13690) | Improved discovery for NOT SUPER column |
| 0.3.42 | 2022-06-21 | [\#14013](https://github.com/airbytehq/airbyte/pull/14013) | Add an option to use encryption with staging in Redshift Destination |
| 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager |
| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method. <br /> **PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. |
| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |
| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method. <br /> **PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. |
| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |
| 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type |
| 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check |
Expand Down