Skip to content

docs(idempotency): fix, improve, and increase visibility for batch integration #1776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 80 additions & 69 deletions docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,75 +132,6 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo

DynamoDB Persistency layer uses a Resource client [which is not thread-safe](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?highlight=multithreading#multithreading-or-multiprocessing-with-resources){target="_blank"}.

=== "batch_sample.py"

This example also demonstrates how you can integrate with [Batch utility](batch.md), so you can process each record in an idempotent manner.

```python hl_lines="4-5 16 21 30"
from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType,
batch_processor)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.idempotency import (
DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function)


processor = BatchProcessor(event_type=EventType.SQS)
dynamodb = DynamoDBPersistenceLayer(table_name="idem")
config = IdempotencyConfig(
event_key_jmespath="messageId", # see Choosing a payload subset section
use_local_cache=True,
)


@idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb)
def record_handler(record: SQSRecord):
return {"message": record["body"]}


@idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb)
def dummy(arg_one, arg_two, data: dict, **kwargs):
return {"data": data}


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
config.register_lambda_context(context) # see Lambda timeouts section
# `data` parameter must be called as a keyword argument to work
dummy("hello", "universe", data="test")
return processor.response()
```

=== "Batch event"

```json hl_lines="4"
{
"Records": [
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"messageAttributes": {
"testAttr": {
"stringValue": "100",
"binaryValue": "base64Str",
"dataType": "Number"
}
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
}
]
}
```

=== "dataclass_sample.py"

```python hl_lines="3-4 23 33"
Expand Down Expand Up @@ -276,6 +207,82 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo
process_order(order=order)
```

#### Batch integration

You can can easily integrate with [Batch utility](batch.md) via context manager. This ensures that you process each record in an idempotent manner, and guard against a [Lambda timeout](#lambda-timeouts) idempotent situation.

???+ "Choosing an unique batch record attribute"
In this example, we choose `messageId` as our idempotency token since we know it'll be unique.

Depending on your use case, it might be more accurate [to choose another field](#choosing-a-payload-subset-for-idempotency) your producer intentionally set to define uniqueness.

=== "batch_sample.py"

```python hl_lines="3-4 10 15 21 25-26 29 31"
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.idempotency import (
DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function)


processor = BatchProcessor(event_type=EventType.SQS)
dynamodb = DynamoDBPersistenceLayer(table_name="idem")
config = IdempotencyConfig(
event_key_jmespath="messageId", # see Choosing a payload subset section
use_local_cache=True,
)


@idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb)
def record_handler(record: SQSRecord):
return {"message": record.body}


def lambda_handler(event, context):
config.register_lambda_context(context) # see Lambda timeouts section

# with Lambda context registered for Idempotency
# we can now kick in the Bach processing logic
batch = event["Records"]
with processor(records=batch, handler=record_handler):
# in case you want to access each record processed by your record_handler
# otherwise ignore the result variable assignment
processed_messages = processor.process()

return processor.response()
```

=== "batch_event.json"

```json hl_lines="4"
{
"Records": [
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"messageAttributes": {
"testAttr": {
"stringValue": "100",
"binaryValue": "base64Str",
"dataType": "Number"
}
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
}
]
}
```

### Choosing a payload subset for idempotency

???+ tip "Tip: Dealing with always changing payloads"
Expand Down Expand Up @@ -982,6 +989,10 @@ class DynamoDBPersistenceLayer(BasePersistenceLayer):

## Compatibility with other utilities

### Batch

See [Batch integration](#batch-integration) above.

### Validation utility

The idempotency utility can be used with the `validator` decorator. Ensure that idempotency is the innermost decorator.
Expand Down