From 06d3653c5140ec9737c8be12a5b83ea58a28e87e Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 9 Sep 2024 11:32:04 +0200 Subject: [PATCH 1/3] Make sure we fill the identity key. Fix https://github.com/restatedev/sdk-python/issues/16 --- test-services/testservices.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test-services/testservices.py b/test-services/testservices.py index 26518a7..abf5597 100644 --- a/test-services/testservices.py +++ b/test-services/testservices.py @@ -20,4 +20,9 @@ def test_services(): names = os.environ.get('SERVICES') return services.services_named(names.split(',')) if names else services.all_services() -app = restate.app(services=test_services()) +identity_keys = None +e2e_signing_key_env = os.environ.get('E2E_REQUEST_SIGNING_ENV') +if os.environ.get('E2E_REQUEST_SIGNING_ENV'): + identity_keys = [os.environ.get('E2E_REQUEST_SIGNING_ENV')] + +app = restate.app(services=test_services(), identity_keys=identity_keys) From 9af86387546c736c9a582109ee8c3a2d831b80e1 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 9 Sep 2024 14:21:00 +0200 Subject: [PATCH 2/3] Update to test suite 2.0 --- .github/workflows/integration.yaml | 2 +- test-services/services/__init__.py | 22 ++++----- ...awakable_holder.py => awakeable_holder.py} | 0 test-services/services/cancel_test.py | 4 +- test-services/services/failing.py | 46 +++++++++++++------ test-services/services/kill_test.py | 4 +- test-services/services/proxy.py | 20 ++++++-- test-services/services/test_utils.py | 44 +++++++++++++++--- 8 files changed, 101 insertions(+), 41 deletions(-) rename test-services/services/{awakable_holder.py => awakeable_holder.py} (100%) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index edd7061..77e076b 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -41,7 +41,7 @@ jobs: name: "Features integration test (sdk-test-suite version ${{ matrix.sdk-test-suite }})" strategy: matrix: - sdk-test-suite: [ "1.5" ] + sdk-test-suite: [ "2.0" ] permissions: contents: read issues: read diff --git a/test-services/services/__init__.py b/test-services/services/__init__.py index ee3230c..7e31fc3 100644 --- a/test-services/services/__init__.py +++ b/test-services/services/__init__.py @@ -11,17 +11,17 @@ from typing import Dict, Union from restate import Service, VirtualObject, Workflow -from .counter import counter_object -from .proxy import proxy -from .awakable_holder import awakeable_holder -from. block_and_wait_workflow import workflow -from .cancel_test import runner, blocking_service -from .failing import failing -from .kill_test import kill_runner, kill_singleton -from .list_object import list_object -from .map_object import map_object -from .non_determinism import non_deterministic -from .test_utils import test_utils +from .counter import counter_object as s1 +from .proxy import proxy as s2 +from .awakeable_holder import awakeable_holder as s3 +from. block_and_wait_workflow import workflow as s4 +from .cancel_test import runner, blocking_service as s5 +from .failing import failing as s6 +from .kill_test import kill_runner, kill_singleton as s7 +from .list_object import list_object as s8 +from .map_object import map_object as s9 +from .non_determinism import non_deterministic as s10 +from .test_utils import test_utils as s11 def list_services(bindings): """List all services in this module""" diff --git a/test-services/services/awakable_holder.py b/test-services/services/awakeable_holder.py similarity index 100% rename from test-services/services/awakable_holder.py rename to test-services/services/awakeable_holder.py diff --git a/test-services/services/cancel_test.py b/test-services/services/cancel_test.py index ff7ebef..7ae58dd 100644 --- a/test-services/services/cancel_test.py +++ b/test-services/services/cancel_test.py @@ -18,7 +18,7 @@ from restate import VirtualObject, ObjectContext from restate.exceptions import TerminalError -from . import awakable_holder +from . import awakeable_holder BlockingOperation = Literal["CALL", "SLEEP", "AWAKEABLE"] @@ -47,7 +47,7 @@ async def verify_test(ctx: ObjectContext) -> bool: @blocking_service.handler() async def block(ctx: ObjectContext, op: BlockingOperation): name, awakeable = ctx.awakeable() - await ctx.object_call(awakable_holder.hold, key="cancel", arg=name) + await ctx.object_call(awakeable_holder.hold, key="cancel", arg=name) await awakeable if op == "CALL": diff --git a/test-services/services/failing.py b/test-services/services/failing.py index 1a0fc5c..6700fc0 100644 --- a/test-services/services/failing.py +++ b/test-services/services/failing.py @@ -39,28 +39,44 @@ async def failing_call_with_eventual_success(ctx: ObjectContext) -> int: return 4 raise ValueError(f"Failed at attempt: {failures}") +@failing.handler(name="terminallyFailingSideEffect") +async def terminally_failing_side_effect(ctx: ObjectContext): + + def side_effect(): + raise TerminalError(message="failed side effect") + + await ctx.run("sideEffect", side_effect) + raise ValueError("Should not reach here") -side_effect_failures = 0 -@failing.handler(name="failingSideEffectWithEventualSuccess") -async def failing_side_effect_with_eventual_success(ctx: ObjectContext) -> int: +eventual_success_side_effects = 0 + +@failing.handler(name="sideEffectSucceedsAfterGivenAttempts") +async def side_effect_succeeds_after_given_attempts(ctx: ObjectContext, minimum_attempts: int) -> int: def side_effect(): - global side_effect_failures - side_effect_failures += 1 - if side_effect_failures >= 4: - side_effect_failures = 0 - return 4 - raise ValueError(f"Failed at attempt: {side_effect_failures}") + global eventual_success_side_effects + eventual_success_side_effects += 1 + if eventual_success_side_effects >= minimum_attempts: + return eventual_success_side_effects + raise ValueError(f"Failed at attempt: {eventual_success_side_effects}") - return await ctx.run("sideEffect", side_effect) # type: ignore + return await ctx.run("sideEffect", side_effect, max_attempts=minimum_attempts + 1) # type: ignore +eventual_failure_side_effects = 0 -@failing.handler(name="terminallyFailingSideEffect") -async def terminally_failing_side_effect(ctx: ObjectContext): +@failing.handler(name="sideEffectFailsAfterGivenAttempts") +async def side_effect_fails_after_given_attempts(ctx: ObjectContext, retry_policy_max_retry_count: int) -> int: def side_effect(): - raise TerminalError(message="failed side effect") + global eventual_failure_side_effects + eventual_failure_side_effects += 1 + raise ValueError(f"Failed at attempt: {eventual_failure_side_effects}") + + try: + await ctx.run("sideEffect", side_effect, max_attempts=retry_policy_max_retry_count) + raise ValueError("Side effect did not fail.") + except TerminalError as t: + global eventual_failure_side_effects + return eventual_failure_side_effects - await ctx.run("sideEffect", side_effect) - raise ValueError("Should not reach here") diff --git a/test-services/services/kill_test.py b/test-services/services/kill_test.py index d431f86..9f12a0a 100644 --- a/test-services/services/kill_test.py +++ b/test-services/services/kill_test.py @@ -14,7 +14,7 @@ from restate import Service, Context, VirtualObject, ObjectContext -from . import awakable_holder +from . import awakeable_holder kill_runner = Service("KillTestRunner") @@ -27,7 +27,7 @@ async def start_call_tree(ctx: Context): @kill_singleton.handler(name="recursiveCall") async def recursive_call(ctx: ObjectContext): name, promise = ctx.awakeable() - ctx.object_send(awakable_holder.hold, key="kill", arg=name) + ctx.object_send(awakeable_holder.hold, key="kill", arg=name) await promise await ctx.object_call(recursive_call, key="", arg=None) diff --git a/test-services/services/proxy.py b/test-services/services/proxy.py index b04b150..cb4bff0 100644 --- a/test-services/services/proxy.py +++ b/test-services/services/proxy.py @@ -12,6 +12,7 @@ # pylint: disable=C0116 # pylint: disable=W0613 +from datetime import timedelta from restate import Service, Context from typing import TypedDict, Optional, Iterable @@ -23,6 +24,7 @@ class ProxyRequest(TypedDict): virtualObjectKey: Optional[str] handlerName: str message: Iterable[int] + delayMillis: Optional[int] @proxy.handler() @@ -31,16 +33,21 @@ async def call(ctx: Context, req: ProxyRequest) -> Iterable[int]: req['serviceName'], req['handlerName'], bytes(req['message']), - req['virtualObjectKey'])) + req.get('virtualObjectKey'))) @proxy.handler(name="oneWayCall") async def one_way_call(ctx: Context, req: ProxyRequest): + send_delay = None + if req.get('delayMillis'): + send_delay = timedelta(milliseconds=req['delayMillis']) ctx.generic_send( req['serviceName'], req['handlerName'], bytes(req['message']), - req['virtualObjectKey']) + req.get('virtualObjectKey'), + send_delay + ) class ManyCallRequest(TypedDict): @@ -54,17 +61,22 @@ async def many_calls(ctx: Context, requests: Iterable[ManyCallRequest]): for req in requests: if req['oneWayCall']: + send_delay = None + if req['proxyRequest'].get('delayMillis'): + send_delay = timedelta(milliseconds=req['proxyRequest']['delayMillis']) ctx.generic_send( req['proxyRequest']['serviceName'], req['proxyRequest']['handlerName'], bytes(req['proxyRequest']['message']), - req['proxyRequest']['virtualObjectKey']) + req['proxyRequest'].get('virtualObjectKey'), + send_delay + ) else: awaitable = ctx.generic_call( req['proxyRequest']['serviceName'], req['proxyRequest']['handlerName'], bytes(req['proxyRequest']['message']), - req['proxyRequest']['virtualObjectKey']) + req['proxyRequest'].get('virtualObjectKey')) if req['awaitAtTheEnd']: to_await.append(awaitable) diff --git a/test-services/services/test_utils.py b/test-services/services/test_utils.py index 14f8696..f5b392f 100644 --- a/test-services/services/test_utils.py +++ b/test-services/services/test_utils.py @@ -12,12 +12,13 @@ # pylint: disable=C0116 # pylint: disable=W0613 +import os from datetime import timedelta -from typing import Dict, Any -import typing +from typing import (Dict, Iterable, List, Union, TypedDict, Literal, Any) from restate import Service, Context -from . import awakable_holder +from . import list_object +from . import awakeable_holder test_utils = Service("TestUtilsService") @@ -37,7 +38,7 @@ async def echo_headers(context: Context) -> Dict[str, str]: async def create_awakeable_and_await_it(context: Context, req: Dict[str, Any]) -> Dict[str, Any]: name, awakeable = context.awakeable() - await context.object_call(awakable_holder.hold, key=req["awakeableKey"], arg=name) + await context.object_call(awakeable_holder.hold, key=req["awakeableKey"], arg=name) if "awaitTimeout" not in req: return {"type": "result", "value": await awakeable} @@ -46,7 +47,7 @@ async def create_awakeable_and_await_it(context: Context, req: Dict[str, Any]) - raise NotImplementedError() @test_utils.handler(name="sleepConcurrently") -async def sleep_concurrently(context: Context, millis_duration: typing.List[int]) -> None: +async def sleep_concurrently(context: Context, millis_duration: List[int]) -> None: timers = [context.sleep(timedelta(milliseconds=duration)) for duration in millis_duration] for timer in timers: @@ -65,4 +66,35 @@ def effect(): await context.run("count", effect) return invoked_side_effects - + +@test_utils.handler(name="getEnvVariable") +async def get_env_variable(context: Context, env_name: str) -> str: + return os.environ.get(env_name, default="") + +class CreateAwakeableAndAwaitIt(TypedDict): + type: Literal["createAwakeableAndAwaitIt"] + awakeableKey: str + +class GetEnvVariable(TypedDict): + type: Literal["getEnvVariable"] + envName: str + +Command = Union[ + CreateAwakeableAndAwaitIt, + GetEnvVariable +] + +class InterpretRequest(TypedDict): + listName: str + commands: Iterable[Command] + +@test_utils.handler(name="interpretCommands") +async def interpret_commands(context: Context, req: InterpretRequest): + for cmd in req['commands']: + if cmd['type'] == "createAwakeableAndAwaitIt": + name, awakeable = context.awakeable() + context.object_send(awakeable_holder.hold, key=cmd["awakeableKey"], arg=name) + result = await awakeable + context.object_send(list_object.append, key=req['listName'], arg=result) + elif cmd['type'] == "getEnvVariable": + context.object_send(list_object.append, key=req['listName'], arg=os.environ.get(cmd['envName'], default="")) From aabc6c9b7d4cfffb4c3ee90b8112498080f2d248 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 9 Sep 2024 14:49:41 +0200 Subject: [PATCH 3/3] Make dockerfile more cacheable --- .dockerignore | 2 ++ test-services/Dockerfile | 9 ++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/.dockerignore b/.dockerignore index c1fb70b..d076eee 100644 --- a/.dockerignore +++ b/.dockerignore @@ -14,3 +14,5 @@ **/test_report/ +target + diff --git a/test-services/Dockerfile b/test-services/Dockerfile index 9492bc1..a86a17d 100644 --- a/test-services/Dockerfile +++ b/test-services/Dockerfile @@ -4,7 +4,14 @@ FROM ghcr.io/pyo3/maturin AS build-sdk WORKDIR /usr/src/app -COPY . . +COPY src ./src/ +COPY python ./python/ +COPY Cargo.lock . +COPY Cargo.toml . +COPY rust-toolchain.toml . +COPY requirements.txt . +COPY pyproject.toml . +COPY LICENSE . RUN maturin build --out dist --interpreter python3.12