Skip to content

Commit 67372e0

Browse files
committed
More fine grained mutex_condvar for channels
1 parent 31ba3b4 commit 67372e0

File tree

1 file changed

+70
-51
lines changed

1 file changed

+70
-51
lines changed

lib/chan.ml

Lines changed: 70 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,46 @@
1+
type mutex_condvar = {
2+
mutex: Domain.Mutex.t;
3+
condition: Domain.Condition.t
4+
}
5+
6+
type waiting_notified =
7+
| Waiting
8+
| Notified
9+
110
type 'a contents =
2-
| Empty of {receivers: ('a option ref * Domain.Condition.t) Fun_queue.t}
3-
| NotEmpty of {senders: ('a * Domain.Condition.t) Fun_queue.t; messages: 'a Fun_queue.t}
11+
| Empty of {receivers: ('a option ref * mutex_condvar) Fun_queue.t}
12+
| NotEmpty of {senders: ('a * waiting_notified ref * mutex_condvar) Fun_queue.t; messages: 'a Fun_queue.t}
413

514
type 'a t = {
6-
mutex: Domain.Mutex.t;
715
buffer_size: int option;
816
contents: 'a contents Atomic.t
917
}
1018

11-
let condition_key : Domain.Condition.t Domain.DLS.key = Domain.DLS.new_key ()
19+
let mutex_condvar_key : mutex_condvar Domain.DLS.key = Domain.DLS.new_key ()
1220

13-
let get_condvar m =
14-
match Domain.DLS.get condition_key with
21+
let get_mutex_condvar () =
22+
match Domain.DLS.get mutex_condvar_key with
1523
| None ->
24+
let m = Domain.Mutex.create () in
1625
let c = Domain.Condition.create m in
17-
Domain.DLS.set condition_key c;
18-
c
19-
| Some c -> c
26+
let mc = {mutex=m; condition=c} in
27+
Domain.DLS.set mutex_condvar_key mc;
28+
mc
29+
| Some mc -> mc
2030

2131
let make_bounded n =
2232
if n < 0 then raise (Invalid_argument "Chan.make_bounded") ;
2333
{buffer_size= Some n;
24-
contents = Atomic.make (Empty {receivers= Fun_queue.empty; });
25-
mutex = Domain.Mutex.create ();}
34+
contents = Atomic.make (Empty {receivers= Fun_queue.empty; })}
2635

2736
let make_unbounded () =
2837
{buffer_size= None;
29-
contents = Atomic.make (Empty {receivers= Fun_queue.empty});
30-
mutex = Domain.Mutex.create ();}
38+
contents = Atomic.make (Empty {receivers= Fun_queue.empty})}
3139

3240
(* [send'] is shared by both the blocking and polling versions. Returns a
3341
* boolean indicating whether the send was successful. Hence, it always returns
3442
* [true] if [polling] is [false]. *)
35-
let send' {mutex; buffer_size; contents} v ~polling =
43+
let send' {buffer_size; contents} v ~polling =
3644
let open Fun_queue in
3745
let rec loop () =
3846
let old_contents = Atomic.get contents in
@@ -48,15 +56,21 @@ let send' {mutex; buffer_size; contents} v ~polling =
4856
begin if not polling then begin
4957
(* The channel is empty (no senders), no waiting receivers,
5058
* buffer size is 0 and we're not polling *)
51-
let condition = get_condvar mutex in
59+
let mc = get_mutex_condvar () in
60+
let cond_slot = ref Waiting in
5261
let new_contents =
5362
NotEmpty
54-
{messages= empty; senders= push empty (v, condition)}
63+
{messages= empty; senders= push empty (v, cond_slot, mc)}
5564
in
56-
Domain.Mutex.lock mutex;
5765
if Atomic.compare_and_set contents old_contents new_contents
58-
then (Domain.Condition.wait condition; true)
59-
else (Domain.Mutex.unlock mutex; loop ())
66+
then begin
67+
Domain.Mutex.lock mc.mutex;
68+
while !cond_slot = Waiting do
69+
Domain.Condition.wait mc.condition
70+
done;
71+
Domain.Mutex.unlock mc.mutex;
72+
true
73+
end else loop ()
6074
end else
6175
(* The channel is empty (no senders), no waiting receivers,
6276
* buffer size is 0 and we're polling *)
@@ -71,18 +85,18 @@ let send' {mutex; buffer_size; contents} v ~polling =
7185
if Atomic.compare_and_set contents old_contents new_contents
7286
then true
7387
else loop ()
74-
| Some ((r, condition), receivers') ->
88+
| Some ((r, mc), receivers') ->
7589
(* The channel is empty (no senders) and there are waiting receivers
7690
* *)
7791
let new_contents = Empty {receivers= receivers'} in
78-
Domain.Mutex.lock mutex;
7992
if Atomic.compare_and_set contents old_contents new_contents
8093
then begin
94+
Domain.Mutex.lock mc.mutex;
8195
r := Some v;
82-
Domain.Condition.signal condition;
83-
Domain.Mutex.unlock mutex;
96+
Domain.Condition.signal mc.condition;
97+
Domain.Mutex.unlock mc.mutex;
8498
true
85-
end else (Domain.Mutex.unlock mutex; loop ())
99+
end else loop ()
86100
end
87101
| NotEmpty {senders; messages} ->
88102
(* The channel is not empty *)
@@ -91,19 +105,19 @@ let send' {mutex; buffer_size; contents} v ~polling =
91105
begin if not polling then
92106
(* The channel is not empty, the buffer is full and we're not
93107
* polling *)
94-
let condition = get_condvar mutex in
108+
let cond_slot = ref Waiting in
109+
let mc = get_mutex_condvar () in
95110
let new_contents =
96-
NotEmpty {senders= push senders (v, condition); messages}
111+
NotEmpty {senders= push senders (v, cond_slot, mc); messages}
97112
in
98-
Domain.Mutex.lock mutex;
99113
if Atomic.compare_and_set contents old_contents new_contents then begin
100-
Domain.Condition.wait condition;
101-
Domain.Mutex.unlock mutex;
114+
Domain.Mutex.lock mc.mutex;
115+
while !cond_slot = Waiting do
116+
Domain.Condition.wait mc.condition;
117+
done;
118+
Domain.Mutex.unlock mc.mutex;
102119
true
103-
end else begin
104-
Domain.Mutex.unlock mutex;
105-
loop ()
106-
end
120+
end else loop ()
107121
else
108122
(* The channel is not empty, the buffer is full and we're
109123
* polling *)
@@ -129,7 +143,7 @@ let send_poll c v = send' c v ~polling:true
129143
(* [recv'] is shared by both the blocking and polling versions. Returns a an
130144
* optional value indicating whether the receive was successful. Hence, it
131145
* always returns [Some v] if [polling] is [false]. *)
132-
let recv' {mutex; buffer_size; contents} ~polling =
146+
let recv' {buffer_size; contents} ~polling =
133147
let open Fun_queue in
134148
let rec loop () =
135149
let old_contents = Atomic.get contents in
@@ -139,16 +153,19 @@ let recv' {mutex; buffer_size; contents} ~polling =
139153
if not polling then begin
140154
(* The channel is empty (no senders), and we're not polling *)
141155
let msg_slot = ref None in
142-
let condition = get_condvar mutex in
156+
let mc = get_mutex_condvar () in
143157
let new_contents =
144-
Empty {receivers= push receivers (msg_slot, condition)}
158+
Empty {receivers= push receivers (msg_slot, mc)}
145159
in
146-
Domain.Mutex.lock mutex;
147160
if Atomic.compare_and_set contents old_contents new_contents then
148-
(Domain.Condition.wait condition;
149-
Domain.Mutex.unlock mutex;
150-
!msg_slot)
151-
else (Domain.Mutex.unlock mutex; loop ())
161+
begin
162+
Domain.Mutex.lock mc.mutex;
163+
while !msg_slot = None do
164+
Domain.Condition.wait mc.condition;
165+
done;
166+
Domain.Mutex.unlock mc.mutex;
167+
!msg_slot
168+
end else loop ()
152169
end else
153170
(* The channel is empty (no senders), and we're polling *)
154171
None
@@ -170,7 +187,7 @@ let recv' {mutex; buffer_size; contents} ~polling =
170187
if Atomic.compare_and_set contents old_contents new_contents
171188
then Some m
172189
else loop ()
173-
| None, Some ((m, condition), senders') ->
190+
| None, Some ((m, c, mc), senders') ->
174191
(* The channel is not empty, there are no messages, and there
175192
* is a waiting sender. This is only possible is the buffer
176193
* size is 0. *)
@@ -181,26 +198,28 @@ let recv' {mutex; buffer_size; contents} ~polling =
181198
else
182199
NotEmpty {messages; senders= senders'}
183200
in
184-
Domain.Mutex.lock mutex;
185201
if Atomic.compare_and_set contents old_contents new_contents
186202
then begin
187-
Domain.Condition.signal condition;
188-
Domain.Mutex.unlock mutex;
203+
Domain.Mutex.lock mc.mutex;
204+
c := Notified;
205+
Domain.Condition.signal mc.condition;
206+
Domain.Mutex.unlock mc.mutex;
189207
Some m
190-
end else (Domain.Mutex.unlock mutex; loop ())
191-
| Some (m, messages'), Some ((ms, condition), senders') ->
208+
end else loop ()
209+
| Some (m, messages'), Some ((ms, sc, mc), senders') ->
192210
(* The channel is not empty, there is a message, and there is a
193211
* waiting sender. *)
194212
let new_contents =
195213
NotEmpty {messages= push messages' ms; senders= senders'}
196214
in
197-
Domain.Mutex.lock mutex;
198215
if Atomic.compare_and_set contents old_contents new_contents
199216
then begin
200-
Domain.Condition.signal condition;
201-
Domain.Mutex.unlock mutex;
217+
Domain.Mutex.lock mc.mutex;
218+
sc := Notified;
219+
Domain.Condition.signal mc.condition;
220+
Domain.Mutex.unlock mc.mutex;
202221
Some m
203-
end else (Domain.Mutex.unlock mutex; loop ())
222+
end else loop ()
204223
in
205224
loop ()
206225

0 commit comments

Comments
 (0)