Add Fiber.fork_seq #460

Merged (2 commits) on Mar 14, 2023
30 changes: 30 additions & 0 deletions lib_eio/core/eio__core.mli
@@ -250,6 +250,36 @@ module Fiber : sig
This is just a convenience wrapper around {!fork}.
If [fn] raises an exception then the promise is resolved to the error, but [sw] is not failed. *)

val fork_seq : sw:Switch.t -> (('a -> unit) -> unit) -> 'a Seq.t
(** [fork_seq ~sw fn] creates (but does not start) a new fiber to run [fn yield].

Requesting the next item from the returned sequence resumes the fiber until it
calls [yield x], using [x] as the next item in the sequence. If [fn]
Contributor

I like this function, but is yield the best name for the iterator function within the closure? It's a little confusing since it's nothing like Fiber.yield really. Something like push or dispense (the Seq terminology) might be clearer as the function name in the example.

Collaborator Author

I think yield is the standard term here (see https://en.wikipedia.org/wiki/Generator_(computer_programming)).

e.g. in Python:

def count():
    for i in range(1, 4):
        yield i

print(list(count()))

It's yield in JavaScript too: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/yield

yield () is a bit like Fiber.yield () in that it suspends the caller. The difference is that yield is yielding to a particular consumer, who controls when it resumes, whereas Fiber.yield () immediately joins the run queue.
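
For comparison, here is a rough OCaml analogue of the Python generator above. It is only a sketch: it uses the OCaml 5 standard library `Effect` module rather than Eio's fibers, and the names `Yield`, `generator` and `count` are made up for illustration and are not part of this PR. It shows the same idea of yielding to a particular consumer, who decides when the producer resumes.

```ocaml
(* Requires OCaml >= 5.0 for the stdlib [Effect] module. *)
open Effect.Deep

(* Hypothetical effect: hand one [int] to the consumer and suspend. *)
type _ Effect.t += Yield : int -> unit Effect.t

(* Hypothetical helper: turn a producer [fn] into a dispenser.
   The producer only runs when the consumer asks for the next item. *)
let generator (fn : (int -> unit) -> unit) : unit -> int option =
  let yield x = Effect.perform (Yield x) in
  (* [next] holds whatever must run to obtain the next item. *)
  let next = ref (fun () -> None) in
  next := (fun () ->
      match_with fn yield
        { retc = (fun () ->
              (* The producer returned: end of sequence from now on. *)
              next := (fun () -> None);
              None);
          exnc = raise;
          effc = (fun (type a) (eff : a Effect.t) ->
              match eff with
              | Yield x ->
                Some (fun (k : (a, _) continuation) ->
                    (* Remember how to resume the producer, then return [x]. *)
                    next := (fun () -> continue k ());
                    Some x)
              | _ -> None) });
  fun () -> !next ()

(* Same shape as the Python [count] generator. *)
let count yield =
  for i = 1 to 3 do
    yield i
  done

let items = List.of_seq (Seq.of_dispenser (generator count))
```

Here `items` is `[1; 2; 3]`, matching the Python output. Unlike `Fiber.fork_seq`, this sketch has no switch, no cancellation and no cross-domain support.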

Contributor

I guess I'm dissatisfied that we have an entirely different terminology ("dispenser") in the Seq module, so we're not being consistent within the OCaml/eio documentation. And the use of yield being overloaded (both control flow surrender, and argument-driven resumption) in other languages is a perennial source of confusion, so I'm not sure that's a good guideline to follow.

I just wanted to voice my vague angst here; I don't think I have a clearer alternative apart from "dispense".

returns without producing a value then the result is {!Seq.Nil} (end-of-sequence).

The returned sequence can be consumed safely from another domain.
[fn] itself always runs in the domain that called [fork_seq].

Example:
{[
  Switch.run @@ fun sw ->
  let seq = Fiber.fork_seq ~sw (fun yield ->
      for i = 1 to 3 do
        traceln "Yielding %d" i;
        yield i
      done
    ) in
  Seq.iter (traceln "Got: %d") seq
]}

If [fn] raises an exception then the consumer receives it.
If the consumer cancels while awaiting a value, the producer is cancelled when
it next calls [yield].
It is an error to request two items at once, or to request items out of sequence.

@param sw When the switch finishes, the fiber is cancelled (if still running).
Attempting to read from the sequence after this raises an exception. *)

val fork_daemon : sw:Switch.t -> (unit -> [`Stop_daemon]) -> unit
(** [fork_daemon] is like {!fork} except that instead of waiting for the fiber to finish,
the switch will cancel it once all non-daemon fibers are done.
107 changes: 107 additions & 0 deletions lib_eio/core/fiber.ml
@@ -1,3 +1,5 @@
[@@@alert "-unstable"]

type _ Effect.t += Fork : Cancel.fiber_context * (unit -> unit) -> unit Effect.t

let yield () =
@@ -284,3 +286,108 @@ let with_binding var value fn =
let without_binding var fn =
  let ctx = Effect.perform Cancel.Get_context in
  Cancel.Fiber_context.with_vars ctx (Hmap.rem var ctx.vars) fn

(* Coroutines.

[fork_coroutine ~sw fn] creates a new fiber for [fn]. [fn] immediately suspends, setting its state to
[Ready enqueue]. A consumer can resume it by setting the state to [Running] and calling [enqueue],
while suspending itself. The consumer passes in its own [enqueue] function. They run alternately
like this, switching between the [Ready] and [Running] states.

To finish, the coroutine fiber can set the state to [Finished] or [Failed],
or the client can set the state to [Client_cancelled].
*)

(* Note: we could easily generalise this to [('in, 'out) coroutine] if that was useful. *)
type 'out coroutine =
  [ `Init
  | `Ready of [`Running of 'out Suspend.enqueue] Suspend.enqueue
  | `Running of 'out Suspend.enqueue
  | `Finished
  | `Client_cancelled of exn
  | `Failed of exn ]

(* The only good reason for the state to change while the coroutine is running is if the client
cancels. Return the exception in that case. If the coroutine is buggy it might e.g. fork two
fibers and yield twice for a single request - return Invalid_argument in that case. *)
let unwrap_cancelled state =
  match Atomic.get state with
  | `Client_cancelled ex -> ex
  | `Finished | `Failed _ -> Invalid_argument "Coroutine has already stopped!"
  | `Ready _ -> Invalid_argument "Coroutine has already yielded!"
  | `Init | `Running _ -> Invalid_argument "Coroutine in unexpected state!"

let run_coroutine ~state fn =
  let await_request ~prev ~on_suspend =
    (* Suspend and wait for the consumer to resume us: *)
    Suspend.enter (fun ctx enqueue ->
        let ready = `Ready enqueue in
        if Atomic.compare_and_set state prev ready then (
          Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
              if Atomic.compare_and_set state ready (`Failed ex) then
                enqueue (Error ex);
              (* else the client enqueued a resume for us; handle that instead *)
            );
          on_suspend ()
        ) else (
          enqueue (Error (unwrap_cancelled state))
        )
      )
  in
  let current_state = ref (await_request ~prev:`Init ~on_suspend:ignore) in
  fn (fun v ->
      (* The coroutine wants to yield the value [v] and suspend. *)
      let `Running enqueue as prev = !current_state in
      current_state := await_request ~prev ~on_suspend:(fun () -> enqueue (Ok (Some v)))
    );
  (* [fn] has finished. End the stream. *)
  if Atomic.compare_and_set state (!current_state :> _ coroutine) `Finished then (
    let `Running enqueue = !current_state in
    enqueue (Ok None)
  ) else (
    raise (unwrap_cancelled state)
  )

let fork_coroutine ~sw fn =
  let state = Atomic.make `Init in
  fork_daemon ~sw (fun () ->
      try
        run_coroutine ~state fn;
        `Stop_daemon
      with ex ->
        match ex, Atomic.exchange state (`Failed ex) with
        | _, `Running enqueue ->
          (* A client is waiting for us. Send the error there. Also do this if we were cancelled. *)
          enqueue (Error ex);
          `Stop_daemon
        | Cancel.Cancelled _, _ ->
          (* The client isn't waiting (probably it got cancelled, then we tried to yield to it and got cancelled too).
             If it tries to resume us later it will see the error. *)
          `Stop_daemon
        | _ ->
          (* Something unexpected happened. Re-raise. *)
          raise ex
    );
  fun () ->
    Suspend.enter (fun ctx enqueue ->
        let rec aux () =
          match Atomic.get state with
          | `Ready resume as prev ->
            let running = `Running enqueue in
            if Atomic.compare_and_set state prev running then (
              resume (Ok running);
              Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
                  if Atomic.compare_and_set state running (`Client_cancelled ex) then
                    enqueue (Error ex)
                )
            ) else aux ()
          | `Finished -> enqueue (Error (Invalid_argument "Coroutine has already finished!"))
          | `Failed ex | `Client_cancelled ex -> enqueue (Error (Invalid_argument ("Coroutine has already failed: " ^ Printexc.to_string ex)))
          | `Running _ -> enqueue (Error (Invalid_argument "Coroutine is still running!"))
          | `Init -> assert false
        in
        aux ()
      )

let fork_seq ~sw fn =
  Seq.of_dispenser (fork_coroutine ~sw fn)
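
The last definition relies on `Seq.of_dispenser` from the standard library (OCaml 4.14 or later), which wraps a `unit -> 'a option` function as a sequence. The sketch below uses a plain counter in place of the coroutine (`counter_dispenser` is a hypothetical stand-in, not part of this PR) to illustrate that such a sequence is ephemeral: each item can only be requested once, in order.

```ocaml
(* A dispenser is a function [unit -> 'a option]; [None] means end-of-sequence.
   [counter_dispenser] is a hypothetical stand-in for [fork_coroutine ~sw fn]. *)
let counter_dispenser limit =
  let i = ref 0 in
  fun () ->
    if !i < limit then (incr i; Some !i) else None

let first_pass, second_pass =
  let seq = Seq.of_dispenser (counter_dispenser 3) in
  (* The first traversal drains the underlying dispenser. *)
  let a = List.of_seq seq in
  (* The second traversal calls the same, already exhausted, dispenser. *)
  let b = List.of_seq seq in
  (a, b)
```

With this plain counter the second traversal simply finds the dispenser exhausted, so `first_pass` is `[1; 2; 3]` and `second_pass` is `[]`. With `fork_seq`, the tests in `tests/fiber.md` show that requesting an item after the end raises `Invalid_argument` instead.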
200 changes: 200 additions & 0 deletions tests/fiber.md
@@ -656,3 +656,203 @@ Values are inherited from the currently running fiber, rather than the switch.
+Key => 123
- : unit = ()
```

## fork_seq

The simple case where everything works:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  Switch.run @@ fun sw ->
  let seq =
    Fiber.fork_seq ~sw (fun yield ->
        traceln "Generator fiber starting";
        for i = 1 to 3 do
          traceln "Yielding %d" i;
          yield i
        done
      )
  in
  traceln "Requesting 1st item";
  match seq () with
  | Nil -> assert false
  | Cons (x, seq) ->
    traceln "hd = %d" x;
    traceln "Requesting remaining items";
    List.of_seq seq;;
+Requesting 1st item
+Generator fiber starting
+Yielding 1
+hd = 1
+Requesting remaining items
+Yielding 2
+Yielding 3
- : int list = [2; 3]
```

The generator raises:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  Switch.run @@ fun sw ->
  let seq =
    Fiber.fork_seq ~sw (fun yield ->
        traceln "Generator fiber starting";
        raise (Failure "Simulated error")
      )
  in
  Eio.Cancel.protect (fun () -> (* (ensure we get the exception from the sequence) *)
      traceln "Requesting an item";
      try
        ignore (seq ());
        assert false
      with ex -> traceln "Consumer got exception: %a" Fmt.exn ex
    );;
+Requesting an item
+Generator fiber starting
+Consumer got exception: Failure("Simulated error")
- : unit = ()
```

The sequence is used after the switch is finished:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  let seq =
    Switch.run (fun sw ->
        Fiber.fork_seq ~sw (fun _yield -> assert false)
      )
  in
  traceln "Requesting an item";
  seq ();;
+Requesting an item
Exception:
Invalid_argument "Coroutine has already failed: Cancelled: Stdlib.Exit".
```

The sequence is used after the switch is finished, and the generator has started:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  let seq =
    Switch.run (fun sw ->
        let seq =
          Fiber.fork_seq ~sw (fun yield ->
              try yield 1
              with ex -> traceln "Generator caught: %a" Fmt.exn ex; raise ex
            )
        in
        traceln "Requesting an item";
        match seq () with
        | Nil -> assert false
        | Cons (x, seq) ->
          traceln "Got %d" x;
          seq
      )
  in
  traceln "Switch finished. Requesting another item...";
  seq ();;
+Requesting an item
+Got 1
+Generator caught: Cancelled: Stdlib.Exit
+Switch finished. Requesting another item...
Exception:
Invalid_argument "Coroutine has already failed: Cancelled: Stdlib.Exit".
```

Using a sequence after it has finished normally:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  Switch.run @@ fun sw ->
  let seq = Fiber.fork_seq ~sw (fun yield -> yield 1; traceln "Generator done") in
  let next = Seq.to_dispenser seq in
  traceln "Got %a" Fmt.(Dump.option int) (next ());
  traceln "Got %a" Fmt.(Dump.option int) (next ());
  next ();;
+Got Some 1
+Generator done
+Got None
Exception: Invalid_argument "Coroutine has already finished!".
```

Trying to resume twice:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  Switch.run @@ fun sw ->
  let seq = Fiber.fork_seq ~sw (fun _yield -> Fiber.await_cancel ()) in
  Fiber.both
    (fun () -> ignore (seq ()))
    (fun () -> ignore (seq ()));;
Exception: Invalid_argument "Coroutine is still running!".
```

Generator yields twice for a single request:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  Switch.run @@ fun sw ->
  let seq = Fiber.fork_seq ~sw (fun yield -> Fiber.both yield yield) in
  seq ();;
Exception: Invalid_argument "Coroutine has already yielded!".
```

Yielding from a different fiber (note: end-of-sequence is still sent when the original fiber exits):

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  Switch.run @@ fun sw ->
  let seq = Fiber.fork_seq ~sw (fun yield ->
      let p = Fiber.fork_promise ~sw (fun () -> Fiber.yield (); yield "Second fiber") in
      Promise.await_exn p;
      yield "Original fiber"
    ) in
  List.of_seq seq;;
- : string list = ["Second fiber"; "Original fiber"]
```

The consumer cancels:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  Switch.run @@ fun sw ->
  let seq = Fiber.fork_seq ~sw (fun yield ->
      traceln "Working...";
      try
        Fiber.yield ();
        yield 1
      with ex -> traceln "Generator caught: %a" Fmt.exn ex; raise ex
    ) in
  Fiber.first
    (fun () -> seq ())
    (fun () -> Nil);;
+Working...
+Generator caught: Cancelled: Eio__core__Fiber.Not_first
- : int Seq.node = Seq.Nil
```

The generator is cancelled while queued to be resumed.
It runs, but cancels at the next opportunity:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
  Switch.run @@ fun sw ->
  let seq = Fiber.fork_seq ~sw (fun yield ->
      traceln "Working...";
      try Fiber.check ()
      with ex -> traceln "Generator caught: %a" Fmt.exn ex; raise ex
    ) in
  traceln "Enqueue resume";
  Fiber.both
    (fun () -> ignore (seq () : _ Seq.node); assert false)
    (fun () ->
      traceln "Cancel generator";
      Switch.fail sw Exit
    )
+Enqueue resume
+Cancel generator
+Working...
+Generator caught: Cancelled: Stdlib.Exit
Exception: Stdlib.Exit.
```