Skip to content

Commit

Permalink
eio_posix and eio_windows: check for IO periodically
Browse files Browse the repository at this point in the history
These already had code to check periodically for timeouts, but forgot to
check for IO at the same time.
  • Loading branch information
talex5 committed Jan 26, 2024
1 parent b68d396 commit 442a379
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 44 deletions.
43 changes: 22 additions & 21 deletions lib_eio_posix/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -203,27 +203,28 @@ let rec next t : [`Exit_scheduler] =
`Exit_scheduler
) else (
Atomic.set t.need_wakeup true;
if Lf_queue.is_empty t.run_q then (
(* At this point we're not going to check [run_q] again before sleeping.
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
Trace.suspend_domain Begin;
let nready =
try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout
with Unix.Unix_error (Unix.EINTR, _, "") -> 0
in
Trace.suspend_domain End;
Atomic.set t.need_wakeup false;
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
Poll.iter_ready t.poll nready (ready t);
next t
) else (
(* Someone added a new job while we were setting [need_wakeup] to [true].
They might or might not have seen that, so we can't be sure they'll send an event. *)
Atomic.set t.need_wakeup false;
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
next t
)
let timeout =
if Lf_queue.is_empty t.run_q then timeout
else (
(* Either we're just checking for IO to avoid starvation, or
someone added a new job while we were setting [need_wakeup] to [true].
They might or might not have seen that, so we can't be sure they'll send an event. *)
Poll.Nowait
)
in
(* At this point we're not going to check [run_q] again before sleeping.
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
Trace.suspend_domain Begin;
let nready =
try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout
with Unix.Unix_error (Unix.EINTR, _, "") -> 0
in
Trace.suspend_domain End;
Atomic.set t.need_wakeup false;
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
Poll.iter_ready t.poll nready (ready t);
next t
)

let with_sched fn =
Expand Down
47 changes: 24 additions & 23 deletions lib_eio_windows/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -198,32 +198,33 @@ let rec next t : [`Exit_scheduler] =
`Exit_scheduler
) else (
Atomic.set t.need_wakeup true;
if Lf_queue.is_empty t.run_q then (
(* At this point we're not going to check [run_q] again before sleeping.
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
Trace.suspend_domain Begin;
let cons fd acc = fd :: acc in
let read = FdSet.fold cons t.poll.to_read [] in
let write = FdSet.fold cons t.poll.to_write [] in
match Unix.select read write [] timeout with
| exception Unix.(Unix_error (EINTR, _, _)) ->
Trace.suspend_domain End;
next t
| readable, writeable, _ ->
Trace.suspend_domain End;
Atomic.set t.need_wakeup false;
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
List.iter (ready t [ `W ]) writeable;
List.iter (ready t [ `R ]) readable;
next t
) else (
(* Someone added a new job while we were setting [need_wakeup] to [true].
They might or might not have seen that, so we can't be sure they'll send an event. *)
let timeout =
if Lf_queue.is_empty t.run_q then timeout
else (
(* Either we're just checking for IO to avoid starvation, or
someone added a new job while we were setting [need_wakeup] to [true].
They might or might not have seen that, so we can't be sure they'll send an event. *)
0.0
)
in
(* At this point we're not going to check [run_q] again before sleeping.
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
Trace.suspend_domain Begin;
let cons fd acc = fd :: acc in
let read = FdSet.fold cons t.poll.to_read [] in
let write = FdSet.fold cons t.poll.to_write [] in
match Unix.select read write [] timeout with
| exception Unix.(Unix_error (EINTR, _, _)) ->
Trace.suspend_domain End;
next t
| readable, writeable, _ ->
Trace.suspend_domain End;
Atomic.set t.need_wakeup false;
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
List.iter (ready t [ `W ]) writeable;
List.iter (ready t [ `R ]) readable;
next t
)
)

let with_sched fn =
Expand Down
23 changes: 23 additions & 0 deletions tests/flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,26 @@ Sending a very long vector over a flow should just send it in chunks, not fail:
+Read 30000 bytes
- : unit = ()
```

## Starvation

Even if a fiber is already ready to run, we still perform IO from time to time:

```ocaml
# run @@ fun _ ->
Switch.run @@ fun sw ->
let r, w = Eio_unix.pipe sw in
let rec spin () = Fiber.yield (); spin () in
Fiber.fork_daemon ~sw spin;
Fiber.both
(fun () ->
let buf = Cstruct.create 3 in
Eio.Flow.read_exact r buf;
traceln "Got %S" (Cstruct.to_string buf)
)
(fun () ->
Eio.Flow.write w [Cstruct.of_string "msg"]
)
+Got "msg"
- : unit = ()
```

0 comments on commit 442a379

Please sign in to comment.