Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplemented Channel in terms of Queue #2856

Closed
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
561b9c3
Reimplemented `Channel` in terms of `Queue`
djspiewak Mar 26, 2022
8e45c91
Adjusted take race to avoid element loss on close
djspiewak Mar 26, 2022
dfabc5d
Updated to snapshot with `tryTakeN`
djspiewak Mar 26, 2022
831bd31
Merge branch 'main' into feature/non-primitive-channel
djspiewak Sep 29, 2022
4d1112b
Fixed `sendAll` upstream management for early closure
djspiewak Sep 29, 2022
4248a64
Scalafmt
djspiewak Sep 29, 2022
491186d
Upgraded to Scala 3.2.0
djspiewak Sep 29, 2022
76bfd1a
Merge branch 'main' into feature/non-primitive-channel
armanbilge Sep 30, 2022
1d6574f
Update core/shared/src/main/scala/fs2/concurrent/Channel.scala
djspiewak Sep 30, 2022
2fab62e
Merge branch 'main' into feature/non-primitive-channel
djspiewak Sep 30, 2022
3ff9ee2
Fixed race condition between offer, take, and close
djspiewak Oct 1, 2022
a60219b
Revert "Fixed race condition between offer, take, and close"
djspiewak Oct 1, 2022
58dfbb0
Fixed race condition between offer, take, and close
djspiewak Oct 1, 2022
26d832b
Fixed shadowing warning
djspiewak Oct 1, 2022
9e21329
Treat any topic bound over `Short.MaxValue` as unbounded
djspiewak Oct 1, 2022
6063707
Added a little extra doc
djspiewak Oct 1, 2022
0ea3d72
Beefed up tests and updated to fixed CE snapshot
djspiewak Oct 7, 2022
487d6a8
Reimplemented `Channel` with strictly less `racePair`
djspiewak Oct 8, 2022
e0c2c78
Improved efficiency of `stream` pulling
djspiewak Oct 9, 2022
ccebad8
Avoid being too slow on scalajs
djspiewak Oct 9, 2022
368bc4b
Merge branch 'main' into feature/non-primitive-channel
djspiewak Oct 22, 2022
16f8a8d
`close` needs to be atomic
djspiewak Oct 22, 2022
6ba1126
Reworked `loop` to use `Pull` directly; trying to diagnose
djspiewak Oct 25, 2022
349e3e3
Removed debugging logic and brought in CE snapshot with fix
djspiewak Oct 26, 2022
1dfd72d
Merge branch 'main' into feature/non-primitive-channel
djspiewak Oct 26, 2022
a39f26a
Removed stray `only`
djspiewak Oct 26, 2022
4a70080
Refactored two `Ref`s into one
djspiewak Oct 26, 2022
9c8e420
Eagerly fulfill `closed`
djspiewak Oct 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-core" % "2.8.0",
"org.typelevel" %%% "cats-laws" % "2.8.0" % Test,
"org.typelevel" %%% "cats-effect" % "3.4.0-RC1",
"org.typelevel" %%% "cats-effect-laws" % "3.4.0-RC1" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.4.0-RC1" % Test,
"org.typelevel" %%% "cats-effect" % "3.4-148221d",
"org.typelevel" %%% "cats-effect-laws" % "3.4-148221d" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.4-148221d" % Test,
"org.scodec" %%% "scodec-bits" % "1.1.34",
"org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,
Expand Down
299 changes: 153 additions & 146 deletions core/shared/src/main/scala/fs2/concurrent/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
package fs2
package concurrent

import cats.Applicative
import cats.effect._
import cats.effect.implicits._
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._

/** Stream aware, multiple producer, single consumer closeable channel.
Expand Down Expand Up @@ -116,174 +118,179 @@ sealed trait Channel[F[_], A] {
/** Semantically blocks until the channel gets closed. */
def closed: F[Unit]
}

object Channel {
type Closed = Closed.type
object Closed

def unbounded[F[_]: Concurrent, A]: F[Channel[F, A]] =
bounded(Int.MaxValue)
Queue.unbounded[F, AnyRef].flatMap(impl(_))

def synchronous[F[_]: Concurrent, A]: F[Channel[F, A]] =
bounded(0)

def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] = {
case class State(
values: List[A],
size: Int,
waiting: Option[Deferred[F, Unit]],
producers: List[(A, Deferred[F, Unit])],
closed: Boolean
)

val open = State(List.empty, 0, None, List.empty, closed = false)

def empty(isClosed: Boolean): State =
if (isClosed) State(List.empty, 0, None, List.empty, closed = true)
else open

(F.ref(open), F.deferred[Unit]).mapN { (state, closedGate) =>
new Channel[F, A] {

def sendAll: Pipe[F, A, Nothing] = { in =>
(in ++ Stream.exec(close.void))
.evalMap(send)
.takeWhile(_.isRight)
.drain
}

def send(a: A) =
F.deferred[Unit].flatMap { producer =>
F.uncancelable { poll =>
state.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])

case State(values, size, waiting, producers, closed @ false) =>
if (size < capacity)
(
State(a :: values, size + 1, None, producers, false),
notifyStream(waiting).as(rightUnit)
)
Queue.synchronous[F, AnyRef].flatMap(impl(_))

def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] =
Queue.bounded[F, AnyRef](capacity).flatMap(impl(_))

// used as a marker to wake up q.take when the channel is closed
private[this] val Sentinel = new AnyRef

private[this] val LeftClosed: Either[Channel.Closed, Unit] = Left(Channel.Closed)
private[this] val RightUnit: Either[Channel.Closed, Unit] = Right(())

// technically this should be A | Sentinel.type
// the queue will consist of exclusively As until we shut down, when there will be one Sentinel
private[this] def impl[F[_]: Concurrent, A](q: Queue[F, AnyRef]): F[Channel[F, A]] =
(Concurrent[F].ref(0), Concurrent[F].ref(false), Concurrent[F].deferred[Unit]).mapN {
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
(leasesR, closedR, closedLatch) =>
new Channel[F, A] {

private[this] val LeftClosedF = LeftClosed.pure[F]
private[this] val FalseF = false.pure[F]

// might be interesting to try to optimize this more, but it needs support from CE
val sendAll: Pipe[F, A, Nothing] =
_.evalMapChunk(send(_))
.takeWhile(_.isRight)
.onComplete(Stream.exec(close.void))
.drain

// setting the flag means we won't accept any more sends
val close: F[Either[Channel.Closed, Unit]] =
closedR.getAndSet(true).flatMap { b =>
if (b) {
LeftClosedF
} else {
leasesR.get.flatMap { leases =>
if (leases <= 0)
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
q.offer(Sentinel).start.as(RightUnit)
else
(
State(values, size, None, (a, producer) :: producers, false),
notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll)
)
}.flatten
RightUnit.pure[F]
}
}
}
}

def trySend(a: A) =
state.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Boolean].pure[F])

case s @ State(values, size, waiting, producers, closed @ false) =>
if (size < capacity)
(
State(a :: values, size + 1, None, producers, false),
notifyStream(waiting).as(rightTrue)
)
else
(s, rightFalse.pure[F])
}.flatten

def close =
state
.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])

case State(values, size, waiting, producers, closed @ false) =>
(
State(values, size, None, producers, true),
notifyStream(waiting).as(rightUnit) <* signalClosure
)
val isClosed: F[Boolean] = closedR.get

// there are four states to worry about: open, closing, draining, quiesced
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
// in the second state, we have outstanding blocked sends
// in the third state we have data in the queue but no sends
// in the fourth state we are completely drained and can shut down the stream
private[this] val isQuiesced: F[Boolean] =
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
isClosed.flatMap { b =>
if (b) {
leasesR.get.flatMap { leases =>
if (leases <= 0)
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
q.size.map(_ <= 0)
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
else
FalseF
}
} else {
FalseF
}
}
.flatten
.uncancelable

def isClosed = closedGate.tryGet.map(_.isDefined)

def closed = closedGate.get

def stream = consumeLoop.stream

def consumeLoop: Pull[F, A, Unit] =
Pull.eval {
F.deferred[Unit].flatMap { waiting =>
state
.modify { state =>
if (shouldEmit(state)) (empty(state.closed), state)
else (state.copy(waiting = waiting.some), state)
}
.flatMap {
case s @ State(
initValues,
stateSize,
ignorePreviousWaiting @ _,
producers,
closed
) =>
if (shouldEmit(s)) {
var size = stateSize
val tailValues = List.newBuilder[A]
var unblock = F.unit

producers.foreach { case (value, producer) =>
size += 1
tailValues += value
unblock = unblock <* producer.complete(())
def send(a: A): F[Either[Channel.Closed, Unit]] = {
// we track the outstanding blocked offers so we can distinguish closing from draining
// the very last blocked send, when closed, is responsible for triggering the sentinel
def permit[E](fe: F[E]): F[E] =
MonadCancel[F].uncancelable { poll =>
(leasesR.update(_ + 1) *> poll(fe)).guarantee {
leasesR.updateAndGet(_ - 1).flatMap { leases =>
if (leases <= 0) {
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
closedR.get.flatMap { b =>
if (b)
q.offer(Sentinel)
.start
.void // we don't want to backpressure on processing the sentinel
else
Applicative[F].unit
}

val toEmit = makeChunk(initValues, tailValues.result(), size)

unblock.as(Pull.output(toEmit) >> consumeLoop)
} else {
F.pure(
if (closed) Pull.done
else Pull.eval(waiting.get) >> consumeLoop
)
Applicative[F].unit
}
}
}
.uncancelable
}
}.flatten
}

def notifyStream(waitForChanges: Option[Deferred[F, Unit]]) =
waitForChanges.traverse(_.complete(()))
isClosed.ifM(
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
LeftClosedF,
permit(isClosed.ifM(LeftClosedF, q.offer(a.asInstanceOf[AnyRef]).as(RightUnit)))
)
}

def waitOnBound(producer: Deferred[F, Unit], poll: Poll[F]) =
poll(producer.get).onCancel {
state.update { s =>
s.copy(producers = s.producers.filter(_._2 ne producer))
def trySend(a: A): F[Either[Channel.Closed, Boolean]] =
isClosed.flatMap { b =>
if (b)
LeftClosedF.asInstanceOf[F[Either[Channel.Closed, Boolean]]]
else
q.tryOffer(a.asInstanceOf[AnyRef]).map(_.asRight[Channel.Closed])
}
}

def signalClosure = closedGate.complete(())
val stream: Stream[F, A] = {
val takeN: F[Chunk[A]] =
q.tryTakeN(None).flatMap {
case Nil =>
// if we land here, it either means we're consuming faster than producing
// or it means we're actually closed and we need to shut down
// this is the unhappy path either way

val fallback = q.take.flatMap { a =>
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
// if we get the sentinel, shut down all the things, otherwise emit
if (a eq Sentinel)
closedLatch.complete(()).as(Chunk.empty[A])
else
Chunk.singleton(a.asInstanceOf[A]).pure[F]
}

// check to see if we're closed and done processing
// if we're all done, complete the latch and terminate the stream
isQuiesced.flatMap { b =>
if (b)
closedLatch.complete(()).as(Chunk.empty[A])
else
fallback
}

case as =>
// this is the happy path: we were able to take a chunk
// meaning we're producing as fast or faster than we're consuming

isClosed.flatMap { b =>
val back = if (b) {
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
// if we're closed, we have to check for the sentinel and strip it out
val as2 = as.filter(_ ne Sentinel)

// if it's empty, we definitely stripped a sentinel, so just be done
// if it's non-empty, we can't know without expensive comparisons, so fall through
if (as2.isEmpty)
closedLatch.complete(()).as(Chunk.empty[A])
else
Chunk.seq(as2).pure[F]
} else {
Chunk.seq(as).pure[F]
}

back.asInstanceOf[F[Chunk[A]]]
}
}

@inline private def shouldEmit(s: State) = s.values.nonEmpty || s.producers.nonEmpty
// we will emit non-empty chunks until we see the empty chunk sentinel
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
lazy val go: Stream[F, A] =
Stream.force {
takeN.map { c =>
if (c.isEmpty)
Stream.empty
else
Stream.chunk(c) ++ go
}
}

private def makeChunk(init: List[A], tail: List[A], size: Int): Chunk[A] = {
val arr = new Array[Any](size)
var i = size - 1
var values = tail
while (i >= 0) {
if (values.isEmpty) values = init
arr(i) = values.head
values = values.tail
i -= 1
go
}
Chunk.array(arr).asInstanceOf[Chunk[A]]

// closedLatch solely exists to support this function
val closed: F[Unit] = closedLatch.get
}
}
}
}

// allocate once
@inline private final def closed[A]: Either[Closed, A] = _closed
private[this] final val _closed: Either[Closed, Nothing] = Left(Closed)
private final val rightUnit: Either[Closed, Unit] = Right(())
private final val rightTrue: Either[Closed, Boolean] = Right(true)
private final val rightFalse: Either[Closed, Boolean] = Right(false)
Comment on lines -283 to -288
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, why dump all of these?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can bring them back. I think I chopped them out a while ago.

}
12 changes: 9 additions & 3 deletions core/shared/src/main/scala/fs2/concurrent/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ abstract class Topic[F[_], A] { self =>
*
* If at any point, the queue backing the subscription has `maxQueued` elements in it,
* any further publications semantically block until elements are dequeued from the
* subscription queue.
* subscription queue. Any value of `maxQueued` which is greater than `Short.MaxValue`
* is treated as unbounded.
*
* @param maxQueued maximum number of elements to enqueue to the subscription
* queue before blocking publishers
Expand All @@ -80,7 +81,9 @@ abstract class Topic[F[_], A] { self =>

/** Like `subscribe`, but represents the subscription explicitly as
* a `Resource` which returns after the subscriber is subscribed,
* but before it has started pulling elements.
* but before it has started pulling elements. Note that any value
* of `maxQueued` which is greater than `Short.MaxValue` will be
* treated as "unbounded".
*/
def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]]

Expand Down Expand Up @@ -159,7 +162,10 @@ object Topic {

def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]] =
Resource
.eval(Channel.bounded[F, A](maxQueued))
.eval(
if (maxQueued >= Short.MaxValue) Channel.unbounded[F, A]
else Channel.bounded[F, A](maxQueued)
)
.flatMap { chan =>
val subscribe = state.modify { case (subs, id) =>
(subs.updated(id, chan), id + 1) -> id
Expand Down
Loading