-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This uses the new Cells module to replace the use of a mutex. Co-authored-by: Vesa Karvonen <vesa.a.j.k@gmail.com>
- Loading branch information
Showing
7 changed files
with
565 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
(* A lock-free semaphore, using Cells. | ||
We have a number of resources (1 in the case of a mutex). Each time a user | ||
wants a resource, the user decrements the counter. When finished with the | ||
resource, the user increments the counter again. | ||
If there are more users than resources then the counter will be negative. If | ||
a user decrements the counter to a non-negative value then it gets ownership | ||
of one of the free resources. If it decrements the counter to a negative | ||
value then it must wait (by allocating a cell). When a user with a resource | ||
increments the counter *from* a negative value, that user is responsible for | ||
resuming one waiting cell (transferring ownership of the resource). This | ||
ensures that every waiter will get woken exactly once. | ||
Cancellation | ||
We could consider cancelling a request to be simply replacing the callback | ||
with a dummy one that immediately releases the resource. However, if callers | ||
keep cancelling then the list of cancelled requests would keep growing. | ||
Instead, we'd like cancellation simply to undo the effects of suspending, by | ||
incrementing the counter and marking the cell as Finished (so that the | ||
resumer will ignore it and move on to the next waiter, and the Finished | ||
cell can be freed). | ||
If the cancelling user increments from a negative value then it is responsible | ||
for waking one user, which is fine as it is waking itself. However, it may find | ||
itself incrementing from a non-negative one if it is racing with a resumer | ||
(if the count is non-negative then once all current operations finish there | ||
would be no suspended users, so the process of waking this user must have | ||
already begun). | ||
To handle this, a cancelling user first transitions the cell to In_transition, | ||
then increments the counter, then transitions to the final Finished state, | ||
in the usual case where it incremented from a negative value. | ||
If a resumer runs at the same time then it may also increment the counter | ||
from a non-negative value and try to wake this cell. It transitions the cell | ||
from In_transition to Finished. The cancelling user will notice this when it | ||
fails to CAS to Finished and can handle it. | ||
If the cancelling user sees the Finished state after In_transition then it | ||
knows that the resuming user has transferred to it the responsibility of | ||
waking one user. If the cancelling user is also responsible for waking one | ||
user then it performs an extra resume on behalf of the resuming user. | ||
Finally, if the cancelling user is not responsible for waking anyone (even | ||
itself) then it leaves the cell in In_transition (the CQS paper uses a | ||
separate Refused state, but we don't actually need that). This can only | ||
happen when a resume is happening at the same time. The resumer will | ||
transition to Finished, creating an obligation to resume, but we've just | ||
done that anyway. We know this In_transition state can't last long because | ||
at the moment when the canceller incremented the counter all current | ||
waiters, including itself, were in the process of being resumed. *) | ||
|
||
module Cell = struct | ||
type _ t = | ||
| In_transition (* The suspender will try to CAS this soon. *) | ||
| Request of (unit -> unit) (* Waiting for a resource. *) | ||
| Finished (* Ownership of the resource has been transferred, | ||
or the suspender cancelled. *) | ||
|
||
let init = In_transition | ||
(* We only resume when we know another thread is suspended or in the process of suspending. *) | ||
|
||
let segment_order = 2 | ||
|
||
let dump f = function | ||
| Request _ -> Fmt.string f "Request" | ||
| Finished -> Fmt.string f "Finished" | ||
| In_transition -> Fmt.string f "In_transition" | ||
end | ||
|
||
module Cells = Cells.Make(Cell) | ||
|
||
type cell = unit Cell.t | ||
|
||
type t = { | ||
state : int Atomic.t; (* Free resources. Negative if there are waiters waiting. *) | ||
cells : unit Cells.t; | ||
} | ||
|
||
type request = t * unit Cells.segment * unit Cell.t Atomic.t | ||
|
||
(* Wake one waiter (and give it the resource being released). *) | ||
let rec resume t = | ||
let cell = Cells.next_resume t.cells in | ||
match (Atomic.exchange cell Finished : cell) with | ||
| Request r -> | ||
(* The common case: there was a waiter for the value. | ||
We pass ownership of the resource to it. *) | ||
r () | ||
| Finished -> | ||
(* The waiter has finished cancelling. Ignore it and resume the next one. *) | ||
resume t | ||
| In_transition -> | ||
(* The consumer is in the middle of doing something and will soon try to | ||
CAS to a new state. It will see that we got there first and handle the | ||
resume when it's done. *) | ||
() | ||
|
||
(* [true] on success, or [false] if we need to suspend. | ||
You MUST call [suspend] iff this returns [false]. | ||
The reason for splitting this is because e.g. [Semaphore] needs to get | ||
the continuation for the fiber between [acquire] and [suspend]. *) | ||
let acquire t = | ||
let s = Atomic.fetch_and_add t.state (-1) in | ||
(* We got a resource if we decremented *to* a non-negative number, | ||
which happens if we decremented *from* a positive one. *) | ||
s > 0 | ||
|
||
let suspend t k : request option = | ||
let (segment, cell) = Cells.next_suspend t.cells in | ||
if Atomic.compare_and_set cell In_transition (Request k) then Some (t, segment, cell) | ||
else match Atomic.get cell with | ||
| Finished -> | ||
(* We got resumed before we could add the waiter. *) | ||
k (); | ||
None | ||
| Request _ | In_transition -> | ||
(* These are unreachable from the previously-observed non-In_transition state | ||
without us taking some action first *) | ||
assert false | ||
|
||
let release t = | ||
let s = Atomic.fetch_and_add t.state (+1) in | ||
if s < 0 then ( | ||
(* We incremented from a negative value. | ||
We are therefore responsible for waking one waiter. *) | ||
resume t | ||
) | ||
|
||
let cancel (t, segment, cell) = | ||
match (Atomic.get cell : cell) with | ||
| Request _ as old -> | ||
if Atomic.compare_and_set cell old In_transition then ( | ||
(* Undo the effect of [acquire] by incrementing the counter. | ||
As always, if we increment from a negative value then we need to resume one waiter. *) | ||
let need_resume = Atomic.fetch_and_add t.state (+1) < 0 in | ||
if need_resume then ( | ||
if Atomic.compare_and_set cell In_transition Finished then ( | ||
(* The normal case. We resumed ourself by cancelling. | ||
This is the only case we need to tell the segment because in all | ||
other cases the resumer has already reached this segment so | ||
freeing it is pointless. *) | ||
Cells.cancel_cell segment | ||
) else ( | ||
(* [release] got called at the same time and it also needed to resume one waiter. | ||
So we call [resume] to handle the extra one, in addition to resuming ourself. *) | ||
resume t | ||
) | ||
) else ( | ||
(* This can only happen if [release] ran at the same time and incremented the counter | ||
before we did. Since we were suspended, and later we saw the counter | ||
show that no one was, it must have decided to wake us. Either it has placed Finished | ||
in the cell, or it's about to do so. Either way, we discharge the obligation to | ||
wake someone by resuming ourself with a cancellation. | ||
The resource returns to the free pool. *) | ||
); | ||
true | ||
) else false (* We got resumed first *) | ||
| Finished -> false (* We got resumed first *) | ||
| In_transition -> invalid_arg "Already cancelling!" | ||
|
||
let dump f t = | ||
Fmt.pf f "Semaphore (state=%d)@,%a" | ||
(Atomic.get t.state) | ||
Cells.dump t.cells | ||
|
||
let create n = | ||
if n < 0 then raise (Invalid_argument "n < 0"); | ||
{ | ||
cells = Cells.make (); | ||
state = Atomic.make n; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,55 +1,42 @@ | ||
type state = | ||
| Free of int | ||
| Waiting of unit Waiters.t | ||
|
||
type t = { | ||
id : Ctf.id; | ||
mutex : Mutex.t; | ||
mutable state : state; | ||
state : Sem_state.t; | ||
} | ||
|
||
let make n = | ||
if n < 0 then raise (Invalid_argument "n < 0"); | ||
let id = Ctf.mint_id () in | ||
Ctf.note_created id Ctf.Semaphore; | ||
{ | ||
id; | ||
mutex = Mutex.create (); | ||
state = Free n; | ||
state = Sem_state.create n; | ||
} | ||
|
||
let release t = | ||
Mutex.lock t.mutex; | ||
Ctf.note_signal t.id; | ||
match t.state with | ||
| Free x when x = max_int -> Mutex.unlock t.mutex; raise (Sys_error "semaphore would overflow max_int!") | ||
| Free x -> t.state <- Free (succ x); Mutex.unlock t.mutex | ||
| Waiting q -> | ||
begin match Waiters.wake_one q () with | ||
| `Ok -> () | ||
| `Queue_empty -> t.state <- Free 1 | ||
end; | ||
Mutex.unlock t.mutex | ||
Sem_state.release t.state | ||
|
||
let rec acquire t = | ||
Mutex.lock t.mutex; | ||
match t.state with | ||
| Waiting q -> | ||
Ctf.note_try_read t.id; | ||
Waiters.await ~mutex:(Some t.mutex) q t.id | ||
| Free 0 -> | ||
t.state <- Waiting (Waiters.create ()); | ||
Mutex.unlock t.mutex; | ||
acquire t | ||
| Free n -> | ||
Ctf.note_read t.id; | ||
t.state <- Free (pred n); | ||
Mutex.unlock t.mutex | ||
let acquire t = | ||
if not (Sem_state.acquire t.state) then ( | ||
(* No free resources. | ||
We must wait until one of the existing users increments the counter and resumes us. | ||
It's OK if they resume before we suspend; we'll just pick up the token they left. *) | ||
Suspend.enter_unchecked (fun ctx enqueue -> | ||
match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with | ||
| None -> () (* Already resumed *) | ||
| Some request -> | ||
Ctf.note_try_read t.id; | ||
match Fiber_context.get_error ctx with | ||
| Some ex -> | ||
if Sem_state.cancel request then enqueue (Error ex); | ||
(* else already resumed *) | ||
| None -> | ||
Fiber_context.set_cancel_fn ctx (fun ex -> | ||
if Sem_state.cancel request then enqueue (Error ex) | ||
(* else already resumed *) | ||
) | ||
) | ||
); | ||
Ctf.note_read t.id | ||
|
||
let get_value t = | ||
Mutex.lock t.mutex; | ||
let s = t.state in | ||
Mutex.unlock t.mutex; | ||
match s with | ||
| Free n -> n | ||
| Waiting _ -> 0 | ||
max 0 (Atomic.get t.state.state) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,17 @@ | ||
; We copy cells.ml here so we can build it using TracedAtomic instead of the default one. | ||
(copy_files# (files ../../core/cells.ml)) | ||
(copy_files# (files ../../sem_state.ml)) | ||
|
||
(executable | ||
(name test_cells) | ||
(executables | ||
(names test_cells test_semaphore) | ||
(libraries dscheck optint fmt)) | ||
|
||
(rule | ||
(alias dscheck) | ||
(package eio) | ||
(action (run %{exe:test_cells.exe}))) | ||
|
||
(rule | ||
(alias dscheck) | ||
(package eio) | ||
(action (run %{exe:test_semaphore.exe}))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
let debug = false | ||
|
||
module T = Sem_state | ||
|
||
let test ~capacity ~users () = | ||
let messages = ref [] in | ||
let log fmt = (fmt ^^ "@.") |> Format.kasprintf @@ fun msg -> messages := msg :: !messages in | ||
if debug then log "== start =="; | ||
let t = T.create capacity in | ||
let running = Atomic.make 0 in | ||
let acquire fn = | ||
if T.acquire t then (fn (); None) | ||
else T.suspend t fn | ||
in | ||
for i = 1 to users do | ||
Atomic.spawn (fun () -> | ||
match | ||
acquire (fun () -> | ||
if debug then log "%d: got resource" i; | ||
Atomic.incr running; | ||
Atomic.decr running; | ||
if debug then log "%d: released resource" i; | ||
T.release t | ||
) | ||
with | ||
| None -> () | ||
| Some request -> | ||
if T.cancel request then ( | ||
if debug then log "%d: cancelled request" i; | ||
) | ||
) | ||
done; | ||
Atomic.every (fun () -> assert (Atomic.get running <= capacity)); | ||
Atomic.final (fun () -> | ||
if debug then ( | ||
List.iter print_string (List.rev !messages); | ||
Fmt.pr "%a@." T.dump t; | ||
); | ||
assert (Atomic.get t.state = capacity); | ||
(* Do a dummy non-cancelled operation to ensure the pointers end up together: *) | ||
T.resume t; | ||
assert (T.suspend t ignore = None); | ||
assert (T.Cells.Position.index t.cells.suspend = | ||
T.Cells.Position.index t.cells.resume); | ||
) | ||
|
||
let () = | ||
Atomic.trace (test ~capacity:1 ~users:3); | ||
Atomic.trace (test ~capacity:2 ~users:3) |
Oops, something went wrong.