Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process API Redux #473

Closed
wants to merge 11 commits into from
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Eio replaces existing concurrency libraries such as Lwt
* [Buffered Writing](#buffered-writing)
* [Error Handling](#error-handling)
* [Filesystem Access](#filesystem-access)
* [Subprocesses](#subprocesses)
* [Time](#time)
* [Multicore Support](#multicore-support)
* [Synchronisation Tools](#synchronisation-tools)
Expand Down Expand Up @@ -891,6 +892,44 @@ Note: the `eio_luv` backend doesn't have the `openat`, `mkdirat`, etc.,
calls that are necessary to implement these checks without races,
so be careful if symlinks out of the subtree may be created while the program is running.

## Subprocesses

Spawning subprocesses is provided by the [Eio.Process][] module. [Eio_unix][] contains a helper function
for finding the absolute path to programs.


```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
let stdin, stdout, stderr = Eio.Stdenv.stdio env in
let echo =
Option.get @@ Eio_unix.resolve_program ~paths:[ "/usr/bin"; "/bin" ] "echo"
in
Eio.Switch.run @@ fun sw ->
let child = Eio.Process.spawn proc_mgr ~sw ~stdin ~stdout ~stderr echo [ "echo"; "hello" ] in
Comment on lines +905 to +909
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the low-level API should mimic the platform's native API, this high-level API is where we can add convenience features to make things easier to use.

For example, I think it should handle looking up the path for you. It could have an optional ?executable argument to take the path explicitly, but default to doing the sensible thing with argv[0] (i.e. look up implicit paths in $PATH). Note that this example wouldn't work on e.g. NixOS, where echo is at /run/current-system/sw/bin/echo.

Suggested change
let echo =
Option.get @@ Eio_unix.resolve_program ~paths:[ "/usr/bin"; "/bin" ] "echo"
in
Eio.Switch.run @@ fun sw ->
let child = Eio.Process.spawn proc_mgr ~sw ~stdin ~stdout ~stderr echo [ "echo"; "hello" ] in
Eio.Switch.run @@ fun sw ->
let child = Eio.Process.spawn proc_mgr ~sw ~stdout ["echo"; "hello"] in

stdin can default to /dev/null I think (we don't want to encourage passing it if it's not needed), and thinking about it further I'm OK with stderr being passed through by default. We already let all Eio code output traceln messages to stderr.

Not sure what's best for stdout. Making it optional and passing it though seems OK, as does making it a required argument. I think defaulting to /dev/null would be bad though as you wouldn't get any error indicating why it didn't work (unlike redirecting stdin, where you get end-of-file).

Eio.Process.await child;;
hello
- : Eio.Process.status = Eio.Process.Exited 0
```

If you want to capture the output of a process, you can provide a suitable `Eio.Flow.sink` as the `stdout` argument.

```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
let buffer = Buffer.create 4 in
let stdin, _, stderr = Eio.Stdenv.stdio env in
let stdout = Eio.Flow.buffer_sink buffer in
let echo =
Option.get @@ Eio_unix.resolve_program ~paths:[ "/usr/bin"; "/bin" ] "echo"
in
Eio.Switch.run @@ fun sw ->
let child = Eio.Process.spawn proc_mgr ~sw ~stdin ~stdout ~stderr echo [ "echo"; "hello" ] in
Eio.Process.await_exn child;
Buffer.contents buffer;;
Comment on lines +920 to +929
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point we should add some other convenience functions like popen for cases like this, e.g.

Suggested change
let buffer = Buffer.create 4 in
let stdin, _, stderr = Eio.Stdenv.stdio env in
let stdout = Eio.Flow.buffer_sink buffer in
let echo =
Option.get @@ Eio_unix.resolve_program ~paths:[ "/usr/bin"; "/bin" ] "echo"
in
Eio.Switch.run @@ fun sw ->
let child = Eio.Process.spawn proc_mgr ~sw ~stdin ~stdout ~stderr echo [ "echo"; "hello" ] in
Eio.Process.await_exn child;
Buffer.contents buffer;;
Eio.Process.popen ["echo"; "hello"]

- : string = "hello\n"
```

## Time

The standard environment provides a [clock][Eio.Time] with the usual POSIX time:
Expand Down Expand Up @@ -1743,3 +1782,4 @@ Some background about the effects system can be found in:
[Eio.Mutex]: https://ocaml-multicore.github.io/eio/eio/Eio/Mutex/index.html
[Eio.Semaphore]: https://ocaml-multicore.github.io/eio/eio/Eio/Semaphore/index.html
[Eio.Condition]: https://ocaml-multicore.github.io/eio/eio/Eio/Condition/index.html
[Eio.Process]: https://ocaml-multicore.github.io/eio/eio/Eio/Process/index.html
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this will be a broken link until the next release. Might be OK, or we could link to the source code until then.

14 changes: 8 additions & 6 deletions doc/prelude.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ module Eio_main = struct
let run fn =
Eio_main.run @@ fun env ->
fn @@ object
method net = env#net
method stdin = env#stdin
method stdout = env#stdout
method cwd = env#cwd
method domain_mgr = fake_domain_mgr
method clock = fake_clock env#clock
method net = env#net
method stdin = env#stdin
method stdout = env#stdout
method stderr = env#stderr
method cwd = env#cwd
method process_mgr = env#process_mgr
method domain_mgr = fake_domain_mgr
method clock = fake_clock env#clock
end
end

Expand Down
4 changes: 4 additions & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ module Flow = Flow
module Buf_read = Buf_read
module Buf_write = Buf_write
module Net = Net
module Process = Process
module Domain_manager = Domain_manager
module Time = Time
module File = File
Expand All @@ -35,6 +36,7 @@ module Stdenv = struct
stdout : Flow.sink;
stderr : Flow.sink;
net : Net.t;
process_mgr : Process.mgr;
domain_mgr : Domain_manager.t;
clock : Time.clock;
mono_clock : Time.Mono.t;
Expand All @@ -47,7 +49,9 @@ module Stdenv = struct
let stdin (t : <stdin : #Flow.source; ..>) = t#stdin
let stdout (t : <stdout : #Flow.sink; ..>) = t#stdout
let stderr (t : <stderr : #Flow.sink; ..>) = t#stderr
let stdio (t : <stdin : #Flow.source; stdout: #Flow.sink; stderr : #Flow.sink; ..>) = t#stdin, t#stdout, t#stderr
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not very convinced about this. I suspect that providing defaults for the standard streams would make this unnecessary.

let net (t : <net : #Net.t; ..>) = t#net
let process_mgr (t : <process_mgr : #Process.mgr; ..>) = t#process_mgr
let domain_mgr (t : <domain_mgr : #Domain_manager.t; ..>) = t#domain_mgr
let clock (t : <clock : #Time.clock; ..>) = t#clock
let mono_clock (t : <mono_clock : #Time.Mono.t; ..>) = t#mono_clock
Expand Down
15 changes: 15 additions & 0 deletions lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ module Buf_write = Buf_write
(** Networking. *)
module Net = Net

(** Subprocesses. *)
module Process = Process

(** Parallel computation across multiple CPU cores. *)
module Domain_manager : sig
class virtual t : object
Expand Down Expand Up @@ -180,6 +183,7 @@ module Stdenv : sig
stdout : Flow.sink;
stderr : Flow.sink;
net : Net.t;
process_mgr : Process.mgr;
domain_mgr : Domain_manager.t;
clock : Time.clock;
mono_clock : Time.Mono.t;
Expand All @@ -197,6 +201,9 @@ module Stdenv : sig
val stdout : <stdout : #Flow.sink as 'a; ..> -> 'a
val stderr : <stderr : #Flow.sink as 'a; ..> -> 'a

val stdio : <stdin : #Flow.source as 'a; stdout : #Flow.sink as 'b; stderr : #Flow.sink as 'c; ..> -> 'a * 'b * 'c
(** [stdio t] returns [stdin, stdout, stderr]. *)

(** {1 File-system access}

To use these, see {!Path}. *)
Expand All @@ -222,6 +229,14 @@ module Stdenv : sig
val net : <net : #Net.t as 'a; ..> -> 'a
(** [net t] gives access to the process's network namespace. *)

(** {1 Processes }

To use this, see {!Process}.
*)

val process_mgr : <process_mgr : #Process.mgr as 'a; ..> -> 'a
(** [process_mgr t] allows you to run subprocesses. *)

(** {1 Domains (using multiple CPU cores)}

To use this, see {!Domain_manager}.
Expand Down
53 changes: 53 additions & 0 deletions lib_eio/process.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
type status = Exited of int | Signaled of int | Stopped of int
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a polymorphic variant (often the Stopped case isn't possible, e.g. for an exit status).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would something like

type exit_status = [ `Exited of int | `Signaled of int ]

type status = [ exit_status | `Stopped of int ] 

be useful? If the Stopped case can sometimes happen in the exit case the exit_status method would still have to handle it. If it can never happen I suppose we can assert false it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it can never happen and we should assert false for now.


let pp_status ppf = function
| Exited i -> Format.fprintf ppf "Exited %i" i
| Signaled i -> Format.fprintf ppf "Signalled %i" i
| Stopped i -> Format.fprintf ppf "Stopped %i" i

type Exn.err += E of status
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should allow for the possibility of other process errors:

Suggested change
type Exn.err += E of status
type error =
| Exited of status
type Exn.err += E of error


let err e = Exn.create (E e)

let () =
Exn.register_pp (fun f -> function
| E e ->
Fmt.string f "Process ";
pp_status f e;
true
| _ -> false
)

class virtual t = object
method virtual pid : int
method virtual await : status
method virtual signal : int -> unit
end

let pid proc = proc#pid
let await proc = proc#await

let await_exn proc =
match proc#await with
| Exited 0 -> ()
| status -> raise (err status)

let signal proc = proc#signal

class virtual mgr = object
method virtual spawn :
sw:Switch.t ->
?cwd:Fs.dir Path.t ->
stdin:Flow.source ->
stdout:Flow.sink ->
stderr:Flow.sink ->
string ->
string list ->
t
end

let spawn ~sw (t:#mgr) ?cwd ~stdin ~stdout ~stderr cmd args =
t#spawn ~sw ?cwd cmd args
~stdin:(stdin :> Flow.source)
~stdout:(stdout :> Flow.sink)
~stderr:(stderr :> Flow.sink)
46 changes: 46 additions & 0 deletions lib_eio/process.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
type status = Exited of int | Signaled of int | Stopped of int

val pp_status : Format.formatter -> status -> unit

type Exn.err += E of status

val err : status -> exn
(** [err e] is [Eio.Exn.create (E e)] *)

class virtual t : object
method virtual pid : int
method virtual await : status
method virtual signal : int -> unit
end

val pid : #t -> int
(** The process ID *)

val await : #t -> status
(** This functions waits for the subprocess to exit and then reports the status. *)

val await_exn : #t -> unit
(** Like {! await} except an exception is raised if the status is not [Exited 0]. *)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a shame we won't have the failing command in the error message.

We could have a Process.run for the common case of spawn followed by await. Then it would be easy to add run_exn and have it include the command in the error.


val signal : #t -> int -> unit
(** [signal t i] sends the signal [i] to process [t]. *)

class virtual mgr : object
method virtual spawn :
sw:Switch.t ->
?cwd:Fs.dir Path.t ->
stdin:Flow.source ->
stdout:Flow.sink ->
stderr:Flow.sink ->
string ->
string list ->
t
end
(** A process manager capable of spawning new processes. *)

val spawn : sw:Switch.t -> #mgr -> ?cwd:Fs.dir Path.t -> stdin:#Flow.source -> stdout:#Flow.sink -> stderr:#Flow.sink -> string -> string list -> t
(** [spawn ~sw mgr ?cwd ~stdin ~stdout ~stderr cmd args] creates a new subprocess that is connected to the
switch [sw]. A process will be sent {! Sys.sigkill} when the switch is released.

You must provide a standard input and outputs that are backed by file descriptors and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
You must provide a standard input and outputs that are backed by file descriptors and
[stdin], [stderr] and [stdout] must be backed by file descriptors.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, hopefully with the pipe changes I added in d553598 this can be completely removed as we check if there's an FD and if not we pipe into the sources and sinks (that code could do with a review I think)

[cwd] will optionally change the current working directory of the process.*)
15 changes: 14 additions & 1 deletion lib_eio/unix/eio_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type socket = <
module Private = struct
type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty

type _ Effect.t +=
type _ Effect.t +=
| Await_readable : Unix.file_descr -> unit Effect.t
| Await_writable : Unix.file_descr -> unit Effect.t
| Get_monotonic_clock : Eio.Time.Mono.t Effect.t
Expand Down Expand Up @@ -79,3 +79,16 @@ let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) =
run_in_systhread (fun () ->
let Unix.{ni_hostname; ni_service} = Unix.getnameinfo sockaddr options in
(ni_hostname, ni_service))

let resolve_program ~paths prog =
if not (Filename.is_relative prog) then begin
if Sys.file_exists prog then Some prog else None
end
else
let rec loop = function
| path :: rest ->
let p = Filename.concat path prog in
if Sys.file_exists p then Some p else loop rest
| [] -> None
in
loop paths
8 changes: 6 additions & 2 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ module Private : sig
type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty
(** See {!FD}. *)

type _ Effect.t +=
type _ Effect.t +=
| Await_readable : Unix.file_descr -> unit Effect.t (** See {!await_readable} *)
| Await_writable : Unix.file_descr -> unit Effect.t (** See {!await_writable} *)
| Get_monotonic_clock : Eio.Time.Mono.t Effect.t
| Socket_of_fd : Switch.t * bool * Unix.file_descr ->
socket Effect.t (** See {!FD.as_socket} *)
| Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int ->
(socket * socket) Effect.t (** See {!socketpair} *)
| Pipe : Eio.Switch.t ->
| Pipe : Eio.Switch.t ->
(<Eio.Flow.source; Eio.Flow.close; unix_fd> * <Eio.Flow.sink; Eio.Flow.close; unix_fd>) Effect.t (** See {!pipe} *)

module Rcfd = Rcfd
Expand All @@ -110,3 +110,7 @@ module Ctf = Ctf_unix

val getnameinfo : Eio.Net.Sockaddr.t -> (string * string)
(** [getnameinfo sockaddr] returns domain name and service for [sockaddr]. *)

val resolve_program : paths:string list -> string -> string option
(** [resolve_program ~paths prog] tries to resolve the absolute path for [prog]
by looking in each of [paths]. *)
16 changes: 16 additions & 0 deletions lib_eio/unix/fork_action.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include <string.h>
#include <errno.h>

#include <caml/memory.h>
#include <caml/mlvalues.h>
#include <caml/unixsupport.h>

#include "fork_action.h"

Expand Down Expand Up @@ -39,6 +41,20 @@ void eio_unix_fork_error(int fd, char *fn, char *buf) {
try_write_all(fd, buf);
}

CAMLprim value eio_unix_spawn(value v_errors, value v_actions) {
CAMLparam1(v_actions);
pid_t child_pid;

child_pid = fork();
if (child_pid == 0) {
eio_unix_run_fork_actions(Int_val(v_errors), v_actions);
} else if (child_pid < 0) {
uerror("fork", Nothing);
}

CAMLreturn(Val_long(child_pid));
}

static char **make_string_array(int errors, value v_array) {
int n = Wosize_val(v_array);
char **c = calloc(sizeof(char *), (n + 1));
Expand Down
6 changes: 6 additions & 0 deletions lib_eio/unix/fork_action.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ let rec with_actions actions fn =
let err_closed op () =
Fmt.failwith "%s: FD is closed!" op

external eio_spawn : Unix.file_descr -> c_action list -> int = "eio_unix_spawn"

let spawn errors actions =
Rcfd.use ~if_closed:(err_closed "spawn") errors @@ fun errors ->
eio_spawn errors actions

external action_execve : unit -> fork_fn = "eio_unix_fork_execve"
let action_execve = action_execve ()
let execve path ~argv ~env = { run = fun k -> k (Obj.repr (action_execve, path, argv, env)) }
Expand Down
6 changes: 6 additions & 0 deletions lib_eio/unix/fork_action.mli
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ type t = { run : 'a. ((c_action -> 'a) -> 'a) } [@@unboxed]

val with_actions : t list -> (c_action list -> 'a) -> 'a

val spawn : Rcfd.t -> c_action list -> int
(** [spawn errors actions] forks a child process and executes [actions] in it.

If an error occurs, a message is written to [errors].
Returns the PID of the child process. *)

(** {2 Actions} *)

val execve : string -> argv:string array -> env:string array -> t
Expand Down
Loading