Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eio_luv: add low-level process support #359

Merged
merged 2 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions lib_eio_luv/eio_luv.ml
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,51 @@ module Low_level = struct
Handle.of_luv ~sw sock
end

module Pipe = struct
type t = [`Stream of [`Pipe]] Handle.t

let init ?for_handle_passing ~sw () =
Luv.Pipe.init ~loop:(get_loop ()) ?for_handle_passing ()
|> or_raise
|> Handle.of_luv ~close_unix:true ~sw
Comment on lines +580 to +583
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to attach to the switch on init. We need to make sure it's freed even if to_handle isn't called, and we don't want to call it twice if to_handle is used twice.

I also exposed the type in the mli, removing the need for to_luv and to_handle. We already allow users to convert other Luv resources to handles, so might as well allow pipes too.

I opened #371 to expose get_loop.

end

module Process = struct
type t = {
proc : Luv.Process.t;
status : (int * int64) Promise.t;
}

let pid t = Luv.Process.pid t.proc

let to_parent_pipe ?readable_in_child ?writable_in_child ?overlapped ~fd ~parent_pipe () =
let parent_pipe = Handle.to_luv parent_pipe in
Luv.Process.to_parent_pipe ?readable_in_child ?writable_in_child ?overlapped ~fd ~parent_pipe ()

let await_exit t = Promise.await t.status
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the remove_hook to spawn. It needs to be called even if the application doesn't wait.


let has_exited t = Promise.is_resolved t.status

let send_signal t i = Luv.Process.kill t.proc i |> or_raise

let spawn ?cwd ?env ?uid ?gid ?(redirect=[]) ~sw cmd args =
let status, set_status = Promise.create () in
let hook = ref None in
let on_exit _ ~exit_status ~term_signal =
Option.iter Switch.remove_hook !hook;
Promise.resolve set_status (term_signal, exit_status)
in
let proc = Luv.Process.spawn ?environment:env ?uid ?gid ~loop:(get_loop ()) ?working_directory:cwd ~redirect ~on_exit cmd args |> or_raise in
if not (Promise.is_resolved status) then (
let h = Switch.on_release_cancellable sw (fun () ->
Luv.Process.kill proc Luv.Signal.sigkill |> or_raise;
ignore (Promise.await status)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a wait here just so we can be sure the process has completely finished stopping by the time the switch finishes.

) in
hook := Some h
);
{ proc; status }
end

module Poll = Poll

let sleep_ms delay =
Expand Down
64 changes: 64 additions & 0 deletions lib_eio_luv/eio_luv.mli
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,70 @@ module Low_level : sig
@param sw The handle is closed when [sw] is released, if not closed manually first.
@param close_unix if [true] (the default), calling [close] also closes [fd]. *)
end

module Stream : sig
type 'a t = [`Stream of 'a] Handle.t

val read_into : [< `Pipe | `TCP | `TTY ] t -> Luv.Buffer.t -> int
(** [read_into handle buf] reads some bytes from [handle] into [buf] returning the number
of bytes read.
@raise End_of_file if there is no more data to read *)

val write : [ `Stream of [< `Pipe | `TCP | `TTY ] ] Handle.t -> Luv.Buffer.t list -> unit
(** [write handle bufs] writes the contents of [bufs] to [handle]. *)
end

module Pipe : sig
type t = [`Pipe] Stream.t
(** A pipe *)

val init : ?for_handle_passing:bool -> sw:Switch.t -> unit -> t
(** Wraps {!Luv.Pipe.init}*)
end

module Process : sig
type t
(** A process *)

val pid : t -> int
(** [pid t] returns the process id of [t]. *)

val to_parent_pipe :
?readable_in_child:bool ->
?writable_in_child:bool ->
?overlapped:bool ->
fd:int ->
parent_pipe:Pipe.t ->
unit ->
Luv.Process.redirection
(** Wraps {!Luv.Process.to_parent_pipe}*)

val await_exit : t -> int * int64
(** [await_exit t] waits for the process [t] to finish.

It returns [(exit_status, term_signal)], see {!Luv.Process.spawn} for
more details on these values. *)

val has_exited : t -> bool
(** [has_exited t] checks if the process [t] has exited or not. *)

val send_signal : t -> int -> unit
(** A wrapper for {!Luv.Process.kill}. *)

val spawn :
?cwd:string ->
?env:(string * string) list ->
?uid:int ->
?gid:int ->
?redirect:Luv.Process.redirection list ->
sw:Switch.t ->
string ->
string list -> t
(** Wraps {!Luv.Process.spawn}.

The process will be stopped when the switch is released if
it has not already exited.*)
end
end

(** {1 Eio API} *)
Expand Down
116 changes: 116 additions & 0 deletions lib_eio_luv/tests/process.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Set up the test environment

```ocaml
# #require "eio_luv";;
# open Eio.Std;;
# open Eio;;
# module Process = Eio_luv.Low_level.Process;;
module Process = Eio_luv.Low_level.Process
```

A helper function for reading all of the bytes from a handle.

```ocaml
let read_all handle buf =
let rec read acc =
match Eio_luv.Low_level.Stream.read_into handle buf with
| i -> read (acc + i)
| exception End_of_file -> acc
in read 0
```

A simple `echo hello` process redirects to stdout.

```ocaml
# Eio_luv.run @@ fun _env ->
Switch.run @@ fun sw ->
patricoferris marked this conversation as resolved.
Show resolved Hide resolved
let redirect = Luv.Process.[
inherit_fd ~fd:stdout ~from_parent_fd:stdout ()
] in
let t = Process.spawn ~sw ~redirect "echo" [ "echo"; "hello" ] in
Process.await_exit t;;
hello
- : int * int64 = (0, 0L)
```

Using a pipe to redirect output to a buffer.

```ocaml
# Eio_luv.run @@ fun _env ->
Switch.run @@ fun sw ->
let parent_pipe = Eio_luv.Low_level.Pipe.init ~sw () in
let buf = Luv.Buffer.create 32 in
let redirect = Eio_luv.Low_level.Process.[
to_parent_pipe ~fd:Luv.Process.stdout ~parent_pipe ()
] in
let t = Process.spawn ~sw ~redirect "echo" [ "echo"; "Hello,"; "World!" ] in
let read = read_all parent_pipe buf in
let _ = Process.await_exit t in
Luv.Buffer.to_string (Luv.Buffer.sub buf ~offset:0 ~length:read);;
- : string = "Hello, World!\n"
```

Writing to stdin of a process works.

```ocaml
# Eio_luv.run @@ fun _env ->
Switch.run @@ fun sw ->
let parent_pipe = Eio_luv.Low_level.Pipe.init ~sw () in
let bufs = [ Luv.Buffer.from_string "Hello!" ] in
let redirect = Luv.Process.[
inherit_fd ~fd:stdout ~from_parent_fd:stdout ();
Process.to_parent_pipe ~fd:stdin ~parent_pipe ()
] in
let t = Process.spawn ~sw ~redirect "head" [ "head" ] in
Eio_luv.Low_level.Stream.write parent_pipe bufs;
Eio_luv.Low_level.Handle.close parent_pipe;
Process.await_exit t;;
Hello!
- : int * int64 = (0, 0L)
```

Stopping a process works.

```ocaml
# Eio_luv.run @@ fun _env ->
Switch.run @@ fun sw ->
let redirect = Luv.Process.[
inherit_fd ~fd:stdout ~from_parent_fd:stdout ()
] in
let t = Process.spawn ~sw ~redirect "sleep" [ "sleep"; "10" ] in
Process.send_signal t Luv.Signal.sigkill;
Process.await_exit t;;
- : int * int64 = (9, 0L)
```

Forgetting to wait for a process to finish stops the process.

```ocaml
# Eio_luv.run @@ fun _env ->
let proc =
Switch.run @@ fun sw ->
let redirect = Luv.Process.[
inherit_fd ~fd:stdout ~from_parent_fd:stdout ()
] in
Process.spawn ~sw ~redirect "sleep" [ "sleep"; "10" ]
in
Process.await_exit proc;;
- : int * int64 = (9, 0L)
```

Stopping a process interacts nicely with switches.

```ocaml
# Eio_luv.run @@ fun _env ->
let proc =
Switch.run @@ fun sw ->
let redirect = Luv.Process.[
inherit_fd ~fd:stdout ~from_parent_fd:stdout ()
] in
let t = Process.spawn ~sw ~redirect "sleep" [ "sleep"; "10" ] in
Process.send_signal t Luv.Signal.sigkill;
t
in
Process.await_exit proc;;
- : int * int64 = (9, 0L)
```