feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis #886
Conversation
Codecov Report
@@ Coverage Diff @@
## develop #886 +/- ##
===========================================
- Coverage 99.88% 99.79% -0.10%
===========================================
Files 118 118
Lines 5161 5262 +101
Branches 578 596 +18
===========================================
+ Hits 5155 5251 +96
- Misses 2 6 +4
- Partials 4 5 +1
Continue to review full report at Codecov.
@cakepietoast ready to review the implementation before it gets bigger with mypy and doc stuff.
@heitorlessa as discussed, here's the SQS example:
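The snippet that was attached here isn't preserved in this thread. As a stand-in, a minimal sketch of the SQS usage with the new BatchProcessor (the handler body is illustrative):

```python
import json

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    # Called once per record; an exception raised here marks only this record as failed
    payload = json.loads(record.body)
    print(payload)


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    # Report only the failed item identifiers so Lambda retries just those records
    return processor.response()
```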
🤦♂️ yes! Not sure why I used Callback
…On Wed, 15 Dec 2021 at 09:54, Tom McCarthy wrote, commenting on aws_lambda_powertools/utilities/batch/base.py (#886 (comment)):
> @@ -72,7 +110,7 @@ def __call__(self, records: List[Any], handler: Callable):
        self.handler = handler
        return self
-    def success_handler(self, record: Any, result: Any):
+    def success_handler(self, record, result: Any) -> SuccessCallback:
Would SuccessResponse be a more accurate name?
Great point. It's good to anticipate someone will subclass it sooner or later.
…On Tue, 14 Dec 2021 at 15:54, Guilherme Martins Crocetti wrote, commenting on aws_lambda_powertools/utilities/batch/base.py (#886 (comment)):
> +class BatchProcessor(BasePartialProcessor):
+    DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}
+
+    def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
+        """Process batch and partially report failed items
+
+        Parameters
+        ----------
+        event_type: EventType
+            Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
+        model: Optional["BatchTypeModels"]
+            Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord
+        """
+        self.event_type = event_type
+        self.model = model
+        self.batch_response = self.DEFAULT_RESPONSE
Hey @heitorlessa, I don't know if this class is meant to be extended. In case it's not, the following comment won't make much sense, but anyway:
Do you think it's worth being more defensive here by adding a deepcopy?
⬇️ Suggested change
- self.batch_response = self.DEFAULT_RESPONSE
+ self.batch_response = deepcopy(self.DEFAULT_RESPONSE)
Just to avoid a scenario where someone extends this class and messes with DEFAULT_RESPONSE through self.batch_response (self.batch_response.update({"batchItemFailures": []}), for instance), or makes any other sort of change...
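For illustration, a standalone sketch of the aliasing pitfall this suggestion guards against; the class names here are hypothetical, not the library's:

```python
from copy import deepcopy
from typing import Dict, List, Optional


class NaiveProcessor:
    DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}

    def __init__(self):
        # Aliases the class-level dict, so every instance (and the class itself) shares it
        self.batch_response = self.DEFAULT_RESPONSE


class DefensiveProcessor:
    DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}

    def __init__(self):
        # Each instance gets its own copy; the class default stays pristine
        self.batch_response = deepcopy(self.DEFAULT_RESPONSE)


p = NaiveProcessor()
p.batch_response["batchItemFailures"].append({"itemIdentifier": "abc"})
print(NaiveProcessor.DEFAULT_RESPONSE)      # {'batchItemFailures': [{'itemIdentifier': 'abc'}]} -- polluted

q = DefensiveProcessor()
q.batch_response["batchItemFailures"].append({"itemIdentifier": "abc"})
print(DefensiveProcessor.DEFAULT_RESPONSE)  # {'batchItemFailures': []} -- untouched
```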
Co-authored-by: Guilherme Martins Crocetti <gmcrocetti@gmail.com>
…tools-python into feat/batch-new-processor
* 'develop' of https://github.com/awslabs/aws-lambda-powertools-python:
  fix(parser): kinesis sequence number is str, not int (aws-powertools#907)
  feat(apigateway): add exception_handler support (aws-powertools#898)
  fix(event-sources): Pass authorizer data to APIGatewayEventAuthorizer (aws-powertools#897)
  chore(deps): bump fastjsonschema from 2.15.1 to 2.15.2 (aws-powertools#891)
@@ -146,3 +185,120 @@ def batch_processor(
        processor.process()

    return handler(event, context)


class BatchProcessor(BasePartialProcessor):
would it make sense to have separate processors for each event type (SQS, DynamoDB or Kinesis) instead of growing the complexity of this class? Then you could encapsulate the failure collection in the specific processor.
That was the initial version we wanted to implement - a KinesisDataStreamProcessor, DynamoDB... then @cakepietoast argued that this was gonna confuse customers given the other available processors (Sqs, PartialProcessor, BaseProcessor), as we can only deprecate them in v2.
I'm 50/50 here if I'm honest. I prefer a separate one, but I can also see customers easily confused about which one to pick despite the docs change I'm gonna make.
Implementation-wise, this will remain stable. The only two changes I can anticipate are 1/ supporting the new Permanent Exception parameter, and 2/ raising a descriptive exception in case we reach an AttributeError when collecting the message id/sequence number from a malformed event/model.
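A rough sketch of what 2/ could look like; the helper name and error message are hypothetical, not the actual implementation:

```python
# Hypothetical helper; the real collection logic lives inside BatchProcessor.
def _collect_identifier(record) -> str:
    try:
        # SQS records expose a message id; Kinesis/DynamoDB records use sequence numbers
        return record.message_id
    except AttributeError as exc:
        raise ValueError(
            "Unable to read the message id from the record. "
            "Check that the event source matches the configured EventType/model."
        ) from exc
```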
What if we changed the nomenclature to “producer” and “consumer” (these processors would be consumers)? I had that other idea earlier of making it easier to use the SQS and DynamoDB batch write methods, taking their batch sizes into account; those could be “producers” 🤷♂️
Where would the producer sit and what would be its responsibilities?
For that suggestion on partitioning, we should add it to the Event Source Data Class as it's a no brainer.
I think the word Consumer wouldn't be explicit enough about the capabilities Batch provides - maybe something else then?
Features
- Transform incoming records into Event Source Data Classes or Pydantic models
- Call a user-defined function for each record in the batch
- Keep track of exceptions raised for any record
- For partial successes, extract message IDs (SQS) or sequence numbers (DynamoDB & Kinesis) and build a response to the new BatchItemIdentifiers contract (response shape sketched below this list)
- Future: If a raised exception matches the permanent_exceptions set, send these records to the configured DLQ in batches of 10
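For reference, the shape of the partial-failure response reported back to Lambda; the identifier value below is a placeholder:

```python
# Only failed records are listed; an empty list tells Lambda the whole batch succeeded.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "sqs-message-id-or-stream-sequence-number"},
    ]
}
```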
Signed-off-by: heitorlessa <lessa@amazon.co.uk>
Docs finally done - @michaelbrewer @ran-isenberg could you have a quick pass before we release it tomorrow? All I need to do now is merge and start writing the release notes. Here's what I've changed:
@heitorlessa super cool. I will try to play around with it once merged.
@heitorlessa looks great, thank you very much for putting in the effort to support both parser and data classes.
…tools-python into complex
* 'develop' of https://github.com/awslabs/aws-lambda-powertools-python: (24 commits)
  docs: consistency around admonitions and snippets (aws-powertools#919)
  chore(deps-dev): bump mypy from 0.920 to 0.930 (aws-powertools#925)
  fix(event-sources): handle dynamodb null type as none, not bool (aws-powertools#929)
  fix(apigateway): support @app.not_found() syntax & housekeeping (aws-powertools#926)
  docs: Added GraphQL Sample API to Examples section of README.md (aws-powertools#930)
  feat(idempotency): support dataclasses & pydantic models payloads (aws-powertools#908)
  feat(tracer): ignore tracing for certain hostname(s) or url(s) (aws-powertools#910)
  feat(event-sources): cache parsed json in data class (aws-powertools#909)
  fix(warning): future distutils deprecation (aws-powertools#921)
  docs(batch): remove leftover from legacy
  docs(layer): bump Lambda Layer to version 6
  chore: bump to 1.23.0
  docs(apigateway): add new not_found feature (aws-powertools#915)
  docs: external reference to cloudformation custom resource helper (aws-powertools#914)
  feat(logger): allow handler with custom kwargs signature (aws-powertools#913)
  chore: minor housekeeping before release (aws-powertools#912)
  chore(deps-dev): bump mypy from 0.910 to 0.920 (aws-powertools#903)
  feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis (aws-powertools#886)
  fix(parser): overload parse when using envelope (aws-powertools#885)
  fix(parser): kinesis sequence number is str, not int (aws-powertools#907)
  ...
Issue #, if available: #858
Description of changes:
Implements a new Batch Processor following the new built-in partial response handling for Lambda with SQS, DynamoDB Streams, and Kinesis Data Streams.
Checklist
- .report() -> response to match Java counterpart
- suppress_exception behaviour
- BatchProcessingError when all batch records fail to ensure failure arises

Tasks to be created in a separate PR to ease reviewing
UX
As a decorator
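The original example under this heading isn't preserved here; a minimal sketch of the decorator form, shown with Kinesis Data Streams (the handler body is illustrative):

```python
import base64
import json

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)


def record_handler(record: KinesisStreamRecord):
    # Kinesis data arrives base64-encoded; raising here marks only this record as failed
    payload = json.loads(base64.b64decode(record.kinesis.data))
    print(payload)


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response()
```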
As a context manager, in case you need full access to the processed batch or want to handle exceptions yourself
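A sketch of the context-manager form, assuming an SQS event source:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    print(record.body)


def lambda_handler(event, context):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        # One (status, result, record) tuple per processed record
        processed_messages = processor.process()

    return processor.response()
```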
Breaking change checklist
RFC issue #:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.