Skip to content

Use RestateDurableFuture in workflow methods #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/restate/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def peek(self) -> Awaitable[typing.Optional[T]]:
"""

@abc.abstractmethod
def value(self) -> Awaitable[T]:
def value(self) -> RestateDurableFuture[T]:
"""
Returns the value of the promise if it is resolved, None otherwise.
"""
Expand Down
37 changes: 10 additions & 27 deletions python/restate/server_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,9 @@ def __init__(self, server_context: "ServerInvocationContext", name, serde) -> No
super().__init__(name=name, serde=JsonSerde() if serde is None else serde)
self.server_context = server_context

def value(self) -> Awaitable[Any]:
vm: VMWrapper = self.server_context.vm
handle = vm.sys_get_promise(self.name)
coro = self.server_context.create_poll_or_cancel_coroutine([handle])
serde = self.serde
assert serde is not None

async def await_point():
await coro
res = self.server_context.must_take_notification(handle)
if res is None:
return None
return serde.deserialize(res)

return await_point()
def value(self) -> RestateDurableFuture[Any]:
handle = self.server_context.vm.sys_get_promise(self.name)
return self.server_context.create_future(handle, self.serde)

def resolve(self, value: Any) -> Awaitable[None]:
vm: VMWrapper = self.server_context.vm
Expand All @@ -156,36 +144,31 @@ def resolve(self, value: Any) -> Awaitable[None]:
handle = vm.sys_complete_promise_success(self.name, value_buffer)

async def await_point():
await self.server_context.create_poll_or_cancel_coroutine([handle])
if not self.server_context.vm.is_completed(handle):
await self.server_context.create_poll_or_cancel_coroutine([handle])
self.server_context.must_take_notification(handle)

return await_point()
return ServerDurableFuture(self.server_context, handle, await_point)

def reject(self, message: str, code: int = 500) -> Awaitable[None]:
vm: VMWrapper = self.server_context.vm
py_failure = Failure(code=code, message=message)
handle = vm.sys_complete_promise_failure(self.name, py_failure)

async def await_point():
await self.server_context.create_poll_or_cancel_coroutine([handle])
if not self.server_context.vm.is_completed(handle):
await self.server_context.create_poll_or_cancel_coroutine([handle])
self.server_context.must_take_notification(handle)

return await_point()
return ServerDurableFuture(self.server_context, handle, await_point)

def peek(self) -> Awaitable[Any | None]:
vm: VMWrapper = self.server_context.vm
handle = vm.sys_peek_promise(self.name)
serde = self.serde
assert serde is not None

async def await_point():
await self.server_context.create_poll_or_cancel_coroutine([handle])
res = self.server_context.must_take_notification(handle)
if res is None:
return None
return serde.deserialize(res)

return await_point()
return self.server_context.create_future(handle, serde)


# disable too many public method
Expand Down