Skip to content

Commit

Permalink
Make Eio.Semaphore lock-free
Browse files Browse the repository at this point in the history
This uses the new Cells module to replace the use of a mutex.
  • Loading branch information
talex5 committed Jan 3, 2023
1 parent 79df549 commit ff5305f
Show file tree
Hide file tree
Showing 7 changed files with 579 additions and 41 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ test_luv:
EIO_BACKEND=luv dune runtest

dscheck:
dune exec -- ./lib_eio/tests/dscheck/test_semaphore.exe
dune exec -- ./lib_eio/tests/dscheck/test_cells.exe

docker:
Expand Down
189 changes: 189 additions & 0 deletions lib_eio/sem_state.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
(* A lock-free semaphore, using Cells.
We have a number of resources (1 in the case of a mutex). Each time a user
wants a resource, the user decrements the counter. When finished with the
resource, the user increments the counter again.
If there are more users than resources then the counter will be negative. If
a user decrements the counter to a non-negative value then it gets ownership
of one of the free resources. If it decrements the counter to a negative
value then it must wait (by allocating a cell). When a user with a resource
increments the counter *from* a negative value, that user is responsible for
resuming one waiting cell (which takes ownership of the resource). This
ensures that every waiter will get woken exactly once.
Cancellation
We could consider cancelling a request to be simply replacing the callback
with a dummy one that immediately releases the resource. However, if callers
keep cancelling then the list of cancelled requests would keep growing.
Instead, we'd like cancellation simply to undo the effects of suspending, by
incrementing the counter and marking the cell as Cancelled (so that the
resumer will ignore it and move on to the next waiter, and the Cancelled
cell can be freed).
If the cancelling user increments from a negative value then it is responsible
for waking one user, which is fine as it is waking itself. However, it may find
itself incrementing from a non-negative one if it is racing with a resumer
(if the count is non-negative then once all current operations finish there
would be no suspended users, so the process of waking this user must have
already begun).
To handle this, a cancelling user first transitions the cell to a Cancelling
state, then increments the counter, then transitions to the final Cancelled
state, in the usual case where it incremented from a negative value.
If a resumer runs at the same time then it may also increment the counter
from a non-negative value and try to wake the Cancelling user. In this
case, it instead transitions the cell from Cancelling to Resumed. The
cancelling user will notice this when it fails to CAS to Cancelled and can
handle it.
If the cancelling user sees the Resumed state after Cancelling then it knows
that the resuming user has transferred to it the responsibility of waking
one user. If the cancelling user is also responsible for waking one user
then it performs an extra resume on behalf of the resuming user.
Finally, if the cancelling user is not responsible for waking anyone (even
itself) then it leaves the cell in Cancelling (the CQS paper uses a separate
Refused state, but we don't actually need that). This can only happen when a
resume is happening at the same time. The resumer will transition to
Resumed, creating an obligation to resume, but we've just done that anyway.
We know this Cancelling state can't last long because at the moment when the
canceller incremented the counter all current waiters, including itself,
were in the process of being resumed. *)

module Cell = struct
type _ t =
| Request of (unit -> unit) (* Waiting for a resource *)
| Cancelling (* Cancellation is in progress (or refused) *)
| Cancelled (* Ignore this and use next cell instead *)
| Resumed (* Suspender now owns resource *)
| Empty

let init = Empty

let segment_order = 2

let dump f = function
| Request _ -> Fmt.string f "Request"
| Empty -> Fmt.string f "Empty"
| Resumed -> Fmt.string f "Resumed"
| Cancelling -> Fmt.string f "Cancelling"
| Cancelled -> Fmt.string f "Cancelled"
end

module Cells = Cells.Make(Cell)

type cell = unit Cell.t

type t = {
state : int Atomic.t; (* Free resources. Negative if there are waiters waiting. *)
cells : unit Cells.t;
}

type request = t * unit Cells.segment * unit Cell.t Atomic.t

(* Wake one waiter. *)
let rec resume t =
let cell = Cells.next_resume t.cells in
let rec aux () =
match (Atomic.get cell : cell) with
| Request r as cur ->
(* The common case: we have a waiter for the value *)
if Atomic.compare_and_set cell cur Resumed then r ()
else aux ()
| Empty ->
(* The consumer has reserved this cell but not yet stored the request.
We place Resumed there and the consumer will handle it soon. *)
if not (Atomic.compare_and_set cell Empty Resumed) then aux ()
| Cancelled ->
(* This waker has finished cancelling. Ignore it and resume the next one. *)
resume t
| Cancelling ->
(* The waker has started cancelling. Let it know we want to resume it
and then let it handle it (it may also have finished cancelling but
decided there's nothing left to do anyway). *)
if not (Atomic.compare_and_set cell Cancelling Resumed) then aux ()
| Resumed ->
(* This state is unreachable because we (the provider) haven't set this yet *)
assert false
in
aux ()

(* [true] on success, or [false] if we need to suspend.
You MUST call [suspend] iff this returns [false].
The reason for splitting this is because e.g. [Semaphore] needs to get
the continuation for the fiber between [acquire] and [suspend]. *)
let acquire t =
let s = Atomic.fetch_and_add t.state (-1) in
(* We got a resource if we decremented *to* a non-negative number,
which happens if we decremented *from* a positive one. *)
s > 0

let suspend t k : request option =
let (segment, cell) = Cells.next_suspend t.cells in
if Atomic.compare_and_set cell Empty (Request k) then Some (t, segment, cell)
else match Atomic.get cell with
| Resumed ->
(* We got resumed before we could add the waiter. *)
k ();
None
| Cancelling | Cancelled | Request _ | Empty ->
(* These are unreachable from the previously-observed non-Empty state
without us taking some action first *)
assert false

let release t =
let s = Atomic.fetch_and_add t.state (+1) in
if s < 0 then (
(* We incremented from a negative value.
We are therefore responsible for waking one waiter. *)
resume t
)

let cancel (t, segment, cell) =
match (Atomic.get cell : cell) with
| Request _ as old ->
if Atomic.compare_and_set cell old Cancelling then (
(* Undo the effect of [acquire] by incrementing the counter.
As always, if we increment from a negative value then we need to resume one waiter. *)
let need_resume = Atomic.fetch_and_add t.state (+1) < 0 in
if need_resume then (
if Atomic.compare_and_set cell Cancelling Cancelled then (
(* The normal case. We resumed ourself by cancelling. *)
Cells.cancel_cell segment
) else (
(* [release] got called at the same time and it also needed to resume one waiter.
So we call [resume] to handle the extra one, in addition to resuming ourself. *)
resume t
)
) else (
(* This can only happen if [release] ran at the same time and incremented the counter
before we did. Since we were suspended, and later we saw the counter
show that no one was, it must have decided to wake us. Either it has placed Resumed
in the cell, or it's about to do so. Either way, we discharge the obligation to
wake someone by resuming ourself with a cancellation. *)
);
true
) else false (* We got resumed first *)
| Resumed -> false (* We got resumed first *)
| Cancelling | Cancelled -> invalid_arg "Already cancelled!"
| Empty ->
(* To call [cancel] the user needs a [request] value,
which they only get once we've reached the [Request] state.
[Empty] is unreachable from [Request]. *)
assert false

let dump f t =
Fmt.pf f "Semaphore (state=%d)@,%a"
(Atomic.get t.state)
Cells.dump t.cells

let create n =
if n < 0 then raise (Invalid_argument "n < 0");
{
cells = Cells.make ();
state = Atomic.make n;
}
65 changes: 26 additions & 39 deletions lib_eio/semaphore.ml
Original file line number Diff line number Diff line change
@@ -1,55 +1,42 @@
type state =
| Free of int
| Waiting of unit Waiters.t

type t = {
id : Ctf.id;
mutex : Mutex.t;
mutable state : state;
state : Sem_state.t;
}

let make n =
if n < 0 then raise (Invalid_argument "n < 0");
let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Semaphore;
{
id;
mutex = Mutex.create ();
state = Free n;
state = Sem_state.create n;
}

let release t =
Mutex.lock t.mutex;
Ctf.note_signal t.id;
match t.state with
| Free x when x = max_int -> Mutex.unlock t.mutex; raise (Sys_error "semaphore would overflow max_int!")
| Free x -> t.state <- Free (succ x); Mutex.unlock t.mutex
| Waiting q ->
begin match Waiters.wake_one q () with
| `Ok -> ()
| `Queue_empty -> t.state <- Free 1
end;
Mutex.unlock t.mutex
Sem_state.release t.state

let rec acquire t =
Mutex.lock t.mutex;
match t.state with
| Waiting q ->
Ctf.note_try_read t.id;
Waiters.await ~mutex:(Some t.mutex) q t.id
| Free 0 ->
t.state <- Waiting (Waiters.create ());
Mutex.unlock t.mutex;
acquire t
| Free n ->
Ctf.note_read t.id;
t.state <- Free (pred n);
Mutex.unlock t.mutex
let acquire t =
if not (Sem_state.acquire t.state) then (
(* No free resources.
We must wait until one of the existing users increments the counter and resumes us.
It's OK if they resume before we suspend; we'll just pick up the token they left. *)
Suspend.enter_unchecked (fun ctx enqueue ->
match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with
| None -> () (* Already resumed *)
| Some request ->
Ctf.note_try_read t.id;
match Fiber_context.get_error ctx with
| Some ex ->
if Sem_state.cancel request then enqueue (Error ex);
(* else already resumed *)
| None ->
Fiber_context.set_cancel_fn ctx (fun ex ->
if Sem_state.cancel request then enqueue (Error ex)
(* else already resumed *)
)
)
);
Ctf.note_read t.id

let get_value t =
Mutex.lock t.mutex;
let s = t.state in
Mutex.unlock t.mutex;
match s with
| Free n -> n
| Waiting _ -> 0
max 0 (Atomic.get t.state.state)
10 changes: 8 additions & 2 deletions lib_eio/tests/dscheck/dune
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
; We copy cells.ml here so we can build it using TracedAtomic instead of the default one.
(copy_files# (files ../../core/cells.ml))
(copy_files# (files ../../sem_state.ml))

(executable
(name test_cells)
(executables
(names test_cells test_semaphore)
(libraries dscheck optint fmt))

(rule
(alias dscheck)
(package eio)
(action (run %{exe:test_cells.exe})))

(rule
(alias dscheck)
(package eio)
(action (run %{exe:test_semaphore.exe})))
49 changes: 49 additions & 0 deletions lib_eio/tests/dscheck/test_semaphore.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
let debug = false

module T = Sem_state

let test ~capacity ~users () =
let messages = ref [] in
let log fmt = (fmt ^^ "@.") |> Format.kasprintf @@ fun msg -> messages := msg :: !messages in
if debug then log "== start ==";
let t = T.create capacity in
let running = Atomic.make 0 in
let acquire fn =
if T.acquire t then (fn (); None)
else T.suspend t fn
in
for i = 1 to users do
Atomic.spawn (fun () ->
match
acquire (fun () ->
if debug then log "%d: got resource" i;
Atomic.incr running;
Atomic.decr running;
if debug then log "%d: released resource" i;
T.release t
)
with
| None -> ()
| Some request ->
if T.cancel request then (
if debug then log "%d: cancelled request" i;
)
)
done;
Atomic.every (fun () -> assert (Atomic.get running <= capacity));
Atomic.final (fun () ->
if debug then (
List.iter print_string (List.rev !messages);
Fmt.pr "%a@." T.dump t;
);
assert (Atomic.get t.state = capacity);
(* Do a dummy non-cancelled operation to ensure the pointers end up together: *)
T.resume t;
assert (T.suspend t ignore = None);
assert (T.Cells.Position.index t.cells.suspend =
T.Cells.Position.index t.cells.resume);
)

let () =
Atomic.trace (test ~capacity:1 ~users:3);
Atomic.trace (test ~capacity:2 ~users:3)
Loading

0 comments on commit ff5305f

Please sign in to comment.