
Try a WorkerContext? #1

Open
richardsheridan opened this issue Jun 20, 2024 · 2 comments

Comments

@richardsheridan

Hi, is there any reason that this wrapper can't be replaced with a WorkerContext?

class WorkerPool:
_ctx: trio_parallel.WorkerContext | None
_ctx_lock: trio.Lock
_limiter: trio.CapacityLimiter
_n_workers: int
_idle_timeout: float
_init: Callable[[], object]
_retire: Callable[[], bool]
_grace_period: float
_worker_type: WorkerType

If there's something I could do to make my feature a little more usable or better documented, I'd like to hear it. :)

@gjoseph92
Contributor

Hey @richardsheridan, impressed you noticed this! I'm traveling right now btw, so my replies may be somewhat limited until next week.

Short story is that this really is a wrapper around a WorkerContext. In order to "restart" the pool, we just close and release the current WorkerContext, then start a new one and point the reference to that. I found it sufficiently documented to figure that out at least :)
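
In rough pseudocode, the pattern is something like this (the names here are illustrative, not the actual WorkerPool code):

import contextlib

import trio
import trio_parallel


class WorkerPoolSketch:
    """Illustrative only: wrap a WorkerContext and "restart" by closing the
    current context and opening a fresh one. Call restart() once before use."""

    def __init__(self):
        self._stack = contextlib.AsyncExitStack()
        self._ctx = None
        self._ctx_lock = trio.Lock()

    async def restart(self):
        async with self._ctx_lock:
            # close and release the old context, then point at a new one
            await self._stack.aclose()
            self._stack = contextlib.AsyncExitStack()
            self._ctx = await self._stack.enter_async_context(
                trio_parallel.open_worker_context()
            )

    async def run_sync(self, fn, *args):
        return await self._ctx.run_sync(fn, *args)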

I would say that if the WorkerContext exposed public methods for manually controlling the scaling, that would have saved writing this! Then we could keep the same context the whole time, and presumably await scale(0); await scale(n) to restart. That might also be helpful because I found that when workers crashed, there was no way to detect it or to replace them.

On the subject of wishlists, it would also be great to have a way to cancel tasks on workers less violently than SIGKILL. I'd love for cancellation to propagate into the worker process as Cancelled or SIGINT, so workers could stop doing what they're doing and pick up new work without the latency of spinning up a new process.

Overall though, thanks for trio-parallel, I found it very helpful and usable!

@richardsheridan
Author

richardsheridan commented Jun 21, 2024

Cool! Let me break this down, in reverse order.

Actually, SIGINT can work as you describe by running signal.signal(signal.SIGINT, signal.SIG_DFL) in your init function and keeping cancellable=False, as well as installing a trio signal handler so your parent program doesn't end. I considered the behavior a bug before, but maybe it was secretly a feature, as long as you remember to catch the exception... maybe I'll put this recipe in the docs.
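
Roughly like this (untested; _slow_job and the broad exception handling are just for illustration):

import signal

import trio
import trio_parallel


def _restore_sigint():
    # runs in the worker: stop ignoring SIGINT so Ctrl-C actually reaches the job
    signal.signal(signal.SIGINT, signal.SIG_DFL)


def _slow_job():
    while True:
        pass  # stand-in for a long-running computation


async def main():
    # runs in the parent: claim SIGINT with a trio signal receiver so Ctrl-C
    # doesn't tear down the whole program
    with trio.open_signal_receiver(signal.SIGINT):
        async with trio_parallel.open_worker_context(init=_restore_sigint) as ctx:
            try:
                await ctx.run_sync(_slow_job, cancellable=False)
            except BaseException as exc:
                # whatever comes back from the interrupted job
                print(f"job interrupted: {exc!r}")


if __name__ == "__main__":
    trio.run(main)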

I don't have a generic way to run code that respects trio-style cancellation. I don't want to maintain something that complex, so if you really need that, you should try tractor.

Worker scaling is "automatic" based on the way LIFO caching interacts with the idle_timeout, to support arbitrary concurrency patterns. Consider several independent "submit loops" using the same context, but with different limiters. If all the loops are suddenly running, it'll scale up to whatever their summed limit is, and if a few run out of work for several minutes, the idle workers will time out and scale down to only what is still in use.
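
For example (names and numbers totally made up), two submit loops sharing one context, each with its own limiter:

from functools import partial

import trio
import trio_parallel


def _job(name, i):
    print(f"{name} job {i}")


async def submit_loop(ctx, name, limiter, n_jobs):
    # each loop shares the context but caps its own concurrency with its limiter
    async with trio.open_nursery() as nursery:
        for i in range(n_jobs):
            nursery.start_soon(partial(ctx.run_sync, _job, name, i, limiter=limiter))


async def main():
    # with a short idle_timeout, workers spun up for a burst of work
    # time out and retire once their submit loop goes quiet
    async with trio_parallel.open_worker_context(idle_timeout=5) as ctx:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(submit_loop, ctx, "fast", trio.CapacityLimiter(4), 40)
            nursery.start_soon(submit_loop, ctx, "slow", trio.CapacityLimiter(2), 10)


if __name__ == "__main__":
    trio.run(main)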

If on the other hand you really want a specific, constant number of workers, you can just do partial(ctx.run_sync, limiter=limiter) and save yourself a class.
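
That is, something as small as this (again just a sketch):

from functools import partial

import trio
import trio_parallel


async def main():
    limiter = trio.CapacityLimiter(4)  # a constant ceiling of 4 concurrent workers
    async with trio_parallel.open_worker_context() as ctx:
        run_in_pool = partial(ctx.run_sync, limiter=limiter)
        # submit as much as you like; at most 4 jobs run at once
        await run_in_pool(sum, range(10**6))


if __name__ == "__main__":
    trio.run(main)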

Finally, consider retiring workers instead of restarting the pool if what you need is fresh processes (there's a sketch of that at the end). Still, restarting the pool is also an intended use case, but a trionic recipe that doesn't touch internal methods would look more like this:

import os
from functools import partial
from pathlib import Path

def _worker_init():
    print(f"worker init {os.getpid()}")
    import_user_code(USER_CODE_PATH)

def _handle_changed_file(file):
    print(f"changed {file} in worker {os.getpid()}")

def _drain_and_merge_batches(batch):
    # fold any queued change batches into this one so each file is handled once
    batch = set(batch)
    try:
        while True:
            batch |= change_recv.receive_nowait()
    except trio.WouldBlock:
        return batch

async def submit_batch(batch):
    # a fresh context per batch means fresh worker processes that import the latest user code
    async with trio_parallel.open_worker_context(init=_worker_init) as ctx, trio.open_nursery() as nursery:
        for changed_file in batch:
            nursery.start_soon(
                partial(ctx.run_sync, _handle_changed_file, changed_file, limiter=limiter)
            )

if __name__ == "__main__":
    import trio
    import trio_parallel
    from fused_local.user_code import (
        import_user_code,
        watch_with_channel,  # highly recommend to make this
    )

    USER_CODE_PATH = Path.cwd() / "example.py"
    N_WORKERS = 2

    change_send, change_recv = trio.open_memory_channel(float("inf"))
    limiter = trio.CapacityLimiter(N_WORKERS)

    async def main():
        async with trio.open_nursery() as nursery:
            nursery.start_soon(
                watch_with_channel, USER_CODE_PATH, change_send
            )

            async for changed_file_batch in change_recv:
                changed_file_batch = _drain_and_merge_batches(changed_file_batch)
                await submit_batch(changed_file_batch)
                # or maybe you're hasty
                # something_to_cancel_previous_batch()
                # nursery.start_soon(submit_batch, changed_file_batch)

    try:
        trio.run(main)
    except* KeyboardInterrupt:
        print("shut down")

Not tested or anything, but it gives you the shape of things. You could also wire up a cancel scope to stop the previous batch early when changes arrive rapidly, if handling a batch takes a long time.
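
And since I mentioned retirement: one way to get fresh processes without tearing down the context is a retire callback that notices the user code changed underneath the worker. Again just a sketch (the mtime check is only one way to decide a worker is stale, and the actual user-code import is omitted):

import trio
import trio_parallel
from pathlib import Path

USER_CODE_PATH = Path.cwd() / "example.py"  # hypothetical, mirroring the sketch above
_imported_mtime = None


def _init():
    # runs in each fresh worker: record what version of the file it saw
    global _imported_mtime
    _imported_mtime = USER_CODE_PATH.stat().st_mtime
    # import_user_code(USER_CODE_PATH)  # whatever actually loads the user code


def _retire():
    # runs in the worker after each job: retire once the file on disk is newer
    # than what this worker imported, so the next job gets a fresh process
    return USER_CODE_PATH.stat().st_mtime != _imported_mtime


async def main():
    async with trio_parallel.open_worker_context(init=_init, retire=_retire) as ctx:
        await ctx.run_sync(print, "fresh worker whenever example.py changes")


if __name__ == "__main__":
    trio.run(main)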
