Skip to content

Commit e3094cb

Browse files
committed
Add functional API for LargeMessages utility.
1 parent 6e4701c commit e3094cb

File tree

10 files changed

+619
-86
lines changed

10 files changed

+619
-86
lines changed

powertools-large-messages/pom.xml

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
-->
1515

1616
<project xmlns="http://maven.apache.org/POM/4.0.0"
17-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
17+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
1919
<modelVersion>4.0.0</modelVersion>
2020

21-
<description>A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier.</description>
21+
<description>A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier.</description>
2222

2323
<parent>
2424
<groupId>software.amazon.lambda</groupId>
@@ -41,6 +41,13 @@
4141
<groupId>software.amazon.lambda</groupId>
4242
<artifactId>powertools-common</artifactId>
4343
</dependency>
44+
<dependency>
45+
<groupId>software.amazon.lambda</groupId>
46+
<artifactId>powertools-common</artifactId>
47+
<version>${project.version}</version>
48+
<type>test-jar</type>
49+
<scope>test</scope>
50+
</dependency>
4451
<dependency>
4552
<groupId>com.amazonaws</groupId>
4653
<artifactId>aws-lambda-java-events</artifactId>
@@ -102,6 +109,11 @@
102109
<artifactId>mockito-core</artifactId>
103110
<scope>test</scope>
104111
</dependency>
112+
<dependency>
113+
<groupId>org.mockito</groupId>
114+
<artifactId>mockito-junit-jupiter</artifactId>
115+
<scope>test</scope>
116+
</dependency>
105117
<dependency>
106118
<groupId>org.slf4j</groupId>
107119
<artifactId>slf4j-simple</artifactId>

powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
import java.lang.annotation.Target;
2121

2222
/**
23-
* <p>Use this annotation to handle large messages (> 256 KB) from SQS or SNS.
23+
* <p>Use this annotation to handle large messages (> 1 MB) from SQS or SNS.
2424
* When large messages are sent to an SQS Queue or SNS Topic, they are offloaded to S3 and only a reference is passed in the message/record.</p>
2525
*
2626
* <p>{@code @LargeMessage} automatically retrieves and deletes messages
2727
* which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} or {@code amazon-sns-java-extended-client-lib}
2828
* client libraries.</p>
2929
*
30-
* <p>This version of the {@code @LargeMessage} is compatible with version
31-
* 1.1.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.</p>
30+
* <p>This version of the {@code @LargeMessage} is compatible with version 1.1.0+ and 2.0.0+
31+
* of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.</p>
3232
* <br/>
3333
* <p>Put this annotation on a method where the first parameter is either a {@link com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage} or {@link com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord}.
3434
* <br/>
@@ -54,9 +54,11 @@
5454
* </pre>
5555
* </p>
5656
*
57-
* <p><b>Note 1</b>: Retrieving payloads and deleting objects from S3 will increase the duration of the
57+
* <p><b>Note 1</b>: The message object (SQSMessage or SNSRecord) is modified in-place to avoid duplicating
58+
* the large blob in memory. The message body will be replaced with the S3 object content.</p>
59+
* <p><b>Note 2</b>: Retrieving payloads and deleting objects from S3 will increase the duration of the
5860
* Lambda function.</p>
59-
* <p><b>Note 2</b>: Make sure to configure your function with enough memory to be able to retrieve S3 objects.</p>
61+
* <p><b>Note 3</b>: Make sure to configure your function with enough memory to be able to retrieve S3 objects.</p>
6062
*/
6163
@Retention(RetentionPolicy.RUNTIME)
6264
@Target(ElementType.METHOD)
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright 2023 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package software.amazon.lambda.powertools.largemessages;
16+
17+
import java.util.Optional;
18+
import java.util.function.Function;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import software.amazon.lambda.powertools.largemessages.internal.LargeMessageProcessor;
24+
import software.amazon.lambda.powertools.largemessages.internal.LargeMessageProcessorFactory;
25+
26+
/**
27+
* Functional API for processing large messages without AspectJ.
28+
* <p>
29+
* Use this class to handle large messages (> 1 MB) from SQS or SNS.
30+
* When large messages are sent to an SQS Queue or SNS Topic, they are offloaded to S3 and only a reference is passed in the message/record.
31+
* <p>
32+
* {@code LargeMessages} automatically retrieves and optionally deletes messages
33+
* which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} or {@code amazon-sns-java-extended-client-lib}
34+
* client libraries.
35+
* <p>
36+
* This version is compatible with version 1.1.0+ and 2.0.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.
37+
* <p>
38+
* <u>SQS Example</u>:
39+
* <pre>
40+
* public class SqsBatchHandler implements RequestHandler&lt;SQSEvent, SQSBatchResponse&gt; {
41+
* private final BatchMessageHandler&lt;SQSEvent, SQSBatchResponse&gt; handler;
42+
*
43+
* public SqsBatchHandler() {
44+
* handler = new BatchMessageHandlerBuilder()
45+
* .withSqsBatchHandler()
46+
* .buildWithRawMessageHandler(this::processMessage);
47+
* }
48+
*
49+
* &#64;Override
50+
* public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
51+
* return handler.processBatch(sqsEvent, context);
52+
* }
53+
*
54+
* private void processMessage(SQSEvent.SQSMessage sqsMessage) {
55+
* LargeMessages.processLargeMessage(sqsMessage, this::handleProcessedMessage);
56+
* }
57+
*
58+
* private void handleProcessedMessage(SQSEvent.SQSMessage processedMessage) {
59+
* // processedMessage.getBody() will contain the content of the S3 Object
60+
* }
61+
* }
62+
* </pre>
63+
* <p>
64+
* To disable the deletion of S3 objects:
65+
* <pre>
66+
* LargeMessages.processLargeMessage(sqsMessage, this::handleProcessedMessage, false);
67+
* </pre>
68+
* <p>
69+
* For multi-argument methods, use a lambda to pass additional parameters:
70+
* <pre>
71+
* public void handleRequest(SQSEvent event, Context context) {
72+
* event.getRecords().forEach(message -&gt;
73+
* LargeMessages.processLargeMessage(message, processedMsg -&gt; processMessage(processedMsg, context))
74+
* );
75+
* }
76+
*
77+
* private void processMessage(SQSMessage processedMessage, Context context) {
78+
* // processedMessage.getBody() will contain the content of the S3 Object
79+
* }
80+
* </pre>
81+
* <p>
82+
* <b>Note 1</b>: The message object (SQSMessage or SNSRecord) is modified in-place to avoid duplicating
83+
* the large blob in memory. The message body will be replaced with the S3 object content.
84+
* <p>
85+
* <b>Note 2</b>: Retrieving payloads and deleting objects from S3 will increase the duration of the Lambda function.
86+
* <p>
87+
* <b>Note 3</b>: Make sure to configure your function with enough memory to be able to retrieve S3 objects.
88+
*
89+
* @see LargeMessage
90+
*/
91+
public final class LargeMessages {
92+
93+
private static final Logger LOG = LoggerFactory.getLogger(LargeMessages.class);
94+
95+
private LargeMessages() {
96+
// utility class
97+
}
98+
99+
/**
100+
* Process a large message and execute the function with the processed message.
101+
* <p>
102+
* The S3 object will be deleted after processing (default behavior).
103+
* To disable S3 object deletion, use {@link #processLargeMessage(Object, Function, boolean)}.
104+
* <p>
105+
* Example usage:
106+
* <pre>
107+
* String returnValueOfFunction = LargeMessages.processLargeMessage(sqsMessage, this::handleMessage);
108+
* String returnValueOfFunction = LargeMessages.processLargeMessage(sqsMessage, processedMsg -&gt; processOrder(processedMsg, orderId, amount));
109+
* </pre>
110+
*
111+
* @param message the message to process (SQSMessage or SNSRecord)
112+
* @param function the function to execute with the processed message
113+
* @param <T> the message type
114+
* @param <R> the return type of the function
115+
* @return the result of the function execution
116+
*/
117+
public static <T, R> R processLargeMessage(T message, Function<T, R> function) {
118+
return processLargeMessage(message, function, true);
119+
}
120+
121+
/**
122+
* Process a large message and execute the function with the processed message.
123+
* <p>
124+
* Example usage:
125+
* <pre>
126+
* String returnValueOfFunction = LargeMessages.processLargeMessage(sqsMessage, this::handleMessage, false);
127+
* String returnValueOfFunction = LargeMessages.processLargeMessage(sqsMessage, processedMsg -&gt; processOrder(processedMsg, orderId, amount), false);
128+
* </pre>
129+
*
130+
* @param message the message to process (SQSMessage or SNSRecord)
131+
* @param function the function to execute with the processed message
132+
* @param deleteS3Object whether to delete the S3 object after processing
133+
* @param <T> the message type
134+
* @param <R> the return type of the function
135+
* @return the result of the function execution
136+
*/
137+
public static <T, R> R processLargeMessage(T message, Function<T, R> function, boolean deleteS3Object) {
138+
Optional<LargeMessageProcessor<?>> processor = LargeMessageProcessorFactory.get(message);
139+
140+
if (!processor.isPresent()) {
141+
LOG.warn("Unsupported message type [{}], proceeding without large message processing",
142+
message.getClass());
143+
return function.apply(message);
144+
}
145+
146+
try {
147+
@SuppressWarnings("unchecked")
148+
LargeMessageProcessor<T> typedProcessor = (LargeMessageProcessor<T>) processor.get();
149+
return typedProcessor.process(message, function::apply, deleteS3Object);
150+
} catch (RuntimeException e) {
151+
throw e;
152+
} catch (Throwable t) {
153+
throw new LargeMessageProcessingException("Failed to process large message", t);
154+
}
155+
}
156+
}

powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
package software.amazon.lambda.powertools.largemessages.internal;
1616

1717
import java.util.Optional;
18+
1819
import org.aspectj.lang.ProceedingJoinPoint;
1920
import org.aspectj.lang.annotation.Around;
2021
import org.aspectj.lang.annotation.Aspect;
2122
import org.aspectj.lang.annotation.Pointcut;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
25+
2426
import software.amazon.lambda.powertools.largemessages.LargeMessage;
2527

2628
/**
@@ -31,17 +33,17 @@ public class LargeMessageAspect {
3133

3234
private static final Logger LOG = LoggerFactory.getLogger(LargeMessageAspect.class);
3335

34-
@SuppressWarnings({"EmptyMethod"})
36+
@SuppressWarnings({ "EmptyMethod" })
3537
@Pointcut("@annotation(largeMessage)")
3638
public void callAt(LargeMessage largeMessage) {
39+
// Pointcut method - body intentionally empty
3740
}
3841

42+
@SuppressWarnings("unchecked")
3943
@Around(value = "callAt(largeMessage) && execution(@LargeMessage * *.*(..))", argNames = "pjp,largeMessage")
40-
public Object around(ProceedingJoinPoint pjp,
41-
LargeMessage largeMessage) throws Throwable {
44+
public Object around(ProceedingJoinPoint pjp, LargeMessage largeMessage) throws Throwable {
4245
Object[] proceedArgs = pjp.getArgs();
4346

44-
// we need a message to process
4547
if (proceedArgs.length == 0) {
4648
LOG.warn("@LargeMessage annotation is placed on a method without any message to process, proceeding");
4749
return pjp.proceed(proceedArgs);
@@ -56,7 +58,8 @@ public Object around(ProceedingJoinPoint pjp,
5658
return pjp.proceed(proceedArgs);
5759
}
5860

59-
return largeMessageProcessor.get().process(pjp, largeMessage.deleteS3Object());
61+
return ((LargeMessageProcessor<Object>) largeMessageProcessor.get()).process(message,
62+
msg -> pjp.proceed(proceedArgs), largeMessage.deleteS3Object());
6063
}
6164

6265
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2023 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package software.amazon.lambda.powertools.largemessages.internal;
16+
17+
/**
18+
* Functional interface for large message processing.
19+
* <p>
20+
* This interface is similar to {@link java.util.function.Function} but throws {@link Throwable}
21+
* instead of no exceptions. This is necessary to support AspectJ's {@code ProceedingJoinPoint.proceed()}
22+
* which throws {@code Throwable}, allowing exceptions to bubble up naturally without wrapping.
23+
* <p>
24+
* This interface should not be exposed to user-facing APIs such as
25+
* {@link software.amazon.lambda.powertools.largemessages.LargeMessages}. These should use plain
26+
* {@link java.util.function.Function}.
27+
*
28+
* @param <T> the input type (message type)
29+
* @param <R> the return type of the function
30+
*/
31+
@FunctionalInterface
32+
public interface LargeMessageFunction<T, R> {
33+
@SuppressWarnings("java:S112") // Throwable is required for AspectJ compatibility
34+
R apply(T processedMessage) throws Throwable;
35+
}

0 commit comments

Comments
 (0)