Cross-platform subprocess support #499

Merged May 4, 2023 (1 commit)
51 changes: 51 additions & 0 deletions README.md
@@ -33,6 +33,7 @@ Eio replaces existing concurrency libraries such as Lwt
* [Buffered Writing](#buffered-writing)
* [Error Handling](#error-handling)
* [Filesystem Access](#filesystem-access)
* [Running processes](#running-processes)
* [Time](#time)
* [Multicore Support](#multicore-support)
* [Synchronisation Tools](#synchronisation-tools)
@@ -876,6 +877,55 @@ A program that operates on the current directory will probably want to use `cwd`
whereas a program that accepts a path from the user will probably want to use `fs`,
perhaps with `open_dir` to constrain all access to be within that directory.

## Running processes

Spawning a child process can be done using the [Eio.Process][] module:

```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.run proc_mgr ["echo"; "hello"];;
hello
- : unit = ()
```

There are various optional arguments for setting the process's current directory or connecting up the standard streams.
For example, we can use `tr` to convert some text to upper-case:

```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.run proc_mgr ["tr"; "a-z"; "A-Z"]
~stdin:(Eio.Flow.string_source "One two three\n");;
ONE TWO THREE
- : unit = ()
```

If you want to capture the output of a process, you can provide a suitable `Eio.Flow.sink` as the `stdout` argument,
or use the `parse_out` convenience wrapper:

```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.parse_out proc_mgr Eio.Buf_read.line ["echo"; "hello"];;
- : string = "hello"
```
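
As a rough sketch of the first option, the following collects the child's output in a `Buffer.t`. It assumes `Eio.Flow.buffer_sink` from Eio's existing `Flow` module (not something this change adds), and `capture_echo` is a made-up name used only for illustration:

```ocaml
(* Illustrative sketch: capture a child's stdout in a [Buffer.t].
   [capture_echo] is a hypothetical helper, not part of Eio. *)
let capture_echo () =
  Eio_main.run @@ fun env ->
  let proc_mgr = Eio.Stdenv.process_mgr env in
  let buf = Buffer.create 64 in
  (* The buffer sink is not backed by a file descriptor, so [run] is
     expected to copy the data through a pipe and wait for the copy. *)
  Eio.Process.run proc_mgr ~stdout:(Eio.Flow.buffer_sink buf) ["echo"; "hello"];
  Buffer.contents buf

let () = print_string (capture_echo ())
```

Unlike `parse_out` with `Eio.Buf_read.line`, this keeps the trailing newline that `echo` writes.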

All process functions either return the exit status or check that it was zero (success):

```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.parse_out proc_mgr Eio.Buf_read.take_all ["sh"; "-c"; "exit 3"];;
Exception:
Eio.Io Process Child_error Exited 3,
running command: sh -c "exit 3"
```
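
If the exit code itself is of interest, one possible approach is to match on the exception. This is only a sketch: it assumes the `Eio.Io` exception carries `Eio.Process.E` as its first component, as the error output above suggests, and `exit_code` is a hypothetical helper:

```ocaml
(* Hypothetical helper: turn a failed [run] into its exit code.
   Other errors (for example a child killed by a signal) still propagate. *)
let exit_code proc_mgr args =
  match Eio.Process.run proc_mgr args with
  | () -> 0
  | exception Eio.Io (Eio.Process.E (Eio.Process.Child_error (`Exited code)), _) ->
    code

let () =
  Eio_main.run @@ fun env ->
  let proc_mgr = Eio.Stdenv.process_mgr env in
  Printf.printf "exit code: %d\n" (exit_code proc_mgr ["sh"; "-c"; "exit 3"])
```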

`Process.spawn` and `Process.await` give more control over the process's lifetime
and exit status, and `Eio_unix.Process` gives more control over passing file
descriptors (on systems that support them).
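
A minimal sketch of the `spawn` / `await` style, based on the signatures in `process.mli` from this change. `describe_status` is an illustrative name, and the strings it returns are arbitrary:

```ocaml
(* Sketch: inspect the exit status directly instead of raising.
   [describe_status] is a hypothetical helper, not part of Eio. *)
let describe_status proc_mgr args =
  (* The child is attached to [sw]; if it were still running when the
     switch finished, it would be killed. *)
  Eio.Switch.run @@ fun sw ->
  let child = Eio.Process.spawn ~sw proc_mgr args in
  match Eio.Process.await child with
  | `Exited 0 -> "success"
  | `Exited n -> Printf.sprintf "exited with code %d" n
  | `Signaled s -> Printf.sprintf "killed by signal %d" s

let () =
  Eio_main.run @@ fun env ->
  let proc_mgr = Eio.Stdenv.process_mgr env in
  print_endline (describe_status proc_mgr ["sh"; "-c"; "exit 3"])
```

Between `spawn` and `await`, the same `child` value could also be passed to `Eio.Process.signal` or `Eio.Process.pid`.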

## Time

The standard environment provides a [clock][Eio.Time] with the usual POSIX time:
@@ -1825,3 +1875,4 @@ Some background about the effects system can be found in:
[kcas]: https://github.com/ocaml-multicore/kcas
[Meio]: https://github.com/tarides/meio
[Lambda Capabilities]: https://roscidus.com/blog/blog/2023/04/26/lambda-capabilities/
[Eio.Process]: https://github.com/ocaml-multicore/eio/blob/main/lib_eio/process.ml
14 changes: 8 additions & 6 deletions doc/prelude.ml
@@ -32,12 +32,14 @@ module Eio_main = struct
   let run fn =
     Eio_main.run @@ fun env ->
     fn @@ object
-      method net = env#net
-      method stdin = env#stdin
-      method stdout = env#stdout
-      method cwd = env#cwd
-      method domain_mgr = fake_domain_mgr
-      method clock = fake_clock env#clock
+      method net = env#net
+      method stdin = env#stdin
+      method stdout = env#stdout
+      method stderr = env#stderr
+      method cwd = env#cwd
+      method process_mgr = env#process_mgr
+      method domain_mgr = fake_domain_mgr
+      method clock = fake_clock env#clock
     end
 end

2 changes: 2 additions & 0 deletions lib_eio/eio.ml
@@ -23,6 +23,7 @@ module Flow = Flow
module Buf_read = Buf_read
module Buf_write = Buf_write
module Net = Net
module Process = Process
module Domain_manager = Domain_manager
module Time = Time
module File = File
@@ -34,6 +35,7 @@ module Stdenv = struct
let stdout (t : <stdout : #Flow.sink; ..>) = t#stdout
let stderr (t : <stderr : #Flow.sink; ..>) = t#stderr
let net (t : <net : #Net.t; ..>) = t#net
let process_mgr (t : <process_mgr : #Process.mgr; ..>) = t#process_mgr
let domain_mgr (t : <domain_mgr : #Domain_manager.t; ..>) = t#domain_mgr
let clock (t : <clock : #Time.clock; ..>) = t#clock
let mono_clock (t : <mono_clock : #Time.Mono.t; ..>) = t#mono_clock
11 changes: 11 additions & 0 deletions lib_eio/eio.mli
@@ -84,6 +84,9 @@ module Buf_write = Buf_write
(** Networking. *)
module Net = Net

(** Managing child processes. *)
module Process = Process

(** Parallel computation across multiple CPU cores. *)
module Domain_manager : sig
class virtual t : object
@@ -208,6 +211,14 @@ module Stdenv : sig
val net : <net : #Net.t as 'a; ..> -> 'a
(** [net t] gives access to the process's network namespace. *)

(** {1 Processes }

To use this, see {!Process}.
*)

val process_mgr : <process_mgr : #Process.mgr as 'a; ..> -> 'a
(** [process_mgr t] allows you to manage child processes. *)

(** {1 Domains (using multiple CPU cores)}

To use this, see {!Domain_manager}.
110 changes: 110 additions & 0 deletions lib_eio/process.ml
@@ -0,0 +1,110 @@
type exit_status = [
| `Exited of int
| `Signaled of int
]

type status = [ exit_status | `Stopped of int ]

let pp_status ppf = function
| `Exited i -> Format.fprintf ppf "Exited %i" i
| `Signaled i -> Format.fprintf ppf "Signalled %i" i
| `Stopped i -> Format.fprintf ppf "Stopped %i" i

type error =
| Executable_not_found of string
| Child_error of exit_status

type Exn.err += E of error

let err e = Exn.create (E e)

let () =
Exn.register_pp (fun f -> function
| E e ->
Fmt.string f "Process ";
begin match e with
| Executable_not_found e -> Fmt.pf f "Executable %S not found" e;
| Child_error e -> Fmt.pf f "Child_error %a" pp_status e;
end;
true
| _ -> false
)

class virtual t = object
method virtual pid : int
method virtual await : exit_status
method virtual signal : int -> unit
end

let pid proc = proc#pid
let await proc = proc#await

let await_exn proc =
match proc#await with
| `Exited 0 -> ()
| status -> raise (err (Child_error status))

let signal proc = proc#signal

class virtual mgr = object
method virtual pipe :
sw:Switch.t ->
<Flow.source; Flow.close> * <Flow.sink; Flow.close>

method virtual spawn :
sw:Switch.t ->
?cwd:Fs.dir Path.t ->
?stdin:Flow.source ->
?stdout:Flow.sink ->
?stderr:Flow.sink ->
?env:string array ->
?executable:string ->
string list ->
t
end

let bad_char = function
| ' ' | '"' | '\'' | '\\' -> true
| c ->
let c = Char.code c in
c <= 32 || c >= 127

let pp_arg f x =
if x = "" || String.exists bad_char x then Fmt.(quote string) f x
else Fmt.string f x

let pp_args = Fmt.hbox (Fmt.list ~sep:Fmt.sp pp_arg)

let spawn ~sw (t:#mgr) ?cwd ?stdin ?stdout ?stderr ?env ?executable args =
t#spawn ~sw
?cwd:(cwd :> Fs.dir Path.t option)
?env
?executable args
?stdin:(stdin :> Flow.source option)
?stdout:(stdout :> Flow.sink option)
?stderr:(stderr :> Flow.sink option)

let run (t:#mgr) ?cwd ?stdin ?stdout ?stderr ?env ?executable args =
Switch.run @@ fun sw ->
let child = spawn ~sw t ?cwd ?stdin ?stdout ?stderr ?env ?executable args in
match await child with
| `Exited 0 -> ()
| status ->
let ex = err (Child_error status) in
raise (Exn.add_context ex "running command: %a" pp_args args)

let pipe ~sw (t:#mgr) = t#pipe ~sw

let parse_out (t:#mgr) parse ?cwd ?stdin ?stderr ?env ?executable args =
Switch.run @@ fun sw ->
let r, w = pipe t ~sw in
try
let child = spawn ~sw t ?cwd ?stdin ~stdout:w ?stderr ?env ?executable args in
Flow.close w;
let output = Buf_read.parse_exn parse r ~max_size:max_int in
Flow.close r;
await_exn child;
output
with Exn.Io _ as ex ->
let bt = Printexc.get_raw_backtrace () in
Exn.reraise_with_context ex bt "running command: %a" pp_args args
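
The quoting rule behind `pp_args` can be sketched without Eio or Fmt. The version below uses only the standard library; `needs_quoting` and `show_args` are illustrative names, and wrapping in plain double quotes is an approximation of what `Fmt.quote` does:

```ocaml
(* Standalone sketch of the quoting rule; not the code from this change. *)
let needs_quoting arg =
  let bad_char c =
    match c with
    | ' ' | '"' | '\'' | '\\' -> true
    | c ->
      (* Control characters, space and anything outside printable ASCII. *)
      let code = Char.code c in
      code <= 32 || code >= 127
  in
  arg = "" || String.exists bad_char arg

(* Quote only the arguments that could confuse a reader, then join with spaces. *)
let show_args args =
  args
  |> List.map (fun a -> if needs_quoting a then "\"" ^ a ^ "\"" else a)
  |> String.concat " "

let () = print_endline (show_args ["sh"; "-c"; "exit 3"])
(* prints: sh -c "exit 3" *)
```

The result is meant for error messages and logs only. Because nothing inside the quotes is escaped, it is not safe to paste back into a shell.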
150 changes: 150 additions & 0 deletions lib_eio/process.mli
@@ -0,0 +1,150 @@
(** Example:
{[
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.parse_out proc_mgr Eio.Buf_read.line ["echo"; "hello"]
]}
*)

(** {2 Status and error types} *)

type exit_status = [
| `Exited of int (** Process exited with the given return code. *)
| `Signaled of int (** Process was killed by the given signal. *)
]

type status = [
| exit_status
| `Stopped of int (** Process was stopped (paused) by the given signal. *)
]

val pp_status : status Fmt.t

type error =
| Executable_not_found of string (** The requested executable does not exist. *)
| Child_error of exit_status (** The process exited with an error status. *)

type Exn.err += E of error

val err : error -> exn
(** [err e] is [Eio.Exn.create (E e)] *)

val pp_args : string list Fmt.t
(** Formats a list of arguments, quoting any that might cause confusion to the reader.

This is intended for use in error messages and logging.*)

(** {2 Processes} *)

(** A process. *)
class virtual t : object
method virtual pid : int
method virtual await : exit_status
method virtual signal : int -> unit
end

val pid : #t -> int
(** [pid t] is the process ID of [t]. *)

val await : #t -> exit_status
(** [await t] waits for process [t] to exit and then reports the status. *)

val await_exn : #t -> unit
(** Like {! await} except an exception is raised if the status is not [`Exited 0]. *)

val signal : #t -> int -> unit
(** [signal t i] sends the signal [i] to process [t].

If the process has already exited then this does nothing
(it will not signal a different process, even if the PID has been reused).

See {!Sys} for the signal numbers. *)

(** {2 Spawning processes} *)

class virtual mgr : object
method virtual pipe :
sw:Switch.t ->
<Flow.source; Flow.close> * <Flow.sink; Flow.close>

method virtual spawn :
sw:Switch.t ->
?cwd:Fs.dir Path.t ->
?stdin:Flow.source ->
?stdout:Flow.sink ->
?stderr:Flow.sink ->
?env:string array ->
?executable:string ->
string list ->
t
end
(** A process manager capable of spawning new processes. *)

val spawn :
sw:Switch.t ->
#mgr ->
?cwd:#Fs.dir Path.t ->
?stdin:#Flow.source ->
?stdout:#Flow.sink ->
?stderr:#Flow.sink ->
?env:string array ->
?executable:string ->
string list -> t
(** [spawn ~sw mgr args] creates a new child process that is connected to the switch [sw].

The child process will be sent {! Sys.sigkill} when the switch is released.

If the flows [stdin], [stdout] and [stderr] are not backed by file descriptors then
this also creates pipes and spawns fibers to copy the data as necessary.
If you need more control over file descriptors, see {!Eio_unix.Process}.

@param cwd The current working directory of the process (default: same as parent process).
@param stdin The flow to attach to the process's standard input (default: same as parent process).
@param stdout A flow that the process's standard output goes to (default: same as parent process).
@param stderr A flow that the process's standard error goes to (default: same as parent process).
@param env The environment for the process (default: same as parent process).
@param executable The path of the executable to run.
If not given then the first item in [args] is used,
searching $PATH for it if necessary. *)

val run :
#mgr ->
?cwd:#Fs.dir Path.t ->
?stdin:#Flow.source ->
?stdout:#Flow.sink ->
?stderr:#Flow.sink ->
?env:string array ->
?executable:string ->
string list -> unit
(** [run] does {!spawn} followed by {!await_exn}, with the advantage that if the process fails then
the error message includes the command that failed.

Note: If [spawn] needed to create extra fibers to copy [stdin], etc, then it also waits for those to finish. *)

val parse_out :
#mgr ->
'a Buf_read.parser ->
?cwd:#Fs.dir Path.t ->
?stdin:#Flow.source ->
?stderr:#Flow.sink ->
?env:string array ->
?executable:string ->
string list -> 'a
(** [parse_out mgr parser args] runs [args] and parses the child's stdout with [parser].

It also waits for the process to finish and checks its exit status is zero.

Note that [parser] must consume the entire output of the process (like {!Buf_read.parse}).

To return all the output as a string, use {!Buf_read.take_all} as the parser.

This is a convenience wrapper around {!spawn} and {!await_exn},
and the optional arguments have the same meanings as for {!spawn}. *)

(** {2 Pipes} *)

val pipe : sw:Switch.t -> #mgr -> <Flow.source; Flow.close> * <Flow.sink; Flow.close>
(** [pipe ~sw mgr] creates a pipe backed by the OS.

The flows can be used by {!spawn} without the need for extra fibers to copy the data.
This can be used to connect multiple processes together. *)
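
As a sketch of connecting two processes, the following behaves roughly like `echo TEXT | tr a-z A-Z`. It relies only on the signatures declared above; `upcase_via_pipe` is a hypothetical name, and the example assumes `echo` and `tr` are available on `$PATH`:

```ocaml
(* Sketch: feed one child's stdout into another child's stdin via an OS pipe.
   [upcase_via_pipe] is an illustrative helper, not part of Eio. *)
let upcase_via_pipe text =
  Eio_main.run @@ fun env ->
  let proc_mgr = Eio.Stdenv.process_mgr env in
  Eio.Switch.run @@ fun sw ->
  let r, w = Eio.Process.pipe ~sw proc_mgr in
  let producer = Eio.Process.spawn ~sw proc_mgr ~stdout:w ["echo"; text] in
  (* Close the parent's copy of the write end, so that the reader sees
     end-of-file once the producer has finished. *)
  Eio.Flow.close w;
  let output =
    Eio.Process.parse_out proc_mgr Eio.Buf_read.take_all ~stdin:r ["tr"; "a-z"; "A-Z"]
  in
  Eio.Flow.close r;
  Eio.Process.await_exn producer;
  output

let () = print_string (upcase_via_pipe "One two three")
```

Closing the write end in the parent matters: if it stayed open, `tr` would keep waiting for more input.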