Skip to content

Commit

Permalink
Thread pool simplifications
Browse files Browse the repository at this point in the history
- Add `Run_in_systhread` effect. This makes the thread-pool part of the
  scheduler's state, avoiding the need for DLS.

- Remove `max_standby_systhreads_per_domain`. Suggested by Vesa Karvonen.

- Simplify termination. Use one atomic instead of two.

- Use the scheduler's timer to drop idle threads.
  Modified Zzz to allow non-fiber timeouts.
  • Loading branch information
talex5 committed Feb 3, 2024
1 parent d9f2f20 commit 4c768d1
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 113 deletions.
2 changes: 1 addition & 1 deletion lib_eio/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
(language c)
(include_dirs include)
(names fork_action stubs))
(libraries eio unix threads mtime.clock.os))
(libraries eio eio.utils unix threads mtime.clock.os))

(rule
(enabled_if %{bin-available:lintcstubs_arity_cmt})
Expand Down
9 changes: 7 additions & 2 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ val sleep : float -> unit
It can also be used in programs that don't care about tracking determinism. *)

val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a
(** [run_in_systhread fn] runs the function [fn] in a newly created system thread (a {! Thread.t}).
This allows blocking calls to be made non-blocking.
(** [run_in_systhread fn] runs the function [fn] using a pool of system threads ({! Thread.t}).
This pool creates a new system thread if all threads are busy, it does not wait.
[run_in_systhread] allows blocking calls to be made non-blocking.
@param label The operation name to use in trace output. *)

Expand Down Expand Up @@ -93,10 +95,13 @@ module Private : sig
| Await_writable : Unix.file_descr -> unit Effect.t (** See {!await_writable} *)
| Get_monotonic_clock : Eio.Time.Mono.ty r Effect.t
| Pipe : Eio.Switch.t -> (source_ty r * sink_ty r) Effect.t (** See {!pipe} *)
| Run_in_systhread : (unit -> 'a) -> 'a Effect.t (** See {!run_in_systhread} *)

module Rcfd = Rcfd

module Fork_action = Fork_action

module Thread_pool = Thread_pool
end

module Pi = Pi
6 changes: 4 additions & 2 deletions lib_eio/unix/private.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type _ Effect.t +=
| Await_writable : Unix.file_descr -> unit Effect.t
| Get_monotonic_clock : Eio.Time.Mono.ty r Effect.t
| Pipe : Switch.t -> (source_ty r * sink_ty r) Effect.t
| Run_in_systhread : (unit -> 'a) -> 'a Effect.t

let await_readable fd = Effect.perform (Await_readable fd)
let await_writable fd = Effect.perform (Await_writable fd)
Expand All @@ -16,7 +17,8 @@ let pipe sw = Effect.perform (Pipe sw)

module Rcfd = Rcfd
module Fork_action = Fork_action
module Thread_pool = Thread_pool

let run_in_systhread ?(label="systhread") fn =
Eio.Private.Suspend.enter label @@ fun _ctx enqueue ->
Thread_pool.run_on_systhread ~enqueue fn
Eio.Private.Trace.suspend_fiber label;
Effect.perform (Run_in_systhread fn)
156 changes: 87 additions & 69 deletions lib_eio/unix/thread_pool.ml
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
(* This thread pool does not spawn threads in advance,
but up to [max_standby_systhreads_per_domain] threads are
kept alive to wait for more work to arrive.
This number was chosen somewhat arbitrarily but benchmarking
shows it to be a good compromise. *)
let max_standby_systhreads_per_domain = 20
module Zzz = Eio_utils.Zzz

type job =
| New
Expand Down Expand Up @@ -33,69 +28,92 @@ module Mailbox = struct
mbox.cell
end

(* A lock-free Treiber stack of systhreads on stand-by.
A fresh thread is created if no thread is immediately available.
When the domain exits all thread on stand-by are shutdown. *)
module Free_pool = struct
type list =
| Empty
| Closed
| Free of Mailbox.t * list

type t = list Atomic.t

let rec close_list = function
| Free (x, xs) -> Mailbox.put x Exit; close_list xs
| Empty | Closed -> ()

let close t =
let items = Atomic.exchange t Closed in
close_list items

let rec drop t =
match Atomic.get t with
| Closed | Empty -> ()
| Free _ as items ->
if Atomic.compare_and_set t items Empty then close_list items
else drop t

let rec keep_thread_or_exit t mbox =
match Atomic.get t with
| Closed -> raise Thread.Exit
| (Empty | Free _) as current ->
let next = Free (mbox, current) in
if not (Atomic.compare_and_set t current next) then
keep_thread_or_exit t mbox (* concurrent update, try again *)

let make_thread t =
let mbox = Mailbox.create () in
let _thread : Thread.t = Thread.create (fun () ->
while true do
match Mailbox.take mbox with
| New -> assert false
| Exit -> raise Thread.Exit
| Job { fn; enqueue } ->
enqueue (try Ok (fn ()) with exn -> Error exn);
keep_thread_or_exit t mbox
done
) ()
in
mbox

let rec get_thread t =
match Atomic.get t with
| Closed -> invalid_arg "Thread pool closed!"
| Empty -> make_thread t
| Free (mbox, next) as current ->
if Atomic.compare_and_set t current next then mbox
else get_thread t (* concurrent update, try again *)
end

type t = {
threads: (Mailbox.t * int) list Atomic.t;
terminating: bool Atomic.t;
free : Free_pool.t;
sleep_q : Zzz.t;
mutable timeout : Zzz.Key.t option;
}

let create () = { threads = Atomic.make []; terminating = Atomic.make false }

let terminate { threads; terminating } =
Atomic.set terminating true;
List.iter (fun (mbox, _) -> Mailbox.put mbox Exit) (Atomic.get threads)

let rec keep_thread_or_exit ({ threads; _ } as pool) mbox =
match Atomic.get threads with
| (_, count) :: _ when count >= max_standby_systhreads_per_domain ->
(* We've got enough threads on stand-by, so discard the current thread *)
raise Thread.Exit
| current ->
let count = match current with
| [] -> 0
| (_, count) :: _ -> count
in
if not (Atomic.compare_and_set threads current ((mbox, count + 1) :: current))
then keep_thread_or_exit pool mbox (* concurrent update, try again *)

let make_thread pool =
let mbox = Mailbox.create () in
let _t : Thread.t = Thread.create (fun () ->
while true do
match Mailbox.take mbox with
| New -> assert false
| Exit -> raise Thread.Exit
| Job { fn; enqueue } ->
enqueue (try Ok (fn ()) with exn -> Error exn);
(* We're not yielding inside of [keep_thread_or_exit] so
no need to check [terminating] multiple times *)
if Atomic.get pool.terminating then raise Thread.Exit;
keep_thread_or_exit pool mbox
done
) ()
in
mbox

let rec get_thread ({ threads; _ } as pool) =
match Atomic.get threads with
| [] -> make_thread pool
| ((mbox, _count) :: rest) as current ->
if not (Atomic.compare_and_set threads current rest)
then get_thread pool (* concurrent update, try again *)
else mbox

(* https://v2.ocaml.org/manual/parallelism.html#s:par_systhread_interaction
"Only one systhread at a time is allowed to run OCaml code on a particular domain."
So we keep a separate threadpool per domain. *)
let key =
Domain.DLS.new_key @@ fun () ->
let pool = create () in
Domain.at_exit (fun () -> terminate pool);
pool

let run_on_systhread ~enqueue fn =
let pool = Domain.DLS.get key in
let mbox = get_thread pool in
Mailbox.put mbox (Job { fn; enqueue })
let terminate t =
Free_pool.close t.free;
Option.iter (fun key -> Zzz.remove t.sleep_q key; t.timeout <- None) t.timeout

let create ~sleep_q =
{ free = Atomic.make Free_pool.Empty; sleep_q; timeout = None }

let run t fn =
match fn () with
| x -> terminate t; x
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
terminate t;
Printexc.raise_with_backtrace ex bt

let submit t ~ctx ~enqueue fn =
match Eio.Private.Fiber_context.get_error ctx with
| Some e -> enqueue (Error e)
| None ->
let mbox = Free_pool.get_thread t.free in
Mailbox.put mbox (Job { fn; enqueue });
if t.timeout = None then (
let time =
Mtime.add_span (Mtime_clock.now ()) Mtime.Span.(20 * ms)
|> Option.value ~default:Mtime.max_stamp
in
t.timeout <- Some (Zzz.add t.sleep_q time (Fn (fun () -> Free_pool.drop t.free)))
)
22 changes: 21 additions & 1 deletion lib_eio/unix/thread_pool.mli
Original file line number Diff line number Diff line change
@@ -1 +1,21 @@
val run_on_systhread : enqueue:(('a, exn) result -> unit) -> (unit -> 'a) -> unit
(** A pool of systhreads, to avoid the overhead of creating a new thread for each operation. *)

type t

val create : sleep_q:Eio_utils.Zzz.t -> t
(** [create ~sleep_q] is a new thread pool.
[sleep_q] is used to register a clean-up task to finish idle threads. *)

val run : t -> (unit -> 'a) -> 'a
(** [run t fn] runs [fn ()] and then calls marks [t] as closed, releasing all threads. *)

val submit :
t ->
ctx:Eio.Private.Fiber_context.t ->
enqueue:(('a, exn) result -> unit) ->
(unit -> 'a) ->
unit
(** [submit t ~ctx ~enqueue fn] starts running [fn] in a sys-thread, which uses [enqueue] to return the result.
If [ctx] is cancelled then the error is passed to [enqueue] immediately. *)
20 changes: 14 additions & 6 deletions lib_eio/utils/zzz.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ module Key = struct
let compare = Optint.Int63.compare
end

type item =
| Fiber of unit Suspended.t
| Fn of (unit -> unit)

module Job = struct
type t = {
time : Mtime.t;
thread : unit Suspended.t;
item : item;
}

let compare a b = Mtime.compare a.time b.time
Expand All @@ -21,10 +25,10 @@ type t = {

let create () = { sleep_queue = Q.empty; next_id = Optint.Int63.zero }

let add t time thread =
let add t time item =
let id = t.next_id in
t.next_id <- Optint.Int63.succ t.next_id;
let sleeper = { Job.time; thread } in
let sleeper = { Job.time; item } in
t.sleep_queue <- Q.add id sleeper t.sleep_queue;
id

Expand All @@ -33,9 +37,13 @@ let remove t id =

let pop t ~now =
match Q.min t.sleep_queue with
| Some (_, { Job.time; thread }) when time <= now ->
Eio.Private.Fiber_context.clear_cancel_fn thread.fiber;
| Some (_, { Job.time; item }) when time <= now ->
begin
match item with
| Fiber k -> Eio.Private.Fiber_context.clear_cancel_fn k.fiber
| Fn _ -> ()
end;
t.sleep_queue <- Option.get (Q.rest t.sleep_queue);
`Due thread
`Due item
| Some (_, { Job.time; _ }) -> `Wait_until time
| None -> `Nothing
23 changes: 14 additions & 9 deletions lib_eio/utils/zzz.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@ end
type t
(** A set of timers (implemented as a priority queue). *)

type item =
| Fiber of unit Suspended.t
| Fn of (unit -> unit)

val create : unit -> t
(** [create ()] is a fresh empty queue. *)

val add : t -> Mtime.t -> unit Suspended.t -> Key.t
(** [add t time thread] adds a new event, due at [time], and returns its ID.
You must use {!Eio.Private.Fiber_context.set_cancel_fn} on [thread] before
calling {!pop}.
Your cancel function should call {!remove} (in addition to resuming [thread]). *)
val add : t -> Mtime.t -> item -> Key.t
(** [add t time item] adds a new event, due at [time], and returns its ID.
If [item] is a {!Fiber},
you must use {!Eio.Private.Fiber_context.set_cancel_fn} on it before calling {!pop}.
Your cancel function should call {!remove} (in addition to resuming it). *)

val remove : t -> Key.t -> unit
(** [remove t key] removes an event previously added with [add]. *)

val pop : t -> now:Mtime.t -> [`Due of unit Suspended.t | `Wait_until of Mtime.t | `Nothing]
(** [pop ~now t] removes and returns the earliest thread due by [now].
It also clears the thread's cancel function.
If no thread is due yet, it returns the time the earliest thread becomes due. *)
val pop : t -> now:Mtime.t -> [`Due of item | `Wait_until of Mtime.t | `Nothing]
(** [pop ~now t] removes and returns the earliest item due by [now].
For fibers, it also clears the thread's cancel function.
If no item is due yet, it returns the time the earliest item becomes due. *)
2 changes: 1 addition & 1 deletion lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ let noop () =

let sleep_until time =
Sched.enter "sleep" @@ fun t k ->
let job = Eio_utils.Zzz.add t.sleep_q time k in
let job = Eio_utils.Zzz.add t.sleep_q time (Fiber k) in
Eio.Private.Fiber_context.set_cancel_fn k.fiber (fun ex ->
Eio_utils.Zzz.remove t.sleep_q job;
Sched.enqueue_failed_thread t k ex
Expand Down
22 changes: 19 additions & 3 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type t = {
need_wakeup : bool Atomic.t;

sleep_q: Zzz.t;

thread_pool : Eio_unix.Private.Thread_pool.t;
}

type _ Effect.t +=
Expand Down Expand Up @@ -213,8 +215,12 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
let now = Mtime_clock.now () in
match Zzz.pop ~now sleep_q with
| `Due k ->
(* A sleeping task is now due *)
Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *)
Suspended.continue k () (* A sleeping task is now due *)
begin match k with
| Fiber k -> Suspended.continue k ()
| Fn fn -> fn (); schedule st
end
| `Wait_until _ | `Nothing as next_due ->
(* Handle any pending events before submitting. This is faster. *)
match Uring.get_cqe_nonblocking uring with
Expand Down Expand Up @@ -447,6 +453,15 @@ let run ~extra_effects st main arg =
);
schedule st
)
| Eio_unix.Private.Run_in_systhread fn -> Some (fun k ->
let k = { Suspended.k; fiber } in
let enqueue = function
| Ok v -> enqueue_thread st k v
| Error ex -> enqueue_failed_thread st k ex
in
Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn;
schedule st
)
| Alloc -> Some (fun k ->
match st.mem with
| None -> continue k None
Expand Down Expand Up @@ -475,7 +490,7 @@ let run ~extra_effects st main arg =
fork ~new_fiber (fun () ->
Switch.run_protected ~name:"eio_linux" (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
match Eio_unix.Private.Thread_pool.run st.thread_pool (fun () -> main arg) with
| x -> result := Some (Ok x)
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
Expand Down Expand Up @@ -548,7 +563,8 @@ let with_sched ?(fallback=no_fallback) config fn =
let io_q = Queue.create () in
let mem_q = Queue.create () in
with_eventfd @@ fun eventfd ->
fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q }
let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in
fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool }
with
| x -> Uring.exit uring; x
| exception ex ->
Expand Down
Loading

0 comments on commit 4c768d1

Please sign in to comment.