Skip to content

Commit b192ffe

Browse files
committed
Optimize MPMC unbounded queue with status bits
1 parent d94fb3d commit b192ffe

File tree

3 files changed

+93
-125
lines changed

3 files changed

+93
-125
lines changed

bench/mpmc_queue.ml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ module Bench (Q : QUEUE) = struct
1111
let num_of_elements = ref 2_100_000
1212
let num_of_pushers = ref 4
1313
let num_of_takers = ref 4
14-
let num_of_iterations = ref 10
14+
let num_of_iterations = ref 20
1515

1616
let taker queue num_of_elements () =
1717
let i = ref 0 in
@@ -100,9 +100,10 @@ module Bench (Q : QUEUE) = struct
100100

101101
let bench : (unit -> _) list =
102102
[
103+
benchmark ~takers:1 ~pushers:1;
103104
benchmark ~takers:4 ~pushers:4;
104-
benchmark ~takers:1 ~pushers:8;
105-
benchmark ~takers:8 ~pushers:1;
105+
benchmark ~takers:1 ~pushers:7;
106+
benchmark ~takers:7 ~pushers:1;
106107
]
107108
end
108109

src/mpmc_queue.ml

Lines changed: 81 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,145 +1,110 @@
1-
let default_capacity = 4096
2-
let spinlock_iterations = 16
3-
4-
type 'a cell =
5-
| Empty
6-
| Tombstone
7-
| Value of 'a
8-
9-
type 'a s =
10-
{ buffer : 'a cell Atomic.t array
11-
; head : int Atomic.t
12-
; tail : int Atomic.t
13-
; rest : 'a s option Atomic.t
1+
module Array = struct
2+
include Array
3+
4+
let get = unsafe_get
5+
let set = unsafe_set
6+
end
7+
8+
let default_capacity = 512
9+
10+
type 'a s = {
11+
status : int Atomic.t array;
12+
buffer : 'a array;
13+
head : int Atomic.t;
14+
tail : int Atomic.t;
15+
rest : 'a s option Atomic.t;
16+
}
17+
18+
type 'a t = { first : 'a s Atomic.t; last : 'a s Atomic.t; dummy : 'a }
19+
20+
let pack_size = Sys.int_size / 2
21+
22+
let make_s ~capacity ~dummy =
23+
{
24+
head = Atomic.make 0;
25+
tail = Atomic.make (-1);
26+
buffer = Array.make capacity dummy;
27+
status = Array.init (1 + (capacity / pack_size)) (fun _ -> Atomic.make 0);
28+
rest = Atomic.make None;
1429
}
1530

16-
type 'a t =
17-
{ first : 'a s Atomic.t
18-
; last : 'a s Atomic.t
19-
}
20-
21-
let make_s ~capacity () =
22-
{ head = Atomic.make 0
23-
; tail = Atomic.make (-1)
24-
; buffer = Array.init capacity (fun _ -> Atomic.make Empty)
25-
; rest = Atomic.make None
26-
}
27-
28-
let make ?(capacity = default_capacity) () =
29-
let s = make_s ~capacity () in
30-
{ first = Atomic.make s
31-
; last = Atomic.make s
32-
}
31+
let make ?(capacity = default_capacity) ~dummy () =
32+
let s = make_s ~capacity ~dummy in
33+
{ first = Atomic.make s; last = Atomic.make s; dummy }
3334

3435
let rec gift_rest t some_s =
35-
if Atomic.compare_and_set t.rest None some_s
36-
then ()
37-
else follow_rest t some_s
36+
if not (Atomic.compare_and_set t.rest None some_s) then follow_rest t some_s
3837

3938
and follow_rest t some_s =
4039
match Atomic.get t.rest with
4140
| None -> gift_rest t some_s
4241
| Some t -> follow_rest t some_s
4342

44-
let force_rest t =
43+
let force_rest ~dummy t =
4544
match Atomic.get t.rest with
4645
| Some s -> s
47-
| None ->
48-
let s = make_s ~capacity:(Array.length t.buffer) () in
46+
| None -> (
47+
let s = make_s ~capacity:(Array.length t.buffer) ~dummy in
4948
let some_s = Some s in
50-
if Atomic.compare_and_set t.rest None some_s
51-
then s
52-
else match Atomic.get t.rest with
53-
| None -> assert false
54-
| Some rest ->
55-
gift_rest rest some_s ;
56-
rest
57-
58-
let rec push_s t x =
49+
if Atomic.compare_and_set t.rest None some_s then s
50+
else
51+
match Atomic.get t.rest with
52+
| None -> assert false
53+
| Some rest ->
54+
gift_rest rest some_s;
55+
rest)
56+
57+
let mark t i =
58+
let status = t.status.(i / pack_size) in
59+
let shift = 2 * (i mod pack_size) in
60+
let status = Atomic.fetch_and_add status (1 lsl shift) in
61+
(status lsr shift) land 1 = 0
62+
63+
let rec push_s ~dummy t x =
5964
let i = Atomic.fetch_and_add t.tail 1 in
60-
if i < 0
61-
then (let _ = force_rest t in push_s t x)
62-
else if i >= Array.length t.buffer
63-
then false
64-
else begin
65-
let cell = Array.unsafe_get t.buffer i in
66-
match Atomic.get cell with
67-
| Empty ->
68-
if Atomic.compare_and_set cell Empty (Value x)
69-
then true
70-
else begin
71-
assert (Atomic.get cell = Tombstone) ;
72-
push_s t x
73-
end
74-
| Tombstone ->
75-
push_s t x
76-
| Value _ -> assert false
77-
end
78-
79-
let rec push t x =
80-
let last = Atomic.get t.last in
81-
if push_s last x
82-
then ()
83-
else begin
84-
let rest = force_rest last in
85-
let _ : bool = Atomic.compare_and_set t.last last rest in
65+
if i < 0 then
66+
let _ = force_rest ~dummy t in
67+
push_s ~dummy t x
68+
else if i >= Array.length t.buffer then false
69+
else (
70+
t.buffer.(i) <- x;
71+
if mark t i then true
72+
else (
73+
t.buffer.(i) <- dummy;
74+
let hd = Atomic.get t.head in
75+
let (_ : bool) = Atomic.compare_and_set t.tail (i + 1) (hd + 1) in
76+
push_s ~dummy t x))
77+
78+
let rec push ({ last; dummy; _ } as t) x =
79+
let last_s = Atomic.get last in
80+
if not (push_s ~dummy last_s x) then
81+
let rest = force_rest ~dummy last_s in
82+
let (_ : bool) = Atomic.compare_and_set last last_s rest in
8683
push t x
87-
end
88-
8984

90-
type 'a pop_result =
91-
| Is_empty
92-
| Wait_for_it
93-
| Pop of 'a
85+
type 'a pop_result = Is_empty | Wait_for_it | Pop of 'a
9486

95-
let rec pop_s t =
87+
let rec pop_s ~dummy t =
9688
let current_head = Atomic.get t.head in
97-
if current_head >= Array.length t.buffer
98-
then Is_empty
99-
else if current_head >= Atomic.get t.tail
100-
then Wait_for_it
89+
if current_head >= Array.length t.buffer then Is_empty
90+
else if current_head >= Atomic.get t.tail then Wait_for_it
10191
else
10292
let i = Atomic.fetch_and_add t.head 1 in
103-
if i >= Array.length t.buffer
104-
then Is_empty
93+
if i >= Array.length t.buffer then Is_empty
94+
else if mark t i then pop_s ~dummy t
10595
else
106-
let cell = Array.unsafe_get t.buffer i in
107-
if i >= Atomic.get t.tail
108-
then tombstone t cell
109-
else spinlock ~retry:spinlock_iterations t cell
110-
111-
and tombstone t cell =
112-
if Atomic.compare_and_set cell Empty Tombstone
113-
then pop_s t
114-
else begin match Atomic.get cell with
115-
| (Value v) as old ->
116-
assert (Atomic.compare_and_set cell old Tombstone) ;
117-
Pop v
118-
| _ -> assert false
119-
end
120-
121-
and spinlock ~retry t cell =
122-
match Atomic.get cell with
123-
| (Value v) as old ->
124-
assert (Atomic.compare_and_set cell old Tombstone) ;
96+
let v = t.buffer.(i) in
97+
t.buffer.(i) <- dummy;
12598
Pop v
126-
| Empty when retry <= 0 ->
127-
tombstone t cell
128-
| Empty ->
129-
Domain.cpu_relax () ;
130-
spinlock ~retry:(retry - 1) t cell
131-
| Tombstone ->
132-
assert false
13399

134100
let rec pop t =
135101
let first = Atomic.get t.first in
136-
match pop_s first with
102+
match pop_s ~dummy:t.dummy first with
137103
| Pop v -> Some v
138104
| Wait_for_it -> None
139-
| Is_empty ->
140-
begin match Atomic.get first.rest with
105+
| Is_empty -> (
106+
match Atomic.get first.rest with
141107
| None -> None
142108
| Some rest ->
143-
let _ : bool = Atomic.compare_and_set t.first first rest in
144-
pop t
145-
end
109+
let (_ : bool) = Atomic.compare_and_set t.first first rest in
110+
pop t)

src/mpmc_queue.mli

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
type 'a t
44
(** A queue of items of type ['a]. *)
55

6-
val make : ?capacity:int -> unit -> 'a t
7-
(** [make ()] creates a new empty queue.
6+
val make : ?capacity:int -> dummy:'a -> unit -> 'a t
7+
(** [make ~dummy ()] creates a new empty queue.
88
9-
The optional parameter [?capacity] defaults to 4096 and is used to size the
10-
internal buffers of the queue: Choosing a small number lower the pause
11-
durations caused by allocations, but a larger capacity provides overall
12-
faster operations. *)
9+
- The [dummy] element is a placeholder of type ['a] used to fill buffer slots that hold no value.
10+
- The optional parameter [?capacity] defaults to 512 and is used to size the
11+
internal buffers of the queue: choosing a small number lowers the pause
12+
durations caused by allocations, but a larger capacity can provide overall
13+
faster operations.
14+
*)
1315

1416
val push : 'a t -> 'a -> unit
1517
(** [push t x] adds [x] to the tail of the queue. *)

0 commit comments

Comments
 (0)