@@ -80,11 +80,36 @@ let async pool f =
8080 Multi_channel. send pd.task_chan (Work (fun _ -> step (do_task f) p));
8181 p
8282
83+ let prepare_for_await chan _mode =
84+ (* Cancellation is not supported, so mode can be ignored. *)
85+ let promise = Atomic. make (Pending [] ) in
86+ let release () =
87+ match Atomic. get promise with
88+ | (Returned _ | Raised _ ) -> ()
89+ | Pending _ ->
90+ match Atomic. exchange promise (Returned () ) with
91+ | Pending ks ->
92+ ks
93+ |> List. iter @@ fun (k , c ) ->
94+ Multi_channel. send_foreign c (Work (fun _ -> continue k () ))
95+ | _ -> ()
96+ and await () =
97+ match Atomic. get promise with
98+ | (Returned _ | Raised _ ) -> ()
99+ | Pending _ -> perform (Wait (promise, chan))
100+ in
101+ Domain_local_await. { release; await }
102+
83103let rec worker task_chan =
84104 match Multi_channel. recv task_chan with
85105 | Quit -> Multi_channel. clear_local_state task_chan
86106 | Work f -> f () ; worker task_chan
87107
108+ let worker task_chan =
109+ Domain_local_await. using
110+ ~prepare_for_await: (prepare_for_await task_chan)
111+ ~while_running: (fun () -> worker task_chan)
112+
88113let run (type a ) pool (f : unit -> a ) : a =
89114 let pd = get_pool_data pool in
90115 let p = Atomic. make (Pending [] ) in
@@ -105,6 +130,11 @@ let run (type a) pool (f : unit -> a) : a =
105130 in
106131 loop ()
107132
133+ let run pool f =
134+ Domain_local_await. using
135+ ~prepare_for_await: (prepare_for_await (get_pool_data pool).task_chan)
136+ ~while_running: (fun () -> run pool f)
137+
108138let named_pools = Hashtbl. create 8
109139let named_pools_mutex = Mutex. create ()
110140
0 commit comments