Skip to content

Run retry e2e tests #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@

**/test_report/

target

2 changes: 1 addition & 1 deletion .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
name: "Features integration test (sdk-test-suite version ${{ matrix.sdk-test-suite }})"
strategy:
matrix:
sdk-test-suite: [ "1.5" ]
sdk-test-suite: [ "2.0" ]
permissions:
contents: read
issues: read
Expand Down
9 changes: 8 additions & 1 deletion test-services/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ FROM ghcr.io/pyo3/maturin AS build-sdk

WORKDIR /usr/src/app

COPY . .
COPY src ./src/
COPY python ./python/
COPY Cargo.lock .
COPY Cargo.toml .
COPY rust-toolchain.toml .
COPY requirements.txt .
COPY pyproject.toml .
COPY LICENSE .

RUN maturin build --out dist --interpreter python3.12

Expand Down
22 changes: 11 additions & 11 deletions test-services/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@
from typing import Dict, Union
from restate import Service, VirtualObject, Workflow

from .counter import counter_object
from .proxy import proxy
from .awakable_holder import awakeable_holder
from. block_and_wait_workflow import workflow
from .cancel_test import runner, blocking_service
from .failing import failing
from .kill_test import kill_runner, kill_singleton
from .list_object import list_object
from .map_object import map_object
from .non_determinism import non_deterministic
from .test_utils import test_utils
from .counter import counter_object as s1
from .proxy import proxy as s2
from .awakeable_holder import awakeable_holder as s3
from. block_and_wait_workflow import workflow as s4
from .cancel_test import runner, blocking_service as s5
from .failing import failing as s6
from .kill_test import kill_runner, kill_singleton as s7
from .list_object import list_object as s8
from .map_object import map_object as s9
from .non_determinism import non_deterministic as s10
from .test_utils import test_utils as s11

def list_services(bindings):
"""List all services in this module"""
Expand Down
4 changes: 2 additions & 2 deletions test-services/services/cancel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from restate import VirtualObject, ObjectContext
from restate.exceptions import TerminalError

from . import awakable_holder
from . import awakeable_holder

BlockingOperation = Literal["CALL", "SLEEP", "AWAKEABLE"]

Expand Down Expand Up @@ -47,7 +47,7 @@ async def verify_test(ctx: ObjectContext) -> bool:
@blocking_service.handler()
async def block(ctx: ObjectContext, op: BlockingOperation):
name, awakeable = ctx.awakeable()
await ctx.object_call(awakable_holder.hold, key="cancel", arg=name)
await ctx.object_call(awakeable_holder.hold, key="cancel", arg=name)
await awakeable

if op == "CALL":
Expand Down
46 changes: 31 additions & 15 deletions test-services/services/failing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,44 @@ async def failing_call_with_eventual_success(ctx: ObjectContext) -> int:
return 4
raise ValueError(f"Failed at attempt: {failures}")

@failing.handler(name="terminallyFailingSideEffect")
async def terminally_failing_side_effect(ctx: ObjectContext):

def side_effect():
raise TerminalError(message="failed side effect")

await ctx.run("sideEffect", side_effect)
raise ValueError("Should not reach here")

side_effect_failures = 0

@failing.handler(name="failingSideEffectWithEventualSuccess")
async def failing_side_effect_with_eventual_success(ctx: ObjectContext) -> int:
eventual_success_side_effects = 0

@failing.handler(name="sideEffectSucceedsAfterGivenAttempts")
async def side_effect_succeeds_after_given_attempts(ctx: ObjectContext, minimum_attempts: int) -> int:

def side_effect():
global side_effect_failures
side_effect_failures += 1
if side_effect_failures >= 4:
side_effect_failures = 0
return 4
raise ValueError(f"Failed at attempt: {side_effect_failures}")
global eventual_success_side_effects
eventual_success_side_effects += 1
if eventual_success_side_effects >= minimum_attempts:
return eventual_success_side_effects
raise ValueError(f"Failed at attempt: {eventual_success_side_effects}")

return await ctx.run("sideEffect", side_effect) # type: ignore
return await ctx.run("sideEffect", side_effect, max_attempts=minimum_attempts + 1) # type: ignore

eventual_failure_side_effects = 0

@failing.handler(name="terminallyFailingSideEffect")
async def terminally_failing_side_effect(ctx: ObjectContext):
@failing.handler(name="sideEffectFailsAfterGivenAttempts")
async def side_effect_fails_after_given_attempts(ctx: ObjectContext, retry_policy_max_retry_count: int) -> int:

def side_effect():
raise TerminalError(message="failed side effect")
global eventual_failure_side_effects
eventual_failure_side_effects += 1
raise ValueError(f"Failed at attempt: {eventual_failure_side_effects}")

try:
await ctx.run("sideEffect", side_effect, max_attempts=retry_policy_max_retry_count)
raise ValueError("Side effect did not fail.")
except TerminalError as t:
global eventual_failure_side_effects
return eventual_failure_side_effects

await ctx.run("sideEffect", side_effect)
raise ValueError("Should not reach here")
4 changes: 2 additions & 2 deletions test-services/services/kill_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from restate import Service, Context, VirtualObject, ObjectContext

from . import awakable_holder
from . import awakeable_holder

kill_runner = Service("KillTestRunner")

Expand All @@ -27,7 +27,7 @@ async def start_call_tree(ctx: Context):
@kill_singleton.handler(name="recursiveCall")
async def recursive_call(ctx: ObjectContext):
name, promise = ctx.awakeable()
ctx.object_send(awakable_holder.hold, key="kill", arg=name)
ctx.object_send(awakeable_holder.hold, key="kill", arg=name)
await promise

await ctx.object_call(recursive_call, key="", arg=None)
Expand Down
20 changes: 16 additions & 4 deletions test-services/services/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# pylint: disable=C0116
# pylint: disable=W0613

from datetime import timedelta
from restate import Service, Context
from typing import TypedDict, Optional, Iterable

Expand All @@ -23,6 +24,7 @@ class ProxyRequest(TypedDict):
virtualObjectKey: Optional[str]
handlerName: str
message: Iterable[int]
delayMillis: Optional[int]


@proxy.handler()
Expand All @@ -31,16 +33,21 @@ async def call(ctx: Context, req: ProxyRequest) -> Iterable[int]:
req['serviceName'],
req['handlerName'],
bytes(req['message']),
req['virtualObjectKey']))
req.get('virtualObjectKey')))


@proxy.handler(name="oneWayCall")
async def one_way_call(ctx: Context, req: ProxyRequest):
send_delay = None
if req.get('delayMillis'):
send_delay = timedelta(milliseconds=req['delayMillis'])
ctx.generic_send(
req['serviceName'],
req['handlerName'],
bytes(req['message']),
req['virtualObjectKey'])
req.get('virtualObjectKey'),
send_delay
)


class ManyCallRequest(TypedDict):
Expand All @@ -54,17 +61,22 @@ async def many_calls(ctx: Context, requests: Iterable[ManyCallRequest]):

for req in requests:
if req['oneWayCall']:
send_delay = None
if req['proxyRequest'].get('delayMillis'):
send_delay = timedelta(milliseconds=req['proxyRequest']['delayMillis'])
ctx.generic_send(
req['proxyRequest']['serviceName'],
req['proxyRequest']['handlerName'],
bytes(req['proxyRequest']['message']),
req['proxyRequest']['virtualObjectKey'])
req['proxyRequest'].get('virtualObjectKey'),
send_delay
)
else:
awaitable = ctx.generic_call(
req['proxyRequest']['serviceName'],
req['proxyRequest']['handlerName'],
bytes(req['proxyRequest']['message']),
req['proxyRequest']['virtualObjectKey'])
req['proxyRequest'].get('virtualObjectKey'))
if req['awaitAtTheEnd']:
to_await.append(awaitable)

Expand Down
44 changes: 38 additions & 6 deletions test-services/services/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
# pylint: disable=C0116
# pylint: disable=W0613

import os
from datetime import timedelta
from typing import Dict, Any
import typing
from typing import (Dict, Iterable, List, Union, TypedDict, Literal, Any)
from restate import Service, Context

from . import awakable_holder
from . import list_object
from . import awakeable_holder

test_utils = Service("TestUtilsService")

Expand All @@ -37,7 +38,7 @@ async def echo_headers(context: Context) -> Dict[str, str]:
async def create_awakeable_and_await_it(context: Context, req: Dict[str, Any]) -> Dict[str, Any]:
name, awakeable = context.awakeable()

await context.object_call(awakable_holder.hold, key=req["awakeableKey"], arg=name)
await context.object_call(awakeable_holder.hold, key=req["awakeableKey"], arg=name)

if "awaitTimeout" not in req:
return {"type": "result", "value": await awakeable}
Expand All @@ -46,7 +47,7 @@ async def create_awakeable_and_await_it(context: Context, req: Dict[str, Any]) -
raise NotImplementedError()

@test_utils.handler(name="sleepConcurrently")
async def sleep_concurrently(context: Context, millis_duration: typing.List[int]) -> None:
async def sleep_concurrently(context: Context, millis_duration: List[int]) -> None:
timers = [context.sleep(timedelta(milliseconds=duration)) for duration in millis_duration]

for timer in timers:
Expand All @@ -65,4 +66,35 @@ def effect():
await context.run("count", effect)

return invoked_side_effects


@test_utils.handler(name="getEnvVariable")
async def get_env_variable(context: Context, env_name: str) -> str:
return os.environ.get(env_name, default="")

class CreateAwakeableAndAwaitIt(TypedDict):
type: Literal["createAwakeableAndAwaitIt"]
awakeableKey: str

class GetEnvVariable(TypedDict):
type: Literal["getEnvVariable"]
envName: str

Command = Union[
CreateAwakeableAndAwaitIt,
GetEnvVariable
]

class InterpretRequest(TypedDict):
listName: str
commands: Iterable[Command]

@test_utils.handler(name="interpretCommands")
async def interpret_commands(context: Context, req: InterpretRequest):
for cmd in req['commands']:
if cmd['type'] == "createAwakeableAndAwaitIt":
name, awakeable = context.awakeable()
context.object_send(awakeable_holder.hold, key=cmd["awakeableKey"], arg=name)
result = await awakeable
context.object_send(list_object.append, key=req['listName'], arg=result)
elif cmd['type'] == "getEnvVariable":
context.object_send(list_object.append, key=req['listName'], arg=os.environ.get(cmd['envName'], default=""))
7 changes: 6 additions & 1 deletion test-services/testservices.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ def test_services():
names = os.environ.get('SERVICES')
return services.services_named(names.split(',')) if names else services.all_services()

app = restate.app(services=test_services())
identity_keys = None
e2e_signing_key_env = os.environ.get('E2E_REQUEST_SIGNING_ENV')
if os.environ.get('E2E_REQUEST_SIGNING_ENV'):
identity_keys = [os.environ.get('E2E_REQUEST_SIGNING_ENV')]

app = restate.app(services=test_services(), identity_keys=identity_keys)