-
Notifications
You must be signed in to change notification settings - Fork 89
/
Copy pathBatchRequestHandler.java
104 lines (91 loc) · 3.66 KB
/
BatchRequestHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package software.amazon.lambda.powertools.batch;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.sun.xml.internal.ws.api.message.Message;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* An abstract base class that will be extended per-event-source to supply event-source
* specific functionality. As much as possible is pulled into the base class, and
* event-source specific logic is delegated downwards (e.g., mapping errors to response,
* extracting messages from batch).
*
* IRL, this would be implemented leaning on the {@link BatchMessageHandlerBuilder}, here
* I have provided implementation inline for illustrative purposes.
*
* <b>Note - this is package private. We don't let users extend this, which gives us
* much more flexibility to add extra functionality later by minimising our API surface
* area. </b>
*
* @param <T> The batch event type
* @param <U> The individual message type for each message within the batch
* @param <V> The batch result type
*/
abstract class BatchRequestHandler<T, U, V> implements RequestHandler<T, V> {
/**
* Used to wrap the result of processing a single message. We wrap the message itself,
* and optionally an exception that was raised during the processing. A lack of
* exception indicates success.
*/
protected class MessageProcessingResult<U> {
private final U message;
private final Exception exception;
public MessageProcessingResult(U message, Exception exception) {
this.message = message;
this.exception = exception;
}
}
/**
* The batch processing logic goes here. This can be generic across all message types.
*
* @param input The batch message to process
* @param context The Lambda execution environment context object.
* @return
*/
@Override
public V handleRequest(T input, Context context) {
// Extract messages from batch
List<U> messages = extractMessages(input);
// For each message, map it to either 1/ a successful result, or 2/ an exception
List<MessageProcessingResult<U>> results = messages.stream().map(m -> {
try {
enhanceMessage(m);
processItem(m, context);
return new MessageProcessingResult<>(m, null);
} catch (Exception e) {
return new MessageProcessingResult<>(m, e);
}
}).collect(Collectors.toList());
// Generate the response from the list of results
return writeResponse(results);
}
/**
* Provided by the event-specific child to extract the individual records
* from the batch request
*
* @param input The batch
* @return the messages within the batch
*/
protected abstract List<U> extractMessages(T input);
/**
* Given the set of message processing results, generates the appropriate batch
* processing result to return to Lambda.
*
* @param results the result of processing each message, and the messages themselves
* @return the batch processing result to return to lambda
*/
protected abstract V writeResponse(Iterable<MessageProcessingResult<U>> results);
/**
* To be provided by the user. Processes an individual message within the batch
* @param message
* @param context
*/
public abstract void processItem(U message, Context context);
/**
* This could be overriden by event-specific children to implement things like large
* message processing.
* @param message
*/
protected void enhanceMessage(U message) {
}
}