Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplemented Channel in terms of Queue #2856

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
561b9c3
Reimplemented `Channel` in terms of `Queue`
djspiewak Mar 26, 2022
8e45c91
Adjusted take race to avoid element loss on close
djspiewak Mar 26, 2022
dfabc5d
Updated to snapshot with `tryTakeN`
djspiewak Mar 26, 2022
831bd31
Merge branch 'main' into feature/non-primitive-channel
djspiewak Sep 29, 2022
4d1112b
Fixed `sendAll` upstream management for early closure
djspiewak Sep 29, 2022
4248a64
Scalafmt
djspiewak Sep 29, 2022
491186d
Upgraded to Scala 3.2.0
djspiewak Sep 29, 2022
76bfd1a
Merge branch 'main' into feature/non-primitive-channel
armanbilge Sep 30, 2022
1d6574f
Update core/shared/src/main/scala/fs2/concurrent/Channel.scala
djspiewak Sep 30, 2022
2fab62e
Merge branch 'main' into feature/non-primitive-channel
djspiewak Sep 30, 2022
3ff9ee2
Fixed race condition between offer, take, and close
djspiewak Oct 1, 2022
a60219b
Revert "Fixed race condition between offer, take, and close"
djspiewak Oct 1, 2022
58dfbb0
Fixed race condition between offer, take, and close
djspiewak Oct 1, 2022
26d832b
Fixed shadowing warning
djspiewak Oct 1, 2022
9e21329
Treat any topic bound over `Short.MaxValue` as unbounded
djspiewak Oct 1, 2022
6063707
Added a little extra doc
djspiewak Oct 1, 2022
0ea3d72
Beefed up tests and updated to fixed CE snapshot
djspiewak Oct 7, 2022
487d6a8
Reimplemented `Channel` with strictly less `racePair`
djspiewak Oct 8, 2022
e0c2c78
Improved efficiency of `stream` pulling
djspiewak Oct 9, 2022
ccebad8
Avoid being too slow on scalajs
djspiewak Oct 9, 2022
368bc4b
Merge branch 'main' into feature/non-primitive-channel
djspiewak Oct 22, 2022
16f8a8d
`close` needs to be atomic
djspiewak Oct 22, 2022
6ba1126
Reworked `loop` to use `Pull` directly; trying to diagnose
djspiewak Oct 25, 2022
349e3e3
Removed debugging logic and brought in CE snapshot with fix
djspiewak Oct 26, 2022
1dfd72d
Merge branch 'main' into feature/non-primitive-channel
djspiewak Oct 26, 2022
a39f26a
Removed stray `only`
djspiewak Oct 26, 2022
4a70080
Refactored two `Ref`s into one
djspiewak Oct 26, 2022
9c8e420
Eagerly fulfill `closed`
djspiewak Oct 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-core" % "2.8.0",
"org.typelevel" %%% "cats-laws" % "2.8.0" % Test,
"org.typelevel" %%% "cats-effect" % "3.4.0-RC2",
"org.typelevel" %%% "cats-effect-laws" % "3.4.0-RC2" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.4.0-RC2" % Test,
"org.typelevel" %%% "cats-effect" % "3.4-7154d08",
"org.typelevel" %%% "cats-effect-laws" % "3.4-7154d08" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.4-7154d08" % Test,
"org.scodec" %%% "scodec-bits" % "1.1.34",
"org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,
Expand Down
5 changes: 4 additions & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2106,7 +2106,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
val action =
(
Semaphore[F2](concurrency.toLong),
Channel.bounded[F2, F2[Either[Throwable, O2]]](concurrency),
if (concurrency >= Short.MaxValue)
Channel.unbounded[F2, F2[Either[Throwable, O2]]]
else
Channel.bounded[F2, F2[Either[Throwable, O2]]](concurrency),
Deferred[F2, Unit],
Deferred[F2, Unit]
).mapN { (semaphore, channel, stop, end) =>
Expand Down
268 changes: 133 additions & 135 deletions core/shared/src/main/scala/fs2/concurrent/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
package fs2
package concurrent

import cats.Applicative
import cats.effect._
import cats.effect.implicits._
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._

/** Stream aware, multiple producer, single consumer closeable channel.
Expand Down Expand Up @@ -116,174 +118,170 @@ sealed trait Channel[F[_], A] {
/** Semantically blocks until the channel gets closed. */
def closed: F[Unit]
}

object Channel {
type Closed = Closed.type
object Closed

def unbounded[F[_]: Concurrent, A]: F[Channel[F, A]] =
bounded(Int.MaxValue)
Queue.unbounded[F, AnyRef].flatMap(impl(_))

def synchronous[F[_]: Concurrent, A]: F[Channel[F, A]] =
bounded(0)
Queue.synchronous[F, AnyRef].flatMap(impl(_))

def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] = {
case class State(
values: List[A],
size: Int,
waiting: Option[Deferred[F, Unit]],
producers: List[(A, Deferred[F, Unit])],
closed: Boolean
)
require(capacity < Short.MaxValue)
Queue.bounded[F, AnyRef](capacity).flatMap(impl(_))
}

// used as a marker to wake up q.take when the channel is closed
private[this] val Sentinel = new AnyRef

private[this] val LeftClosed: Either[Channel.Closed, Unit] = Left(Channel.Closed)
private[this] val RightUnit: Either[Channel.Closed, Unit] = Right(())

val open = State(List.empty, 0, None, List.empty, closed = false)
private final case class State(leases: Int, closed: Boolean)

def empty(isClosed: Boolean): State =
if (isClosed) State(List.empty, 0, None, List.empty, closed = true)
else open
private object State {
val Empty: State = State(0, false)
}

(F.ref(open), F.deferred[Unit]).mapN { (state, closedGate) =>
// technically this should be A | Sentinel.type
// the queue will consist of exclusively As until we shut down, when there will be one Sentinel
private[this] def impl[F[_]: Concurrent, A](q: Queue[F, AnyRef]): F[Channel[F, A]] =
(Concurrent[F].ref(State.Empty), Concurrent[F].deferred[Unit]).mapN { (stateR, closedLatch) =>
new Channel[F, A] {

def sendAll: Pipe[F, A, Nothing] = { in =>
(in ++ Stream.exec(close.void))
.evalMap(send)
private[this] val LeftClosedF = LeftClosed.pure[F]
private[this] val FalseF = false.pure[F]

// might be interesting to try to optimize this more, but it needs support from CE
val sendAll: Pipe[F, A, Nothing] =
_.evalMapChunk(send(_))
.takeWhile(_.isRight)
.onComplete(Stream.exec(close.void))
.drain

// setting the flag means we won't accept any more sends
val close: F[Either[Channel.Closed, Unit]] = {
val modifyF = stateR.modify {
case State(0, false) =>
State(0, true) -> closedLatch.complete(()) *> q.offer(Sentinel).start.as(RightUnit)

case State(leases, false) =>
State(leases, true) -> closedLatch.complete(()).as(RightUnit)

case st @ State(_, true) =>
st -> LeftClosedF
}

modifyF.flatten.uncancelable
}

def send(a: A) =
F.deferred[Unit].flatMap { producer =>
F.uncancelable { poll =>
state.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])

case State(values, size, waiting, producers, closed @ false) =>
if (size < capacity)
(
State(a :: values, size + 1, None, producers, false),
notifyStream(waiting).as(rightUnit)
)
else
(
State(values, size, None, (a, producer) :: producers, false),
notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll)
)
}.flatten
}
val isClosed: F[Boolean] = stateR.get.map(_.closed)

// there are four states to worry about: open, closing, draining, quiesced
// in the second state, we have outstanding blocked sends
// in the third state we have data in the queue but no sends
// in the fourth state we are completely drained and can shut down the stream
private[this] val isQuiesced: F[Boolean] =
stateR.get.flatMap {
case State(0, true) => q.size.map(_ == 0)
case _ => FalseF
}

def trySend(a: A) =
state.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Boolean].pure[F])

case s @ State(values, size, waiting, producers, closed @ false) =>
if (size < capacity)
(
State(a :: values, size + 1, None, producers, false),
notifyStream(waiting).as(rightTrue)
)
else
(s, rightFalse.pure[F])
}.flatten

def close =
state
.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])

case State(values, size, waiting, producers, closed @ false) =>
(
State(values, size, None, producers, true),
notifyStream(waiting).as(rightUnit) <* signalClosure
)
}
.flatten
.uncancelable
def send(a: A): F[Either[Channel.Closed, Unit]] =
MonadCancel[F].uncancelable { poll =>
// we track the outstanding blocked offers so we can distinguish closing from draining
// the very last blocked send, when closed, is responsible for triggering the sentinel

def isClosed = closedGate.tryGet.map(_.isDefined)
val modifyF = stateR.modify {
case st @ State(_, true) =>
st -> LeftClosedF

def closed = closedGate.get
case State(leases, false) =>
val cleanupF = {
val modifyF = stateR.modify {
case State(1, true) =>
State(0, true) -> q.offer(Sentinel).start.void

def stream = consumeLoop.stream
case State(leases, closed) =>
State(leases - 1, closed) -> Applicative[F].unit
}

def consumeLoop: Pull[F, A, Unit] =
Pull.eval {
F.deferred[Unit].flatMap { waiting =>
state
.modify { state =>
if (shouldEmit(state)) (empty(state.closed), state)
else (state.copy(waiting = waiting.some), state)
}
.flatMap {
case s @ State(
initValues,
stateSize,
ignorePreviousWaiting @ _,
producers,
closed
) =>
if (shouldEmit(s)) {
var size = stateSize
val tailValues = List.newBuilder[A]
var unblock = F.unit

producers.foreach { case (value, producer) =>
size += 1
tailValues += value
unblock = unblock <* producer.complete(())
}

val toEmit = makeChunk(initValues, tailValues.result(), size)

unblock.as(Pull.output(toEmit) >> consumeLoop)
} else {
F.pure(
if (closed) Pull.done
else Pull.eval(waiting.get) >> consumeLoop
)
}
modifyF.flatten
}
.uncancelable
}
}.flatten

def notifyStream(waitForChanges: Option[Deferred[F, Unit]]) =
waitForChanges.traverse(_.complete(()))
val offerF = poll(q.offer(a.asInstanceOf[AnyRef]).as(RightUnit))

def waitOnBound(producer: Deferred[F, Unit], poll: Poll[F]) =
poll(producer.get).onCancel {
state.update { s =>
s.copy(producers = s.producers.filter(_._2 ne producer))
State(leases + 1, false) -> offerF.guarantee(cleanupF).as(RightUnit)
}

modifyF.flatten
}

def trySend(a: A): F[Either[Channel.Closed, Boolean]] =
isClosed.flatMap { b =>
if (b)
LeftClosedF.asInstanceOf[F[Either[Channel.Closed, Boolean]]]
else
q.tryOffer(a.asInstanceOf[AnyRef]).map(_.asRight[Channel.Closed])
}

def signalClosure = closedGate.complete(())
val stream: Stream[F, A] = {
lazy val loop: Pull[F, A, Unit] = {
val pullF = q.tryTakeN(None).flatMap {
case Nil =>
// if we land here, it either means we're consuming faster than producing
// or it means we're actually closed and we need to shut down
// this is the unhappy path either way

val fallback = q.take.map { a =>
// if we get the sentinel, shut down all the things, otherwise emit
if (a eq Sentinel)
Pull.done
else
Pull.output1(a.asInstanceOf[A]) >> loop
}

@inline private def shouldEmit(s: State) = s.values.nonEmpty || s.producers.nonEmpty
// check to see if we're closed and done processing
// if we're all done, complete the latch and terminate the stream
isQuiesced.map { b =>
if (b)
Pull.done
else
Pull.eval(fallback).flatten
}

private def makeChunk(init: List[A], tail: List[A], size: Int): Chunk[A] = {
val arr = new Array[Any](size)
var i = size - 1
var values = tail
while (i >= 0) {
if (values.isEmpty) values = init
arr(i) = values.head
values = values.tail
i -= 1
case as =>
// this is the happy path: we were able to take a chunk
// meaning we're producing as fast or faster than we're consuming

isClosed.map { b =>
if (b) {
// if we're closed, we have to check for the sentinel and strip it out
val as2 = as.filter(_ ne Sentinel)

// if it's empty, we definitely stripped a sentinel, so just be done
// if it's non-empty, we can't know without expensive comparisons, so fall through
if (as2.isEmpty)
Pull.done
else
Pull.output(Chunk.seq(as2.asInstanceOf[List[A]])) >> loop
} else {
Pull.output(Chunk.seq(as.asInstanceOf[List[A]])) >> loop
}
}
}

Pull.eval(pullF).flatten
}
Chunk.array(arr).asInstanceOf[Chunk[A]]

loop.stream
}

// closedLatch solely exists to support this function
val closed: F[Unit] = closedLatch.get
}
}
}

// allocate once
@inline private final def closed[A]: Either[Closed, A] = _closed
private[this] final val _closed: Either[Closed, Nothing] = Left(Closed)
private final val rightUnit: Either[Closed, Unit] = Right(())
private final val rightTrue: Either[Closed, Boolean] = Right(true)
private final val rightFalse: Either[Closed, Boolean] = Right(false)
Comment on lines -283 to -288
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, why dump all of these?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can bring them back. I think I chopped them out a while ago.

}
12 changes: 9 additions & 3 deletions core/shared/src/main/scala/fs2/concurrent/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ abstract class Topic[F[_], A] { self =>
*
* If at any point, the queue backing the subscription has `maxQueued` elements in it,
* any further publications semantically block until elements are dequeued from the
* subscription queue.
* subscription queue. Any value of `maxQueued` which is greater than `Short.MaxValue`
* is treated as unbounded.
*
* @param maxQueued maximum number of elements to enqueue to the subscription
* queue before blocking publishers
Expand All @@ -80,7 +81,9 @@ abstract class Topic[F[_], A] { self =>

/** Like `subscribe`, but represents the subscription explicitly as
* a `Resource` which returns after the subscriber is subscribed,
* but before it has started pulling elements.
* but before it has started pulling elements. Note that any value
* of `maxQueued` which is greater than `Short.MaxValue` will be
* treated as "unbounded".
*/
def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]]

Expand Down Expand Up @@ -159,7 +162,10 @@ object Topic {

def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]] =
Resource
.eval(Channel.bounded[F, A](maxQueued))
.eval(
if (maxQueued >= Short.MaxValue) Channel.unbounded[F, A]
else Channel.bounded[F, A](maxQueued)
)
.flatMap { chan =>
val subscribe = state.modify { case (subs, id) =>
(subs.updated(id, chan), id + 1) -> id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class BroadcastSuite extends Fs2Suite {
test("all subscribers see all elements, pipe immediately interrupted") {
forAllF { (source: Stream[Pure, Int], concurrent0: Int) =>
val concurrent = (concurrent0 % 20).abs.max(1)
val interruptedPipe = scala.util.Random.nextInt(concurrent)
val interruptedPipe = 0
val expected = source.compile.toVector.map(_.toString)

def pipe(idx: Int): Pipe[IO, Int, (Int, String)] =
Expand All @@ -79,6 +79,7 @@ class BroadcastSuite extends Fs2Suite {
result.foreach(it => assertEquals(it, expected))
} else assert(result.isEmpty)
}
// .replicateA(100)
}
}

Expand Down
Loading