From f30025cc99dcbe7e63a95e3d31869797bc929165 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Wed, 23 Nov 2022 10:48:32 +0000 Subject: [PATCH 1/2] Expose Eio_luv.Low_level.Stream.write --- lib_eio_luv/eio_luv.mli | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib_eio_luv/eio_luv.mli b/lib_eio_luv/eio_luv.mli index 7169fcf6f..ac82b7608 100644 --- a/lib_eio_luv/eio_luv.mli +++ b/lib_eio_luv/eio_luv.mli @@ -131,6 +131,18 @@ module Low_level : sig @param sw The handle is closed when [sw] is released, if not closed manually first. @param close_unix if [true] (the default), calling [close] also closes [fd]. *) end + + module Stream : sig + type 'a t = [`Stream of 'a] Handle.t + + val read_into : [< `Pipe | `TCP | `TTY ] t -> Luv.Buffer.t -> int + (** [read_into handle buf] reads some bytes from [handle] into [buf] returning the number + of bytes read. + @raise End_of_file if there is no more data to read *) + + val write : [ `Stream of [< `Pipe | `TCP | `TTY ] ] Handle.t -> Luv.Buffer.t list -> unit + (** [write handle bufs] writes the contents of [bufs] to [handle]. *) + end end (** {1 Eio API} *) From de4532177a8f5a670cd1d450843ebec60d6a43cc Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sat, 29 Oct 2022 14:14:17 +0100 Subject: [PATCH 2/2] Add low-level process support to eio_luv --- lib_eio_luv/eio_luv.ml | 45 ++++++++++++++ lib_eio_luv/eio_luv.mli | 52 ++++++++++++++++ lib_eio_luv/tests/process.md | 116 +++++++++++++++++++++++++++++++++++ 3 files changed, 213 insertions(+) create mode 100644 lib_eio_luv/tests/process.md diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml index bef32a6f3..738e7aa23 100644 --- a/lib_eio_luv/eio_luv.ml +++ b/lib_eio_luv/eio_luv.ml @@ -574,6 +574,51 @@ module Low_level = struct Handle.of_luv ~sw sock end + module Pipe = struct + type t = [`Stream of [`Pipe]] Handle.t + + let init ?for_handle_passing ~sw () = + Luv.Pipe.init ~loop:(get_loop ()) ?for_handle_passing () + |> or_raise + |> Handle.of_luv ~close_unix:true ~sw + end + + module Process = struct + type t = { + proc : Luv.Process.t; + status : (int * int64) Promise.t; + } + + let pid t = Luv.Process.pid t.proc + + let to_parent_pipe ?readable_in_child ?writable_in_child ?overlapped ~fd ~parent_pipe () = + let parent_pipe = Handle.to_luv parent_pipe in + Luv.Process.to_parent_pipe ?readable_in_child ?writable_in_child ?overlapped ~fd ~parent_pipe () + + let await_exit t = Promise.await t.status + + let has_exited t = Promise.is_resolved t.status + + let send_signal t i = Luv.Process.kill t.proc i |> or_raise + + let spawn ?cwd ?env ?uid ?gid ?(redirect=[]) ~sw cmd args = + let status, set_status = Promise.create () in + let hook = ref None in + let on_exit _ ~exit_status ~term_signal = + Option.iter Switch.remove_hook !hook; + Promise.resolve set_status (term_signal, exit_status) + in + let proc = Luv.Process.spawn ?environment:env ?uid ?gid ~loop:(get_loop ()) ?working_directory:cwd ~redirect ~on_exit cmd args |> or_raise in + if not (Promise.is_resolved status) then ( + let h = Switch.on_release_cancellable sw (fun () -> + Luv.Process.kill proc Luv.Signal.sigkill |> or_raise; + ignore (Promise.await status) + ) in + hook := Some h + ); + { proc; status } + end + module Poll = Poll let sleep_ms delay = diff --git a/lib_eio_luv/eio_luv.mli b/lib_eio_luv/eio_luv.mli index ac82b7608..63810bfe8 100644 --- a/lib_eio_luv/eio_luv.mli +++ b/lib_eio_luv/eio_luv.mli @@ -143,6 +143,58 @@ module Low_level : sig val write : [ `Stream of [< `Pipe | `TCP | `TTY ] ] Handle.t -> Luv.Buffer.t list -> unit (** [write handle bufs] writes the contents of [bufs] to [handle]. *) end + + module Pipe : sig + type t = [`Pipe] Stream.t + (** A pipe *) + + val init : ?for_handle_passing:bool -> sw:Switch.t -> unit -> t + (** Wraps {!Luv.Pipe.init}*) + end + + module Process : sig + type t + (** A process *) + + val pid : t -> int + (** [pid t] returns the process id of [t]. *) + + val to_parent_pipe : + ?readable_in_child:bool -> + ?writable_in_child:bool -> + ?overlapped:bool -> + fd:int -> + parent_pipe:Pipe.t -> + unit -> + Luv.Process.redirection + (** Wraps {!Luv.Process.to_parent_pipe}*) + + val await_exit : t -> int * int64 + (** [await_exit t] waits for the process [t] to finish. + + It returns [(exit_status, term_signal)], see {!Luv.Process.spawn} for + more details on these values. *) + + val has_exited : t -> bool + (** [has_exited t] checks if the process [t] has exited or not. *) + + val send_signal : t -> int -> unit + (** A wrapper for {!Luv.Process.kill}. *) + + val spawn : + ?cwd:string -> + ?env:(string * string) list -> + ?uid:int -> + ?gid:int -> + ?redirect:Luv.Process.redirection list -> + sw:Switch.t -> + string -> + string list -> t + (** Wraps {!Luv.Process.spawn}. + + The process will be stopped when the switch is released if + it has not already exited.*) + end end (** {1 Eio API} *) diff --git a/lib_eio_luv/tests/process.md b/lib_eio_luv/tests/process.md new file mode 100644 index 000000000..46aae4ac0 --- /dev/null +++ b/lib_eio_luv/tests/process.md @@ -0,0 +1,116 @@ +# Set up the test environment + +```ocaml +# #require "eio_luv";; +# open Eio.Std;; +# open Eio;; +# module Process = Eio_luv.Low_level.Process;; +module Process = Eio_luv.Low_level.Process +``` + +A helper function for reading all of the bytes from a handle. + +```ocaml +let read_all handle buf = + let rec read acc = + match Eio_luv.Low_level.Stream.read_into handle buf with + | i -> read (acc + i) + | exception End_of_file -> acc + in read 0 +``` + +A simple `echo hello` process redirects to stdout. + +```ocaml +# Eio_luv.run @@ fun _env -> + Switch.run @@ fun sw -> + let redirect = Luv.Process.[ + inherit_fd ~fd:stdout ~from_parent_fd:stdout () + ] in + let t = Process.spawn ~sw ~redirect "echo" [ "echo"; "hello" ] in + Process.await_exit t;; +hello +- : int * int64 = (0, 0L) +``` + +Using a pipe to redirect output to a buffer. + +```ocaml +# Eio_luv.run @@ fun _env -> + Switch.run @@ fun sw -> + let parent_pipe = Eio_luv.Low_level.Pipe.init ~sw () in + let buf = Luv.Buffer.create 32 in + let redirect = Eio_luv.Low_level.Process.[ + to_parent_pipe ~fd:Luv.Process.stdout ~parent_pipe () + ] in + let t = Process.spawn ~sw ~redirect "echo" [ "echo"; "Hello,"; "World!" ] in + let read = read_all parent_pipe buf in + let _ = Process.await_exit t in + Luv.Buffer.to_string (Luv.Buffer.sub buf ~offset:0 ~length:read);; +- : string = "Hello, World!\n" +``` + +Writing to stdin of a process works. + +```ocaml +# Eio_luv.run @@ fun _env -> + Switch.run @@ fun sw -> + let parent_pipe = Eio_luv.Low_level.Pipe.init ~sw () in + let bufs = [ Luv.Buffer.from_string "Hello!" ] in + let redirect = Luv.Process.[ + inherit_fd ~fd:stdout ~from_parent_fd:stdout (); + Process.to_parent_pipe ~fd:stdin ~parent_pipe () + ] in + let t = Process.spawn ~sw ~redirect "head" [ "head" ] in + Eio_luv.Low_level.Stream.write parent_pipe bufs; + Eio_luv.Low_level.Handle.close parent_pipe; + Process.await_exit t;; +Hello! +- : int * int64 = (0, 0L) +``` + +Stopping a process works. + +```ocaml +# Eio_luv.run @@ fun _env -> + Switch.run @@ fun sw -> + let redirect = Luv.Process.[ + inherit_fd ~fd:stdout ~from_parent_fd:stdout () + ] in + let t = Process.spawn ~sw ~redirect "sleep" [ "sleep"; "10" ] in + Process.send_signal t Luv.Signal.sigkill; + Process.await_exit t;; +- : int * int64 = (9, 0L) +``` + +Forgetting to wait for a process to finish stops the process. + +```ocaml +# Eio_luv.run @@ fun _env -> + let proc = + Switch.run @@ fun sw -> + let redirect = Luv.Process.[ + inherit_fd ~fd:stdout ~from_parent_fd:stdout () + ] in + Process.spawn ~sw ~redirect "sleep" [ "sleep"; "10" ] + in + Process.await_exit proc;; +- : int * int64 = (9, 0L) +``` + +Stopping a process interacts nicely with switches. + +```ocaml +# Eio_luv.run @@ fun _env -> + let proc = + Switch.run @@ fun sw -> + let redirect = Luv.Process.[ + inherit_fd ~fd:stdout ~from_parent_fd:stdout () + ] in + let t = Process.spawn ~sw ~redirect "sleep" [ "sleep"; "10" ] in + Process.send_signal t Luv.Signal.sigkill; + t + in + Process.await_exit proc;; +- : int * int64 = (9, 0L) +```