Skip to content

Commit

Permalink
fix: Initialize async IO thread lazily (#8122)
Browse files Browse the repository at this point in the history
* fix: Initialize async IO thread lazily

The goal of that change is to avoid effects of threads when they are not
actually needed.

Signed-off-by: Etienne Millon <me@emillon.org>
  • Loading branch information
emillon committed Jul 6, 2023
1 parent bfeabc0 commit 49fbbfb
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
- Disable background operations and threaded console on MacOS and other Unixes
where we rely on fork. (#8100, #8121, fixes #8083, @rgrinberg, @emillon)

- Initialize async IO thread lazily. (#8122, @emillon)

3.9.0 (2023-06-28)
------------------

Expand Down
57 changes: 39 additions & 18 deletions src/dune_async_io/async_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type t =
; mutex : Mutex.t
; scheduler : (module Scheduler)
; mutable running : bool
; mutable started : bool
; (* this flag is to save a write to the pipe we used to interrupt select *)
mutable interrupting : bool
; pipe_buf : Bytes.t
Expand Down Expand Up @@ -208,10 +209,42 @@ let rec select_loop t =
Scheduler.fill_jobs fills);
select_loop t

let t_var = Fiber.Var.create ()
let start t =
let module Scheduler = (val t.scheduler : Scheduler) in
Scheduler.spawn_thread (fun () ->
Mutex.lock t.mutex;
Exn.protect
~f:(fun () -> select_loop t)
~finally:(fun () -> Mutex.unlock t.mutex))

module T_var : sig
(** Wrap the global t_var so that it is started whenever requested. *)

val get_exn : unit -> t Fiber.t

val setup : t -> (unit -> 'a Fiber.t) -> 'a Fiber.t
end = struct
let t_var = Fiber.Var.create ()

let get_exn () =
let+ t = Fiber.Var.get_exn t_var in
if not t.started then (
start t;
t.started <- true);
t

let setup t f =
Fiber.Var.set t_var t (fun () ->
Fiber.finalize f ~finally:(fun () ->
if t.started then (
Mutex.lock t.mutex;
t.running <- false;
interrupt t;
Mutex.unlock t.mutex);
Fiber.return ()))
end

let with_io scheduler f =
let module Scheduler = (val scheduler : Scheduler) in
let t =
let pipe_read, pipe_write =
if not Sys.win32 then Unix.pipe ~cloexec:true ()
Expand All @@ -234,25 +267,13 @@ let with_io scheduler f =
; pipe_buf = Bytes.create 512
; interrupting = false
; to_close = []
; started = false
}
in
let () =
Scheduler.spawn_thread (fun () ->
Mutex.lock t.mutex;
Exn.protect
~f:(fun () -> select_loop t)
~finally:(fun () -> Mutex.unlock t.mutex))
in
Fiber.Var.set t_var t (fun () ->
Fiber.finalize f ~finally:(fun () ->
Mutex.lock t.mutex;
t.running <- false;
interrupt t;
Mutex.unlock t.mutex;
Fiber.return ()))
T_var.setup t f

let with_ f =
let+ t = Fiber.Var.get_exn t_var in
let+ t = T_var.get_exn () in
Mutex.lock t.mutex;
Exn.protect ~f:(fun () -> f t) ~finally:(fun () -> Mutex.unlock t.mutex)

Expand All @@ -268,7 +289,7 @@ let cancel_fd scheduler table fd =
Fiber.Ivar.fill t.ivar (Error `Cancelled))

let close fd =
let* t = Fiber.Var.get_exn t_var in
let* t = T_var.get_exn () in
Mutex.lock t.mutex;
(* everything below is guaranteed not to raise so the mutex will be unlocked
in the end. There's no need to use [protect] to make sure we don't deadlock *)
Expand Down

0 comments on commit 49fbbfb

Please sign in to comment.