Skip to content

Commit

Permalink
Thread pool simplifications
Browse files Browse the repository at this point in the history
- Add `Run_in_systhread` effect. This makes the thread-pool part of the
  scheduler's state, avoiding the need for DLS.

- Remove `max_standby_systhreads_per_domain`. Suggested by Vesa Karvonen.

- Simplify termination. Use one atomic instead of two.

- Use the scheduler's timer to drop idle threads.
  Modified Zzz to allow non-fiber timeouts.
  • Loading branch information
talex5 committed Feb 5, 2024
1 parent d9f2f20 commit 230d533
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 120 deletions.
2 changes: 1 addition & 1 deletion lib_eio/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
(language c)
(include_dirs include)
(names fork_action stubs))
(libraries eio unix threads mtime.clock.os))
(libraries eio eio.utils unix threads mtime.clock.os))

(rule
(enabled_if %{bin-available:lintcstubs_arity_cmt})
Expand Down
2 changes: 1 addition & 1 deletion lib_eio/unix/eio_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ let () =
let sleep d =
Eio.Time.Mono.sleep (Effect.perform Private.Get_monotonic_clock) d

let run_in_systhread = Private.run_in_systhread
let run_in_systhread = Thread_pool.run_in_systhread

module Ipaddr = Net.Ipaddr

Expand Down
8 changes: 6 additions & 2 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ val sleep : float -> unit
It can also be used in programs that don't care about tracking determinism. *)

val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a
(** [run_in_systhread fn] runs the function [fn] in a newly created system thread (a {! Thread.t}).
This allows blocking calls to be made non-blocking.
(** [run_in_systhread fn] runs the function [fn] using a pool of system threads ({! Thread.t}).
This pool creates a new system thread if all threads are busy, it does not wait.
[run_in_systhread] allows blocking calls to be made non-blocking.
@param label The operation name to use in trace output. *)

Expand Down Expand Up @@ -97,6 +99,8 @@ module Private : sig
module Rcfd = Rcfd

module Fork_action = Fork_action

module Thread_pool = Thread_pool
end

module Pi = Pi
2 changes: 1 addition & 1 deletion lib_eio/unix/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) =
| `Udp _ -> [Unix.NI_DGRAM]
in
let sockaddr = sockaddr_to_unix sockaddr in
Private.run_in_systhread ~label:"getnameinfo" (fun () ->
Thread_pool.run_in_systhread ~label:"getnameinfo" (fun () ->
let Unix.{ni_hostname; ni_service} = Unix.getnameinfo sockaddr options in
(ni_hostname, ni_service))

Expand Down
5 changes: 1 addition & 4 deletions lib_eio/unix/private.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,4 @@ let pipe sw = Effect.perform (Pipe sw)

module Rcfd = Rcfd
module Fork_action = Fork_action

let run_in_systhread ?(label="systhread") fn =
Eio.Private.Suspend.enter label @@ fun _ctx enqueue ->
Thread_pool.run_on_systhread ~enqueue fn
module Thread_pool = Thread_pool
173 changes: 101 additions & 72 deletions lib_eio/unix/thread_pool.ml
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
(* This thread pool does not spawn threads in advance,
but up to [max_standby_systhreads_per_domain] threads are
kept alive to wait for more work to arrive.
This number was chosen somewhat arbitrarily but benchmarking
shows it to be a good compromise. *)
let max_standby_systhreads_per_domain = 20
module Zzz = Eio_utils.Zzz

type job =
| New
| Exit
| Job : {
fn: unit -> 'a;
enqueue: ('a, exn) result -> unit;
fn : unit -> 'a;
enqueue : ('a, exn) result -> unit;
} -> job

(* Mailbox with blocking semaphore *)
module Mailbox = struct
type t = {
available: Semaphore.Binary.t;
mutable cell: job;
available : Semaphore.Binary.t;
mutable cell : job;
}

let create () = { available = Semaphore.Binary.make false; cell = New }
Expand All @@ -33,69 +28,103 @@ module Mailbox = struct
mbox.cell
end

(* A lock-free Treiber stack of systhreads on stand-by.
A fresh thread is created if no thread is immediately available.
When the domain exits all thread on stand-by are shutdown. *)
module Free_pool = struct
type list =
| Empty
| Closed
| Free of Mailbox.t * list

type t = list Atomic.t

let rec close_list = function
| Free (x, xs) -> Mailbox.put x Exit; close_list xs
| Empty | Closed -> ()

let close t =
let items = Atomic.exchange t Closed in
close_list items

let rec drop t =
match Atomic.get t with
| Closed | Empty -> ()
| Free _ as items ->
if Atomic.compare_and_set t items Empty then close_list items
else drop t

let rec put t mbox =
match Atomic.get t with
| Closed -> assert false
| (Empty | Free _) as current ->
let next = Free (mbox, current) in
if not (Atomic.compare_and_set t current next) then
put t mbox (* concurrent update, try again *)

let make_thread t =
let mbox = Mailbox.create () in
let _thread : Thread.t = Thread.create (fun () ->
while true do
match Mailbox.take mbox with
| New -> assert false
| Exit -> raise Thread.Exit
| Job { fn; enqueue } ->
(* Ensure thread is in free-pool before enqueuing. *)
let result = try Ok (fn ()) with exn -> Error exn in
put t mbox;
enqueue result
done
) ()
in
mbox

let rec get_thread t =
match Atomic.get t with
| Closed -> invalid_arg "Thread pool closed!"
| Empty -> make_thread t
| Free (mbox, next) as current ->
if Atomic.compare_and_set t current next then mbox
else get_thread t (* concurrent update, try again *)
end

type t = {
threads: (Mailbox.t * int) list Atomic.t;
terminating: bool Atomic.t;
free : Free_pool.t;
sleep_q : Zzz.t;
mutable timeout : Zzz.Key.t option;
}

let create () = { threads = Atomic.make []; terminating = Atomic.make false }

let terminate { threads; terminating } =
Atomic.set terminating true;
List.iter (fun (mbox, _) -> Mailbox.put mbox Exit) (Atomic.get threads)

let rec keep_thread_or_exit ({ threads; _ } as pool) mbox =
match Atomic.get threads with
| (_, count) :: _ when count >= max_standby_systhreads_per_domain ->
(* We've got enough threads on stand-by, so discard the current thread *)
raise Thread.Exit
| current ->
let count = match current with
| [] -> 0
| (_, count) :: _ -> count
type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, exn) result * t) Effect.t

let terminate t =
Free_pool.close t.free;
Option.iter (fun key -> Zzz.remove t.sleep_q key; t.timeout <- None) t.timeout

let create ~sleep_q =
{ free = Atomic.make Free_pool.Empty; sleep_q; timeout = None }

let run t fn =
match fn () with
| x -> terminate t; x
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
terminate t;
Printexc.raise_with_backtrace ex bt

let submit t ~ctx ~enqueue fn =
match Eio.Private.Fiber_context.get_error ctx with
| Some e -> enqueue (Error e)
| None ->
let mbox = Free_pool.get_thread t.free in
Mailbox.put mbox (Job { fn; enqueue })

let run_in_systhread ?(label="systhread") fn =
Eio.Private.Trace.suspend_fiber label;
let r, t = Effect.perform (Run_in_systhread fn) in
if t.timeout = None then (
let time =
Mtime.add_span (Mtime_clock.now ()) Mtime.Span.(20 * ms)
|> Option.value ~default:Mtime.max_stamp
in
if not (Atomic.compare_and_set threads current ((mbox, count + 1) :: current))
then keep_thread_or_exit pool mbox (* concurrent update, try again *)

let make_thread pool =
let mbox = Mailbox.create () in
let _t : Thread.t = Thread.create (fun () ->
while true do
match Mailbox.take mbox with
| New -> assert false
| Exit -> raise Thread.Exit
| Job { fn; enqueue } ->
enqueue (try Ok (fn ()) with exn -> Error exn);
(* We're not yielding inside of [keep_thread_or_exit] so
no need to check [terminating] multiple times *)
if Atomic.get pool.terminating then raise Thread.Exit;
keep_thread_or_exit pool mbox
done
) ()
in
mbox

let rec get_thread ({ threads; _ } as pool) =
match Atomic.get threads with
| [] -> make_thread pool
| ((mbox, _count) :: rest) as current ->
if not (Atomic.compare_and_set threads current rest)
then get_thread pool (* concurrent update, try again *)
else mbox

(* https://v2.ocaml.org/manual/parallelism.html#s:par_systhread_interaction
"Only one systhread at a time is allowed to run OCaml code on a particular domain."
So we keep a separate threadpool per domain. *)
let key =
Domain.DLS.new_key @@ fun () ->
let pool = create () in
Domain.at_exit (fun () -> terminate pool);
pool

let run_on_systhread ~enqueue fn =
let pool = Domain.DLS.get key in
let mbox = get_thread pool in
Mailbox.put mbox (Job { fn; enqueue })
t.timeout <- Some (Zzz.add t.sleep_q time (Fn (fun () -> Free_pool.drop t.free; t.timeout <- None)))
);
match r with
| Ok x -> x
| Error ex -> raise ex
26 changes: 25 additions & 1 deletion lib_eio/unix/thread_pool.mli
Original file line number Diff line number Diff line change
@@ -1 +1,25 @@
val run_on_systhread : enqueue:(('a, exn) result -> unit) -> (unit -> 'a) -> unit
(** A pool of systhreads, to avoid the overhead of creating a new thread for each operation. *)

type t

val create : sleep_q:Eio_utils.Zzz.t -> t
(** [create ~sleep_q] is a new thread pool.
[sleep_q] is used to register a clean-up task to finish idle threads. *)

val run : t -> (unit -> 'a) -> 'a
(** [run t fn] runs [fn ()] and then marks [t] as closed, releasing all idle threads. *)

val submit :
t ->
ctx:Eio.Private.Fiber_context.t ->
enqueue:(('a, exn) result -> unit) ->
(unit -> 'a) ->
unit
(** [submit t ~ctx ~enqueue fn] starts running [fn] in a sys-thread, which uses [enqueue] to return the result.
If [ctx] is already cancelled then the error is passed to [enqueue] immediately.
Systhreads do not respond to cancellation once running. *)

type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, exn) result * t) Effect.t
val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a
20 changes: 14 additions & 6 deletions lib_eio/utils/zzz.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ module Key = struct
let compare = Optint.Int63.compare
end

type item =
| Fiber of unit Suspended.t
| Fn of (unit -> unit)

module Job = struct
type t = {
time : Mtime.t;
thread : unit Suspended.t;
item : item;
}

let compare a b = Mtime.compare a.time b.time
Expand All @@ -21,10 +25,10 @@ type t = {

let create () = { sleep_queue = Q.empty; next_id = Optint.Int63.zero }

let add t time thread =
let add t time item =
let id = t.next_id in
t.next_id <- Optint.Int63.succ t.next_id;
let sleeper = { Job.time; thread } in
let sleeper = { Job.time; item } in
t.sleep_queue <- Q.add id sleeper t.sleep_queue;
id

Expand All @@ -33,9 +37,13 @@ let remove t id =

let pop t ~now =
match Q.min t.sleep_queue with
| Some (_, { Job.time; thread }) when time <= now ->
Eio.Private.Fiber_context.clear_cancel_fn thread.fiber;
| Some (_, { Job.time; item }) when time <= now ->
begin
match item with
| Fiber k -> Eio.Private.Fiber_context.clear_cancel_fn k.fiber
| Fn _ -> ()
end;
t.sleep_queue <- Option.get (Q.rest t.sleep_queue);
`Due thread
`Due item
| Some (_, { Job.time; _ }) -> `Wait_until time
| None -> `Nothing
23 changes: 14 additions & 9 deletions lib_eio/utils/zzz.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@ end
type t
(** A set of timers (implemented as a priority queue). *)

type item =
| Fiber of unit Suspended.t
| Fn of (unit -> unit)

val create : unit -> t
(** [create ()] is a fresh empty queue. *)

val add : t -> Mtime.t -> unit Suspended.t -> Key.t
(** [add t time thread] adds a new event, due at [time], and returns its ID.
You must use {!Eio.Private.Fiber_context.set_cancel_fn} on [thread] before
calling {!pop}.
Your cancel function should call {!remove} (in addition to resuming [thread]). *)
val add : t -> Mtime.t -> item -> Key.t
(** [add t time item] adds a new event, due at [time], and returns its ID.
If [item] is a {!Fiber},
you must use {!Eio.Private.Fiber_context.set_cancel_fn} on it before calling {!pop}.
Your cancel function should call {!remove} (in addition to resuming it). *)

val remove : t -> Key.t -> unit
(** [remove t key] removes an event previously added with [add]. *)

val pop : t -> now:Mtime.t -> [`Due of unit Suspended.t | `Wait_until of Mtime.t | `Nothing]
(** [pop ~now t] removes and returns the earliest thread due by [now].
It also clears the thread's cancel function.
If no thread is due yet, it returns the time the earliest thread becomes due. *)
val pop : t -> now:Mtime.t -> [`Due of item | `Wait_until of Mtime.t | `Nothing]
(** [pop ~now t] removes and returns the earliest item due by [now].
For fibers, it also clears the thread's cancel function.
If no item is due yet, it returns the time the earliest item becomes due. *)
2 changes: 1 addition & 1 deletion lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ let noop () =

let sleep_until time =
Sched.enter "sleep" @@ fun t k ->
let job = Eio_utils.Zzz.add t.sleep_q time k in
let job = Eio_utils.Zzz.add t.sleep_q time (Fiber k) in
Eio.Private.Fiber_context.set_cancel_fn k.fiber (fun ex ->
Eio_utils.Zzz.remove t.sleep_q job;
Sched.enqueue_failed_thread t k ex
Expand Down
Loading

0 comments on commit 230d533

Please sign in to comment.