@@ -4,43 +4,52 @@ import scala.collection.mutable
44import runtime .suspend
55import scala .util .boundary
66
7- /** The underlying configuration of an async block */
8- trait AsyncConfig :
9-
10- /** The cancellable async source underlying this async computation */
11- def root : Cancellable
12-
13- /** The scheduler for runnables defined in this async computation */
14- def scheduler : Scheduler
15-
16- object AsyncConfig :
17-
18- /** A toplevel async group with given scheduler and a synthetic root that
19- * ignores cancellation requests
20- */
21- given fromScheduler (using s : Scheduler ): AsyncConfig with
22- def root = Cancellable .empty
23- def scheduler = s
24-
25- end AsyncConfig
26-
27- /** A context that allows to suspend waiting for asynchronous data sources */
28- trait Async extends AsyncConfig :
7+ /** A context that allows to suspend waiting for asynchronous data sources
8+ */
9+ trait Async extends Async .Config :
2910
3011 /** Wait for completion of async source `src` and return the result */
3112 def await [T ](src : Async .Source [T ]): T
3213
3314object Async :
3415
35- /** A marker type for Source#CanFilter */
36- opaque type Yes = Unit
16+ /** The underlying configuration of an async block */
17+ trait Config :
18+
19+ /** The cancellable async source underlying this async computation */
20+ def root : Cancellable
21+
22+ /** The scheduler for runnables defined in this async computation */
23+ def scheduler : Scheduler
3724
25+ object Config :
26+
27+ /** A toplevel async group with given scheduler and a synthetic root that
28+ * ignores cancellation requests
29+ */
30+ given fromScheduler (using s : Scheduler ): AsyncConfig with
31+ def root = Cancellable .empty
32+ def scheduler = s
33+
34+ end Config
35+
36+ /** A possible implementation of Async. Defines an `await` method based
37+ * on a method to check for cancellation that needs to be implemented by
38+ * subclasses.
39+ *
40+ * @param root the root of the Async's config
41+ * @param scheduler the scheduler of the Async's config
42+ * @param label the label of the boundary that defines the representedd async block
43+ */
3844 abstract class Impl (val root : Cancellable , val scheduler : Scheduler )
39- (using boundary.Label [Unit ]) extends Async :
45+ (using label : boundary.Label [Unit ]) extends Async :
4046
4147 protected def checkCancellation (): Unit
4248
43- def await [T ](src : Async .Source [T ]): T =
49+ /** Await a source first by polling it, and, if that fails, by suspending
50+ * in a onComplete call.
51+ */
52+ def await [T ](src : Source [T ]): T =
4453 checkCancellation()
4554 var resultOpt : Option [T ] = None
4655 src.poll: x =>
@@ -76,7 +85,7 @@ object Async:
7685 /** A listener for values that are processed directly in an async block.
7786 * Closures of type `T => Boolean` can be SAM converted to this type.
7887 */
79- abstract case class FinalListener [T ]() extends Listener [T ]
88+ abstract case class FinalListener [T ](apply : T => Boolean ) extends Listener [T ]
8089
8190 /** An asynchronous data source. Sources can be persistent or ephemeral.
8291 * A persistent source will always pass same data to calls of `poll and `onComplete`.
@@ -86,10 +95,8 @@ object Async:
8695 */
8796 trait Source [+ T ]:
8897
89- type CanFilter
90-
9198 /** If data is available at present, pass it to function `k`
92- * and return the result if this call.
99+ * and return the result of this call.
93100 * `k` returns true iff the data was consumed in an async block.
94101 * Calls to `poll` are always synchronous.
95102 */
@@ -104,14 +111,13 @@ object Async:
104111
105112 /** Signal that listener `k` is dead (i.e. will always return `false` from now on).
106113 * This permits original, (i.e. non-derived) sources like futures or channels
107- * to drop the listener from their ` waiting` sets.
114+ * to drop the listener from their waiting sets.
108115 */
109116 def dropListener (k : Listener [T ]): Unit
110117
111118 end Source
112119
113- /** As source that transforms an original source in some way */
114-
120+ /** A source that transforms an original source in some way */
115121 abstract class DerivedSource [T , U ](val original : Source [T ]) extends Source [U ]:
116122
117123 /** Handle a value `x` passed to the original source by possibly
@@ -134,24 +140,18 @@ object Async:
134140 extension [T ](src : Source [T ])
135141
136142 /** Pass on data transformed by `f` */
137- def map [U ](f : T => U ): Source [U ] { type CanFilter = src. CanFilter } =
143+ def map [U ](f : T => U ): Source [U ] =
138144 new DerivedSource [T , U ](src):
139- type CanFilter = src.CanFilter
140145 def listen (x : T , k : Listener [U ]) = k(f(x))
141146
142- extension [T ](src : Source [T ] { type CanFilter = Yes })
143-
144147 /** Pass on only data matching the predicate `p` */
145- def filter (p : T => Boolean ): Source [T ] { type CanFilter = src. CanFilter } =
148+ def filter (p : T => Boolean ): Source [T ] =
146149 new DerivedSource [T , T ](src):
147- type CanFilter = src.CanFilter
148150 def listen (x : T , k : Listener [T ]) = p(x) && k(x)
149151
150-
151152 /** Pass first result from any of `sources` to the continuation */
152- def race [T , CF ](sources : Source [T ] { type CanFilter <: CF } * ): Source [T ] { type CanFilter <: CF } =
153+ def race [T ](sources : Source [T ]* ): Source [T ] =
153154 new Source [T ]:
154- type CanFilter <: CF
155155
156156 def poll (k : Listener [T ]): Boolean =
157157 val it = sources.iterator
@@ -185,11 +185,8 @@ object Async:
185185 /** If left (respectively, right) source succeeds with `x`, pass `Left(x)`,
186186 * (respectively, Right(x)) on to the continuation.
187187 */
188- def either [T , U , CF ](
189- src1 : Source [T ] { type CanFilter <: CF },
190- src2 : Source [U ] { type CanFilter <: CF })
191- : Source [Either [T , U ]] { type CanFilter <: CF } =
192- race[Either [T , U ], CF ](src1.map(Left (_)), src2.map(Right (_)))
188+ def either [T , U ](src1 : Source [T ], src2 : Source [U ]): Source [Either [T , U ]] =
189+ race[Either [T , U ]](src1.map(Left (_)), src2.map(Right (_)))
193190
194191end Async
195192
0 commit comments