Skip to content

Commit 9552488

Browse files
kayceesrkctk21
authored andcommitted
Implement channels using Mutex and Condition Variables
1 parent daaf239 commit 9552488

File tree

1 file changed

+71
-39
lines changed

1 file changed

+71
-39
lines changed

lib/chan.ml

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,38 @@
11
type 'a contents =
2-
| Empty of {receivers: ('a option ref * Domain.id) Fun_queue.t}
3-
| NotEmpty of {senders: ('a * Domain.id) Fun_queue.t; messages: 'a Fun_queue.t}
2+
| Empty of {receivers: ('a option ref * Domain.Condition.t) Fun_queue.t}
3+
| NotEmpty of {senders: ('a * Domain.Condition.t) Fun_queue.t; messages: 'a Fun_queue.t}
44

5-
type 'a t = {buffer_size: int option; contents: 'a contents Atomic.t}
5+
type 'a t = {
6+
mutex: Domain.Mutex.t;
7+
buffer_size: int option;
8+
contents: 'a contents Atomic.t
9+
}
10+
11+
let condition_key : Domain.Condition.t Domain.DLS.key = Domain.DLS.new_key ()
12+
13+
let get_condvar m =
14+
match Domain.DLS.get condition_key with
15+
| None ->
16+
let c = Domain.Condition.create m in
17+
Domain.DLS.set condition_key c;
18+
c
19+
| Some c -> c
620

721
let make_bounded n =
822
if n < 0 then raise (Invalid_argument "Chan.make_bounded") ;
9-
{buffer_size= Some n; contents = Atomic.make (Empty {receivers= Fun_queue.empty})}
23+
{buffer_size= Some n;
24+
contents = Atomic.make (Empty {receivers= Fun_queue.empty; });
25+
mutex = Domain.Mutex.create ();}
1026

1127
let make_unbounded () =
12-
{buffer_size= None; contents = Atomic.make (Empty {receivers= Fun_queue.empty})}
28+
{buffer_size= None;
29+
contents = Atomic.make (Empty {receivers= Fun_queue.empty});
30+
mutex = Domain.Mutex.create ();}
1331

1432
(* [send'] is shared by both the blocking and polling versions. Returns a
1533
* boolean indicating whether the send was successful. Hence, it always returns
1634
* [true] if [polling] is [false]. *)
17-
let send' {buffer_size; contents} v ~polling =
35+
let send' {mutex; buffer_size; contents} v ~polling =
1836
let open Fun_queue in
1937
let rec loop () =
2038
let old_contents = Atomic.get contents in
@@ -30,13 +48,15 @@ let send' {buffer_size; contents} v ~polling =
3048
begin if not polling then begin
3149
(* The channel is empty (no senders), no waiting receivers,
3250
* buffer size is 0 and we're not polling *)
51+
let condition = get_condvar mutex in
3352
let new_contents =
3453
NotEmpty
35-
{messages= empty; senders= push empty (v, Domain.self ())}
54+
{messages= empty; senders= push empty (v, condition)}
3655
in
56+
Domain.Mutex.lock mutex;
3757
if Atomic.compare_and_set contents old_contents new_contents
38-
then (Domain.Sync.wait (); true)
39-
else loop ()
58+
then (Domain.Condition.wait condition; true)
59+
else (Domain.Mutex.unlock mutex; loop ())
4060
end else
4161
(* The channel is empty (no senders), no waiting receivers,
4262
* buffer size is 0 and we're polling *)
@@ -51,25 +71,18 @@ let send' {buffer_size; contents} v ~polling =
5171
if Atomic.compare_and_set contents old_contents new_contents
5272
then true
5373
else loop ()
54-
| Some ((r, d), receivers') ->
74+
| Some ((r, condition), receivers') ->
5575
(* The channel is empty (no senders) and there are waiting receivers
5676
* *)
5777
let new_contents = Empty {receivers= receivers'} in
78+
Domain.Mutex.lock mutex;
5879
if Atomic.compare_and_set contents old_contents new_contents
59-
then (
80+
then begin
6081
r := Some v;
61-
(* Notifying another domain from within a critical section is unsafe
62-
* in general. Notify blocks until the target domain is out of the
63-
* critical section. If two domains are notifying each other from
64-
* within critical section, then the program deadlocks. However,
65-
* here (and other uses of notify in send' and recv' in the channel
66-
* implementation), there is no possibility of other domains
67-
* notifying this domain; only a blocked domain will be notified,
68-
* and this domain is currently running. Hence, it is ok to notify
69-
* from within the critical section. *)
70-
Domain.Sync.notify d;
71-
true )
72-
else loop ()
82+
Domain.Condition.signal condition;
83+
Domain.Mutex.unlock mutex;
84+
true
85+
end else (Domain.Mutex.unlock mutex; loop ())
7386
end
7487
| NotEmpty {senders; messages} ->
7588
(* The channel is not empty *)
@@ -78,12 +91,19 @@ let send' {buffer_size; contents} v ~polling =
7891
begin if not polling then
7992
(* The channel is not empty, the buffer is full and we're not
8093
* polling *)
94+
let condition = get_condvar mutex in
8195
let new_contents =
82-
NotEmpty {senders= push senders (v, Domain.self ()); messages}
96+
NotEmpty {senders= push senders (v, condition); messages}
8397
in
84-
if Atomic.compare_and_set contents old_contents new_contents then
85-
( Domain.Sync.wait () ; true )
86-
else loop ()
98+
Domain.Mutex.lock mutex;
99+
if Atomic.compare_and_set contents old_contents new_contents then begin
100+
Domain.Condition.wait condition;
101+
Domain.Mutex.unlock mutex;
102+
true
103+
end else begin
104+
Domain.Mutex.unlock mutex;
105+
loop ()
106+
end
87107
else
88108
(* The channel is not empty, the buffer is full and we're
89109
* polling *)
@@ -98,7 +118,7 @@ let send' {buffer_size; contents} v ~polling =
98118
then true
99119
else loop ()
100120
in
101-
Domain.Sync.critical_section loop
121+
loop ()
102122

103123
let send c v =
104124
let r = send' c v ~polling:false in
@@ -109,7 +129,7 @@ let send_poll c v = send' c v ~polling:true
109129
(* [recv'] is shared by both the blocking and polling versions. Returns a an
110130
* optional value indicating whether the receive was successful. Hence, it
111131
* always returns [Some v] if [polling] is [false]. *)
112-
let recv' {buffer_size; contents} ~polling =
132+
let recv' {mutex; buffer_size; contents} ~polling =
113133
let open Fun_queue in
114134
let rec loop () =
115135
let old_contents = Atomic.get contents in
@@ -119,12 +139,16 @@ let recv' {buffer_size; contents} ~polling =
119139
if not polling then begin
120140
(* The channel is empty (no senders), and we're not polling *)
121141
let msg_slot = ref None in
142+
let condition = get_condvar mutex in
122143
let new_contents =
123-
Empty {receivers= push receivers (msg_slot, Domain.self ())}
144+
Empty {receivers= push receivers (msg_slot, condition)}
124145
in
146+
Domain.Mutex.lock mutex;
125147
if Atomic.compare_and_set contents old_contents new_contents then
126-
(Domain.Sync.wait (); !msg_slot)
127-
else loop ()
148+
(Domain.Condition.wait condition;
149+
Domain.Mutex.unlock mutex;
150+
!msg_slot)
151+
else (Domain.Mutex.unlock mutex; loop ())
128152
end else
129153
(* The channel is empty (no senders), and we're polling *)
130154
None
@@ -146,7 +170,7 @@ let recv' {buffer_size; contents} ~polling =
146170
if Atomic.compare_and_set contents old_contents new_contents
147171
then Some m
148172
else loop ()
149-
| None, Some ((m, s), senders') ->
173+
| None, Some ((m, condition), senders') ->
150174
(* The channel is not empty, there are no messages, and there
151175
* is a waiting sender. This is only possible is the buffer
152176
* size is 0. *)
@@ -157,20 +181,28 @@ let recv' {buffer_size; contents} ~polling =
157181
else
158182
NotEmpty {messages; senders= senders'}
159183
in
184+
Domain.Mutex.lock mutex;
160185
if Atomic.compare_and_set contents old_contents new_contents
161-
then (Domain.Sync.notify s; Some m)
162-
else loop ()
163-
| Some (m, messages'), Some ((ms, s), senders') ->
186+
then begin
187+
Domain.Condition.signal condition;
188+
Domain.Mutex.unlock mutex;
189+
Some m
190+
end else (Domain.Mutex.unlock mutex; loop ())
191+
| Some (m, messages'), Some ((ms, condition), senders') ->
164192
(* The channel is not empty, there is a message, and there is a
165193
* waiting sender. *)
166194
let new_contents =
167195
NotEmpty {messages= push messages' ms; senders= senders'}
168196
in
197+
Domain.Mutex.lock mutex;
169198
if Atomic.compare_and_set contents old_contents new_contents
170-
then (Domain.Sync.notify s; Some m)
171-
else loop ()
199+
then begin
200+
Domain.Condition.signal condition;
201+
Domain.Mutex.unlock mutex;
202+
Some m
203+
end else (Domain.Mutex.unlock mutex; loop ())
172204
in
173-
Domain.Sync.critical_section loop
205+
loop ()
174206

175207
let recv c =
176208
match recv' c ~polling:false with

0 commit comments

Comments
 (0)