Skip to content

Commit 34eb007

Browse files
committed
Optimize MPMC unbounded queue with status bits
1 parent c711653 commit 34eb007

File tree

5 files changed

+71
-83
lines changed

5 files changed

+71
-83
lines changed

bench/bench_spsc_queue.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ let item_count = 2_000_000
22

33
module type QUEUE = sig
44
type 'a t
5-
val make : unit -> 'a t
5+
val make : unit -> int t
66
val push : 'a t -> 'a -> unit
77
val pop : 'a t -> 'a option
88
val name : string
@@ -55,7 +55,7 @@ end)
5555

5656
module Mpmc_queue = Bench (struct
5757
include Lockfree.Mpmc_queue
58-
let make () = make ()
58+
let make () = make ~dummy:(-1) ()
5959
let name = "mpmc-queue"
6060
end)
6161

bench/mpmc_queue.ml

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
module type QUEUE = sig
22
type 'a t
3-
val make : unit -> 'a t
3+
val make : unit -> int t
44
val push : 'a t -> 'a -> unit
55
val pop : 'a t -> 'a option
66
val name : string
77
end
88

99
module Bench (Q : QUEUE) = struct
1010

11-
let num_of_elements = ref 500_000
11+
let num_of_elements = ref 2_100_000
1212
let num_of_pushers = ref 4
1313
let num_of_takers = ref 4
14-
let num_of_iterations = ref 10
14+
let num_of_iterations = ref 20
1515

1616
let taker queue num_of_elements () =
1717
let i = ref 0 in
@@ -101,14 +101,15 @@ module Bench (Q : QUEUE) = struct
101101

102102
let bench : (unit -> _) list =
103103
[
104+
benchmark ~takers:1 ~pushers:1;
104105
benchmark ~takers:4 ~pushers:4;
105-
benchmark ~takers:1 ~pushers:8;
106-
benchmark ~takers:8 ~pushers:1;
106+
benchmark ~takers:1 ~pushers:7;
107+
benchmark ~takers:7 ~pushers:1;
107108
]
108109
end
109110

110111
module Relaxed = Bench (struct
111-
let name = "mpmc-relaxed"
112+
let name = "mpmc-relaxed-fad"
112113
module Q = Lockfree.Mpmc_relaxed_queue
113114
include Q.Not_lockfree
114115
type 'a t = 'a Q.t
@@ -132,7 +133,7 @@ end)
132133
module Unbounded = Bench (struct
133134
let name = "mpmc-unbounded"
134135
include Lockfree.Mpmc_queue
135-
let make () = make ()
136+
let make () = make ~dummy:(-1) ()
136137
end)
137138

138139
let bench = Relaxed.bench @ Relaxed_cas.bench @ Unbounded.bench

src/mpmc_queue.ml

Lines changed: 52 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
let default_capacity = 4096
2-
let spinlock_iterations = 16
1+
module Array = struct
2+
include Array
3+
let get = unsafe_get
4+
let set = unsafe_set
5+
end
36

4-
type 'a cell =
5-
| Empty
6-
| Tombstone
7-
| Value of 'a
7+
let default_capacity = 512
88

99
type 'a s =
10-
{ buffer : 'a cell Atomic.t array
10+
{ status : int Atomic.t array
11+
; buffer : 'a array
1112
; head : int Atomic.t
1213
; tail : int Atomic.t
1314
; rest : 'a s option Atomic.t
@@ -16,36 +17,40 @@ type 'a s =
1617
type 'a t =
1718
{ first : 'a s Atomic.t
1819
; last : 'a s Atomic.t
20+
; dummy : 'a
1921
}
2022

21-
let make_s ~capacity () =
23+
let pack_size = Sys.int_size / 2
24+
25+
let make_s ~capacity ~dummy =
2226
{ head = Atomic.make 0
2327
; tail = Atomic.make (-1)
24-
; buffer = Array.init capacity (fun _ -> Atomic.make Empty)
28+
; buffer = Array.make capacity dummy
29+
; status = Array.init (1 + capacity / pack_size) (fun _ -> Atomic.make 0)
2530
; rest = Atomic.make None
2631
}
2732

28-
let make ?(capacity = default_capacity) () =
29-
let s = make_s ~capacity () in
33+
let make ?(capacity = default_capacity) ~dummy () =
34+
let s = make_s ~capacity ~dummy in
3035
{ first = Atomic.make s
3136
; last = Atomic.make s
37+
; dummy
3238
}
3339

3440
let rec gift_rest t some_s =
35-
if Atomic.compare_and_set t.rest None some_s
36-
then ()
37-
else follow_rest t some_s
41+
if not (Atomic.compare_and_set t.rest None some_s)
42+
then follow_rest t some_s
3843

3944
and follow_rest t some_s =
4045
match Atomic.get t.rest with
4146
| None -> gift_rest t some_s
4247
| Some t -> follow_rest t some_s
4348

44-
let force_rest t =
49+
let force_rest ~dummy t =
4550
match Atomic.get t.rest with
4651
| Some s -> s
4752
| None ->
48-
let s = make_s ~capacity:(Array.length t.buffer) () in
53+
let s = make_s ~capacity:(Array.length t.buffer) ~dummy in
4954
let some_s = Some s in
5055
if Atomic.compare_and_set t.rest None some_s
5156
then s
@@ -55,44 +60,45 @@ let force_rest t =
5560
gift_rest rest some_s ;
5661
rest
5762

58-
let rec push_s t x =
63+
let mark t i =
64+
let status = t.status.(i / pack_size) in
65+
let shift = 2 * (i mod pack_size) in
66+
let status = Atomic.fetch_and_add status (1 lsl shift) in
67+
(status lsr shift) land 1 = 0
68+
69+
let rec push_s ~dummy t x =
5970
let i = Atomic.fetch_and_add t.tail 1 in
6071
if i < 0
61-
then (let _ = force_rest t in push_s t x)
72+
then (let _ = force_rest ~dummy t in push_s ~dummy t x)
6273
else if i >= Array.length t.buffer
6374
then false
6475
else begin
65-
let cell = Array.unsafe_get t.buffer i in
66-
match Atomic.get cell with
67-
| Empty ->
68-
if Atomic.compare_and_set cell Empty (Value x)
69-
then true
70-
else begin
71-
assert (Atomic.get cell = Tombstone) ;
72-
push_s t x
73-
end
74-
| Tombstone ->
75-
push_s t x
76-
| Value _ -> assert false
76+
t.buffer.(i) <- x ;
77+
if mark t i
78+
then true
79+
else begin
80+
t.buffer.(i) <- dummy ;
81+
let hd = Atomic.get t.head in
82+
let _ : bool = Atomic.compare_and_set t.tail (i + 1) (hd + 1) in
83+
push_s ~dummy t x
84+
end
7785
end
7886

79-
let rec push t x =
80-
let last = Atomic.get t.last in
81-
if push_s last x
82-
then ()
83-
else begin
84-
let rest = force_rest last in
85-
let _ : bool = Atomic.compare_and_set t.last last rest in
87+
let rec push ({ last ; dummy ; _ } as t) x =
88+
let last_s = Atomic.get last in
89+
if not (push_s ~dummy last_s x)
90+
then begin
91+
let rest = force_rest ~dummy last_s in
92+
let _ : bool = Atomic.compare_and_set last last_s rest in
8693
push t x
8794
end
8895

89-
9096
type 'a pop_result =
9197
| Is_empty
9298
| Wait_for_it
9399
| Pop of 'a
94100

95-
let rec pop_s t =
101+
let rec pop_s ~dummy t =
96102
let current_head = Atomic.get t.head in
97103
if current_head >= Array.length t.buffer
98104
then Is_empty
@@ -102,38 +108,17 @@ let rec pop_s t =
102108
let i = Atomic.fetch_and_add t.head 1 in
103109
if i >= Array.length t.buffer
104110
then Is_empty
105-
else
106-
let cell = Array.unsafe_get t.buffer i in
107-
if i >= Atomic.get t.tail
108-
then tombstone t cell
109-
else spinlock ~retry:spinlock_iterations t cell
110-
111-
and tombstone t cell =
112-
if Atomic.compare_and_set cell Empty Tombstone
113-
then pop_s t
114-
else begin match Atomic.get cell with
115-
| (Value v) as old ->
116-
assert (Atomic.compare_and_set cell old Tombstone) ;
117-
Pop v
118-
| _ -> assert false
119-
end
120-
121-
and spinlock ~retry t cell =
122-
match Atomic.get cell with
123-
| (Value v) as old ->
124-
assert (Atomic.compare_and_set cell old Tombstone) ;
111+
else if mark t i
112+
then pop_s ~dummy t
113+
else begin
114+
let v = t.buffer.(i) in
115+
t.buffer.(i) <- dummy ;
125116
Pop v
126-
| Empty when retry <= 0 ->
127-
tombstone t cell
128-
| Empty ->
129-
Domain.cpu_relax () ;
130-
spinlock ~retry:(retry - 1) t cell
131-
| Tombstone ->
132-
assert false
117+
end
133118

134119
let rec pop t =
135120
let first = Atomic.get t.first in
136-
match pop_s first with
121+
match pop_s ~dummy:t.dummy first with
137122
| Pop v -> Some v
138123
| Wait_for_it -> None
139124
| Is_empty ->

src/mpmc_queue.mli

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
type 'a t
44
(** A queue of items of type ['a]. *)
55

6-
val make : ?capacity:int -> unit -> 'a t
7-
(** [make ()] creates a new empty queue.
6+
val make : ?capacity:int -> dummy:'a -> unit -> 'a t
7+
(** [make ~dummy ()] creates a new empty queue.
88
9-
The optional parameter [?capacity] defaults to 4096 and is used to size the
10-
internal buffers of the queue: Choosing a small number lower the pause
11-
durations caused by allocations, but a larger capacity provides overall
12-
faster operations. *)
9+
- The [dummy] element is a placeholder for ['a] values.
10+
- The optional parameter [?capacity] defaults to 512 and is used to size the
11+
internal buffers of the queue: Choosing a small number lower the pause
12+
durations caused by allocations, but a larger capacity can provide overall
13+
faster operations.
14+
*)
1315

1416
val push : 'a t -> 'a -> unit
1517
(** [push t x] adds [x] to the tail of the queue. *)

test/mpmc_queue/lin_mpmc_queue.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module Q = Lockfree.Mpmc_queue
33
module Test = struct
44
type t = int Q.t
55

6-
let init () = Q.make ~capacity:1 ()
6+
let init () = Q.make ~capacity:1 ~dummy:(-1) ()
77

88
let cleanup _ = ()
99

0 commit comments

Comments
 (0)