diff --git a/aws_lambda_powertools/utilities/idempotency/base.py b/aws_lambda_powertools/utilities/idempotency/base.py index ddd054daa14..9281c77109a 100644 --- a/aws_lambda_powertools/utilities/idempotency/base.py +++ b/aws_lambda_powertools/utilities/idempotency/base.py @@ -76,7 +76,7 @@ def __init__( self.fn_kwargs = function_kwargs self.config = config - persistence_store.configure(config, self.function.__name__) + persistence_store.configure(config, f"{self.function.__module__}.{self.function.__qualname__}") self.persistence_store = persistence_store def handle(self) -> Any: diff --git a/docs/upgrade.md b/docs/upgrade.md index 3d1257f1c12..37e9a318522 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -12,6 +12,7 @@ Changes at a glance: * The API for **event handler's `Response`** has minor changes to support multi value headers and cookies. * The **legacy SQS batch processor** was removed. +* The **Idempotency key** format changed slightly, invalidating all the existing cached results. ???+ important Powertools for Python v2 drops suport for Python 3.6, following the Python 3.6 End-Of-Life (EOL) reached on December 23, 2021. @@ -142,3 +143,14 @@ You can migrate to the [native batch processing](https://aws.amazon.com/about-aw return processor.response() ``` + +## Idempotency key format + +The format of the Idempotency key was changed. This key is used to store the invocation results in a persistent store like DynamoDB. + +No changes are necessary in your code, but remember that existing Idempotency records will be ignored when you upgrade, as new executions generate keys with the new format. + +Prior to this change, the Idempotency key was generated using only the caller function name (e.g: `lambda_handler#282e83393862a613b612c00283fef4c8`). +After this change, the key is generated using the `module name` + `qualified function name` + `idempotency key` (e.g: `app.classExample.function#app.handler#282e83393862a613b612c00283fef4c8`). 
+ +Using qualified names prevents distinct functions with the same name from contending for the same Idempotency key. diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 7ba61fd3062..f02cd8700b8 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -42,7 +42,7 @@ If you're not [changing the default configuration for the DynamoDB persistence l | TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console | ???+ tip "Tip: You can share a single state table for all functions" - You can reuse the same DynamoDB table to store idempotency state. We add your `function_name` in addition to the idempotency key as a hash key. + You can reuse the same DynamoDB table to store idempotency state. We add `module_name` and [qualified name for classes and functions](https://peps.python.org/pep-3155/) in addition to the idempotency key as a hash key. ```yaml hl_lines="5-13 21-23" title="AWS Serverless Application Model (SAM) example" Resources: diff --git a/tests/e2e/idempotency/__init__.py b/tests/e2e/idempotency/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/e2e/idempotency/conftest.py b/tests/e2e/idempotency/conftest.py new file mode 100644 index 00000000000..24a7c71c1f2 --- /dev/null +++ b/tests/e2e/idempotency/conftest.py @@ -0,0 +1,19 @@ +import pytest + +from tests.e2e.idempotency.infrastructure import IdempotencyDynamoDBStack + + +@pytest.fixture(autouse=True, scope="module") +def infrastructure(tmp_path_factory, worker_id): + """Setup and teardown logic for E2E test infrastructure + + Yields + ------ + Dict[str, str] + CloudFormation Outputs from deployed infrastructure + """ + stack = IdempotencyDynamoDBStack() + try: + yield stack.deploy() + finally: + stack.delete() diff --git a/tests/e2e/idempotency/handlers/parallel_execution_handler.py b/tests/e2e/idempotency/handlers/parallel_execution_handler.py new file mode 100644 
index 00000000000..401097d4194 --- /dev/null +++ b/tests/e2e/idempotency/handlers/parallel_execution_handler.py @@ -0,0 +1,13 @@ +import time + +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") + + +@idempotent(persistence_store=persistence_layer) +def lambda_handler(event, context): + + time.sleep(10) + + return event diff --git a/tests/e2e/idempotency/handlers/ttl_cache_expiration_handler.py b/tests/e2e/idempotency/handlers/ttl_cache_expiration_handler.py new file mode 100644 index 00000000000..eabf11e7852 --- /dev/null +++ b/tests/e2e/idempotency/handlers/ttl_cache_expiration_handler.py @@ -0,0 +1,14 @@ +import time + +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") +config = IdempotencyConfig(expires_after_seconds=20) + + +@idempotent(config=config, persistence_store=persistence_layer) +def lambda_handler(event, context): + + time_now = time.time() + + return {"time": str(time_now)} diff --git a/tests/e2e/idempotency/handlers/ttl_cache_timeout_handler.py b/tests/e2e/idempotency/handlers/ttl_cache_timeout_handler.py new file mode 100644 index 00000000000..4de97a4afe4 --- /dev/null +++ b/tests/e2e/idempotency/handlers/ttl_cache_timeout_handler.py @@ -0,0 +1,15 @@ +import time + +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") +config = IdempotencyConfig(expires_after_seconds=1) + + +@idempotent(config=config, persistence_store=persistence_layer) +def lambda_handler(event, context): + + sleep_time: int = event.get("sleep") or 0 + time.sleep(sleep_time) + + return event diff --git a/tests/e2e/idempotency/infrastructure.py b/tests/e2e/idempotency/infrastructure.py 
new file mode 100644 index 00000000000..997cadc4943 --- /dev/null +++ b/tests/e2e/idempotency/infrastructure.py @@ -0,0 +1,29 @@ +from typing import Any + +from aws_cdk import CfnOutput, RemovalPolicy +from aws_cdk import aws_dynamodb as dynamodb + +from tests.e2e.utils.infrastructure import BaseInfrastructure + + +class IdempotencyDynamoDBStack(BaseInfrastructure): + def create_resources(self): + functions = self.create_lambda_functions() + self._create_dynamodb_table(function=functions) + + def _create_dynamodb_table(self, function: Any): + table = dynamodb.Table( + self.stack, + "Idempotency", + table_name="IdempotencyTable", + removal_policy=RemovalPolicy.DESTROY, + partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING), + time_to_live_attribute="expiration", + billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, + ) + + table.grant_read_write_data(function["TtlCacheExpirationHandler"]) + table.grant_read_write_data(function["TtlCacheTimeoutHandler"]) + table.grant_read_write_data(function["ParallelExecutionHandler"]) + + CfnOutput(self.stack, "DynamoDBTable", value=table.table_name) diff --git a/tests/e2e/idempotency/test_idempotency_dynamodb.py b/tests/e2e/idempotency/test_idempotency_dynamodb.py new file mode 100644 index 00000000000..19369b141db --- /dev/null +++ b/tests/e2e/idempotency/test_idempotency_dynamodb.py @@ -0,0 +1,96 @@ +import json +from time import sleep + +import pytest + +from tests.e2e.utils import data_fetcher +from tests.e2e.utils.functions import execute_lambdas_in_parallel + + +@pytest.fixture +def ttl_cache_expiration_handler_fn_arn(infrastructure: dict) -> str: + return infrastructure.get("TtlCacheExpirationHandlerArn", "") + + +@pytest.fixture +def ttl_cache_timeout_handler_fn_arn(infrastructure: dict) -> str: + return infrastructure.get("TtlCacheTimeoutHandlerArn", "") + + +@pytest.fixture +def parallel_execution_handler_fn_arn(infrastructure: dict) -> str: + return 
infrastructure.get("ParallelExecutionHandlerArn", "") + + +@pytest.fixture +def idempotency_table_name(infrastructure: dict) -> str: + return infrastructure.get("DynamoDBTable", "") + + +def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn: str): + # GIVEN + payload = json.dumps({"message": "Lambda Powertools - TTL 20s"}) + + # WHEN + # first execution + first_execution, _ = data_fetcher.get_lambda_response( + lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload + ) + first_execution_response = first_execution["Payload"].read().decode("utf-8") + + # the second execution should return the same response as the first execution + second_execution, _ = data_fetcher.get_lambda_response( + lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload + ) + second_execution_response = second_execution["Payload"].read().decode("utf-8") + + # wait 20s to expire ttl and execute again, this should return a new response value + sleep(20) + third_execution, _ = data_fetcher.get_lambda_response( + lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload + ) + third_execution_response = third_execution["Payload"].read().decode("utf-8") + + # THEN + assert first_execution_response == second_execution_response + assert third_execution_response != second_execution_response + + +def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str): + # GIVEN + payload_timeout_execution = json.dumps({"sleep": 10, "message": "Lambda Powertools - TTL 1s"}) + payload_working_execution = json.dumps({"sleep": 0, "message": "Lambda Powertools - TTL 1s"}) + + # WHEN + # first call should fail due to timeout + execution_with_timeout, _ = data_fetcher.get_lambda_response( + lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload_timeout_execution + ) + execution_with_timeout_response = execution_with_timeout["Payload"].read().decode("utf-8") + + # the second call should work and return the payload + execution_working, _ = 
data_fetcher.get_lambda_response( + lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload_working_execution + ) + execution_working_response = execution_working["Payload"].read().decode("utf-8") + + # THEN + assert "Task timed out after" in execution_with_timeout_response + assert payload_working_execution == execution_working_response + + +def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str): + # GIVEN + arguments = json.dumps({"message": "Lambda Powertools - Parallel execution"}) + + # WHEN + # executing Lambdas in parallel + lambdas_arn = [parallel_execution_handler_fn_arn, parallel_execution_handler_fn_arn] + execution_result_list = execute_lambdas_in_parallel("data_fetcher.get_lambda_response", lambdas_arn, arguments) + + timeout_execution_response = execution_result_list[0][0]["Payload"].read().decode("utf-8") + error_idempotency_execution_response = execution_result_list[1][0]["Payload"].read().decode("utf-8") + + # THEN + assert "Execution already in progress with idempotency key" in error_idempotency_execution_response + assert "Task timed out after" in timeout_execution_response diff --git a/tests/e2e/utils/data_fetcher/__init__.py b/tests/e2e/utils/data_fetcher/__init__.py index be6909537e5..fdd1de5c515 100644 --- a/tests/e2e/utils/data_fetcher/__init__.py +++ b/tests/e2e/utils/data_fetcher/__init__.py @@ -1,4 +1,5 @@ from tests.e2e.utils.data_fetcher.common import get_http_response, get_lambda_response +from tests.e2e.utils.data_fetcher.idempotency import get_ddb_idempotency_record from tests.e2e.utils.data_fetcher.logs import get_logs from tests.e2e.utils.data_fetcher.metrics import get_metrics from tests.e2e.utils.data_fetcher.traces import get_traces diff --git a/tests/e2e/utils/data_fetcher/idempotency.py b/tests/e2e/utils/data_fetcher/idempotency.py new file mode 100644 index 00000000000..109e6735d3b --- /dev/null +++ b/tests/e2e/utils/data_fetcher/idempotency.py @@ -0,0 +1,39 @@ +import boto3 +from retry import 
retry + + +@retry(ValueError, delay=2, jitter=1.5, tries=10) +def get_ddb_idempotency_record( + function_name: str, + table_name: str, +) -> int: + """Scan the DynamoDB table and count idempotency records whose id contains the function name + + Parameters + ---------- + function_name : str + Name of Lambda function to fetch dynamodb record + table_name : str + Name of DynamoDB table + + Returns + ------- + int + Count of records found + + Raises + ------ + ValueError + When no record is found within retry window + """ + ddb_client = boto3.resource("dynamodb") + table = ddb_client.Table(table_name) + ret = table.scan( + FilterExpression="contains (id, :functionName)", + ExpressionAttributeValues={":functionName": f"{function_name}#"}, + ) + + if not ret["Items"]: + raise ValueError("Empty response from DynamoDB Repeating...") + + return ret["Count"] diff --git a/tests/e2e/utils/functions.py b/tests/e2e/utils/functions.py new file mode 100644 index 00000000000..7b64c439298 --- /dev/null +++ b/tests/e2e/utils/functions.py @@ -0,0 +1,14 @@ +from concurrent.futures import ThreadPoolExecutor + +from tests.e2e.utils import data_fetcher # noqa F401 + + +def execute_lambdas_in_parallel(function_name: str, lambdas_arn: list, arguments: str): + result_list = [] + with ThreadPoolExecutor() as executor: + running_tasks = executor.map(lambda exec: eval(function_name)(*exec), [(arn, arguments) for arn in lambdas_arn]) + executor.shutdown(wait=True) + for running_task in running_tasks: + result_list.append(running_task) + + return result_list diff --git a/tests/functional/idempotency/conftest.py b/tests/functional/idempotency/conftest.py index b5cf79727b1..657a4b6bd13 100644 --- a/tests/functional/idempotency/conftest.py +++ b/tests/functional/idempotency/conftest.py @@ -172,18 +172,24 @@ def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_vali @pytest.fixture -def hashed_idempotency_key(lambda_apigw_event, default_jmespath, lambda_context): +def hashed_idempotency_key(request, lambda_apigw_event, default_jmespath, lambda_context): 
compiled_jmespath = jmespath.compile(default_jmespath) data = compiled_jmespath.search(lambda_apigw_event) - return "test-func.lambda_handler#" + hash_idempotency_key(data) + return ( + f"test-func.{request.function.__module__}.{request.function.__qualname__}.<locals>.lambda_handler#" + + hash_idempotency_key(data) + ) @pytest.fixture -def hashed_idempotency_key_with_envelope(lambda_apigw_event): +def hashed_idempotency_key_with_envelope(request, lambda_apigw_event): event = extract_data_from_envelope( data=lambda_apigw_event, envelope=envelopes.API_GATEWAY_HTTP, jmespath_options={} ) - return "test-func.lambda_handler#" + hash_idempotency_key(event) + return ( + f"test-func.{request.function.__module__}.{request.function.__qualname__}.<locals>.lambda_handler#" + + hash_idempotency_key(event) + ) @pytest.fixture diff --git a/tests/functional/idempotency/test_idempotency.py b/tests/functional/idempotency/test_idempotency.py index 97a9166efa0..f1e32cb5ebc 100644 --- a/tests/functional/idempotency/test_idempotency.py +++ b/tests/functional/idempotency/test_idempotency.py @@ -32,6 +32,7 @@ from tests.functional.utils import json_serialize, load_event TABLE_NAME = "TEST_TABLE" +TESTS_MODULE_PREFIX = "test-func.functional.idempotency.test_idempotency" def get_dataclasses_lib(): @@ -770,7 +771,7 @@ def lambda_handler(event, context): def test_idempotent_lambda_expires_in_progress_unavailable_remaining_time(): mock_event = {"data": "value"} - idempotency_key = "test-func.function#" + hash_idempotency_key(mock_event) + idempotency_key = f"{TESTS_MODULE_PREFIX}.test_idempotent_lambda_expires_in_progress_unavailable_remaining_time.<locals>.function#{hash_idempotency_key(mock_event)}" # noqa E501 persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} @@ -1109,7 +1110,8 @@ def _delete_record(self, data_record: DataRecord) -> None: def test_idempotent_lambda_event_source(lambda_context): # Scenario to validate that we can use the 
event_source decorator before or after the idempotent decorator mock_event = load_event("apiGatewayProxyV2Event.json") - persistence_layer = MockPersistenceLayer("test-func.lambda_handler#" + hash_idempotency_key(mock_event)) + idempotency_key = f"{TESTS_MODULE_PREFIX}.test_idempotent_lambda_event_source.<locals>.lambda_handler#{hash_idempotency_key(mock_event)}" # noqa E501 + persistence_layer = MockPersistenceLayer(idempotency_key) expected_result = {"message": "Foo"} # GIVEN an event_source decorator @@ -1129,7 +1131,9 @@ def lambda_handler(event, _): def test_idempotent_function(): # Scenario to validate we can use idempotent_function with any function mock_event = {"data": "value"} - idempotency_key = "test-func.record_handler#" + hash_idempotency_key(mock_event) + idempotency_key = ( + f"{TESTS_MODULE_PREFIX}.test_idempotent_function.<locals>.record_handler#{hash_idempotency_key(mock_event)}" + ) persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} @@ -1147,7 +1151,7 @@ def test_idempotent_function_arbitrary_args_kwargs(): # Scenario to validate we can use idempotent_function with a function # with an arbitrary number of args and kwargs mock_event = {"data": "value"} - idempotency_key = "test-func.record_handler#" + hash_idempotency_key(mock_event) + idempotency_key = f"{TESTS_MODULE_PREFIX}.test_idempotent_function_arbitrary_args_kwargs.<locals>.record_handler#{hash_idempotency_key(mock_event)}" # noqa E501 persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} @@ -1163,7 +1167,7 @@ def record_handler(arg_one, arg_two, record, is_record): def test_idempotent_function_invalid_data_kwarg(): mock_event = {"data": "value"} - idempotency_key = "test-func.record_handler#" + hash_idempotency_key(mock_event) + idempotency_key = f"{TESTS_MODULE_PREFIX}.test_idempotent_function_invalid_data_kwarg.<locals>.record_handler#{hash_idempotency_key(mock_event)}" # noqa E501 
persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} keyword_argument = "payload" @@ -1200,7 +1204,7 @@ def record_handler(record): def test_idempotent_function_and_lambda_handler(lambda_context): # Scenario to validate we can use both idempotent_function and idempotent decorators mock_event = {"data": "value"} - idempotency_key = "test-func.record_handler#" + hash_idempotency_key(mock_event) + idempotency_key = f"{TESTS_MODULE_PREFIX}.test_idempotent_function_and_lambda_handler.<locals>.record_handler#{hash_idempotency_key(mock_event)}" # noqa E501 persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) expected_result = {"message": "Foo"} @@ -1208,7 +1212,9 @@ def test_idempotent_function_and_lambda_handler(lambda_context): def record_handler(record): return expected_result - persistence_layer = MockPersistenceLayer("test-func.lambda_handler#" + hash_idempotency_key(mock_event)) + persistence_layer = MockPersistenceLayer( + f"{TESTS_MODULE_PREFIX}.test_idempotent_function_and_lambda_handler.<locals>.lambda_handler#{hash_idempotency_key(mock_event)}" # noqa E501 + ) @idempotent(persistence_store=persistence_layer) def lambda_handler(event, _): @@ -1229,7 +1235,9 @@ def test_idempotent_data_sorting(): # Scenario to validate same data in different order hashes to the same idempotency key data_one = {"data": "test message 1", "more_data": "more data 1"} data_two = {"more_data": "more data 1", "data": "test message 1"} - idempotency_key = "test-func.dummy#" + hash_idempotency_key(data_one) + idempotency_key = ( + f"{TESTS_MODULE_PREFIX}.test_idempotent_data_sorting.<locals>.dummy#{hash_idempotency_key(data_one)}" + ) # Assertion will happen in MockPersistenceLayer persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) @@ -1337,7 +1345,7 @@ def test_idempotent_function_dataclass_with_jmespath(): dataclasses = get_dataclasses_lib() config = 
IdempotencyConfig(event_key_jmespath="transaction_id", use_local_cache=True) mock_event = {"customer_id": "fake", "transaction_id": "fake-id"} - idempotency_key = "test-func.collect_payment#" + hash_idempotency_key(mock_event["transaction_id"]) + idempotency_key = f"{TESTS_MODULE_PREFIX}.test_idempotent_function_dataclass_with_jmespath.<locals>.collect_payment#{hash_idempotency_key(mock_event['transaction_id'])}" # noqa E501 persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) @dataclasses.dataclass @@ -1362,7 +1370,7 @@ def test_idempotent_function_pydantic_with_jmespath(): # GIVEN config = IdempotencyConfig(event_key_jmespath="transaction_id", use_local_cache=True) mock_event = {"customer_id": "fake", "transaction_id": "fake-id"} - idempotency_key = "test-func.collect_payment#" + hash_idempotency_key(mock_event["transaction_id"]) + idempotency_key = f"{TESTS_MODULE_PREFIX}.test_idempotent_function_pydantic_with_jmespath.<locals>.collect_payment#{hash_idempotency_key(mock_event['transaction_id'])}" # noqa E501 persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) class Payment(BaseModel): diff --git a/tests/functional/idempotency/utils.py b/tests/functional/idempotency/utils.py index 797b696aba4..f9cdaf05d0a 100644 --- a/tests/functional/idempotency/utils.py +++ b/tests/functional/idempotency/utils.py @@ -14,9 +14,13 @@ def hash_idempotency_key(data: Any): def build_idempotency_put_item_stub( data: Dict, function_name: str = "test-func", + function_qualified_name: str = "test_idempotent_lambda_first_execution_event_mutation.<locals>", + module_name: str = "functional.idempotency.test_idempotency", handler_name: str = "lambda_handler", ) -> Dict: - idempotency_key_hash = f"{function_name}.{handler_name}#{hash_idempotency_key(data)}" + idempotency_key_hash = ( + f"{function_name}.{module_name}.{function_qualified_name}.{handler_name}#{hash_idempotency_key(data)}" + ) return { "ConditionExpression": ( "attribute_not_exists(#id) OR 
#expiry < :now OR " @@ -43,9 +47,13 @@ def build_idempotency_update_item_stub( data: Dict, handler_response: Dict, function_name: str = "test-func", + function_qualified_name: str = "test_idempotent_lambda_first_execution_event_mutation.<locals>", + module_name: str = "functional.idempotency.test_idempotency", handler_name: str = "lambda_handler", ) -> Dict: - idempotency_key_hash = f"{function_name}.{handler_name}#{hash_idempotency_key(data)}" + idempotency_key_hash = ( + f"{function_name}.{module_name}.{function_qualified_name}.{handler_name}#{hash_idempotency_key(data)}" + ) serialized_lambda_response = json_serialize(handler_response) return { "ExpressionAttributeNames": {