From fdf9c3dbf4746385d27bac9f774b2337a683c976 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Fri, 17 Mar 2023 10:49:04 +0000 Subject: [PATCH] Add inherit_fds fork action --- fuzz/dune | 4 +- fuzz/fuzz_inherit_fds.ml | 46 ++++++++++++++++ lib_eio/unix/fork_action.c | 81 ++++++++++++++++++++++++++++ lib_eio/unix/fork_action.ml | 32 +++++++++++ lib_eio/unix/fork_action.mli | 15 ++++++ lib_eio/unix/inherit_fds.ml | 97 +++++++++++++++++++++++++++++++++ lib_eio/unix/inherit_fds.mli | 19 +++++++ lib_eio_posix/low_level.ml | 5 ++ lib_eio_posix/low_level.mli | 11 ++++ lib_eio_posix/test/spawn.md | 102 +++++++++++++++++++++++++++++++++++ 10 files changed, 410 insertions(+), 2 deletions(-) create mode 100644 fuzz/fuzz_inherit_fds.ml create mode 100644 lib_eio/unix/inherit_fds.ml create mode 100644 lib_eio/unix/inherit_fds.mli diff --git a/fuzz/dune b/fuzz/dune index f0e84e3fc..9462c1399 100644 --- a/fuzz/dune +++ b/fuzz/dune @@ -1,4 +1,4 @@ (tests (package eio) - (libraries cstruct crowbar fmt eio eio.mock) - (names fuzz_buf_read fuzz_buf_write)) + (libraries cstruct crowbar fmt eio eio.mock eio.unix) + (names fuzz_buf_read fuzz_buf_write fuzz_inherit_fds)) diff --git a/fuzz/fuzz_inherit_fds.ml b/fuzz/fuzz_inherit_fds.ml new file mode 100644 index 000000000..c55925d2a --- /dev/null +++ b/fuzz/fuzz_inherit_fds.ml @@ -0,0 +1,46 @@ +module I = Eio_unix__Inherit_fds + +module S = Set.Make(Int) + +let pp f = function + | `Cloexec x -> Fmt.pf f "close %d" x + | `Keep x -> Fmt.pf f "keep %d" x + +let rec has_duplicates ~seen = function + | [] -> false + | (dst, _) :: _ when S.mem dst seen -> true + | (dst, _) :: xs -> has_duplicates xs ~seen:(S.add dst seen) + +let inherit_fds mapping = + let has_duplicates = has_duplicates ~seen:S.empty mapping in + let fds = Hashtbl.create 10 in + mapping |> List.iter (fun (_dst, src) -> + Hashtbl.add fds src (`Cloexec src); + ); + match I.plan mapping with + | exception (Invalid_argument _) -> assert has_duplicates + | plan -> + assert (not has_duplicates); + plan |> List.iter (fun {I.src; dst} -> + (* Fmt.pr "%d -> %d@." src dst; *) + let v = + match Hashtbl.find fds src with + | `Cloexec x | `Keep x -> + if dst = -1 then `Cloexec x else `Keep x + in + Hashtbl.add fds dst v + ); + mapping |> List.iter (fun (dst, src) -> + let v = Hashtbl.find fds dst in + Crowbar.check_eq ~pp v (`Keep src); + Hashtbl.remove fds dst; + ); + fds |> Hashtbl.iter (fun x -> function + | `Cloexec _ -> () + | `Keep _ -> Fmt.failwith "%d should be close-on-exec!" x + ) + +let fd = Crowbar.range 10 (* Restrict range to make cycles more likely *) + +let () = + Crowbar.(add_test ~name:"inherit_fds" [list (pair fd fd)] inherit_fds) diff --git a/lib_eio/unix/fork_action.c b/lib_eio/unix/fork_action.c index 5abe73b09..2c429862d 100644 --- a/lib_eio/unix/fork_action.c +++ b/lib_eio/unix/fork_action.c @@ -93,3 +93,84 @@ static void action_chdir(int errors, value v_config) { CAMLprim value eio_unix_fork_chdir(value v_unit) { return Val_fork_fn(action_chdir); } + +static void set_blocking(int errors, int fd, int blocking) { + int r = fcntl(fd, F_GETFL, 0); + if (r != -1) { + int flags = blocking + ? r & ~O_NONBLOCK + : r | O_NONBLOCK; + if (r != flags) { + r = fcntl(fd, F_SETFL, flags); + } + } + if (r == -1) { + eio_unix_fork_error(errors, "fcntl", strerror(errno)); + _exit(1); + } +} + +static void set_cloexec(int errors, int fd, int cloexec) { + int r = fcntl(fd, F_GETFD, 0); + if (r != -1) { + int flags = cloexec + ? r | FD_CLOEXEC + : r & ~FD_CLOEXEC; + if (r != flags) { + r = fcntl(fd, F_SETFD, flags); + } + } + if (r == -1) { + eio_unix_fork_error(errors, "fcntl", strerror(errno)); + _exit(1); + } +} + +static void action_dups(int errors, value v_config) { + value v_plan = Field(v_config, 1); + value v_blocking = Field(v_config, 2); + int tmp = -1; + while (Is_block(v_plan)) { + value v_dup = Field(v_plan, 0); + int src = Int_val(Field(v_dup, 0)); + int dst = Int_val(Field(v_dup, 1)); + if (src == -1) src = tmp; + if (dst == -1) { + // Dup to a temporary FD + if (tmp == -1) { + tmp = dup(src); + if (tmp < 0) { + eio_unix_fork_error(errors, "dup-tmp", strerror(errno)); + _exit(1); + } + } else { + int r = dup2(src, tmp); + if (r < 0) { + eio_unix_fork_error(errors, "dup2-tmp", strerror(errno)); + _exit(1); + } + } + set_cloexec(errors, tmp, 1); + } else if (src == dst) { + set_cloexec(errors, dst, 0); + } else { + int r = dup2(src, dst); + if (r < 0) { + eio_unix_fork_error(errors, "dup2", strerror(errno)); + _exit(1); + } + } + v_plan = Field(v_plan, 1); + } + while (Is_block(v_blocking)) { + value v_flags = Field(v_blocking, 0); + int fd = Int_val(Field(v_flags, 0)); + int blocking = Bool_val(Field(v_flags, 1)); + set_blocking(errors, fd, blocking); + v_blocking = Field(v_blocking, 1); + } +} + +CAMLprim value eio_unix_fork_dups(value v_unit) { + return Val_fork_fn(action_dups); +} diff --git a/lib_eio/unix/fork_action.ml b/lib_eio/unix/fork_action.ml index df09e93f0..659410bad 100644 --- a/lib_eio/unix/fork_action.ml +++ b/lib_eio/unix/fork_action.ml @@ -34,3 +34,35 @@ let fchdir fd = { run = fun k -> Rcfd.use ~if_closed:(err_closed "fchdir") fd @@ fun fd -> k (Obj.repr (action_fchdir, fd)) } + +let int_of_fd : Unix.file_descr -> int = Obj.magic + +type action = Inherit_fds.action = { src : int; dst : int } + +let rec with_fds mapping k = + match mapping with + | [] -> k [] + | (dst, src, _) :: xs -> + Rcfd.use ~if_closed:(err_closed "inherit_fds") src @@ fun src -> + with_fds xs @@ fun xs -> + k ((dst, int_of_fd src) :: xs) + +type blocking = [ + | `Blocking + | `Nonblocking + | `Preserve_blocking +] + +external action_dups : unit -> fork_fn = "eio_unix_fork_dups" +let action_dups = action_dups () +let inherit_fds m = + let blocking = m |> List.filter_map (fun (dst, _, flags) -> + match flags with + | `Blocking -> Some (dst, true) + | `Nonblocking -> Some (dst, false) + | `Preserve_blocking -> None + ) + in + with_fds m @@ fun m -> + let plan : action list = Inherit_fds.plan m in + { run = fun k -> k (Obj.repr (action_dups, plan, blocking)) } diff --git a/lib_eio/unix/fork_action.mli b/lib_eio/unix/fork_action.mli index 26ea4dc8b..7699b5b45 100644 --- a/lib_eio/unix/fork_action.mli +++ b/lib_eio/unix/fork_action.mli @@ -38,3 +38,18 @@ val chdir : string -> t val fchdir : Rcfd.t -> t (** [fchdir fd] changes directory to [fd]. *) + +type blocking = [ + | `Blocking (** Clear the [O_NONBLOCK] flag in the child process. *) + | `Nonblocking (** Set the [O_NONBLOCK] flag in the child process. *) + | `Preserve_blocking (** Don't change the blocking mode of the FD. *) +] + +val inherit_fds : (int * Rcfd.t * [< blocking]) list -> t +(** [inherit_fds mapping] marks file descriptors as not close-on-exec and renumbers them. + + For each (fd, src, flags) in [mapping], we use [dup2] to duplicate [src] as [fd]. + If there are cycles in [mapping], a temporary FD is used to break the cycle. + A mapping from an FD to itself simply clears the close-on-exec flag. + + After this, the new FDs may also be set as blocking or non-blocking, depending on [flags]. *) diff --git a/lib_eio/unix/inherit_fds.ml b/lib_eio/unix/inherit_fds.ml new file mode 100644 index 000000000..40687f0e1 --- /dev/null +++ b/lib_eio/unix/inherit_fds.ml @@ -0,0 +1,97 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +module M = Map.Make(Int) + +module Count = struct + let create () = ref M.empty + + let get t fd = + M.find_opt fd !t + |> Option.value ~default:0 + + let incr t fd = + let inc x = Some (1 + Option.value x ~default:0) in + t := M.update fd inc !t + + let decr t fd = + match get t fd with + | i when i <= 0 -> assert false + | 1 -> t := M.remove fd !t; `Unused + | i -> t := M.add fd (pred i) !t; `Still_needed +end + +type action = { src : int; dst : int } + +let plan mapping = + let mapping = + List.fold_left (fun acc (dst, src) -> + if M.mem dst acc then Fmt.invalid_arg "FD %d assigned twice!" dst; + M.add dst src acc + ) M.empty mapping + in + let plan = ref [] in + let dup2 src dst = plan := {src; dst} :: !plan in + let users_of = Count.create () in + (* First, for any FDs that map to themselves we emit (fd, fd) and then forget about it, + as this doesn't interfere with anything else. + We also set [users_of] to track how many times each FD is needed. *) + let mapping = mapping |> M.filter (fun dst src -> + if src = dst then (dup2 src src; false) (* Just clear the close-on-exec flag. *) + else (Count.incr users_of src; true) + ) in + let tmp = ref (-1) in (* The FD we dup'd to the temporary FD when breaking cycles. *) + let rec no_users dst = + (* Nothing requires the old value of [dst] now, + so if we wanted to put something there, do it. *) + M.find_opt dst mapping |> Option.iter (fun src -> dup src dst) + and dup src dst = + (* Duplicate [src] as [dst]. *) + if src = !tmp then ( + (* We moved [src] to [tmp] to break a cycle, so use [tmp] instead. + Also, there's nothing to do after this as the cycle is broken. *) + dup2 (-1) dst; + ) else ( + dup2 src dst; + (* Record that [dst] no longer depends on [src]. *) + match Count.decr users_of src with + | `Still_needed -> () + | `Unused -> no_users src + ) + in + (* Find any loose ends and work backwards. + Note: we need to do this in two steps because [dup] modifies [users_of]. *) + mapping + |> M.filter (fun dst _src -> Count.get users_of dst = 0) (* FDs with no dependants *) + |> M.iter (fun dst src -> dup src dst); + (* At this point there are no loose ends; we have nothing but cycles left. *) + (* M.iter (fun _ v -> assert (v = 1)) !users_of; *) + (* For each cycle, break it at one point using the temporary FD. + It's safe to allocate the temporary FD now because every FD we plan to use is already allocated. *) + let rec break_cycles () = + match M.min_binding_opt !users_of with (* Pick any remaining FD. *) + | None -> () + | Some (src, _) -> + dup2 src (-1); (* Duplicate [src] somewhere. *) + tmp := src; (* Remember that when we try to use it later. *) + (* The FD that needed [src] can now use [tmp] instead: *) + let state = Count.decr users_of src in + assert (state = `Unused); + no_users src; (* Free this cycle. *) + break_cycles () (* Free any other cycles. *) + in + break_cycles (); + List.rev !plan diff --git a/lib_eio/unix/inherit_fds.mli b/lib_eio/unix/inherit_fds.mli new file mode 100644 index 000000000..5765b4954 --- /dev/null +++ b/lib_eio/unix/inherit_fds.mli @@ -0,0 +1,19 @@ +(** Plan how to renumber FDs in a child process. *) + +type action = { src : int; dst : int } +(** { src; dst} is (roughly) a request to [dup2(src, dst)]. + + [dst] should not be marked as close-on-exec. + If [src = dst] then simply clear the close-on-exec flag for the FD. + + An FD of -1 means to use a temporary FD (e.g. use [dup] the first time, + with close-on-exec true). This is needed if there are cycles (e.g. we want + to switch FDs 1 and 2). Only one temporary FD is needed at a time, so it + can be reused as necessary. *) + +val plan : (int * int) list -> action list +(** [plan mapping] calculates a sequence of operations to renumber file descriptors so that + FD x afterwards refers to the object that [List.assoc mapping x] referred to at the start. + + It returns a list of actions to be performed in sequence. + Example: [plan [1, 2]] is just [[(2, 1)]]. *) diff --git a/lib_eio_posix/low_level.ml b/lib_eio_posix/low_level.ml index e950f3be7..7b3930c7e 100644 --- a/lib_eio_posix/low_level.ml +++ b/lib_eio_posix/low_level.ml @@ -222,6 +222,11 @@ module Process = struct let fchdir fd = Eio_unix.Private.Fork_action.fchdir (Fd.to_rcfd fd) let chdir = Eio_unix.Private.Fork_action.chdir let execve = Eio_unix.Private.Fork_action.execve + + let inherit_fds m : t = + m + |> List.map (fun (dst, src, flags) -> (dst, Fd.to_rcfd src, flags)) + |> Eio_unix.Private.Fork_action.inherit_fds end (* Read a (typically short) error message from a child process. *) diff --git a/lib_eio_posix/low_level.mli b/lib_eio_posix/low_level.mli index 512c166cc..17b5f1766 100644 --- a/lib_eio_posix/low_level.mli +++ b/lib_eio_posix/low_level.mli @@ -93,6 +93,17 @@ module Process : sig val fchdir : Fd.t -> t (** [fchdir dir] changes directory to [dir]. *) + + val inherit_fds : (int * Fd.t * [< `Blocking | `Nonblocking | `Preserve_blocking]) list -> t + (** [inherit_fds mapping] marks file descriptors as not close-on-exec and renumbers them. + + For each key in [mapping], we use [dup2] to duplicate the source descriptor. + If there are cycles in [mapping], a temporary FD is used to break the cycle. + A mapping from an FD to itself simply clears the close-on-exec flag. + + For each FD, you can also say whether it should be set as blocking or non-blocking. + + All FDs are set to blocking mode in the child. *) end val spawn : sw:Switch.t -> Fork_action.t list -> t diff --git a/lib_eio_posix/test/spawn.md b/lib_eio_posix/test/spawn.md index 528a627eb..fbba3cc14 100644 --- a/lib_eio_posix/test/spawn.md +++ b/lib_eio_posix/test/spawn.md @@ -129,3 +129,105 @@ Signalling an exited child does nothing: FOO=bar - : unit = () ``` + +Inheriting file descriptors: + +```ocaml +let fd flow = Eio_posix.Low_level.Fd.get_fd_opt flow |> Option.get +let unix flow = Eio_posix.Low_level.Fd.to_unix `Peek (fd flow) +let int_of_fd : Unix.file_descr -> int = Obj.magic +let id flow = int_of_fd (unix flow) +let read_all pipe = + let r = Eio.Buf_read.of_flow pipe ~max_size:1024 in + Eio.Buf_read.take_all r +``` + +```ocaml +# Eio_posix.run @@ fun _env -> + Switch.run @@ fun sw -> + let pipe_r, pipe_w = Eio_unix.pipe sw in + let child = + Process.spawn ~sw Process.Fork_action.[ + inherit_fds [ + 1, fd pipe_w, `Blocking; + ]; + execve "/usr/bin/env" + ~argv:[| "env" |] + ~env:[| "FOO=bar" |]; + ] + in + Eio.Flow.close pipe_w; + let r = Eio.Buf_read.of_flow pipe_r ~max_size:1024 in + traceln "Read: %S" (Eio.Buf_read.take_all r); + Promise.await (Process.exit_status child);; ++Read: "FOO=bar\n" +- : Unix.process_status = Unix.WEXITED 0 +``` + +Swapping FDs (note: plain sh can't handle multi-digit FDs!): + +```ocaml +# Eio_posix.run @@ fun _env -> + Switch.run @@ fun sw -> + let pipe1_r, pipe1_w = Eio_unix.pipe sw in + let pipe2_r, pipe2_w = Eio_unix.pipe sw in + let pipe3_r, pipe3_w = Eio_unix.pipe sw in + let pipe4_r, pipe4_w = Eio_unix.pipe sw in + let child = + Process.spawn ~sw Process.Fork_action.[ + inherit_fds [ + id pipe1_w, fd pipe2_w, `Blocking; + id pipe2_w, fd pipe1_w, `Blocking; + id pipe3_w, fd pipe4_w, `Blocking; + id pipe4_w, fd pipe3_w, `Blocking; + ]; + execve "/usr/bin/env" + ~argv:[| + "env"; "bash"; "-c"; + Printf.sprintf "echo one >&%d; echo two >&%d; echo three >&%d; echo four >&%d" + (id pipe1_w) + (id pipe2_w) + (id pipe3_w) + (id pipe4_w) + |] + ~env:(Unix.environment ()) + ] + in + Eio.Flow.close pipe1_w; + Eio.Flow.close pipe2_w; + Eio.Flow.close pipe3_w; + Eio.Flow.close pipe4_w; + traceln "Pipe1: %S" (read_all pipe1_r); + traceln "Pipe2: %S" (read_all pipe2_r); + traceln "Pipe3: %S" (read_all pipe3_r); + traceln "Pipe4: %S" (read_all pipe4_r); + Promise.await (Process.exit_status child);; ++Pipe1: "two\n" ++Pipe2: "one\n" ++Pipe3: "four\n" ++Pipe4: "three\n" +- : Unix.process_status = Unix.WEXITED 0 +``` + +Keeping an FD open: + +```ocaml +# Eio_posix.run @@ fun _env -> + Switch.run @@ fun sw -> + let pipe1_r, pipe1_w = Eio_unix.pipe sw in + let child = + Process.spawn ~sw Process.Fork_action.[ + inherit_fds [ + id pipe1_w, fd pipe1_w, `Blocking; + ]; + execve "/usr/bin/env" + ~argv:[| "env"; "bash"; "-c"; Printf.sprintf "echo one >&%d" (id pipe1_w) |] + ~env:(Unix.environment ()) + ] + in + Eio.Flow.close pipe1_w; + traceln "Pipe1: %S" (read_all pipe1_r); + Promise.await (Process.exit_status child);; ++Pipe1: "one\n" +- : Unix.process_status = Unix.WEXITED 0 +```