Skip to content

feat(idempotency): support methods with the same name (ABCs) by including fully qualified name in v2 #1535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
2 changes: 1 addition & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(
self.fn_kwargs = function_kwargs
self.config = config

persistence_store.configure(config, self.function.__name__)
persistence_store.configure(config, f"{self.function.__module__}.{self.function.__qualname__}")
self.persistence_store = persistence_store

def handle(self) -> Any:
Expand Down
12 changes: 12 additions & 0 deletions docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Changes at a glance:

* The API for **event handler's `Response`** has minor changes to support multi value headers and cookies.
* The **legacy SQS batch processor** was removed.
* The **Idempotency key** format changed slightly, invalidating all the existing cached results.

???+ important
Powertools for Python v2 drops support for Python 3.6, following the Python 3.6 End-Of-Life (EOL) reached on December 23, 2021.
Expand Down Expand Up @@ -142,3 +143,14 @@ You can migrate to the [native batch processing](https://aws.amazon.com/about-aw

return processor.response()
```

## Idempotency key format

The format of the Idempotency key was changed. This key is used to store the invocation results in a persistent store like DynamoDB.

No changes are necessary in your code, but remember that existing Idempotency records will be ignored when you upgrade, as new executions generate keys with the new format.

Prior to this change, the Idempotency key was generated using only the caller function name (e.g: `lambda_handler#282e83393862a613b612c00283fef4c8`).
After this change, the key is generated using the `module name` + `qualified function name` + `idempotency key` (e.g: `app.classExample.function#app.handler#282e83393862a613b612c00283fef4c8`).

Using qualified names prevents distinct functions with the same name from contending for the same Idempotency key.
2 changes: 1 addition & 1 deletion docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ If you're not [changing the default configuration for the DynamoDB persistence l
| TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console |

???+ tip "Tip: You can share a single state table for all functions"
You can reuse the same DynamoDB table to store idempotency state. We add your `function_name` in addition to the idempotency key as a hash key.
You can reuse the same DynamoDB table to store idempotency state. We add `module_name` and [qualified name for classes and functions](https://peps.python.org/pep-3155/) in addition to the idempotency key as a hash key.

```yaml hl_lines="5-13 21-23" title="AWS Serverless Application Model (SAM) example"
Resources:
Expand Down
Empty file.
19 changes: 19 additions & 0 deletions tests/e2e/idempotency/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import pytest

from tests.e2e.idempotency.infrastructure import IdempotencyDynamoDBStack


@pytest.fixture(autouse=True, scope="module")
def infrastructure(tmp_path_factory, worker_id):
    """Deploy the idempotency E2E stack once per module and delete it afterwards.

    ``tmp_path_factory`` and ``worker_id`` are requested as fixtures but are not
    used in the body.

    Yields
    ------
    Dict[str, str]
        CloudFormation Outputs from deployed infrastructure
    """
    idempotency_stack = IdempotencyDynamoDBStack()
    try:
        # Deploy inside the try so that a failed deployment still reaches delete()
        stack_outputs = idempotency_stack.deploy()
        yield stack_outputs
    finally:
        idempotency_stack.delete()
13 changes: 13 additions & 0 deletions tests/e2e/idempotency/handlers/parallel_execution_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import time

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event, context):
    """Sleep for 10 seconds, then return the incoming event unchanged."""
    sleep_seconds = 10
    time.sleep(sleep_seconds)
    return event
14 changes: 14 additions & 0 deletions tests/e2e/idempotency/handlers/ttl_cache_expiration_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import time

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(expires_after_seconds=20)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    """Return the current ``time.time()`` value, as a string, under the "time" key."""
    return {"time": str(time.time())}
15 changes: 15 additions & 0 deletions tests/e2e/idempotency/handlers/ttl_cache_timeout_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import time

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(expires_after_seconds=1)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    """Sleep for ``event["sleep"]`` seconds (0 when missing or falsy), then return the event."""
    requested_sleep = event.get("sleep")
    sleep_time: int = requested_sleep if requested_sleep else 0
    time.sleep(sleep_time)
    return event
29 changes: 29 additions & 0 deletions tests/e2e/idempotency/infrastructure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Any

from aws_cdk import CfnOutput, RemovalPolicy
from aws_cdk import aws_dynamodb as dynamodb

from tests.e2e.utils.infrastructure import BaseInfrastructure


class IdempotencyDynamoDBStack(BaseInfrastructure):
    """E2E stack: the idempotency Lambda handlers plus the DynamoDB table they share."""

    def create_resources(self):
        """Create the Lambda functions, then the table they are granted access to."""
        lambda_functions = self.create_lambda_functions()
        self._create_dynamodb_table(function=lambda_functions)

    def _create_dynamodb_table(self, function: Any):
        """Create "IdempotencyTable", grant the handlers read/write access and export its name.

        Parameters
        ----------
        function : Any
            Object indexed by handler name ("TtlCacheExpirationHandler",
            "TtlCacheTimeoutHandler", "ParallelExecutionHandler") whose values
            are passed to ``table.grant_read_write_data``.
        """
        partition_key = dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING)
        table = dynamodb.Table(
            self.stack,
            "Idempotency",
            table_name="IdempotencyTable",
            removal_policy=RemovalPolicy.DESTROY,
            partition_key=partition_key,
            time_to_live_attribute="expiration",
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
        )

        handler_names = ("TtlCacheExpirationHandler", "TtlCacheTimeoutHandler", "ParallelExecutionHandler")
        for handler_name in handler_names:
            table.grant_read_write_data(function[handler_name])

        CfnOutput(self.stack, "DynamoDBTable", value=table.table_name)
96 changes: 96 additions & 0 deletions tests/e2e/idempotency/test_idempotency_dynamodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import json
from time import sleep

import pytest

from tests.e2e.utils import data_fetcher
from tests.e2e.utils.functions import execute_lambdas_in_parallel


@pytest.fixture
def ttl_cache_expiration_handler_fn_arn(infrastructure: dict) -> str:
    """Return the "TtlCacheExpirationHandlerArn" entry of ``infrastructure``, or "" if absent."""
    output_key = "TtlCacheExpirationHandlerArn"
    return infrastructure.get(output_key, "")


@pytest.fixture
def ttl_cache_timeout_handler_fn_arn(infrastructure: dict) -> str:
    """Return the "TtlCacheTimeoutHandlerArn" entry of ``infrastructure``, or "" if absent."""
    output_key = "TtlCacheTimeoutHandlerArn"
    return infrastructure.get(output_key, "")


@pytest.fixture
def parallel_execution_handler_fn_arn(infrastructure: dict) -> str:
    """Return the "ParallelExecutionHandlerArn" entry of ``infrastructure``, or "" if absent."""
    output_key = "ParallelExecutionHandlerArn"
    return infrastructure.get(output_key, "")


@pytest.fixture
def idempotency_table_name(infrastructure: dict) -> str:
    """Return the "DynamoDBTable" entry of ``infrastructure``, or "" if absent."""
    output_key = "DynamoDBTable"
    return infrastructure.get(output_key, "")


def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn: str):
    """Same payload yields the same response until the 20s wait, then a different one."""
    # GIVEN
    payload = json.dumps({"message": "Lambda Powertools - TTL 20s"})

    def invoke_handler() -> str:
        # Invoke the handler with the shared payload and return the decoded response body
        execution, _ = data_fetcher.get_lambda_response(
            lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
        )
        return execution["Payload"].read().decode("utf-8")

    # WHEN
    # first execution
    first_execution_response = invoke_handler()

    # the second execution should return the same response as the first execution
    second_execution_response = invoke_handler()

    # wait 20s to expire ttl and execute again, this should return a new response value
    sleep(20)
    third_execution_response = invoke_handler()

    # THEN
    assert first_execution_response == second_execution_response
    assert third_execution_response != second_execution_response


def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str):
    """A timed-out invocation is followed by a working one that returns its own payload."""
    # GIVEN
    payload_timeout_execution = json.dumps({"sleep": 10, "message": "Lambda Powertools - TTL 1s"})
    payload_working_execution = json.dumps({"sleep": 0, "message": "Lambda Powertools - TTL 1s"})

    def invoke_with(payload: str) -> str:
        # Invoke the handler with the given payload and return the decoded response body
        execution, _ = data_fetcher.get_lambda_response(lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload)
        return execution["Payload"].read().decode("utf-8")

    # WHEN
    # first call should fail due to timeout
    execution_with_timeout_response = invoke_with(payload_timeout_execution)

    # the second call should work and return the payload
    execution_working_response = invoke_with(payload_working_execution)

    # THEN
    assert "Task timed out after" in execution_with_timeout_response
    assert payload_working_execution == execution_working_response


def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str):
    """Two concurrent invocations with the same payload: one times out, one is rejected.

    The results come back in input order, not in the order the invocations
    reached the handler, so which position holds the timed-out execution is not
    deterministic. The assertions therefore check that exactly one response of
    each kind is present instead of pinning them to a fixed index.
    """
    # GIVEN
    arguments = json.dumps({"message": "Lambda Powertools - Parallel execution"})

    # WHEN
    # executing Lambdas in parallel
    lambdas_arn = [parallel_execution_handler_fn_arn, parallel_execution_handler_fn_arn]
    execution_result_list = execute_lambdas_in_parallel("data_fetcher.get_lambda_response", lambdas_arn, arguments)

    responses = [result[0]["Payload"].read().decode("utf-8") for result in execution_result_list]

    # THEN
    timeout_responses = [response for response in responses if "Task timed out after" in response]
    in_progress_responses = [
        response for response in responses if "Execution already in progress with idempotency key" in response
    ]

    assert len(in_progress_responses) == 1
    assert len(timeout_responses) == 1
1 change: 1 addition & 0 deletions tests/e2e/utils/data_fetcher/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from tests.e2e.utils.data_fetcher.common import get_http_response, get_lambda_response
from tests.e2e.utils.data_fetcher.idempotency import get_ddb_idempotency_record
from tests.e2e.utils.data_fetcher.logs import get_logs
from tests.e2e.utils.data_fetcher.metrics import get_metrics
from tests.e2e.utils.data_fetcher.traces import get_traces
39 changes: 39 additions & 0 deletions tests/e2e/utils/data_fetcher/idempotency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import boto3
from retry import retry


@retry(ValueError, delay=2, jitter=1.5, tries=10)
def get_ddb_idempotency_record(
    function_name: str,
    table_name: str,
) -> int:
    """Count idempotency records whose ``id`` contains ``"<function_name>#"``.

    Performs a single ``scan`` call on the table with a ``contains`` filter on
    the ``id`` attribute.

    Parameters
    ----------
    function_name : str
        Name of Lambda function to fetch dynamodb record
    table_name : str
        Name of DynamoDB table

    Returns
    -------
    int
        Count of records found (the ``Count`` field of the scan response)

    Raises
    ------
    ValueError
        When the scan returns no items; the ``retry`` decorator is configured
        to retry on this exception
    """
    dynamodb_resource = boto3.resource("dynamodb")
    scan_result = dynamodb_resource.Table(table_name).scan(
        FilterExpression="contains (id, :functionName)",
        ExpressionAttributeValues={":functionName": f"{function_name}#"},
    )

    if scan_result["Items"]:
        return scan_result["Count"]

    raise ValueError("Empty response from DynamoDB Repeating...")
14 changes: 14 additions & 0 deletions tests/e2e/utils/functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Union

from tests.e2e.utils import data_fetcher  # noqa F401


def execute_lambdas_in_parallel(
    function_name: Union[str, Callable[..., Any]], lambdas_arn: list, arguments: str
) -> List[Any]:
    """Call the target once per ARN, concurrently, and return the results in input order.

    Parameters
    ----------
    function_name : Union[str, Callable[..., Any]]
        Either a callable, or a string expression naming one that is resolvable
        from this module (e.g. "data_fetcher.get_lambda_response"). The target
        is called as ``target(arn, arguments)``.
    lambdas_arn : list
        Values passed as the first positional argument, one call per element
    arguments : str
        Value passed as the second positional argument to every call

    Returns
    -------
    List[Any]
        One result per element of ``lambdas_arn``, in the same order
    """
    if callable(function_name):
        target = function_name
    else:
        # NOTE(review): eval() executes arbitrary code; only hard-coded, trusted
        # names must be passed as strings. Prefer passing the callable itself.
        # Resolved once here instead of once per item inside the worker threads.
        target = eval(function_name)

    # Leaving the `with` block already waits for all workers, so no explicit
    # shutdown() is needed; list() drains map() and re-raises any worker exception.
    with ThreadPoolExecutor() as executor:
        return list(executor.map(lambda arn: target(arn, arguments), lambdas_arn))
14 changes: 10 additions & 4 deletions tests/functional/idempotency/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,24 @@ def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_vali


@pytest.fixture
def hashed_idempotency_key(lambda_apigw_event, default_jmespath, lambda_context):
def hashed_idempotency_key(request, lambda_apigw_event, default_jmespath, lambda_context):
compiled_jmespath = jmespath.compile(default_jmespath)
data = compiled_jmespath.search(lambda_apigw_event)
return "test-func.lambda_handler#" + hash_idempotency_key(data)
return (
f"test-func.{request.function.__module__}.{request.function.__qualname__}.<locals>.lambda_handler#"
+ hash_idempotency_key(data)
)


@pytest.fixture
def hashed_idempotency_key_with_envelope(lambda_apigw_event):
def hashed_idempotency_key_with_envelope(request, lambda_apigw_event):
event = extract_data_from_envelope(
data=lambda_apigw_event, envelope=envelopes.API_GATEWAY_HTTP, jmespath_options={}
)
return "test-func.lambda_handler#" + hash_idempotency_key(event)
return (
f"test-func.{request.function.__module__}.{request.function.__qualname__}.<locals>.lambda_handler#"
+ hash_idempotency_key(event)
)


@pytest.fixture
Expand Down
Loading