From ad183c578f0c9915e9f3a243f9aaf9481d2c1ecc Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Thu, 1 Feb 2024 15:08:26 +0000 Subject: [PATCH] Thread pool simplifications - Add `Run_in_systhread` effect. This makes the thread-pool part of the scheduler's state, avoiding the need for DLS. - Remove `max_standby_systhreads_per_domain`. Suggested by Vesa Karvonen. - Simplify termination. Use one atomic instead of two. - Use the scheduler's timer to drop idle threads. Modified Zzz to allow non-fiber timeouts. --- lib_eio/unix/dune | 2 +- lib_eio/unix/eio_unix.ml | 2 +- lib_eio/unix/eio_unix.mli | 8 +- lib_eio/unix/net.ml | 2 +- lib_eio/unix/private.ml | 5 +- lib_eio/unix/thread_pool.ml | 172 ++++++++++++++++++++--------------- lib_eio/unix/thread_pool.mli | 26 +++++- lib_eio/utils/zzz.ml | 20 ++-- lib_eio/utils/zzz.mli | 23 +++-- lib_eio_linux/low_level.ml | 2 +- lib_eio_linux/sched.ml | 19 +++- lib_eio_posix/sched.ml | 26 ++++-- lib_eio_windows/sched.ml | 40 +++++--- 13 files changed, 227 insertions(+), 120 deletions(-) diff --git a/lib_eio/unix/dune b/lib_eio/unix/dune index 446a4fa2a..32f8e0f2b 100644 --- a/lib_eio/unix/dune +++ b/lib_eio/unix/dune @@ -5,7 +5,7 @@ (language c) (include_dirs include) (names fork_action stubs)) - (libraries eio unix threads mtime.clock.os)) + (libraries eio eio.utils unix threads mtime.clock.os)) (rule (enabled_if %{bin-available:lintcstubs_arity_cmt}) diff --git a/lib_eio/unix/eio_unix.ml b/lib_eio/unix/eio_unix.ml index 45325817e..78bd48037 100644 --- a/lib_eio/unix/eio_unix.ml +++ b/lib_eio/unix/eio_unix.ml @@ -22,7 +22,7 @@ let () = let sleep d = Eio.Time.Mono.sleep (Effect.perform Private.Get_monotonic_clock) d -let run_in_systhread = Private.run_in_systhread +let run_in_systhread = Thread_pool.run_in_systhread module Ipaddr = Net.Ipaddr diff --git a/lib_eio/unix/eio_unix.mli b/lib_eio/unix/eio_unix.mli index 6be8c93a2..fd450a3da 100644 --- a/lib_eio/unix/eio_unix.mli +++ b/lib_eio/unix/eio_unix.mli @@ -51,8 +51,10 @@ val sleep : float -> unit It can also be used in programs that don't care about tracking determinism. *) val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a -(** [run_in_systhread fn] runs the function [fn] in a newly created system thread (a {! Thread.t}). - This allows blocking calls to be made non-blocking. +(** [run_in_systhread fn] runs the function [fn] using a pool of system threads ({! Thread.t}). + + This pool creates a new system thread if all threads are busy, it does not wait. + [run_in_systhread] allows blocking calls to be made non-blocking. @param label The operation name to use in trace output. *) @@ -98,6 +100,8 @@ module Private : sig module Fork_action = Fork_action + module Thread_pool = Thread_pool + val read_link : Fd.t option -> string -> string end diff --git a/lib_eio/unix/net.ml b/lib_eio/unix/net.ml index 05e3af229..d32dabe3a 100644 --- a/lib_eio/unix/net.ml +++ b/lib_eio/unix/net.ml @@ -51,7 +51,7 @@ let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) = | `Udp _ -> [Unix.NI_DGRAM] in let sockaddr = sockaddr_to_unix sockaddr in - Private.run_in_systhread ~label:"getnameinfo" (fun () -> + Thread_pool.run_in_systhread ~label:"getnameinfo" (fun () -> let Unix.{ni_hostname; ni_service} = Unix.getnameinfo sockaddr options in (ni_hostname, ni_service)) diff --git a/lib_eio/unix/private.ml b/lib_eio/unix/private.ml index e12107f7f..5964af7ba 100644 --- a/lib_eio/unix/private.ml +++ b/lib_eio/unix/private.ml @@ -16,10 +16,7 @@ let pipe sw = Effect.perform (Pipe sw) module Rcfd = Rcfd module Fork_action = Fork_action - -let run_in_systhread ?(label="systhread") fn = - Eio.Private.Suspend.enter label @@ fun _ctx enqueue -> - Thread_pool.run_on_systhread ~enqueue fn +module Thread_pool = Thread_pool external eio_readlinkat : Unix.file_descr -> string -> Cstruct.t -> int = "eio_unix_readlinkat" diff --git a/lib_eio/unix/thread_pool.ml b/lib_eio/unix/thread_pool.ml index 2dcd5d411..7fde3a85d 100644 --- a/lib_eio/unix/thread_pool.ml +++ b/lib_eio/unix/thread_pool.ml @@ -1,23 +1,18 @@ -(* This thread pool does not spawn threads in advance, - but up to [max_standby_systhreads_per_domain] threads are - kept alive to wait for more work to arrive. - This number was chosen somewhat arbitrarily but benchmarking - shows it to be a good compromise. *) -let max_standby_systhreads_per_domain = 20 +module Zzz = Eio_utils.Zzz type job = | New | Exit | Job : { - fn: unit -> 'a; - enqueue: ('a, exn) result -> unit; + fn : unit -> 'a; + enqueue : ('a, exn) result -> unit; } -> job (* Mailbox with blocking semaphore *) module Mailbox = struct type t = { - available: Semaphore.Binary.t; - mutable cell: job; + available : Semaphore.Binary.t; + mutable cell : job; } let create () = { available = Semaphore.Binary.make false; cell = New } @@ -33,69 +28,102 @@ module Mailbox = struct mbox.cell end -(* A lock-free Treiber stack of systhreads on stand-by. - A fresh thread is created if no thread is immediately available. - When the domain exits all thread on stand-by are shutdown. *) +module Free_pool = struct + type list = + | Empty + | Closed + | Free of Mailbox.t * list + + type t = list Atomic.t + + let rec close_list = function + | Free (x, xs) -> Mailbox.put x Exit; close_list xs + | Empty | Closed -> () + + let close t = + let items = Atomic.exchange t Closed in + close_list items + + let rec drop t = + match Atomic.get t with + | Closed | Empty -> () + | Free _ as items -> + if Atomic.compare_and_set t items Empty then close_list items + else drop t + + let rec put t mbox = + match Atomic.get t with + | Closed -> assert false + | (Empty | Free _) as current -> + let next = Free (mbox, current) in + if not (Atomic.compare_and_set t current next) then + put t mbox (* concurrent update, try again *) + + let make_thread t = + let mbox = Mailbox.create () in + let _thread : Thread.t = Thread.create (fun () -> + while true do + match Mailbox.take mbox with + | New -> assert false + | Exit -> raise Thread.Exit + | Job { fn; enqueue } -> + let result = try Ok (fn ()) with exn -> Error exn in + put t mbox; (* Ensure thread is in free-pool before enqueuing. *) + enqueue result + done + ) () + in + mbox + + let rec get_thread t = + match Atomic.get t with + | Closed -> invalid_arg "Thread pool closed!" + | Empty -> make_thread t + | Free (mbox, next) as current -> + if Atomic.compare_and_set t current next then mbox + else get_thread t (* concurrent update, try again *) +end + type t = { - threads: (Mailbox.t * int) list Atomic.t; - terminating: bool Atomic.t; + free : Free_pool.t; + sleep_q : Zzz.t; + mutable timeout : Zzz.Key.t option; } -let create () = { threads = Atomic.make []; terminating = Atomic.make false } - -let terminate { threads; terminating } = - Atomic.set terminating true; - List.iter (fun (mbox, _) -> Mailbox.put mbox Exit) (Atomic.get threads) - -let rec keep_thread_or_exit ({ threads; _ } as pool) mbox = - match Atomic.get threads with - | (_, count) :: _ when count >= max_standby_systhreads_per_domain -> - (* We've got enough threads on stand-by, so discard the current thread *) - raise Thread.Exit - | current -> - let count = match current with - | [] -> 0 - | (_, count) :: _ -> count +type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, exn) result * t) Effect.t + +let terminate t = + Free_pool.close t.free; + Option.iter (fun key -> Zzz.remove t.sleep_q key; t.timeout <- None) t.timeout + +let create ~sleep_q = + { free = Atomic.make Free_pool.Empty; sleep_q; timeout = None } + +let run t fn = + match fn () with + | x -> terminate t; x + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + terminate t; + Printexc.raise_with_backtrace ex bt + +let submit t ~ctx ~enqueue fn = + match Eio.Private.Fiber_context.get_error ctx with + | Some e -> enqueue (Error e) + | None -> + let mbox = Free_pool.get_thread t.free in + Mailbox.put mbox (Job { fn; enqueue }) + +let run_in_systhread ?(label="systhread") fn = + Eio.Private.Trace.suspend_fiber label; + let r, t = Effect.perform (Run_in_systhread fn) in + if t.timeout = None then ( + let time = + Mtime.add_span (Mtime_clock.now ()) Mtime.Span.(20 * ms) + |> Option.value ~default:Mtime.max_stamp in - if not (Atomic.compare_and_set threads current ((mbox, count + 1) :: current)) - then keep_thread_or_exit pool mbox (* concurrent update, try again *) - -let make_thread pool = - let mbox = Mailbox.create () in - let _t : Thread.t = Thread.create (fun () -> - while true do - match Mailbox.take mbox with - | New -> assert false - | Exit -> raise Thread.Exit - | Job { fn; enqueue } -> - enqueue (try Ok (fn ()) with exn -> Error exn); - (* We're not yielding inside of [keep_thread_or_exit] so - no need to check [terminating] multiple times *) - if Atomic.get pool.terminating then raise Thread.Exit; - keep_thread_or_exit pool mbox - done - ) () - in - mbox - -let rec get_thread ({ threads; _ } as pool) = - match Atomic.get threads with - | [] -> make_thread pool - | ((mbox, _count) :: rest) as current -> - if not (Atomic.compare_and_set threads current rest) - then get_thread pool (* concurrent update, try again *) - else mbox - -(* https://v2.ocaml.org/manual/parallelism.html#s:par_systhread_interaction - "Only one systhread at a time is allowed to run OCaml code on a particular domain." - So we keep a separate threadpool per domain. *) -let key = - Domain.DLS.new_key @@ fun () -> - let pool = create () in - Domain.at_exit (fun () -> terminate pool); - pool - -let run_on_systhread ~enqueue fn = - let pool = Domain.DLS.get key in - let mbox = get_thread pool in - Mailbox.put mbox (Job { fn; enqueue }) + t.timeout <- Some (Zzz.add t.sleep_q time (Fn (fun () -> Free_pool.drop t.free; t.timeout <- None))) + ); + match r with + | Ok x -> x + | Error ex -> raise ex diff --git a/lib_eio/unix/thread_pool.mli b/lib_eio/unix/thread_pool.mli index b735331c2..f0ef45deb 100644 --- a/lib_eio/unix/thread_pool.mli +++ b/lib_eio/unix/thread_pool.mli @@ -1 +1,25 @@ -val run_on_systhread : enqueue:(('a, exn) result -> unit) -> (unit -> 'a) -> unit +(** A pool of systhreads, to avoid the overhead of creating a new thread for each operation. *) + +type t + +val create : sleep_q:Eio_utils.Zzz.t -> t +(** [create ~sleep_q] is a new thread pool. + + [sleep_q] is used to register a clean-up task to finish idle threads. *) + +val run : t -> (unit -> 'a) -> 'a +(** [run t fn] runs [fn ()] and then marks [t] as closed, releasing all idle threads. *) + +val submit : + t -> + ctx:Eio.Private.Fiber_context.t -> + enqueue:(('a, exn) result -> unit) -> + (unit -> 'a) -> + unit +(** [submit t ~ctx ~enqueue fn] starts running [fn] in a sys-thread, which uses [enqueue] to return the result. + + If [ctx] is already cancelled then the error is passed to [enqueue] immediately. + Systhreads do not respond to cancellation once running. *) + +type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, exn) result * t) Effect.t +val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a diff --git a/lib_eio/utils/zzz.ml b/lib_eio/utils/zzz.ml index 614e5abce..a3dd23149 100644 --- a/lib_eio/utils/zzz.ml +++ b/lib_eio/utils/zzz.ml @@ -3,10 +3,14 @@ module Key = struct let compare = Optint.Int63.compare end +type item = + | Fiber of unit Suspended.t + | Fn of (unit -> unit) + module Job = struct type t = { time : Mtime.t; - thread : unit Suspended.t; + item : item; } let compare a b = Mtime.compare a.time b.time @@ -21,10 +25,10 @@ type t = { let create () = { sleep_queue = Q.empty; next_id = Optint.Int63.zero } -let add t time thread = +let add t time item = let id = t.next_id in t.next_id <- Optint.Int63.succ t.next_id; - let sleeper = { Job.time; thread } in + let sleeper = { Job.time; item } in t.sleep_queue <- Q.add id sleeper t.sleep_queue; id @@ -33,9 +37,13 @@ let remove t id = let pop t ~now = match Q.min t.sleep_queue with - | Some (_, { Job.time; thread }) when time <= now -> - Eio.Private.Fiber_context.clear_cancel_fn thread.fiber; + | Some (_, { Job.time; item }) when time <= now -> + begin + match item with + | Fiber k -> Eio.Private.Fiber_context.clear_cancel_fn k.fiber + | Fn _ -> () + end; t.sleep_queue <- Option.get (Q.rest t.sleep_queue); - `Due thread + `Due item | Some (_, { Job.time; _ }) -> `Wait_until time | None -> `Nothing diff --git a/lib_eio/utils/zzz.mli b/lib_eio/utils/zzz.mli index 4005085e6..4bb1edc9f 100644 --- a/lib_eio/utils/zzz.mli +++ b/lib_eio/utils/zzz.mli @@ -8,19 +8,24 @@ end type t (** A set of timers (implemented as a priority queue). *) +type item = + | Fiber of unit Suspended.t + | Fn of (unit -> unit) + val create : unit -> t (** [create ()] is a fresh empty queue. *) -val add : t -> Mtime.t -> unit Suspended.t -> Key.t -(** [add t time thread] adds a new event, due at [time], and returns its ID. - You must use {!Eio.Private.Fiber_context.set_cancel_fn} on [thread] before - calling {!pop}. - Your cancel function should call {!remove} (in addition to resuming [thread]). *) +val add : t -> Mtime.t -> item -> Key.t +(** [add t time item] adds a new event, due at [time], and returns its ID. + + If [item] is a {!Fiber}, + you must use {!Eio.Private.Fiber_context.set_cancel_fn} on it before calling {!pop}. + Your cancel function should call {!remove} (in addition to resuming it). *) val remove : t -> Key.t -> unit (** [remove t key] removes an event previously added with [add]. *) -val pop : t -> now:Mtime.t -> [`Due of unit Suspended.t | `Wait_until of Mtime.t | `Nothing] -(** [pop ~now t] removes and returns the earliest thread due by [now]. - It also clears the thread's cancel function. - If no thread is due yet, it returns the time the earliest thread becomes due. *) +val pop : t -> now:Mtime.t -> [`Due of item | `Wait_until of Mtime.t | `Nothing] +(** [pop ~now t] removes and returns the earliest item due by [now]. + For fibers, it also clears the thread's cancel function. + If no item is due yet, it returns the time the earliest item becomes due. *) diff --git a/lib_eio_linux/low_level.ml b/lib_eio_linux/low_level.ml index 172da8f01..ce3e3df40 100644 --- a/lib_eio_linux/low_level.ml +++ b/lib_eio_linux/low_level.ml @@ -113,7 +113,7 @@ let noop () = let sleep_until time = Sched.enter "sleep" @@ fun t k -> - let job = Eio_utils.Zzz.add t.sleep_q time k in + let job = Eio_utils.Zzz.add t.sleep_q time (Fiber k) in Eio.Private.Fiber_context.set_cancel_fn k.fiber (fun ex -> Eio_utils.Zzz.remove t.sleep_q job; Sched.enqueue_failed_thread t k ex diff --git a/lib_eio_linux/sched.ml b/lib_eio_linux/sched.ml index b9e39e5f1..bce5fd79e 100644 --- a/lib_eio_linux/sched.ml +++ b/lib_eio_linux/sched.ml @@ -67,6 +67,8 @@ type t = { need_wakeup : bool Atomic.t; sleep_q: Zzz.t; + + thread_pool : Eio_unix.Private.Thread_pool.t; } type _ Effect.t += @@ -213,8 +215,12 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = let now = Mtime_clock.now () in match Zzz.pop ~now sleep_q with | `Due k -> + (* A sleeping task is now due *) Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) - Suspended.continue k () (* A sleeping task is now due *) + begin match k with + | Fiber k -> Suspended.continue k () + | Fn fn -> fn (); schedule st + end | `Wait_until _ | `Nothing as next_due -> (* Handle any pending events before submitting. This is faster. *) match Uring.get_cqe_nonblocking uring with @@ -447,6 +453,12 @@ let run ~extra_effects st main arg = ); schedule st ) + | Eio_unix.Private.Thread_pool.Run_in_systhread fn -> Some (fun k -> + let k = { Suspended.k; fiber } in + let enqueue x = enqueue_thread st k (x, st.thread_pool) in + Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn; + schedule st + ) | Alloc -> Some (fun k -> match st.mem with | None -> continue k None @@ -475,7 +487,7 @@ let run ~extra_effects st main arg = fork ~new_fiber (fun () -> Switch.run_protected ~name:"eio_linux" (fun sw -> Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st); - match main arg with + match Eio_unix.Private.Thread_pool.run st.thread_pool (fun () -> main arg) with | x -> result := Some (Ok x) | exception ex -> let bt = Printexc.get_raw_backtrace () in @@ -548,7 +560,8 @@ let with_sched ?(fallback=no_fallback) config fn = let io_q = Queue.create () in let mem_q = Queue.create () in with_eventfd @@ fun eventfd -> - fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } + let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in + fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool } with | x -> Uring.exit uring; x | exception ex -> diff --git a/lib_eio_posix/sched.ml b/lib_eio_posix/sched.ml index 74c6e6c99..a0de826c7 100644 --- a/lib_eio_posix/sched.ml +++ b/lib_eio_posix/sched.ml @@ -59,6 +59,8 @@ type t = { need_wakeup : bool Atomic.t; sleep_q: Zzz.t; (* Fibers waiting for timers. *) + + thread_pool : Eio_unix.Private.Thread_pool.t; } (* The message to send to [eventfd] (any character would do). *) @@ -185,8 +187,12 @@ let rec next t : [`Exit_scheduler] = let now = Mtime_clock.now () in match Zzz.pop ~now t.sleep_q with | `Due k -> + (* A sleeping task is now due *) Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) - Suspended.continue k () (* A sleeping task is now due *) + begin match k with + | Fiber k -> Suspended.continue k () + | Fn fn -> fn (); next t + end | `Wait_until _ | `Nothing as next_due -> let timeout = match next_due with @@ -242,8 +248,9 @@ let with_sched fn = in let poll = Poll.create () in let fd_map = Hashtbl.create 10 in + let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in let t = { run_q; poll; poll_maxi = (-1); fd_map; eventfd; eventfd_r; - active_ops = 0; need_wakeup = Atomic.make false; sleep_q } in + active_ops = 0; need_wakeup = Atomic.make false; sleep_q; thread_pool } in let eventfd_ri = Iomux.Util.fd_of_unix eventfd_r in Poll.set_index t.poll eventfd_ri eventfd_r Poll.Flags.pollin; if eventfd_ri > t.poll_maxi then @@ -299,7 +306,7 @@ let await_timeout t (k : unit Suspended.t) time = match Fiber_context.get_error k.fiber with | Some e -> Suspended.discontinue k e | None -> - let node = Zzz.add t.sleep_q time k in + let node = Zzz.add t.sleep_q time (Fiber k) in Fiber_context.set_cancel_fn k.fiber (fun ex -> Zzz.remove t.sleep_q node; enqueue_failed_thread t k ex @@ -359,6 +366,12 @@ let run ~extra_effects t main x = | Eio_unix.Private.Await_writable fd -> Some (fun k -> await_writable t { Suspended.k; fiber } fd ) + | Eio_unix.Private.Thread_pool.Run_in_systhread fn -> Some (fun k -> + let k = { Suspended.k; fiber } in + let enqueue x = enqueue_thread t k (x, t.thread_pool) in + Eio_unix.Private.Thread_pool.submit t.thread_pool ~ctx:fiber ~enqueue fn; + next t + ) | e -> extra_effects.Effect.Deep.effc e } in @@ -368,10 +381,11 @@ let run ~extra_effects t main x = Domain_local_await.using ~prepare_for_await:Eio.Private.Dla.prepare_for_await ~while_running:(fun () -> - fork ~new_fiber (fun () -> - result := Some (with_op t main x); + fork ~new_fiber (fun () -> + Eio_unix.Private.Thread_pool.run t.thread_pool @@ fun () -> + result := Some (with_op t main x); + ) ) - ) in match !result with | Some x -> x diff --git a/lib_eio_windows/sched.ml b/lib_eio_windows/sched.ml index 365d7b8cf..46a0ba650 100755 --- a/lib_eio_windows/sched.ml +++ b/lib_eio_windows/sched.ml @@ -70,6 +70,8 @@ type t = { need_wakeup : bool Atomic.t; sleep_q: Zzz.t; (* Fibers waiting for timers. *) + + thread_pool : Eio_unix.Private.Thread_pool.t; } (* The message to send to [eventfd] (any character would do). *) @@ -124,17 +126,17 @@ let update t waiters fd = | false, true -> `W | true, true -> `RW in - match flags with - | `Empty -> ( + match flags with + | `Empty -> ( t.poll.to_read <- FdSet.remove fd t.poll.to_read; t.poll.to_write <- FdSet.remove fd t.poll.to_write; Hashtbl.remove t.fd_map fd ) - | `R -> t.poll.to_read <- FdSet.add fd t.poll.to_read - | `W -> t.poll.to_write <- FdSet.add fd t.poll.to_write - | `RW -> - t.poll.to_read <- FdSet.add fd t.poll.to_read; - t.poll.to_write <- FdSet.add fd t.poll.to_write + | `R -> t.poll.to_read <- FdSet.add fd t.poll.to_read + | `W -> t.poll.to_write <- FdSet.add fd t.poll.to_write + | `RW -> + t.poll.to_read <- FdSet.add fd t.poll.to_read; + t.poll.to_write <- FdSet.add fd t.poll.to_write let resume t node = t.active_ops <- t.active_ops - 1; @@ -178,8 +180,12 @@ let rec next t : [`Exit_scheduler] = let now = Mtime_clock.now () in match Zzz.pop ~now t.sleep_q with | `Due k -> + (* A sleeping task is now due *) Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) - Suspended.continue k () (* A sleeping task is now due *) + begin match k with + | Fiber k -> Suspended.continue k () + | Fn fn -> fn (); next t + end | `Wait_until _ | `Nothing as next_due -> let timeout = match next_due with @@ -243,8 +249,9 @@ let with_sched fn = in let poll = { to_read = FdSet.empty; to_write = FdSet.empty } in let fd_map = Hashtbl.create 10 in + let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in let t = { run_q; poll; fd_map; eventfd; eventfd_r; - active_ops = 0; need_wakeup = Atomic.make false; sleep_q } in + active_ops = 0; need_wakeup = Atomic.make false; sleep_q; thread_pool } in t.poll.to_read <- FdSet.add eventfd_r t.poll.to_read; match fn t with | x -> cleanup (); x @@ -293,7 +300,7 @@ let await_timeout t (k : unit Suspended.t) time = match Fiber_context.get_error k.fiber with | Some e -> Suspended.discontinue k e | None -> - let node = Zzz.add t.sleep_q time k in + let node = Zzz.add t.sleep_q time (Fiber k) in Fiber_context.set_cancel_fn k.fiber (fun ex -> Zzz.remove t.sleep_q node; enqueue_failed_thread t k ex @@ -350,6 +357,12 @@ let run ~extra_effects t main x = | Eio_unix.Private.Await_writable fd -> Some (fun k -> await_writable t { Suspended.k; fiber } fd ) + | Eio_unix.Private.Thread_pool.Run_in_systhread fn -> Some (fun k -> + let k = { Suspended.k; fiber } in + let enqueue x = enqueue_thread t k (x, t.thread_pool) in + Eio_unix.Private.Thread_pool.submit t.thread_pool ~ctx:fiber ~enqueue fn; + next t + ) | e -> extra_effects.Effect.Deep.effc e } in @@ -359,10 +372,11 @@ let run ~extra_effects t main x = Domain_local_await.using ~prepare_for_await:Eio.Private.Dla.prepare_for_await ~while_running:(fun () -> - fork ~new_fiber (fun () -> - result := Some (with_op t main x); + fork ~new_fiber (fun () -> + Eio_unix.Private.Thread_pool.run t.thread_pool @@ fun () -> + result := Some (with_op t main x); + ) ) - ) in match !result with | Some x -> x