From 561b9c3a400adf4dfa90ed4b023539ab5ecdb154 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Fri, 25 Mar 2022 19:19:26 -0600 Subject: [PATCH 01/23] Reimplemented `Channel` in terms of `Queue` --- build.sbt | 6 +- .../main/scala/fs2/concurrent/Channel.scala | 154 ++++-------------- 2 files changed, 35 insertions(+), 125 deletions(-) diff --git a/build.sbt b/build.sbt index b78e4b5f82..7447f55a8f 100644 --- a/build.sbt +++ b/build.sbt @@ -188,9 +188,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % "2.7.0", "org.typelevel" %%% "cats-laws" % "2.7.0" % Test, - "org.typelevel" %%% "cats-effect" % "3.3.6", - "org.typelevel" %%% "cats-effect-laws" % "3.3.6" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.3.6" % Test, + "org.typelevel" %%% "cats-effect" % "3.4-387-e167c59", + "org.typelevel" %%% "cats-effect-laws" % "3.4-387-e167c59" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.4-387-e167c59" % Test, "org.scodec" %%% "scodec-bits" % "1.1.30", "org.typelevel" %%% "scalacheck-effect-munit" % "1.0.3" % Test, "org.typelevel" %%% "munit-cats-effect-3" % "1.0.7" % Test, diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 0117e62ecc..891a926103 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -23,7 +23,7 @@ package fs2 package concurrent import cats.effect._ -import cats.effect.implicits._ +import cats.effect.std.Queue import cats.syntax.all._ /** Stream aware, multiple producer, single consumer closeable channel. @@ -105,149 +105,59 @@ sealed trait Channel[F[_], A] { /** Semantically blocks until the channel gets closed. */ def closed: F[Unit] } + object Channel { type Closed = Closed.type object Closed def unbounded[F[_]: Concurrent, A]: F[Channel[F, A]] = - bounded(Int.MaxValue) + Queue.unbounded[F, A].flatMap(impl(_)) def synchronous[F[_]: Concurrent, A]: F[Channel[F, A]] = - bounded(0) - - def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] = { - case class State( - values: List[A], - size: Int, - waiting: Option[Deferred[F, Unit]], - producers: List[(A, Deferred[F, Unit])], - closed: Boolean - ) + Queue.synchronous[F, A].flatMap(impl(_)) - val open = State(List.empty, 0, None, List.empty, closed = false) + def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] = + Queue.bounded[F, A](capacity).flatMap(impl(_)) - def empty(isClosed: Boolean): State = - if (isClosed) State(List.empty, 0, None, List.empty, closed = true) - else open - - (F.ref(open), F.deferred[Unit]).mapN { (state, closedGate) => + private[this] def impl[F[_]: Concurrent, A](q: Queue[F, A]): F[Channel[F, A]] = + Concurrent[F].deferred[Unit].map { closedR => new Channel[F, A] { - def sendAll: Pipe[F, A, Nothing] = { in => - (in ++ Stream.exec(close.void)) - .evalMap(send) - .takeWhile(_.isRight) - .drain - } + def sendAll: Pipe[F, A, Nothing] = + _.evalMapChunk(send(_)).onComplete(Stream.exec(close.void)).drain - def send(a: A) = - F.deferred[Unit].flatMap { producer => - F.uncancelable { poll => - state.modify { - case s @ State(_, _, _, _, closed @ true) => - (s, Channel.closed.pure[F]) - - case State(values, size, waiting, producers, closed @ false) => - if (size < capacity) - ( - State(a :: values, size + 1, None, producers, false), - notifyStream(waiting) - ) - else - ( - State(values, size, None, (a, producer) :: producers, false), - notifyStream(waiting) <* waitOnBound(producer, poll) - ) - }.flatten - } + // doesn't interrupt taking in progress + def close: F[Either[Channel.Closed, Unit]] = + closedR.complete(()).map { + case false => Left(Channel.Closed) + case true => Right(()) } - def close = - state - .modify { - case s @ State(_, _, _, _, closed @ true) => - (s, Channel.closed.pure[F]) - - case State(values, size, waiting, producers, closed @ false) => - ( - State(values, size, None, producers, true), - notifyStream(waiting) <* signalClosure - ) - } - .flatten - .uncancelable - - def isClosed = closedGate.tryGet.map(_.isDefined) - - def closed = closedGate.get + def isClosed: F[Boolean] = closedR.tryGet.map(_.isDefined) - def stream = consumeLoop.stream + def send(a: A): F[Either[Channel.Closed, Unit]] = + isClosed.ifM(Channel.Closed.asLeft[Unit].pure[F], q.offer(a).map(Right(_))) - def consumeLoop: Pull[F, A, Unit] = - Pull.eval { - F.deferred[Unit].flatMap { waiting => - state - .modify { state => - if (shouldEmit(state)) (empty(state.closed), state) - else (state.copy(waiting = waiting.some), state) + def stream: Stream[F, A] = { + val takeN: F[Chunk[A]] = + q.tryTakeN(None).flatMap { + case None => + val fallback = Spawn[F].race(q.take, closedR.get).map { + case Left(a) => Chunk.singleton(a) + case Right(_) => Chunk.empty[A] } - .flatMap { - case s @ State(values, stateSize, ignorePreviousWaiting @ _, producers, closed) => - if (shouldEmit(s)) { - var size = stateSize - var allValues = values - var unblock = F.unit - - producers.foreach { case (value, producer) => - size += 1 - allValues = value :: allValues - unblock = unblock <* producer.complete(()) - } - - val toEmit = makeChunk(allValues, size) - - unblock.as(Pull.output(toEmit) >> consumeLoop) - } else { - F.pure( - if (closed) Pull.done - else Pull.eval(waiting.get) >> consumeLoop - ) - } - } - .uncancelable - } - }.flatten - def notifyStream(waitForChanges: Option[Deferred[F, Unit]]) = - waitForChanges.traverse(_.complete(())).as(rightUnit) + isClosed.ifM(Chunk.empty[A].pure[F], fallback) - def waitOnBound(producer: Deferred[F, Unit], poll: Poll[F]) = - poll(producer.get).onCancel { - state.update { s => - s.copy(producers = s.producers.filter(_._2 ne producer)) + case Some(as) => + Chunk.seq(as).pure[F] } - } - - def signalClosure = closedGate.complete(()) - @inline private def shouldEmit(s: State) = s.values.nonEmpty || s.producers.nonEmpty - - private def makeChunk(allValues: List[A], size: Int): Chunk[A] = { - val arr = new Array[Any](size) - var i = size - 1 - var values = allValues - while (i >= 0) { - arr(i) = values.head - values = values.tail - i -= 1 - } - Chunk.array(arr).asInstanceOf[Chunk[A]] + // you can do this more efficiently, just proves a point + Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).flatMap(Stream.chunk(_)) } + + def closed: F[Unit] = closedR.get } } - } - - // allocate once - private final val closed: Either[Closed, Unit] = Left(Closed) - private final val rightUnit: Either[Closed, Unit] = Right(()) } From 8e45c913c5a8046e02783a5722e63e359930d51b Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Fri, 25 Mar 2022 23:28:00 -0600 Subject: [PATCH 02/23] Adjusted take race to avoid element loss on close --- .../main/scala/fs2/concurrent/Channel.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 891a926103..70a96fa669 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -142,9 +142,22 @@ object Channel { val takeN: F[Chunk[A]] = q.tryTakeN(None).flatMap { case None => - val fallback = Spawn[F].race(q.take, closedR.get).map { - case Left(a) => Chunk.singleton(a) - case Right(_) => Chunk.empty[A] + val fallback = MonadCancel[F].uncancelable { poll => + poll(Spawn[F].racePair(q.take, closedR.get)).flatMap { + case Left((oca, fiber)) => + oca.embedNever.flatMap(a => fiber.cancel.as(Chunk.singleton(a))) + + case Right((fiber, ocb)) => + ocb.embedNever.flatMap { _ => + (fiber.cancel *> fiber.join).flatMap { oca => + oca.fold( + Chunk.empty[A].pure[F], + _ => Chunk.empty[A].pure[F], + _.map(Chunk.singleton(_)) + ) + } + } + } } isClosed.ifM(Chunk.empty[A].pure[F], fallback) From dfabc5d4bc06cd96268cfcaed5d13d4e0f7c24d7 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Fri, 25 Mar 2022 23:31:39 -0600 Subject: [PATCH 03/23] Updated to snapshot with `tryTakeN` --- build.sbt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index 7447f55a8f..1134134ab2 100644 --- a/build.sbt +++ b/build.sbt @@ -188,9 +188,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % "2.7.0", "org.typelevel" %%% "cats-laws" % "2.7.0" % Test, - "org.typelevel" %%% "cats-effect" % "3.4-387-e167c59", - "org.typelevel" %%% "cats-effect-laws" % "3.4-387-e167c59" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.4-387-e167c59" % Test, + "org.typelevel" %%% "cats-effect" % "3.4-389-3862cf0", + "org.typelevel" %%% "cats-effect-laws" % "3.4-389-3862cf0" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.4-389-3862cf0" % Test, "org.scodec" %%% "scodec-bits" % "1.1.30", "org.typelevel" %%% "scalacheck-effect-munit" % "1.0.3" % Test, "org.typelevel" %%% "munit-cats-effect-3" % "1.0.7" % Test, From 4d1112b2d9ac9788837a97027e83751c5947e8d1 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Thu, 29 Sep 2022 12:57:45 -0600 Subject: [PATCH 04/23] Fixed `sendAll` upstream management for early closure --- .../src/main/scala/fs2/concurrent/Channel.scala | 5 ++++- .../test/scala/fs2/concurrent/ChannelSuite.scala | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 6a30dde62f..5bcc3df147 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -136,7 +136,10 @@ object Channel { new Channel[F, A] { def sendAll: Pipe[F, A, Nothing] = - _.evalMapChunk(send(_)).onComplete(Stream.exec(close.void)).drain + _.evalMapChunk(send(_)) + .takeWhile(_.isRight) + .onComplete(Stream.exec(close.void)) + .drain // doesn't interrupt taking in progress def close: F[Either[Channel.Closed, Unit]] = diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index bba34d6460..bb3c05691d 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -168,4 +168,20 @@ class ChannelSuite extends Fs2Suite { TestControl.executeEmbed(test) } + + test("eagerly close sendAll upstream") { + for { + countR <- IO.ref(0) + chan <- Channel.unbounded[IO, Unit] + + incrementer = Stream.eval(countR.update(_ + 1)).repeat.take(5) + upstream = incrementer ++ Stream.eval(chan.close).drain ++ incrementer + + results <- chan.stream.concurrently(upstream.through(chan.sendAll)).compile.toList + + _ <- IO(assert(results.length == 5)) + count <- countR.get + _ <- IO(assert(count == 6)) // we have to overrun the closure to detect it + } yield () + } } From 4248a640032dbea43adc1a03b28b26e927f76903 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Thu, 29 Sep 2022 12:58:19 -0600 Subject: [PATCH 05/23] Scalafmt --- core/shared/src/main/scala/fs2/concurrent/Channel.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 5bcc3df147..008fc5f559 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -159,7 +159,8 @@ object Channel { def send(a: A): F[Either[Channel.Closed, Unit]] = isClosed.ifM( Channel.Closed.asLeft[Unit].pure[F], - (leasesR.update(_ + 1) *> q.offer(a)).guarantee(leasesR.update(_ - 1)).map(Right(_))) + (leasesR.update(_ + 1) *> q.offer(a)).guarantee(leasesR.update(_ - 1)).map(Right(_)) + ) def trySend(a: A): F[Either[Channel.Closed, Boolean]] = isClosed.ifM(Channel.Closed.asLeft[Boolean].pure[F], q.tryOffer(a).map(Right(_))) @@ -168,7 +169,7 @@ object Channel { val takeN: F[Chunk[A]] = q.tryTakeN(None).flatMap { case Nil => - val fallback = leasesDrained flatMap { b => + val fallback = leasesDrained.flatMap { b => if (b) { MonadCancel[F].uncancelable { poll => poll(Spawn[F].racePair(q.take, closedR.get)).flatMap { From 491186d5993c3846ea98418a3c44ae3f0f96eea3 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Thu, 29 Sep 2022 13:10:55 -0600 Subject: [PATCH 06/23] Upgraded to Scala 3.2.0 --- .github/workflows/ci.yml | 20 ++++++++++---------- build.sbt | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 07aa63573e..f8fe4a932e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: strategy: matrix: os: [ubuntu-22.04] - scala: [3.1.3, 2.12.17, 2.13.8] + scala: [3.2.0, 2.12.17, 2.13.8] java: [temurin@17] project: [rootJS, rootJVM, rootNative] runs-on: ${{ matrix.os }} @@ -161,32 +161,32 @@ jobs: ~/Library/Caches/Coursier/v1 key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - name: Download target directories (3.1.3, rootJS) + - name: Download target directories (3.2.0, rootJS) uses: actions/download-artifact@v2 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.3-rootJS + name: target-${{ matrix.os }}-${{ matrix.java }}-3.2.0-rootJS - - name: Inflate target directories (3.1.3, rootJS) + - name: Inflate target directories (3.2.0, rootJS) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (3.1.3, rootJVM) + - name: Download target directories (3.2.0, rootJVM) uses: actions/download-artifact@v2 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.3-rootJVM + name: target-${{ matrix.os }}-${{ matrix.java }}-3.2.0-rootJVM - - name: Inflate target directories (3.1.3, rootJVM) + - name: Inflate target directories (3.2.0, rootJVM) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (3.1.3, rootNative) + - name: Download target directories (3.2.0, rootNative) uses: actions/download-artifact@v2 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.3-rootNative + name: target-${{ matrix.os }}-${{ matrix.java }}-3.2.0-rootNative - - name: Inflate target directories (3.1.3, rootNative) + - name: Inflate target directories (3.2.0, rootNative) run: | tar xf targets.tar rm targets.tar diff --git a/build.sbt b/build.sbt index 5008a6308e..8524a52eab 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ ThisBuild / startYear := Some(2013) val NewScala = "2.13.8" -ThisBuild / crossScalaVersions := Seq("3.1.3", "2.12.17", NewScala) +ThisBuild / crossScalaVersions := Seq("3.2.0", "2.12.17", NewScala) ThisBuild / tlVersionIntroduced := Map("3" -> "3.0.3") ThisBuild / githubWorkflowOSes := Seq("ubuntu-22.04") From 1d6574f802d8ef25d6544c4c55d875c1d7981730 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Thu, 29 Sep 2022 19:27:24 -0600 Subject: [PATCH 07/23] Update core/shared/src/main/scala/fs2/concurrent/Channel.scala Co-authored-by: Arman Bilge --- core/shared/src/main/scala/fs2/concurrent/Channel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 008fc5f559..924c7314ce 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -200,7 +200,7 @@ object Channel { } // you can do this more efficiently, just proves a point - Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).flatMap(Stream.chunk(_)) + Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).unchunks } def closed: F[Unit] = closedR.get From 3ff9ee22c94ca2421fa302b81fdab0514da0fe4f Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Fri, 30 Sep 2022 18:18:40 -0600 Subject: [PATCH 08/23] Fixed race condition between offer, take, and close --- .../main/scala/fs2/concurrent/Channel.scala | 79 +++++++++++++------ .../scala/fs2/concurrent/ChannelSuite.scala | 15 ++++ 2 files changed, 71 insertions(+), 23 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 924c7314ce..afb1ed74e5 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -22,6 +22,7 @@ package fs2 package concurrent +import cats.Applicative import cats.effect._ import cats.effect.std.Queue import cats.effect.syntax.all._ @@ -132,7 +133,7 @@ object Channel { Queue.bounded[F, A](capacity).flatMap(impl(_)) private[this] def impl[F[_]: Concurrent, A](q: Queue[F, A]): F[Channel[F, A]] = - (Concurrent[F].deferred[Unit], Concurrent[F].ref(0)).mapN { (closedR, leasesR) => + (Concurrent[F].deferred[Unit], Lease[F]).mapN { (closedR, lease) => new Channel[F, A] { def sendAll: Pipe[F, A, Nothing] = @@ -151,7 +152,7 @@ object Channel { def isClosed: F[Boolean] = closedR.tryGet.map(_.isDefined) private[this] val leasesDrained: F[Boolean] = - leasesR.get.map(_ <= 0) + lease.isEmpty private[this] val isQuiesced: F[Boolean] = isClosed.ifM(leasesDrained, false.pure[F]) @@ -159,7 +160,7 @@ object Channel { def send(a: A): F[Either[Channel.Closed, Unit]] = isClosed.ifM( Channel.Closed.asLeft[Unit].pure[F], - (leasesR.update(_ + 1) *> q.offer(a)).guarantee(leasesR.update(_ - 1)).map(Right(_)) + lease.permit(q.offer(a)).map(Right(_)) ) def trySend(a: A): F[Either[Channel.Closed, Boolean]] = @@ -169,27 +170,21 @@ object Channel { val takeN: F[Chunk[A]] = q.tryTakeN(None).flatMap { case Nil => - val fallback = leasesDrained.flatMap { b => - if (b) { - MonadCancel[F].uncancelable { poll => - poll(Spawn[F].racePair(q.take, closedR.get)).flatMap { - case Left((oca, fiber)) => - oca.embedNever.flatMap(a => fiber.cancel.as(Chunk.singleton(a))) - - case Right((fiber, ocb)) => - ocb.embedNever.flatMap { _ => - (fiber.cancel *> fiber.join).flatMap { oca => - oca.fold( - Chunk.empty[A].pure[F], - _ => Chunk.empty[A].pure[F], - _.map(Chunk.singleton(_)) - ) - } - } + val fallback = MonadCancel[F].uncancelable { poll => + poll(Spawn[F].racePair(q.take, closedR.get.both(lease.await))).flatMap { + case Left((oca, fiber)) => + oca.embedNever.flatMap(a => fiber.cancel.as(Chunk.singleton(a))) + + case Right((fiber, ocb)) => + ocb.embedNever.flatMap { _ => + (fiber.cancel *> fiber.join).flatMap { oca => + oca.fold( + Chunk.empty[A].pure[F], + _ => Chunk.empty[A].pure[F], + _.map(Chunk.singleton(_)) + ) + } } - } - } else { - q.take.map(Chunk.singleton(_)) } } @@ -206,4 +201,42 @@ object Channel { def closed: F[Unit] = closedR.get } } + + // this is like an inverse semaphore and I'm surprised we haven't added it in std yet + private final class Lease[F[_]: Concurrent] private ( + leasesR: Ref[F, Int], + latchR: Ref[F, Deferred[F, Unit]] + ) { + + def permit[A](fa: F[A]): F[A] = + MonadCancel[F].uncancelable { poll => + val init = leasesR.modify(i => (i + 1, i + 1)).flatMap { leases => + if (leases == 1) + Concurrent[F].deferred[Unit].flatMap(latchR.set(_)) + else + Applicative[F].unit + } + + (init *> poll(fa)).guarantee { + leasesR.modify(i => (i - 1, i - 1)).flatMap { leases => + if (leases == 0) + latchR.get.flatMap(_.complete(())).void + else + Applicative[F].unit + } + } + } + + def isEmpty: F[Boolean] = leasesR.get.map(_ == 0) + + def await: F[Unit] = latchR.get.flatMap(_.get) + } + + private object Lease { + def apply[F[_]: Concurrent]: F[Lease[F]] = + Concurrent[F].deferred[Unit].flatMap { latch => + latch.complete(()) *> (Concurrent[F].ref(0), Concurrent[F].ref(latch)) + .mapN(new Lease[F](_, _)) + } + } } diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index bb3c05691d..b5fe908415 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -184,4 +184,19 @@ class ChannelSuite extends Fs2Suite { _ <- IO(assert(count == 6)) // we have to overrun the closure to detect it } yield () } + + test("sendPull") { + def blackHole(s: Stream[IO, Unit]) = + s.repeatPull(_.uncons.flatMap { + case None => Pull.pure(None) + case Some((hd, tl)) => + val action = IO.delay(0.until(hd.size).foreach(_ => ())) + Pull.eval(action).as(Some(tl)) + }) + + Channel.bounded[IO, Unit](8).flatMap { channel => + val action = List.fill(64)(()).traverse_(_ => channel.send(()).void) *> channel.close + action.start *> channel.stream.through(blackHole).compile.drain + } + } } From a60219b5932fbe1412f24fffe13f045b1c090835 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 1 Oct 2022 09:57:16 -0600 Subject: [PATCH 09/23] Revert "Fixed race condition between offer, take, and close" This reverts commit 3ff9ee22c94ca2421fa302b81fdab0514da0fe4f. --- .../main/scala/fs2/concurrent/Channel.scala | 79 ++++++------------- .../scala/fs2/concurrent/ChannelSuite.scala | 15 ---- 2 files changed, 23 insertions(+), 71 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index afb1ed74e5..924c7314ce 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -22,7 +22,6 @@ package fs2 package concurrent -import cats.Applicative import cats.effect._ import cats.effect.std.Queue import cats.effect.syntax.all._ @@ -133,7 +132,7 @@ object Channel { Queue.bounded[F, A](capacity).flatMap(impl(_)) private[this] def impl[F[_]: Concurrent, A](q: Queue[F, A]): F[Channel[F, A]] = - (Concurrent[F].deferred[Unit], Lease[F]).mapN { (closedR, lease) => + (Concurrent[F].deferred[Unit], Concurrent[F].ref(0)).mapN { (closedR, leasesR) => new Channel[F, A] { def sendAll: Pipe[F, A, Nothing] = @@ -152,7 +151,7 @@ object Channel { def isClosed: F[Boolean] = closedR.tryGet.map(_.isDefined) private[this] val leasesDrained: F[Boolean] = - lease.isEmpty + leasesR.get.map(_ <= 0) private[this] val isQuiesced: F[Boolean] = isClosed.ifM(leasesDrained, false.pure[F]) @@ -160,7 +159,7 @@ object Channel { def send(a: A): F[Either[Channel.Closed, Unit]] = isClosed.ifM( Channel.Closed.asLeft[Unit].pure[F], - lease.permit(q.offer(a)).map(Right(_)) + (leasesR.update(_ + 1) *> q.offer(a)).guarantee(leasesR.update(_ - 1)).map(Right(_)) ) def trySend(a: A): F[Either[Channel.Closed, Boolean]] = @@ -170,21 +169,27 @@ object Channel { val takeN: F[Chunk[A]] = q.tryTakeN(None).flatMap { case Nil => - val fallback = MonadCancel[F].uncancelable { poll => - poll(Spawn[F].racePair(q.take, closedR.get.both(lease.await))).flatMap { - case Left((oca, fiber)) => - oca.embedNever.flatMap(a => fiber.cancel.as(Chunk.singleton(a))) - - case Right((fiber, ocb)) => - ocb.embedNever.flatMap { _ => - (fiber.cancel *> fiber.join).flatMap { oca => - oca.fold( - Chunk.empty[A].pure[F], - _ => Chunk.empty[A].pure[F], - _.map(Chunk.singleton(_)) - ) - } + val fallback = leasesDrained.flatMap { b => + if (b) { + MonadCancel[F].uncancelable { poll => + poll(Spawn[F].racePair(q.take, closedR.get)).flatMap { + case Left((oca, fiber)) => + oca.embedNever.flatMap(a => fiber.cancel.as(Chunk.singleton(a))) + + case Right((fiber, ocb)) => + ocb.embedNever.flatMap { _ => + (fiber.cancel *> fiber.join).flatMap { oca => + oca.fold( + Chunk.empty[A].pure[F], + _ => Chunk.empty[A].pure[F], + _.map(Chunk.singleton(_)) + ) + } + } } + } + } else { + q.take.map(Chunk.singleton(_)) } } @@ -201,42 +206,4 @@ object Channel { def closed: F[Unit] = closedR.get } } - - // this is like an inverse semaphore and I'm surprised we haven't added it in std yet - private final class Lease[F[_]: Concurrent] private ( - leasesR: Ref[F, Int], - latchR: Ref[F, Deferred[F, Unit]] - ) { - - def permit[A](fa: F[A]): F[A] = - MonadCancel[F].uncancelable { poll => - val init = leasesR.modify(i => (i + 1, i + 1)).flatMap { leases => - if (leases == 1) - Concurrent[F].deferred[Unit].flatMap(latchR.set(_)) - else - Applicative[F].unit - } - - (init *> poll(fa)).guarantee { - leasesR.modify(i => (i - 1, i - 1)).flatMap { leases => - if (leases == 0) - latchR.get.flatMap(_.complete(())).void - else - Applicative[F].unit - } - } - } - - def isEmpty: F[Boolean] = leasesR.get.map(_ == 0) - - def await: F[Unit] = latchR.get.flatMap(_.get) - } - - private object Lease { - def apply[F[_]: Concurrent]: F[Lease[F]] = - Concurrent[F].deferred[Unit].flatMap { latch => - latch.complete(()) *> (Concurrent[F].ref(0), Concurrent[F].ref(latch)) - .mapN(new Lease[F](_, _)) - } - } } diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index b5fe908415..bb3c05691d 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -184,19 +184,4 @@ class ChannelSuite extends Fs2Suite { _ <- IO(assert(count == 6)) // we have to overrun the closure to detect it } yield () } - - test("sendPull") { - def blackHole(s: Stream[IO, Unit]) = - s.repeatPull(_.uncons.flatMap { - case None => Pull.pure(None) - case Some((hd, tl)) => - val action = IO.delay(0.until(hd.size).foreach(_ => ())) - Pull.eval(action).as(Some(tl)) - }) - - Channel.bounded[IO, Unit](8).flatMap { channel => - val action = List.fill(64)(()).traverse_(_ => channel.send(()).void) *> channel.close - action.start *> channel.stream.through(blackHole).compile.drain - } - } } From 58dfbb03c4e800913dc9cdf4909d19f0df892bf6 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 1 Oct 2022 12:31:24 -0600 Subject: [PATCH 10/23] Fixed race condition between offer, take, and close --- .../main/scala/fs2/concurrent/Channel.scala | 147 ++++++++++-------- .../scala/fs2/concurrent/ChannelSuite.scala | 19 ++- 2 files changed, 103 insertions(+), 63 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 924c7314ce..10f3a35b1e 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -22,6 +22,7 @@ package fs2 package concurrent +import cats.Applicative import cats.effect._ import cats.effect.std.Queue import cats.effect.syntax.all._ @@ -132,78 +133,102 @@ object Channel { Queue.bounded[F, A](capacity).flatMap(impl(_)) private[this] def impl[F[_]: Concurrent, A](q: Queue[F, A]): F[Channel[F, A]] = - (Concurrent[F].deferred[Unit], Concurrent[F].ref(0)).mapN { (closedR, leasesR) => - new Channel[F, A] { - - def sendAll: Pipe[F, A, Nothing] = - _.evalMapChunk(send(_)) - .takeWhile(_.isRight) - .onComplete(Stream.exec(close.void)) - .drain - - // doesn't interrupt taking in progress - def close: F[Either[Channel.Closed, Unit]] = - closedR.complete(()).map { - case false => Left(Channel.Closed) - case true => Right(()) + (Concurrent[F].ref(0), Concurrent[F].ref(false), Concurrent[F].deferred[Unit]).mapN { + (leasesR, closedR, closedLatch) => + new Channel[F, A] { + + def sendAll: Pipe[F, A, Nothing] = + _.evalMapChunk(send(_)) + .takeWhile(_.isRight) + .onComplete(Stream.exec(close.void)) + .drain + + // doesn't interrupt taking in progress + def close: F[Either[Channel.Closed, Unit]] = + (closedR.getAndSet(true), leasesR.get).tupled.flatMap { + case (false, leases) => + if (leases <= 0) + closedLatch.complete(()).as(().asRight[Channel.Closed]) + else + ().asRight[Channel.Closed].pure[F] + + case (true, _) => + Channel.Closed.asLeft[Unit].pure[F] + } + + def isClosed: F[Boolean] = closedR.get + + private[this] val isQuiesced: F[Boolean] = + (isClosed, leasesR.get.map(_ <= 0), q.size.map(_ <= 0)).mapN(_ && _ && _) + + def send(a: A): F[Either[Channel.Closed, Unit]] = { + def permit[A](fa: F[A]): F[A] = + MonadCancel[F].uncancelable { poll => + (leasesR.update(_ + 1) *> poll(fa)).guarantee { + (closedR.get, leasesR.updateAndGet(_ - 1)).tupled.flatMap { + case (true, leases) if leases <= 0 => + closedLatch.complete(()).void + + case _ => + Applicative[F].unit + } + } + } + + isClosed.ifM( + Channel.Closed.asLeft[Unit].pure[F], + permit( + // double-check inside the permit window to resolve race condition + isClosed.ifM(Channel.Closed.asLeft[Unit].pure[F], q.offer(a).map(Right(_))) + ) + ) } - def isClosed: F[Boolean] = closedR.tryGet.map(_.isDefined) - - private[this] val leasesDrained: F[Boolean] = - leasesR.get.map(_ <= 0) - - private[this] val isQuiesced: F[Boolean] = - isClosed.ifM(leasesDrained, false.pure[F]) - - def send(a: A): F[Either[Channel.Closed, Unit]] = - isClosed.ifM( - Channel.Closed.asLeft[Unit].pure[F], - (leasesR.update(_ + 1) *> q.offer(a)).guarantee(leasesR.update(_ - 1)).map(Right(_)) - ) - - def trySend(a: A): F[Either[Channel.Closed, Boolean]] = - isClosed.ifM(Channel.Closed.asLeft[Boolean].pure[F], q.tryOffer(a).map(Right(_))) - - def stream: Stream[F, A] = { - val takeN: F[Chunk[A]] = - q.tryTakeN(None).flatMap { - case Nil => - val fallback = leasesDrained.flatMap { b => - if (b) { - MonadCancel[F].uncancelable { poll => - poll(Spawn[F].racePair(q.take, closedR.get)).flatMap { - case Left((oca, fiber)) => - oca.embedNever.flatMap(a => fiber.cancel.as(Chunk.singleton(a))) - - case Right((fiber, ocb)) => - ocb.embedNever.flatMap { _ => - (fiber.cancel *> fiber.join).flatMap { oca => + def trySend(a: A): F[Either[Channel.Closed, Boolean]] = + isClosed.ifM(Channel.Closed.asLeft[Boolean].pure[F], q.tryOffer(a).map(Right(_))) + + def stream: Stream[F, A] = { + val takeN: F[Chunk[A]] = + q.tryTakeN(None).flatMap { + case Nil => + val fallback = MonadCancel[F].uncancelable { poll => + poll(Spawn[F].racePair(q.take, closedLatch.get)).flatMap { + case Left((oca, fiber)) => + oca.embedNever.flatMap(a => fiber.cancel.as(Chunk.singleton(a))) + + case Right((fiber, ocb)) => + ocb.embedNever.flatMap { _ => + (fiber.cancel *> fiber.join).flatMap { oca => + // drain any remaining data + q.tryTakeN(None).flatMap { tail => oca.fold( - Chunk.empty[A].pure[F], - _ => Chunk.empty[A].pure[F], - _.map(Chunk.singleton(_)) + (if (tail.isEmpty) Chunk.empty[A] else Chunk.seq(tail)).pure[F], + _ => + (if (tail.isEmpty) Chunk.empty[A] else Chunk.seq(tail)).pure[F], + _.map { head => + tail match { + case Nil => Chunk.singleton(head) + case tail => Chunk.seq(head :: tail) + } + } ) } } - } + } } - } else { - q.take.map(Chunk.singleton(_)) } - } - isQuiesced.ifM(Chunk.empty[A].pure[F], fallback) + isQuiesced.ifM(Chunk.empty[A].pure[F], fallback) - case as => - Chunk.seq(as).pure[F] - } + case as => + Chunk.seq(as).pure[F] + } - // you can do this more efficiently, just proves a point - Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).unchunks - } + // you can do this more efficiently, just proves a point + Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).unchunks + } - def closed: F[Unit] = closedR.get - } + def closed: F[Unit] = closedLatch.get + } } } diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index bb3c05691d..8db26a10c0 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -147,7 +147,7 @@ class ChannelSuite extends Fs2Suite { result <- IO.sleep(5.seconds) *> chan.stream.compile.toList } yield result - TestControl.executeEmbed(l).assertEquals((0 until 5).toList) + TestControl.executeEmbed(l).assertEquals((0 until 5).toList).parReplicateA_(100) } test("complete all blocked sends after closure") { @@ -166,7 +166,7 @@ class ChannelSuite extends Fs2Suite { _ <- IO(assert(sends.forall(_ == Right(())))) } yield () - TestControl.executeEmbed(test) + TestControl.executeEmbed(test).parReplicateA_(100) } test("eagerly close sendAll upstream") { @@ -184,4 +184,19 @@ class ChannelSuite extends Fs2Suite { _ <- IO(assert(count == 6)) // we have to overrun the closure to detect it } yield () } + + test("sendPull") { + def blackHole(s: Stream[IO, Unit]) = + s.repeatPull(_.uncons.flatMap { + case None => Pull.pure(None) + case Some((hd, tl)) => + val action = IO.delay(0.until(hd.size).foreach(_ => ())) + Pull.eval(action).as(Some(tl)) + }) + + Channel.bounded[IO, Unit](8).flatMap { channel => + val action = List.fill(64)(()).traverse_(_ => channel.send(()).void) *> channel.close + action.start *> channel.stream.through(blackHole).compile.drain + } + } } From 26d832b23d9377646b61fd50ed5d385c8ffe8fe8 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 1 Oct 2022 12:39:24 -0600 Subject: [PATCH 11/23] Fixed shadowing warning --- core/shared/src/main/scala/fs2/concurrent/Channel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 10f3a35b1e..df964ac1da 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -162,7 +162,7 @@ object Channel { (isClosed, leasesR.get.map(_ <= 0), q.size.map(_ <= 0)).mapN(_ && _ && _) def send(a: A): F[Either[Channel.Closed, Unit]] = { - def permit[A](fa: F[A]): F[A] = + def permit[E](fa: F[E]): F[E] = MonadCancel[F].uncancelable { poll => (leasesR.update(_ + 1) *> poll(fa)).guarantee { (closedR.get, leasesR.updateAndGet(_ - 1)).tupled.flatMap { From 9e21329c07ed9ec083ad860ac946bfe117fe4b73 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 1 Oct 2022 13:49:16 -0600 Subject: [PATCH 12/23] Treat any topic bound over `Short.MaxValue` as unbounded --- core/shared/src/main/scala/fs2/concurrent/Topic.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Topic.scala b/core/shared/src/main/scala/fs2/concurrent/Topic.scala index 7868041760..53d12d2189 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Topic.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Topic.scala @@ -80,7 +80,9 @@ abstract class Topic[F[_], A] { self => /** Like `subscribe`, but represents the subscription explicitly as * a `Resource` which returns after the subscriber is subscribed, - * but before it has started pulling elements. + * but before it has started pulling elements. Note that any value + * of `maxQueued` which is greater than `Short.MaxValue` will be + * treated as "unbounded". */ def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]] @@ -159,7 +161,10 @@ object Topic { def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]] = Resource - .eval(Channel.bounded[F, A](maxQueued)) + .eval( + if (maxQueued >= Short.MaxValue) Channel.unbounded[F, A] + else Channel.bounded[F, A](maxQueued) + ) .flatMap { chan => val subscribe = state.modify { case (subs, id) => (subs.updated(id, chan), id + 1) -> id From 6063707036ea8cf8d7c6eddad150d0a1a51cd3ab Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 1 Oct 2022 13:51:49 -0600 Subject: [PATCH 13/23] Added a little extra doc --- core/shared/src/main/scala/fs2/concurrent/Topic.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Topic.scala b/core/shared/src/main/scala/fs2/concurrent/Topic.scala index 53d12d2189..7f5d9bff5e 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Topic.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Topic.scala @@ -71,7 +71,8 @@ abstract class Topic[F[_], A] { self => * * If at any point, the queue backing the subscription has `maxQueued` elements in it, * any further publications semantically block until elements are dequeued from the - * subscription queue. + * subscription queue. Any value of `maxQueued` which is greater than `Short.MaxValue` + * is treated as unbounded. * * @param maxQueued maximum number of elements to enqueue to the subscription * queue before blocking publishers From 0ea3d7282b09699de0a621abce50443323662916 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Fri, 7 Oct 2022 15:54:35 -0600 Subject: [PATCH 14/23] Beefed up tests and updated to fixed CE snapshot --- build.sbt | 6 ++-- .../scala/fs2/concurrent/ChannelSuite.scala | 36 ++++++++++++++----- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/build.sbt b/build.sbt index 1963ab5b62..d03779b9e8 100644 --- a/build.sbt +++ b/build.sbt @@ -213,9 +213,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % "2.8.0", "org.typelevel" %%% "cats-laws" % "2.8.0" % Test, - "org.typelevel" %%% "cats-effect" % "3.4-4c07d83", - "org.typelevel" %%% "cats-effect-laws" % "3.4-4c07d83" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.4-4c07d83" % Test, + "org.typelevel" %%% "cats-effect" % "3.4-148221d", + "org.typelevel" %%% "cats-effect-laws" % "3.4-148221d" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.4-148221d" % Test, "org.scodec" %%% "scodec-bits" % "1.1.34", "org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test, "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test, diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 8db26a10c0..022a29f001 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -185,18 +185,36 @@ class ChannelSuite extends Fs2Suite { } yield () } + def blackHole(s: Stream[IO, Unit]) = + s.repeatPull(_.uncons.flatMap { + case None => Pull.pure(None) + case Some((hd, tl)) => + val action = IO.delay(0.until(hd.size).foreach(_ => ())) + Pull.eval(action).as(Some(tl)) + }) + + @inline + private def sendAll(list: List[Unit], action: IO[Unit]) = + list.foldLeft(IO.unit)((acc, _) => acc *> action) + test("sendPull") { - def blackHole(s: Stream[IO, Unit]) = - s.repeatPull(_.uncons.flatMap { - case None => Pull.pure(None) - case Some((hd, tl)) => - val action = IO.delay(0.until(hd.size).foreach(_ => ())) - Pull.eval(action).as(Some(tl)) - }) - - Channel.bounded[IO, Unit](8).flatMap { channel => + val test = Channel.bounded[IO, Unit](8).flatMap { channel => val action = List.fill(64)(()).traverse_(_ => channel.send(()).void) *> channel.close action.start *> channel.stream.through(blackHole).compile.drain } + + test.replicateA_(1000) + } + + test("sendPullPar8") { + val lists = List.fill(8)(List.fill(8)(())) + + val test = Channel.bounded[IO, Unit](8).flatMap { channel => + val action = lists.parTraverse_(sendAll(_, channel.send(()).void)) *> channel.close + + action &> channel.stream.through(blackHole).compile.drain + } + + test.replicateA_(10000) } } From 487d6a82fad4daf36e46b65eee796e827739833d Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 8 Oct 2022 17:42:50 -0600 Subject: [PATCH 15/23] Reimplemented `Channel` with strictly less `racePair` --- .../main/scala/fs2/concurrent/Channel.scala | 174 ++++++++++++------ .../scala/fs2/concurrent/ChannelSuite.scala | 13 ++ 2 files changed, 126 insertions(+), 61 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index df964ac1da..05da61c52a 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -124,111 +124,163 @@ object Channel { object Closed def unbounded[F[_]: Concurrent, A]: F[Channel[F, A]] = - Queue.unbounded[F, A].flatMap(impl(_)) + Queue.unbounded[F, AnyRef].flatMap(impl(_)) def synchronous[F[_]: Concurrent, A]: F[Channel[F, A]] = - Queue.synchronous[F, A].flatMap(impl(_)) + Queue.synchronous[F, AnyRef].flatMap(impl(_)) def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] = - Queue.bounded[F, A](capacity).flatMap(impl(_)) + Queue.bounded[F, AnyRef](capacity).flatMap(impl(_)) - private[this] def impl[F[_]: Concurrent, A](q: Queue[F, A]): F[Channel[F, A]] = + // used as a marker to wake up q.take when the channel is closed + private[this] val Sentinel = new AnyRef + + private[this] val LeftClosed: Either[Channel.Closed, Unit] = Left(Channel.Closed) + private[this] val RightUnit: Either[Channel.Closed, Unit] = Right(()) + + // technically this should be A | Sentinel.type + // the queue will consist of exclusively As until we shut down, when there will be one Sentinel + private[this] def impl[F[_]: Concurrent, A](q: Queue[F, AnyRef]): F[Channel[F, A]] = (Concurrent[F].ref(0), Concurrent[F].ref(false), Concurrent[F].deferred[Unit]).mapN { (leasesR, closedR, closedLatch) => new Channel[F, A] { - def sendAll: Pipe[F, A, Nothing] = + private[this] val LeftClosedF = LeftClosed.pure[F] + private[this] val FalseF = false.pure[F] + + // might be interesting to try to optimize this more, but it needs support from CE + val sendAll: Pipe[F, A, Nothing] = _.evalMapChunk(send(_)) .takeWhile(_.isRight) .onComplete(Stream.exec(close.void)) .drain - // doesn't interrupt taking in progress - def close: F[Either[Channel.Closed, Unit]] = - (closedR.getAndSet(true), leasesR.get).tupled.flatMap { - case (false, leases) => - if (leases <= 0) - closedLatch.complete(()).as(().asRight[Channel.Closed]) - else - ().asRight[Channel.Closed].pure[F] - - case (true, _) => - Channel.Closed.asLeft[Unit].pure[F] + // setting the flag means we won't accept any more sends + val close: F[Either[Channel.Closed, Unit]] = + closedR.getAndSet(true).flatMap { b => + if (b) { + LeftClosedF + } else { + leasesR.get.flatMap { leases => + if (leases <= 0) + q.offer(Sentinel).start.as(RightUnit) + else + RightUnit.pure[F] + } + } } - def isClosed: F[Boolean] = closedR.get + val isClosed: F[Boolean] = closedR.get + // there are four states to worry about: open, closing, draining, quiesced + // in the second state, we have outstanding blocked sends + // in the third state we have data in the queue but no sends + // in the fourth state we are completely drained and can shut down the stream private[this] val isQuiesced: F[Boolean] = - (isClosed, leasesR.get.map(_ <= 0), q.size.map(_ <= 0)).mapN(_ && _ && _) + isClosed.flatMap { b => + if (b) { + leasesR.get.flatMap { leases => + if (leases <= 0) + q.size.map(_ <= 0) + else + FalseF + } + } else { + FalseF + } + } def send(a: A): F[Either[Channel.Closed, Unit]] = { - def permit[E](fa: F[E]): F[E] = + // we track the outstanding blocked offers so we can distinguish closing from draining + // the very last blocked send, when closed, is responsible for triggering the sentinel + def permit[E](fe: F[E]): F[E] = MonadCancel[F].uncancelable { poll => - (leasesR.update(_ + 1) *> poll(fa)).guarantee { - (closedR.get, leasesR.updateAndGet(_ - 1)).tupled.flatMap { - case (true, leases) if leases <= 0 => - closedLatch.complete(()).void - - case _ => + (leasesR.update(_ + 1) *> poll(fe)).guarantee { + leasesR.updateAndGet(_ - 1).flatMap { leases => + if (leases <= 0) { + closedR.get.flatMap { b => + if (b) + q.offer(Sentinel) + .start + .void // we don't want to backpressure on processing the sentinel + else + Applicative[F].unit + } + } else { Applicative[F].unit + } } } } isClosed.ifM( - Channel.Closed.asLeft[Unit].pure[F], - permit( - // double-check inside the permit window to resolve race condition - isClosed.ifM(Channel.Closed.asLeft[Unit].pure[F], q.offer(a).map(Right(_))) - ) + LeftClosedF, + permit(isClosed.ifM(LeftClosedF, q.offer(a.asInstanceOf[AnyRef]).as(RightUnit))) ) } def trySend(a: A): F[Either[Channel.Closed, Boolean]] = - isClosed.ifM(Channel.Closed.asLeft[Boolean].pure[F], q.tryOffer(a).map(Right(_))) + isClosed.flatMap { b => + if (b) + LeftClosedF.asInstanceOf[F[Either[Channel.Closed, Boolean]]] + else + q.tryOffer(a.asInstanceOf[AnyRef]).map(_.asRight[Channel.Closed]) + } - def stream: Stream[F, A] = { + val stream: Stream[F, A] = { val takeN: F[Chunk[A]] = q.tryTakeN(None).flatMap { case Nil => - val fallback = MonadCancel[F].uncancelable { poll => - poll(Spawn[F].racePair(q.take, closedLatch.get)).flatMap { - case Left((oca, fiber)) => - oca.embedNever.flatMap(a => fiber.cancel.as(Chunk.singleton(a))) - - case Right((fiber, ocb)) => - ocb.embedNever.flatMap { _ => - (fiber.cancel *> fiber.join).flatMap { oca => - // drain any remaining data - q.tryTakeN(None).flatMap { tail => - oca.fold( - (if (tail.isEmpty) Chunk.empty[A] else Chunk.seq(tail)).pure[F], - _ => - (if (tail.isEmpty) Chunk.empty[A] else Chunk.seq(tail)).pure[F], - _.map { head => - tail match { - case Nil => Chunk.singleton(head) - case tail => Chunk.seq(head :: tail) - } - } - ) - } - } - } - } + // if we land here, it either means we're consuming faster than producing + // or it means we're actually closed and we need to shut down + // this is the unhappy path either way + + val fallback = q.take.flatMap { a => + // if we get the sentinel, shut down all the things, otherwise emit + if (a eq Sentinel) + closedLatch.complete(()).as(Chunk.empty[A]) + else + Chunk.singleton(a.asInstanceOf[A]).pure[F] } - isQuiesced.ifM(Chunk.empty[A].pure[F], fallback) + // check to see if we're closed and done processing + // if we're all done, complete the latch and terminate the stream + isQuiesced.flatMap { b => + if (b) + closedLatch.complete(()).as(Chunk.empty[A]) + else + fallback + } case as => - Chunk.seq(as).pure[F] + // this is the happy path: we were able to take a chunk + // meaning we're producing as fast or faster than we're consuming + + isClosed.flatMap { b => + val back = if (b) { + // if we're closed, we have to check for the sentinel and strip it out + val as2 = as.filter(_ ne Sentinel) + + // if it's empty, we definitely stripped a sentinel, so just be done + // if it's non-empty, we can't know without expensive comparisons, so fall through + if (as2.isEmpty) + closedLatch.complete(()).as(Chunk.empty[A]) + else + Chunk.seq(as2).pure[F] + } else { + Chunk.seq(as).pure[F] + } + + back.asInstanceOf[F[Chunk[A]]] + } } - // you can do this more efficiently, just proves a point + // we will emit non-empty chunks until we see the sentinel Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).unchunks } - def closed: F[Unit] = closedLatch.get + // closedLatch solely exists to support this function + val closed: F[Unit] = closedLatch.get } } } diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 022a29f001..d0dd029d1b 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -30,6 +30,19 @@ import scala.concurrent.duration._ import org.scalacheck.effect.PropF.forAllF class ChannelSuite extends Fs2Suite { + + test("receives some simple elements above capacity and closes") { + val test = Channel.bounded[IO, Int](5).flatMap { chan => + val senders = 0.until(10).toList.parTraverse_ { i => + IO.sleep(i.millis) *> chan.send(i) + } + + senders &> (IO.sleep(15.millis) *> chan.close *> chan.stream.compile.toVector) + } + + TestControl.executeEmbed(test) + } + test("Channel receives all elements and closes") { forAllF { (source: Stream[Pure, Int]) => Channel.unbounded[IO, Int].flatMap { chan => From e0c2c78c8180f008c783c2c3c2110d309e68778f Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 8 Oct 2022 20:27:01 -0600 Subject: [PATCH 16/23] Improved efficiency of `stream` pulling --- .../src/main/scala/fs2/concurrent/Channel.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 05da61c52a..f2d4369f14 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -275,8 +275,18 @@ object Channel { } } - // we will emit non-empty chunks until we see the sentinel - Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).unchunks + // we will emit non-empty chunks until we see the empty chunk sentinel + lazy val go: Stream[F, A] = + Stream.force { + takeN.map { c => + if (c.isEmpty) + Stream.empty + else + Stream.chunk(c) ++ go + } + } + + go } // closedLatch solely exists to support this function From ccebad83e5f56d480ec495fc75ede19cb1ea5526 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 8 Oct 2022 21:27:12 -0600 Subject: [PATCH 17/23] Avoid being too slow on scalajs --- core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index d0dd029d1b..d3ff9ee65d 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -216,7 +216,7 @@ class ChannelSuite extends Fs2Suite { action.start *> channel.stream.through(blackHole).compile.drain } - test.replicateA_(1000) + test.replicateA_(if (isJVM) 1000 else 1) } test("sendPullPar8") { @@ -228,6 +228,6 @@ class ChannelSuite extends Fs2Suite { action &> channel.stream.through(blackHole).compile.drain } - test.replicateA_(10000) + test.replicateA_(if (isJVM) 10000 else 1) } } From 16f8a8dfc6fff4143a9849f4f60bf88c9773c0c7 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 22 Oct 2022 10:01:02 -0600 Subject: [PATCH 18/23] `close` needs to be atomic --- .../main/scala/fs2/concurrent/Channel.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index f2d4369f14..3aa81ae769 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -157,15 +157,17 @@ object Channel { // setting the flag means we won't accept any more sends val close: F[Either[Channel.Closed, Unit]] = - closedR.getAndSet(true).flatMap { b => - if (b) { - LeftClosedF - } else { - leasesR.get.flatMap { leases => - if (leases <= 0) - q.offer(Sentinel).start.as(RightUnit) - else - RightUnit.pure[F] + MonadCancel[F].uncancelable { _ => + closedR.getAndSet(true).flatMap { b => + if (b) { + LeftClosedF + } else { + leasesR.get.flatMap { leases => + if (leases <= 0) + q.offer(Sentinel).start.as(RightUnit) + else + RightUnit.pure[F] + } } } } From 6ba11267229f32b9461a897600c7fd82b4171e94 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Tue, 25 Oct 2022 13:23:22 -0600 Subject: [PATCH 19/23] Reworked `loop` to use `Pull` directly; trying to diagnose --- core/shared/src/main/scala/fs2/Stream.scala | 10 +++- .../main/scala/fs2/concurrent/Channel.scala | 58 +++++++++---------- .../scala/fs2/concurrent/BroadcastSuite.scala | 21 +++++-- .../scala/fs2/concurrent/ChannelSuite.scala | 9 +++ 4 files changed, 59 insertions(+), 39 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index e91c464d7e..a978c8e6b9 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -559,7 +559,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, // stop background process but await for it to finalise with a result // We use F.fromEither to bring errors from the back into the fore - val stopBack: F2[Unit] = interrupt.complete(()) >> backResult.get.flatMap(F.fromEither) + val stopBack: F2[Unit] = interrupt.complete(()) >> {println("interrupting!"); backResult.get.flatMap(F.fromEither)} (Stream.bracket(compileBack.start)(_ => stopBack), watch(this)) } @@ -2106,7 +2106,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val action = ( Semaphore[F2](concurrency.toLong), - Channel.bounded[F2, F2[Either[Throwable, O2]]](concurrency), + if (concurrency >= Short.MaxValue) + Channel.unbounded[F2, F2[Either[Throwable, O2]]] + else + Channel.bounded[F2, F2[Either[Throwable, O2]]](concurrency), Deferred[F2, Unit], Deferred[F2, Unit] ).mapN { (semaphore, channel, stop, end) => @@ -4154,11 +4157,12 @@ object Stream extends StreamLowPriority { def signalResult(fiber: Fiber[F, Throwable, Unit]): F[Unit] = done.get.flatMap { blah => - blah.flatten.fold[F[Unit]](fiber.joinWithNever)(F.raiseError) + blah.flatten.fold[F[Unit]]({println("joining"); fiber.joinWithNever})(F.raiseError) } Stream .bracket(F.start(runOuter) >> F.start(outcomeJoiner)) { fiber => + println("canceling parjoin") stop(None) >> // in case of short-circuiting, the `fiberJoiner` would not have had a chance // to wait until all fibers have been joined, so we need to do it manually diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 3aa81ae769..6094e4e7ce 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -124,13 +124,15 @@ object Channel { object Closed def unbounded[F[_]: Concurrent, A]: F[Channel[F, A]] = - Queue.unbounded[F, AnyRef].flatMap(impl(_)) + Queue.unbounded[F, AnyRef].flatMap(impl(_, false)) def synchronous[F[_]: Concurrent, A]: F[Channel[F, A]] = - Queue.synchronous[F, AnyRef].flatMap(impl(_)) + Queue.synchronous[F, AnyRef].flatMap(impl(_, true)) - def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] = - Queue.bounded[F, AnyRef](capacity).flatMap(impl(_)) + def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] = { + require(capacity < Short.MaxValue) + Queue.bounded[F, AnyRef](capacity).flatMap(impl(_, false)) + } // used as a marker to wake up q.take when the channel is closed private[this] val Sentinel = new AnyRef @@ -140,7 +142,7 @@ object Channel { // technically this should be A | Sentinel.type // the queue will consist of exclusively As until we shut down, when there will be one Sentinel - private[this] def impl[F[_]: Concurrent, A](q: Queue[F, AnyRef]): F[Channel[F, A]] = + private[this] def impl[F[_]: Concurrent, A](q: Queue[F, AnyRef], log: Boolean): F[Channel[F, A]] = (Concurrent[F].ref(0), Concurrent[F].ref(false), Concurrent[F].deferred[Unit]).mapN { (leasesR, closedR, closedLatch) => new Channel[F, A] { @@ -230,65 +232,59 @@ object Channel { } val stream: Stream[F, A] = { - val takeN: F[Chunk[A]] = - q.tryTakeN(None).flatMap { + lazy val loop: Pull[F, A, Unit] = { + if (log) println("looping!") + + val pullF = q.tryTakeN(None).flatMap { case Nil => // if we land here, it either means we're consuming faster than producing // or it means we're actually closed and we need to shut down // this is the unhappy path either way - val fallback = q.take.flatMap { a => + val fallback = (leasesR.get, closedR.get, closedLatch.tryGet).tupled.map(p => if (log) println(s">>> taking: $p")) >> q.take.map { a => + if (log) println("<<< took") // if we get the sentinel, shut down all the things, otherwise emit if (a eq Sentinel) - closedLatch.complete(()).as(Chunk.empty[A]) + Pull.eval(closedLatch.complete(()).void) else - Chunk.singleton(a.asInstanceOf[A]).pure[F] + Pull.output1(a.asInstanceOf[A]) >> loop } // check to see if we're closed and done processing // if we're all done, complete the latch and terminate the stream - isQuiesced.flatMap { b => + isQuiesced.map { b => if (b) - closedLatch.complete(()).as(Chunk.empty[A]) + Pull.eval(closedLatch.complete(()).void) else - fallback + Pull.eval(fallback).flatten } case as => // this is the happy path: we were able to take a chunk // meaning we're producing as fast or faster than we're consuming + if (log) println(s"tookN $as") - isClosed.flatMap { b => - val back = if (b) { + isClosed.map { b => + if (b) { // if we're closed, we have to check for the sentinel and strip it out val as2 = as.filter(_ ne Sentinel) // if it's empty, we definitely stripped a sentinel, so just be done // if it's non-empty, we can't know without expensive comparisons, so fall through if (as2.isEmpty) - closedLatch.complete(()).as(Chunk.empty[A]) + Pull.eval(closedLatch.complete(()).void) else - Chunk.seq(as2).pure[F] + Pull.output(Chunk.seq(as2.asInstanceOf[List[A]])) >> loop } else { - Chunk.seq(as).pure[F] + Pull.output(Chunk.seq(as.asInstanceOf[List[A]])) >> loop } - - back.asInstanceOf[F[Chunk[A]]] } } - // we will emit non-empty chunks until we see the empty chunk sentinel - lazy val go: Stream[F, A] = - Stream.force { - takeN.map { c => - if (c.isEmpty) - Stream.empty - else - Stream.chunk(c) ++ go - } - } + Pull.eval(pullF).flatten + } - go + loop.stream } // closedLatch solely exists to support this function diff --git a/core/shared/src/test/scala/fs2/concurrent/BroadcastSuite.scala b/core/shared/src/test/scala/fs2/concurrent/BroadcastSuite.scala index ef78504371..90a5ca93d3 100644 --- a/core/shared/src/test/scala/fs2/concurrent/BroadcastSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/BroadcastSuite.scala @@ -24,6 +24,7 @@ package concurrent import cats.effect.IO import org.scalacheck.effect.PropF.forAllF +import scala.concurrent.TimeoutException import scala.concurrent.duration._ import cats.syntax.all._ @@ -50,11 +51,14 @@ class BroadcastSuite extends Fs2Suite { } } - test("all subscribers see all elements, pipe immediately interrupted") { - forAllF { (source: Stream[Pure, Int], concurrent0: Int) => + test("all subscribers see all elements, pipe immediately interrupted".only) { + // forAllF { (source: Stream[Pure, Int], concurrent0: Int) => + val source = Stream(-1, 1323912972, 2147483647, 1909897629, -2147483648, 0) + val concurrent0 = 19 val concurrent = (concurrent0 % 20).abs.max(1) - val interruptedPipe = scala.util.Random.nextInt(concurrent) + val interruptedPipe = 0 val expected = source.compile.toVector.map(_.toString) + println(s"concurrent = $concurrent; expected = $expected") def pipe(idx: Int): Pipe[IO, Int, (Int, String)] = _.map(i => (idx, i.toString)) @@ -71,7 +75,12 @@ class BroadcastSuite extends Fs2Suite { .broadcastThrough(pipes: _*) .compile .toVector - .timeout(5.seconds) + .onCancel(IO.println("cancelation finished")) + .race(IO.sleep(5.seconds) *> IO.println("interruption fired")) + .flatMap { + case Left(r) => IO.pure(r) + case Right(_) => IO.raiseError(new TimeoutException) + } .map(x => (x.foldMap { case (k, v) => Map(k -> Vector(v)) } - interruptedPipe).values) .map { result => if (expected.nonEmpty) { @@ -79,7 +88,9 @@ class BroadcastSuite extends Fs2Suite { result.foreach(it => assertEquals(it, expected)) } else assert(result.isEmpty) } - } + .flatMap(_ => IO.println("-----------------------")) + .replicateA(100) + // } } test("pipes should not be empty") { diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index d3ff9ee65d..073fcdceb8 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -230,4 +230,13 @@ class ChannelSuite extends Fs2Suite { test.replicateA_(if (isJVM) 10000 else 1) } + + test("synchronous with many concurrents and close".only) { + val test = Channel.synchronous[IO, Int] flatMap { ch => + 0.until(20).toList.parTraverse_(i => ch.send(i).iterateWhile(_.isRight)) &> + ch.stream.concurrently(Stream.eval(ch.close.delayBy(1.seconds))).compile.drain + } + + test.parReplicateA(100) + } } From 349e3e354282261982b7b21fb53358047b6bf7b7 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 26 Oct 2022 12:24:18 -0600 Subject: [PATCH 20/23] Removed debugging logic and brought in CE snapshot with fix --- build.sbt | 6 +++--- core/shared/src/main/scala/fs2/Stream.scala | 5 ++--- .../main/scala/fs2/concurrent/Channel.scala | 14 +++++-------- .../scala/fs2/concurrent/BroadcastSuite.scala | 20 +++++-------------- .../scala/fs2/concurrent/ChannelSuite.scala | 2 +- 5 files changed, 16 insertions(+), 31 deletions(-) diff --git a/build.sbt b/build.sbt index be273a235e..8eade5020a 100644 --- a/build.sbt +++ b/build.sbt @@ -205,9 +205,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % "2.8.0", "org.typelevel" %%% "cats-laws" % "2.8.0" % Test, - "org.typelevel" %%% "cats-effect" % "3.4.0-RC2", - "org.typelevel" %%% "cats-effect-laws" % "3.4.0-RC2" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.4.0-RC2" % Test, + "org.typelevel" %%% "cats-effect" % "3.4-7154d08", + "org.typelevel" %%% "cats-effect-laws" % "3.4-7154d08" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.4-7154d08" % Test, "org.scodec" %%% "scodec-bits" % "1.1.34", "org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test, "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test, diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a978c8e6b9..3d281d1eca 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -559,7 +559,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, // stop background process but await for it to finalise with a result // We use F.fromEither to bring errors from the back into the fore - val stopBack: F2[Unit] = interrupt.complete(()) >> {println("interrupting!"); backResult.get.flatMap(F.fromEither)} + val stopBack: F2[Unit] = interrupt.complete(()) >> backResult.get.flatMap(F.fromEither) (Stream.bracket(compileBack.start)(_ => stopBack), watch(this)) } @@ -4157,12 +4157,11 @@ object Stream extends StreamLowPriority { def signalResult(fiber: Fiber[F, Throwable, Unit]): F[Unit] = done.get.flatMap { blah => - blah.flatten.fold[F[Unit]]({println("joining"); fiber.joinWithNever})(F.raiseError) + blah.flatten.fold[F[Unit]](fiber.joinWithNever)(F.raiseError) } Stream .bracket(F.start(runOuter) >> F.start(outcomeJoiner)) { fiber => - println("canceling parjoin") stop(None) >> // in case of short-circuiting, the `fiberJoiner` would not have had a chance // to wait until all fibers have been joined, so we need to do it manually diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 6094e4e7ce..a1ffa443e7 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -124,14 +124,14 @@ object Channel { object Closed def unbounded[F[_]: Concurrent, A]: F[Channel[F, A]] = - Queue.unbounded[F, AnyRef].flatMap(impl(_, false)) + Queue.unbounded[F, AnyRef].flatMap(impl(_)) def synchronous[F[_]: Concurrent, A]: F[Channel[F, A]] = - Queue.synchronous[F, AnyRef].flatMap(impl(_, true)) + Queue.synchronous[F, AnyRef].flatMap(impl(_)) def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] = { require(capacity < Short.MaxValue) - Queue.bounded[F, AnyRef](capacity).flatMap(impl(_, false)) + Queue.bounded[F, AnyRef](capacity).flatMap(impl(_)) } // used as a marker to wake up q.take when the channel is closed @@ -142,7 +142,7 @@ object Channel { // technically this should be A | Sentinel.type // the queue will consist of exclusively As until we shut down, when there will be one Sentinel - private[this] def impl[F[_]: Concurrent, A](q: Queue[F, AnyRef], log: Boolean): F[Channel[F, A]] = + private[this] def impl[F[_]: Concurrent, A](q: Queue[F, AnyRef]): F[Channel[F, A]] = (Concurrent[F].ref(0), Concurrent[F].ref(false), Concurrent[F].deferred[Unit]).mapN { (leasesR, closedR, closedLatch) => new Channel[F, A] { @@ -233,16 +233,13 @@ object Channel { val stream: Stream[F, A] = { lazy val loop: Pull[F, A, Unit] = { - if (log) println("looping!") - val pullF = q.tryTakeN(None).flatMap { case Nil => // if we land here, it either means we're consuming faster than producing // or it means we're actually closed and we need to shut down // this is the unhappy path either way - val fallback = (leasesR.get, closedR.get, closedLatch.tryGet).tupled.map(p => if (log) println(s">>> taking: $p")) >> q.take.map { a => - if (log) println("<<< took") + val fallback = q.take.map { a => // if we get the sentinel, shut down all the things, otherwise emit if (a eq Sentinel) Pull.eval(closedLatch.complete(()).void) @@ -262,7 +259,6 @@ object Channel { case as => // this is the happy path: we were able to take a chunk // meaning we're producing as fast or faster than we're consuming - if (log) println(s"tookN $as") isClosed.map { b => if (b) { diff --git a/core/shared/src/test/scala/fs2/concurrent/BroadcastSuite.scala b/core/shared/src/test/scala/fs2/concurrent/BroadcastSuite.scala index 90a5ca93d3..ddd7480d6b 100644 --- a/core/shared/src/test/scala/fs2/concurrent/BroadcastSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/BroadcastSuite.scala @@ -24,7 +24,6 @@ package concurrent import cats.effect.IO import org.scalacheck.effect.PropF.forAllF -import scala.concurrent.TimeoutException import scala.concurrent.duration._ import cats.syntax.all._ @@ -51,14 +50,11 @@ class BroadcastSuite extends Fs2Suite { } } - test("all subscribers see all elements, pipe immediately interrupted".only) { - // forAllF { (source: Stream[Pure, Int], concurrent0: Int) => - val source = Stream(-1, 1323912972, 2147483647, 1909897629, -2147483648, 0) - val concurrent0 = 19 + test("all subscribers see all elements, pipe immediately interrupted") { + forAllF { (source: Stream[Pure, Int], concurrent0: Int) => val concurrent = (concurrent0 % 20).abs.max(1) val interruptedPipe = 0 val expected = source.compile.toVector.map(_.toString) - println(s"concurrent = $concurrent; expected = $expected") def pipe(idx: Int): Pipe[IO, Int, (Int, String)] = _.map(i => (idx, i.toString)) @@ -75,12 +71,7 @@ class BroadcastSuite extends Fs2Suite { .broadcastThrough(pipes: _*) .compile .toVector - .onCancel(IO.println("cancelation finished")) - .race(IO.sleep(5.seconds) *> IO.println("interruption fired")) - .flatMap { - case Left(r) => IO.pure(r) - case Right(_) => IO.raiseError(new TimeoutException) - } + .timeout(5.seconds) .map(x => (x.foldMap { case (k, v) => Map(k -> Vector(v)) } - interruptedPipe).values) .map { result => if (expected.nonEmpty) { @@ -88,9 +79,8 @@ class BroadcastSuite extends Fs2Suite { result.foreach(it => assertEquals(it, expected)) } else assert(result.isEmpty) } - .flatMap(_ => IO.println("-----------------------")) - .replicateA(100) - // } + // .replicateA(100) + } } test("pipes should not be empty") { diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 073fcdceb8..12be8dc2f8 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -232,7 +232,7 @@ class ChannelSuite extends Fs2Suite { } test("synchronous with many concurrents and close".only) { - val test = Channel.synchronous[IO, Int] flatMap { ch => + val test = Channel.synchronous[IO, Int].flatMap { ch => 0.until(20).toList.parTraverse_(i => ch.send(i).iterateWhile(_.isRight)) &> ch.stream.concurrently(Stream.eval(ch.close.delayBy(1.seconds))).compile.drain } From a39f26acfbde54258a231d371ccf1989b8af9d07 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 26 Oct 2022 14:38:43 -0600 Subject: [PATCH 21/23] Removed stray `only` --- core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 12be8dc2f8..d8eca5dbaf 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -231,7 +231,7 @@ class ChannelSuite extends Fs2Suite { test.replicateA_(if (isJVM) 10000 else 1) } - test("synchronous with many concurrents and close".only) { + test("synchronous with many concurrents and close") { val test = Channel.synchronous[IO, Int].flatMap { ch => 0.until(20).toList.parTraverse_(i => ch.send(i).iterateWhile(_.isRight)) &> ch.stream.concurrently(Stream.eval(ch.close.delayBy(1.seconds))).compile.drain From 4a7008035d76429dd55babb393099a97f608f824 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 26 Oct 2022 14:58:38 -0600 Subject: [PATCH 22/23] Refactored two `Ref`s into one --- .../main/scala/fs2/concurrent/Channel.scala | 245 +++++++++--------- 1 file changed, 121 insertions(+), 124 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index a1ffa443e7..62c582c080 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -140,151 +140,148 @@ object Channel { private[this] val LeftClosed: Either[Channel.Closed, Unit] = Left(Channel.Closed) private[this] val RightUnit: Either[Channel.Closed, Unit] = Right(()) + private final case class State(leases: Int, closed: Boolean) + + private object State { + val Empty: State = State(0, false) + } + // technically this should be A | Sentinel.type // the queue will consist of exclusively As until we shut down, when there will be one Sentinel private[this] def impl[F[_]: Concurrent, A](q: Queue[F, AnyRef]): F[Channel[F, A]] = - (Concurrent[F].ref(0), Concurrent[F].ref(false), Concurrent[F].deferred[Unit]).mapN { - (leasesR, closedR, closedLatch) => - new Channel[F, A] { - - private[this] val LeftClosedF = LeftClosed.pure[F] - private[this] val FalseF = false.pure[F] - - // might be interesting to try to optimize this more, but it needs support from CE - val sendAll: Pipe[F, A, Nothing] = - _.evalMapChunk(send(_)) - .takeWhile(_.isRight) - .onComplete(Stream.exec(close.void)) - .drain - - // setting the flag means we won't accept any more sends - val close: F[Either[Channel.Closed, Unit]] = - MonadCancel[F].uncancelable { _ => - closedR.getAndSet(true).flatMap { b => - if (b) { - LeftClosedF - } else { - leasesR.get.flatMap { leases => - if (leases <= 0) - q.offer(Sentinel).start.as(RightUnit) - else - RightUnit.pure[F] - } - } - } - } + (Concurrent[F].ref(State.Empty), Concurrent[F].deferred[Unit]).mapN { (stateR, closedLatch) => + new Channel[F, A] { + + private[this] val LeftClosedF = LeftClosed.pure[F] + private[this] val FalseF = false.pure[F] + + // might be interesting to try to optimize this more, but it needs support from CE + val sendAll: Pipe[F, A, Nothing] = + _.evalMapChunk(send(_)) + .takeWhile(_.isRight) + .onComplete(Stream.exec(close.void)) + .drain + + // setting the flag means we won't accept any more sends + val close: F[Either[Channel.Closed, Unit]] = { + val modifyF = stateR.modify { + case State(0, false) => + State(0, true) -> q.offer(Sentinel).start.as(RightUnit) + + case State(leases, false) => + State(leases, true) -> RightUnit.pure[F] + + case st @ State(_, true) => + st -> LeftClosedF + } - val isClosed: F[Boolean] = closedR.get - - // there are four states to worry about: open, closing, draining, quiesced - // in the second state, we have outstanding blocked sends - // in the third state we have data in the queue but no sends - // in the fourth state we are completely drained and can shut down the stream - private[this] val isQuiesced: F[Boolean] = - isClosed.flatMap { b => - if (b) { - leasesR.get.flatMap { leases => - if (leases <= 0) - q.size.map(_ <= 0) - else - FalseF - } - } else { - FalseF - } - } + modifyF.flatten.uncancelable + } + + val isClosed: F[Boolean] = stateR.get.map(_.closed) + + // there are four states to worry about: open, closing, draining, quiesced + // in the second state, we have outstanding blocked sends + // in the third state we have data in the queue but no sends + // in the fourth state we are completely drained and can shut down the stream + private[this] val isQuiesced: F[Boolean] = + stateR.get.flatMap { + case State(0, true) => q.size.map(_ == 0) + case _ => FalseF + } - def send(a: A): F[Either[Channel.Closed, Unit]] = { + def send(a: A): F[Either[Channel.Closed, Unit]] = + MonadCancel[F].uncancelable { poll => // we track the outstanding blocked offers so we can distinguish closing from draining // the very last blocked send, when closed, is responsible for triggering the sentinel - def permit[E](fe: F[E]): F[E] = - MonadCancel[F].uncancelable { poll => - (leasesR.update(_ + 1) *> poll(fe)).guarantee { - leasesR.updateAndGet(_ - 1).flatMap { leases => - if (leases <= 0) { - closedR.get.flatMap { b => - if (b) - q.offer(Sentinel) - .start - .void // we don't want to backpressure on processing the sentinel - else - Applicative[F].unit - } - } else { - Applicative[F].unit - } + + val modifyF = stateR.modify { + case st @ State(_, true) => + st -> LeftClosedF + + case State(leases, false) => + val cleanupF = { + val modifyF = stateR.modify { + case State(1, true) => + State(0, true) -> q.offer(Sentinel).start.void + + case State(leases, closed) => + State(leases - 1, closed) -> Applicative[F].unit } + + modifyF.flatten } - } - isClosed.ifM( - LeftClosedF, - permit(isClosed.ifM(LeftClosedF, q.offer(a.asInstanceOf[AnyRef]).as(RightUnit))) - ) - } + val offerF = poll(q.offer(a.asInstanceOf[AnyRef]).as(RightUnit)) - def trySend(a: A): F[Either[Channel.Closed, Boolean]] = - isClosed.flatMap { b => - if (b) - LeftClosedF.asInstanceOf[F[Either[Channel.Closed, Boolean]]] - else - q.tryOffer(a.asInstanceOf[AnyRef]).map(_.asRight[Channel.Closed]) + State(leases + 1, false) -> offerF.guarantee(cleanupF).as(RightUnit) } - val stream: Stream[F, A] = { - lazy val loop: Pull[F, A, Unit] = { - val pullF = q.tryTakeN(None).flatMap { - case Nil => - // if we land here, it either means we're consuming faster than producing - // or it means we're actually closed and we need to shut down - // this is the unhappy path either way - - val fallback = q.take.map { a => - // if we get the sentinel, shut down all the things, otherwise emit - if (a eq Sentinel) - Pull.eval(closedLatch.complete(()).void) - else - Pull.output1(a.asInstanceOf[A]) >> loop - } + modifyF.flatten + } + + def trySend(a: A): F[Either[Channel.Closed, Boolean]] = + isClosed.flatMap { b => + if (b) + LeftClosedF.asInstanceOf[F[Either[Channel.Closed, Boolean]]] + else + q.tryOffer(a.asInstanceOf[AnyRef]).map(_.asRight[Channel.Closed]) + } - // check to see if we're closed and done processing - // if we're all done, complete the latch and terminate the stream - isQuiesced.map { b => - if (b) + val stream: Stream[F, A] = { + lazy val loop: Pull[F, A, Unit] = { + val pullF = q.tryTakeN(None).flatMap { + case Nil => + // if we land here, it either means we're consuming faster than producing + // or it means we're actually closed and we need to shut down + // this is the unhappy path either way + + val fallback = q.take.map { a => + // if we get the sentinel, shut down all the things, otherwise emit + if (a eq Sentinel) + Pull.eval(closedLatch.complete(()).void) + else + Pull.output1(a.asInstanceOf[A]) >> loop + } + + // check to see if we're closed and done processing + // if we're all done, complete the latch and terminate the stream + isQuiesced.map { b => + if (b) + Pull.eval(closedLatch.complete(()).void) + else + Pull.eval(fallback).flatten + } + + case as => + // this is the happy path: we were able to take a chunk + // meaning we're producing as fast or faster than we're consuming + + isClosed.map { b => + if (b) { + // if we're closed, we have to check for the sentinel and strip it out + val as2 = as.filter(_ ne Sentinel) + + // if it's empty, we definitely stripped a sentinel, so just be done + // if it's non-empty, we can't know without expensive comparisons, so fall through + if (as2.isEmpty) Pull.eval(closedLatch.complete(()).void) else - Pull.eval(fallback).flatten + Pull.output(Chunk.seq(as2.asInstanceOf[List[A]])) >> loop + } else { + Pull.output(Chunk.seq(as.asInstanceOf[List[A]])) >> loop } - - case as => - // this is the happy path: we were able to take a chunk - // meaning we're producing as fast or faster than we're consuming - - isClosed.map { b => - if (b) { - // if we're closed, we have to check for the sentinel and strip it out - val as2 = as.filter(_ ne Sentinel) - - // if it's empty, we definitely stripped a sentinel, so just be done - // if it's non-empty, we can't know without expensive comparisons, so fall through - if (as2.isEmpty) - Pull.eval(closedLatch.complete(()).void) - else - Pull.output(Chunk.seq(as2.asInstanceOf[List[A]])) >> loop - } else { - Pull.output(Chunk.seq(as.asInstanceOf[List[A]])) >> loop - } - } - } - - Pull.eval(pullF).flatten + } } - loop.stream + Pull.eval(pullF).flatten } - // closedLatch solely exists to support this function - val closed: F[Unit] = closedLatch.get + loop.stream } + + // closedLatch solely exists to support this function + val closed: F[Unit] = closedLatch.get + } } } From 9c8e420667f86b5d16a3841c6b8f5f98c91ed4ef Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 26 Oct 2022 15:08:35 -0600 Subject: [PATCH 23/23] Eagerly fulfill `closed` --- .../src/main/scala/fs2/concurrent/Channel.scala | 10 +++++----- .../src/test/scala/fs2/concurrent/ChannelSuite.scala | 12 ++++++++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 62c582c080..6996163742 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -166,10 +166,10 @@ object Channel { val close: F[Either[Channel.Closed, Unit]] = { val modifyF = stateR.modify { case State(0, false) => - State(0, true) -> q.offer(Sentinel).start.as(RightUnit) + State(0, true) -> closedLatch.complete(()) *> q.offer(Sentinel).start.as(RightUnit) case State(leases, false) => - State(leases, true) -> RightUnit.pure[F] + State(leases, true) -> closedLatch.complete(()).as(RightUnit) case st @ State(_, true) => st -> LeftClosedF @@ -239,7 +239,7 @@ object Channel { val fallback = q.take.map { a => // if we get the sentinel, shut down all the things, otherwise emit if (a eq Sentinel) - Pull.eval(closedLatch.complete(()).void) + Pull.done else Pull.output1(a.asInstanceOf[A]) >> loop } @@ -248,7 +248,7 @@ object Channel { // if we're all done, complete the latch and terminate the stream isQuiesced.map { b => if (b) - Pull.eval(closedLatch.complete(()).void) + Pull.done else Pull.eval(fallback).flatten } @@ -265,7 +265,7 @@ object Channel { // if it's empty, we definitely stripped a sentinel, so just be done // if it's non-empty, we can't know without expensive comparisons, so fall through if (as2.isEmpty) - Pull.eval(closedLatch.complete(()).void) + Pull.done else Pull.output(Chunk.seq(as2.asInstanceOf[List[A]])) >> loop } else { diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index d8eca5dbaf..ce250c0e07 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -239,4 +239,16 @@ class ChannelSuite extends Fs2Suite { test.parReplicateA(100) } + + test("complete closed immediately without draining") { + val test = Channel.bounded[IO, Int](20).flatMap { ch => + for { + _ <- 0.until(10).toList.parTraverse_(ch.send(_)) + _ <- ch.close + _ <- ch.closed + } yield () + } + + TestControl.executeEmbed(test) + } }