Skip to content

Commit

Permalink
Add cancellation to all browser IO
Browse files Browse the repository at this point in the history
  • Loading branch information
patricoferris committed Jan 13, 2023
1 parent 9ca66c7 commit 1654f56
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions lib_eio_js/browser/eio_browser.ml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ module Suspended = struct
Ctf.note_switch (tid t);
Effect.Deep.continue t.k v

let _discontinue t ex =
let discontinue t ex =
Ctf.note_switch (tid t);
Effect.Deep.discontinue t.k ex
end
Expand Down Expand Up @@ -90,8 +90,8 @@ end

type t = {
(* Suspended fibers waiting to run again.
[Lf_queue] is like [Stdlib.Queue], but is thread-safe (lock-free) and
allows pushing items to the head too, which we need. *)
[Run_queue] is like [Stdlib.Queue], but allows pushing items
to the head too, which we need. *)
mutable run_q : (unit -> unit) Run_queue.t;
mutable pending_io : int;
mutable scheduler : Scheduler.t option;
Expand All @@ -102,14 +102,19 @@ let enqueue_thread t k v =
t.pending_io <- t.pending_io - 1;
Option.iter Scheduler.wakeup t.scheduler

let enqueue_failed_thread t k v =
Run_queue.push t.run_q (fun () -> Suspended.discontinue k v);
t.pending_io <- t.pending_io - 1;
Option.iter Scheduler.wakeup t.scheduler


type _ Effect.t += Enter_unchecked : (t -> 'a Suspended.t -> unit) -> 'a Effect.t
let enter_unchecked fn = Effect.perform (Enter_unchecked fn)

let enter_io fn =
enter_unchecked @@ fun st k ->
st.pending_io <- st.pending_io + 1;
fn (enqueue_thread st k)
fn st k

(* Resume the next runnable fiber, if any. *)
let schedule t : unit =
Expand All @@ -121,21 +126,34 @@ let schedule t : unit =
end

module Timeout = struct
let set_timeout ~ms f = ignore (G.set_timeout ~ms f)

let sleep ~ms =
enter_io @@ set_timeout ~ms
enter_io @@ fun st k ->
let timer = ref None in
Fiber_context.set_cancel_fn k.fiber (fun exn -> Option.iter G.stop_timer !timer; enqueue_failed_thread st k exn);
let id = G.set_timeout ~ms (fun () ->
Fiber_context.clear_cancel_fn k.fiber;
enqueue_thread st k ()
) in
timer := Some id
end

let await fut =
enter_io @@ Fut.await fut
enter_io @@ fun st k ->
let cancelled = ref true in
Fiber_context.set_cancel_fn k.fiber (fun exn -> cancelled := true; enqueue_failed_thread st k exn);
Fut.await fut (fun v ->
Fiber_context.clear_cancel_fn k.fiber;
if not !cancelled then enqueue_thread st k v
)

let next_event : 'a Brr.Ev.type' -> Brr.Ev.target -> 'a Brr.Ev.t = fun typ target ->
let opts = Brr.Ev.listen_opts ~once:true () in
let listen fn =
ignore (Brr.Ev.listen ~opts typ fn target : Brr.Ev.listener)
in
enter_io listen
let listen fn = Brr.Ev.listen ~opts typ fn target in
enter_io @@ fun st k ->
let listener = ref None in
Fiber_context.set_cancel_fn k.fiber (fun exn -> Option.iter Ev.unlisten !listener; enqueue_failed_thread st k exn);
let v = listen (fun v -> enqueue_thread st k v) in
listener := Some v

(* Largely based on the Eio_mock.Backend event loop. *)
let run main =
Expand Down

0 comments on commit 1654f56

Please sign in to comment.