Skip to content

Commit

Permalink
feat(idempotency): handle lambda timeout scenarios for INPROGRESS rec…
Browse files Browse the repository at this point in the history
…ords (#1387)

Co-authored-by: heitorlessa <lessa@amazon.co.uk>
  • Loading branch information
rubenfonseca and heitorlessa authored Jul 29, 2022
1 parent 02e8b60 commit 160feae
Show file tree
Hide file tree
Showing 12 changed files with 561 additions and 121 deletions.
32 changes: 31 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
from copy import deepcopy
from typing import Any, Callable, Dict, Optional, Tuple
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(
self.data = deepcopy(_prepare_data(function_payload))
self.fn_args = function_args
self.fn_kwargs = function_kwargs
self.config = config

persistence_store.configure(config, self.function.__name__)
self.persistence_store = persistence_store
Expand Down Expand Up @@ -101,7 +103,9 @@ def _process_idempotency(self):
try:
# We call save_inprogress first as an optimization for the most common case where no idempotent record
# already exists. If it succeeds, there's no need to call get_record.
self.persistence_store.save_inprogress(data=self.data)
self.persistence_store.save_inprogress(
data=self.data, remaining_time_in_millis=self._get_remaining_time_in_millis()
)
except IdempotencyKeyError:
raise
except IdempotencyItemAlreadyExistsError:
Expand All @@ -113,6 +117,25 @@ def _process_idempotency(self):

return self._get_function_response()

def _get_remaining_time_in_millis(self) -> Optional[int]:
"""
Tries to determine the remaining time available for the current lambda invocation.
This only works if the idempotent handler decorator is used, since we need to access the lambda context.
However, this could be improved if we start storing the lambda context globally during the invocation. One
way to do this is to register the lambda context when configuring the IdempotencyConfig object.
Returns
-------
Optional[int]
Remaining time in millis, or None if the remaining time cannot be determined.
"""

if self.config.lambda_context is not None:
return self.config.lambda_context.get_remaining_time_in_millis()

return None

def _get_idempotency_record(self) -> DataRecord:
"""
Retrieve the idempotency record from the persistence layer.
Expand Down Expand Up @@ -167,6 +190,13 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int(
datetime.datetime.now().timestamp() * 1000
):
raise IdempotencyInconsistentStateError(
"item should have been expired in-progress because it already time-outed."
)

raise IdempotencyAlreadyInProgressError(
f"Execution already in progress with idempotency key: "
f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"
Expand Down
10 changes: 10 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Dict, Optional

from aws_lambda_powertools.utilities.typing import LambdaContext


class IdempotencyConfig:
def __init__(
Expand All @@ -12,6 +14,7 @@ def __init__(
use_local_cache: bool = False,
local_cache_max_items: int = 256,
hash_function: str = "md5",
lambda_context: Optional[LambdaContext] = None,
):
"""
Initialize the base persistence layer
Expand All @@ -32,6 +35,8 @@ def __init__(
Max number of items to store in local cache, by default 1024
hash_function: str, optional
Function to use for calculating hashes, by default md5.
lambda_context: LambdaContext, optional
Lambda Context containing information about the invocation, function and execution environment.
"""
self.event_key_jmespath = event_key_jmespath
self.payload_validation_jmespath = payload_validation_jmespath
Expand All @@ -41,3 +46,8 @@ def __init__(
self.use_local_cache = use_local_cache
self.local_cache_max_items = local_cache_max_items
self.hash_function = hash_function
self.lambda_context: Optional[LambdaContext] = lambda_context

def register_lambda_context(self, lambda_context: LambdaContext):
"""Captures the Lambda context, to calculate the remaining time before the invocation times out"""
self.lambda_context = lambda_context
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/idempotency.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def idempotent(
return handler(event, context)

config = config or IdempotencyConfig()
config.register_lambda_context(context)

args = event, context
idempotency_handler = IdempotencyHandler(
function=handler,
Expand Down
20 changes: 19 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/persistence/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
idempotency_key,
status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
response_data: Optional[str] = "",
payload_hash: Optional[str] = None,
) -> None:
Expand All @@ -53,6 +54,8 @@ def __init__(
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp: int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
Expand All @@ -61,6 +64,7 @@ def __init__(
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
self._status = status
self.response_data = response_data

Expand Down Expand Up @@ -328,14 +332,16 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None:

self._save_to_cache(data_record=data_record)

def save_inprogress(self, data: Dict[str, Any]) -> None:
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
"""
Save record of function's execution being in progress
Parameters
----------
data: Dict[str, Any]
Payload
remaining_time_in_millis: Optional[int]
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
"""
data_record = DataRecord(
idempotency_key=self._get_hashed_idempotency_key(data=data),
Expand All @@ -344,6 +350,18 @@ def save_inprogress(self, data: Dict[str, Any]) -> None:
payload_hash=self._get_hashed_payload(data=data),
)

if remaining_time_in_millis:
now = datetime.datetime.now()
period = datetime.timedelta(milliseconds=remaining_time_in_millis)
timestamp = (now + period).timestamp()

data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
else:
warnings.warn(
"Couldn't determine the remaining time left. "
"Did you call register_lambda_context on IdempotencyConfig?"
)

logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord
from aws_lambda_powertools.utilities.idempotency.persistence.base import STATUS_CONSTANTS, DataRecord

logger = logging.getLogger(__name__)

Expand All @@ -25,6 +25,7 @@ def __init__(
static_pk_value: Optional[str] = None,
sort_key_attr: Optional[str] = None,
expiry_attr: str = "expiration",
in_progress_expiry_attr: str = "in_progress_expiration",
status_attr: str = "status",
data_attr: str = "data",
validation_key_attr: str = "validation",
Expand All @@ -47,6 +48,8 @@ def __init__(
DynamoDB attribute name for the sort key
expiry_attr: str, optional
DynamoDB attribute name for expiry timestamp, by default "expiration"
in_progress_expiry_attr: str, optional
DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
status_attr: str, optional
DynamoDB attribute name for status, by default "status"
data_attr: str, optional
Expand Down Expand Up @@ -85,6 +88,7 @@ def __init__(
self.static_pk_value = static_pk_value
self.sort_key_attr = sort_key_attr
self.expiry_attr = expiry_attr
self.in_progress_expiry_attr = in_progress_expiry_attr
self.status_attr = status_attr
self.data_attr = data_attr
self.validation_key_attr = validation_key_attr
Expand Down Expand Up @@ -133,6 +137,7 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
idempotency_key=item[self.key_attr],
status=item[self.status_attr],
expiry_timestamp=item[self.expiry_attr],
in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr),
response_data=item.get(self.data_attr),
payload_hash=item.get(self.validation_key_attr),
)
Expand All @@ -153,33 +158,75 @@ def _put_record(self, data_record: DataRecord) -> None:
self.status_attr: data_record.status,
}

if data_record.in_progress_expiry_timestamp is not None:
item[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp

if self.payload_validation_enabled:
item[self.validation_key_attr] = data_record.payload_hash

now = datetime.datetime.now()
try:
logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")

# | LOCKED | RETRY if status = "INPROGRESS" | RETRY
# |----------------|-------------------------------------------------------|-------------> .... (time)
# | Lambda Idempotency Record
# | Timeout Timeout
# | (in_progress_expiry) (expiry)

# Conditions to successfully save a record:

# The idempotency key does not exist:
# - first time that this invocation key is used
# - previous invocation with the same key was deleted due to TTL
idempotency_key_not_exist = "attribute_not_exists(#id)"

# The idempotency record exists but it's expired:
idempotency_expiry_expired = "#expiry < :now"

# The status of the record is "INPROGRESS", there is an in-progress expiry timestamp, but it's expired
inprogress_expiry_expired = " AND ".join(
[
"#status = :inprogress",
"attribute_exists(#in_progress_expiry)",
"#in_progress_expiry < :now_in_millis",
]
)

condition_expression = (
f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})"
)

self.table.put_item(
Item=item,
ConditionExpression="attribute_not_exists(#id) OR #now < :now",
ExpressionAttributeNames={"#id": self.key_attr, "#now": self.expiry_attr},
ExpressionAttributeValues={":now": int(now.timestamp())},
ConditionExpression=condition_expression,
ExpressionAttributeNames={
"#id": self.key_attr,
"#expiry": self.expiry_attr,
"#in_progress_expiry": self.in_progress_expiry_attr,
"#status": self.status_attr,
},
ExpressionAttributeValues={
":now": int(now.timestamp()),
":now_in_millis": int(now.timestamp() * 1000),
":inprogress": STATUS_CONSTANTS["INPROGRESS"],
},
)
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
raise IdempotencyItemAlreadyExistsError

def _update_record(self, data_record: DataRecord):
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
update_expression = "SET #response_data = :response_data, #expiry = :expiry, " "#status = :status"
expression_attr_values = {
":expiry": data_record.expiry_timestamp,
":response_data": data_record.response_data,
":status": data_record.status,
}
expression_attr_names = {
"#response_data": self.data_attr,
"#expiry": self.expiry_attr,
"#response_data": self.data_attr,
"#status": self.status_attr,
}

Expand Down
Binary file removed docs/media/idempotent_sequence.png
Binary file not shown.
Binary file removed docs/media/idempotent_sequence_exception.png
Binary file not shown.
Loading

0 comments on commit 160feae

Please sign in to comment.