RFC: SQS partial batch failure middleware #92


Closed
gmcrocetti opened this issue Jul 20, 2020 · 13 comments

@gmcrocetti
Contributor

gmcrocetti commented Jul 20, 2020

First things first: congratulations on the amazing repo.

Key information

  • RFC PR: (leave this empty)
  • Related issue(s), if known:
  • Area: Utilities
  • Meet tenets: Yes

Summary

A Lambda processing a batch of messages from SQS is a very common approach, and it works smoothly for most use cases. Now, for the sake of example, suppose we're processing a batch and one of the messages fails: Lambda is going to redrive the whole batch to the queue, including the successful messages! Re-running successful messages is not acceptable for all use cases.

Motivation

A very common execution pattern is running a Lambda connected to SQS, in most cases with a batch size greater than one. In such cases, an error in one of the processed messages causes the whole batch to return to the queue. For some use cases, such as non-idempotent actions, it's impossible to rely on such behavior. A solution to this problem would improve the experience of using Lambda with SQS.

Proposal

I'm going to propose some very simple code that's not complete.

import boto3

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

@lambda_handler_decorator
def sqs_partial_batch_failure(handler, event, context):
    sent_records = event['Records']
    sqs_client = boto3.client('sqs')

    response = handler(event, context)

    # get_successful is a placeholder: extract the successfully processed records from the response
    successful_messages = get_successful(response)
    # Simplified call; the real delete_message_batch needs QueueUrl and Entries
    sqs_client.delete_message_batch(successful_messages)  # deletes the 3rd and 7th messages


# batch size of 10, fails for the 3rd and 7th messages
@sqs_partial_batch_failure
def handler(event, context):
    for record in event['Records']:
        do_sth(record)

Drawbacks

  • Adds boto3 as a dependency;
  • It may be unrelated to the purpose of this package;
  • Adds a little performance overhead to track failed records and delete the successful ones.

Rationale and alternatives

  • What other designs have been considered? Why not them?
    The Lambda may be invoked with a single message (batch size of one), but that feels like an under-use of the full power of this integration. Processing as many messages as possible per Lambda invocation is the best scenario.
    Inspired by [middy](https://github.com/middyjs/middy/tree/master/packages)

  • What is the impact of not doing this?
    We would miss the chance to attract more users to such a powerful option: batch processing with SQS.

Unresolved questions

Optional, stash area for topics that need further development e.g. TBD

@gmcrocetti added the feature-request and triage (Pending triage from maintainers) labels on Jul 20, 2020
@ghost removed the triage (Pending triage from maintainers) label on Jul 20, 2020
@nmoutschen
Contributor

Hi Guilherme! Thanks a lot for creating an issue! 😄

We're currently working through what tiny utilities should be included as part of the powertools. There's a fine balance between providing a good set of standard tools and preventing bloat.

Could you flesh out this issue as an RFC (see this ticket template) in terms of what the ideal developer experience would look like? If you already have similar implementations on GitHub, that would also help a lot with knowing what the impact of such a middleware would be. Up to you if you want to create a new ticket using the template or include the RFC in this one. 😄

@gmcrocetti changed the title from "SQS Batch middleware" to "RFC: SQS partial batch failure middleware" on Jul 28, 2020
@gmcrocetti
Contributor Author

> Hi Guilherme! Thanks a lot for creating an issue! 😄
>
> We're currently working through what tiny utilities should be included as part of the powertools. There's a fine balance between providing a good set of standard tools and preventing bloat.
>
> Could you flesh out this issue as an RFC (see this ticket template) in terms of what the ideal developer experience would look like? If you already have similar implementations on GitHub, that would also help a lot with knowing what the impact of such a middleware would be. Up to you if you want to create a new ticket using the template or include the RFC in this one. 😄

Thanks for your suggestion, Nicolas!

@nmoutschen
Contributor

Thanks a lot, it's much clearer this way! 👍

Quick question on how the implementation would look. If I understood your proposal correctly, the handler should return something that tells the middleware which messages failed and which succeeded; then, if there are any failed messages, the middleware should throw an error (similar to how middy does it)?

E.g. would it look like this?

import boto3

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator


sqs_client = boto3.client("sqs")


def get_queue_url_from_arn(arn: str) -> str:
    _, account_id, name = arn.rsplit(":", 2)
    return sqs_client.get_queue_url(
        QueueName=name,
        QueueOwnerAWSAccountId=account_id
    )["QueueUrl"]


@lambda_handler_decorator
def sqs_partial_batch_failure(handler, event, context):
    response = handler(event, context)

    # This could be optional.
    # We only need the list of successful messages, but this could be used for logging.
    failed_messages = response.get("failed", [])

    successful_messages = response.get("success", [])

    # Process successful messages, if any.
    if successful_messages:
        queue_url = get_queue_url_from_arn(event["Records"][0]["eventSourceARN"])

        # Delete the successful messages
        sqs_client.delete_message_batch(
            QueueUrl=queue_url,
            Entries=[
                {
                    "Id": e["messageId"],
                    "ReceiptHandle": e["receiptHandle"]
                } for e in successful_messages
            ]
        )

    # Raise an exception for failed messages
    if failed_messages:
        raise Exception(f"Failed to process {len(failed_messages)} messages.")

    # Alternative if we don't capture failed messages:
    # if len(successful_messages) < len(event["Records"]):
    #     raise Exception("Failed to process {} messages.".format(len(event["Records"]) - len(successful_messages)))
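
For completeness, a handler following that contract might look roughly like the sketch below (nothing settled here; the success/failed keys simply mirror what the middleware above reads, and do_sth stands in for the business logic):

@sqs_partial_batch_failure
def handler(event, context):
    success, failed = [], []
    for record in event["Records"]:
        try:
            do_sth(record)  # business logic placeholder
            success.append(record)
        except Exception:
            failed.append(record)
    # The middleware inspects these keys to delete successes and raise for failures
    return {"success": success, "failed": failed}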

@gmcrocetti
Contributor Author

gmcrocetti commented Jul 28, 2020

Exactly! After removing the successful messages, raise an error for the failed ones. One technical detail of middy is that it enforces an API (allSettled). I'm not a huge fan of that approach, but I'm also not quite sure whether we'll be able to create some abstraction for "error tracking" each record that works for both sync and async cases. Thoughts?
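
Just to make that concrete, one possible shape for such an abstraction could be the rough sketch below (names are made up and nothing here is settled; the async case simply runs the coroutine per record):

import asyncio
import inspect
from typing import Callable, Dict, List, Tuple


def process_records(records: List[Dict], record_handler: Callable) -> Tuple[List[Dict], List[Dict]]:
    """Track per-record outcomes for either a sync or an async record handler."""
    success, failed = [], []
    for record in records:
        try:
            if inspect.iscoroutinefunction(record_handler):
                asyncio.run(record_handler(record))  # async handler
            else:
                record_handler(record)  # sync handler
            success.append(record)
        except Exception:
            failed.append(record)
    return success, failed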

@Nr18

Nr18 commented Jul 30, 2020

We have built this ourselves, so I can share the how/why as input. Sample code:

import os

sqs = SQS(queue=os.getenv("QUEUE_URL"))

def handler(event: dict, context: "LambdaContext") -> None:
    # Run the given callback method for each message
    sqs.process_messages(event["Records"], sqs_callback)

def sqs_callback(message: str, attributes: dict) -> bool:
    # Process your message here
    return True  # Returning False or raising an exception means this specific message is not deleted

The SQS class lives in a lambda layer:

import logging
from typing import Callable, List

import boto3


class SQS:
    def __init__(self, queue: str) -> None:
        self.log = logging.getLogger()
        self.client = boto3.client("sqs")
        self.queue = queue

    def process_messages(self, messages: List[dict], callback: Callable) -> None:
        """
        Call the callback with the body of each SQS message in the given event. The callback needs to return True if
        the message has been handled as expected; it may return False or raise an exception in case of failure.

        When the callback function returns True, the message will be deleted from the SQS queue.

        When the callback function returns False, the message will not be deleted from the SQS queue and will fall
        back on the retry policy and dead-letter queue configuration.
        """
        self.log.info(f"Found {len(messages)} records.")

        failed_records = []

        for message in messages:
            try:
                # Recover the trace context from the trace header (java sample code, see https://docs.aws.amazon.com/xray/latest/devguide/xray-services-sqs.html)
                # Segment segment = AWSXRay.getCurrentSegment();
                # segment.setTraceId(traceHeader.getRootTraceId());
                # segment.setParentId(traceHeader.getParentId());
                # segment.setSampled(traceHeader.getSampled().equals(TraceHeader.SampleDecision.SAMPLED));
                processed = callback(message["body"], message.get("attributes"))
            except Exception as e:
                processed = False
                self.log.warning(str(e))

            if processed:
                self.log.info(f"Record {message['messageId']} has been processed.")
                self.__delete_message(message)
            else:
                self.log.warning(
                    f"Unable to process the message with the id: {message['messageId']}"
                )
                failed_records.append(message["messageId"])

        if failed_records:
            raise ValueError(
                "The following records failed to be processed:\n"
                + "\n".join(failed_records)
            )

    def __delete_message(self, record: dict):
        self.log.info(f"Delete message {record['messageId']} from the SQS Queue.")
        self.client.delete_message(
            QueueUrl=self.queue, ReceiptHandle=record["receiptHandle"]
        )

So here are some considerations:

  • We do the looping over the messages in a separate class, so the looping behaviour only needs to be tested by the tests of that class and you don't need to do that in your Lambda tests.
  • We make sure that successfully processed messages get deleted, and if a failure occurs we raise an exception, triggering a retry.
  • We pass along a QueueUrl because I read somewhere that the ARN-to-URL conversion is not guaranteed, but fetching the QueueUrl from the event or context would be a nice improvement.
  • Currently it's not allowed to set the traceId in an AWS Lambda function, but if that becomes possible you would be able to read the AWSTraceHeader attribute and set the traceId to the incoming trace for each message (a small sketch of reading it follows this list). That is not something you want to duplicate in each Lambda function.
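
For reference, reading that attribute could look roughly like this (parse_trace_header is a made-up helper; SQS exposes the header in the record's attributes when tracing is active):

def parse_trace_header(record: dict) -> dict:
    # e.g. "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1"
    header = record.get("attributes", {}).get("AWSTraceHeader", "")
    return dict(part.split("=", 1) for part in header.split(";") if "=" in part)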

@gmcrocetti
Contributor Author

Hi @nmoutschen and @Nr18, very insightful opinions. I've been thinking over this problem and something came to my mind: this "partial batch failure" problem is not particular to SQS; other services (Kinesis and DynamoDB streams) face the same technical limitation. I know this is out of the scope of the current issue, but what if we extended it a little bit by offering support for all these services? At least to think about an interface for all of them.
Here's a prototype:

from typing import Callable, Coroutine, List, Dict
from contextlib import contextmanager

import boto3

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator


def get_queue_url() -> str:
    pass  # placeholder

def service_factory(service: str) -> Callable:
    if service == 'sqs':
        return sqs_partial_batch

def get_service_from_record(record: Dict):
    return record.get('eventSource')


class BatchProcessor:

    def __init__(self, records: List):
        self.records = records
        self.success_messages = []
        self.failed_messages = []

    def process(self, record_handler: Callable):
        for record in self.records:
            try:
                record_handler(record)
                self.success_messages.append(record)
            except Exception:
                self.failed_messages.append(record)

    async def async_process(self, record_handler: Coroutine):
        pass

@contextmanager
def ddb_partial_batch(records: List[Dict]):
    pass


@contextmanager
def kinesis_partial_batch(records: List[Dict]):
    pass

@contextmanager
def sqs_partial_batch(records: List[Dict]):
    queue_url = get_queue_url()
    sqs_client = boto3.client('sqs')
    processor: BatchProcessor = BatchProcessor(records)

    try:
        yield processor
    finally:
        # Delete the successfully processed messages so only failed ones are redriven
        sqs_client.delete_message_batch(
            QueueUrl=queue_url,
            Entries=[
                {"Id": r["messageId"], "ReceiptHandle": r["receiptHandle"]}
                for r in processor.success_messages
            ],
        )

def process_record(record: Dict):
    if record.get('fail'):
        raise Exception

    return record

def handler(event, context):
    with sqs_partial_batch(event['Records']) as ctx:
        ctx.process(process_record)

It's also possible to create a middleware that works for all services:

@lambda_handler_decorator
def partial_batch(handler, event, context, record_handler: Callable = None):
    service = get_service_from_record(event['Records'][0])
    partial_batch_processor = service_factory(service)

    records: List[Dict] = event['Records']

    with partial_batch_processor(records) as processor:
        processor.process(record_handler)

@partial_batch(record_handler=process_record)
def handler(event, context):
    pass

We can provide both of them :). Hope to hear from you guys !
ps. The implementation is incomplete.

@Nr18

Nr18 commented Jul 31, 2020

@gmcrocetti that was something I was thinking about as well. In a Lambda function you want to focus on the business logic, so having only this in your Lambda achieves that:

from ........ import partial_batch

@partial_batch(record_handler=process_record)
def handler(event, context) -> None:
    pass

def process_record(record: dict) -> None:
    # Business logic
    pass

Just thinking out loud here: if you look at the Powertools, it would look like this (a lot of decoration going on):

@tracer.capture_lambda_handler
@logger.inject_lambda_context
@metrics.log_metrics(capture_cold_start_metric=True)
@partial_batch(record_handler=process_record)
def handler(event: dict, context: LambdaContext) -> None:
    pass

@tracer.capture_method
def process_record(record: dict) -> None:
    # Business logic
    pass

@heitorlessa
Contributor

heitorlessa commented Jul 31, 2020 via email

@nmoutschen added the RFC label and removed the feature-request label on Aug 4, 2020
@gmcrocetti
Contributor Author

gmcrocetti commented Aug 6, 2020

Any updates? I would like to contribute, if possible :)

@heitorlessa
Contributor

hey @gmcrocetti - I'm happy with this, though I can't work on the implementation atm. If you'd like to give it a go, feel free to create a PR after setting up your dev env.

As this will be a utility, you might be able to copy the structure Nicolas started in #96 so we can use .utilities namespace for non-core utilities.

Let me know if you need a hand with any of this; otherwise I can look at it after finishing #97 and #95 :)

Thanks a lot for the help

@heitorlessa
Contributor

Nearly there... \o/ Minor adjustments on docs, and we should be able to publish it in the next release (1.5.0)

@heitorlessa
Contributor

heitorlessa commented Aug 31, 2020

@gmcrocetti lemme know the easiest way to reach out to you - feel free to DM me on Twitter. I'd like to help complete that PR so we can release 1.5.0 this week ;)

Update - We've just got our own Slack channel (#lambda-powertools) on AWS Developers workspace - Invite if you don't have an account yet

@heitorlessa added and then removed the pending-release (Fix or implementation already in dev waiting to be released) label on Aug 31, 2020
@heitorlessa
Contributor

This is now available in the 1.5.0 release
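
For anyone landing here later, usage of the released utility looks roughly like this (based on the 1.5.0 batch utility; check the documentation for the exact API):

from aws_lambda_powertools.utilities.batch import sqs_batch_processor


def record_handler(record):
    do_sth(record["body"])  # business logic placeholder


@sqs_batch_processor(record_handler=record_handler)
def lambda_handler(event, context):
    return {"statusCode": 200}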
