-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #461 from talex5/spawn
eio_posix: initial support for subprocesses
- Loading branch information
Showing
19 changed files
with
586 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,8 @@ | ||
(library | ||
(name eio_unix) | ||
(public_name eio.unix) | ||
(foreign_stubs | ||
(language c) | ||
(include_dirs include) | ||
(names fork_action)) | ||
(libraries eio unix threads mtime.clock.os)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
#include <stdlib.h> | ||
#include <unistd.h> | ||
#include <fcntl.h> | ||
#include <string.h> | ||
#include <errno.h> | ||
|
||
#include <caml/mlvalues.h> | ||
|
||
#include "fork_action.h" | ||
|
||
void eio_unix_run_fork_actions(int errors, value v_actions) { | ||
int old_flags = fcntl(errors, F_GETFL, 0); | ||
fcntl(errors, F_SETFL, old_flags & ~O_NONBLOCK); | ||
while (Is_block(v_actions)) { | ||
value v_action = Field(v_actions, 0); | ||
fork_fn *action = (fork_fn *) Nativeint_val(Field(v_action, 0)); | ||
action(errors, v_action); | ||
v_actions = Field(v_actions, 1); | ||
} | ||
_exit(1); | ||
} | ||
|
||
static void try_write_all(int fd, char *buf) { | ||
int len = strlen(buf); | ||
while (len > 0) { | ||
int wrote = write(fd, buf, len); | ||
|
||
if (wrote <= 0) | ||
return; | ||
|
||
buf += wrote; | ||
len -= wrote; | ||
} | ||
} | ||
|
||
void eio_unix_fork_error(int fd, char *fn, char *buf) { | ||
try_write_all(fd, fn); | ||
try_write_all(fd, ": "); | ||
try_write_all(fd, buf); | ||
} | ||
|
||
static char **make_string_array(int errors, value v_array) { | ||
int n = Wosize_val(v_array); | ||
char **c = calloc(sizeof(char *), (n + 1)); | ||
if (!c) { | ||
eio_unix_fork_error(errors, "make_string_array", "out of memory"); | ||
_exit(1); | ||
} | ||
for (int i = 0; i < n; i++) { | ||
c[i] = (char *) String_val(Field(v_array, i)); | ||
} | ||
c[n] = NULL; | ||
return c; | ||
} | ||
|
||
static void action_execve(int errors, value v_config) { | ||
value v_exe = Field(v_config, 1); | ||
char **argv = make_string_array(errors, Field(v_config, 2)); | ||
char **envp = make_string_array(errors, Field(v_config, 3)); | ||
execve(String_val(v_exe), argv, envp); | ||
eio_unix_fork_error(errors, "execve", strerror(errno)); | ||
_exit(1); | ||
} | ||
|
||
CAMLprim value eio_unix_fork_execve(value v_unit) { | ||
return Val_fork_fn(action_execve); | ||
} | ||
|
||
static void action_fchdir(int errors, value v_config) { | ||
value v_fd = Field(v_config, 1); | ||
int r; | ||
r = fchdir(Int_val(v_fd)); | ||
if (r != 0) { | ||
eio_unix_fork_error(errors, "fchdir", strerror(errno)); | ||
_exit(1); | ||
} | ||
} | ||
|
||
CAMLprim value eio_unix_fork_fchdir(value v_unit) { | ||
return Val_fork_fn(action_fchdir); | ||
} | ||
|
||
static void action_chdir(int errors, value v_config) { | ||
value v_path = Field(v_config, 1); | ||
int r; | ||
r = chdir(String_val(v_path)); | ||
if (r != 0) { | ||
eio_unix_fork_error(errors, "chdir", strerror(errno)); | ||
_exit(1); | ||
} | ||
} | ||
|
||
CAMLprim value eio_unix_fork_chdir(value v_unit) { | ||
return Val_fork_fn(action_chdir); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
type c_action = Obj.t | ||
|
||
type t = { run : 'a. ((c_action -> 'a) -> 'a) } [@@unboxed] | ||
|
||
(* A [fork_fn] is a C function that can be executed after forking. It cannot call OCaml code or | ||
run the OCaml GC. It is passed a [Unix.file_descr] for errors and a pointer | ||
to a [c_action]. On success it should write nothing to the error stream and | ||
return 0. On error, it should write a message to the error FD and return a | ||
non-zero value for the exit status (e.g. 1). *) | ||
type fork_fn | ||
|
||
let rec with_actions actions fn = | ||
match actions with | ||
| [] -> fn [] | ||
| { run } :: xs -> | ||
run @@ fun c_action -> | ||
with_actions xs @@ fun c_actions -> | ||
fn (c_action :: c_actions) | ||
|
||
let err_closed op () = | ||
Fmt.failwith "%s: FD is closed!" op | ||
|
||
external action_execve : unit -> fork_fn = "eio_unix_fork_execve" | ||
let action_execve = action_execve () | ||
let execve path ~argv ~env = { run = fun k -> k (Obj.repr (action_execve, path, argv, env)) } | ||
|
||
external action_chdir : unit -> fork_fn = "eio_unix_fork_chdir" | ||
let action_chdir = action_chdir () | ||
let chdir path = { run = fun k -> k (Obj.repr (action_chdir, path)) } | ||
|
||
external action_fchdir : unit -> fork_fn = "eio_unix_fork_fchdir" | ||
let action_fchdir = action_fchdir () | ||
let fchdir fd = { | ||
run = fun k -> | ||
Rcfd.use ~if_closed:(err_closed "fchdir") fd @@ fun fd -> | ||
k (Obj.repr (action_fchdir, fd)) } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
(** Actions to perform after forking a child process. | ||
To spawn a child executable on Unix, the parent forks a copy of itself, | ||
then has the child copy set up the environment for the new program and | ||
execute it. | ||
However, we cannot run any OCaml code in the forked child process. This is | ||
because `fork` only duplicates its own domain. To the child, it appears | ||
that all other domains have stopped responding and if it tries to e.g. | ||
perform a GC then the child process will hang. | ||
Therefore, the fork call and all child actions need to be written in C. | ||
This module provides some support code for doing that. | ||
Individual backends will wrap these actions with higher-level APIs and | ||
can also add their own platform-specific actions *) | ||
|
||
type fork_fn | ||
(** A C function, as defined in "include/fork_action.h". *) | ||
|
||
type c_action = Obj.t | ||
(** An action to be performed in a child process after forking. | ||
This must be a tuple whose first field is a [fork_fn]. *) | ||
|
||
type t = { run : 'a. ((c_action -> 'a) -> 'a) } [@@unboxed] | ||
(** An action that calls [run k] in the parent process to create the C action. | ||
[run] passes the action to [k], which forks the child and runs it. When [k] | ||
returns, [run] can free any resources used. *) | ||
|
||
val with_actions : t list -> (c_action list -> 'a) -> 'a | ||
|
||
(** {2 Actions} *) | ||
|
||
val execve : string -> argv:string array -> env:string array -> t | ||
(** See [execve(2)]. *) | ||
|
||
val chdir : string -> t | ||
(** [chdir path] changes directory to [path]. *) | ||
|
||
val fchdir : Rcfd.t -> t | ||
(** [fchdir fd] changes directory to [fd]. *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
#include <caml/mlvalues.h> | ||
#include <caml/alloc.h> | ||
|
||
/* A function that runs in the forked child process. It must not run any OCaml code or invoke the GC. | ||
* If the action fails then it writes an error message to the FD [errors] and calls [_exit]. | ||
* v_args is the c_action tuple (where field 0 is the function itself). | ||
*/ | ||
typedef void fork_fn(int errors, value v_args); | ||
|
||
Caml_inline value Val_fork_fn(fork_fn *fn) { | ||
return caml_copy_nativeint((intnat) fn); | ||
} | ||
|
||
/* Run each C action in the list [v_actions]. | ||
* Sets [errors] to be blocking. Never returns. | ||
*/ | ||
void eio_unix_run_fork_actions(int errors, value v_actions); | ||
|
||
/* Write "$fn: $msg" to fd. | ||
* fd must be blocking. | ||
* Ignores failure. */ | ||
void eio_unix_fork_error(int fd, char *fn, char *msg); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
(* Keep track of running child processes and notify their fiber when they exit. | ||
After forking a child process, it gets registered in the global [db] along with a resolver | ||
for the promise of its exit status. When we get a SIGCHLD signal, we reap all exited processes | ||
and resolve their promises, waking whichever fibers are waiting for them. | ||
We have to be careful not to use a PID after [wait] reaps it, as the PID could have been reused by then. | ||
The signal handler can run in any domain or systhread, so we have to be careful about that too. | ||
We can't defer the call to [wait] until we're running in an Eio domain as we don't know which domain | ||
should handle it until [wait] gives as the process ID. We don't want to delegate to a particular domain | ||
because it might be spinning doing CPU stuff for a long time. Instead, we try to take the lock in the | ||
signal handler and do it there. If we can't get the lock then we just record that a wait is needed; | ||
whoever holds the lock will soon release it and will do the reaping for us. | ||
Note that, since signal handlers are global, | ||
this will interfere with any libraries trying to manage processes themselves. | ||
For systems with Process Descriptors we could skip all this nonsense and | ||
just poll on the process's FD. e.g. using [pdfork] on FreeBSD or [CLONE_PIDFD] on Linux. *) | ||
|
||
open Eio.Std | ||
|
||
(* Each child process is registered in this table. | ||
Must hold [lock] when accessing it. *) | ||
let db : (int, Unix.process_status Promise.u) Hashtbl.t = Hashtbl.create 10 | ||
|
||
(* Set to [true] when we receive [SIGCHLD] and [false] before calling [wait]. *) | ||
let need_wait = Atomic.make false | ||
|
||
(* [lock] must be held when spawning or reaping. Otherwise, this can happen: | ||
- We spawn process 100, adding it to [db]. | ||
- It exits, sending us SIGCHLD. | ||
- The signal handler calls [wait], reaping it. | ||
- Another domain spawns another process 100 and adds it to [db], | ||
overwriting the previous entry. | ||
- The signal handler resumes, and gets the wrong entry. | ||
If [lock] is already locked when the SIGCHLD handler runs then it just leaves [need_wait = true] | ||
(a signal handler can't wait on a mutex, since it may have interrupted the holder). | ||
The unlocker needs to check [need_wait] after releasing the lock. *) | ||
let lock = Mutex.create () | ||
|
||
(* [pid] has exited. Notify the waiter. Must hold [lock] when calling this. *) | ||
let report_child_status pid status = | ||
match Hashtbl.find_opt db pid with | ||
| Some r -> | ||
Hashtbl.remove db pid; | ||
Promise.resolve r status | ||
| None -> | ||
(* Not one of ours. Not much we can do here. The spawner will probably get | ||
an [ECHILD] error when they wait, which will do for the error. *) | ||
() | ||
|
||
(* Must hold [lock] when calling this. *) | ||
let rec reap () = | ||
Atomic.set need_wait false; | ||
match Unix.(waitpid [WNOHANG] (-1)) with | ||
| 0, _ -> () (* Returned if there are children but none has exited yet. *) | ||
| pid, status -> report_child_status pid status; reap () | ||
| exception Unix.Unix_error (EINTR, _, _) -> reap () | ||
| exception Unix.Unix_error (ECHILD, _, _) -> () (* Returned if there are no children at all. *) | ||
|
||
let rec reap_nonblocking () = | ||
if Mutex.try_lock lock then ( | ||
reap (); | ||
Mutex.unlock lock; | ||
if Atomic.get need_wait then reap_nonblocking () | ||
) (* else the unlocker will see [need_wait] and call us later *) | ||
|
||
let unlock () = | ||
Mutex.unlock lock; | ||
if Atomic.get need_wait then reap_nonblocking () | ||
|
||
(* Must hold [lock] when calling this. *) | ||
let register pid = | ||
assert (not (Hashtbl.mem db pid)); | ||
let p, r = Promise.create () in | ||
Hashtbl.add db pid r; | ||
p | ||
|
||
let with_lock fn = | ||
Mutex.lock lock; | ||
Fun.protect fn ~finally:unlock | ||
|
||
let handle_sigchld () = | ||
Atomic.set need_wait true; | ||
reap_nonblocking () |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
(** Keep track of child processes and respond to SIGCHLD. *) | ||
|
||
val with_lock : (unit -> 'a) -> 'a | ||
(** This must be held during the (fork, register) sequence | ||
(so that we don't try to reap the process before it's registered), | ||
and also when signalling a child process | ||
(to ensure it isn't reaped at the same time). *) | ||
|
||
val register : int -> Unix.process_status Eio.Promise.t | ||
(** [register pid] adds [pid] to the list of children and returns a promise for its exit status. | ||
You must hold the lock while forking and then calling this. *) | ||
|
||
val handle_sigchld : unit -> unit | ||
(** Call this on [SIGCHLD]. *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.