Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/aws_durable_execution_sdk_python/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ def from_dict(input_dict: MutableMapping[str, Any]) -> InitialExecutionState:
next_marker=input_dict.get("NextMarker", ""),
)

@staticmethod
def from_json_dict(input_dict: MutableMapping[str, Any]) -> InitialExecutionState:
operations = []
if input_operations := input_dict.get("Operations"):
operations = [Operation.from_json_dict(op) for op in input_operations]
return InitialExecutionState(
operations=operations,
next_marker=input_dict.get("NextMarker", ""),
)

def get_execution_operation(self) -> Operation | None:
if not self.operations:
# Due to payload size limitations we may have an empty operations list.
Expand Down Expand Up @@ -91,6 +101,12 @@ def to_dict(self) -> MutableMapping[str, Any]:
"NextMarker": self.next_marker,
}

def to_json_dict(self) -> MutableMapping[str, Any]:
return {
"Operations": [op.to_json_dict() for op in self.operations],
"NextMarker": self.next_marker,
}


@dataclass(frozen=True)
class DurableExecutionInvocationInput:
Expand All @@ -110,13 +126,32 @@ def from_dict(
),
)

@staticmethod
def from_json_dict(
input_dict: MutableMapping[str, Any],
) -> DurableExecutionInvocationInput:
return DurableExecutionInvocationInput(
durable_execution_arn=input_dict["DurableExecutionArn"],
checkpoint_token=input_dict["CheckpointToken"],
initial_execution_state=InitialExecutionState.from_json_dict(
input_dict.get("InitialExecutionState", {})
),
)

def to_dict(self) -> MutableMapping[str, Any]:
return {
"DurableExecutionArn": self.durable_execution_arn,
"CheckpointToken": self.checkpoint_token,
"InitialExecutionState": self.initial_execution_state.to_dict(),
}

def to_json_dict(self) -> MutableMapping[str, Any]:
return {
"DurableExecutionArn": self.durable_execution_arn,
"CheckpointToken": self.checkpoint_token,
"InitialExecutionState": self.initial_execution_state.to_json_dict(),
}


@dataclass(frozen=True)
class DurableExecutionInvocationInputWithClient(DurableExecutionInvocationInput):
Expand Down
100 changes: 97 additions & 3 deletions src/aws_durable_execution_sdk_python/lambda_service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import copy
import datetime
import logging
from dataclasses import dataclass, field
Expand Down Expand Up @@ -692,6 +693,24 @@ def create_wait_start(
# endregion wait


class TimestampConverter:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep it in lambda_service module to prevent dependency loop

"""Converter for datetime/Unix timestamp conversions."""

@staticmethod
def to_unix_millis(dt: datetime.datetime | None) -> int | None:
"""Convert datetime to Unix timestamp in milliseconds."""
return int(dt.timestamp() * 1000) if dt else None

@staticmethod
def from_unix_millis(ms: int | None) -> datetime.datetime | None:
"""Convert Unix timestamp in milliseconds to datetime."""
return (
datetime.datetime.fromtimestamp(ms / 1000, tz=datetime.UTC)
if ms is not None
else None
)


@dataclass(frozen=True)
class Operation:
"""Represent the Operation type for GetDurableExecutionState and CheckpointDurableExecution."""
Expand Down Expand Up @@ -805,9 +824,11 @@ def to_dict(self) -> MutableMapping[str, Any]:
step_dict["Error"] = self.step_details.error.to_dict()
result["StepDetails"] = step_dict
if self.wait_details:
result["WaitDetails"] = {
"ScheduledEndTimestamp": self.wait_details.scheduled_end_timestamp
}
result["WaitDetails"] = (
{"ScheduledEndTimestamp": self.wait_details.scheduled_end_timestamp}
if self.wait_details.scheduled_end_timestamp
else {}
)
if self.callback_details:
callback_dict: MutableMapping[str, Any] = {
"CallbackId": self.callback_details.callback_id
Expand All @@ -826,6 +847,79 @@ def to_dict(self) -> MutableMapping[str, Any]:
result["ChainedInvokeDetails"] = invoke_dict
return result

def to_json_dict(self) -> MutableMapping[str, Any]:
"""Convert the Operation to a JSON-serializable dictionary.

Converts datetime objects to millisecond timestamps for JSON compatibility.

Returns:
A dictionary with JSON-serializable values
"""
# Start with the regular to_dict output
result = self.to_dict()

# Convert datetime objects to millisecond timestamps
if ts := result.get("StartTimestamp"):
result["StartTimestamp"] = TimestampConverter.to_unix_millis(ts)

if ts := result.get("EndTimestamp"):
result["EndTimestamp"] = TimestampConverter.to_unix_millis(ts)

if (step_details := result.get("StepDetails")) and (
ts := step_details.get("NextAttemptTimestamp")
):
result["StepDetails"]["NextAttemptTimestamp"] = (
TimestampConverter.to_unix_millis(ts)
)

if (wait_details := result.get("WaitDetails")) and (
ts := wait_details.get("ScheduledEndTimestamp")
):
result["WaitDetails"]["ScheduledEndTimestamp"] = (
TimestampConverter.to_unix_millis(ts)
)

return result

@classmethod
def from_json_dict(cls, data: MutableMapping[str, Any]) -> Operation:
"""Create an Operation from a JSON-serializable dictionary.

Converts millisecond timestamps back to datetime objects.

Args:
data: Dictionary with JSON-serializable values (millisecond timestamps)

Returns:
An Operation instance with datetime objects
"""
# Make a copy to avoid modifying the original data
data_copy = copy.deepcopy(data)

# Convert millisecond timestamps back to datetime objects
if ms := data_copy.get("StartTimestamp"):
data_copy["StartTimestamp"] = TimestampConverter.from_unix_millis(ms)

if ms := data_copy.get("EndTimestamp"):
data_copy["EndTimestamp"] = TimestampConverter.from_unix_millis(ms)

if (step_details := data_copy.get("StepDetails")) and (
ms := step_details.get("NextAttemptTimestamp")
):
step_details["NextAttemptTimestamp"] = TimestampConverter.from_unix_millis(
ms
)

if (wait_details := data_copy.get("WaitDetails")) and (
ms := wait_details.get("ScheduledEndTimestamp")
):
wait_details["ScheduledEndTimestamp"] = TimestampConverter.from_unix_millis(
ms
)

# Use the existing from_dict method with the converted data
return cls.from_dict(data_copy)


@dataclass(frozen=True)
class CheckpointUpdatedExecutionState:
Expand Down
Loading