forked from ocaml/dune
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfiber.mli
305 lines (231 loc) · 8.34 KB
/
fiber.mli
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
(** Concurrency library *)
open! Stdune
(** {1 Generals} *)
(** Type of fiber. A fiber represents a suspended computation. Note
    that using the same fiber twice will execute it twice, which is
    probably not what you want. To share the result of a fiber, use an
    [Ivar.t]. *)
type 'a t
(** [return x] creates a fiber that has already terminated, with [x] as
    its result. *)
val return : 'a -> 'a t
(** [of_thunk f] converts the thunk [f] to a fiber, making sure the thunk
    runs in the context of the fiber (rather than being applied in the
    current context).

    Equivalent to [(>>=) (return ())], i.e. [return () >>= f], but more
    explicit. *)
val of_thunk : (unit -> 'a t) -> 'a t
(** A fiber that never completes, and therefore never yields a value. *)
val never : 'a t
(** Infix and binding operators for composing fibers. *)
module O : sig
(** [>>>] is a sequencing operator. [a >>> b] is the fiber that
    first executes [a] and then [b]. *)
val (>>>) : unit t -> 'a t -> 'a t
(** [>>=] is similar to [>>>] except that the result of the first
    fiber is used to create the second one. *)
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
(** [t >>| f] is the same as [t >>= fun x -> return (f x)] but
    slightly more efficient. *)
val (>>|) : 'a t -> ('a -> 'b) -> 'b t
(** Binding operator: [let* x = t in e] desugars to
    [( let* ) t (fun x -> e)]. It has the same signature as [>>=] and is
    presumably an alias of it (to be confirmed in the implementation). *)
val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t
(** Binding operator: [let+ x = t in e] desugars to
    [( let+ ) t (fun x -> e)]. It has the same signature as [>>|] and is
    presumably an alias of it (to be confirmed in the implementation). *)
val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t
end
(** [map t ~f] has the signature of [O.(>>|)] with the function passed as
    the labelled argument [~f]. Presumably [map t ~f] is [O.(t >>| f)];
    confirm against the implementation. *)
val map : 'a t -> f:('a -> 'b) -> 'b t
(** [bind t ~f] has the signature of [O.(>>=)] with the function passed as
    the labelled argument [~f]. Presumably [bind t ~f] is [O.(t >>= f)];
    confirm against the implementation. *)
val bind : 'a t -> f:('a -> 'b t) -> 'b t
(** {1 Forking execution} *)
(** Futures: handles on the result of a fiber running in the
    background. *)
module Future : sig
(** Stand-in for the enclosing fiber type. The
    [with type 'a fiber := 'a t] constraint that closes this signature
    replaces it with the outer fiber type, so it is not part of the
    resulting interface. *)
type 'a fiber
(** A future represents a promise that will eventually yield a
    value. It is used to represent the result of a fiber running in
    the background. *)
type 'a t
(** [wait t] is a fiber that waits for the future [t] to yield a value
    and then returns that value. *)
val wait : 'a t -> 'a fiber
(** [peek t] returns [Some x] if [t] has already returned [x]. *)
val peek : 'a t -> 'a option
end with type 'a fiber := 'a t
(** [fork f] creates a sub-fiber and returns a [Future.t] that can be used
    to wait for its result. *)
val fork : (unit -> 'a t) -> 'a Future.t t
(** [nfork l] is similar to [fork] but creates [n] sub-fibers, one for
    each of the [n] thunks in [l], and returns a list of their
    futures. *)
val nfork : (unit -> 'a t) list -> 'a Future.t list t
(** [nfork_map l ~f] is the same as
    [nfork (List.map l ~f:(fun x () -> f x))] but more efficient. *)
val nfork_map : 'a list -> f:('a -> 'b t) -> 'b Future.t list t
(** {1 Joining} *)
(** The following combinators are helpers to combine the result of
several fibers into one. Note that they do not introduce
parallelism. *)
(** [both a b] combines the results of the fibers [a] and [b] into a
    pair. It does not introduce parallelism. *)
val both : 'a t -> 'b t -> ('a * 'b) t
(** [all l] combines the results of the fibers in [l] into a single list
    (presumably in the same order as [l]; confirm against the
    implementation). It does not introduce parallelism. *)
val all : 'a t list -> 'a list t
(** [all_unit l] combines the [unit] fibers in [l] into a single [unit]
    fiber. It does not introduce parallelism. *)
val all_unit : unit t list -> unit t
(** [map_all l ~f] has the signature of [all (List.map l ~f)] and is
    presumably equivalent to it (confirm against the implementation). It
    does not introduce parallelism. *)
val map_all : 'a list -> f:('a -> 'b t) -> 'b list t
(** [map_all_unit l ~f] has the signature of [all_unit (List.map l ~f)]
    and is presumably equivalent to it (confirm against the
    implementation). It does not introduce parallelism. *)
val map_all_unit : 'a list -> f:('a -> unit t) -> unit t
(** {1 Forking + joining} *)
(** The following functions combine forking 2 or more fibers followed
by joining the results. For every function, we give an equivalent
implementation using the more basic functions as
documentation. Note however that these functions are implemented as
primitives and so are more efficient than the suggested
implementation. *)
(** [fork_and_join f g] forks two fibers and waits for their results:

    {[
      let fork_and_join f g =
        fork f >>= fun a ->
        fork g >>= fun b ->
        both (Future.wait a) (Future.wait b)
    ]}
*)
val fork_and_join : (unit -> 'a t) -> (unit -> 'b t) -> ('a * 'b) t
(** Same as [fork_and_join] but assumes the first fiber returns [unit],
    so only the result of the second fiber is returned:

    {[
      let fork_and_join_unit f g =
        fork f >>= fun a ->
        fork g >>= fun b ->
        Future.wait a >>> Future.wait b
    ]}
*)
val fork_and_join_unit : (unit -> unit t) -> (unit -> 'a t) -> 'a t
(** [parallel_map l ~f] maps [f] over the list [l] in parallel:

    {[
      let parallel_map l ~f =
        nfork_map l ~f >>= fun futures ->
        all (List.map futures ~f:Future.wait)
    ]}
*)
val parallel_map : 'a list -> f:('a -> 'b t) -> 'b list t
(** [parallel_iter l ~f] iterates over the list [l] in parallel:

    {[
      let parallel_iter l ~f =
        nfork_map l ~f >>= fun futures ->
        all_unit (List.map futures ~f:Future.wait)
    ]}
*)
val parallel_iter : 'a list -> f:('a -> unit t) -> unit t
(** {1 Execute once fibers} *)
(** Fibers that are executed on the first call to [get] only; later calls
    to [get] wait for that first execution. *)
module Once : sig
(** Alias for the enclosing fiber type; substituted away by the
    [with type 'a fiber := 'a t] constraint that closes this
    signature. *)
type 'a fiber = 'a t
(** A computation that is run by the first call to [get]. *)
type 'a t
(** [create f] builds an execute-once value from the thunk [f].
    Presumably [f] is the computation that the first call to [get] runs;
    confirm against the implementation. *)
val create : (unit -> 'a fiber) -> 'a t
(** [get t] returns the value of [t]. If [get] was never called
    before on this [t], it is executed at this point, otherwise
    returns a fiber that waits for the fiber from the first call to
    [get t] to terminate. *)
val get : 'a t -> 'a fiber
(** [peek t] returns [Some v] if [get t] has already been called and
    has yielded a value [v]. *)
val peek : 'a t -> 'a option
(** Same as [peek] but returns the value directly. Presumably raises when
    [peek] would not return [Some _]; the exception is not specified in
    this interface. *)
val peek_exn : 'a t -> 'a
end with type 'a fiber := 'a t
(** {1 Local storage} *)
(** Variables local to a fiber *)
module Var : sig
(** Alias for the enclosing fiber type; substituted away by the
    [with type 'a fiber := 'a t] constraint that closes this
    signature. *)
type 'a fiber = 'a t
(** A fiber-local variable holding values of type ['a]. *)
type 'a t
(** Create a new variable *)
val create : unit -> 'a t
(** [get var] reads the value of [var]. The result is an option;
    presumably [None] means that [var] is unset in the current
    context. *)
val get : 'a t -> 'a option
(** Same as [get] but raises if [var] is unset. *)
val get_exn : 'a t -> 'a
(** [set var value f] sets [var] to [value] during the execution
    of the fiber produced by [f].

    For instance, the following fiber always evaluates to [true]:

    {[
      set v x (fun () -> return (get_exn v = x))
    ]}
*)
val set : 'a t -> 'a -> (unit -> 'b fiber) -> 'b fiber
(** Variant of [set] whose body [f] is a plain function rather than a
    fiber. Presumably [var] is set to [value] for the duration of the
    call [f ()]; confirm against the implementation. *)
val set_sync : 'a t -> 'a -> (unit -> 'b) -> 'b
(** [unset var f] is the counterpart of [set] that takes no value.
    Presumably [var] is unset during the execution of the fiber produced
    by [f]; confirm against the implementation. *)
val unset : 'a t -> (unit -> 'b fiber) -> 'b fiber
(** Variant of [unset] whose body [f] is a plain function rather than a
    fiber. Presumably [var] is unset for the duration of the call
    [f ()]; confirm against the implementation. *)
val unset_sync : 'a t -> (unit -> 'b) -> 'b
end with type 'a fiber := 'a t
(** {1 Error handling} *)
(** [with_error_handler f ~on_error] calls [on_error] for every
    exception raised during the execution of [f]. This includes
    exceptions raised when calling [f ()] or during the execution of
    fibers after [f ()] has returned. Exceptions raised by [on_error]
    are passed on to the parent error handler.

    It is guaranteed that after the fiber has returned a value,
    [on_error] will never be called. *)
val with_error_handler
: (unit -> 'a t)
-> on_error:(Exn_with_backtrace.t -> unit)
-> 'a t
(** If [f ()] completes without raising, then [wait_errors f] is the same
    as [f () >>| fun x -> Ok x]. However, if the execution of [f ()] is
    aborted by an exception, then [wait_errors f] will complete and
    yield [Error ()].

    Note that [wait_errors] only completes after all sub-fibers have
    completed. For instance, in the following code [wait_errors] will
    only complete after 3s:

    {[
      wait_errors
        (fun () ->
           fork_and_join
             (fun () -> sleep 1 >>| fun () -> raise Exit)
             (fun () -> sleep 3))
    ]}

    The same holds for this code:

    {[
      wait_errors
        (fun () ->
           fork (fun () -> sleep 3) >>= fun _ ->
           raise Exit)
    ]}

    In these examples [sleep] is illustrative only: it stands for a
    fiber that completes after the given number of seconds and is not
    declared in this interface. *)
val wait_errors : (unit -> 'a t) -> ('a, unit) Result.t t
(** [fold_errors f ~init ~on_error] calls [on_error] for every
    exception raised during the execution of [f]. This includes
    exceptions raised when calling [f ()] or during the execution of
    fibers after [f ()] has returned.

    [on_error] receives the exception together with an accumulator of
    type ['b], whose initial value is [init]. Presumably the result is
    [Ok x] when [f ()] yields [x] without raising, and [Error acc] with
    the final accumulator otherwise; confirm against the implementation.

    Exceptions raised by [on_error] are passed on to the parent error
    handler. *)
val fold_errors
: (unit -> 'a t)
-> init:'b
-> on_error:(Exn_with_backtrace.t -> 'b -> 'b)
-> ('a, 'b) Result.t t
(** [collect_errors f] gathers the exceptions raised during the execution
    of [f] into a list. It is:

    {[
      fold_errors f
        ~init:[]
        ~on_error:(fun e l -> e :: l)
    ]}
*)
val collect_errors
: (unit -> 'a t)
-> ('a, Exn_with_backtrace.t list) Result.t t
(** [finalize f ~finally] runs [finally] after [f ()] has terminated,
    whether it fails or succeeds. The result is that of [f ()]. *)
val finalize
: (unit -> 'a t)
-> finally:(unit -> unit t)
-> 'a t
(** {1 Synchronization} *)
(** Write once variables *)
module Ivar : sig
(** Alias for the enclosing fiber type; substituted away by the
    [with type 'a fiber := 'a t] constraint that closes this
    signature. *)
type 'a fiber = 'a t
(** An ivar is a synchronization variable that can be written only
    once. *)
type 'a t
(** Create a new empty ivar. *)
val create : unit -> 'a t
(** Read the contents of the ivar. *)
val read : 'a t -> 'a fiber
(** [fill t x] fills the ivar [t] with the value [x]. This can only be
    called once for a given ivar. *)
val fill : 'a t -> 'a -> unit fiber
(** Return [Some x] if [fill t x] has been called previously. *)
val peek : 'a t -> 'a option
end with type 'a fiber := 'a t
(** Mutexes for fibers. The exact locking semantics are not documented in
    this interface; the notes below are assumptions to be confirmed
    against the implementation. *)
module Mutex : sig
(** Alias for the enclosing fiber type; substituted away by the
    [with type 'a fiber := 'a t] constraint that closes this
    signature. *)
type 'a fiber = 'a t
(** A mutex. *)
type t
(** Create a new mutex. *)
val create : unit -> t
(** [with_lock t f] presumably runs the fiber produced by [f] while
    holding [t], and releases [t] once that fiber has terminated. *)
val with_lock : t -> (unit -> 'a fiber) -> 'a fiber
end with type 'a fiber := 'a t
(** {1 Running fibers} *)
(** [yield ()] is a fiber that waits for one iteration of the
    scheduler. *)
val yield : unit -> unit t
(** [run t] runs the fiber [t] until it (and all the fibers it forked)
    terminates. Returns the result if it is determined in the end.

    @raise Never if the result is not determined in the end. *)
val run : 'a t -> 'a
(** Raised by [run] when the result of the fiber is not determined in the
    end. *)
exception Never