Description
Expected Behavior
Under high concurrency, a `context.map` whose items call `context.wait` should complete normally. Checkpoint batching should not create duplicate START operations, and the background checkpoint thread should keep processing batches.
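To illustrate the first expectation, here is a minimal sketch of a batcher that drops a repeated START for an operation ID it has already started, before the batch is sent. `Action`, `Update`, and `dedupe_starts` are hypothetical names for illustration only. They are not the SDK's actual checkpoint types or functions.

```python
from dataclasses import dataclass
from enum import Enum


class Action(Enum):
    # Hypothetical action names; the SDK's real update types may differ.
    START = "START"
    SUCCEED = "SUCCEED"


@dataclass(frozen=True)
class Update:
    operation_id: str
    action: Action


def dedupe_starts(batch: list[Update], started: set[str]) -> list[Update]:
    """Return the batch without START updates for already-started operations.

    `started` holds operation IDs whose START was already queued. It is
    updated in place so that later batches skip them too.
    """
    result: list[Update] = []
    for update in batch:
        if update.action is Action.START:
            if update.operation_id in started:
                continue  # duplicate START: skip it instead of sending it again
            started.add(update.operation_id)
        result.append(update)
    return result


started: set[str] = set()
first = dedupe_starts([Update("wait-0", Action.START), Update("wait-0", Action.START)], started)
second = dedupe_starts([Update("wait-0", Action.START), Update("wait-0", Action.SUCCEED)], started)
print(len(first), len(second))  # prints "1 1"
```

This sketch marks an ID as started as soon as its START is queued. A real batcher would also have to decide what happens if the batch carrying that START is rejected.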
Actual Behavior
Under high concurrency, the SDK sometimes issues a duplicate START for a WAIT operation, and the control plane rejects it with InvalidParameterValueException: Cannot start a WAIT operation that already exists. After this exception the checkpoint batching thread exits, the map items remain in STARTED, and the invocation times out.
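In the failure described above, a single rejected batch ends the background thread. For contrast, the sketch below shows a generic `queue`/`threading` worker loop that isolates per-batch errors and keeps consuming batches. It is not the SDK's checkpoint code. `run_batches` and `fake_send` are hypothetical, and `fake_send` only simulates the rejection.

```python
import logging
import queue
import threading
from typing import Callable

logger = logging.getLogger("checkpoint-sketch")


def run_batches(
    batches: "queue.Queue[list[str] | None]",
    send: Callable[[list[str]], None],
    failures: list[Exception],
) -> None:
    """Consume batches until a None sentinel arrives.

    A failing send is logged and recorded, and the loop moves on to the
    next batch instead of letting the exception terminate the thread.
    """
    while True:
        batch = batches.get()
        try:
            if batch is None:
                return  # sentinel: stop the worker
            send(batch)
        except Exception as exc:  # keep the worker alive after a bad batch
            logger.exception("Checkpoint batch processing failed")
            failures.append(exc)
        finally:
            batches.task_done()


sent: list[list[str]] = []
failures: list[Exception] = []


def fake_send(batch: list[str]) -> None:
    # Hypothetical stand-in for the checkpoint API call.
    if "dup" in batch:
        raise ValueError("Cannot start a WAIT operation that already exists")
    sent.append(batch)


q: "queue.Queue[list[str] | None]" = queue.Queue()
worker = threading.Thread(target=run_batches, args=(q, fake_send, failures))
worker.start()
for b in (["a"], ["dup"], ["b"]):
    q.put(b)
q.put(None)
worker.join()
print(sent, len(failures))  # prints "[['a'], ['b']] 1"
```

Recording the failure, rather than silently dropping it, lets the caller surface the error instead of leaving map items waiting in STARTED until the invocation times out.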
Steps to Reproduce
Use the `context.map` operation together with another operation that creates a checkpoint. The example code below triggers the issue. Because it is a race condition, you may have to run it more than once. Giving the Lambda function only 128 MB of memory seems to make the issue easier to reproduce:
```python
import json
import time

import boto3
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import Duration, MapConfig

lambda_client = boto3.client("lambda")


def _wait_item(child_context: DurableContext, item: dict, index: int, _items):
    # Each map item performs a single named wait in its own child context.
    wait_seconds = int(item["wait_seconds"])
    child_context.wait(duration=Duration.from_seconds(wait_seconds), name=f"wait-{index}")
    return {"index": index, "wait_seconds": wait_seconds}


@durable_execution(boto3_client=lambda_client)  # type: ignore[arg-type]
def handler(event: dict, context: DurableContext):
    print(json.dumps({"durable_execution_arn": context.state.durable_execution_arn}))

    count = int(event.get("count", 200))
    # Concurrency defaults to the item count, so all items can run at once.
    concurrency = int(event.get("concurrency", count))
    wait_seconds = int(event.get("wait_seconds", 1))
    items = [{"wait_seconds": wait_seconds} for _ in range(count)]

    start = time.time()
    result = context.map(
        items,
        _wait_item,
        name="wait-map",
        config=MapConfig(max_concurrency=concurrency),
    )
    elapsed_ms = round((time.time() - start) * 1000, 2)

    summary = {
        "count": count,
        "concurrency": concurrency,
        "wait_seconds": wait_seconds,
        "elapsed_ms": elapsed_ms,
        "map_status": result.status.value,
        "success_count": result.success_count,
        "failure_count": result.failure_count,
        "completion_reason": result.completion_reason.value,
    }
    print(json.dumps({"repro_summary": summary}))
    return summary
```

You can also:
- Clone the repro repo (https://github.com/alessandrobologna/lambda-durable-checkpoint-race-repro), then build and deploy it:
  ```shell
  sam build
  sam deploy --guided --stack-name lambda-durable-checkpoint-race
  ```
- Invoke with the provided event:
  ```shell
  sam remote invoke --stack-name lambda-durable-checkpoint-race --event-file events/repro.json ReproFunction
  ```
- Check the CloudWatch logs for the duplicate START error and “Checkpoint batch processing failed.”
- Observe the `sam remote invoke` timeout:
  ```
  Invoking Lambda Function ReproFunction
  Error: Read timeout on endpoint URL: "https://lambda.us-east-1.amazonaws.com/2015-03-31/functions/lambda-durable-checkpoint-race-repro/invocations?Qualifier=%24LATEST"
  ```
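The repro handler reads three optional event fields. The helper below is a hypothetical function extracted from the handler for illustration. It mirrors the handler's parsing and shows that an empty event already runs 200 one-second waits with `max_concurrency` equal to the item count. The contents of `events/repro.json` are not shown in this issue, so the sketch does not assume which values it passes.

```python
def repro_params(event: dict) -> dict:
    """Mirror the repro handler's event parsing and defaults (hypothetical helper)."""
    count = int(event.get("count", 200))
    # When "concurrency" is omitted it falls back to count, so every item runs at once.
    concurrency = int(event.get("concurrency", count))
    wait_seconds = int(event.get("wait_seconds", 1))
    return {"count": count, "concurrency": concurrency, "wait_seconds": wait_seconds}


print(repro_params({}))             # {'count': 200, 'concurrency': 200, 'wait_seconds': 1}
print(repro_params({"count": 50}))  # {'count': 50, 'concurrency': 50, 'wait_seconds': 1}
```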
SDK Version
1.1.0
Python Version
3.14
Is this a regression?
No
Last Working Version
No response
Additional Context
No response