-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This is similar to `Lwt_pool`.
- Loading branch information
Showing
7 changed files
with
449 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
(* A pool is a sequence of cells containing either available slots or consumers waiting for them. | ||
A slot may or may not contain an actual resource. | ||
To use a resource: | ||
1. Get the next "suspend" cell. If it contains a resource slot, use it. | ||
2. If no slot is ready and we're below capacity, create a new slot and add it (to the next resume cell). | ||
3. Either way, wait for the cell to be resumed with a slot. | ||
4. Once you have a slot, ensure it contains a resource, creating one if not. | ||
5. When done, add the slot back (in the next resume cell). | ||
*) | ||
|
||
(* Import these directly because we copy this file for the dscheck tests. *) | ||
module Fiber_context = Eio__core.Private.Fiber_context | ||
module Suspend = Eio__core.Private.Suspend | ||
|
||
type 'a slot = 'a option ref | ||
|
||
module Cell = struct | ||
(* The possible behaviours are: | ||
1. Suspender : In_transition -> Request Suspender waits for a resource | ||
1.1. Resumer : Request -> Finished Resumer then providers a resource | ||
1.2. Suspender : Request -> Finished Suspender cancels | ||
2. Resumer : In_transition -> Resource Resumer provides a spare resource | ||
2.1. Suspender : Resource -> Finished Suspender doesn't need to wait | ||
*) | ||
|
||
type 'a t = | ||
| In_transition | ||
| Request of ('a slot -> unit) | ||
| Resource of 'a slot | ||
| Finished | ||
|
||
let init = In_transition | ||
|
||
let segment_order = 2 | ||
|
||
let dump f = function | ||
| In_transition -> Fmt.string f "In_transition" | ||
| Request _ -> Fmt.string f "Request" | ||
| Resource _ -> Fmt.string f "Resource" | ||
| Finished -> Fmt.string f "Finished" | ||
end | ||
|
||
module Q = Cells.Make(Cell) | ||
|
||
type 'a t = { | ||
slots : int Atomic.t; (* Total resources, available and in use *) | ||
max_slots : int; | ||
alloc : unit -> 'a; | ||
validate : 'a -> bool; | ||
dispose : 'a -> unit; | ||
q : 'a Q.t; | ||
} | ||
|
||
let create ?(validate=Fun.const true) ?(dispose=ignore) max_size alloc = | ||
if max_size <= 0 then invalid_arg "Pool.create: max_size is <= 0"; | ||
{ | ||
slots = Atomic.make 0; | ||
max_slots = max_size; | ||
alloc; | ||
validate; | ||
dispose; | ||
q = Q.make (); | ||
} | ||
|
||
(* [add t x] adds [x] to the queue of available slots. *) | ||
let rec add t x = | ||
let cell = Q.next_resume t.q in | ||
let rec aux () = | ||
match Atomic.get cell with | ||
| In_transition -> if not (Atomic.compare_and_set cell In_transition (Resource x)) then aux () | ||
| Finished -> add t x (* The consumer cancelled. Get another cell and retry. *) | ||
| Request r as prev -> | ||
if Atomic.compare_and_set cell prev Finished then ( | ||
r x (* We had a consumer waiting. Give it to them. *) | ||
) else add t x (* Consumer cancelled; retry with another cell. *) | ||
| Resource _ -> assert false (* Can't happen; only a resumer can set this, and we're the resumer. *) | ||
in | ||
aux () | ||
|
||
(* Try to cancel by transitioning from [Request] to [Finished]. | ||
This can only be called after previously transitioning to [Request]. *) | ||
let cancel segment cell = | ||
match Atomic.exchange cell Cell.Finished with | ||
| Request _ -> Q.cancel_cell segment; true | ||
| Finished -> false (* Already resumed; reject cancellation *) | ||
| In_transition | Resource _ -> assert false (* Can't get here from [Request]. *) | ||
|
||
(* If [t] is under capacity, add another (empty) slot. *) | ||
let rec maybe_add_slot t = | ||
let current = Atomic.get t.slots in | ||
if current < t.max_slots then ( | ||
if Atomic.compare_and_set t.slots current (current + 1) then add t (ref None) | ||
else maybe_add_slot t (* Concurrent update; try again *) | ||
) | ||
|
||
(* [run_with t f slot] ensures that [slot] contains a valid resource and then runs [f resource] with it. | ||
Afterwards, the slot is returned to [t]. *) | ||
let run_with t f slot = | ||
match | ||
begin match !slot with | ||
| Some x when t.validate x -> f x | ||
| Some x -> | ||
slot := None; | ||
t.dispose x; | ||
let x = t.alloc () in | ||
slot := Some x; | ||
f x | ||
| None -> | ||
let x = t.alloc () in | ||
slot := Some x; | ||
f x | ||
end | ||
with | ||
| r -> | ||
add t slot; | ||
r | ||
| exception ex -> | ||
let bt = Printexc.get_raw_backtrace () in | ||
add t slot; | ||
Printexc.raise_with_backtrace ex bt | ||
|
||
let use t f = | ||
let segment, cell = Q.next_suspend t.q in | ||
match Atomic.get cell with | ||
| Finished | Request _ -> assert false | ||
| Resource slot -> | ||
Atomic.set cell Finished; (* Allow value to be GC'd *) | ||
run_with t f slot | ||
| In_transition -> | ||
(* It would have been better if more resources were available. | ||
If we still have capacity, add a new slot now. *) | ||
maybe_add_slot t; | ||
(* No item is available right now. Start waiting *) | ||
let slot = | ||
Suspend.enter_unchecked (fun ctx enqueue -> | ||
let r x = enqueue (Ok x) in | ||
if Atomic.compare_and_set cell In_transition (Request r) then ( | ||
match Fiber_context.get_error ctx with | ||
| Some ex -> | ||
if cancel segment cell then enqueue (Error ex); | ||
(* else being resumed *) | ||
| None -> | ||
Fiber_context.set_cancel_fn ctx (fun ex -> | ||
if cancel segment cell then enqueue (Error ex) | ||
(* else being resumed *) | ||
) | ||
) else ( | ||
match Atomic.exchange cell Finished with | ||
| Resource x -> enqueue (Ok x) | ||
| _ -> assert false | ||
); | ||
) | ||
in | ||
(* assert (Atomic.get cell = Finished); *) | ||
run_with t f slot |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
(** This is useful to manage a collection of resources where creating new ones is expensive | ||
and so you want to reuse them where possible. | ||
Example: | ||
{[ | ||
let buffer_pool = Eio.Pool.create 10 (fun () -> Bytes.create 1024) in | ||
Eio.Pool.use buffer_pool (fun buf -> ...) | ||
]} | ||
Note: If you just need to limit how many resources are in use, it is simpler to use {!Eio.Semaphore} instead. | ||
*) | ||
|
||
type 'a t | ||
|
||
val create : | ||
?validate:('a -> bool) -> | ||
?dispose:('a -> unit) -> | ||
int -> | ||
(unit -> 'a) -> | ||
'a t | ||
(** [create n alloc] is a fresh pool which allows up to [n] resources to be live at a time. | ||
It uses [alloc] to create new resources as needed. | ||
If [alloc] raises an exception then that use fails, but future calls to {!use} will retry. | ||
The [alloc] function is called in the context of the fiber trying to use the pool. | ||
If the pool is shared between domains and the resources are attached to a switch, this | ||
might cause trouble (since switches can't be shared between domains). | ||
You might therefore want to make [alloc] request a resource from the main domain rather than creating one itself. | ||
You should also take care about handling cancellation in [alloc], since resources are typically | ||
attached to a switch with the lifetime of the pool, meaning that if [alloc] fails then they won't | ||
be freed automatically until the pool itself is finished. | ||
@param validate If given, this is used to check each resource before using it. | ||
If it returns [false], the pool removes it with [dispose] and then allocates a fresh resource. | ||
@param dispose Used to free resources rejected by [validate]. | ||
If it raises, the exception is passed on to the user, | ||
but resource is still considered to have been disposed. *) | ||
|
||
val use : 'a t -> ('a -> 'b) -> 'b | ||
(** [use t fn] waits for some resource [x] to be available and then runs [f x]. | ||
Afterwards (on success or error), [x] is returned to the pool. *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
module T = Pool | ||
module Cancel = Eio__core.Cancel | ||
|
||
exception Abort | ||
|
||
(* [clients] threads try to use a pool of size [n]. | ||
If [cancel] is set, they also try to cancel, and accept | ||
that as success too. *) | ||
let test ~n ~clients ~cancel () = | ||
let t = T.create n (fun () -> ()) in | ||
let used = Atomic.make 0 in | ||
let finished = ref 0 in | ||
for _ = 1 to clients do | ||
Atomic.spawn (fun () -> | ||
let ctx = | ||
Fake_sched.run @@ fun () -> | ||
try | ||
T.use t (fun () -> Atomic.incr used); | ||
incr finished; | ||
with Cancel.Cancelled Abort -> | ||
incr finished; | ||
in | ||
if cancel then | ||
Option.iter (fun c -> Cancel.cancel c Abort) ctx | ||
) | ||
done; | ||
Atomic.final (fun () -> | ||
if not cancel then Atomic.check (fun () -> Atomic.get used = clients); | ||
Atomic.check (fun () -> !finished = clients); | ||
) | ||
|
||
let () = | ||
Atomic.trace (test ~n:1 ~clients:2 ~cancel:false); | ||
Atomic.trace (test ~n:1 ~clients:2 ~cancel:true) |
Oops, something went wrong.