Skip to content

Commit c711653

Browse files
committed
Integrate MPMC unbounded queue with existing benchmarks
1 parent 61f75a5 commit c711653

File tree

6 files changed

+181
-137
lines changed

6 files changed

+181
-137
lines changed

bench/bench_spsc_queue.ml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,5 @@ module Mpmc_queue = Bench (struct
5858
let make () = make ()
5959
let name = "mpmc-queue"
6060
end)
61+
62+
let bench = [ Spsc_queue.bench ; Mpmc_queue.bench ]

bench/bench_spsc_queue.mli

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1 @@
1-
module Spsc_queue : sig
2-
val bench : unit -> Benchmark_result.t
3-
end
4-
5-
module Mpmc_queue : sig
6-
val bench : unit -> Benchmark_result.t
7-
end
1+
val bench : (unit -> Benchmark_result.t) list

bench/main.ml

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,32 +8,24 @@ let backoff_benchmarks =
88
]
99

1010
let benchmark_list =
11-
[
12-
Bench_spsc_queue.Spsc_queue.bench;
13-
Bench_spsc_queue.Mpmc_queue.bench;
14-
Mpmc_queue.bench ~takers:4 ~pushers:4;
15-
Mpmc_queue.bench ~takers:1 ~pushers:8;
16-
Mpmc_queue.bench ~takers:8 ~pushers:1;
17-
Mpmc_queue.bench ~use_cas:true ~takers:4 ~pushers:4;
18-
Mpmc_queue.bench ~use_cas:true ~takers:1 ~pushers:8;
19-
Mpmc_queue.bench ~use_cas:true ~takers:8 ~pushers:1;
20-
]
11+
Bench_spsc_queue.bench
12+
@ Mpmc_queue.bench
2113
@ backoff_benchmarks
2214

2315
let () =
24-
let results =
25-
(* todo: should assert no stranded domains between tests. *)
26-
List.map (fun f -> f ()) benchmark_list
27-
|> List.map Benchmark_result.to_json
28-
|> String.concat ", "
29-
in
30-
let output =
31-
Printf.sprintf {| {"name": "lockfree", "results": [%s]}|} results
32-
(* Cannot use Yojson rewriters as of today none works on OCaml 5.1.0.
33-
This at least verifies that the manually crafted JSON is well-formed.
16+
List.iter
17+
(fun f ->
18+
(* todo: should assert no stranded domains between tests. *)
19+
let r = f () in
20+
let r = Benchmark_result.to_json r in
21+
let output =
22+
Printf.sprintf {| {"name": "lockfree", "results": [%s]}|} r
23+
(* Cannot use Yojson rewriters as of today none works on OCaml 5.1.0.
24+
This at least verifies that the manually crafted JSON is well-formed.
3425
35-
If the type grow, we could switch to running ppx manually on 5.0.0 and
36-
pasting in its output. *)
37-
|> Yojson.Basic.prettify
38-
in
39-
Printf.printf "%s" output
26+
If the type grow, we could switch to running ppx manually on 5.0.0 and
27+
pasting in its output. *)
28+
|> Yojson.Basic.prettify
29+
in
30+
Printf.printf "%s\n%!" output)
31+
benchmark_list

bench/mpmc_queue.ml

Lines changed: 140 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,146 @@
1-
open Lockfree.Mpmc_relaxed_queue
1+
module type QUEUE = sig
2+
type 'a t
3+
val make : unit -> 'a t
4+
val push : 'a t -> 'a -> unit
5+
val pop : 'a t -> 'a option
6+
val name : string
7+
end
28

3-
let num_of_elements = ref 500_000
4-
let num_of_pushers = ref 4
5-
let num_of_takers = ref 4
6-
let num_of_iterations = ref 10
7-
let use_cas_intf = ref false
8-
let pop = ref Not_lockfree.pop
9-
let push = ref Not_lockfree.push
9+
module Bench (Q : QUEUE) = struct
1010

11-
let taker queue num_of_elements () =
12-
let i = ref 0 in
13-
while !i < num_of_elements do
14-
if Option.is_some (!pop queue) then i := !i + 1
15-
done
11+
let num_of_elements = ref 500_000
12+
let num_of_pushers = ref 4
13+
let num_of_takers = ref 4
14+
let num_of_iterations = ref 10
1615

17-
let pusher queue num_of_elements () =
18-
let i = ref 0 in
19-
while !i < num_of_elements do
20-
if !push queue !i then i := !i + 1
21-
done
16+
let taker queue num_of_elements () =
17+
let i = ref 0 in
18+
while !i < num_of_elements do
19+
if Option.is_some (Q.pop queue) then i := !i + 1
20+
done
2221

23-
let create_output ~time_median ~throughput_median ~throughput_stddev =
24-
let time =
25-
({
26-
name = "time";
27-
value = `Numeric time_median;
28-
units = "s";
29-
description = "median time";
30-
}
31-
: Benchmark_result.Metric.t)
32-
in
33-
let throughput =
34-
({
35-
name = "throughput";
36-
value = `Numeric throughput_median;
37-
units = "item/s";
38-
description = "median throughput";
39-
}
40-
: Benchmark_result.Metric.t)
41-
in
42-
let throughput_stddev =
43-
({
44-
name = "throughput-stddev";
45-
value = `Numeric throughput_stddev;
46-
units = "item/s";
47-
description = "stddev throughput";
48-
}
49-
: Benchmark_result.Metric.t)
50-
in
51-
let metrics = [ time; throughput; throughput_stddev ] in
52-
let name =
53-
Printf.sprintf "mpmc-queue-pushers:%d,takers:%d,use-cas:%b" !num_of_pushers
54-
!num_of_takers !use_cas_intf
55-
in
56-
({ name; metrics } : Benchmark_result.t)
22+
let pusher queue num_of_elements () =
23+
for i = 0 to num_of_elements - 1 do
24+
Q.push queue i
25+
done
5726

58-
let run_bench () =
59-
if !use_cas_intf then (
60-
push := Lockfree.Mpmc_relaxed_queue.Not_lockfree.CAS_interface.push;
61-
pop := Lockfree.Mpmc_relaxed_queue.Not_lockfree.CAS_interface.pop);
62-
let queue = create ~size_exponent:10 () in
63-
let orchestrator =
64-
Orchestrator.init
65-
~total_domains:(!num_of_takers + !num_of_pushers)
66-
~rounds:!num_of_iterations
67-
in
68-
(* define function to start domains *)
69-
let start_n_domains n f =
70-
assert (!num_of_elements mod n == 0);
71-
let items_per_pusher = !num_of_elements / n in
72-
List.init n (fun _ ->
73-
Domain.spawn (fun () ->
74-
Orchestrator.worker orchestrator (f queue items_per_pusher)))
75-
in
76-
(* start domains *)
77-
let domains =
78-
let takers = start_n_domains !num_of_takers taker in
79-
let pushers = start_n_domains !num_of_pushers pusher in
80-
Sys.opaque_identity (pushers @ takers)
81-
in
82-
(* run test *)
83-
let times = Orchestrator.run orchestrator in
84-
List.iter Domain.join domains;
85-
let time_median = Stats.median times in
86-
let throughputs =
87-
List.map (fun time -> Int.to_float !num_of_elements /. time) times
88-
in
89-
let throughput_median = Stats.median throughputs in
90-
let throughput_stddev = Stats.stddev throughputs in
91-
create_output ~time_median ~throughput_median ~throughput_stddev
27+
let create_output ~time_median ~throughput_median ~throughput_stddev =
28+
let time =
29+
({
30+
name = "time";
31+
value = `Numeric time_median;
32+
units = "s";
33+
description = "median time";
34+
}
35+
: Benchmark_result.Metric.t)
36+
in
37+
let throughput =
38+
({
39+
name = "throughput";
40+
value = `Numeric throughput_median;
41+
units = "item/s";
42+
description = "median throughput";
43+
}
44+
: Benchmark_result.Metric.t)
45+
in
46+
let throughput_stddev =
47+
({
48+
name = "throughput-stddev";
49+
value = `Numeric throughput_stddev;
50+
units = "item/s";
51+
description = "stddev throughput";
52+
}
53+
: Benchmark_result.Metric.t)
54+
in
55+
let metrics = [ time; throughput; throughput_stddev ] in
56+
let name =
57+
Printf.sprintf "%s-pushers:%d,takers:%d"
58+
Q.name
59+
!num_of_pushers !num_of_takers
60+
in
61+
({ name; metrics } : Benchmark_result.t)
9262

93-
let bench ?takers ?pushers ?use_cas ?iterations ?elements () =
94-
num_of_takers := Option.value takers ~default:!num_of_takers;
95-
num_of_pushers := Option.value pushers ~default:!num_of_pushers;
96-
use_cas_intf := Option.value use_cas ~default:!use_cas_intf;
97-
num_of_iterations := Option.value iterations ~default:!num_of_iterations;
98-
num_of_elements := Option.value elements ~default:!num_of_elements;
99-
run_bench ()
63+
let run_bench () =
64+
let queue = Q.make () in
65+
let orchestrator =
66+
Orchestrator.init
67+
~total_domains:(!num_of_takers + !num_of_pushers)
68+
~rounds:!num_of_iterations
69+
in
70+
(* define function to start domains *)
71+
let start_n_domains n f =
72+
assert (!num_of_elements mod n == 0);
73+
let items_per_pusher = !num_of_elements / n in
74+
List.init n (fun _ ->
75+
Domain.spawn (fun () ->
76+
Orchestrator.worker orchestrator (f queue items_per_pusher)))
77+
in
78+
(* start domains *)
79+
let domains =
80+
let takers = start_n_domains !num_of_takers taker in
81+
let pushers = start_n_domains !num_of_pushers pusher in
82+
Sys.opaque_identity (pushers @ takers)
83+
in
84+
(* run test *)
85+
let times = Orchestrator.run orchestrator in
86+
List.iter Domain.join domains;
87+
let time_median = Stats.median times in
88+
let throughputs =
89+
List.map (fun time -> Int.to_float !num_of_elements /. time) times
90+
in
91+
let throughput_median = Stats.median throughputs in
92+
let throughput_stddev = Stats.stddev throughputs in
93+
create_output ~time_median ~throughput_median ~throughput_stddev
94+
95+
let benchmark ?takers ?pushers ?iterations ?elements () =
96+
num_of_takers := Option.value takers ~default:!num_of_takers;
97+
num_of_pushers := Option.value pushers ~default:!num_of_pushers;
98+
num_of_iterations := Option.value iterations ~default:!num_of_iterations;
99+
num_of_elements := Option.value elements ~default:!num_of_elements;
100+
run_bench ()
101+
102+
let bench : (unit -> _) list =
103+
[
104+
benchmark ~takers:4 ~pushers:4;
105+
benchmark ~takers:1 ~pushers:8;
106+
benchmark ~takers:8 ~pushers:1;
107+
]
108+
end
109+
110+
module Relaxed = Bench (struct
111+
let name = "mpmc-relaxed"
112+
module Q = Lockfree.Mpmc_relaxed_queue
113+
include Q.Not_lockfree
114+
type 'a t = 'a Q.t
115+
let make () = Q.create ~size_exponent:10 ()
116+
let rec push t x =
117+
if not (Q.Not_lockfree.push t x)
118+
then push t x
119+
end)
120+
121+
module Relaxed_cas = Bench (struct
122+
let name = "mpmc-relaxed-cas"
123+
module Q = Lockfree.Mpmc_relaxed_queue
124+
include Q.Not_lockfree.CAS_interface
125+
type 'a t = 'a Q.t
126+
let make () = Q.create ~size_exponent:10 ()
127+
let rec push t x =
128+
if not (Q.Not_lockfree.CAS_interface.push t x)
129+
then push t x
130+
end)
131+
132+
module Unbounded = Bench (struct
133+
let name = "mpmc-unbounded"
134+
include Lockfree.Mpmc_queue
135+
let make () = make ()
136+
end)
137+
138+
let bench = Relaxed.bench @ Relaxed_cas.bench @ Unbounded.bench
139+
140+
let benchmark ~takers ~pushers ~impl ~iterations ~elements () =
141+
let impl = match impl with
142+
| `CAS -> Relaxed_cas.benchmark
143+
| `FAD -> Relaxed.benchmark
144+
| `Unbounded -> Unbounded.benchmark
145+
in
146+
impl ~takers ~pushers ~iterations ~elements ()

bench/mpmc_queue.mli

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
val bench :
2-
?takers:int ->
3-
?pushers:int ->
4-
?use_cas:bool ->
5-
?iterations:int ->
6-
?elements:int ->
1+
val bench : (unit -> Benchmark_result.t) list
2+
3+
val benchmark :
4+
takers:int ->
5+
pushers:int ->
6+
impl:[ `CAS | `FAD | `Unbounded ] ->
7+
iterations:int ->
8+
elements:int ->
79
unit ->
810
Benchmark_result.t
11+

bench/mpmc_queue_cmd.ml

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,30 @@ let elements = ref 100_000
22
let pushers = ref 4
33
let takers = ref 4
44
let iterations = ref 10
5-
let use_cas = ref false
5+
let impl = ref `FAD
6+
7+
let use_impl = function
8+
| "CAS" -> impl := `CAS
9+
| "FAD" -> impl := `FAD
10+
| "UNBOUNDED" -> impl := `Unbounded
11+
| str -> Printf.ksprintf failwith "-impl expected CAS or FAD or UNBOUNDED, got %S" str
612

713
let speclist =
814
[
915
("-items", Arg.Set_int elements, "number of items to insert and remove");
1016
("-pushers", Arg.Set_int pushers, "number of domains pushing items");
1117
("-takers", Arg.Set_int takers, "number of domains taking times");
1218
("-iterations", Arg.Set_int iterations, "run the benchmark this many times");
13-
("-use-cas", Arg.Set use_cas, "use CAS instead of FAD");
19+
("-impl", Arg.String use_impl, "queue implementation to use: CAS or FAD or UNBOUNDED");
1420
]
1521

16-
let _f () =
22+
let () =
1723
Arg.parse speclist
1824
(fun _ -> ())
1925
"mpmc_queue.exe [-items INT] [-pushers INT] [-takers INT] [-iterations \
20-
INT] [-use-cas]";
26+
INT] [-impl CAS|FAD|UNBOUNDED]";
2127
let result =
22-
Mpmc_queue.bench ~takers:!takers ~pushers:!pushers ~use_cas:!use_cas
28+
Mpmc_queue.benchmark ~takers:!takers ~pushers:!pushers ~impl:!impl
2329
~iterations:!iterations ~elements:!elements ()
2430
in
25-
Benchmark_result.to_json result |> Yojson.Basic.prettify |> print_string
31+
Benchmark_result.to_json result |> Yojson.Basic.prettify |> print_endline

0 commit comments

Comments
 (0)