diff --git a/dune-project b/dune-project index 678cabfb4..a2efcaee2 100644 --- a/dune-project +++ b/dune-project @@ -39,7 +39,7 @@ (logs (>= 0.7.0)) (fmt (>= 0.8.9)) (cmdliner (and (>= 1.1.0) :with-test)) - (uring (>= 0.4)))) + (uring (>= 0.5)))) (package (name eio_luv) (synopsis "Eio implementation using luv (libuv)") diff --git a/eio_linux.opam b/eio_linux.opam index 184b12174..6cdc8612c 100644 --- a/eio_linux.opam +++ b/eio_linux.opam @@ -16,7 +16,7 @@ depends: [ "logs" {>= "0.7.0"} "fmt" {>= "0.8.9"} "cmdliner" {>= "1.1.0" & with-test} - "uring" {>= "0.4"} + "uring" {>= "0.5"} "odoc" {with-doc} ] build: [ diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index e951f5fab..07398d583 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -234,11 +234,18 @@ type _ Effect.t += Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t type _ Effect.t += Cancel : io_job Uring.job -> unit Effect.t let enter fn = Effect.perform (Enter fn) +let rec enqueue_job t fn = + match fn () with + | Some _ as r -> r + | None -> + if Uring.submit t.uring > 0 then enqueue_job t fn + else None + (* Cancellations always come from the same domain, so no need to send wake events here. *) -let rec enqueue_cancel job st = +let rec enqueue_cancel job t = Ctf.label "cancel"; - match Uring.cancel st.uring job Cancel_job with - | None -> Queue.push (fun st -> enqueue_cancel job st) st.io_q + match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with + | None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q | Some _ -> () let cancel job = Effect.perform (Cancel job) @@ -261,15 +268,15 @@ let cancel job = Effect.perform (Cancel job) If the operation completes before Linux processes the cancellation, we get [ENOENT], which we ignore. *) -(* [with_cancel_hook ~action st fn] calls [fn] to create a job, +(* [with_cancel_hook ~action t fn] calls [fn] to create a job, then sets the fiber's cancel function to cancel it. If [action] is already cancelled, it schedules [action] to be discontinued. @return Whether to retry the operation later, once there is space. *) -let with_cancel_hook ~action st fn = +let with_cancel_hook ~action t fn = match Fiber_context.get_error action.Suspended.fiber with - | Some ex -> enqueue_failed_thread st action ex; false + | Some ex -> enqueue_failed_thread t action ex; false | None -> - match fn () with + match enqueue_job t fn with | None -> true | Some job -> Fiber_context.set_cancel_fn action.fiber (fun _ -> cancel job); @@ -364,11 +371,11 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb = if retry then (* wait until an sqe is available *) Queue.push (fun st -> enqueue_poll_add_unix fd poll_mask st action cb) st.io_q -let rec enqueue_close st action fd = +let rec enqueue_close t action fd = Ctf.label "close"; - let subm = Uring.close st.uring fd (Job_no_cancel action) in + let subm = enqueue_job t (fun () -> Uring.close t.uring fd (Job_no_cancel action)) in if subm = None then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_close st action fd) st.io_q + Queue.push (fun t -> enqueue_close t action fd) t.io_q let enqueue_write st action (file_offset,fd,buf,len) = let file_offset = @@ -483,12 +490,12 @@ let rec enqueue_accept fd client_addr st action = Queue.push (fun st -> enqueue_accept fd client_addr st action) st.io_q ) -let rec enqueue_noop st action = +let rec enqueue_noop t action = Ctf.label "noop"; - let retry = (Uring.noop st.uring (Job_no_cancel action) = None) in - if retry then ( + let job = enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in + if job = None then ( (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_noop st action) st.io_q + Queue.push (fun t -> enqueue_noop t action) t.io_q ) let submit_pending_io st = diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index 8f7b265e8..273e1438b 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -110,6 +110,21 @@ let test_iovec () = ); Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message) +(* We fill the SQE buffer and need to submit early. *) +let test_no_sqe () = + try + Eio_linux.run ~queue_depth:4 @@ fun _stdenv -> + Switch.run @@ fun sw -> + for _ = 1 to 8 do + Fiber.fork ~sw (fun () -> + let r, _w = Eio_unix.pipe sw in + ignore (Eio.Flow.single_read r (Cstruct.create 1) : int); + assert false + ) + done; + raise Exit + with Exit -> () + let () = let open Alcotest in run "eio_linux" [ @@ -119,5 +134,6 @@ let () = test_case "poll_add" `Quick test_poll_add; test_case "poll_add_busy" `Quick test_poll_add_busy; test_case "iovec" `Quick test_iovec; + test_case "no-sqe" `Quick test_no_sqe; ]; ]