Skip to content

Commit

Permalink
Add low-level process support to eio_linux
Browse files Browse the repository at this point in the history
  • Loading branch information
patricoferris committed Feb 2, 2023
1 parent 388f906 commit d1af2a2
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib_eio/unix/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name eio_unix)
(public_name eio.unix)
(libraries eio unix threads mtime.clock.os))
(libraries eio unix threads mtime.clock.os spawn))
15 changes: 15 additions & 0 deletions lib_eio/unix/eio_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,18 @@ let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) =
run_in_systhread (fun () ->
let Unix.{ni_hostname; ni_service} = Unix.getnameinfo sockaddr options in
(ni_hostname, ni_service))

module Spawn = struct
let resolve_program ~paths prog =
if not (Filename.is_relative prog) then Some prog
else
let rec loop = function
| path :: rest ->
let p = Filename.concat path prog in
if Sys.file_exists p then Some p else loop rest
| [] -> None
in
loop paths

let spawn = Spawn.spawn
end
20 changes: 20 additions & 0 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,23 @@ module Ctf = Ctf_unix

val getnameinfo : Eio.Net.Sockaddr.t -> (string * string)
(** [getnameinfo sockaddr] returns domain name and service for [sockaddr]. *)

module Spawn : sig
val resolve_program : paths:string list -> string -> string option
(** [resolve_program ~paths program_name] will try to resolve the [program_name]
in [paths] if it is relative. *)

val spawn :
?env:Spawn.Env.t ->
?cwd:Spawn.Working_dir.t ->
prog:string ->
argv:string list ->
?stdin:Unix.file_descr ->
?stdout:Unix.file_descr ->
?stderr:Unix.file_descr ->
?unix_backend:Spawn.Unix_backend.t ->
?setpgid:Spawn.Pgid.t ->
unit ->
int
(** Calls {!Spawn.spawn}. *)
end
39 changes: 39 additions & 0 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,45 @@ module Low_level = struct
Eio_unix.run_in_systhread @@ fun () ->
Unix.getaddrinfo node service []
|> List.filter_map to_eio_sockaddr_t

module Process = struct
external pidfd_open : int -> Unix.file_descr = "caml_eio_pidfd_open"

type t = {
process : FD.t;
pid : int;
mutable hook : Switch.hook option;
mutable status : Unix.process_status option;
}

let await_exit t =
match t.status with
| Some status -> status
| None ->
await_readable t.process;
let status = Unix.waitpid [] t.pid |> snd in
Option.iter Switch.remove_hook t.hook;
t.status <- Some status;
status

let spawn ?env ?cwd ?stdin ?stdout ?stderr ~sw prog argv =
let paths = Option.map (fun v -> String.split_on_char ':' v) (Sys.getenv_opt "PATH") |> Option.value ~default:[ "/usr/bin"; "/usr/local/bin" ] in
let prog = match Eio_unix.Spawn.resolve_program ~paths prog with
| Some prog -> prog
| None -> raise (Eio.Fs.err (Eio.Fs.Not_found (Eio_unix.Unix_error (Unix.ENOENT, "", ""))))
in
let pid = Eio_unix.Spawn.spawn ?env ?cwd ?stdin ?stdout ?stderr ~prog ~argv () in
let fd = pidfd_open pid in
let process = FD.of_unix ~sw ~seekable:false ~close_unix:true fd in
let t = { process; pid; hook = None; status = None } in
let hook = Switch.on_release_cancellable sw (fun () ->
Unix.kill pid Sys.sigkill; ignore (await_exit t)
) in
t.hook <- Some hook;
t

let send_signal t i = Unix.kill t.pid i
end
end

module EventFD_pool : sig
Expand Down
25 changes: 25 additions & 0 deletions lib_eio_linux/eio_linux.mli
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,29 @@ module Low_level : sig
(** [getaddrinfo host] returns a list of IP addresses for [host]. [host] is either a domain name or
an ipaddress. *)

module Process : sig
type t
(** A subprocess *)

val spawn :
?env:Spawn.Env.t ->
?cwd:Spawn.Working_dir.t ->
?stdin:Unix.file_descr ->
?stdout:Unix.file_descr ->
?stderr:Unix.file_descr ->
sw:Switch.t ->
string ->
string list ->
t
(** Spawns a subprocess. By default all of the optional arguments are inherited from the
calling process. If the process has not finished when the switch is released, the process
will be sent [Sys.sigkill]. *)

val await_exit : t -> Unix.process_status
(** [await_exit t] waits for the process [t] to exit. This blocks the fiber until the process
has finished. *)

val send_signal : t -> int -> unit
(** A wrapper for {!Unix.kill}. *)
end
end
12 changes: 12 additions & 0 deletions lib_eio_linux/eio_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,15 @@ CAMLprim value caml_eio_getdents(value v_fd) {

CAMLreturn(result);
}

static int pidfd_open(pid_t pid, unsigned int flags) {
return syscall(__NR_pidfd_open, pid, flags);
}

CAMLprim value caml_eio_pidfd_open(value v_pid) {
CAMLparam1(v_pid);
int fd;
// Returns a file descriptor for the PID with close-on-exec set.
fd = pidfd_open(Int_val(v_pid), 0);
CAMLreturn(Val_int(fd));
}
103 changes: 103 additions & 0 deletions lib_eio_linux/tests/process.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Set up the test environment

```ocaml
# #require "eio_linux";;
# open Eio.Std;;
# open Eio;;
# module Process = Eio_linux.Low_level.Process;;
module Process = Eio_linux.Low_level.Process
```

A helper function for reading all of the bytes from a handle.

```ocaml
let read_all fd =
let buf = Cstruct.create 32 in
let acc_buffer = Buffer.create 42 in
let rec read () =
match Flow.single_read fd buf with
| i ->
Buffer.add_string acc_buffer
Cstruct.(to_string (sub buf 0 i));
read ()
| exception End_of_file -> Buffer.contents acc_buffer
in read ()
```

A simple `echo hello` process redirects to stdout.

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let t = Process.spawn ~sw "echo" [ "echo"; "hello" ] in
Process.await_exit t;;
hello
- : Unix.process_status = Unix.WEXITED 0
```

Using a pipe to redirect output to a buffer.

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let rp, wp = Eio_unix.pipe sw in
let w = Eio_unix.FD.peek wp in
let t = Process.spawn ~sw ~stdout:w "echo" [ "echo"; "Hello,"; "World!" ] in
let _ = Process.await_exit t in
Flow.close wp;
let result = read_all rp in
result;;
- : string = "Hello, World!\n"
```

Writing to stdin of a process works.

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let rp, wp = Eio_unix.pipe sw in
let r = Eio_unix.FD.peek rp in
let t = Process.spawn ~sw ~stdin:r "head" [ "head" ] in
Flow.copy_string "Hello!" wp;
Flow.close wp;
Process.await_exit t;;
Hello!
- : Unix.process_status = Unix.WEXITED 0
```

Stopping a process works.

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let t = Process.spawn ~sw "sleep" [ "sleep"; "10" ] in
Process.send_signal t Sys.sigkill;
Process.await_exit t;;
- : Unix.process_status = Unix.WSIGNALED (-7)
```

Forgetting to wait for a process to finish stops the process.

```ocaml
# Eio_linux.run @@ fun _env ->
let proc =
Switch.run @@ fun sw ->
Process.spawn ~sw "sleep" [ "sleep"; "10" ]
in
Process.await_exit proc;;
- : Unix.process_status = Unix.WSIGNALED (-7)
```

Stopping a process interacts nicely with switches.

```ocaml
# Eio_linux.run @@ fun _env ->
let proc =
Switch.run @@ fun sw ->
let t = Process.spawn ~sw "sleep" [ "sleep"; "10" ] in
Process.send_signal t Sys.sigkill;
t
in
Process.await_exit proc;;
- : Unix.process_status = Unix.WSIGNALED (-7)
```

0 comments on commit d1af2a2

Please sign in to comment.