@@ -5,13 +5,25 @@ import fiberRuntime.suspend
55import scala .concurrent .ExecutionContext
66import Async .{Listener , await }
77
8+ /** A common interface for channels */
9+ trait Channel [T ]:
10+ def read ()(using Async ): T
11+ def send (x : T )(using Async ): Unit
12+ def close (): Unit
13+
14+ class ChannelClosedException extends Exception
15+
816/** An unbounded asynchronous channel. Senders do not wait for matching
917 * readers.
1018 */
11- class UnboundedChannel [T ] extends Async .OriginalSource [T ]:
19+ class AsyncChannel [T ] extends Async .OriginalSource [ T ], Channel [T ]:
1220
1321 private val pending = ListBuffer [T ]()
1422 private val waiting = mutable.Set [Listener [T ]]()
23+ private var isClosed = false
24+
25+ private def ensureOpen () =
26+ if isClosed then throw ChannelClosedException ()
1527
1628 private def drainWaiting (x : T ): Boolean =
1729 waiting.iterator.find(_(x)) match
@@ -30,11 +42,13 @@ class UnboundedChannel[T] extends Async.OriginalSource[T]:
3042 def read ()(using Async ): T = synchronized :
3143 await(this )
3244
33- def send (x : T ): Unit = synchronized :
45+ def send (x : T )(using Async ): Unit = synchronized :
46+ ensureOpen()
3447 val sent = pending.isEmpty && drainWaiting(x)
3548 if ! sent then pending += x
3649
3750 def poll (k : Listener [T ]): Boolean = synchronized :
51+ ensureOpen()
3852 drainPending(k)
3953
4054 def addListener (k : Listener [T ]): Unit = synchronized :
@@ -43,7 +57,10 @@ class UnboundedChannel[T] extends Async.OriginalSource[T]:
4357 def dropListener (k : Listener [T ]): Unit = synchronized :
4458 waiting -= k
4559
46- end UnboundedChannel
60+ def close () =
61+ isClosed = true
62+
63+ end AsyncChannel
4764
4865/** An unbuffered, synchronous channel. Senders and readers both block
4966 * until a communication between them happens. The channel provides two
@@ -52,7 +69,7 @@ end UnboundedChannel
5269 * waiting sender the data is transmitted directly. Otherwise we add
5370 * the operation to the corresponding pending set.
5471 */
55- trait SyncChannel [T ]:
72+ trait SyncChannel [T ] extends Channel [ T ] :
5673
5774 val canRead : Async .Source [T ]
5875 val canSend : Async .Source [Listener [T ]]
@@ -67,8 +84,13 @@ object SyncChannel:
6784
6885 private val pendingReads = mutable.Set [Listener [T ]]()
6986 private val pendingSends = mutable.Set [Listener [Listener [T ]]]()
87+ private var isClosed = false
88+
89+ private def ensureOpen () =
90+ if isClosed then throw ChannelClosedException ()
7091
7192 private def link [T ](pending : mutable.Set [T ], op : T => Boolean ): Boolean =
93+ ensureOpen()
7294 // Since sources are filterable, we have to match all pending readers or writers
7395 // against the incoming request
7496 pending.iterator.find(op) match
@@ -95,6 +117,9 @@ object SyncChannel:
95117 def dropListener (k : Listener [Listener [T ]]): Unit = synchronized :
96118 pendingSends -= k
97119
120+ def close () =
121+ isClosed = true
122+
98123end SyncChannel
99124
100125def TestChannel (using ExecutionContext ) =
0 commit comments