Skip to content

Commit

Permalink
eio_posix: initial support for subprocesses
Browse files Browse the repository at this point in the history
To spawn a child executable on Unix, the parent `fork`s a copy of
itself, then has the child copy set up the environment and `execve` the
new program.

However, we cannot run any OCaml code in the forked child process. This
is because `fork` only duplicates its own domain. To the child, it
appears that all other domains have stopped responding and if it tries
to e.g. perform a GC then the child process will hang.

Therefore, the `fork` and all child actions need to be written in C.
To keep this flexible, this PR adds a new `fork_action` type and lets
the user provide a list of such actions to perform.
  • Loading branch information
talex5 committed Mar 15, 2023
1 parent eef510c commit 0bec10c
Show file tree
Hide file tree
Showing 18 changed files with 553 additions and 10 deletions.
4 changes: 4 additions & 0 deletions lib_eio/unix/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
(library
(name eio_unix)
(public_name eio.unix)
(foreign_stubs
(language c)
(include_dirs include)
(names fork_action))
(libraries eio unix threads mtime.clock.os))
2 changes: 2 additions & 0 deletions lib_eio/unix/eio_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ module Private = struct
| Pipe : Eio.Switch.t -> (<Eio.Flow.source; Eio.Flow.close; unix_fd> * <Eio.Flow.sink; Eio.Flow.close; unix_fd>) Effect.t

module Rcfd = Rcfd

module Fork_action = Fork_action
end

let await_readable fd = Effect.perform (Private.Await_readable fd)
Expand Down
2 changes: 2 additions & 0 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ module Private : sig
(<Eio.Flow.source; Eio.Flow.close; unix_fd> * <Eio.Flow.sink; Eio.Flow.close; unix_fd>) Effect.t (** See {!pipe} *)

module Rcfd = Rcfd

module Fork_action = Fork_action
end

module Ctf = Ctf_unix
Expand Down
100 changes: 100 additions & 0 deletions lib_eio/unix/fork_action.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>

#include <caml/mlvalues.h>

#include "fork_action.h"

void eio_unix_run_fork_actions(int errors, value v_actions) {
int old_flags = fcntl(errors, F_GETFL, 0);
fcntl(errors, F_SETFL, old_flags & ~O_NONBLOCK);
while (Is_block(v_actions)) {
value v_action = Field(v_actions, 0);
fork_fn *action = (fork_fn *) Nativeint_val(Field(v_action, 0));
int err = action(errors, v_action);
if (err) {
_exit(err);
}
v_actions = Field(v_actions, 1);
}
_exit(1);
}

static void try_write_all(int fd, char *buf) {
int len = strlen(buf);
while (len > 0) {
int wrote = write(fd, buf, len);

if (wrote <= 0)
return;

buf += wrote;
len -= wrote;
}
}

void eio_unix_fork_error(int fd, char *fn, char *buf) {
try_write_all(fd, fn);
try_write_all(fd, ": ");
try_write_all(fd, buf);
}

static char **make_string_array(int errors, value v_array) {
int n = Wosize_val(v_array);
char **c = malloc(sizeof(char *) * (n + 1));
if (!c) {
eio_unix_fork_error(errors, "make_string_array", "out of memory");
_exit(1);
}
for (int i = 0; i < n; i++) {
c[i] = (char *) String_val(Field(v_array, i));
}
c[n] = NULL;
return c;
}

static int action_execve(int errors, value v_config) {
value v_exe = Field(v_config, 1);
char **argv = make_string_array(errors, Field(v_config, 2));
char **envp = make_string_array(errors, Field(v_config, 3));
execve(String_val(v_exe), argv, envp);
eio_unix_fork_error(errors, "execve", strerror(errno));
return 1;
}

CAMLprim value eio_unix_fork_execve(value v_unit) {
return Val_fork_fn(action_execve);
}

static int action_fchdir(int errors, value v_config) {
value v_fd = Field(v_config, 1);
int r;
r = fchdir(Int_val(v_fd));
if (r != 0) {
eio_unix_fork_error(errors, "fchdir", strerror(errno));
return 1;
}
return 0;
}

CAMLprim value eio_unix_fork_fchdir(value v_unit) {
return Val_fork_fn(action_fchdir);
}

static int action_chdir(int errors, value v_config) {
value v_path = Field(v_config, 1);
int r;
r = chdir(String_val(v_path));
if (r != 0) {
eio_unix_fork_error(errors, "chdir", strerror(errno));
return 1;
}
return 0;
}

CAMLprim value eio_unix_fork_chdir(value v_unit) {
return Val_fork_fn(action_chdir);
}
30 changes: 30 additions & 0 deletions lib_eio/unix/fork_action.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
type c_action = Obj.t

type t = { run : 'a. ((c_action -> 'a) -> 'a) } [@@unboxed]

(* A [fork_fn] is a C function that can be executed after forking. It cannot call OCaml code or
run the OCaml GC. It is passed a [Unix.file_descr] for errors and a pointer
to a [fork_action]. On success it should write nothing to the error stream and
return 0. On error, it should write a message to the error FD and return a
non-zero value for the exit status (e.g. 1). *)
type fork_fn

let rec with_actions actions fn =
match actions with
| [] -> fn []
| { run } :: xs ->
run @@ fun c_action ->
with_actions xs @@ fun c_actions ->
fn (c_action :: c_actions)

external action_execve : unit -> fork_fn = "eio_unix_fork_execve"
let action_execve = action_execve ()
let execve path ~argv ~env = { run = fun k -> k (Obj.repr (action_execve, path, argv, env)) }

external action_chdir : unit -> fork_fn = "eio_unix_fork_chdir"
let action_chdir = action_chdir ()
let chdir path = { run = fun k -> k (Obj.repr (action_chdir, path)) }

external action_fchdir : unit -> fork_fn = "eio_unix_fork_fchdir"
let action_fchdir = action_fchdir ()
let fchdir fd = { run = fun k -> k (Obj.repr (action_fchdir, fd)) }
40 changes: 40 additions & 0 deletions lib_eio/unix/fork_action.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
(** Actions to perform after forking a child process.
To spawn a child executable on Unix, the parent forks a copy of itself,
then has the child copy set up the environment for the new program and
execute it.
However, we cannot run any OCaml code in the forked child process. This is
because `fork` only duplicates its own domain. To the child, it appears
that all other domains have stopped responding and if it tries to e.g.
perform a GC then the child process will hang.
Therefore, the fork call and all child actions need to be written in C.
This module provides some support code for doing that.
Individual backends will wrap these actions with higher-level APIs and
can also add their own platform-specific actions *)

type fork_fn
(** A C function, as defined in "include/fork_action.h". *)

type c_action = Obj.t
(** An action to be performed in a child process after forking.
This must be a tuple whose first field is a [fork_fn]. *)

type t = { run : 'a. ((c_action -> 'a) -> 'a) } [@@unboxed]
(** An action that calls [run k] in the parent process to create the C action.
[run] passes the action to [k], which forks the child and runs it. When [k]
returns, [run] can free any resources used. *)

val with_actions : t list -> (c_action list -> 'a) -> 'a

(** {2 Actions} *)

val execve : string -> argv:string array -> env:string array -> t
(** See [execve(2)]. *)

val chdir : string -> t
(** [chdir path] changes directory to [path]. *)

val fchdir : Unix.file_descr -> t
(** [fchdir fd] changes directory to [fd]. *)
23 changes: 23 additions & 0 deletions lib_eio/unix/include/fork_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include <caml/mlvalues.h>
#include <caml/alloc.h>

/* A function that runs in the forked child process. It must not run any OCaml code or invoke the GC.
* If the action succeeds then the function returns 0.
* Otherwise, it writes an error message to the FD [errors] and returns a non-zero result.
* v_args is the c_action tuple (where field 0 is the function itself).
*/
typedef int fork_fn(int errors, value v_args);

#define Val_fork_fn(fn) (caml_copy_nativeint((intnat) fn))

/* Run each C action in the list [v_actions].
* If one returns a non-zero value then it stops and calls
* _exit with that result. If the all succeed, it calls _exit(1).
* Sets [errors] to be blocking. Never returns.
*/
void eio_unix_run_fork_actions(int errors, value v_actions);

/* Write "$fn: $msg" to fd.
* fd must be blocking.
* Ignores failure. */
void eio_unix_fork_error(int fd, char *fn, char *msg);
6 changes: 1 addition & 5 deletions lib_eio_posix/domain_mgr.ml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ let run_event_loop fn x =
)
| Eio_unix.Private.Pipe sw -> Some (fun k ->
match
let unix_r, unix_w = Unix.pipe ~cloexec:true () in
let r = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_r in
let w = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_w in
Unix.set_nonblock unix_r;
Unix.set_nonblock unix_w;
let r, w = Low_level.pipe ~sw in
let source = (Flow.of_fd r :> <Eio.Flow.source; Eio.Flow.close; Eio_unix.unix_fd>) in
let sink = (Flow.of_fd w :> <Eio.Flow.sink; Eio.Flow.close; Eio_unix.unix_fd>) in
(source, sink)
Expand Down
1 change: 1 addition & 0 deletions lib_eio_posix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
(foreign_stubs
(language c)
(flags :standard -D_LARGEFILE64_SOURCE)
(include_dirs ../lib_eio/unix/include)
(names eio_posix_stubs))
(libraries eio eio.utils eio.unix fmt iomux))

Expand Down
2 changes: 2 additions & 0 deletions lib_eio_posix/eio_posix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*)

module Low_level = Low_level
module Process = Process

type stdenv = <
stdin : <Eio.Flow.source; Eio_unix.unix_fd>;
Expand All @@ -33,6 +34,7 @@ type stdenv = <
let run main =
(* SIGPIPE makes no sense in a modern application. *)
Sys.(set_signal sigpipe Signal_ignore);
Sys.(set_signal sigchld (Signal_handle (fun (_:int) -> Process.handle_sigchld ())));
let stdin = (Flow.of_fd Low_level.Fd.stdin :> <Eio.Flow.source; Eio_unix.unix_fd>) in
let stdout = (Flow.of_fd Low_level.Fd.stdout :> <Eio.Flow.sink; Eio_unix.unix_fd>) in
let stderr = (Flow.of_fd Low_level.Fd.stderr :> <Eio.Flow.sink; Eio_unix.unix_fd>) in
Expand Down
2 changes: 2 additions & 0 deletions lib_eio_posix/eio_posix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ val run : (stdenv -> 'a) -> 'a

module Low_level = Low_level
(** Low-level API for making POSIX calls directly. *)

module Process = Process
16 changes: 16 additions & 0 deletions lib_eio_posix/eio_posix_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <caml/unixsupport.h>
#include <caml/bigarray.h>

#include "fork_action.h"

#ifdef ARCH_SIXTYFOUR
#define Int63_val(v) Long_val(v)
#else
Expand Down Expand Up @@ -185,3 +187,17 @@ CAMLprim value caml_eio_posix_renameat(value v_old_fd, value v_old_path, value v
if (ret == -1) uerror("renameat", v_old_path);
CAMLreturn(Val_unit);
}

CAMLprim value caml_eio_posix_spawn(value v_errors, value v_actions) {
CAMLparam1(v_actions);
pid_t child_pid;

child_pid = fork();
if (child_pid == 0) {
eio_unix_run_fork_actions(Int_val(v_errors), v_actions);
} else if (child_pid < 0) {
uerror("fork", Nothing);
}

CAMLreturn(Val_long(child_pid));
}
8 changes: 8 additions & 0 deletions lib_eio_posix/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,11 @@ let rename ?old_dir old_path ?new_dir new_path =
with_dirfd "rename-old" old_dir @@ fun old_dir ->
with_dirfd "rename-new" new_dir @@ fun new_dir ->
eio_renameat old_dir old_path new_dir new_path

let pipe ~sw =
let unix_r, unix_w = Unix.pipe ~cloexec:true () in
let r = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_r in
let w = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_w in
Unix.set_nonblock unix_r;
Unix.set_nonblock unix_w;
r, w
2 changes: 2 additions & 0 deletions lib_eio_posix/low_level.mli
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ val writev : Fd.t -> Cstruct.t array -> int
val preadv : file_offset:Optint.Int63.t -> Fd.t -> Cstruct.t array -> int
val pwritev : file_offset:Optint.Int63.t -> Fd.t -> Cstruct.t array -> int

val pipe : sw:Switch.t -> Fd.t * Fd.t

module Open_flags : sig
type t

Expand Down
Loading

0 comments on commit 0bec10c

Please sign in to comment.