diff --git a/python/restate/context.py b/python/restate/context.py index c108995..6e7624a 100644 --- a/python/restate/context.py +++ b/python/restate/context.py @@ -81,6 +81,18 @@ def clear(self, name: str) -> None: def clear_all(self) -> None: """clear all the values in the store.""" +# pylint: disable=R0903 +class SendHandle(abc.ABC): + """ + Represents a send operation. + """ + + @abc.abstractmethod + async def invocation_id(self) -> str: + """ + Returns the invocation id of the send operation. + """ + class Context(abc.ABC): """ Represents the context of the current invocation. @@ -122,7 +134,8 @@ def sleep(self, delta: timedelta) -> Awaitable[None]: @abc.abstractmethod def service_call(self, tpe: Callable[[Any, I], Awaitable[O]], - arg: I) -> Awaitable[O]: + arg: I, + idempotency_key: str | None = None) -> Awaitable[O]: """ Invokes the given service with the given argument. """ @@ -133,7 +146,8 @@ def service_send(self, tpe: Callable[[Any, I], Awaitable[O]], arg: I, send_delay: Optional[timedelta] = None, - ) -> None: + idempotency_key: str | None = None, + ) -> SendHandle: """ Invokes the given service with the given argument. """ @@ -142,7 +156,9 @@ def service_send(self, def object_call(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, - arg: I) -> Awaitable[O]: + arg: I, + idempotency_key: str | None = None, + ) -> Awaitable[O]: """ Invokes the given object with the given argument. """ @@ -153,7 +169,8 @@ def object_send(self, key: str, arg: I, send_delay: Optional[timedelta] = None, - ) -> None: + idempotency_key: str | None = None, + ) -> SendHandle: """ Send a message to an object with the given argument. """ @@ -162,7 +179,9 @@ def object_send(self, def workflow_call(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, - arg: I) -> Awaitable[O]: + arg: I, + idempotency_key: str | None = None, + ) -> Awaitable[O]: """ Invokes the given workflow with the given argument. """ @@ -173,7 +192,8 @@ def workflow_send(self, key: str, arg: I, send_delay: Optional[timedelta] = None, - ) -> None: + idempotency_key: str | None = None, + ) -> SendHandle: """ Send a message to an object with the given argument. """ @@ -184,7 +204,8 @@ def generic_call(self, service: str, handler: str, arg: bytes, - key: Optional[str] = None) -> Awaitable[bytes]: + key: Optional[str] = None, + idempotency_key: str | None = None) -> Awaitable[bytes]: """ Invokes the given generic service/handler with the given argument. """ @@ -195,7 +216,9 @@ def generic_send(self, handler: str, arg: bytes, key: Optional[str] = None, - send_delay: Optional[timedelta] = None) -> None: + send_delay: Optional[timedelta] = None, + idempotency_key: str | None = None, + ) -> SendHandle: """ Send a message to a generic service/handler with the given argument. """ @@ -222,6 +245,18 @@ def reject_awakeable(self, name: str, failure_message: str, failure_code: int = Rejects the awakeable with the given name. """ + @abc.abstractmethod + def cancel(self, invocation_id: str): + """ + Cancels the invocation with the given id. + """ + + @abc.abstractmethod + def attach_invocation(self, invocation_id: str, serde: Serde[T] = JsonSerde()) -> T: + """ + Attaches the invocation with the given id. + """ + class ObjectContext(Context, KeyValueStore): """ diff --git a/python/restate/discovery.py b/python/restate/discovery.py index 2f5ec52..972099d 100644 --- a/python/restate/discovery.py +++ b/python/restate/discovery.py @@ -21,11 +21,13 @@ # pylint: disable=C0115 # pylint: disable=C0103 # pylint: disable=W0622 +# pylint: disable=R0913, +# pylint: disable=R0917, import json import typing from enum import Enum -from typing import Optional, Any, List, get_args, get_origin +from typing import Dict, Optional, Any, List, get_args, get_origin from restate.endpoint import Endpoint as RestateEndpoint @@ -58,17 +60,21 @@ def __init__(self, contentType: str, setContentTypeIfEmpty: bool, jsonSchema: Op self.jsonSchema = jsonSchema class Handler: - def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Optional[InputPayload] = None, output: Optional[OutputPayload] = None): + def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Optional[InputPayload] = None, output: Optional[OutputPayload] = None, description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None): self.name = name self.ty = ty self.input = input self.output = output + self.documentation = description + self.metadata = metadata class Service: - def __init__(self, name: str, ty: ServiceType, handlers: List[Handler]): + def __init__(self, name: str, ty: ServiceType, handlers: List[Handler], description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None): self.name = name self.ty = ty self.handlers = handlers + self.documentation = description + self.metadata = metadata class Endpoint: def __init__(self, protocolMode: ProtocolMode, minProtocolVersion: int, maxProtocolVersion: int, services: List[Service]): @@ -182,10 +188,16 @@ def compute_discovery(endpoint: RestateEndpoint, discovered_as : typing.Literal[ contentType=handler.handler_io.content_type, jsonSchema=json_schema_from_type_hint(handler.handler_io.output_type)) # add the handler - service_handlers.append(Handler(name=handler.name, ty=ty, input=inp, output=out)) - + service_handlers.append(Handler(name=handler.name, + ty=ty, + input=inp, + output=out, + description=handler.description, + metadata=handler.metadata)) # add the service - services.append(Service(name=service.name, ty=service_type, handlers=service_handlers)) + description = service.service_tag.description + metadata = service.service_tag.metadata + services.append(Service(name=service.name, ty=service_type, handlers=service_handlers, description=description, metadata=metadata)) if endpoint.protocol: protocol_mode = PROTOCOL_MODES[endpoint.protocol] diff --git a/python/restate/handler.py b/python/restate/handler.py index e5afee1..3029aae 100644 --- a/python/restate/handler.py +++ b/python/restate/handler.py @@ -17,7 +17,7 @@ from dataclasses import dataclass from inspect import Signature -from typing import Any, Callable, Awaitable, Generic, Literal, Optional, TypeVar +from typing import Any, Callable, Awaitable, Dict, Generic, Literal, Optional, TypeVar from restate.exceptions import TerminalError from restate.serde import JsonSerde, Serde, PydanticJsonSerde @@ -52,6 +52,8 @@ class ServiceTag: """ kind: Literal["object", "service", "workflow"] name: str + description: Optional[str] = None + metadata: Optional[Dict[str, str]] = None @dataclass class TypeHint(Generic[T]): @@ -114,6 +116,7 @@ def extract_io_type_hints(handler_io: HandlerIO[I, O], signature: Signature): if isinstance(handler_io.output_serde, JsonSerde): # type: ignore handler_io.output_serde = PydanticJsonSerde(annotation) +# pylint: disable=R0902 @dataclass class Handler(Generic[I, O]): """ @@ -125,6 +128,8 @@ class Handler(Generic[I, O]): name: str fn: Callable[[Any, I], Awaitable[O]] | Callable[[Any], Awaitable[O]] arity: int + description: Optional[str] = None + metadata: Optional[Dict[str, str]] = None # disable too many arguments warning @@ -135,7 +140,9 @@ def make_handler(service_tag: ServiceTag, name: str | None, kind: Optional[Literal["exclusive", "shared", "workflow"]], wrapped: Any, - signature: Signature) -> Handler[I, O]: + signature: Signature, + description: Optional[str] = None, + metadata: Optional[Dict[str, str]] = None) -> Handler[I, O]: """ Factory function to create a handler. """ @@ -152,12 +159,14 @@ def make_handler(service_tag: ServiceTag, arity = len(signature.parameters) extract_io_type_hints(handler_io, signature) - handler = Handler[I, O](service_tag, - handler_io, - kind, - handler_name, - wrapped, - arity) + handler = Handler[I, O](service_tag=service_tag, + handler_io=handler_io, + kind=kind, + name=handler_name, + fn=wrapped, + arity=arity, + description=description, + metadata=metadata) vars(wrapped)[RESTATE_UNIQUE_HANDLER_SYMBOL] = handler return handler diff --git a/python/restate/object.py b/python/restate/object.py index 0ba827b..3aef088 100644 --- a/python/restate/object.py +++ b/python/restate/object.py @@ -18,7 +18,7 @@ import typing from restate.serde import Serde, JsonSerde -from .handler import HandlerIO, ServiceTag, make_handler +from restate.handler import Handler, HandlerIO, ServiceTag, make_handler I = typing.TypeVar('I') O = typing.TypeVar('O') @@ -36,10 +36,16 @@ class VirtualObject: Args: name (str): The name of the object. + description (str): The description of the object. + metadata (dict): The metadata of the object. """ - def __init__(self, name): - self.service_tag = ServiceTag("object", name) + handlers: typing.Dict[str, Handler[typing.Any, typing.Any]] + + def __init__(self, name, + description: typing.Optional[str] = None, + metadata: typing.Optional[typing.Dict[str, str]]=None): + self.service_tag = ServiceTag("object", name, description, metadata) self.handlers = {} @property @@ -55,7 +61,8 @@ def handler(self, accept: str = "application/json", content_type: str = "application/json", input_serde: Serde[I] = JsonSerde[I](), # type: ignore - output_serde: Serde[O] = JsonSerde[O]()) -> typing.Callable: # type: ignore + output_serde: Serde[O] = JsonSerde[O](), # type: ignore + metadata: typing.Optional[dict] = None) -> typing.Callable: """ Decorator for defining a handler function. @@ -86,7 +93,7 @@ def wrapped(*args, **kwargs): return fn(*args, **kwargs) signature = inspect.signature(fn, eval_str=True) - handler = make_handler(self.service_tag, handler_io, name, kind, wrapped, signature) + handler = make_handler(self.service_tag, handler_io, name, kind, wrapped, signature, inspect.getdoc(fn), metadata) self.handlers[handler.name] = handler return wrapped diff --git a/python/restate/server_context.py b/python/restate/server_context.py index c749600..e953588 100644 --- a/python/restate/server_context.py +++ b/python/restate/server_context.py @@ -18,7 +18,7 @@ import typing import traceback -from restate.context import DurablePromise, ObjectContext, Request +from restate.context import DurablePromise, ObjectContext, Request, SendHandle from restate.exceptions import TerminalError from restate.handler import Handler, handler_from_callable, invoke_handler from restate.serde import BytesSerde, JsonSerde, Serde @@ -36,6 +36,27 @@ # disable line too long # pylint: disable=C0301 +# disable too few public methods +# pylint: disable=R0903 + +class ServerSendHandle(SendHandle): + """This class implements the send API""" + _invocation_id: typing.Optional[str] + + def __init__(self, context, handle: int) -> None: + super().__init__() + self.handle = handle + self.context = context + self._invocation_id = None + + async def invocation_id(self) -> str: + """Get the invocation id.""" + if self._invocation_id is not None: + return self._invocation_id + res = await self.context.create_poll_or_cancel_coroutine(self.handle) + self._invocation_id = res + return res + async def async_value(n: Callable[[], T]) -> T: """convert a simple value to a coroutine.""" return n() @@ -317,14 +338,15 @@ def do_call(self, parameter: I, key: Optional[str] = None, send_delay: Optional[timedelta] = None, - send: bool = False) -> Awaitable[O] | None: + send: bool = False, + idempotency_key: str | None = None) -> Awaitable[O] | SendHandle: """Make an RPC call to the given handler""" target_handler = handler_from_callable(tpe) service=target_handler.service_tag.name handler=target_handler.name input_serde = target_handler.handler_io.input_serde output_serde = target_handler.handler_io.output_serde - return self.do_raw_call(service, handler, parameter, input_serde, output_serde, key, send_delay, send) + return self.do_raw_call(service, handler, parameter, input_serde, output_serde, key, send_delay, send, idempotency_key) def do_raw_call(self, @@ -335,21 +357,24 @@ def do_raw_call(self, output_serde: Serde[O], key: Optional[str] = None, send_delay: Optional[timedelta] = None, - send: bool = False) -> Awaitable[O] | None: + send: bool = False, + idempotency_key: str | None = None + ) -> Awaitable[O] | SendHandle: """Make an RPC call to the given handler""" parameter = input_serde.serialize(input_param) if send_delay: ms = int(send_delay.total_seconds() * 1000) - self.vm.sys_send(service, handler, parameter, key, delay=ms) - return None + send_handle = self.vm.sys_send(service, handler, parameter, key, delay=ms, idempotency_key=idempotency_key) + return ServerSendHandle(self, send_handle) if send: - self.vm.sys_send(service, handler, parameter, key) - return None + send_handle = self.vm.sys_send(service, handler, parameter, key, idempotency_key=idempotency_key) + return ServerSendHandle(self, send_handle) handle = self.vm.sys_call(service=service, handler=handler, parameter=parameter, - key=key) + key=key, + idempotency_key=idempotency_key) async def await_point(s: ServerInvocationContext, h, o: Serde[O]): """Wait for this handle to be resolved, and deserialize the response.""" @@ -360,43 +385,53 @@ async def await_point(s: ServerInvocationContext, h, o: Serde[O]): def service_call(self, tpe: Callable[[Any, I], Awaitable[O]], - arg: I) -> Awaitable[O]: - coro = self.do_call(tpe, arg) - assert coro is not None + arg: I, + idempotency_key: str | None = None + ) -> Awaitable[O]: + coro = self.do_call(tpe, arg, idempotency_key=idempotency_key) + assert not isinstance(coro, SendHandle) return coro - def service_send(self, tpe: Callable[[Any, I], Awaitable[O]], arg: I, send_delay: timedelta | None = None) -> None: - self.do_call(tpe=tpe, parameter=arg, send_delay=send_delay, send=True) + def service_send(self, tpe: Callable[[Any, I], Awaitable[O]], arg: I, send_delay: timedelta | None = None, idempotency_key: str | None = None) -> SendHandle: + send = self.do_call(tpe=tpe, parameter=arg, send_delay=send_delay, send=True, idempotency_key=idempotency_key) + assert isinstance(send, SendHandle) + return send def object_call(self, tpe: Callable[[Any, I],Awaitable[O]], key: str, arg: I, - send_delay: Optional[timedelta] = None, - send: bool = False) -> Awaitable[O]: - coro = self.do_call(tpe, arg, key, send_delay, send) - assert coro is not None + idempotency_key: str | None = None + ) -> Awaitable[O]: + coro = self.do_call(tpe, arg, key, idempotency_key=idempotency_key) + assert not isinstance(coro, SendHandle) return coro - def object_send(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, arg: I, send_delay: timedelta | None = None) -> None: - self.do_call(tpe=tpe, key=key, parameter=arg, send_delay=send_delay, send=True) + def object_send(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, arg: I, send_delay: timedelta | None = None, idempotency_key: str | None = None) -> SendHandle: + send = self.do_call(tpe=tpe, key=key, parameter=arg, send_delay=send_delay, send=True, idempotency_key=idempotency_key) + assert isinstance(send, SendHandle) + return send def workflow_call(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, - arg: I) -> Awaitable[O]: - return self.object_call(tpe, key, arg) + arg: I, + idempotency_key: str | None = None + ) -> Awaitable[O]: + return self.object_call(tpe, key, arg, idempotency_key=idempotency_key) - def workflow_send(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, arg: I, send_delay: timedelta | None = None) -> None: - return self.object_send(tpe, key, arg, send_delay) + def workflow_send(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, arg: I, send_delay: timedelta | None = None, idempotency_key: str | None = None) -> SendHandle: + send = self.object_send(tpe, key, arg, send_delay, idempotency_key=idempotency_key) + assert isinstance(send, SendHandle) + return send - def generic_call(self, service: str, handler: str, arg: bytes, key: str | None = None) -> Awaitable[bytes]: + def generic_call(self, service: str, handler: str, arg: bytes, key: str | None = None, idempotency_key: str | None = None) -> Awaitable[bytes]: serde = BytesSerde() - return self.do_raw_call(service, handler, arg, serde, serde, key) # type: ignore + return self.do_raw_call(service, handler, arg, serde, serde, key, idempotency_key) # type: ignore - def generic_send(self, service: str, handler: str, arg: bytes, key: str | None = None, send_delay: timedelta | None = None) -> None: + def generic_send(self, service: str, handler: str, arg: bytes, key: str | None = None, send_delay: timedelta | None = None, idempotency_key: str | None = None) -> SendHandle: serde = BytesSerde() - return self.do_raw_call(service, handler, arg, serde, serde , key, send_delay, True) # type: ignore + return self.do_raw_call(service, handler, arg, serde, serde , key, send_delay, True, idempotency_key) # type: ignore def awakeable(self, serde: typing.Optional[Serde[I]] = JsonSerde()) -> typing.Tuple[str, Awaitable[Any]]: @@ -430,3 +465,24 @@ def promise(self, name: str, serde: typing.Optional[Serde[T]] = JsonSerde()) -> def key(self) -> str: return self.invocation.key + + def cancel(self, invocation_id: str): + """cancel an existing invocation by id.""" + if invocation_id is None: + raise ValueError("invocation_id cannot be None") + self.vm.sys_cancel(invocation_id) + + def attach_invocation(self, invocation_id: str, serde: Serde[T] = JsonSerde()) -> T: + if invocation_id is None: + raise ValueError("invocation_id cannot be None") + assert serde is not None + handle = self.vm.attach_invocation(invocation_id) + coro = self.create_poll_or_cancel_coroutine(handle) + + async def await_point(): + """Wait for this handle to be resolved.""" + res = await coro + assert res is not None + return serde.deserialize(res) + + return await_point() diff --git a/python/restate/service.py b/python/restate/service.py index fb2566e..42fad62 100644 --- a/python/restate/service.py +++ b/python/restate/service.py @@ -19,7 +19,7 @@ import typing from restate.serde import Serde, JsonSerde -from .handler import Handler, HandlerIO, ServiceTag, make_handler +from restate.handler import Handler, HandlerIO, ServiceTag, make_handler I = typing.TypeVar('I') O = typing.TypeVar('O') @@ -39,8 +39,10 @@ class Service: name (str): The name of the service. """ - def __init__(self, name: str) -> None: - self.service_tag = ServiceTag("service", name) + def __init__(self, name: str, + description: typing.Optional[str] = None, + metadata: typing.Optional[typing.Dict[str, str]] = None) -> None: + self.service_tag = ServiceTag("service", name, description, metadata) self.handlers: typing.Dict[str, Handler] = {} @property @@ -55,7 +57,8 @@ def handler(self, accept: str = "application/json", content_type: str = "application/json", input_serde: Serde[I] = JsonSerde[I](), # type: ignore - output_serde: Serde[O] = JsonSerde[O]()) -> typing.Callable: # type: ignore + output_serde: Serde[O] = JsonSerde[O](), # type: ignore + metadata: typing.Optional[typing.Dict[str, str]] = None) -> typing.Callable: """ Decorator for defining a handler function. @@ -85,7 +88,7 @@ def wrapped(*args, **kwargs): return fn(*args, **kwargs) signature = inspect.signature(fn, eval_str=True) - handler = make_handler(self.service_tag, handler_io, name, None, wrapped, signature) + handler = make_handler(self.service_tag, handler_io, name, None, wrapped, signature, inspect.getdoc(fn), metadata) self.handlers[handler.name] = handler return wrapped diff --git a/python/restate/vm.py b/python/restate/vm.py index 23feef7..b741791 100644 --- a/python/restate/vm.py +++ b/python/restate/vm.py @@ -12,6 +12,7 @@ wrap the restate._internal.PyVM class """ # pylint: disable=E1101,R0917 +# pylint: disable=too-many-arguments from dataclasses import dataclass import typing @@ -243,9 +244,11 @@ def sys_call(self, service: str, handler: str, parameter: bytes, - key: typing.Optional[str] = None): + key: typing.Optional[str] = None, + idempotency_key: typing.Optional[str] = None + ): """Call a service""" - return self.vm.sys_call(service, handler, parameter, key) + return self.vm.sys_call(service, handler, parameter, key, idempotency_key) # pylint: disable=too-many-arguments def sys_send(self, @@ -253,9 +256,14 @@ def sys_send(self, handler: str, parameter: bytes, key: typing.Optional[str] = None, - delay: typing.Optional[int] = None) -> None: - """send an invocation to a service (no response)""" - self.vm.sys_send(service, handler, parameter, key, delay) + delay: typing.Optional[int] = None, + idempotency_key: typing.Optional[str] = None + ) -> int: + """ + send an invocation to a service, and return the handle + to the promise that will resolve with the invocation id + """ + return self.vm.sys_send(service, handler, parameter, key, delay, idempotency_key) def sys_run(self, name: str) -> int: """ @@ -347,3 +355,15 @@ def sys_end(self): It calls the `sys_end` method of the `vm` object. """ self.vm.sys_end() + + def sys_cancel(self, invocation_id: str): + """ + Cancel a running invocation + """ + self.vm.sys_cancel(invocation_id) + + def attach_invocation(self, invocation_id: str) -> int: + """ + Attach to an invocation + """ + return self.vm.attach_invocation(invocation_id) diff --git a/python/restate/workflow.py b/python/restate/workflow.py index 7c63838..441c29b 100644 --- a/python/restate/workflow.py +++ b/python/restate/workflow.py @@ -19,7 +19,7 @@ import typing from restate.serde import Serde, JsonSerde -from .handler import HandlerIO, ServiceTag, make_handler +from restate.handler import Handler, HandlerIO, ServiceTag, make_handler I = typing.TypeVar('I') O = typing.TypeVar('O') @@ -42,8 +42,10 @@ class Workflow: name (str): The name of the object. """ - def __init__(self, name): - self.service_tag = ServiceTag("workflow", name) + handlers: typing.Dict[str, Handler[typing.Any, typing.Any]] + + def __init__(self, name, description: typing.Optional[str] = None, metadata: typing.Optional[typing.Dict[str, str]] = None): + self.service_tag = ServiceTag("workflow", name, description, metadata) self.handlers = {} @property @@ -58,25 +60,28 @@ def main(self, accept: str = "application/json", content_type: str = "application/json", input_serde: Serde[I] = JsonSerde[I](), # type: ignore - output_serde: Serde[O] = JsonSerde[O]()) -> typing.Callable: # type: ignore + output_serde: Serde[O] = JsonSerde[O](), # type: ignore + metadata: typing.Optional[typing.Dict[str, str]] = None) -> typing.Callable: # type: ignore """Mark this handler as a workflow entry point""" return self._add_handler(name, kind="workflow", accept=accept, content_type=content_type, input_serde=input_serde, - output_serde=output_serde) + output_serde=output_serde, + metadata=metadata) def handler(self, name: typing.Optional[str] = None, accept: str = "application/json", content_type: str = "application/json", input_serde: Serde[I] = JsonSerde[I](), # type: ignore - output_serde: Serde[O] = JsonSerde[O]()) -> typing.Callable: # type: ignore + output_serde: Serde[O] = JsonSerde[O](), # type: ignore + metadata: typing.Optional[typing.Dict[str, str]] = None) -> typing.Callable: """ Decorator for defining a handler function. """ - return self._add_handler(name, "shared", accept, content_type, input_serde, output_serde) + return self._add_handler(name, "shared", accept, content_type, input_serde, output_serde, metadata) def _add_handler(self, name: typing.Optional[str] = None, @@ -84,7 +89,8 @@ def _add_handler(self, accept: str = "application/json", content_type: str = "application/json", input_serde: Serde[I] = JsonSerde[I](), # type: ignore - output_serde: Serde[O] = JsonSerde[O]()) -> typing.Callable: # type: ignore + output_serde: Serde[O] = JsonSerde[O](), # type: ignore + metadata: typing.Optional[typing.Dict[str, str]] = None) -> typing.Callable: # type: ignore """ Decorator for defining a handler function. @@ -94,6 +100,7 @@ def _add_handler(self, content_type: The content type of the request. Default "application/json". serializer: The serializer function to convert the response object to bytes. deserializer: The deserializer function to convert the request bytes to an object. + metadata: An optional dictionary of metadata. Returns: Callable: The decorated function. @@ -115,7 +122,15 @@ def wrapped(*args, **kwargs): return fn(*args, **kwargs) signature = inspect.signature(fn, eval_str=True) - handler = make_handler(self.service_tag, handler_io, name, kind, wrapped, signature) + description = inspect.getdoc(fn) + handler = make_handler(service_tag=self.service_tag, + handler_io=handler_io, + name=name, + kind=kind, + wrapped=wrapped, + signature=signature, + description=description, + metadata=metadata) self.handlers[handler.name] = handler return wrapped diff --git a/src/lib.rs b/src/lib.rs index d38d1b7..01c98ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -433,13 +433,14 @@ impl PyVM { .map_err(Into::into) } - #[pyo3(signature = (service, handler, buffer, key=None))] + #[pyo3(signature = (service, handler, buffer, key=None, idempotency_key=None))] fn sys_call( mut self_: PyRefMut<'_, Self>, service: String, handler: String, buffer: &Bound<'_, PyBytes>, key: Option, + idempotency_key: Option, ) -> Result { self_ .vm @@ -448,7 +449,7 @@ impl PyVM { service, handler, key, - idempotency_key: None, + idempotency_key, headers: vec![], }, buffer.as_bytes().to_vec().into(), @@ -457,7 +458,7 @@ impl PyVM { .map_err(Into::into) } - #[pyo3(signature = (service, handler, buffer, key=None, delay=None))] + #[pyo3(signature = (service, handler, buffer, key=None, delay=None, idempotency_key=None))] fn sys_send( mut self_: PyRefMut<'_, Self>, service: String, @@ -465,6 +466,7 @@ impl PyVM { buffer: &Bound<'_, PyBytes>, key: Option, delay: Option, + idempotency_key: Option, ) -> Result { self_ .vm @@ -473,7 +475,7 @@ impl PyVM { service, handler, key, - idempotency_key: None, + idempotency_key, headers: vec![], }, buffer.as_bytes().to_vec().into(), @@ -580,6 +582,13 @@ impl PyVM { self_.vm.sys_run(name).map(Into::into).map_err(Into::into) } + fn sys_cancel( + mut self_: PyRefMut<'_, Self>, + invocation_id: String, + ) -> Result<(), PyVMError> { + self_.vm.sys_cancel_invocation(invocation_id).map_err(Into::into) + } + fn propose_run_completion_success( mut self_: PyRefMut<'_, Self>, handle: PyNotificationHandle, @@ -651,6 +660,17 @@ impl PyVM { .map_err(Into::into) } + fn attach_invocation( + mut self_: PyRefMut<'_, Self>, + invocation_id: String, + ) -> Result { + self_ + .vm + .sys_attach_invocation(restate_sdk_shared_core::AttachInvocationTarget::InvocationId(invocation_id)) + .map(Into::into) + .map_err(Into::into) + } + fn sys_end(mut self_: PyRefMut<'_, Self>) -> Result<(), PyVMError> { self_.vm.sys_end().map(Into::into).map_err(Into::into) }