@@ -32,29 +32,26 @@ trait Async extends AsyncConfig:
3232
3333object Async :
3434
35- abstract class AsyncImpl (val root : Cancellable , val scheduler : Scheduler )
35+ abstract class Impl (val root : Cancellable , val scheduler : Scheduler )
3636 (using boundary.Label [Unit ]) extends Async :
3737
3838 protected def checkCancellation (): Unit
3939
40- private var result : T
41-
4240 def await [T ](src : Async .Source [T ]): T =
4341 checkCancellation()
4442 var resultOpt : Option [T ] = None
45- if src.poll: x =>
46- result = x
43+ src.poll: x =>
44+ resultOpt = Some (x)
4745 true
48- then result
49- else
46+ resultOpt.getOrElse:
5047 try suspend[T , Unit ]: k =>
5148 src.onComplete: x =>
5249 scheduler.schedule: () =>
5350 k.resume(x)
5451 true // signals to `src` that result `x` was consumed
5552 finally checkCancellation()
5653
57- end AsyncImpl
54+ end Impl
5855
5956 /** The currently executing Async context */
6057 inline def current (using async : Async ): Async = async
@@ -78,11 +75,13 @@ object Async:
7875 */
7976 abstract case class FinalListener [T ]() extends Listener [T ]
8077
81- /** A source that cannot be mapped, filtered, or raced. In other words,
82- * an item coming from a direct source must be immediately consumed in
83- * another async computation; no rejection of this item is possible.
78+ /** An asynchronous data source. Sources can be persistent or ephemeral.
79+ * A persistent source will always pass same data to calls of `poll and `onComplete`.
80+ * An ephememral source can pass new data in every call.
81+ * An example of a persistent source is `Future`.
82+ * An example of an ephemeral source is `Channel`.
8483 */
85- trait DirectSource [+ T ]:
84+ trait Source [+ T ]:
8685
8786 /** If data is available at present, pass it to function `k`
8887 * and return the result if this call.
@@ -104,31 +103,29 @@ object Async:
104103 */
105104 def dropListener (k : Listener [T ]): Unit
106105
107- end DirectSource
106+ end Source
108107
109- /** An asynchronous data source. Sources can be persistent or ephemeral.
110- * A persistent source will always pass same data to calls of `poll and `onComplete`.
111- * An ephememral source can pass new data in every call.
112- * An example of a persistent source is `Future`.
113- * An example of an ephemeral source is `Channel`.
114- */
115- trait Source [+ T ] extends DirectSource [T ]:
108+ /** A source that can be mapped, filtered, or raced. Only ComposableSources
109+ * can pass `false` to the `Listener` in `poll` or `onComplete`. They do
110+ * that if the data is rejected by a filter or did not come first in a race.
111+ */
112+ trait ComposableSource [+ T ] extends Source [T ]:
116113
117114 /** Pass on data transformed by `f` */
118- def map [U ](f : T => U ): Source [U ] =
115+ def map [U ](f : T => U ): ComposableSource [U ] =
119116 new DerivedSource [T , U ](this ):
120117 def listen (x : T , k : Listener [U ]) = k(f(x))
121118
122119 /** Pass on only data matching the predicate `p` */
123- def filter (p : T => Boolean ): Source [T ] =
120+ def filter (p : T => Boolean ): ComposableSource [T ] =
124121 new DerivedSource [T , T ](this ):
125122 def listen (x : T , k : Listener [T ]) = p(x) && k(x)
126123
127- end Source
124+ end ComposableSource
128125
129126 /** As source that transforms an original source in some way */
130127
131- abstract class DerivedSource [T , U ](src : Source [T ]) extends Source [U ]:
128+ abstract class DerivedSource [T , U ](src : Source [T ]) extends ComposableSource [U ]:
132129
133130 /** Handle a value `x` passed to the original source by possibly
134131 * invokiong the continuation for this source.
@@ -148,41 +145,42 @@ object Async:
148145 end DerivedSource
149146
150147 /** Pass first result from any of `sources` to the continuation */
151- def race [T ](sources : Source [T ]* ): Source [T ] = new Source :
152-
153- def poll (k : Listener [T ]): Boolean =
154- val it = sources.iterator
155- var found = false
156- while it.hasNext && ! found do
157- it.next.poll: x =>
158- found = k(x)
159- found
160- found
161-
162- def onComplete (k : Listener [T ]): Unit =
163- val listener = new ForwardingListener [T ](this , k):
164- var foundBefore = false
165- def continueIfFirst (x : T ): Boolean = synchronized :
166- if foundBefore then false else { foundBefore = k(x); foundBefore }
167- def apply (x : T ): Boolean =
168- val found = continueIfFirst(x)
169- if found then sources.foreach(_.dropListener(this ))
170- found
171- sources.foreach(_.onComplete(listener))
172-
173- def dropListener (k : Listener [T ]): Unit =
174- val listener = new ForwardingListener [T ](this , k):
175- def apply (x : T ): Boolean = ???
176- // not to be called, we need the listener only for its
177- // hashcode and equality test.
178- sources.foreach(_.dropListener(listener))
148+ def race [T ](sources : ComposableSource [T ]* ): ComposableSource [T ] =
149+ new ComposableSource :
150+
151+ def poll (k : Listener [T ]): Boolean =
152+ val it = sources.iterator
153+ var found = false
154+ while it.hasNext && ! found do
155+ it.next.poll: x =>
156+ found = k(x)
157+ found
158+ found
159+
160+ def onComplete (k : Listener [T ]): Unit =
161+ val listener = new ForwardingListener [T ](this , k):
162+ var foundBefore = false
163+ def continueIfFirst (x : T ): Boolean = synchronized :
164+ if foundBefore then false else { foundBefore = k(x); foundBefore }
165+ def apply (x : T ): Boolean =
166+ val found = continueIfFirst(x)
167+ if found then sources.foreach(_.dropListener(this ))
168+ found
169+ sources.foreach(_.onComplete(listener))
170+
171+ def dropListener (k : Listener [T ]): Unit =
172+ val listener = new ForwardingListener [T ](this , k):
173+ def apply (x : T ): Boolean = ???
174+ // not to be called, we need the listener only for its
175+ // hashcode and equality test.
176+ sources.foreach(_.dropListener(listener))
179177
180178 end race
181179
182180 /** If left (respectively, right) source succeeds with `x`, pass `Left(x)`,
183181 * (respectively, Right(x)) on to the continuation.
184182 */
185- def either [T , U ](src1 : Source [T ], src2 : Source [U ]): Source [Either [T , U ]] =
183+ def either [T , U ](src1 : ComposableSource [T ], src2 : ComposableSource [U ]): ComposableSource [Either [T , U ]] =
186184 race[Either [T , U ]](src1.map(Left (_)), src2.map(Right (_)))
187185
188186end Async
0 commit comments