ctx.run Blocks Event Loop for Sync Functions #46

Closed
ouatu-ro opened this issue Mar 12, 2025 · 4 comments

@ouatu-ro
Contributor

ouatu-ro commented Mar 12, 2025

ctx.run does not properly handle synchronous functions, running them directly in the event loop instead of offloading them. This causes inconsistent behavior:

  • Sometimes tasks execute in parallel as expected.
  • Other times, they execute sequentially, doubling execution time.

Steps to Reproduce

Run the following code and start two handler invocations at the same time:

@greeter.handler()
async def sleep_handler(ctx: Context, secs: int):
    await ctx.run("waiting", lambda: time.sleep(secs))
    return f"time.sleep {secs} seconds"


@greeter.handler()
async def greet(ctx: Context, req: GreetingRequest) -> Greeting:
    print("before")
    promise = ctx.service_call(sleep_handler, arg=5)
    promise2 = ctx.service_call(sleep_handler, arg=5)
    start_time = time.time()
    results = [await promise, await promise2]
    end_time = time.time()
    print(f"{end_time - start_time=}")
    print(f"{req=}")
    return Greeting(
        message=f"You said hi to {req.name} after {end_time - start_time} seconds!"
    )

Expected Behavior
  • ctx.service_call should allow both tasks to execute in parallel, finishing in ~5 seconds.

Actual Behavior
  • The first ctx.run blocks for 5 seconds, then the second runs for another 5 seconds.
  • Total execution time is ~10 seconds instead of ~5, though it occasionally behaves as expected.

Example Output (Sequential Execution):

before
before
end_time - start_time=10.020586013793945
req=GreetingRequest(name='Name1')
end_time - start_time=9.34077501296997
req=GreetingRequest(name='Name2')

Possible Root Cause

Currently, ctx.run executes sync functions inline:

if inspect.iscoroutinefunction(action):
    action_result = await action()
else:
    action_result = action()  # Blocks the event loop

This prevents efficient execution of multiple ctx.run calls.
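
For illustration, here is a minimal standalone asyncio sketch (written just for this report, not Restate code) showing the same effect: a blocking call made inline in a coroutine serializes concurrent tasks, while asyncio.to_thread keeps them parallel:

import asyncio
import time

async def blocking_task(secs: int):
    time.sleep(secs)  # runs inline and blocks the event loop

async def offloaded_task(secs: int):
    await asyncio.to_thread(time.sleep, secs)  # runs in a worker thread

async def main():
    for task in (blocking_task, offloaded_task):
        start = time.time()
        await asyncio.gather(task(2), task(2))
        # blocking_task takes ~4s (sequential), offloaded_task ~2s (parallel)
        print(f"{task.__name__}: {time.time() - start:.1f}s")

asyncio.run(main())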

Proposed Fix

Modify ctx.run to automatically offload sync functions:

import asyncio

if inspect.iscoroutinefunction(action):
    action_result = await action()
else:
    action_result = await asyncio.to_thread(action)  # Prevents blocking

Impact
  • Prevents event loop blocking.
  • Enables multiple ctx.run calls to execute concurrently.
  • Improves performance for CPU-bound and I/O-bound tasks.
  • Ensures consistent execution time (always ~5s instead of sometimes 10s).

Example Output (Parallel Execution, with the proposed fix):

before
before
end_time - start_time=5.017489910125732
req=GreetingRequest(name='asdf24')
end_time - start_time=5.016418933868408
req=GreetingRequest(name='asdfg24')

Would love to hear thoughts on this!

Restate version: 0.5.1

@ouatu-ro
Contributor Author

To consistently reproduce the bad behavior (sequential execution), use this greet handler instead:

@greeter.handler()
async def greet(ctx: Context, req: GreetingRequest) -> Greeting:
    promises = [ctx.service_call(sleep_handler, arg=5) for _ in range(20)]
    start_time = time.time()
    results = [await promise for promise in promises]
    end_time = time.time()
    print(f"{end_time - start_time=}")

Replacing action_result = action() with action_result = await asyncio.to_thread(action) consistently changes this behavior, allowing parallel execution.

@igalshilman
Contributor

igalshilman commented Mar 13, 2025

Hi @ouatu-ro,
The reason that ctx.run can take a non-async lambda is to support capturing value side effects, for example:

ctx.run("customer id", lambda: uuid())

Having to force this to be async would be bad ergonomics.

Perhaps we need to add a variant of run() that executes the action in a background thread, or be clearer in the docstring.
What do you think?
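
For concreteness, a rough sketch of what such a variant's core logic could look like (run_in_thread is just a placeholder name, not an existing SDK method, and a real implementation would also need the journaling logic that run() already has):

import asyncio
import inspect

async def run_in_thread(action):
    # like run(), but offloads a sync action to a worker thread
    if inspect.iscoroutinefunction(action):
        return await action()
    return await asyncio.to_thread(action)

# e.g. await run_in_thread(lambda: time.sleep(5)) would not stall the event loop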

@ouatu-ro
Contributor Author

Hi @igalshilman,

Sorry, I wasn’t clear earlier. Your suggestion is exactly what I tried. Running actions in a background thread by default makes the most sense for Python.

I modified the implementation of async def run in ServerInvocationContext (in restate/serverContext) from:

if inspect.iscoroutinefunction(action):
    action_result = await action()  # type: ignore
else:
    action_result = action() # performance killer in an async context

to:

if inspect.iscoroutinefunction(action):
    action_result = await action()  # type: ignore
else:
    action_result = await asyncio.to_thread(action)

This still works great with print(await ctx.run("customer id", lambda: str(uuid.uuid4()))), and works great with retries as well.

This change allows synchronous functions to run without blocking the event loop, which is crucial for maintaining the performance of other handlers. Currently, all handlers are affected by synchronous executions inside ctx.run. Changing this would align with what most Python developers expect when mixing sync and async code; for example, FastAPI automatically offloads sync endpoint functions to a thread pool (see https://fastapi.tiangolo.com/async/).

If race conditions are a big concern, instead of always using asyncio.to_thread you could even use a thread pool with a concurrency limit of 1, as sketched below.
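
For example, a self-contained sketch of that option (run_action and _run_executor are names made up for illustration; the dispatch mirrors the snippet above):

import asyncio
import inspect
import time
from concurrent.futures import ThreadPoolExecutor

# A dedicated single-worker pool: offloaded sync actions never overlap with
# each other, but the event loop itself stays unblocked.
_run_executor = ThreadPoolExecutor(max_workers=1)

async def run_action(action):
    if inspect.iscoroutinefunction(action):
        return await action()
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_run_executor, action)

# usage: a blocking sleep no longer stalls other coroutines on the loop
asyncio.run(run_action(lambda: time.sleep(1)))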

What's more, since ctx.run is essentially a retryable block that does not have access to ctx itself, it seems logical to run it off the main event loop (where the workflow and log-append operations happen). That avoids performance degradation when using third-party libraries that perform blocking I/O, which sadly is the case for many Python libraries, since async/await was added relatively late to Python and is not yet the standard.

I believe this change is crucial. For example, consider this scenario with the current ctx.run implementation:

@greeter.handler()
async def sleep_handler(ctx: Context, secs: int):
    await ctx.run("waiting", lambda: time.sleep(secs))
    return f"time.sleep {secs} seconds"

@greeter.handler()
async def greet(ctx: Context, req: GreetingRequest) -> Greeting:
    promises = [ctx.service_call(sleep_handler, arg=5) for _ in range(20)]
    start_time = time.time()
    results = [await promise for promise in promises]
    end_time = time.time()
    print(f"{end_time - start_time=}")

Running even 10 such handler invocations causes a significant performance hit across all Restate handlers. Offloading sync actions to a background thread should mitigate that.

What do you think?

@igalshilman
Contributor

Hey @ouatu-ro !
If you'll submit a PR I'd be happy to merge it!
