diff --git a/src/aws_durable_execution_sdk_python/state.py b/src/aws_durable_execution_sdk_python/state.py index a6fc0c7..8553a7d 100644 --- a/src/aws_durable_execution_sdk_python/state.py +++ b/src/aws_durable_execution_sdk_python/state.py @@ -613,20 +613,20 @@ def checkpoint_batches_forever(self) -> None: logger.debug("Checkpoint batch processed successfully") - # Signal completion for any synchronous operations - for queued_op in batch: - if queued_op.completion_event is not None: - queued_op.completion_event.set() - # Update local token for next iteration current_checkpoint_token = output.checkpoint_token - # Fetch new operations from the API + # Fetch new operations from the API before unblocking sync waiters self.fetch_paginated_operations( output.new_execution_state.operations, output.checkpoint_token, output.new_execution_state.next_marker, ) + + # Signal completion for any synchronous operations + for queued_op in batch: + if queued_op.completion_event is not None: + queued_op.completion_event.set() except Exception as e: # Checkpoint failed - wake all blocked threads so they can raise error # Drain both queues and signal all completion events