From 886142616f6c6e887b5f13b0ef5287304a75115a Mon Sep 17 00:00:00 2001 From: igalshilman Date: Mon, 24 Feb 2025 14:05:18 +0100 Subject: [PATCH] [verification] Move the interpreter to test-services/ --- shell.nix | 2 +- test-services/Dockerfile | 4 +- test-services/entrypoint.sh | 5 + test-services/hypercorn-config.toml | 1 - test-services/services/__init__.py | 5 + .../services}/interpreter.py | 103 +++++++++++++++--- verification/Dockerfile | 13 --- verification/README.md | 12 -- verification/hypercorn-config.toml | 6 - verification/requirements.txt | 2 - 10 files changed, 100 insertions(+), 53 deletions(-) create mode 100755 test-services/entrypoint.sh rename {verification => test-services/services}/interpreter.py (74%) delete mode 100644 verification/Dockerfile delete mode 100644 verification/README.md delete mode 100644 verification/hypercorn-config.toml delete mode 100644 verification/requirements.txt diff --git a/shell.nix b/shell.nix index 58cb365..54689ba 100755 --- a/shell.nix +++ b/shell.nix @@ -1,6 +1,6 @@ { pkgs ? import {} }: -(pkgs.buildFHSUserEnv { +(pkgs.buildFHSEnv { name = "sdk-python"; targetPkgs = pkgs: (with pkgs; [ python3 diff --git a/test-services/Dockerfile b/test-services/Dockerfile index 316a1bd..b73d902 100644 --- a/test-services/Dockerfile +++ b/test-services/Dockerfile @@ -28,4 +28,6 @@ EXPOSE 9080 ENV RESTATE_CORE_LOG=debug ENV RUST_BACKTRACE=1 -CMD ["hypercorn", "testservices:app", "--config", "hypercorn-config.toml"] +ENV PORT 9080 + +ENTRYPOINT ["./entrypoint.sh"] diff --git a/test-services/entrypoint.sh b/test-services/entrypoint.sh new file mode 100755 index 0000000..a4a7b26 --- /dev/null +++ b/test-services/entrypoint.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env sh + +PORT=${PORT:-"9080"} + +python3 -m hypercorn testservices:app --config hypercorn-config.toml --bind "0.0.0.0:${PORT}" diff --git a/test-services/hypercorn-config.toml b/test-services/hypercorn-config.toml index f7ae2f8..0a02d23 100644 --- a/test-services/hypercorn-config.toml +++ b/test-services/hypercorn-config.toml @@ -1,4 +1,3 @@ -bind = "0.0.0.0:9080" h2_max_concurrent_streams = 2147483647 keep_alive_max_requests = 2147483647 keep_alive_timeout = 2147483647 diff --git a/test-services/services/__init__.py b/test-services/services/__init__.py index 7e31fc3..168284f 100644 --- a/test-services/services/__init__.py +++ b/test-services/services/__init__.py @@ -23,6 +23,11 @@ from .non_determinism import non_deterministic as s10 from .test_utils import test_utils as s11 +from .interpreter import layer_0 as s12 +from .interpreter import layer_1 as s13 +from .interpreter import layer_2 as s14 +from .interpreter import helper as s15 + def list_services(bindings): """List all services in this module""" return {obj.name : obj for _, obj in bindings.items() if isinstance(obj, (Service, VirtualObject, Workflow))} diff --git a/verification/interpreter.py b/test-services/services/interpreter.py similarity index 74% rename from verification/interpreter.py rename to test-services/services/interpreter.py index fa35951..80c3147 100644 --- a/verification/interpreter.py +++ b/test-services/services/interpreter.py @@ -17,10 +17,11 @@ import random -from restate.context import ObjectContext, ObjectSharedContext +from restate.context import Context, ObjectContext, ObjectSharedContext from restate.exceptions import TerminalError from restate.object import VirtualObject from restate.serde import JsonSerde + import restate SET_STATE = 1 @@ -49,6 +50,75 @@ # pylint: disable=C0301 # pylint: disable=R0914, R0912, R0915, R0913 + +helper = restate.Service("ServiceInterpreterHelper") + +@helper.handler() +async def ping(ctx: Context) -> None: # pylint: disable=unused-argument + pass + +@helper.handler() +async def echo(ctx: Context, parameters: str) -> str: # pylint: disable=unused-argument + return parameters + +@helper.handler(name = "echoLater") +async def echo_later(ctx: Context, parameter: dict[str, typing.Any]) -> str: + await ctx.sleep(timedelta(milliseconds=parameter['sleep'])) + return parameter['parameter'] + +@helper.handler(name="terminalFailure") +async def terminal_failure(ctx: Context) -> str: + raise TerminalError("bye") + +@helper.handler(name="incrementIndirectly") +async def increment_indirectly(ctx: Context, parameter) -> None: + + layer = parameter['layer'] + key = parameter['key'] + + program = { + "commands": [ + { + "kind": INCREMENT_STATE_COUNTER, + }, + ], + } + + program_bytes = json.dumps(program).encode('utf-8') + + ctx.generic_send(f"ObjectInterpreterL{layer}", "interpret", program_bytes, key) + +@helper.handler(name="resolveAwakeable") +async def resolve_awakeable(ctx: Context, aid: str) -> None: + ctx.resolve_awakeable(aid, "ok") + +@helper.handler(name="rejectAwakeable") +async def reject_awakeable(ctx: Context, aid: str) -> None: + ctx.reject_awakeable(aid, "error") + +@helper.handler(name="incrementViaAwakeableDance") +async def increment_via_awakeable_dance(ctx: Context, input: dict[str, typing.Any]) -> None: + tx_promise_id = input['txPromiseId'] + layer = input['interpreter']['layer'] + key = input['interpreter']['key'] + + aid, promise = ctx.awakeable() + ctx.resolve_awakeable(tx_promise_id, aid) + await promise + + program = { + "commands": [ + { + "kind": INCREMENT_STATE_COUNTER, + }, + ], + } + + program_bytes = json.dumps(program).encode('utf-8') + + ctx.generic_send(f"ObjectInterpreterL{layer}", "interpret", program_bytes, key) + + class SupportService: def __init__(self, ctx: ObjectContext) -> None: @@ -76,7 +146,7 @@ async def echo(self, parameters: str) -> str: async def echo_later(self, parameter: str, sleep: int) -> str: arg = {"parameter": parameter, "sleep": sleep} - return await self.call(method="echo_later", arg=arg) + return await self.call(method="echoLater", arg=arg) async def terminal_failure(self) -> str: return await self.call(method="terminalFailure", arg=None) @@ -117,7 +187,6 @@ async def interpreter(layer: int, coros: dict[int, typing.Tuple[typing.Any, typing.Awaitable[typing.Any]]] = {} for i, command in enumerate(program['commands']): - print(f"{ctx.request().id} {ctx.key()} {id(ctx)} COMMAND {i} {command}", flush=True) command_type = command['kind'] if command_type == SET_STATE: ctx.set(f"key-{command['key']}", f"value-{command['key']}") @@ -172,10 +241,7 @@ async def side_effect(): expected, coro = coros[index] del coros[index] - print(f"AWAITING {index} {coro}", flush=True) result = await coro - print (f"AWAITED {index} {result}", flush=True) - print(f"EXPECTED {result} {expected}", flush=True) if result != expected: raise TerminalError(f"Expected {expected} but got {result}") elif command_type == RESOLVE_AWAKEABLE: @@ -198,22 +264,25 @@ async def side_effect(): js_program = json.dumps(program) raw_js_program = js_program.encode('utf-8') promise = ctx.generic_call(next_layer, "interpret", raw_js_program, key) - print(f"Storing coro {i} {promise}", flush=True) coros[i] = (b'', promise) else: raise ValueError(f"Unknown command type: {command_type}") - print(f"{ctx.request().id} {ctx.key()} {id(ctx)} DONE command {i}", flush=True) - print("DONE " + str(len(program['commands'])) , flush=True) -layer_0 = VirtualObject("ObjectInterpreterL0") +def make_layer(i): + layer = VirtualObject(f"ObjectInterpreterL{i}") + + @layer.handler() + async def interpret(ctx: ObjectContext, program: Program) -> None: + await interpreter(0, ctx, program) + + @layer.handler(kind="shared") + async def counter(ctx: ObjectSharedContext) -> int: + return await ctx.get("counter") or 0 -@layer_0.handler() -async def interpret(ctx: ObjectContext, program: Program) -> None: - await interpreter(0, ctx, program) + return layer -@layer_0.handler(kind="shared") -async def counter(ctx: ObjectSharedContext) -> int: - return await ctx.get("counter") or 0 +layer_0 = make_layer(0) +layer_1 = make_layer(1) +layer_2 = make_layer(2) -app = restate.app(services=[layer_0]) diff --git a/verification/Dockerfile b/verification/Dockerfile deleted file mode 100644 index c827c1a..0000000 --- a/verification/Dockerfile +++ /dev/null @@ -1,13 +0,0 @@ -FROM python:3.11-slim - -WORKDIR /usr/src/app - -COPY requirements.txt ./ -RUN pip install --no-cache-dir -r requirements.txt - -COPY . . - -EXPOSE 9000 - -ENV PYTHONPATH="/usr/src/app/src" -CMD ["hypercorn", "interpreter:app", "--config", "hypercorn-config.toml"] diff --git a/verification/README.md b/verification/README.md deleted file mode 100644 index 414f8b4..0000000 --- a/verification/README.md +++ /dev/null @@ -1,12 +0,0 @@ -## The command interpreter for restate's e2e verification test - -This is a work in progress, currently it implements the L0 object interpreter, -and requires the other verification services will be deployed separately. - -## Build - -``` -docker build . -t e2e-py-services:main -``` - - diff --git a/verification/hypercorn-config.toml b/verification/hypercorn-config.toml deleted file mode 100644 index 8c39879..0000000 --- a/verification/hypercorn-config.toml +++ /dev/null @@ -1,6 +0,0 @@ -bind = "0.0.0.0:9000" -h2_max_concurrent_streams = 2147483647 -keep_alive_max_requests = 2147483647 -keep_alive_timeout = 2147483647 -workers = 8 - diff --git a/verification/requirements.txt b/verification/requirements.txt deleted file mode 100644 index 949de4a..0000000 --- a/verification/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -hypercorn -restate_sdk \ No newline at end of file