Skip to content

Commit

Permalink
Add a test for listObjects permission to destination-s3 connector
Browse files Browse the repository at this point in the history
* add testIAMUserHasListObjectPermission method to S3Destination
  and call this method from S3Destination::check. Method throws
  an exception if IAM user does not have listObjects permission
  on the destination bucket

* add a unit test to S3DestinationTest to verify that S3Destination::check
  fails if listObjects throws an exception

* add a unit test to S3DestinationTest to verify that S3Destination::check
  succeeds if listObjects succeeds

* Add S3DestinationConfigFactory in order to be able to mock S3 client
  used in S3Destination::check
  • Loading branch information
grishick committed Mar 4, 2022
1 parent 21a8844 commit 2d04a71
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.integrations.BaseConnector;
Expand All @@ -33,17 +34,28 @@
public class S3Destination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(S3Destination.class);
private final S3DestinationConfigFactory configFactory;

public S3Destination () {
this.configFactory = new S3DestinationConfigFactory();
}

public S3Destination (final S3DestinationConfigFactory configFactory) {
this.configFactory = configFactory;
}
public static void main(final String[] args) throws Exception {
new IntegrationRunner(new S3Destination()).run(args);
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final S3DestinationConfig destinationConfig = S3DestinationConfig.getS3DestinationConfig(config);
final S3DestinationConfig destinationConfig = this.configFactory.getS3DestinationConfig(config);
final AmazonS3 s3Client = destinationConfig.getS3Client();

// Test for listObjects permission
testIAMUserHasListObjectPermission(s3Client, destinationConfig.getBucketName());

// Test single upload (for small files) permissions
testSingleUpload(s3Client, destinationConfig.getBucketName());

Expand All @@ -60,6 +72,13 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

public static void testIAMUserHasListObjectPermission(final AmazonS3 s3Client, final String bucketName) {
LOGGER.info("Started testing if IAM user can call listObjects on the destination bucket");
final ListObjectsRequest request = new ListObjectsRequest().withBucketName(bucketName).withMaxKeys(1);
s3Client.listObjects(request);
LOGGER.info("Finished checking for listObjects permission");
}

public static void testSingleUpload(final AmazonS3 s3Client, final String bucketName) {
LOGGER.info("Started testing if all required credentials assigned to user for single file uploading");
final String testFile = "test_" + System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class S3DestinationConfig {
private final String secretAccessKey;
private final Integer partSize;
private final S3FormatConfig formatConfig;
private final AmazonS3 s3Client;

/**
* The part size should not matter in any use case that depends on this constructor. So the default
Expand All @@ -51,13 +52,14 @@ public S3DestinationConfig(final String endpoint,
}

public S3DestinationConfig(final String endpoint,
final String bucketName,
final String bucketPath,
final String bucketRegion,
final String accessKeyId,
final String secretAccessKey,
final Integer partSize,
final S3FormatConfig formatConfig) {
final String bucketName,
final String bucketPath,
final String bucketRegion,
final String accessKeyId,
final String secretAccessKey,
final Integer partSize,
final S3FormatConfig formatConfig,
final AmazonS3 s3Client) {
this.endpoint = endpoint;
this.bucketName = bucketName;
this.bucketPath = bucketPath;
Expand All @@ -66,6 +68,18 @@ public S3DestinationConfig(final String endpoint,
this.secretAccessKey = secretAccessKey;
this.formatConfig = formatConfig;
this.partSize = partSize;
this.s3Client = s3Client;
}

public S3DestinationConfig(final String endpoint,
final String bucketName,
final String bucketPath,
final String bucketRegion,
final String accessKeyId,
final String secretAccessKey,
final Integer partSize,
final S3FormatConfig formatConfig) {
this(endpoint, bucketName, bucketPath, bucketRegion, accessKeyId, secretAccessKey, partSize, formatConfig, null);
}

public static S3DestinationConfig getS3DestinationConfig(final JsonNode config) {
Expand Down Expand Up @@ -127,6 +141,10 @@ public S3FormatConfig getFormatConfig() {
}

public AmazonS3 getS3Client() {
if(this.s3Client !=null) {
return this.s3Client;
}

final AWSCredentials awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);

if (accessKeyId.isEmpty() && !secretAccessKey.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.airbyte.integrations.destination.s3;

import com.fasterxml.jackson.databind.JsonNode;

public class S3DestinationConfigFactory {
public S3DestinationConfig getS3DestinationConfig(final JsonNode config) {
return S3DestinationConfig.getS3DestinationConfig(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,27 @@

package io.airbyte.integrations.destination.s3;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand All @@ -20,19 +34,66 @@
public class S3DestinationTest {

private AmazonS3 s3;
private AmazonS3 s3NoAccess;
private S3DestinationConfig config;

@BeforeEach
public void setup() {
// S3 client mock for successful operations
s3 = mock(AmazonS3.class);
final InitiateMultipartUploadResult uploadResult = mock(InitiateMultipartUploadResult.class);
final UploadPartResult uploadPartResult = mock(UploadPartResult.class);
when(s3.uploadPart(any(UploadPartRequest.class))).thenReturn(uploadPartResult);
when(s3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(uploadResult);

// S3 client mock that throws Access Denied exception for listObjects operation
s3NoAccess = mock(AmazonS3.class);
doThrow(new AmazonS3Exception("Access Denied")).when(s3NoAccess).listObjects(any(ListObjectsRequest.class));
config = new S3DestinationConfig(
"fake-endpoint",
"fake-bucket",
"fake-bucketPath",
"fake-region",
"fake-accessKeyId",
"fake-secretAccessKey",
null);
S3DestinationConfig.DEFAULT_PART_SIZE_MB, null, s3);
}

@Test
public void checksS3NoListObjectPermission() {
final S3Destination destinationFail = new S3Destination(new S3DestinationConfigFactory () {
public S3DestinationConfig getS3DestinationConfig(final JsonNode config) {
return new S3DestinationConfig(
"fake-endpoint",
"fake-bucket",
"fake-bucketPath",
"fake-region",
"fake-accessKeyId",
"fake-secretAccessKey",
S3DestinationConfig.DEFAULT_PART_SIZE_MB,
null, s3NoAccess);
}
});
AirbyteConnectionStatus status = destinationFail.check(null);
assertEquals(Status.FAILED, status.getStatus(), "Connection check should have failed");
assertTrue(status.getMessage().indexOf("Access Denied") > 0, "Connection check returned wrong failure message");

// Test that check succeeds when IAM user has listObjects permission
final S3Destination destinationSuccess = new S3Destination(new S3DestinationConfigFactory () {
public S3DestinationConfig getS3DestinationConfig(final JsonNode config) {
return new S3DestinationConfig(
"fake-endpoint",
"fake-bucket",
"fake-bucketPath",
"fake-region",
"fake-accessKeyId",
"fake-secretAccessKey",
S3DestinationConfig.DEFAULT_PART_SIZE_MB,
null, s3);
}
});
status = destinationSuccess.check(null);
assertEquals(Status.SUCCEEDED, status.getStatus(), "Connection check should have succeeded");
}

@Test
Expand Down

0 comments on commit 2d04a71

Please sign in to comment.