From d77826f096c965aa5fc65d1061eac70c43bdc251 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Thu, 20 Oct 2022 15:39:51 +0100 Subject: [PATCH] Add pipe to eio_unix --- lib_eio/unix/eio_unix.ml | 3 +++ lib_eio/unix/eio_unix.mli | 7 +++++++ lib_eio_linux/eio_linux.ml | 6 ++++++ lib_eio_luv/eio_luv.ml | 7 +++++++ tests/flow.md | 23 +++++++++++++++++++++++ 5 files changed, 46 insertions(+) diff --git a/lib_eio/unix/eio_unix.ml b/lib_eio/unix/eio_unix.ml index 56b03de94..ca6c7f4d7 100644 --- a/lib_eio/unix/eio_unix.ml +++ b/lib_eio/unix/eio_unix.ml @@ -17,6 +17,7 @@ module Private = struct | Get_system_clock : Eio.Time.clock Effect.t | Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr -> socket Effect.t | Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int -> (socket * socket) Effect.t + | Pipe : Eio.Switch.t -> ( * ) Effect.t end let await_readable fd = Effect.perform (Private.Await_readable fd) @@ -48,6 +49,8 @@ end let socketpair ~sw ?(domain=Unix.PF_UNIX) ?(ty=Unix.SOCK_STREAM) ?(protocol=0) () = Effect.perform (Private.Socketpair (sw, domain, ty, protocol)) +let pipe sw = Effect.perform (Private.Pipe sw) + module Ipaddr = struct let to_unix : _ Eio.Net.Ipaddr.t -> Unix.inet_addr = Obj.magic let of_unix : Unix.inet_addr -> _ Eio.Net.Ipaddr.t = Obj.magic diff --git a/lib_eio/unix/eio_unix.mli b/lib_eio/unix/eio_unix.mli index 3f9abb06c..7fa3207ad 100644 --- a/lib_eio/unix/eio_unix.mli +++ b/lib_eio/unix/eio_unix.mli @@ -77,6 +77,11 @@ val socketpair : This creates OS-level resources using [socketpair(2)]. Note that, like all FDs created by Eio, they are both marked as close-on-exec by default. *) +val pipe : Switch.t -> * +(** [pipe sw] returns a connected pair of flows [src] and [sink]. Data written to [sink] + can be read from [src]. + Note that, like all FDs created by Eio, they are both marked as close-on-exec by default. *) + (** API for Eio backends only. *) module Private : sig type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty @@ -90,6 +95,8 @@ module Private : sig socket Effect.t (** See {!FD.as_socket} *) | Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int -> (socket * socket) Effect.t (** See {!socketpair} *) + | Pipe : Eio.Switch.t -> + ( * ) Effect.t (** See {!pipe} *) end module Ctf = Ctf_unix diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index dd1caf7aa..7722eebca 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -1446,6 +1446,12 @@ let rec run : type a. let b = FD.of_unix ~sw ~seekable:false ~close_unix:true b |> flow in continue k ((a :> Eio_unix.socket), (b :> Eio_unix.socket)) ) + | Eio_unix.Private.Pipe sw -> Some (fun k -> + let r, w = Unix.pipe ~cloexec:true () in + let r = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true r) :> ) in + let w = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true w) :> ) in + continue k (r, w) + ) | Low_level.Alloc -> Some (fun k -> match st.mem with | None -> continue k None diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml index c758ffd15..96f7b50a9 100644 --- a/lib_eio_luv/eio_luv.ml +++ b/lib_eio_luv/eio_luv.ml @@ -596,6 +596,7 @@ let get_fd_opt t = Eio.Generic.probe t FD let flow fd = object (_ : ) method fd = fd method close = Low_level.File.close fd + method unix_fd op = File.to_unix op fd method probe : type a. a Eio.Generic.ty -> a option = function | FD -> Some fd @@ -1155,6 +1156,12 @@ let rec run : type a. (_ -> a) -> a = fun main -> with Luv_error _ as ex -> discontinue k ex ) + | Eio_unix.Private.Pipe sw -> Some (fun k -> + let r, w = Luv.Pipe.pipe ~read_flags:[] ~write_flags:[] () |> or_raise in + let r = (flow (File.of_luv ~close_unix:true ~sw r) :> ) in + let w = (flow (File.of_luv ~close_unix:true ~sw w) :> ) in + continue k (r, w) + ) | _ -> None } in diff --git a/tests/flow.md b/tests/flow.md index 8cf8caf8d..3e99d098f 100644 --- a/tests/flow.md +++ b/tests/flow.md @@ -118,3 +118,26 @@ Copying from src using `Read_source_buffer`: +dst: wrote "foobar" - : unit = () ``` + +## Pipes + +Writing to and reading from a pipe. + +```ocaml +# Eio_main.run @@ fun env -> + Switch.run @@ fun sw -> + let r, w = Eio_unix.pipe sw in + let msg = "Hello, world" in + Eio.Fiber.both + (fun () -> + let buf = Cstruct.create (String.length msg) in + let () = Eio.Flow.read_exact r buf in + traceln "Got: %s" (Cstruct.to_string buf) + ) + (fun () -> + Eio.Flow.copy_string msg w; + Eio.Flow.close w + );; ++Got: Hello, world +- : unit = () +```