From 01c009fa81332db003e7e58ca98767244f3d5764 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Wed, 8 Nov 2023 13:36:57 +0000 Subject: [PATCH] Allow closing synchronous streams This isn't currently exposed in the public interface. --- lib_eio/stream.ml | 2 +- lib_eio/sync.ml | 207 +++++++++++++++++++++-------- lib_eio/sync.mli | 22 ++- lib_eio/tests/dscheck/test_sync.ml | 63 ++++++++- lib_eio/tests/sync.md | 65 ++++++++- 5 files changed, 293 insertions(+), 66 deletions(-) diff --git a/lib_eio/stream.ml b/lib_eio/stream.ml index fe179476e..6c41114ce 100644 --- a/lib_eio/stream.ml +++ b/lib_eio/stream.ml @@ -120,7 +120,7 @@ let add t v = | Locking x -> Locking.add x v let take = function - | Sync x -> Sync.take x + | Sync x -> Sync.take x |> Option.get (* todo: allow closing streams *) | Locking x -> Locking.take x let take_nonblocking = function diff --git a/lib_eio/sync.ml b/lib_eio/sync.ml index ee78c62a7..a658e98df 100644 --- a/lib_eio/sync.ml +++ b/lib_eio/sync.ml @@ -123,6 +123,12 @@ the consumer sets its cell to a request with a dummy callback that rejects all values and continues immediately. + Close + + The LSB of the balance atomic is used to indicate that the stream has been closed. + When closed, the balance is always zero and no new consumers or producers can be added. + The closing thread is responsible for cancelling all pre-existing users. + The exchange Once a producer and consumer have been paired off (and so their cell is now Finished), @@ -137,9 +143,14 @@ module Fiber_context = Eio__core.Private.Fiber_context module Suspend = Eio__core.Private.Suspend module Cancel = Eio__core.Cancel +type producer_result = + | Sent (* Consumer accepted item. *) + | Rejected (* Consumer rejected the item. Retry. *) + | Failed of exn (* Cancelled or closed. *) + type 'a item = { - v : 'a; - kp : (bool, exn) result -> unit; (* [Ok false] means consumer refused the item; retry. *) + v : 'a option; + kp : producer_result -> unit; cancel : [ | `Resuming (* In the process of resuming, so can't cancel. *) | `Suspended of (unit -> bool) (* Call this function to attempt to leave the queue. *) @@ -149,7 +160,7 @@ type 'a item = { type 'a cell = | In_transition - | Slot of ('a -> bool) + | Slot of ('a option -> bool) (* [None] if stream is closed *) | Item of 'a item | Finished @@ -169,8 +180,73 @@ end module Q = Cells.Make(Cell) +type update_result = + | Updated + | Update_refused + | Balance_closed + +module Balance : sig + type t + + val make : unit -> t + val close : t -> int option + + val get : t -> int option + (** [get t] is [None] if [t] is closed. Otherwise, it is + the number of items available (if non-negative) or the + number of consumers waiting for an item. *) + + val fetch_and_add : t -> int -> int option + (** [fetch_and_add t diff] increases the value by [diff] and returns the old value. *) + + val incr_if_negative : t -> update_result + val decr_if_positive : t -> update_result + + val pp : t Fmt.t +end = struct + type t = int Atomic.t + + let closed = 1 + let counter x = x asr 1 + let is_closed x = (x land 1) <> 0 + + let value x = + if is_closed x then None else Some (x asr 1) + + let fetch_and_add x diff = + value (Atomic.fetch_and_add x (diff lsl 1)) + + let rec decr_if_positive t = + let x = Atomic.get t in + if is_closed x then Balance_closed + else if counter x > 0 then ( + if Atomic.compare_and_set t x (x - 2) then Updated + else decr_if_positive t + ) else Update_refused + + let rec incr_if_negative t = + let x = Atomic.get t in + if is_closed x then Balance_closed + else if counter x < 0 then ( + if Atomic.compare_and_set t x (x + 2) then Updated + else incr_if_negative t + ) else Update_refused + + let make () = Atomic.make 0 + + let close t = + value (Atomic.exchange t closed) + + let get t = value (Atomic.get t) + + let pp f t = + match get t with + | Some x -> Fmt.int f x + | None -> Fmt.string f "(closed)" +end + type 'a t = { - balance : int Atomic.t; + balance : Balance.t; consumers : 'a Q.t; producers : 'a Q.t; } @@ -180,13 +256,14 @@ type 'a loc = | Long of ('a Q.segment * 'a Cell.t Atomic.t) (* Acting as suspender of cell; can cancel *) let dump f t = - Fmt.pf f "@[Sync (balance=%d)@,@[Consumers:@,%a@]@,@[Producers:@,%a@]@]" - (Atomic.get t.balance) + Fmt.pf f "@[Sync (balance=%a)@,@[Consumers:@,%a@]@,@[Producers:@,%a@]@]" + Balance.pp t.balance Q.dump t.consumers Q.dump t.producers (* Give [item] to consumer [kc]. [item]'s cell is now Finished. *) -let exchange item kc = item.kp (Ok (kc item.v)) +let exchange item kc = + item.kp (if kc item.v then Sent else Rejected) (* Add [value] to [cell]. If the cell is in transition, place [value] there and let the other party handle it later. @@ -209,20 +286,6 @@ let rec add_to_cell queue value cell = (* Cancelling *) -let rec decr_balance_if_positive t = - let cur = Atomic.get t.balance in - if cur > 0 then ( - if Atomic.compare_and_set t.balance cur (cur - 1) then true - else decr_balance_if_positive t - ) else false - -let rec incr_balance_if_negative t = - let cur = Atomic.get t.balance in - if cur < 0 then ( - if Atomic.compare_and_set t.balance cur (cur + 1) then true - else incr_balance_if_negative t - ) else false - (* Cancel [cell] on our suspend queue. This function works for both consumers and producers, as we can tell from the value what our role is (and if there isn't a value, we're finished anyway). @@ -232,7 +295,8 @@ let rec incr_balance_if_negative t = let cancel t (segment, cell) = let cancel2 update_balance ~old = if Atomic.compare_and_set cell old In_transition then ( - if update_balance t then ( + match update_balance t.balance with + | Updated -> (* At this point, we are committed to cancelling. *) begin match Atomic.exchange cell Finished with | Finished -> assert false @@ -241,7 +305,7 @@ let cancel t (segment, cell) = | Slot kc -> add_to_cell t.producers (Slot kc) (Q.next_resume t.producers) end; true - ) else ( + | Update_refused | Balance_closed -> (* We decided not to cancel. We know a resume is coming. *) if Atomic.compare_and_set cell In_transition old then false else ( @@ -253,13 +317,12 @@ let cancel t (segment, cell) = false | _ -> assert false ) - ) ) else false (* The peer resumed us first *) in match Atomic.get cell with | Finished -> false (* The peer resumed us first *) - | Slot _ as old -> cancel2 incr_balance_if_negative ~old (* We are a consumer *) - | Item _ as old -> cancel2 decr_balance_if_positive ~old (* We are a producer *) + | Slot _ as old -> cancel2 Balance.incr_if_negative ~old (* We are a consumer *) + | Item _ as old -> cancel2 Balance.decr_if_positive ~old (* We are a producer *) | In_transition -> (* Either we're initialising the cell, in which case we haven't told the application how to cancel this location yet, or we're already @@ -292,16 +355,20 @@ let rec producer_resume_cell t ~success ~in_transition cell = (* This is essentially the main [put] function, but parameterised so it can be shared with the rejoin-after-rejection case. *) -let producer_join (t : _ t) ~success ~suspend = - let old = Atomic.fetch_and_add t.balance (+1) in - if old < 0 then ( - let cell = Q.next_resume t.consumers in - producer_resume_cell t cell - ~success - ~in_transition:(fun cell -> suspend (Short cell)) - ) else ( - suspend (Long (Q.next_suspend t.producers)) - ) +let producer_join (t : _ t) ~success ~suspend ~closed = + match Balance.fetch_and_add t.balance (+1) with + | None -> closed () + | Some old -> + if old < 0 then ( + let cell = Q.next_resume t.consumers in + producer_resume_cell t cell + ~success + ~in_transition:(fun cell -> suspend (Short cell)) + ) else ( + suspend (Long (Q.next_suspend t.producers)) + ) + +let put_closed_err = Invalid_argument "Stream closed" (* Called when a consumer took our value but then rejected it. We start the put operation again, except that our fiber is already suspended @@ -310,6 +377,7 @@ let producer_join (t : _ t) ~success ~suspend = let put_already_suspended t request = producer_join t ~success:(exchange request) + ~closed:(fun () -> request.kp (Failed put_closed_err)) ~suspend:(fun loc -> let Short cell | Long (_, cell) = loc in add_to_cell t.consumers (Item request) cell; @@ -323,7 +391,7 @@ let put_already_suspended t request = (* We got cancelled after the peer removed our cell and before we updated the cancel function with the new location, or we were cancelled while doing a (non-cancellable) resume. Deal with it now. *) - if cancel t loc then request.kp (Error ex); + if cancel t loc then request.kp (Failed ex); (* else we got resumed first *) | _, Short _ -> (* We can't cancel while in the process of resuming a cell on the [consumers] queue. @@ -346,12 +414,12 @@ let put_suspend t v loc = | Long loc -> `Suspended (fun () -> cancel t loc) in let rec item = { - v; + v = Some v; cancel = Atomic.make cancel; kp = function - | Error _ as e -> enqueue e (* Cancelled by [put_already_suspended]. *) - | Ok true -> enqueue (Ok ()) (* Success! *) - | Ok false -> put_already_suspended t item (* Consumer rejected value. Restart. *) + | Failed e -> enqueue (Error e) + | Sent -> enqueue (Ok ()) (* Success! *) + | Rejected -> put_already_suspended t item (* Consumer rejected value. Restart. *) } in let Short cell | Long (_, cell) = loc in add_to_cell t.consumers (Item item) cell; @@ -368,8 +436,9 @@ let put_suspend t v loc = let rec put (t : _ t) v = producer_join t - ~success:(fun kc -> if kc v then () else put t v) + ~success:(fun kc -> if kc (Some v) then () else put t v) ~suspend:(put_suspend t v) + ~closed:(fun () -> raise put_closed_err) (* Taking. *) @@ -402,25 +471,30 @@ let take_suspend t loc = ) let take (t : _ t) = - let old = Atomic.fetch_and_add t.balance (-1) in - if old > 0 then ( - let cell = Q.next_resume t.producers in - consumer_resume_cell t cell - ~success:(fun item -> item.kp (Ok true); item.v) - ~in_transition:(fun cell -> take_suspend t (Short cell)) - ) else ( - take_suspend t (Long (Q.next_suspend t.consumers)) - ) + match Balance.fetch_and_add t.balance (-1) with + | None -> None + | Some old -> + if old > 0 then ( + let cell = Q.next_resume t.producers in + consumer_resume_cell t cell + ~success:(fun item -> item.kp Sent; item.v) + ~in_transition:(fun cell -> take_suspend t (Short cell)) + ) else ( + take_suspend t (Long (Q.next_suspend t.consumers)) + ) let reject = Slot (fun _ -> false) let take_nonblocking (t : _ t) = - if decr_balance_if_positive t then ( + match Balance.decr_if_positive t.balance with + | Balance_closed + | Update_refused -> None (* No waiting producers for us *) + | Updated -> let rec aux cell = consumer_resume_cell t cell ~success:(fun item -> - item.kp (Ok true); (* Always accept the item *) - Some item.v + item.kp Sent; (* Always accept the item *) + item.v ) ~in_transition:(fun cell -> (* Our producer is still in the process of writing its [Item], but @@ -434,7 +508,6 @@ let take_nonblocking (t : _ t) = else aux cell ) in aux (Q.next_resume t.producers) - ) else None (* No waiting producers for us *) (* Creation and status. *) @@ -442,7 +515,27 @@ let create () = { consumers = Q.make (); producers = Q.make (); - balance = Atomic.make 0; + balance = Balance.make (); } -let balance t = Atomic.get t.balance +let close t = + match Balance.close t.balance with + | None -> () (* Already closed *) + | Some old -> + if old > 0 then ( + (* Reject each waiting producer. They will try to restart and then discover the stream is closed. *) + for _ = 1 to old do + let cell = Q.next_resume t.producers in + add_to_cell t.consumers reject cell; + done + ) else ( + let reject_consumer = Item { v = None; kp = ignore; cancel = Atomic.make `Resuming } in + (* Reject each waiting consumer. *) + for _ = 1 to -old do + let cell = Q.next_resume t.consumers in + add_to_cell t.consumers reject_consumer cell + done + ) + +let balance t = + Balance.get t.balance diff --git a/lib_eio/sync.mli b/lib_eio/sync.mli index bb304b508..a9fbcab8c 100644 --- a/lib_eio/sync.mli +++ b/lib_eio/sync.mli @@ -18,14 +18,18 @@ val put : 'a t -> 'a -> unit If no consumer is available, it waits until one comes along and accepts [x]. Note: Producers are mostly handled fairly, in the order in which they arrive, - but consumers can cancel or reject values so this isn't guaranteed. *) + but consumers can cancel or reject values so this isn't guaranteed. -val take : 'a t -> 'a + @raise Invalid_argument if [t] was closed before [x] was added. *) + +val take : 'a t -> 'a option (** [take t] waits until a producer is available with an item and then returns it. Note: Consumers are mostly handled fairly, in the order in which they arrive, but producers can cancel so this isn't guaranteed if [t] is shared between - domains. *) + domains. + + Returns [None] if [t] was closed before an item was taken. *) val take_nonblocking : 'a t -> 'a option (** [take_nonblocking t] is like {!take}, but returns [None] if no producer is immediately available. @@ -37,7 +41,13 @@ val take_nonblocking : 'a t -> 'a option Since the producer reached the head of the queue while it was still joining, the queue is presumably very short in this case anyway. *) -val balance : 'a t -> int +val close : 'a t -> unit +(** [close t] prevents any further items from being added to [t]. + + Any consumers or producers that were waiting will receive an exception. + If [t] is already closed then this does nothing. *) + +val balance : 'a t -> int option (** [balance t] is the number of waiting producers minus the number of waiting consumers. If the balance is non-negative then it is the number of waiting producers. @@ -46,7 +56,9 @@ val balance : 'a t -> int If [t] is shared between domains then the value may already be out-of-date by the time this function returns, so this is mostly useful for debugging - or reporting metrics. *) + or reporting metrics. + + Returns [None] if [t] is closed. *) val dump : 'a t Fmt.t (** [dump] formats the internal state of a channel, for testing and debugging. *) diff --git a/lib_eio/tests/dscheck/test_sync.ml b/lib_eio/tests/dscheck/test_sync.ml index c42efca8c..acc22d2b8 100644 --- a/lib_eio/tests/dscheck/test_sync.ml +++ b/lib_eio/tests/dscheck/test_sync.ml @@ -23,7 +23,8 @@ let test ~prod ~cons ~take_nonblocking () = Fake_sched.run (fun () -> match T.take t with - | v -> + | None -> assert false + | Some v -> if debug then log "c%d: Recv %d" l v; received := !received + v | exception Eio__core.Cancel.Cancelled _ -> @@ -84,11 +85,69 @@ let test ~prod ~cons ~take_nonblocking () = assert (!finished_producers = prod); (* Everyone finishes by trying to cancel (if they didn't succeed immediately), so there shouldn't be any balance at the end. *) - assert (T.balance t = 0); + assert (T.balance t = Some 0); assert (!received = !expected_total); ) +(* A producer puts "A" and then closes the stream. + Two consumers try to read. One gets the "A", the other gets end-of-stream. *) +let test_close () = + let t = T.create () in + let got = ref [] in + Atomic.spawn + (fun () -> + let _ : Sync.Cancel.t option = Fake_sched.run (fun () -> T.put t "A"; T.close t) in + () + ); + for _ = 1 to 2 do + Atomic.spawn + (fun () -> + let _ : Sync.Cancel.t option = Fake_sched.run (fun () -> + let msg = T.take t |> Option.value ~default:"end-of-stream" in + got := msg :: !got + ) + in + () + ); + done; + Atomic.final (fun () -> + let results = List.sort String.compare !got in + if debug then ( + Fmt.pr "%a@." T.dump t; + Fmt.pr "%a@." Fmt.(Dump.list string) results; + ); + assert (results = ["A"; "end-of-stream"]); + assert (T.balance t = None); + ) + +(* A producer tries to add an item (but never succeeds, as there are no consumers. + At some point, the stream is closed and the operation aborts. *) +let test_close2 () = + let t = T.create () in + let result = ref "Waiting" in + Atomic.spawn + (fun () -> + let _ : Sync.Cancel.t option = Fake_sched.run (fun () -> + match T.put t "A" with + | () -> failwith "Shouldn't succeed with no consumer!" + | exception (Invalid_argument msg) -> result := msg + ) in + () + ); + Atomic.spawn (fun () -> T.close t); + Atomic.final (fun () -> + if debug then ( + Fmt.pr "%a@." T.dump t; + Fmt.pr "%s@." !result; + ); + match !result with + | "Stream closed" -> () + | x -> failwith x + ) + let () = Atomic.trace (test ~prod:1 ~cons:1 ~take_nonblocking:1); Atomic.trace (test ~prod:2 ~cons:1 ~take_nonblocking:0); Atomic.trace (test ~prod:1 ~cons:2 ~take_nonblocking:0); + Atomic.trace test_close; + Atomic.trace test_close2; diff --git a/lib_eio/tests/sync.md b/lib_eio/tests/sync.md index 63ebb1749..ea1412f7d 100644 --- a/lib_eio/tests/sync.md +++ b/lib_eio/tests/sync.md @@ -19,6 +19,7 @@ let put t v = match T.put t v with | () -> Fmt.pr "Sent %s@." v | exception (Eio.Cancel.Cancelled _) -> Fmt.pr "Send of %s was cancelled@." v + | exception (Invalid_argument msg) -> Fmt.pr "Error adding %s: %s@." v msg ) |> Option.map (fun ctx -> Fmt.pr "Waiting for a consumer for %s@." v; @@ -29,7 +30,8 @@ let take t label = Fake_sched.run (fun () -> match T.take t with - | v -> Fmt.pr "%s: Took %s@." label v + | None -> Fmt.pr "%s: Stream was closed@." label + | Some v -> Fmt.pr "%s: Took %s@." label v | exception (Eio.Cancel.Cancelled _) -> Fmt.pr "%s: Take cancelled@." label ) |> Option.map (fun ctx -> @@ -332,3 +334,64 @@ Sync (balance=0) End - : unit = () ``` + +## Closing + +Closing cancels any waiting producers: + +```ocaml +# let t : string T.t = T.create ();; +val t : string T.t = + +# put t "A" |> Option.get;; +Waiting for a consumer for A +- : Eio.Cancel.t = + +# T.close t;; +Error adding A: Stream closed +- : unit = () + +# show t;; +Sync (balance=(closed)) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Finished + In_transition (suspend) (resume) + In_transition + In_transition + End +- : unit = () + +# put t "B";; +Error adding B: Stream closed +- : Eio.Cancel.t option = None +``` + +Closing cancels any waiting consumers: + +```ocaml +# let t : string T.t = T.create ();; +val t : string T.t = + +# take t "A";; +A: Waiting for producer +- : Eio.Cancel.t option = Some + +# T.close t;; +A: Stream was closed +- : unit = () + +# take t "B";; +B: Stream was closed +- : Eio.Cancel.t option = None + +# T.take_nonblocking t;; +- : string option = None +```