Skip to content

Commit

Permalink
feature: add a thread pool
Browse files Browse the repository at this point in the history
replace the current implementation with an unbounded number of workers

Signed-off-by: Rudi Grinberg <me@rgrinberg.com>

<!-- ps-id: f6f8a745-34f1-42fd-bf92-7ca2b6bde297 -->
  • Loading branch information
rgrinberg committed Mar 2, 2023
1 parent e909c5a commit cfa1339
Show file tree
Hide file tree
Showing 18 changed files with 212 additions and 270 deletions.
14 changes: 14 additions & 0 deletions bench/micro/dune
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,17 @@
(allow_overlapping_dependencies)
(modules memo_bench_main)
(libraries memo_bench core_bench.inline_benchmarks))

(library
(name thread_pool_bench)
(modules thread_pool_bench)
(library_flags -linkall)
(preprocess
(pps ppx_bench))
(libraries dune_thread_pool threads.posix core_bench.inline_benchmarks))

(executable
(name thread_pool_bench_main)
(allow_overlapping_dependencies)
(modules thread_pool_bench_main)
(libraries thread_pool_bench core_bench.inline_benchmarks))
1 change: 1 addition & 0 deletions bench/micro/runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ case "$1" in
"dune" ) test="dune_bench"; main="main";;
"fiber" ) test="fiber_bench"; main="fiber_bench_main";;
"memo" ) test="memo_bench"; main="memo_bench_main";;
"thread_pool" ) test="thread_pool"; main="thread_pool_bench_main";;
esac
shift;
export BENCH_LIB="$test"
Expand Down
30 changes: 30 additions & 0 deletions bench/micro/thread_pool_bench.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
open Dune_thread_pool

let spawn_thread f = ignore (Thread.create f ())

let%bench "almost no-op" =
let tp = Thread_pool.create ~min_workers:10 ~max_workers:50 ~spawn_thread in
let tasks = 50_000 in
let counter = Atomic.make tasks in
let f () = Atomic.decr counter in
for _ = 0 to tasks - 1 do
Thread_pool.task tp ~f
done;
while Atomic.get counter > 0 do
Thread.yield ()
done

let%bench "syscall" =
let tp = Thread_pool.create ~min_workers:10 ~max_workers:50 ~spawn_thread in
let tasks = 50_000 in
let counter = Atomic.make tasks in
let f () =
Unix.sleepf 0.0;
Atomic.decr counter
in
for _ = 0 to tasks - 1 do
Thread_pool.task tp ~f
done;
while Atomic.get counter > 0 do
Thread.yield ()
done
1 change: 1 addition & 0 deletions bench/micro/thread_pool_bench_main.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Inline_benchmarks_public.Runner.main ~libname:"thread_pool_bench"
4 changes: 2 additions & 2 deletions boot/libs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ let local_libraries =
; ("src/meta_parser", Some "Dune_meta_parser", false, None)
; ("src/csexp_rpc", Some "Csexp_rpc", false, None)
; ("src/dune_rpc_server", Some "Dune_rpc_server", false, None)
; ("src/thread_worker", Some "Thread_worker", false, None)
; ("src/dune_rpc_client", Some "Dune_rpc_client", false, None)
; ("src/thread_pool", Some "Dune_thread_pool", false, None)
; ("otherlibs/ocamlc_loc/src", Some "Ocamlc_loc", false, None)
; ("src/fsevents", Some "Fsevents", false, None)
; ("vendor/ocaml-inotify/src", Some "Ocaml_inotify", false, None)
Expand All @@ -58,7 +59,6 @@ let local_libraries =
; ("vendor/cmdliner/src", None, false, None)
; ("otherlibs/build-info/src", Some "Build_info", false,
Some "Build_info_data")
; ("src/dune_rpc_client", Some "Dune_rpc_client", false, None)
; ("src/dune_rpc_impl", Some "Dune_rpc_impl", false, None)
]

Expand Down
129 changes: 39 additions & 90 deletions src/csexp_rpc/csexp_rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,21 @@ open Stdune
open Fiber.O
module Log = Dune_util.Log

module type Worker = sig
type t

val create : unit -> t Fiber.t

val stop : t -> unit

val task :
t
-> f:(unit -> 'a)
-> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t
module type Scheduler = sig
val async : (unit -> 'a) -> ('a, Exn_with_backtrace.t) result Fiber.t
end

let worker = Fdecl.create Dyn.opaque

module Worker = struct
type t =
{ stop : unit -> unit
; task :
'a.
(unit -> 'a)
-> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t
}
let scheduler = Fdecl.create Dyn.opaque

let create () =
let open Fiber.O in
let (module Worker : Worker) = Fdecl.get worker in
let+ w = Worker.create () in
{ stop = (fun () -> Worker.stop w); task = (fun f -> Worker.task w ~f) }
let async f =
let module Scheduler = (val Fdecl.get scheduler : Scheduler) in
Scheduler.async f

let stop t = t.stop ()

let task t ~f = t.task f

let task_exn t ~f =
let+ res = task t ~f in
match res with
| Error `Stopped -> assert false
| Error (`Exn e) -> Exn_with_backtrace.reraise e
| Ok s -> s
end
let async_exn f =
let+ res = async f in
match res with
| Ok s -> s
| Error e -> Exn_with_backtrace.reraise e

module Session_id = Id.Make ()

Expand Down Expand Up @@ -127,8 +101,6 @@ module Session = struct
{ out_channel : out_channel
; in_channel : in_channel
; socket : bool
; writer : Worker.t
; reader : Worker.t
}

type t =
Expand All @@ -140,9 +112,7 @@ module Session = struct
let id = Id.gen () in
if debug then
Log.info [ Pp.textf "RPC created new session %d" (Id.to_int id) ];
let* reader = Worker.create () in
let+ writer = Worker.create () in
let state = Open { in_channel; out_channel; reader; writer; socket } in
let state = Open { in_channel; out_channel; socket } in
{ id; state }

let string_of_packet = function
Expand All @@ -156,9 +126,7 @@ module Session = struct
let close t =
match t.state with
| Closed -> ()
| Open { in_channel; out_channel; reader; writer; socket } ->
Worker.stop reader;
Worker.stop writer;
| Open { in_channel; out_channel; socket } ->
(* with a socket, there's only one fd. We make sure to close it only once.
with dune rpc init, we have two separate fd's (stdin/stdout) so we must
close both. *)
Expand All @@ -179,7 +147,7 @@ module Session = struct
| Closed ->
debug None;
Fiber.return None
| Open { reader; in_channel; _ } ->
| Open { in_channel; _ } ->
let rec read () =
match Csexp.input_opt in_channel with
| exception Unix.Unix_error (_, _, _) -> None
Expand All @@ -189,13 +157,12 @@ module Session = struct
| Ok (Some csexp) -> Some csexp
| Error _ -> None
in
let+ res = Worker.task reader ~f:read in
let+ res = async read in
let res =
match res with
| Error (`Exn _) ->
| Error _ ->
close t;
None
| Error `Stopped -> None
| Ok None ->
close t;
None
Expand All @@ -219,7 +186,7 @@ module Session = struct
| Some sexps ->
Code_error.raise "attempting to write to a closed channel"
[ ("sexp", Dyn.(list Sexp.to_dyn) sexps) ])
| Open { writer; out_channel; socket; _ } -> (
| Open { out_channel; socket; _ } -> (
match sexps with
| None ->
(if socket then
Expand All @@ -233,14 +200,13 @@ module Session = struct
Fiber.return ()
| Some sexps -> (
let+ res =
Worker.task writer ~f:(fun () ->
async (fun () ->
List.iter sexps ~f:(Csexp.to_channel out_channel);
flush out_channel)
in
match res with
| Ok () -> ()
| Error `Stopped -> assert false
| Error (`Exn e) ->
| Error e ->
close t;
Exn_with_backtrace.reraise e))
end
Expand Down Expand Up @@ -313,46 +279,41 @@ module Server = struct
let ready t = Fiber.Ivar.read t.ready

let serve (t : t) =
let* async = Worker.create () in
match t.state with
| `Closed -> Code_error.raise "already closed" []
| `Running _ -> Code_error.raise "already running" []
| `Init fd ->
let* transport =
Worker.task_exn async ~f:(fun () ->
Transport.create fd t.sockaddr ~backlog:t.backlog)
async_exn (fun () -> Transport.create fd t.sockaddr ~backlog:t.backlog)
in
t.state <- `Running transport;
let+ () = Fiber.Ivar.fill t.ready () in
let accept () =
Worker.task async ~f:(fun () ->
async (fun () ->
Transport.accept transport
|> Option.map ~f:(fun client ->
let in_ = Unix.in_channel_of_descr client in
let out = Unix.out_channel_of_descr client in
(in_, out)))
in
let loop () =
let* accept = accept () in
let+ accept = accept () in
match accept with
| Error `Stopped ->
Log.info [ Pp.text "RPC stopped accepting." ];
Fiber.return None
| Error (`Exn exn) ->
| Error exn ->
Log.info
[ Pp.text "RPC accept failed. Server will not accept new clients"
; Exn_with_backtrace.pp exn
];
Fiber.return None
None
| Ok None ->
Log.info
[ Pp.text
"RPC accepted the last client. No more clients will be \
accepted."
];
Fiber.return None
None
| Ok (Some (in_, out)) ->
let+ session = Session.create ~socket:true in_ out in
let session = Session.create ~socket:true in_ out in
Some session
in
Fiber.Stream.In.create loop
Expand Down Expand Up @@ -396,36 +357,24 @@ module Client = struct

type t =
{ mutable transport : Transport.t option
; mutable async : Worker.t option
; sockaddr : Unix.sockaddr
}

let create sockaddr =
let+ async = Worker.create () in
{ sockaddr; async = Some async; transport = None }
let create sockaddr = { sockaddr; transport = None }

let connect t =
match t.async with
| None ->
Code_error.raise "connection already established with the client" []
| Some async -> (
t.async <- None;
let* task =
Worker.task async ~f:(fun () ->
let transport = Transport.create t.sockaddr in
t.transport <- Some transport;
let client = Transport.connect transport in
let out = Unix.out_channel_of_descr client in
let in_ = Unix.in_channel_of_descr client in
(in_, out))
in
Worker.stop async;
match task with
| Error `Stopped -> assert false
| Error (`Exn exn) -> Fiber.return (Error exn)
| Ok (in_, out) ->
let+ res = Session.create ~socket:true in_ out in
Ok res)
let+ task =
async (fun () ->
let transport = Transport.create t.sockaddr in
t.transport <- Some transport;
let client = Transport.connect transport in
let out = Unix.out_channel_of_descr client in
let in_ = Unix.in_channel_of_descr client in
(in_, out))
in
match task with
| Error exn -> Error exn
| Ok (in_, out) -> Ok (Session.create ~socket:true in_ out)

let connect_exn t =
let+ res = connect t in
Expand Down
28 changes: 6 additions & 22 deletions src/csexp_rpc/csexp_rpc.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,21 @@

open Stdune

module type Worker = sig
(** [Csexp_rpc] relies on background threads for all blocking operations.
module type Scheduler = sig
(** [async f] enqueue task [f] *)

This is the signature for all the operation it needs for such background
threads. *)

(** A worker doing tasks in a background thread *)
type t

(** Create a worker doing tasks in a new background thread *)
val create : unit -> t Fiber.t

(** Stop the worker. Tasks that aren't yet started will not be completed. *)
val stop : t -> unit

(** [task t ~f] enqueue task [f] for worker [t] *)
val task :
t
-> f:(unit -> 'a)
-> ('a, [ `Exn of Exn_with_backtrace.t | `Stopped ]) result Fiber.t
val async : (unit -> 'a) -> ('a, Exn_with_backtrace.t) result Fiber.t
end

(** Hack until we move [Dune_engine.Scheduler] into own library *)
val worker : (module Worker) Fdecl.t
val scheduler : (module Scheduler) Fdecl.t

module Session : sig
(** Rpc session backed by two threads. One thread for reading, and another for
writing *)
type t

val create : socket:bool -> in_channel -> out_channel -> t Fiber.t
val create : socket:bool -> in_channel -> out_channel -> t

(* [write t x] writes the s-expression when [x] is [Some sexp], and closes the
session if [x = None ] *)
Expand All @@ -60,7 +44,7 @@ module Client : sig
(** RPC Client *)
type t

val create : Unix.sockaddr -> t Fiber.t
val create : Unix.sockaddr -> t

val stop : t -> unit

Expand Down
3 changes: 2 additions & 1 deletion src/dune_engine/dune
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
csexp_rpc
dune_rpc_private
dune_rpc_server
thread_worker
dune_rpc_client
dune_thread_pool
spawn
ocamlc_loc
dune_file_watcher
Expand Down
Loading

0 comments on commit cfa1339

Please sign in to comment.