Missing piece: worker process pools, run_in_worker_process #5

Open
njsmith opened this issue Jan 22, 2017 · 9 comments

Comments

@njsmith
Member

njsmith commented Jan 22, 2017

It'd be nice to have a clean way to dispatch CPU-bound work. Possibly based on multiprocessing.

@njsmith
Member Author

njsmith commented Jan 22, 2017

An interesting question is how this interacts with task-local storage (#2). run_in_worker_thread can easily preserve task-local context because it's in-process (and there's no contention, because the owning task is frozen for the duration of the run_in_worker_thread call). For worker processes, it's not so simple -- what if there's something in the context that can't be pickled? Should changes made in the process be propagated back?

One possibility is that when creating task-local objects, they could be tagged as multiprocess friendly or not, e.g. trio.TaskLocal(pickle=True).

@auvipy

auvipy commented Feb 7, 2019

Should I work on something like trio-multiprocessing?

@njsmith
Member Author

njsmith commented Feb 10, 2019

@auvipy Well I mean... it's up to you :-). I think it would be great to have multiprocessing-like tools in Trio. There are lots of things that would be great though, so it really depends on whether it's something you want to work on :-).

Since I haven't looked at this issue in like, 2 years, let me update with some more recent thoughts:

I wouldn't try to use the multiprocessing library, because it's incredibly complex internally, and fragile in lots of ways. I would do something from scratch, and focus on a small set of features with lots of polish.

I think I'd probably do it as a third-party library, instead of baking it into trio itself, because it's actually a pretty complex and self-contained project.

My first goal would be a really seamless await run_in_process(async_fn, *args). I'd use cloudpickle instead of pickle, to avoid a lot of hassles that multiprocessing users run into. I'd try hard to make it friendly to use: make sure control-C works in a sensible way, make sure you can use print inside a subprocess to see what's happening, make sure that if the parent dies unexpectedly the children don't get left behind as orphans, that kind of thing.

Next goal would probably be some kind of process pool support, to cache processes between invocations, because process startup is very expensive, much more so than threads.

Then if I wanted to get really fancy, I'd think about ways to seamlessly pass channel objects between processes, so that you can set up producers and consumers in different processes and they can talk to each other.

@auvipy

auvipy commented Feb 10, 2019

I understand. Thanks for explaining. Celery needs a better billiard :)

@njsmith
Member Author

njsmith commented Feb 10, 2019

Ah, I see! Well, then I guess you understand why I am hesitant to suggest using multiprocessing itself as the base :-).

@pipermerriam

@njsmith I'd be curious to get your feedback on this proof-of-concept for a run_in_process function.

https://github.com/ethereum/trinity/pull/1079/files

Specifically, I'm interested in what you think about how I leveraged the trio subprocess APIs to achieve the process isolation.

This is probably something that I'll keep internal to our codebase for a bit while I polish it, but if this approach is something you think is acceptable for larger scale use then I'd like to get it packaged up for others to use as well.

@njsmith
Member Author

njsmith commented Sep 9, 2019

@pipermerriam I guess there are two important decisions that affect how you implement a run_in_worker_process-style API:

  1. Do you want to spawn a new process each time, or re-use processes? The former is simpler and avoids issues with state leaking between invocations; the latter is faster because you can amortize the process startup cost.
  2. Do you want to use the child's stdio to communicate with the parent, or do you want to use some other mechanism? The former is simpler to implement; the latter lets you use stuff like print and pdb in the child.

It looks like in your prototype, you're going for "new process each time" and "use stdio". In that case, you can simplify the code a lot, to something like:

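import sys
import cloudpickle
import trio

async def run_in_worker_process(async_fn, *args):
    encoded_job = cloudpickle.dumps((async_fn, args))
    # capture_stdout=True makes the child's output available as p.stdout
    p = await trio.run_process([sys.executable, "-m", __name__], stdin=encoded_job, capture_stdout=True)
    return cloudpickle.loads(p.stdout)

if __name__ == "__main__":
    async_fn, args = cloudpickle.load(sys.stdin.detach())
    retval = trio.run(async_fn, *args)
    # cloudpickle.dump takes the object first, then the file
    cloudpickle.dump(retval, sys.stdout.detach())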

If you want to re-use the processes but are happy with using stdio, then you need to add something like the framing protocol that you use in your prototype. But you don't need to create any pipes manually or anything – you can use trio.open_process(..., stdin=subprocess.PIPE, stdout=subprocess.PIPE), and then process.stdio is a regular trio Stream that you can use to send and receive data. This way Trio takes care of setting up the pipes, handling unix vs windows differences, choosing which low-level system API to use when working with those pipes, etc. You might also want #1104 to make the process lifetime management easier. Ideally you also want #174 so you can use Trio in the child when talking to stdin/stdout, though the way your prototype uses sync operations to talk to stdio and then calls trio.run in between would also work.
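A minimal sketch of the kind of framing that paragraph refers to, assuming a 4-byte big-endian length prefix (the prototype's actual wire format may differ) and stdlib pickle standing in for cloudpickle so there are no third-party dependencies. The names write_frame, read_frame and read_exactly are hypothetical helpers, and an in-memory buffer stands in for the pipe; an async version would do the same thing over a Trio stream:

```python
import io
import pickle
import struct

# Assumed wire format (not taken from the prototype): a 4-byte big-endian
# length header followed by that many bytes of pickled payload.
HEADER = struct.Struct(">I")


def write_frame(stream, obj):
    """Serialize obj and write it as one length-prefixed frame."""
    payload = pickle.dumps(obj)
    stream.write(HEADER.pack(len(payload)) + payload)
    stream.flush()


def read_exactly(stream, n):
    """Read exactly n bytes, looping because pipes may return short reads."""
    chunks = []
    remaining = n
    while remaining:
        chunk = stream.read(remaining)
        if not chunk:
            raise EOFError("stream closed in the middle of a frame")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_frame(stream):
    """Read one length-prefixed frame and deserialize its payload."""
    (length,) = HEADER.unpack(read_exactly(stream, HEADER.size))
    return pickle.loads(read_exactly(stream, length))


# Round-trip two messages through an in-memory buffer standing in for a pipe.
buf = io.BytesIO()
write_frame(buf, {"job": 1})
write_frame(buf, ["result", 42])
buf.seek(0)
first = read_frame(buf)
second = read_frame(buf)
```

The length prefix is what lets a long-lived worker tell where one job ends and the next begins, which the one-shot version gets for free from EOF.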

If you don't want to use stdio... well, I guess if you only care about one-shot processes then the simplest solution is to pass data through temp files :-). But let's say you choose the fanciest option, where you need persistent processes that you can send and receive messages to, and we use some other channel for communication. In this case you need to do something like:

  • Create your pipes, using the appropriate method on Windows and Unix. (On Windows we don't have any public API for this yet; that's #824, "support for windows named pipes". On Unix you can use os.pipe + trio.hazmat.FdStream. Note: in your prototype you use trio.open_file. Don't do that! That's for disk files, and it uses an inefficient thread-based API underneath, because that's the only thing that works for disk files. For pipes there are much better APIs, and FdStream will use them.)

  • Pass the pipes into the child on some non-standard descriptor. On Windows this involves subprocess.STARTUPINFO.lpAttributeList. On Unix you use the pass_fds argument to run_process/open_process. Also tell the child where to find the pipes/handles, using e.g. command line arguments.

  • In the child, convert those back into some kind of usable file objects. If the child is sync, then on Windows this involves msvcrt.open_osfhandle to convert from a windows handle into an fd, and then on all platforms, once you have an fd you can use io.open to convert it into a python file object. If the child is async, then again you need #824 ("support for windows named pipes") on Windows and FdStream on Unix.
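The Unix half of the steps above can be sketched with the standard library alone. This is only an illustration under several assumptions: it uses the synchronous subprocess.Popen (whose pass_fds argument is the one the Trio functions mirror) instead of Trio, stdlib pickle instead of cloudpickle, a sync child, and a made-up job (summing a list). CHILD_CODE and sum_in_child are hypothetical names:

```python
import os
import pickle
import subprocess
import sys

# Child program: finds its pipe descriptors via command-line arguments,
# wraps them in file objects with open(), and leaves stdio untouched.
CHILD_CODE = """
import pickle, sys
read_fd, write_fd = int(sys.argv[1]), int(sys.argv[2])
with open(read_fd, "rb") as job_in, open(write_fd, "wb") as result_out:
    numbers = pickle.load(job_in)
    pickle.dump(sum(numbers), result_out)
"""


def sum_in_child(numbers):
    # Two one-way pipes: parent -> child for the job, child -> parent for the result.
    job_r, job_w = os.pipe()
    res_r, res_w = os.pipe()
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE, str(job_r), str(res_w)],
        pass_fds=(job_r, res_w),  # keep these descriptors open in the child
    )
    # The parent closes its copies of the child's ends so EOF is delivered correctly.
    os.close(job_r)
    os.close(res_w)
    with open(job_w, "wb") as job_out:
        pickle.dump(numbers, job_out)
    with open(res_r, "rb") as result_in:
        result = pickle.load(result_in)
    proc.wait()
    return result


total = sum_in_child([1, 2, 3])
```

Because the child's stdin and stdout are not used for the protocol, print and pdb stay available inside the child.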

@pipermerriam

pipermerriam commented Sep 9, 2019

@njsmith great feedback and lots of good pointers.

I spent another few hours tinkering and have things in a state that I'm starting to be happier with. I'm not necessarily convinced that any of my current design decisions are exactly right, but behavior-wise it's quickly approaching the general-purpose API that I'm shooting for.

  1. It no longer uses stdin/stdout for child process communication. Child processes are passed file descriptors (thanks for the tip about pass_fds) and use those for communication between parent and child.
  2. Things like SIGINT and SIGTERM should just work (they are relayed to child processes when they occur in the parent process).
  3. SIGKILL should also just work causing immediate termination.
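As a rough illustration of the relaying idea in item 2, here is a stdlib-only sketch. It is not the code from the linked project, which may do this differently (for example with Trio's own signal handling); relay_signal and install_relay are hypothetical names, and the sketch assumes a Unix platform:

```python
import signal
import subprocess
import sys


def relay_signal(proc, signum):
    """Forward signum to the child, unless the child has already exited."""
    if proc.poll() is None:
        proc.send_signal(signum)


def install_relay(proc, signums=(signal.SIGINT, signal.SIGTERM)):
    """Install parent-side handlers that pass each listed signal on to proc.

    signal.signal can only be called from the main thread.
    """
    for signum in signums:
        signal.signal(signum, lambda received, frame: relay_signal(proc, received))


# Demonstrate the forwarding step directly, without installing handlers:
# start a child that would otherwise sleep for a minute, then relay SIGTERM.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
relay_signal(child, signal.SIGTERM)
returncode = child.wait()
```

On Unix, a negative return code from subprocess means the child was terminated by that signal number.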

The code doesn't currently re-use child processes, but I have had that functionality in mind and I'm reasonably confident that it can be added without too much effort. For my specific use case this isn't a high-value feature, but I know there are plenty of use cases where eliminating this overhead allows offloading CPU-bound work, so it's definitely on the roadmap.

Again, curious to hear any new thoughts you might have but I don't want to take up too much of your time.

Update: The previously linked PR is still a fine way to view the code but I've moved it to https://github.com/ethereum/trio-run-in-process which has the code in a more organized fashion.

@auvipy

auvipy commented Dec 15, 2020

Celery could benefit greatly from this.
