Undesirable wake-up semantics in Stream.take #356

Closed
iitalics opened this issue Oct 24, 2022 · 1 comment · Fixed by #358

Comments

@iitalics

iitalics commented Oct 24, 2022

I'm still trying to figure out how to properly articulate this issue, but I believe the following semantics are undesirable:

open Eio

let main _env =
  let s1 = Stream.create 1 in
  let s2 = Stream.create 1 in
  let result =
    Switch.run @@ fun sw ->
    Fiber.fork ~sw (fun () -> Fiber.yield (); Stream.add s1 "A");
    Fiber.fork ~sw (fun () -> Fiber.yield (); Stream.add s2 "B");
    Fiber.first
      (fun () -> Stream.take s1)
      (fun () -> Stream.take s2)
  in
  traceln "result: %S" result;
  traceln "S1 length: %d" (Stream.length s1);
  traceln "S2 length: %d" (Stream.length s2)

let () = Eio_main.run main

output:

+result: "A"
+S1 length: 0
+S2 length: 0

Despite Stream.take s1 winning the Fiber.first race, and presumably cancelling Stream.take s2, both streams wind up empty. It would be preferable for cancellation to prevent s2 from having its element removed; in other words, the program should print:

+result: "A"
+S1 length: 0
+S2 length: 1

I believe this is because the implementation of Stream takes a shortcut: adding to an empty stream that has a waiting reader passes the item immediately to that reader (the "waiter"):

eio/lib_eio/stream.ml

Lines 42 to 44 in 63f5896

let add t item =
  Mutex.lock t.mutex;
  match Waiters.wake_one t.readers item with

This gives the reading fiber no ability to respond to a cancellation in time. A modified approach could be to make readers a unit Waiters.t, so that the wake-up is just a notification that there may be items in the queue, and the fiber still has to check the contents again itself. This check would be enqueued to the scheduler, which means there is an opportunity to cancel it.

I'm not sure if there is an argument to be made in favor of the current semantics, but to me there seems to be serious potential to lose data when using streams in conjunction with fiber cancellation.
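
The difference between the two wake-up strategies described above can be sketched with a toy, single-threaded model that uses only the OCaml standard library. This is not Eio's implementation: the `reader` and `stream` records, the `run_queue` standing in for the scheduler, and the functions `add_handoff`, `add_notify`, and `scenario` are all hypothetical names made up for illustration.

```ocaml
(* Toy model only; none of these names come from Eio. *)

type 'a reader = {
  mutable cancelled : bool;
  mutable result : 'a option;
}

type 'a stream = {
  items : 'a Queue.t;            (* buffered items *)
  readers : 'a reader Queue.t;   (* readers blocked in "take" *)
}

let create () = { items = Queue.create (); readers = Queue.create () }

(* Deferred jobs, standing in for the scheduler's run queue. *)
let run_queue : (unit -> unit) Queue.t = Queue.create ()

let run_one () =
  match Queue.take_opt run_queue with
  | Some job -> job ()
  | None -> ()

let run_all () =
  while not (Queue.is_empty run_queue) do run_one () done

(* Register a blocked reader on the stream. *)
let wait t =
  let r = { cancelled = false; result = None } in
  Queue.push r t.readers;
  r

(* Hand-off: the item goes straight to a waiting reader and never
   enters the buffer, so a later cancellation cannot put it back. *)
let add_handoff t item =
  match Queue.take_opt t.readers with
  | Some r -> r.result <- Some item
  | None -> Queue.push item t.items

(* Notify: the item is always buffered; the reader is only scheduled
   to re-check the buffer, and skips the check if cancelled by then. *)
let add_notify t item =
  Queue.push item t.items;
  match Queue.take_opt t.readers with
  | Some r ->
    Queue.push
      (fun () -> if not r.cancelled then r.result <- Queue.take_opt t.items)
      run_queue
  | None -> ()

(* Two readers race; both streams receive an item before either reader
   resumes. Reader 1 resumes first and its win cancels reader 2. *)
let scenario add =
  let s1 = create () and s2 = create () in
  let r1 = wait s1 and r2 = wait s2 in
  add s1 "A";
  add s2 "B";
  run_one ();             (* reader 1's re-check, if one was scheduled *)
  r2.cancelled <- true;   (* reader 1 won, so reader 2 is cancelled *)
  run_all ();
  (r1.result, Queue.length s1.items, Queue.length s2.items)

let () =
  let show name (r, n1, n2) =
    Printf.printf "%s: result=%s s1=%d s2=%d\n" name
      (Option.value r ~default:"<none>") n1 n2
  in
  show "hand-off" (scenario add_handoff);
  show "notify" (scenario add_notify)
```

In this model the hand-off variant leaves both buffers empty, while the notify variant leaves "B" in the second buffer because the cancelled reader never performs its re-check. The model has a single thread and a fixed resume order, so it says nothing about what happens across multiple domains.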

@talex5
Collaborator

talex5 commented Oct 25, 2022

You might be interested in Reagents, which allow these kinds of atomic patterns reliably (but are not currently integrated with Eio).

Without something like that, it can't work in the general case (multiple domains), because there is always a period of time after the first Stream.take has finished but before its fiber has exited, during which the other Stream.take could succeed.

I've found it's often easier to have a single stream with multiple writers for cases like this.

e.g.

open Eio

let main _env =
  Switch.run @@ fun sw ->
  let s = Stream.create 1 in
  Fiber.fork ~sw (fun () -> Fiber.yield (); Stream.add s "A");
  Fiber.fork ~sw (fun () -> Fiber.yield (); Stream.add s "B");
  let result = Stream.take s in
  traceln "result: %S" result;
  traceln "S length: %d" (Stream.length s)

let () = Eio_main.run main
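
If the reader still needs to know which producer an item came from, one option is to tag each item before adding it to the shared stream. The sketch below is a variation on the example above; the `msg` type and its constructors are hypothetical names chosen for illustration, and it assumes the same Eio API calls used elsewhere in this thread.

```ocaml
open Eio

(* Hypothetical message type: tags each item with its producer. *)
type msg = From_a of string | From_b of string

let main _env =
  Switch.run @@ fun sw ->
  (* One stream shared by both producers. *)
  let s = Stream.create 1 in
  Fiber.fork ~sw (fun () -> Fiber.yield (); Stream.add s (From_a "A"));
  Fiber.fork ~sw (fun () -> Fiber.yield (); Stream.add s (From_b "B"));
  (* A single take: no race between readers, so nothing to cancel. *)
  match Stream.take s with
  | From_a v -> traceln "from producer A: %S" v; v
  | From_b v -> traceln "from producer B: %S" v; v

let result = Eio_main.run main
```

Because only one `Stream.take` is issued, the item that is not taken simply stays in the stream (or its producer stays blocked in `Stream.add`) instead of being handed to a reader that is about to be cancelled.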

talex5 added a commit to talex5/eio that referenced this issue Oct 29, 2022
