Skip to content

Commit 3580f57

Browse files
committed
feat: add sqs failure processors
1 parent 81e0198 commit 3580f57

File tree

2 files changed

+67
-1
lines changed

2 files changed

+67
-1
lines changed

aws_lambda_powertools/utilities/batch/base.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def _clean(self):
2727
raise NotImplementedError()
2828

2929
@abstractmethod
30-
def _process_record(self):
30+
def _process_record(self, record):
3131
raise NotImplementedError()
3232

3333
def process(self) -> List[Tuple]:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Batch SQS utilities
5+
"""
6+
7+
from typing import List
8+
9+
import boto3
10+
11+
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
12+
13+
from .base import BasePartialProcessor
14+
15+
16+
class BasePartialSQSProcessor(BasePartialProcessor):
17+
def __init__(self):
18+
self._client = boto3.client("sqs")
19+
self.success_messages: List = []
20+
self.fail_messages: List = []
21+
22+
super().__init__()
23+
24+
def get_queue_url(self):
25+
*_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":")
26+
return f"{self._client._endpoint.host}/{account_id}/{queue_name}"
27+
28+
def get_entries_to_clean(self):
29+
return [{"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]} for msg in self.success_messages]
30+
31+
def _process_record(self, record):
32+
try:
33+
result = self.handler(record)
34+
return self.success_handler(record, result)
35+
except Exception as exc:
36+
return self.failure_handler(record, exc)
37+
38+
def _prepare(self):
39+
"""
40+
"""
41+
self.success_messages.clear()
42+
self.fail_messages.clear()
43+
44+
def _clean(self):
45+
"""
46+
"""
47+
if not self.fail_messages:
48+
return
49+
50+
queue_url = self.get_queue_url()
51+
entries_to_remove = self.get_entries_to_clean()
52+
53+
return self._client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
54+
55+
56+
class DefaultPartialSQSProcessor(BasePartialSQSProcessor):
57+
pass
58+
59+
60+
@lambda_handler_decorator
61+
def partial_sqs_processor(handler, event, context, record_handler, processor=None):
62+
records = event["Records"]
63+
processor = processor or DefaultPartialSQSProcessor()
64+
65+
with processor(records, record_handler) as ctx:
66+
ctx.process()

0 commit comments

Comments
 (0)