Description
As of version 0.8.0, the Python SDK supports the args parameter in ctx.run, which lets us pass the arguments for the function that ctx.run executes as a tuple. Below is the signature implemented in the SDK:

    def run(self,
            name: str,
            action: RunAction[T],
            serde: Serde[T] = DefaultSerde(),
            max_attempts: Optional[int] = None,
            max_retry_duration: Optional[timedelta] = None,
            type_hint: Optional[typing.Type[T]] = None,
            args: Optional[typing.Tuple[Any, ...]] = None
            ) -> RestateDurableFuture[T]:
        ...  # body omitted

However, this gives me no type hints for the parameters of the action function. During development, I might pass the arguments in the wrong order or omit some of them entirely. I have no way of knowing whether I passed the right args until I actually execute ctx.run and it throws an error.
It would be a tremendous help while writing code to get correct type hints for both args and kwargs.
Also, the current version only supports positional arguments. We should support passing keyword arguments as well.
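To make the problem concrete, here is a minimal sketch of why a tuple of arguments escapes static checking. `run_with_tuple` is a hypothetical stand-in written for this example, not the SDK's ctx.run, and `charge` is an invented action; the sketch only mirrors the shape of the `args` parameter.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Tuple, TypeVar

T = TypeVar("T")


async def run_with_tuple(
    name: str,
    action: Callable[..., Awaitable[T]],
    args: Optional[Tuple[Any, ...]] = None,
) -> T:
    """Hypothetical stand-in for ctx.run: the tuple is opaque to type checkers."""
    return await action(*(args or ()))


async def charge(account_id: str, amount_cents: int) -> str:
    # The addition fails if a str is passed where the int is expected.
    return f"{account_id}:{amount_cents + 0}"


async def main() -> Tuple[str, str]:
    ok = await run_with_tuple("charge", charge, args=("acct-1", 500))
    try:
        # Swapped order: nothing flags this statically, it fails only when executed.
        await run_with_tuple("charge", charge, args=(500, "acct-1"))
        outcome = "no error"
    except TypeError as exc:
        outcome = f"TypeError: {exc}"
    return ok, outcome


ok, outcome = asyncio.run(main())
print(ok)
print(outcome)
```

Because `args` is typed as a tuple of `Any`, a type checker has nothing to compare against the signature of `charge`, so the mistake surfaces only as a runtime exception.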
Currently, I have this wrapper on top of ctx.run in our code base to compensate for the missing type hints:
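
    import typing
    from datetime import timedelta
    from typing import Callable, Coroutine, ParamSpec, TypeVar

    # ObjectContext, Serde and DefaultSerde come from the Restate Python SDK.

    P = ParamSpec("P")
    T = TypeVar("T")

    async def ctx_run(
        name: str,
        ctx: ObjectContext,
        fn: Callable[P, Coroutine[typing.Any, typing.Any, T]],
        *fn_args: P.args,
        max_attempts: int | None = None,  # type: ignore
        serde: Serde[T] = DefaultSerde(),  # type: ignore
        max_retry_duration: timedelta | None = None,  # type: ignore
        type_hint: typing.Type[T] | None = None,  # type: ignore
        **fn_kwargs: P.kwargs,
    ) -> T:
        # fn_args and fn_kwargs are checked against fn's signature via ParamSpec.
        return await ctx.run(
            name,
            fn,
            args=fn_args,
            max_attempts=max_attempts,
            serde=serde,
            max_retry_duration=max_retry_duration,
            type_hint=type_hint,
            # Note: the run signature shown above declares no **kwargs, so
            # forwarding keyword arguments depends on the SDK accepting them.
            **fn_kwargs,
        )

However, I think there is a better solution that could live in the SDK itself. We bundle max_attempts, serde, max_retry_duration, and type_hint into a single Pydantic model parameter and do the following: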

    class RunOptions[T](BaseModel):
        serde: Serde[T] = DefaultSerde()
        type_hint: Optional[typing.Type[T]] = None
        max_attempts: Optional[int] = None
        max_retry_duration: Optional[timedelta] = None

    def run(self,
            name: str,
            action: Callable[P, Coroutine[Any, Any, T]],
            run_options: RunOptions[T] = RunOptions[T](),
            /,  # everything before this is positional-only, so it cannot clash with the action's keyword arguments
            *args: P.args,
            **kwargs: P.kwargs
            ) -> RestateDurableFuture[T]:
        ...