Skip to content

Commit

Permalink
feature: add a thread pool (#7201)
Browse files Browse the repository at this point in the history
replace the current implementation with an unbounded number of workers

Signed-off-by: Rudi Grinberg <me@rgrinberg.com>
  • Loading branch information
rgrinberg authored Mar 17, 2023
1 parent dfd5d29 commit 8bea7ea
Show file tree
Hide file tree
Showing 18 changed files with 218 additions and 270 deletions.
14 changes: 14 additions & 0 deletions bench/micro/dune
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,17 @@
(allow_overlapping_dependencies)
(modules memo_bench_main)
(libraries memo_bench core_bench.inline_benchmarks))

(library
(name thread_pool_bench)
(modules thread_pool_bench)
(library_flags -linkall)
(preprocess
(pps ppx_bench))
(libraries dune_thread_pool threads.posix core_bench.inline_benchmarks))

(executable
(name thread_pool_bench_main)
(allow_overlapping_dependencies)
(modules thread_pool_bench_main)
(libraries thread_pool_bench core_bench.inline_benchmarks))
1 change: 1 addition & 0 deletions bench/micro/runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ case "$1" in
"dune" ) test="dune_bench"; main="main";;
"fiber" ) test="fiber_bench"; main="fiber_bench_main";;
"memo" ) test="memo_bench"; main="memo_bench_main";;
"thread_pool" ) test="thread_pool"; main="thread_pool_bench_main";;
esac
shift;
export BENCH_LIB="$test"
Expand Down
30 changes: 30 additions & 0 deletions bench/micro/thread_pool_bench.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
open Dune_thread_pool

let spawn_thread f = ignore (Thread.create f ())

let%bench "almost no-op" =
let tp = Thread_pool.create ~min_workers:10 ~max_workers:50 ~spawn_thread in
let tasks = 50_000 in
let counter = Atomic.make tasks in
let f () = Atomic.decr counter in
for _ = 0 to tasks - 1 do
Thread_pool.task tp ~f
done;
while Atomic.get counter > 0 do
Thread.yield ()
done

let%bench "syscall" =
let tp = Thread_pool.create ~min_workers:10 ~max_workers:50 ~spawn_thread in
let tasks = 50_000 in
let counter = Atomic.make tasks in
let f () =
Unix.sleepf 0.0;
Atomic.decr counter
in
for _ = 0 to tasks - 1 do
Thread_pool.task tp ~f
done;
while Atomic.get counter > 0 do
Thread.yield ()
done
1 change: 1 addition & 0 deletions bench/micro/thread_pool_bench_main.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Inline_benchmarks_public.Runner.main ~libname:"thread_pool_bench"
4 changes: 2 additions & 2 deletions boot/libs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ let local_libraries =
; ("src/meta_parser", Some "Dune_meta_parser", false, None)
; ("src/csexp_rpc", Some "Csexp_rpc", false, None)
; ("src/dune_rpc_server", Some "Dune_rpc_server", false, None)
; ("src/thread_worker", Some "Thread_worker", false, None)
; ("src/dune_rpc_client", Some "Dune_rpc_client", false, None)
; ("src/thread_pool", Some "Dune_thread_pool", false, None)
; ("otherlibs/ocamlc_loc/src", Some "Ocamlc_loc", false, None)
; ("src/fsevents", Some "Fsevents", false, None)
; ("vendor/ocaml-inotify/src", Some "Ocaml_inotify", false, None)
Expand All @@ -62,7 +63,6 @@ let local_libraries =
; ("vendor/cmdliner/src", None, false, None)
; ("otherlibs/build-info/src", Some "Build_info", false,
Some "Build_info_data")
; ("src/dune_rpc_client", Some "Dune_rpc_client", false, None)
; ("src/dune_rpc_impl", Some "Dune_rpc_impl", false, None)
]

Expand Down
132 changes: 40 additions & 92 deletions src/csexp_rpc/csexp_rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,21 @@ open Stdune
open Fiber.O
module Log = Dune_util.Log

module type Worker = sig
type t

val create : unit -> t Fiber.t

val stop : t -> unit

val task :
t
-> f:(unit -> 'a)
-> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t
module type Scheduler = sig
val async : (unit -> 'a) -> ('a, Exn_with_backtrace.t) result Fiber.t
end

let worker = Fdecl.create Dyn.opaque

module Worker = struct
type t =
{ stop : unit -> unit
; task :
'a.
(unit -> 'a)
-> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t
}
let scheduler = Fdecl.create Dyn.opaque

let create () =
let open Fiber.O in
let (module Worker : Worker) = Fdecl.get worker in
let+ w = Worker.create () in
{ stop = (fun () -> Worker.stop w); task = (fun f -> Worker.task w ~f) }
let async f =
let module Scheduler = (val Fdecl.get scheduler : Scheduler) in
Scheduler.async f

let stop t = t.stop ()

let task t ~f = t.task f

let task_exn t ~f =
let+ res = task t ~f in
match res with
| Error `Stopped -> assert false
| Error (`Exn e) -> Exn_with_backtrace.reraise e
| Ok s -> s
end
let async_exn f =
let+ res = async f in
match res with
| Ok s -> s
| Error e -> Exn_with_backtrace.reraise e

module Session_id = Id.Make ()

Expand Down Expand Up @@ -136,8 +110,6 @@ module Session = struct
event based IO, we won't need this mutex anymore *)
write_mutex : Mutex.t
; in_channel : in_channel
; writer : Worker.t
; reader : Worker.t
}

type t =
Expand All @@ -149,16 +121,12 @@ module Session = struct
let id = Id.gen () in
if debug then
Log.info [ Pp.textf "RPC created new session %d" (Id.to_int id) ];
let* reader = Worker.create () in
let+ writer = Worker.create () in
let state =
Open
{ fd
; in_channel
; out_buf = Io_buffer.create ~size:8192
; write_mutex = Mutex.create ()
; writer
; reader
}
in
{ id; state }
Expand All @@ -174,10 +142,10 @@ module Session = struct
let close t =
match t.state with
| Closed -> ()
| Open { write_mutex = _; fd = _; in_channel; out_buf = _; reader; writer }
->
Worker.stop reader;
Worker.stop writer;
| Open { write_mutex = _; fd = _; in_channel; out_buf = _ } ->
(* with a socket, there's only one fd. We make sure to close it only once.
with dune rpc init, we have two separate fd's (stdin/stdout) so we must
close both. *)
close_in_noerr in_channel;
t.state <- Closed

Expand All @@ -194,7 +162,7 @@ module Session = struct
| Closed ->
debug None;
Fiber.return None
| Open { reader; in_channel; _ } ->
| Open { in_channel; _ } ->
let rec read () =
match Csexp.input_opt in_channel with
| exception Unix.Unix_error (_, _, _) -> None
Expand All @@ -204,13 +172,12 @@ module Session = struct
| Ok (Some csexp) -> Some csexp
| Error _ -> None
in
let+ res = Worker.task reader ~f:read in
let+ res = async read in
let res =
match res with
| Error (`Exn _) ->
| Error _ ->
close t;
None
| Error `Stopped -> None
| Ok None ->
close t;
None
Expand Down Expand Up @@ -258,7 +225,7 @@ module Session = struct
| Some sexps ->
Code_error.raise "attempting to write to a closed channel"
[ ("sexp", Dyn.(list Sexp.to_dyn) sexps) ])
| Open { writer; fd; out_buf; write_mutex; _ } -> (
| Open { fd; out_buf; write_mutex; _ } -> (
match sexps with
| None ->
(try
Expand All @@ -273,13 +240,11 @@ module Session = struct
Io_buffer.write_csexps out_buf sexps;
let flush_token = Io_buffer.flush_token out_buf in
Mutex.unlock write_mutex;
Worker.task writer ~f:(fun () ->
csexp_write_loop fd out_buf flush_token write_mutex)
async (fun () -> csexp_write_loop fd out_buf flush_token write_mutex)
in
match res with
| Ok () -> ()
| Error `Stopped -> assert false
| Error (`Exn e) ->
| Error e ->
close t;
Exn_with_backtrace.reraise e))
end
Expand Down Expand Up @@ -353,45 +318,40 @@ module Server = struct
let ready t = Fiber.Ivar.read t.ready

let serve (t : t) =
let* async = Worker.create () in
match t.state with
| `Closed -> Code_error.raise "already closed" []
| `Running _ -> Code_error.raise "already running" []
| `Init fd ->
let* transport =
Worker.task_exn async ~f:(fun () ->
Transport.create fd t.sockaddr ~backlog:t.backlog)
async_exn (fun () -> Transport.create fd t.sockaddr ~backlog:t.backlog)
in
t.state <- `Running transport;
let+ () = Fiber.Ivar.fill t.ready () in
let accept () =
Worker.task async ~f:(fun () ->
async (fun () ->
Transport.accept transport
|> Option.map ~f:(fun client ->
let in_ = Unix.in_channel_of_descr client in
(client, in_)))
in
let loop () =
let* accept = accept () in
let+ accept = accept () in
match accept with
| Error `Stopped ->
Log.info [ Pp.text "RPC stopped accepting." ];
Fiber.return None
| Error (`Exn exn) ->
| Error exn ->
Log.info
[ Pp.text "RPC accept failed. Server will not accept new clients"
; Exn_with_backtrace.pp exn
];
Fiber.return None
None
| Ok None ->
Log.info
[ Pp.text
"RPC accepted the last client. No more clients will be \
accepted."
];
Fiber.return None
None
| Ok (Some (fd, in_)) ->
let+ session = Session.create fd in_ in
let session = Session.create fd in_ in
Some session
in
Fiber.Stream.In.create loop
Expand Down Expand Up @@ -435,35 +395,23 @@ module Client = struct

type t =
{ mutable transport : Transport.t option
; mutable async : Worker.t option
; sockaddr : Unix.sockaddr
}

let create sockaddr =
let+ async = Worker.create () in
{ sockaddr; async = Some async; transport = None }
let create sockaddr = { sockaddr; transport = None }

let connect t =
match t.async with
| None ->
Code_error.raise "connection already established with the client" []
| Some async -> (
t.async <- None;
let* task =
Worker.task async ~f:(fun () ->
let transport = Transport.create t.sockaddr in
t.transport <- Some transport;
let client = Transport.connect transport in
let in_ = Unix.in_channel_of_descr client in
(client, in_))
in
Worker.stop async;
match task with
| Error `Stopped -> assert false
| Error (`Exn exn) -> Fiber.return (Error exn)
| Ok (in_, out) ->
let+ res = Session.create in_ out in
Ok res)
let+ task =
async (fun () ->
let transport = Transport.create t.sockaddr in
t.transport <- Some transport;
let client = Transport.connect transport in
let in_ = Unix.in_channel_of_descr client in
(client, in_))
in
match task with
| Error exn -> Error exn
| Ok (fd, in_) -> Ok (Session.create fd in_)

let connect_exn t =
let+ res = connect t in
Expand Down
26 changes: 5 additions & 21 deletions src/csexp_rpc/csexp_rpc.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,14 @@

open Stdune

module type Worker = sig
(** [Csexp_rpc] relies on background threads for all blocking operations.
module type Scheduler = sig
(** [async f] enqueue task [f] *)

This is the signature for all the operation it needs for such background
threads. *)

(** A worker doing tasks in a background thread *)
type t

(** Create a worker doing tasks in a new background thread *)
val create : unit -> t Fiber.t

(** Stop the worker. Tasks that aren't yet started will not be completed. *)
val stop : t -> unit

(** [task t ~f] enqueue task [f] for worker [t] *)
val task :
t
-> f:(unit -> 'a)
-> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t
val async : (unit -> 'a) -> ('a, Exn_with_backtrace.t) result Fiber.t
end

(** Hack until we move [Dune_engine.Scheduler] into own library *)
val worker : (module Worker) Fdecl.t
val scheduler : (module Scheduler) Fdecl.t

module Session : sig
(** Rpc session backed by two threads. One thread for reading, and another for
Expand All @@ -58,7 +42,7 @@ module Client : sig
(** RPC Client *)
type t

val create : Unix.sockaddr -> t Fiber.t
val create : Unix.sockaddr -> t

val stop : t -> unit

Expand Down
3 changes: 2 additions & 1 deletion src/dune_engine/dune
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
csexp_rpc
dune_rpc_private
dune_rpc_server
thread_worker
dune_rpc_client
dune_thread_pool
spawn
ocamlc_loc
dune_file_watcher
Expand Down
Loading

0 comments on commit 8bea7ea

Please sign in to comment.