RFC WIP: New batch processor for new native partial response (SQS, DynamoDB, Kinesis, Aurora Stream(?)) #797

Closed
pankajagrawal16 opened this issue Mar 16, 2022 · 15 comments
Labels
enhancement New feature or request priority:3 Neutral - not a core feature or affects less than 40% of users RFC

Comments

@pankajagrawal16
Contributor

pankajagrawal16 commented Mar 16, 2022

Key information

  • RFC PR: n/a
  • Related issue(s), if known: n/a
  • Area: Batch
  • Meet tenets: Yes

Summary

A new generic batch processing utility, which can process records from SQS, Kinesis Data Streams, and DynamoDB streams, and handle reporting batch failures.

Motivation

With the launch of support for partial batch responses for Lambda/SQS, the event source mapping can now natively handle partial failures in a batch - removing the need for explicit calls to the SQS delete API. This support already exists for Kinesis and DynamoDB streams.

The Java SDK for Lambda contains 1/ the incoming message types (both batch and nested messages within batch) and 2/ the partial batch response types. The documentation for each event source (e.g. SQS) contains examples for implementing partial batch responses. Powertools aims to improve on this by:

  • Providing a consistent experience across event sources - the user should 1/ become aware that other message sources can be handled in the same fashion, and 2/ not have to rework their logic to use a different message source
  • Benefit automatically from all best practices here. For instance, the example SQS code linked above does not deal with FIFO queues, and it is not immediately apparent that extra logic is needed. Powertools will transparently handle this case
  • Integrate without surprises with adjacent features, such as large message handling

Proposal

1. Layout

The new utility will be implemented in a new package, powertools-batch. The existing SQS batch processor, powertools-sqs, will be maintained for bug fixes and removed in Powertools for Java v2.

2. Existing SQS Batch Interface Simplifications

Powertools for Java has an existing batch processing mechanism for SQS only, which was written before partial responses and uses explicit message deletion instead (documentation, code).

It includes extra tunables that are not present in the Python implementation and make less sense with partial batch responses. We will not support these:

  • Non-retryable exceptions - there is no mechanism to indicate in a partial batch response that a particular message should not be retried and instead moved to the DLQ - a message either succeeds, or fails and is retried.
  • Suppress exception - stops the processor from throwing an exception on failure. Because the old processor handles batch failures by explicitly deleting the successful messages, it can throw once it's done. With the new processor, failures are reported explicitly in the response returned to Lambda, which then removes the successfully processed messages itself, so this option is unnecessary.

The existing implementation, in addition to a utility class, provides an annotation to handle batch responses. It works like this:

    @Override
    @SqsBatch(value = SampleMessageHandler.class, suppressException = true)
    public String handleRequest(SQSEvent input, Context context) {
        return "{\"statusCode\": 200}";
    }

This doesn't make sense in the world of partial batch responses. The message returned to Lambda will be the partial batch response itself, and will therefore be generated by the new batch utility. This style of embedding a hard-coded success message in the handler therefore makes no sense, as the user's own code does not control the return value.
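To make the contrast concrete, here is a minimal sketch of the core loop such a utility could run, assuming plain `String` messages as stand-ins for the `aws-lambda-java-events` types: call the user's handler for each record, catch any exception, and collect the identifiers of the failed records, which is what a partial batch response reports back to Lambda. `PartialBatchLoop` and `process` are hypothetical names, not the Powertools API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical core loop of a partial-batch processor (not the Powertools API).
final class PartialBatchLoop {
    private PartialBatchLoop() {
    }

    /**
     * Runs the handler on every record and returns the identifiers of the
     * records that threw. An empty list means the whole batch succeeded.
     */
    static <M> List<String> process(List<M> records,
                                    Function<M, String> idOf,
                                    Consumer<M> handler) {
        List<String> failedIds = new ArrayList<>();
        for (M record : records) {
            try {
                handler.accept(record);
            } catch (Exception e) {
                // Read the ID from the current record, so a failure can never
                // report a stale identifier from a previous iteration.
                failedIds.add(idOf.apply(record));
            }
        }
        return failedIds;
    }
}
```

The user's handler only returns or throws; building the response from `failedIds` is left entirely to the utility.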

3. Features to retain

  • Idempotency utility integration
  • Large message support. This has been historically implemented by the @SqsLargeMessage annotation on the lambda request handler itself. However in Feature request: Better failures handling while using both BatchProcessing and LargeMessageHandling #596 we can see that the current implementation is not optimal when batches and large messages are combined, leading to an entire batch failure when a single message has issues. We will resolve this here by incorporating the large message processing into the inner-loop of the batch processing, rather than an aspect that happens before the batch. We will see if this can be done automatically or if it will require the user to hint they need large message processing enabled when providing their batch handler.
  • FIFO queue support. The responses for FIFO queues are different - you must stop processing the batch as soon as any failure appears. We have an open PR for this against our existing SQS implementation, and should implement the same here (fix: Handle batch failures in FIFO queues correctly #1183).
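The FIFO rule in the last bullet can be sketched as a small variation on a standard loop: once one message fails, that message and every message after it are reported as failures without being processed, so the remaining messages are retried in their original order. This is a hypothetical helper (`FifoBatchLoop`) using plain strings as message IDs, not the code from #1183.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical FIFO-aware loop: stop at the first failure and report the rest as failed.
final class FifoBatchLoop {
    private FifoBatchLoop() {
    }

    static List<String> process(List<String> messageIds, Consumer<String> handler) {
        List<String> failedIds = new ArrayList<>();
        boolean failed = false;
        for (String id : messageIds) {
            if (failed) {
                // A previous message failed: skip processing to preserve ordering.
                failedIds.add(id);
                continue;
            }
            try {
                handler.accept(id);
            } catch (Exception e) {
                failed = true;
                failedIds.add(id);
            }
        }
        return failedIds;
    }
}
```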

4. User-Facing API

To decide on the approach, let's look at 2 alternative implementations of the user-facing API. The complete
implementation has not been built, just enough of the API of the new library to complete the RFC phase.

We can provide a simple builder interface.
This decouples us completely from the request handler model which may provide extra flexibility in the future, and
gives us a mechanism to extend behaviour later without breaking interfaces by adding extra parameters to the builder.
By way of example, success and failure hooks are added to SQS, a feature also provided in the Python implementation.

A partial mockup of this implementation can be found here.

public class SqsExampleWithIdempotency implements RequestHandler<SQSEvent, SQSBatchResponse> {

    private final SqsBatchMessageHandler handler;

    public SqsExampleWithIdempotency() {
        // Example 1 - process a raw SQS message in an idempotent fashion
        handler = new BatchMessageHandlerBuilder()
                // First we can set parameters common to all message sources
                .withFailureHandler((msg, e) -> System.out.println("Whoops: " + msg.getMessageId()))
                .withSqsBatchHandler()
                // Now we can set any parameters specific to SQS
                .withRawMessageHandler(this::processRawMessage)
                // Alternatively: .<Basket>withDeserializedMessageHandler(this::processDeserializedMessage)
                .build();
    }

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        // The message handlers take a Context, so it is passed through here
        return handler.process(sqsEvent, context);
    }

    @SqsLargeMessage
    private void processRawMessage(SQSEvent.SQSMessage sqsMessage, Context context) {
        // Do some stuff
    }

    @Idempotent
    private void processDeserializedMessage(@IdempotencyKey Basket basket, Context context) {
        // Do some stuff
    }
}
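Independently of the mockup above, the following self-contained sketch shows why a builder makes it easy to add optional parameters later: the success and failure hooks default to no-ops, so a new `withX(...)` method can be added without breaking existing callers. All names here (`BatchHandler`, `Builder`, `withMessageHandler`, `withSuccessHandler`, `withFailureHandler`) are hypothetical, and messages are plain strings that double as their own IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical builder-configured batch handler; hooks are optional and default to no-ops.
final class BatchHandler {
    private final Consumer<String> messageHandler;
    private final Consumer<String> successHandler;
    private final BiConsumer<String, Exception> failureHandler;

    private BatchHandler(Builder b) {
        this.messageHandler = b.messageHandler;
        this.successHandler = b.successHandler;
        this.failureHandler = b.failureHandler;
    }

    /** Returns the IDs of failed messages, i.e. the content of a partial batch response. */
    List<String> process(List<String> messages) {
        List<String> failed = new ArrayList<>();
        for (String msg : messages) {
            try {
                messageHandler.accept(msg);
            } catch (Exception e) {
                failureHandler.accept(msg, e);
                failed.add(msg);
                continue;
            }
            // Only reached when the message handler returned normally.
            successHandler.accept(msg);
        }
        return failed;
    }

    static final class Builder {
        private Consumer<String> messageHandler;
        private Consumer<String> successHandler = msg -> { };
        private BiConsumer<String, Exception> failureHandler = (msg, e) -> { };

        Builder withMessageHandler(Consumer<String> handler) {
            this.messageHandler = handler;
            return this;
        }

        Builder withSuccessHandler(Consumer<String> handler) {
            this.successHandler = handler;
            return this;
        }

        Builder withFailureHandler(BiConsumer<String, Exception> handler) {
            this.failureHandler = handler;
            return this;
        }

        BatchHandler build() {
            // The message handler is the only mandatory parameter.
            Objects.requireNonNull(messageHandler, "a message handler is required");
            return new BatchHandler(this);
        }
    }
}
```

Because the handler is built once and only `process` runs per batch, it fits naturally in a request handler's constructor.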

4.3 RequestHandler base class

A third option considered was to provide a base BatchRequestHandler<...> for the user to extend.
This option was discarded because it limits the flexibility of the user's code. The code for this
variant can nonetheless be found here.

5. Data Model

The new module will consume batch events from Lambda using the types in aws-lambda-java-events. From there, individual records must be pulled out and passed on to the user-provided handler.

The events library already has nested types for the messages within a batch; we simply pass these through to the user's handler. These types do not share a common base class, so each handler is coupled to the concrete type of the source that is producing messages.

This approach decreases the complexity of the powertools implementation - no additional mapping needs to be done - and would also automatically pass through new fields appearing in the interface with a simple dependency version update.
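Since the event types share no common base class, one way to keep the processing loop generic is to ask each event source for three small pieces: how to extract the records from a batch, which identifier to report for a failed record (the message ID for SQS, the sequence number for Kinesis and DynamoDB streams), and how to build its source-specific response. The sketch below is hypothetical: `SourceAdapter` and the `Sqs*` records are simplified stand-ins, not the `aws-lambda-java-events` classes, so that it runs without dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical per-source adapter: B = batch event, M = record, R = batch response.
interface SourceAdapter<B, M, R> {
    List<M> extractMessages(B batch);

    String itemIdentifier(M message);

    R writeResponse(List<String> failedIds);

    // Shared loop written once against the adapter; each source supplies only the mapping.
    default R process(B batch, Consumer<M> handler) {
        List<String> failedIds = new ArrayList<>();
        for (M message : extractMessages(batch)) {
            try {
                handler.accept(message);
            } catch (Exception e) {
                failedIds.add(itemIdentifier(message));
            }
        }
        return writeResponse(failedIds);
    }
}

// Simplified stand-ins for SQSEvent.SQSMessage, SQSEvent and SQSBatchResponse.
record SqsMessage(String messageId, String body) { }

record SqsBatch(List<SqsMessage> records) { }

record SqsResponse(List<String> batchItemFailures) { }

final class SqsAdapter implements SourceAdapter<SqsBatch, SqsMessage, SqsResponse> {
    @Override
    public List<SqsMessage> extractMessages(SqsBatch batch) {
        return batch.records();
    }

    @Override
    public String itemIdentifier(SqsMessage message) {
        return message.messageId();
    }

    @Override
    public SqsResponse writeResponse(List<String> failedIds) {
        return new SqsResponse(failedIds);
    }
}
```

The response type stays a type parameter here rather than `Object`, which keeps each handler's return type precise.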

Questions

  • Should we wait for the messages v4 API, and depend on this, while the old version of the SQS processor continues to use the old messaging library?
  • Do we intend for this module to supersede powertools-sqs completely? If so, we'd need to duplicate some other functionality - e.g. the large message aspect - so that users do not need to pull both libraries in.

Drawbacks

This change will introduce redundancy between the existing SQS batch processing utility and this
new utility. The old utility will be removed as part of the v2 changes.

This utility adds no additional dependencies. The message types involved are all bundled together
in aws-lambda-java-events.

Rationale and alternatives

  1. Provide an abstract-base-RequestHandler that can be extended by the user (example code). This has been discarded as previous feedback has indicated that in some cases it is not practical to extend a Powertools class in the RequestHandler - using interface default methods would allow us to mix in the implementation without doing this
  2. Use annotations and aspects to inject behaviour - this has been discarded because 1/ it adds unnecessary complexity in this case and 2/ it is not ergonomic when the return type of the function must be governed by the message processor
  3. Use interface defaults to mix behaviour into the request handler - this has been discarded because 1/ it still couples us to the RequestHandler to some extent and 2/ it isn't a common pattern in Java

Unresolved questions

Optional, stash area for topics that need further development e.g. TBD

@pankajagrawal16
Contributor Author

@machafer will start to look at the implementations and the UX.

@pankajagrawal16 pankajagrawal16 added the enhancement New feature or request label Mar 16, 2022
@jeromevdl
Contributor

@machafer, what's the status on this please?

@scottgerring
Contributor

As I recently worked on the code here, I'll pick this up!

@scottgerring scottgerring changed the title RFC: New batch processor for new native partial response (SQS, DynamoDB, Kinesis, Aurora Stream(?)) RFC WIP: New batch processor for new native partial response (SQS, DynamoDB, Kinesis, Aurora Stream(?)) Jun 21, 2023
@jeromevdl
Contributor

Have you seen this issue #596 and is it something to consider as part of this RFC?

Regarding the proposition, I'm not sure how you can return this List<String> with the BatchMessageHandlerBuilder. Also, from the documentation, there is already an SQSBatchResponse class with a BatchItemFailure. We should probably not reinvent the wheel, and use this instead:

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

import java.util.ArrayList;
import java.util.List;

public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
            try {
                // process your message
            } catch (Exception e) {
                // Add the failed message identifier to the batchItemFailures list.
                // Read it from the current message, so it can never be stale or empty.
                batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(message.getMessageId()));
            }
        }
        return new SQSBatchResponse(batchItemFailures);
    }
}

Same for kinesis/dynamodb: StreamsEventResponse.BatchItemFailure.

➡️ I would definitely use the built-in events (just seen the solution 4.2). But in that case, what will be the developer experience, what will be the added value of the module? Can you elaborate on this?

Regarding your questions:

  • Events v4: We need to gather information internally to understand the status / differences / ETA.
  • I agree this module should supersede the old SQS one, that will be marked deprecated here and deleted in v2. Regarding the common code, I don't know if we are talking about 1 class or much more. We can probably live with duplicated code if it's not too big...

@jeromevdl
Contributor

Can we also take inspiration from Python and see if we can have a similar "experience"?

Wondering if we could have something like this (for SQS), using interfaces and default implementations:

public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse>, BatchProcessor<SQSEvent, SQSBatchResponse> {

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        // processBatch is implemented as a default method in the interface; it handles exceptions
        // thrown by processItem and adds the failing item to the BatchItemFailure list
        return this.processBatch(sqsEvent); // we may need to pass the context too...?
    }

    // This method comes from the BatchProcessor interface; developers override the appropriate one.
    // It must be public: an implementation cannot reduce the visibility of an interface method.
    @Override
    public void processItem(SQSEvent.SQSMessage message) {
        // do some stuff with this item
    }

}

With the BatchProcessor:

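import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

public interface BatchProcessor<I, O> {
    default O processBatch(I input) {
        // Depending on the input type (SQS/Kinesis/DynamoDB/...), create the appropriate response,
        // then browse the list of items and, for each item:
        //     try {
        //         processItem(item);
        //     } catch (Throwable t) {
        //         // put the item in the batch item failure list
        //     }
        throw new UnsupportedOperationException("sketch only");
    }

    default void processItem(SQSEvent.SQSMessage message) {
        System.out.println(message.getMessageId());
    }

    default void processItem(KinesisEvent.KinesisEventRecord record) {
        System.out.println(record.getEventID());
    }

    default void processItem(DynamodbEvent.DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
    }
}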
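For reference, here is a compiling, dependency-free sketch of the interface-default idea, simplified to a single record type: `processBatch` is a default method that calls the user's `processItem` and collects the IDs of the items that threw. `DefaultBatchProcessor`, `OrderHandler` and the `Message` record are hypothetical stand-ins for the proposed interface, a user handler, and `SQSEvent.SQSMessage`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SQSEvent.SQSMessage.
record Message(String messageId, String body) { }

// Hypothetical mix-in: the batch loop comes for free as a default method.
interface DefaultBatchProcessor {
    // Implemented by the user; throwing marks the message as failed.
    void processItem(Message message);

    default List<String> processBatch(List<Message> messages) {
        List<String> failedIds = new ArrayList<>();
        for (Message message : messages) {
            try {
                processItem(message);
            } catch (Exception e) {
                failedIds.add(message.messageId());
            }
        }
        return failedIds;
    }
}

// A handler mixes the interface in without giving up its single superclass slot.
final class OrderHandler implements DefaultBatchProcessor {
    @Override
    public void processItem(Message message) { // must be public, not protected
        if (message.body().isBlank()) {
            throw new IllegalArgumentException("empty body: " + message.messageId());
        }
    }
}
```

Keeping `processItem` abstract here lets the compiler force an implementation; with one default overload per event type, as proposed above, forgetting to override the right overload would compile silently.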

With this, we could add new streaming services through interface defaults without breaking anything.

@scottgerring
Contributor

scottgerring commented Jun 25, 2023

Thanks @jeromevdl for the considered feedback!

Regarding the proposition, I'm not sure how you can return this List<String>

Good catch. I think the base class / base builder will have to return Object, as there is no relationship between the various event-specific batch response types.

I've linked the examples and existing partial response types into the RFC. I've also updated the section at the top explaining why we should do this. It's important we get this part right.

I don't know if we are talking about 1 class or much more. We can probably live with duplicated code if it's not too big...

My feeling is how to handle this will come out as part of the implementation, we just have to be attentive to it.

I would definitely use the built-in events

I tend to agree. I don't think there is much value add, and a lot of extra code to maintain, in mapping to another model. Realistically the presented use case - "in some cases you can move message handlers between event sources without changing a type in your code" - is pretty tenuous.

@scottgerring
Copy link
Contributor

scottgerring commented Jun 25, 2023

Can we also take inspiration from python and see if we can have a similar "experience".

Looks like another option - I hadn't thought of using interface defaults. Let me have a play with it - I've started hacking around on a branch to get a feel for the ergonomics of the different options. We've already narrowed down the solution space in this discussion; I will mock up some variants on the branch and we can discuss again 💪

@scottgerring
Contributor

I've added a reasonably complete example using inheritance and a very rough sketch using default implementations. I think it would be helpful to jump on a call together in the next week to discuss.

@jeromevdl
Copy link
Contributor

  • We cannot use abstract classes, as they have proven too restrictive (we received that feedback before); that's why I proposed interfaces. I have never really played with default methods, though... Also, with this approach, developers don't have control over the handleRequest method (here).
  • I like the idea of the generic U, but we could go even further and let the developer have the inner type of the message (what's in the body). The Python implementation offers this. Compare with what you suggest here.
  • I think you are overcomplicating things, but I may be missing something...
    • For example: why a builder? We're not in the SDK 😉
    • Also, we don't want to leave this to the developers; this is precisely why we are building this module (the response consists only of the partial batch failures, so they don't need to return anything)...
    @Override
    protected SQSBatchResponse writeResponse(Iterable<MessageProcessingResult<SQSEvent.SQSMessage>> results) {
        // Here we map up the SQS-specific response for the batch based on the success of the individual messages
        throw new NotImplementedException();
    }

But we have a good basis to discuss on Thursday.

@scottgerring
Contributor

scottgerring commented Jun 28, 2023

We cannot use abstract classes as it's become too restrictive

This is a pretty strong statement - do you have some more details? I think it's important to know why we're discarding this. I struggled to make the default interface approach work, but if we turn our brains to it we can probably get somewhere. I can't think of a way of extending this that would cause problems with the fairly classic inheritance structure I've used, so a concrete counterexample - "if we add feature X, it will break the public interface" - would be good.

I like the idea of the generic U, but we could go even further and let the developer have the inner type of the message (what's in the body).

I've got this in both variants now - for SQS, SQSEvent is the batch and SQSEvent.SQSMessage is the individual message:

public void processItem(SQSEvent.SQSMessage message, Context context) {
    // Process an SQS message without throwing
}

.process(sqsEvent, (SQSEvent.SQSMessage message) -> {
    // Process the message without throwing an exception
});

Because of the way the ABC is extended, it's easy to provide a per-type mapping from the batch down to its records - here's the SQS batch handler:

protected List<SQSEvent.SQSMessage> extractMessages(SQSEvent input) {
    return input.getRecords();
}

I think you overcomplicate things, but I may miss something... - Ex: Why a builder ? We're not in the SDK 😉

This may well be the case! My reasoning is: if we want to add a tunable, we should be able to do it without breaking the interface. With the existing SQS batch handling implementation you can't, because the whole implementation is a series of public overloads taking a long list of possible parameters, e.g.:

public static <R> List<R> batchProcessor(final SQSEvent event,
                                         final boolean suppressException,
                                         final SqsMessageHandler<R> handler,
                                         final boolean deleteNonRetryableMessageFromQueue,
                                         final Class<? extends Exception>... nonRetryableExceptions) {

I came across this as part of #1183, where I would've liked to add a "use FIFO batch behaviour" switch but can no longer change the interface (in this case we can avoid it because we can infer that we are on a FIFO queue, but that's beside the point).
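Inferring FIFO behaviour can be as simple as inspecting the queue ARN carried on each message: FIFO queue names must end in `.fifo`, and the queue name is the last segment of the ARN. A minimal sketch with a hypothetical helper name; in `aws-lambda-java-events` the ARN is available from `SQSEvent.SQSMessage#getEventSourceArn()`.

```java
// Hypothetical helper: detect a FIFO queue from the SQS event source ARN.
final class FifoDetection {
    private FifoDetection() {
    }

    static boolean isFifoQueue(String eventSourceArn) {
        // e.g. arn:aws:sqs:eu-west-1:123456789012:orders.fifo
        return eventSourceArn != null && eventSourceArn.endsWith(".fifo");
    }
}
```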

I'm not confident about this being the right way and am keen to discuss.

Also we don't want to let this to the developers, this is actually why we build this module (the response only consists in PartialBatchFailure, they don't need to return anything)...

This is a small part of the implementation I started to flesh out, not a user-facing thing - the user's code returns nothing or throws, like the inline examples above. I appreciate it's hard to decode intent from a big dump of uncommented PoC code :)

@scottgerring
Copy link
Contributor

The idempotency library also integrates with the SQS utility; we must retain this functionality also, and it gives another example of an extension point.

@mriccia
Copy link
Contributor

mriccia commented Jul 12, 2023

If we go down the Builder route, we should implement it so that the batch handler is created and configured once with a builder, and then invoked during handleRequest.
Here is some sample code showing how the interface might look:

public class SqsExampleWithBuilder implements RequestHandler<SQSEvent, SQSBatchResponse> {

    BatchMessageHandler handler;

    public SqsExampleWithBuilder(){
        handler = new BatchMessageHandler.Builder()
                .withSource(SourceType.SQS)
                .withFailureHandler(msg -> System.out.println("Whoops: " + msg.getMessageId()))
                .withRawMessageHandler(this::processWithIdempotency)
                .build();
    }

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        return handler.handle(sqsEvent, context);

    }

    @Idempotent
    @SqsLargeMessage
    private void processWithIdempotency(@IdempotencyKey SQSEvent.SQSMessage sqsMessage, Context context) {
    }

}

@jeromevdl
Contributor

jeromevdl commented Jul 12, 2023

Thanks for the comment @mriccia

It's more verbose than the interfaces, but probably more customizable I admit...

How do you handle the inner content of a message (body deserialization) ?

FailureHandler is optional, right (as we need to handle the failure and add items to partialBatchFailure ourselves anyway)?

Not sure about the source... can't we guess it instead of asking it?

Otherwise, I kinda like it...

Something like this ?

BatchMessageHandler<SQSEvent> handler = new BatchMessageHandler.Builder(SQSEvent.class)
        .withFailureHandler(msg -> System.out.println("Whoops: " + msg.getMessageId()))
        .withDeserializedMessageHandler(this::processDeserialized, Basket.class)
        .build();

private void processDeserialized(Basket message, Context context) {
}
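One way such a deserialized message handler could work is to compose a body deserializer with the user's typed handler, so that a deserialization error is treated like any other processing failure for that message. In this sketch, `DeserializingHandler` and the `Basket` record are hypothetical, and the deserializer is passed in as a plain function; in practice it might wrap a JSON library call such as Jackson's `ObjectMapper.readValue(body, Basket.class)`.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical adapter: turn a typed handler into a handler for raw message bodies.
final class DeserializingHandler {
    private DeserializingHandler() {
    }

    static <T> Consumer<String> of(Function<String, T> deserializer, Consumer<T> typedHandler) {
        // Any exception from the deserializer propagates, so the batch loop
        // records the message as failed, exactly as if the handler had thrown.
        return body -> typedHandler.accept(deserializer.apply(body));
    }
}

// Hypothetical payload type.
record Basket(String id, int items) { }
```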

@jeromevdl jeromevdl added priority:3 Neutral - not a core feature or affects less than 40% of users and removed triage labels Jul 17, 2023
@mriccia
Contributor

mriccia commented Jul 19, 2023

RFC looks good now.
When it comes to testing this, let's verify that it works with messages passed across multiple services, for example:

  • SNS -> SQS
  • S3 notification -> SNS -> SQS
  • etc.

@jeromevdl
Copy link
Contributor

RFC is good now, let's build it!

For the ...->SNS->SQS, users will have to unwrap manually from the SQSMessage...

Projects
Status: Shipped

6 participants