Skip to content

[verification] Move the interpreter to test-services/ #43

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion shell.nix
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{ pkgs ? import <nixpkgs> {} }:

(pkgs.buildFHSUserEnv {
(pkgs.buildFHSEnv {
name = "sdk-python";
targetPkgs = pkgs: (with pkgs; [
python3
Expand Down
4 changes: 3 additions & 1 deletion test-services/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ EXPOSE 9080

ENV RESTATE_CORE_LOG=debug
ENV RUST_BACKTRACE=1
CMD ["hypercorn", "testservices:app", "--config", "hypercorn-config.toml"]
ENV PORT 9080

ENTRYPOINT ["./entrypoint.sh"]
5 changes: 5 additions & 0 deletions test-services/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env sh

PORT=${PORT:-"9080"}

python3 -m hypercorn testservices:app --config hypercorn-config.toml --bind "0.0.0.0:${PORT}"
1 change: 0 additions & 1 deletion test-services/hypercorn-config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
bind = "0.0.0.0:9080"
h2_max_concurrent_streams = 2147483647
keep_alive_max_requests = 2147483647
keep_alive_timeout = 2147483647
Expand Down
5 changes: 5 additions & 0 deletions test-services/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
from .non_determinism import non_deterministic as s10
from .test_utils import test_utils as s11

from .interpreter import layer_0 as s12
from .interpreter import layer_1 as s13
from .interpreter import layer_2 as s14
from .interpreter import helper as s15

def list_services(bindings):
"""List all services in this module"""
return {obj.name : obj for _, obj in bindings.items() if isinstance(obj, (Service, VirtualObject, Workflow))}
Expand Down
103 changes: 86 additions & 17 deletions verification/interpreter.py → test-services/services/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
import random


from restate.context import ObjectContext, ObjectSharedContext
from restate.context import Context, ObjectContext, ObjectSharedContext
from restate.exceptions import TerminalError
from restate.object import VirtualObject
from restate.serde import JsonSerde

import restate

SET_STATE = 1
Expand Down Expand Up @@ -49,6 +50,75 @@
# pylint: disable=C0301
# pylint: disable=R0914, R0912, R0915, R0913


helper = restate.Service("ServiceInterpreterHelper")

@helper.handler()
async def ping(ctx: Context) -> None: # pylint: disable=unused-argument
pass

@helper.handler()
async def echo(ctx: Context, parameters: str) -> str: # pylint: disable=unused-argument
return parameters

@helper.handler(name = "echoLater")
async def echo_later(ctx: Context, parameter: dict[str, typing.Any]) -> str:
await ctx.sleep(timedelta(milliseconds=parameter['sleep']))
return parameter['parameter']

@helper.handler(name="terminalFailure")
async def terminal_failure(ctx: Context) -> str:
raise TerminalError("bye")

@helper.handler(name="incrementIndirectly")
async def increment_indirectly(ctx: Context, parameter) -> None:

layer = parameter['layer']
key = parameter['key']

program = {
"commands": [
{
"kind": INCREMENT_STATE_COUNTER,
},
],
}

program_bytes = json.dumps(program).encode('utf-8')

ctx.generic_send(f"ObjectInterpreterL{layer}", "interpret", program_bytes, key)

@helper.handler(name="resolveAwakeable")
async def resolve_awakeable(ctx: Context, aid: str) -> None:
ctx.resolve_awakeable(aid, "ok")

@helper.handler(name="rejectAwakeable")
async def reject_awakeable(ctx: Context, aid: str) -> None:
ctx.reject_awakeable(aid, "error")

@helper.handler(name="incrementViaAwakeableDance")
async def increment_via_awakeable_dance(ctx: Context, input: dict[str, typing.Any]) -> None:
tx_promise_id = input['txPromiseId']
layer = input['interpreter']['layer']
key = input['interpreter']['key']

aid, promise = ctx.awakeable()
ctx.resolve_awakeable(tx_promise_id, aid)
await promise

program = {
"commands": [
{
"kind": INCREMENT_STATE_COUNTER,
},
],
}

program_bytes = json.dumps(program).encode('utf-8')

ctx.generic_send(f"ObjectInterpreterL{layer}", "interpret", program_bytes, key)


class SupportService:

def __init__(self, ctx: ObjectContext) -> None:
Expand Down Expand Up @@ -76,7 +146,7 @@ async def echo(self, parameters: str) -> str:

async def echo_later(self, parameter: str, sleep: int) -> str:
arg = {"parameter": parameter, "sleep": sleep}
return await self.call(method="echo_later", arg=arg)
return await self.call(method="echoLater", arg=arg)

async def terminal_failure(self) -> str:
return await self.call(method="terminalFailure", arg=None)
Expand Down Expand Up @@ -117,7 +187,6 @@ async def interpreter(layer: int,
coros: dict[int,
typing.Tuple[typing.Any, typing.Awaitable[typing.Any]]] = {}
for i, command in enumerate(program['commands']):
print(f"{ctx.request().id} {ctx.key()} {id(ctx)} COMMAND {i} {command}", flush=True)
command_type = command['kind']
if command_type == SET_STATE:
ctx.set(f"key-{command['key']}", f"value-{command['key']}")
Expand Down Expand Up @@ -172,10 +241,7 @@ async def side_effect():

expected, coro = coros[index]
del coros[index]
print(f"AWAITING {index} {coro}", flush=True)
result = await coro
print (f"AWAITED {index} {result}", flush=True)
print(f"EXPECTED {result} {expected}", flush=True)
if result != expected:
raise TerminalError(f"Expected {expected} but got {result}")
elif command_type == RESOLVE_AWAKEABLE:
Expand All @@ -198,22 +264,25 @@ async def side_effect():
js_program = json.dumps(program)
raw_js_program = js_program.encode('utf-8')
promise = ctx.generic_call(next_layer, "interpret", raw_js_program, key)
print(f"Storing coro {i} {promise}", flush=True)
coros[i] = (b'', promise)
else:
raise ValueError(f"Unknown command type: {command_type}")
print(f"{ctx.request().id} {ctx.key()} {id(ctx)} DONE command {i}", flush=True)
print("DONE " + str(len(program['commands'])) , flush=True)

layer_0 = VirtualObject("ObjectInterpreterL0")
def make_layer(i):
layer = VirtualObject(f"ObjectInterpreterL{i}")

@layer.handler()
async def interpret(ctx: ObjectContext, program: Program) -> None:
await interpreter(0, ctx, program)

@layer.handler(kind="shared")
async def counter(ctx: ObjectSharedContext) -> int:
return await ctx.get("counter") or 0

@layer_0.handler()
async def interpret(ctx: ObjectContext, program: Program) -> None:
await interpreter(0, ctx, program)
return layer

@layer_0.handler(kind="shared")
async def counter(ctx: ObjectSharedContext) -> int:
return await ctx.get("counter") or 0

layer_0 = make_layer(0)
layer_1 = make_layer(1)
layer_2 = make_layer(2)

app = restate.app(services=[layer_0])
13 changes: 0 additions & 13 deletions verification/Dockerfile

This file was deleted.

12 changes: 0 additions & 12 deletions verification/README.md

This file was deleted.

6 changes: 0 additions & 6 deletions verification/hypercorn-config.toml

This file was deleted.

2 changes: 0 additions & 2 deletions verification/requirements.txt

This file was deleted.