Skip to content

Commit

Permalink
Add more Eio.Condition documentation
Browse files Browse the repository at this point in the history
This adds a discussion of conditions to the README and provides examples
using them to handle signals.
  • Loading branch information
talex5 committed Feb 6, 2023
1 parent 38c337e commit 3a6505f
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 4 deletions.
199 changes: 195 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ Eio replaces existing concurrency libraries such as Lwt
* [Example: Concurrent Cache](#example-concurrent-cache)
* [Streams](#streams)
* [Example: Worker Pool](#example-worker-pool)
* [The Rest: Mutex, Semaphore and Condition](#the-rest-mutex-semaphore-and-condition)
* [Mutexes and Semaphores](#mutexes-and-semaphores)
* [Conditions](#conditions)
* [Example: Signal handlers](#example-signal-handlers)
* [Design Note: Determinism](#design-note-determinism)
* [Provider Interfaces](#provider-interfaces)
* [Example Applications](#example-applications)
Expand Down Expand Up @@ -1215,16 +1217,15 @@ The `Fiber.check ()` checks whether the worker itself has been cancelled, and ex
It's not actually necessary in this case,
because if we continue instead then the following `Stream.take` will perform the check anyway.

### The Rest: Mutex, Semaphore and Condition
### Mutexes and Semaphores

Eio also provides `Mutex`, `Semaphore` and `Condition` sub-modules.
Eio also provides `Mutex` and `Semaphore` sub-modules.
Each of these corresponds to the module with the same name in the OCaml standard library,
but allows other fibers to run while waiting instead of blocking the whole domain.
They are all safe to use in parallel from multiple domains.

- [Eio.Mutex][] provides *mutual exclusion*, so that only one fiber can access a resource at a time.
- [Eio.Semaphore][] generalises this to allow up to *n* fibers to access a resource at once.
- [Eio.Condition][] allows a fiber to wait until some condition is true.

For example, if we allow loading and saving data in a file there could be a problem
if we try to load the data while a save is in progress.
Expand Down Expand Up @@ -1291,6 +1292,196 @@ let release () =
decr in_use
```

### Conditions

[Eio.Condition][] allows a fiber to wait until some condition is true.
For example:

```ocaml
module X = struct
(* Note: this version is not safe to share across domains! *)
type t = {
mutable x : int;
changed : Eio.Condition.t;
}
let make x = { x; changed = Eio.Condition.create () }
let await_zero t =
while t.x <> 0 do Eio.Condition.await_no_mutex t.changed done;
traceln "x is now zero"
let set t x =
t.x <- x;
Eio.Condition.broadcast t.changed;
traceln "x set to %d" x
end
```

```ocaml
# Eio_mock.Backend.run @@ fun () ->
let x = X.make 5 in
Fiber.both
(fun () ->
traceln "Waiting for x to be 0";
X.await_zero x
)
(fun () -> X.set x 0);;
+Waiting for x to be 0
+x set to 0
+x is now zero
- : unit = ()
```

Note that we need a loop in `await_zero`.
This is needed because it's possible that another fiber might set it to zero
and then set it to something else before the waiting fiber resumes.

The above version is not safe to share across domains, because `await_zero` relies on the value of `x` not changing
after `x` is read but before `await_no_mutex` registers itself with the condition.
Here's a domain-safe version:

```ocaml
module Y = struct
(* Safe to share between domains. *)
type t = {
mutable y : int;
mutex : Eio.Mutex.t;
changed : Eio.Condition.t;
}
let make y = {
y;
mutex = Eio.Mutex.create ();
changed = Eio.Condition.create ();
}
let await_zero t =
Eio.Mutex.use_ro t.mutex (fun () ->
while t.y <> 0 do Eio.Condition.await t.changed t.mutex done;
traceln "y is now zero (at least until we release the mutex)"
)
let set t y =
Eio.Mutex.use_rw t.mutex ~protect:true (fun () ->
t.y <- y;
Eio.Condition.broadcast t.changed;
traceln "y set to %d" y
);
end
```

Here, `Eio.Condition.await` registers itself with `changed` and only then releases the mutex,
allowing other threads to change `y`. When it gets woken, it re-acquires the mutex.

```ocaml
# Eio_mock.Backend.run @@ fun () ->
let y = Y.make 5 in
Fiber.both
(fun () ->
traceln "Waiting for y to be 0";
Y.await_zero y
)
(fun () -> Y.set y 0);;
+Waiting for y to be 0
+y set to 0
+y is now zero (at least until we release the mutex)
- : unit = ()
```

Conditions are more difficult to use correctly than e.g. promises or streams.
In particular, it is easy to miss a notification due to `broadcast` getting called before `await`.
However, they can be useful if used carefully.

### Example: Signal handlers

On Unix-type systems, processes can react to *signals*.
For example, pressing Ctrl-C will send the `SIGINT` (interrupt) signal.

Here is an example function that allows itself to be interrupted:

```ocaml
let run_op ~interrupted =
Fiber.first
(fun () ->
Eio.Condition.await_no_mutex interrupted;
traceln "Cancelled at user's request."
)
(fun () ->
traceln "Running operation (Ctrl-C to cancel)...";
Fiber.await_cancel () (* Simulated work *)
)
```

Note that we don't need a mutex here.
We're just waiting for the number of interrupts received to change,
and, since that increases monotonically, once we get woken we always want to continue.
Also, we don't care about missing interrupts from before this operation started.

The code here is quite subtle.
We rely on the fact that the first branch of the `Fiber.first` runs first,
and only starts running the second branch once `await_no_mutex` has finished registering.
Thus, we never display the message telling the user to press Ctrl-C before we're ready
to receive it.
This isn't likely to matter if a human is responding to the message,
but if the response is automated then the delay could matter.

To run this function, we need to install a signal handler.
There are very few things that you can do safely in a signal handler.
For example, you can't take a mutex in a signal handler
because the signal might have interrupted a fiber that had already locked it.
However, you can safely call `Eio.Condition.broadcast`:

<!-- $MDX non-deterministic=command -->
```ocaml
# Eio_main.run @@ fun _env ->
let interrupted = Eio.Condition.create () in
let handle_signal (_signum : int) =
(* Warning: we're in a signal handler now.
Most operations are unsafe here, except for Eio.Condition.broadcast! *)
Eio.Condition.broadcast interrupted
in
Sys.set_signal Sys.sigint (Signal_handle handle_signal);
run_op ~interrupted;;
+Running operation (Ctrl-C to cancel)...
[ user presses Ctrl-C here ]
+Cancelled at user's request.
- : unit = ()
```

Another common pattern when using signals is using `SIGHUP`
to tell an application to reload its configuration file:

<!-- $MDX file=examples/signals/main.ml,part=main -->
```ocaml
let main ~config_changed =
while true do
Fiber.both
(fun () ->
(* First, we start waiting for SIGHUP.
This is so that if we get SIGHUP before we finish loading
the old configuration then we'll start again. *)
Eio.Condition.await_no_mutex config_changed;
traceln "Received SIGHUP";
(* We could cancel the loading fiber now, in case it's still running,
but in this example we just wait for it to finish by itself. *)
)
(fun () ->
traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ());
load_config ();
traceln "Finished reading configuration";
)
done
```

Unlike the cancellation case above, where we used `Fiber.first`,
here we use `Fiber.both` to wait until we have both read the previous version of the configuration
*and* received a request to reload, then we loop and read it again.

See the `examples/signals` directory for the full code.

## Design Note: Determinism

Within a domain, fibers are scheduled deterministically.
Expand Down
3 changes: 3 additions & 0 deletions examples/signals/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
(executable
(name main)
(libraries eio_main))
39 changes: 39 additions & 0 deletions examples/signals/main.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
open Eio.Std

let load_config () =
(* A real system would load the file and then pass it to the running service
somehow, but we're just demonstrating signal handling so just sleep to
simulate some time taken to load the new configuration. *)
Eio_unix.sleep 2.0

(* $MDX part-begin=main *)
let main ~config_changed =
while true do
Fiber.both
(fun () ->
(* First, we start waiting for SIGHUP.
This is so that if we get SIGHUP before we finish loading
the old configuration then we'll start again. *)
Eio.Condition.await_no_mutex config_changed;
traceln "Received SIGHUP";
(* We could cancel the loading fiber now, in case it's still running,
but in this example we just wait for it to finish by itself. *)
)
(fun () ->
traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ());
load_config ();
traceln "Finished reading configuration";
)
done
(* $MDX part-end *)

let () =
Eio_main.run @@ fun _env ->
let config_changed = Eio.Condition.create () in
let handle_signal (_signum : int) =
(* Warning: we're in a signal handler now.
Most operations are unsafe here, except for Eio.Condition.broadcast! *)
Eio.Condition.broadcast config_changed
in
Sys.set_signal Sys.sighup (Signal_handle handle_signal);
main ~config_changed

0 comments on commit 3a6505f

Please sign in to comment.