Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Clients to modify the PutObjectRequests #48

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
target/
.idea
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-extended-client-lib</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
<packaging>jar</packaging>
<name>Amazon SQS Extended Client Library for Java</name>
<description>An extension to the Amazon SQS client that enables sending and receiving messages up to 2GB via Amazon S3.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,8 @@ private void storeTextInS3(String s3Key, String messageContentStr, Long messageC
PutObjectRequest putObjectRequest = new PutObjectRequest(clientConfiguration.getS3BucketName(), s3Key,
messageContentStream, messageContentStreamMetadata);
try {
clientConfiguration.getAmazonS3Client().putObject(putObjectRequest);
PutObjectRequest clientModifiedPutRequest = clientConfiguration.getPutObjectModifier().apply(putObjectRequest);
clientConfiguration.getAmazonS3Client().putObject(clientModifiedPutRequest);
} catch (AmazonServiceException e) {
String errorMessage = "Failed to store the message content in an S3 object. SQS message was not sent.";
LOG.error(errorMessage, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
package com.amazon.sqs.javamessaging;

import com.amazonaws.AmazonClientException;
import com.amazonaws.internal.SdkFunction;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.annotation.NotThreadSafe;

import java.util.List;

/**
* Amazon SQS extended client configuration options such as Amazon S3 client,
* bucket name, and message size threshold for large-payload messages.
Expand All @@ -36,6 +36,7 @@ public class ExtendedClientConfiguration {
private boolean largePayloadSupport = false;
private boolean alwaysThroughS3 = false;
private int messageSizeThreshold = SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD;
private SdkFunction<PutObjectRequest, PutObjectRequest> putObjectModifier = new PutObjectRequestIdentityFunction();

public ExtendedClientConfiguration() {
s3 = null;
Expand All @@ -48,6 +49,7 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) {
this.largePayloadSupport = other.largePayloadSupport;
this.alwaysThroughS3 = other.alwaysThroughS3;
this.messageSizeThreshold = other.messageSizeThreshold;
this.putObjectModifier = other.putObjectModifier;
}

/**
Expand Down Expand Up @@ -214,4 +216,45 @@ public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3)
public boolean isAlwaysThroughS3() {
return alwaysThroughS3;
}

/**
* Sets the function which you can use to modify the PutObjectRequest to enable support for encryption headers,
* meta-data, etc.
*
* @param putObjectModifier
* An implementation of the internal SdkFunction which takes a PutObjectRequest, modifies it, and returns it.
*/
public void setPutObjectModifier(SdkFunction<PutObjectRequest, PutObjectRequest> putObjectModifier)
{
this.putObjectModifier = putObjectModifier;
}

/**
* Sets the function which you can use to modify the PutObjectRequest to enable support for encryption headers,
* meta-data, etc.
*
* @param putObjectModifier
* An implementation of the internal SdkFunction which takes a PutObjectRequest, modifies it, and returns it.
*/
public ExtendedClientConfiguration withPutObjectModifier(SdkFunction<PutObjectRequest, PutObjectRequest> putObjectModifier) {
this.setPutObjectModifier(putObjectModifier);
return this;
}

/**
* Used to return the function which modifies the PutObjectRequests
*
* @return The custom configured SdkFunction or an identity function by default.
* stored in Amazon S3. Default: false
*/
public SdkFunction<PutObjectRequest, PutObjectRequest> getPutObjectModifier() {
return putObjectModifier;
}

static class PutObjectRequestIdentityFunction implements SdkFunction<PutObjectRequest, PutObjectRequest> {
@Override
public PutObjectRequest apply(PutObjectRequest putObjectRequest) {
return putObjectRequest;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;

import com.amazonaws.internal.SdkFunction;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.sqs.AmazonSQS;
Expand All @@ -34,8 +35,14 @@
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
Expand All @@ -48,11 +55,13 @@
/**
* Tests the AmazonSQSExtendedClient class.
*/
@RunWith(MockitoJUnitRunner.class)
public class AmazonSQSExtendedClientTest {

private AmazonSQS extendedSqsWithDefaultConfig;
private AmazonSQS mockSqsBackend;
private AmazonS3 mockS3;
@Mock private AmazonSQS mockSqsBackend;
@Mock private AmazonS3 mockS3;
@Mock private SdkFunction<PutObjectRequest, PutObjectRequest> mockModificationFunction;
private static final String S3_BUCKET_NAME = "test-bucket-name";
private static final String SQS_QUEUE_URL = "test-queue-url";

Expand All @@ -65,12 +74,19 @@ public class AmazonSQSExtendedClientTest {

@Before
public void setupClient() {
mockS3 = mock(AmazonS3.class);
mockSqsBackend = mock(AmazonSQS.class);
when(mockS3.putObject(isA(PutObjectRequest.class))).thenReturn(null);

when(mockModificationFunction.apply(any(PutObjectRequest.class))).thenAnswer(new Answer<PutObjectRequest>()
{
@Override
public PutObjectRequest answer(InvocationOnMock invocationOnMock) {
return (PutObjectRequest) invocationOnMock.getArguments()[0];
}
});

ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME);
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
.withPutObjectModifier(mockModificationFunction);

extendedSqsWithDefaultConfig = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration));

Expand All @@ -84,6 +100,7 @@ public void testWhenSendLargeMessageThenPayloadIsStoredInS3() {
extendedSqsWithDefaultConfig.sendMessage(messageRequest);

verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class));
verify(mockModificationFunction, times(1)).apply(any(PutObjectRequest.class));
}

@Test
Expand All @@ -95,6 +112,7 @@ public void testWhenSendSmallMessageThenS3IsNotUsed() {
extendedSqsWithDefaultConfig.sendMessage(messageRequest);

verify(mockS3, never()).putObject(isA(PutObjectRequest.class));
verify(mockModificationFunction, never()).apply(any(PutObjectRequest.class));
}

@Test
Expand All @@ -117,27 +135,33 @@ public void testWhenSendMessageWithAlwaysThroughS3AndMessageIsSmallThenItIsStill
int messageLength = LESS_THAN_SQS_SIZE_LIMIT;
String messageBody = generateStringWithLength(messageLength);
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withAlwaysThroughS3(true);
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
.withAlwaysThroughS3(true)
.withPutObjectModifier(mockModificationFunction);
AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mock(AmazonSQSClient.class), extendedClientConfiguration));

SendMessageRequest messageRequest = new SendMessageRequest(SQS_QUEUE_URL, messageBody);
sqsExtended.sendMessage(messageRequest);

verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class));
verify(mockModificationFunction, times(1)).apply(any(PutObjectRequest.class));
}

@Test
public void testWhenSendMessageWithSetMessageSizeThresholdThenThresholdIsHonored() {
int messageLength = ARBITRATY_SMALLER_THRESSHOLD * 2;
String messageBody = generateStringWithLength(messageLength);
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRATY_SMALLER_THRESSHOLD);
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
.withMessageSizeThreshold(ARBITRATY_SMALLER_THRESSHOLD)
.withPutObjectModifier(mockModificationFunction);

AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mock(AmazonSQSClient.class), extendedClientConfiguration));

SendMessageRequest messageRequest = new SendMessageRequest(SQS_QUEUE_URL, messageBody);
sqsExtended.sendMessage(messageRequest);
verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class));
verify(mockModificationFunction, times(1)).apply(any(PutObjectRequest.class));
}

@Test
Expand Down Expand Up @@ -191,6 +215,7 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor

// There should be 8 puts for the 8 messages above the threshhold
verify(mockS3, times(8)).putObject(isA(PutObjectRequest.class));
verify(mockModificationFunction, times(8)).apply(any(PutObjectRequest.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.sqs.javamessaging;

import com.amazonaws.internal.SdkFunction;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import junit.framework.Assert;
Expand All @@ -32,7 +33,6 @@ public class ExtendedClientConfigurationTest {

@Before
public void setup() {

}

@Test
Expand Down Expand Up @@ -116,5 +116,49 @@ public void testMessageSizeThreshold() {

}

@Test
public void identityFunction_doesNotModifyInput()
{
ExtendedClientConfiguration.PutObjectRequestIdentityFunction identityFunction = new ExtendedClientConfiguration.PutObjectRequestIdentityFunction();
PutObjectRequest inputRequest = mock(PutObjectRequest.class);
PutObjectRequest outputRequest = identityFunction.apply(inputRequest);
Assert.assertSame(inputRequest, outputRequest);
verifyZeroInteractions(inputRequest);
}

@Test
public void noPutObjectFunctionDefined_useIdentity()
{
ExtendedClientConfiguration configuration = new ExtendedClientConfiguration();
Assert.assertEquals(configuration.getPutObjectModifier().getClass(), ExtendedClientConfiguration.PutObjectRequestIdentityFunction.class);
}

@Test
public void testPutObjectFunctionSetter()
{
ExtendedClientConfiguration configuration = new ExtendedClientConfiguration();
SdkFunction<PutObjectRequest, PutObjectRequest> modifierFunction = new SdkFunction<PutObjectRequest, PutObjectRequest>() {
@Override
public PutObjectRequest apply(PutObjectRequest putObjectRequest) {
return null;
}
};

configuration.setPutObjectModifier(modifierFunction);
Assert.assertEquals(configuration.getPutObjectModifier(), modifierFunction);
}

@Test
public void testPutObjectFunctioFluentApi()
{
SdkFunction<PutObjectRequest, PutObjectRequest> modifierFunction = new SdkFunction<PutObjectRequest, PutObjectRequest>() {
@Override
public PutObjectRequest apply(PutObjectRequest putObjectRequest) {
return null;
}
};

ExtendedClientConfiguration configuration = new ExtendedClientConfiguration().withPutObjectModifier(modifierFunction);
Assert.assertEquals(configuration.getPutObjectModifier(), modifierFunction);
}
}