-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This makes Eio.Condition safe to use from a signal handler (or GC finalizer). The cells.ml abstraction should be useful for several other synchronisation primitives (see https://arxiv.org/pdf/2111.12682.pdf).
- Loading branch information
Showing
18 changed files
with
1,246 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,33 @@ | ||
type t = { | ||
waiters: unit Waiters.t; | ||
mutex: Mutex.t; | ||
id: Ctf.id | ||
} | ||
type t = Broadcast.t | ||
|
||
let create () = { | ||
waiters = Waiters.create (); | ||
id = Ctf.mint_id (); | ||
mutex = Mutex.create (); | ||
} | ||
let create () = Broadcast.create () | ||
|
||
let await t mutex = | ||
Mutex.lock t.mutex; | ||
Eio_mutex.unlock mutex; | ||
match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with | ||
| () -> Eio_mutex.lock mutex | ||
| exception ex -> Eio_mutex.lock mutex; raise ex | ||
let await_generic ?mutex t = | ||
match | ||
Suspend.enter_unchecked (fun ctx enqueue -> | ||
match Fiber_context.get_error ctx with | ||
| Some ex -> | ||
Option.iter Eio_mutex.unlock mutex; | ||
enqueue (Error ex) | ||
| None -> | ||
match Broadcast.suspend t (fun () -> enqueue (Ok ())) with | ||
| None -> | ||
Option.iter Eio_mutex.unlock mutex | ||
| Some request -> | ||
Option.iter Eio_mutex.unlock mutex; | ||
Fiber_context.set_cancel_fn ctx (fun ex -> | ||
if Broadcast.cancel request then enqueue (Error ex) | ||
(* else already succeeded *) | ||
) | ||
) | ||
with | ||
| () -> Option.iter Eio_mutex.lock mutex | ||
| exception ex -> | ||
let bt = Printexc.get_raw_backtrace () in | ||
Option.iter Eio_mutex.lock mutex; | ||
Printexc.raise_with_backtrace ex bt | ||
|
||
let await_no_mutex t = | ||
Mutex.lock t.mutex; | ||
Waiters.await ~mutex:(Some t.mutex) t.waiters t.id | ||
let await t mutex = await_generic ~mutex t | ||
let await_no_mutex t = await_generic t | ||
|
||
let broadcast t = | ||
Mutex.lock t.mutex; | ||
Waiters.wake_all t.waiters (); | ||
Mutex.unlock t.mutex | ||
let broadcast = Broadcast.resume_all |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
(* See the Cells module for an overview of this system. | ||
Each new waiter atomically increments the "suspend" pointer and writes | ||
a callback there. The waking fiber removes all the callbacks and calls them. | ||
In this version, "resume" never gets ahead of "suspend" (broadcasting just | ||
brings it up-to-date with the "suspend" pointer). | ||
When the resume fiber runs, some of the cells reserved for callbacks might | ||
not yet have been filled. In this case, the resuming fiber just marks them | ||
as needing to be resumed. When the suspending fiber continues, it will | ||
notice this and continue immediately. *) | ||
|
||
module Cell = struct | ||
(* For any given cell, there are two actors running in parallel: the | ||
suspender and the resumer. | ||
The resumer only performs a single operation (resume). | ||
The consumer waits to be resumed and then, optionally, cancels. | ||
This means we only have three cases to think about: | ||
1. Consumer adds request (Empty -> Request). | ||
1a. Provider fulfills it (Request -> Resumed). | ||
1b. Consumer cancels it (Request -> Cancelled). | ||
2. Provider gets to cell first (Empty -> Resumed). | ||
When the consumer tries to wait, it resumes immediately. | ||
The Resumed state should never been seen. It exists only to allow the | ||
request to be GC'd promptly. We could replace it with Empty, but having | ||
separate states is clearer for debugging. *) | ||
|
||
type _ t = | ||
| Request of (unit -> unit) | ||
| Cancelled | ||
| Resumed | ||
| Empty | ||
|
||
let init = Empty | ||
|
||
let segment_order = 2 | ||
|
||
let dump f = function | ||
| Request _ -> Fmt.string f "Request" | ||
| Empty -> Fmt.string f "Empty" | ||
| Resumed -> Fmt.string f "Resumed" | ||
| Cancelled -> Fmt.string f "Cancelled" | ||
end | ||
|
||
module Cells = Cells.Make(Cell) | ||
|
||
type cell = unit Cell.t | ||
type t = unit Cells.t | ||
|
||
type request = unit Cells.segment * cell Atomic.t | ||
|
||
let rec resume cell = | ||
match (Atomic.get cell : cell) with | ||
| Request r as cur -> | ||
(* The common case: we have a waiter for the value *) | ||
if Atomic.compare_and_set cell cur Resumed then r (); | ||
(* else it was cancelled at the same time; ignore *) | ||
| Empty -> | ||
(* The consumer has reserved this cell but not yet stored the request. | ||
We place Resumed there and it will handle it soon. *) | ||
if Atomic.compare_and_set cell Empty Resumed then | ||
() (* The consumer will deal with it *) | ||
else | ||
resume cell (* The Request was added concurrently; use it *) | ||
| Cancelled -> () | ||
| Resumed -> | ||
(* This state is unreachable because we (the provider) haven't set this yet *) | ||
assert false | ||
|
||
let cancel (segment, cell) = | ||
match (Atomic.get cell : cell) with | ||
| Request _ as old -> | ||
if Atomic.compare_and_set cell old Cancelled then ( | ||
Cells.cancel_cell segment; | ||
true | ||
) else false (* We got resumed first *) | ||
| Resumed -> false (* We got resumed first *) | ||
| Cancelled -> invalid_arg "Already cancelled!" | ||
| Empty -> | ||
(* To call [cancel] the user needs a [request] value, | ||
which they only get once we've reached the [Request] state. | ||
[Empty] is unreachable from [Request]. *) | ||
assert false | ||
|
||
let suspend t k = | ||
let (_, cell) as request = Cells.next_suspend t in | ||
if Atomic.compare_and_set cell Empty (Request k) then Some request | ||
else match Atomic.get cell with | ||
| Resumed -> | ||
(* Resumed before we could add the waiter *) | ||
k (); | ||
None | ||
| Cancelled | Request _ | Empty -> | ||
(* These are unreachable from the previously-observed non-Empty state | ||
without us taking some action first *) | ||
assert false | ||
|
||
let resume_all t = | ||
Cells.resume_all t resume | ||
|
||
let create = Cells.make | ||
|
||
let dump f t = Cells.dump f t |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
(** A lock-free queue of waiters that should all be resumed at once. | ||
This uses {!Cells} internally. *) | ||
|
||
type t | ||
|
||
type request | ||
(** A handle to a pending request that can be used to cancel it. *) | ||
|
||
val create : unit -> t | ||
(** [create ()] is a fresh broadcast queue. *) | ||
|
||
val suspend : t -> (unit -> unit) -> request option | ||
(** [suspend t fn] arranges for [fn ()] to be called on {!resume_all}. | ||
[fn ()] may be called from the caller's context, or by [resume_all], | ||
so it needs to be able to cope with running in any context where that | ||
can run. For example, [fn] must be safe to call from a signal handler | ||
if [resume_all] can be called from one. [fn] must not raise. | ||
The returned request can be used to cancel. It can be [None] in the | ||
(unlikely) event that [t] got resumed before the function returned. *) | ||
|
||
val resume_all : t -> unit | ||
(** [resume_all t] calls all non-cancelled callbacks attached to [t], | ||
in the order in which they were suspended. | ||
This function is lock-free and can be used safely even from a signal handler or GC finalizer. *) | ||
|
||
val cancel : request -> bool | ||
(** [cancel request] attempts to remove a pending request. | ||
It returns [true] if the request was cancelled, or [false] if it got | ||
resumed before that could happen. *) | ||
|
||
val dump : Format.formatter -> t -> unit | ||
(** Display the internal state of a queue, for debugging. *) |
Oops, something went wrong.