From 288c1af94b095f6b24eec2f3508f8780cf270366 Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Tue, 28 Feb 2023 17:49:56 -0600 Subject: [PATCH] feature: add a thread pool replace the current implementation with an unbounded number of workers Signed-off-by: Rudi Grinberg --- bench/micro/dune | 14 ++ bench/micro/runner.sh | 1 + bench/micro/thread_pool_bench.ml | 30 ++++ bench/micro/thread_pool_bench_main.ml | 1 + boot/libs.ml | 4 +- src/csexp_rpc/csexp_rpc.ml | 129 ++++++------------ src/csexp_rpc/csexp_rpc.mli | 28 +--- src/dune_engine/dune | 3 +- src/dune_engine/scheduler.ml | 66 ++++----- src/dune_engine/scheduler.mli | 20 +-- src/dune_rpc_client/client.ml | 2 +- src/{thread_worker => thread_pool}/dune | 2 +- src/thread_pool/thread_pool.ml | 73 ++++++++++ src/thread_pool/thread_pool.mli | 14 ++ src/thread_worker/thread_worker.ml | 78 ----------- src/thread_worker/thread_worker.mli | 11 -- .../expect-tests/csexp_rpc/csexp_rpc_tests.ml | 2 +- .../expect-tests/dune_rpc_e2e/dune_rpc_e2e.ml | 7 +- 18 files changed, 216 insertions(+), 269 deletions(-) create mode 100644 bench/micro/thread_pool_bench.ml create mode 100644 bench/micro/thread_pool_bench_main.ml rename src/{thread_worker => thread_pool}/dune (77%) create mode 100644 src/thread_pool/thread_pool.ml create mode 100644 src/thread_pool/thread_pool.mli delete mode 100644 src/thread_worker/thread_worker.ml delete mode 100644 src/thread_worker/thread_worker.mli diff --git a/bench/micro/dune b/bench/micro/dune index 345f896dfaa5..349186032f09 100644 --- a/bench/micro/dune +++ b/bench/micro/dune @@ -31,3 +31,17 @@ (allow_overlapping_dependencies) (modules memo_bench_main) (libraries memo_bench core_bench.inline_benchmarks)) + +(library + (name thread_pool_bench) + (modules thread_pool_bench) + (library_flags -linkall) + (preprocess + (pps ppx_bench)) + (libraries dune_thread_pool threads.posix core_bench.inline_benchmarks)) + +(executable + (name thread_pool_bench_main) + (allow_overlapping_dependencies) + (modules thread_pool_bench_main) + (libraries thread_pool_bench core_bench.inline_benchmarks)) diff --git a/bench/micro/runner.sh b/bench/micro/runner.sh index 47a99cc58e89..9e3e78a31303 100755 --- a/bench/micro/runner.sh +++ b/bench/micro/runner.sh @@ -4,6 +4,7 @@ case "$1" in "dune" ) test="dune_bench"; main="main";; "fiber" ) test="fiber_bench"; main="fiber_bench_main";; "memo" ) test="memo_bench"; main="memo_bench_main";; + "thread_pool" ) test="thread_pool"; main="thread_pool_bench_main";; esac shift; export BENCH_LIB="$test" diff --git a/bench/micro/thread_pool_bench.ml b/bench/micro/thread_pool_bench.ml new file mode 100644 index 000000000000..2da94dd1141e --- /dev/null +++ b/bench/micro/thread_pool_bench.ml @@ -0,0 +1,30 @@ +open Dune_thread_pool + +let spawn_thread f = ignore (Thread.create f ()) + +let%bench "almost no-op" = + let tp = Thread_pool.create ~min_workers:10 ~max_workers:50 ~spawn_thread in + let tasks = 50_000 in + let counter = Atomic.make tasks in + let f () = Atomic.decr counter in + for _ = 0 to tasks - 1 do + Thread_pool.task tp ~f + done; + while Atomic.get counter > 0 do + Thread.yield () + done + +let%bench "syscall" = + let tp = Thread_pool.create ~min_workers:10 ~max_workers:50 ~spawn_thread in + let tasks = 50_000 in + let counter = Atomic.make tasks in + let f () = + Unix.sleepf 0.0; + Atomic.decr counter + in + for _ = 0 to tasks - 1 do + Thread_pool.task tp ~f + done; + while Atomic.get counter > 0 do + Thread.yield () + done diff --git a/bench/micro/thread_pool_bench_main.ml b/bench/micro/thread_pool_bench_main.ml new file mode 100644 index 000000000000..4c38b20b9e1e --- /dev/null +++ b/bench/micro/thread_pool_bench_main.ml @@ -0,0 +1 @@ +Inline_benchmarks_public.Runner.main ~libname:"thread_pool_bench" diff --git a/boot/libs.ml b/boot/libs.ml index 249aac4d3864..ea448f422b0b 100644 --- a/boot/libs.ml +++ b/boot/libs.ml @@ -42,7 +42,8 @@ let local_libraries = ; ("src/meta_parser", Some "Dune_meta_parser", false, None) ; ("src/csexp_rpc", Some "Csexp_rpc", false, None) ; ("src/dune_rpc_server", Some "Dune_rpc_server", false, None) - ; ("src/thread_worker", Some "Thread_worker", false, None) + ; ("src/dune_rpc_client", Some "Dune_rpc_client", false, None) + ; ("src/thread_pool", Some "Dune_thread_pool", false, None) ; ("otherlibs/ocamlc_loc/src", Some "Ocamlc_loc", false, None) ; ("src/fsevents", Some "Fsevents", false, None) ; ("vendor/ocaml-inotify/src", Some "Ocaml_inotify", false, None) @@ -62,7 +63,6 @@ let local_libraries = ; ("vendor/cmdliner/src", None, false, None) ; ("otherlibs/build-info/src", Some "Build_info", false, Some "Build_info_data") - ; ("src/dune_rpc_client", Some "Dune_rpc_client", false, None) ; ("src/dune_rpc_impl", Some "Dune_rpc_impl", false, None) ] diff --git a/src/csexp_rpc/csexp_rpc.ml b/src/csexp_rpc/csexp_rpc.ml index 3ccbb7700748..1621f4a53c3a 100644 --- a/src/csexp_rpc/csexp_rpc.ml +++ b/src/csexp_rpc/csexp_rpc.ml @@ -2,47 +2,21 @@ open Stdune open Fiber.O module Log = Dune_util.Log -module type Worker = sig - type t - - val create : unit -> t Fiber.t - - val stop : t -> unit - - val task : - t - -> f:(unit -> 'a) - -> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t +module type Scheduler = sig + val async : (unit -> 'a) -> ('a, Exn_with_backtrace.t) result Fiber.t end -let worker = Fdecl.create Dyn.opaque - -module Worker = struct - type t = - { stop : unit -> unit - ; task : - 'a. - (unit -> 'a) - -> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t - } +let scheduler = Fdecl.create Dyn.opaque - let create () = - let open Fiber.O in - let (module Worker : Worker) = Fdecl.get worker in - let+ w = Worker.create () in - { stop = (fun () -> Worker.stop w); task = (fun f -> Worker.task w ~f) } +let async f = + let module Scheduler = (val Fdecl.get scheduler : Scheduler) in + Scheduler.async f - let stop t = t.stop () - - let task t ~f = t.task f - - let task_exn t ~f = - let+ res = task t ~f in - match res with - | Error `Stopped -> assert false - | Error (`Exn e) -> Exn_with_backtrace.reraise e - | Ok s -> s -end +let async_exn f = + let+ res = async f in + match res with + | Ok s -> s + | Error e -> Exn_with_backtrace.reraise e module Session_id = Id.Make () @@ -127,8 +101,6 @@ module Session = struct { out_channel : out_channel ; in_channel : in_channel ; socket : bool - ; writer : Worker.t - ; reader : Worker.t } type t = @@ -140,9 +112,7 @@ module Session = struct let id = Id.gen () in if debug then Log.info [ Pp.textf "RPC created new session %d" (Id.to_int id) ]; - let* reader = Worker.create () in - let+ writer = Worker.create () in - let state = Open { in_channel; out_channel; reader; writer; socket } in + let state = Open { in_channel; out_channel; socket } in { id; state } let string_of_packet = function @@ -156,9 +126,7 @@ module Session = struct let close t = match t.state with | Closed -> () - | Open { in_channel; out_channel; reader; writer; socket } -> - Worker.stop reader; - Worker.stop writer; + | Open { in_channel; out_channel; socket } -> (* with a socket, there's only one fd. We make sure to close it only once. with dune rpc init, we have two separate fd's (stdin/stdout) so we must close both. *) @@ -179,7 +147,7 @@ module Session = struct | Closed -> debug None; Fiber.return None - | Open { reader; in_channel; _ } -> + | Open { in_channel; _ } -> let rec read () = match Csexp.input_opt in_channel with | exception Unix.Unix_error (_, _, _) -> None @@ -189,13 +157,12 @@ module Session = struct | Ok (Some csexp) -> Some csexp | Error _ -> None in - let+ res = Worker.task reader ~f:read in + let+ res = async read in let res = match res with - | Error (`Exn _) -> + | Error _ -> close t; None - | Error `Stopped -> None | Ok None -> close t; None @@ -219,7 +186,7 @@ module Session = struct | Some sexps -> Code_error.raise "attempting to write to a closed channel" [ ("sexp", Dyn.(list Sexp.to_dyn) sexps) ]) - | Open { writer; out_channel; socket; _ } -> ( + | Open { out_channel; socket; _ } -> ( match sexps with | None -> (if socket then @@ -233,14 +200,13 @@ module Session = struct Fiber.return () | Some sexps -> ( let+ res = - Worker.task writer ~f:(fun () -> + async (fun () -> List.iter sexps ~f:(Csexp.to_channel out_channel); flush out_channel) in match res with | Ok () -> () - | Error `Stopped -> assert false - | Error (`Exn e) -> + | Error e -> close t; Exn_with_backtrace.reraise e)) end @@ -313,19 +279,17 @@ module Server = struct let ready t = Fiber.Ivar.read t.ready let serve (t : t) = - let* async = Worker.create () in match t.state with | `Closed -> Code_error.raise "already closed" [] | `Running _ -> Code_error.raise "already running" [] | `Init fd -> let* transport = - Worker.task_exn async ~f:(fun () -> - Transport.create fd t.sockaddr ~backlog:t.backlog) + async_exn (fun () -> Transport.create fd t.sockaddr ~backlog:t.backlog) in t.state <- `Running transport; let+ () = Fiber.Ivar.fill t.ready () in let accept () = - Worker.task async ~f:(fun () -> + async (fun () -> Transport.accept transport |> Option.map ~f:(fun client -> let in_ = Unix.in_channel_of_descr client in @@ -333,26 +297,23 @@ module Server = struct (in_, out))) in let loop () = - let* accept = accept () in + let+ accept = accept () in match accept with - | Error `Stopped -> - Log.info [ Pp.text "RPC stopped accepting." ]; - Fiber.return None - | Error (`Exn exn) -> + | Error exn -> Log.info [ Pp.text "RPC accept failed. Server will not accept new clients" ; Exn_with_backtrace.pp exn ]; - Fiber.return None + None | Ok None -> Log.info [ Pp.text "RPC accepted the last client. No more clients will be \ accepted." ]; - Fiber.return None + None | Ok (Some (in_, out)) -> - let+ session = Session.create ~socket:true in_ out in + let session = Session.create ~socket:true in_ out in Some session in Fiber.Stream.In.create loop @@ -396,36 +357,24 @@ module Client = struct type t = { mutable transport : Transport.t option - ; mutable async : Worker.t option ; sockaddr : Unix.sockaddr } - let create sockaddr = - let+ async = Worker.create () in - { sockaddr; async = Some async; transport = None } + let create sockaddr = { sockaddr; transport = None } let connect t = - match t.async with - | None -> - Code_error.raise "connection already established with the client" [] - | Some async -> ( - t.async <- None; - let* task = - Worker.task async ~f:(fun () -> - let transport = Transport.create t.sockaddr in - t.transport <- Some transport; - let client = Transport.connect transport in - let out = Unix.out_channel_of_descr client in - let in_ = Unix.in_channel_of_descr client in - (in_, out)) - in - Worker.stop async; - match task with - | Error `Stopped -> assert false - | Error (`Exn exn) -> Fiber.return (Error exn) - | Ok (in_, out) -> - let+ res = Session.create ~socket:true in_ out in - Ok res) + let+ task = + async (fun () -> + let transport = Transport.create t.sockaddr in + t.transport <- Some transport; + let client = Transport.connect transport in + let out = Unix.out_channel_of_descr client in + let in_ = Unix.in_channel_of_descr client in + (in_, out)) + in + match task with + | Error exn -> Error exn + | Ok (in_, out) -> Ok (Session.create ~socket:true in_ out) let connect_exn t = let+ res = connect t in diff --git a/src/csexp_rpc/csexp_rpc.mli b/src/csexp_rpc/csexp_rpc.mli index c12d5f3961eb..aec1d2dde9ff 100644 --- a/src/csexp_rpc/csexp_rpc.mli +++ b/src/csexp_rpc/csexp_rpc.mli @@ -15,37 +15,21 @@ open Stdune -module type Worker = sig - (** [Csexp_rpc] relies on background threads for all blocking operations. +module type Scheduler = sig + (** [async f] enqueue task [f] *) - This is the signature for all the operation it needs for such background - threads. *) - - (** A worker doing tasks in a background thread *) - type t - - (** Create a worker doing tasks in a new background thread *) - val create : unit -> t Fiber.t - - (** Stop the worker. Tasks that aren't yet started will not be completed. *) - val stop : t -> unit - - (** [task t ~f] enqueue task [f] for worker [t] *) - val task : - t - -> f:(unit -> 'a) - -> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t + val async : (unit -> 'a) -> ('a, Exn_with_backtrace.t) result Fiber.t end (** Hack until we move [Dune_engine.Scheduler] into own library *) -val worker : (module Worker) Fdecl.t +val scheduler : (module Scheduler) Fdecl.t module Session : sig (** Rpc session backed by two threads. One thread for reading, and another for writing *) type t - val create : socket:bool -> in_channel -> out_channel -> t Fiber.t + val create : socket:bool -> in_channel -> out_channel -> t (* [write t x] writes the s-expression when [x] is [Some sexp], and closes the session if [x = None ] *) @@ -60,7 +44,7 @@ module Client : sig (** RPC Client *) type t - val create : Unix.sockaddr -> t Fiber.t + val create : Unix.sockaddr -> t val stop : t -> unit diff --git a/src/dune_engine/dune b/src/dune_engine/dune index a80a64ac1617..5714f74e7aa6 100644 --- a/src/dune_engine/dune +++ b/src/dune_engine/dune @@ -29,7 +29,8 @@ csexp_rpc dune_rpc_private dune_rpc_server - thread_worker + dune_rpc_client + dune_thread_pool spawn ocamlc_loc dune_file_watcher diff --git a/src/dune_engine/scheduler.ml b/src/dune_engine/scheduler.ml index bf180a5c08f1..68362a139c77 100644 --- a/src/dune_engine/scheduler.ml +++ b/src/dune_engine/scheduler.ml @@ -1,5 +1,6 @@ open Import open Fiber.O +open Dune_thread_pool module Config = struct include Config @@ -796,6 +797,7 @@ type t = ; fs_syncs : unit Fiber.Ivar.t Dune_file_watcher.Sync_id.Table.t ; wait_for_build_input_change : unit Fiber.Ivar.t option ref ; cancel : Fiber.Cancel.t + ; thread_pool : Thread_pool.t } let t : t Fiber.Var.t = Fiber.Var.create () @@ -920,6 +922,8 @@ let prepare (config : Config.t) ~(handler : Handler.t) = be an [option]. We use a dummy cancellation rather than an option to keep the code simpler. *) Fiber.Cancel.create () + ; thread_pool = + Thread_pool.create ~spawn_thread ~min_workers:4 ~max_workers:50 } ) module Run_once : sig @@ -1048,47 +1052,27 @@ end = struct | Ok -> res end -module Worker = struct - type t = - { worker : Thread_worker.t - ; events : Event.Queue.t - } - - let stop t = Thread_worker.stop t.worker - - let create () = - let+ scheduler = t () in - let worker = - let signal_watcher = scheduler.config.signal_watcher in - Thread_worker.create ~spawn_thread:(Thread.spawn ~signal_watcher) - in - { worker; events = scheduler.events } - - let task (t : t) ~f = - let ivar = Fiber.Ivar.create () in - let f () = - let res = Exn_with_backtrace.try_with f in - Event.Queue.send_worker_task_completed t.events (Fiber.Fill (ivar, res)) - in - match Thread_worker.add_work t.worker ~f with - | Error `Stopped -> Fiber.return (Error `Stopped) - | Ok () -> ( - Event.Queue.register_worker_task_started t.events; - let+ res = Fiber.Ivar.read ivar in - match res with - | Error exn -> Error (`Exn exn) - | Ok e -> Ok e) - - let task_exn t ~f = - let+ res = task t ~f in - match res with - | Ok a -> a - | Error `Stopped -> - Code_error.raise "Scheduler.Worker.task_exn: worker stopped" [] - | Error (`Exn e) -> Exn_with_backtrace.reraise e -end - -let () = Fdecl.set Csexp_rpc.worker (module Worker) +let async f = + let* t = t () in + let ivar = Fiber.Ivar.create () in + let f () = + let res = Exn_with_backtrace.try_with f in + Event.Queue.send_worker_task_completed t.events (Fiber.Fill (ivar, res)) + in + Thread_pool.task t.thread_pool ~f; + Event.Queue.register_worker_task_started t.events; + Fiber.Ivar.read ivar + +let () = + Fdecl.set Csexp_rpc.scheduler + (module struct + let async f = async f + end) + +let async_exn f = + async f >>| function + | Error exn -> Exn_with_backtrace.reraise exn + | Ok e -> e let flush_file_watcher t = match t.file_watcher with diff --git a/src/dune_engine/scheduler.mli b/src/dune_engine/scheduler.mli index eab813de55dc..c27a2c1de64b 100644 --- a/src/dune_engine/scheduler.mli +++ b/src/dune_engine/scheduler.mli @@ -74,23 +74,11 @@ module Run : sig -> 'a end -module Worker : sig - (** A worker is a thread that runs submitted tasks *) - type t +(** [async f] run [f] inside a background thread pool *) +val async : (unit -> 'a) -> ('a, Exn_with_backtrace.t) result Fiber.t - val create : unit -> t Fiber.t - - val task : - t - -> f:(unit -> 'a) - -> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t - - (** Should be used for tasks never raise and always complete before stop is - called *) - val task_exn : t -> f:(unit -> 'a) -> 'a Fiber.t - - val stop : t -> unit -end +(** [async_exn f] run [f] inside a background thread pool *) +val async_exn : (unit -> 'a) -> 'a Fiber.t type t diff --git a/src/dune_rpc_client/client.ml b/src/dune_rpc_client/client.ml index e2c163715f07..57c96d9fa9ab 100644 --- a/src/dune_rpc_client/client.ml +++ b/src/dune_rpc_client/client.ml @@ -9,7 +9,7 @@ module Connection = struct let connect where = let sock = Where.to_socket where in - let* client = Csexp_rpc.Client.create sock in + let client = Csexp_rpc.Client.create sock in let+ res = Csexp_rpc.Client.connect client in match res with | Ok s -> Ok s diff --git a/src/thread_worker/dune b/src/thread_pool/dune similarity index 77% rename from src/thread_worker/dune rename to src/thread_pool/dune index 1895fd928e94..5e2e6137b0df 100644 --- a/src/thread_worker/dune +++ b/src/thread_pool/dune @@ -1,5 +1,5 @@ (library - (name thread_worker) + (name dune_thread_pool) (libraries stdune threads.posix) (instrumentation (backend bisect_ppx))) diff --git a/src/thread_pool/thread_pool.ml b/src/thread_pool/thread_pool.ml new file mode 100644 index 000000000000..a30317dbfdc4 --- /dev/null +++ b/src/thread_pool/thread_pool.ml @@ -0,0 +1,73 @@ +open Stdune + +type t = + { mutex : Mutex.t + ; cv : Condition.t + ; spawn_thread : (unit -> unit) -> unit + ; tasks : (unit -> unit) Queue.t + ; min_workers : int + ; max_workers : int + ; (* number of threads waiting for a task *) + mutable idle : int + ; (* total number of running threads *) + mutable running : int + ; mutable dead : Thread.t list + } + +let spawn_worker t = + let rec loop () = + while Queue.is_empty t.tasks do + (* TODO [pthread_cond_timedwait] to set a maximum time for idling *) + Condition.wait t.cv t.mutex + done; + let task = Queue.pop_exn t.tasks in + t.idle <- t.idle - 1; + Mutex.unlock t.mutex; + (match task () with + | exception _ -> assert false + | () -> ()); + Mutex.lock t.mutex; + maybe_retry () + and start () = + Mutex.lock t.mutex; + t.idle <- t.idle + 1; + loop () + and maybe_retry () = + if t.running <= t.min_workers then loop () + else ( + t.running <- t.running - 1; + t.dead <- Thread.self () :: t.dead; + Mutex.unlock t.mutex) + in + t.running <- t.running + 1; + t.spawn_thread start + +let maybe_spawn_worker t = + if t.idle = 0 && t.running < t.max_workers then spawn_worker t + +let create ~min_workers ~max_workers ~spawn_thread = + let t = + { min_workers + ; max_workers + ; spawn_thread + ; cv = Condition.create () + ; mutex = Mutex.create () + ; tasks = Queue.create () + ; idle = 0 + ; running = 0 + ; dead = [] + } + in + for _ = 0 to min_workers - 1 do + spawn_worker t + done; + t + +let task t ~f = + Mutex.lock t.mutex; + List.iter t.dead ~f:Thread.join; + t.dead <- []; + Queue.push t.tasks f; + maybe_spawn_worker t; + Condition.signal t.cv; + Mutex.unlock t.mutex diff --git a/src/thread_pool/thread_pool.mli b/src/thread_pool/thread_pool.mli new file mode 100644 index 000000000000..82c209012b98 --- /dev/null +++ b/src/thread_pool/thread_pool.mli @@ -0,0 +1,14 @@ +(** Simple thread pool *) + +(** A thread pool *) +type t + +val create : + min_workers:int (** minimum number of threads to spawn *) + -> max_workers:int (** maximum number of threads to spawn *) + -> spawn_thread:((unit -> unit) -> unit) + (** [spawn_thread f] launches [f] in a thread *) + -> t + +(** [task t ~f] runs [f] inside the pool [f] *) +val task : t -> f:(unit -> unit) -> unit diff --git a/src/thread_worker/thread_worker.ml b/src/thread_worker/thread_worker.ml deleted file mode 100644 index 41867dab82ba..000000000000 --- a/src/thread_worker/thread_worker.ml +++ /dev/null @@ -1,78 +0,0 @@ -open Stdune - -let with_mutex t ~f = - Mutex.lock t; - let res = f () in - Mutex.unlock t; - res - -type state = - | Running - | Stopped - | Finished - -type t = - { work : (unit -> unit) Queue.t - ; mutable state : state - ; mutex : Mutex.t - ; work_available : Condition.t - } - -let is_running t = - match t.state with - | Running -> true - | Stopped | Finished -> false - -let run t = - let rec loop () = - match t.state with - | Stopped -> ( - match Queue.pop t.work with - | None -> t.state <- Finished - | Some job -> do_work job) - | Finished -> () - | Running -> ( - match Queue.pop t.work with - | Some job -> do_work job - | None -> - while Queue.is_empty t.work && is_running t do - Condition.wait t.work_available t.mutex - done; - loop ()) - and do_work job = - Mutex.unlock t.mutex; - job (); - Mutex.lock t.mutex; - loop () - in - Mutex.lock t.mutex; - loop (); - Mutex.unlock t.mutex - -let create ~spawn_thread = - let t = - { work = Queue.create () - ; state = Finished - ; mutex = Mutex.create () - ; work_available = Condition.create () - } - in - t.state <- Running; - spawn_thread (fun () -> run t); - t - -let add_work t ~f = - with_mutex t.mutex ~f:(fun () -> - if is_running t then ( - Queue.push t.work f; - Condition.signal t.work_available; - Ok ()) - else Error `Stopped) - -let stop t = - with_mutex t.mutex ~f:(fun () -> - match t.state with - | Running -> - t.state <- Stopped; - Condition.signal t.work_available - | Stopped | Finished -> ()) diff --git a/src/thread_worker/thread_worker.mli b/src/thread_worker/thread_worker.mli deleted file mode 100644 index 8a3c67ebf4a4..000000000000 --- a/src/thread_worker/thread_worker.mli +++ /dev/null @@ -1,11 +0,0 @@ -(** A system thread used to execute blocking functions asynchronously *) - -type t - -(** Spawn a new worker thread *) -val create : spawn_thread:((unit -> unit) -> unit) -> t - -(** [add_work t ~f] add the work of running [f] to this worker *) -val add_work : t -> f:(unit -> unit) -> (unit, [ `Stopped ]) result - -val stop : t -> unit diff --git a/test/expect-tests/csexp_rpc/csexp_rpc_tests.ml b/test/expect-tests/csexp_rpc/csexp_rpc_tests.ml index ce7046b30363..7e69c1977a3b 100644 --- a/test/expect-tests/csexp_rpc/csexp_rpc_tests.ml +++ b/test/expect-tests/csexp_rpc/csexp_rpc_tests.ml @@ -49,7 +49,7 @@ let%expect_test "csexp server life cycle" = let run () = let server = server addr in let* sessions = Server.serve server in - let* client = client (Csexp_rpc.Server.listening_address server) in + let client = client (Csexp_rpc.Server.listening_address server) in Fiber.fork_and_join_unit (fun () -> let log fmt = Logger.log client_log fmt in diff --git a/test/expect-tests/dune_rpc_e2e/dune_rpc_e2e.ml b/test/expect-tests/dune_rpc_e2e/dune_rpc_e2e.ml index 61a4a37c51c7..2b4517a4c9a2 100644 --- a/test/expect-tests/dune_rpc_e2e/dune_rpc_e2e.ml +++ b/test/expect-tests/dune_rpc_e2e/dune_rpc_e2e.ml @@ -70,14 +70,12 @@ let run_client ?handler f = notification_exn client Dune_rpc.Public.Notification.shutdown ())) let read_lines in_ = - let* reader = Scheduler.Worker.create () in let in_ = Unix.in_channel_of_descr in_ in let rec loop acc = - let* res = Scheduler.Worker.task reader ~f:(fun () -> input_line in_) in + let* res = Scheduler.async (fun () -> input_line in_) in match res with | Ok a -> loop (a :: acc) - | Error `Stopped -> assert false - | Error (`Exn e) -> + | Error e -> (match e.exn with | End_of_file -> () | _ -> @@ -86,7 +84,6 @@ let read_lines in_ = Fiber.return (String.concat (List.rev acc) ~sep:"\n") in let+ res = loop [] in - Scheduler.Worker.stop reader; close_in_noerr in_; res