Skip to content

Commit

Permalink
Increase coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
scottgerring committed Jul 11, 2023
1 parent fa497ed commit 6851776
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -496,9 +496,14 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
}

BatchContext batchContext = new BatchContext(client);
Queue<SQSMessage> messagesToProcess = new LinkedList<>(event.getRecords());
while (!messagesToProcess.isEmpty()) {
SQSMessage message = messagesToProcess.remove();
int offset = 0;
while (offset < event.getRecords().size()) {

This comment has been minimized.

Copy link
@jeromevdl

jeromevdl Jul 11, 2023

Contributor

I've always learned to avoid break and to have something more explicit (a variable...), I know you don't want my code... :p but you will come to it

// Get the current message and advance to the next. Doing this here
// makes it easier for us to know where we are up to if we have to
// break out of here early.
SQSMessage message = event.getRecords().get(offset);
offset++;

// If the batch hasn't failed, try process the message
try {
handlerReturn.add(handler.process(message));
Expand All @@ -525,11 +530,15 @@ public static <R> List<R> batchProcessor(final SQSEvent event,

// If we have a FIFO batch failure, unprocessed messages will remain on the queue
// past the failed message. We have to add these to the errors
messagesToProcess.forEach(message -> {
LOG.info("Skipping message {} as another message with a message group failed in this batch",
message.getMessageId());
batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException());
});
if (offset < event.getRecords().size()) {
event.getRecords()
.subList(offset, event.getRecords().size())
.forEach(message -> {
LOG.info("Skipping message {} as another message with a message group failed in this batch",
message.getMessageId());
batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException());
});
}

batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
return handlerReturn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,50 @@ public void processWholeBatch() {
assertThat(results.size()).isEqualTo(3);
}

/**
* Check that a failure in the middle of the batch:
* - deletes the successful message explicitly from SQS
* - marks the failed and subsequent message as failed
* - does not delete the failed or subsequent message
*/
@Test
public void singleFailureInMiddleOfBatch() {
// Arrange
Mockito.when(sqsClient.deleteMessageBatch(deleteMessageBatchCaptor.capture())).thenReturn(DeleteMessageBatchResponse
.builder().build());

// Act
AtomicInteger processedCount = new AtomicInteger();
assertThatExceptionOfType(SQSBatchProcessingException.class)
.isThrownBy(() -> batchProcessor(sqsBatchEvent, false, (message) -> {
int value = processedCount.getAndIncrement();
if (value == 1) {
throw new RuntimeException("Whoops");
}
return true;
}))

// Assert
.isInstanceOf(SQSBatchProcessingException.class)
.satisfies(e -> {
List<SQSEvent.SQSMessage> failures = ((SQSBatchProcessingException)e).getFailures();
assertThat(failures.size()).isEqualTo(2);
List<String> failureIds = failures.stream()
.map(SQSEvent.SQSMessage::getMessageId)
.collect(Collectors.toList());
assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(1).getMessageId());
assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(2).getMessageId());
});

DeleteMessageBatchRequest deleteRequest = deleteMessageBatchCaptor.getValue();
List<String> messageIds = deleteRequest.entries().stream()
.map(DeleteMessageBatchRequestEntry::id)
.collect(Collectors.toList());
assertThat(deleteRequest.entries().size()).isEqualTo(1);
assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(0).getMessageId())).isTrue();

}

@Test
public void singleFailureAtEndOfBatch() {

Expand Down

0 comments on commit 6851776

Please sign in to comment.