Skip to content

Commit

Permalink
Merge pull request #3346 from armanbilge/topic/deprecate-flow-syntax
Browse files Browse the repository at this point in the history
Move `j.u.c.Flow` interop methods onto Stream
  • Loading branch information
armanbilge authored Jan 17, 2024
2 parents 5ac8f7d + b4c9822 commit 70c754d
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 93 deletions.
84 changes: 84 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,21 @@ import scala.concurrent.duration._
import cats.{Eval => _, _}
import cats.data.Ior
import cats.effect.Concurrent
import cats.effect.IO
import cats.effect.kernel._
import cats.effect.kernel.implicits._
import cats.effect.std.{Console, CountDownLatch, Queue, QueueSink, QueueSource, Semaphore}
import cats.effect.Resource.ExitCase
import cats.effect.unsafe.IORuntime
import cats.syntax.all._
import fs2.compat._
import fs2.concurrent._
import fs2.internal._
import org.typelevel.scalaccompat.annotation._
import Pull.StreamPullOps

import java.util.concurrent.Flow.{Publisher, Subscriber}

/** A stream producing output of type `O` and which may evaluate `F` effects.
*
* - '''Purely functional''' a value of type `Stream[F, O]` _describes_ an effectful computation.
Expand Down Expand Up @@ -2747,6 +2751,16 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
go(Chunk.empty, this).stream
}

/** Subscribes the provided [[Subscriber]] to this stream.
*
* The returned stream will run until all the stream elements were consumed.
* Canceling this stream will gracefully shutdown the subscription.
*
* @param subscriber the [[Subscriber]] that will receive the elements of the stream.
*/
def subscribe[F2[x] >: F[x]: Async, O2 >: O](subscriber: Subscriber[O2]): Stream[F2, Nothing] =
interop.flow.subscribeAsStream[F2, O2](this, subscriber)

/** Emits all elements of the input except the first one.
*
* @example {{{
Expand Down Expand Up @@ -2824,6 +2838,28 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
.widen[Either[Throwable, Unit]]
)

/** Creates a [[Publisher]] from this [[Stream]].
*
* The stream is only ran when elements are requested.
* Ending the [[Stream]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Stream]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisherResource]] for a version that returns a [[Resource]]
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
* @see [[subscribe]] for a simpler version that only requires a [[Subscriber]].
*/
def toPublisher[F2[x] >: F[x], O2 >: O](implicit F: Async[F2]): Stream[F2, Publisher[O2]] =
Stream.resource(toPublisherResource[F2, O2])

/** @see [[toPublisher]] */
def toPublisherResource[F2[x] >: F[x]: Async, O2 >: O]: Resource[F2, Publisher[O2]] =
interop.flow.toPublisher(this)

/** Translates effect type from `F` to `G` using the supplied `FunctionK`.
*/
def translate[F2[x] >: F[x], G[_]](u: F2 ~> G): Stream[G, O] =
Expand Down Expand Up @@ -3698,6 +3734,36 @@ object Stream extends StreamLowPriority {
await
}

/** Creates a [[Stream]] from a [[Publisher]].
*
* @example {{{
* scala> import cats.effect.IO
* scala> import java.util.concurrent.Flow.Publisher
* scala>
* scala> def getThirdPartyPublisher(): Publisher[Int] = ???
* scala>
* scala> // Interop with the third party library.
* scala> Stream.eval(IO.delay(getThirdPartyPublisher())).flatMap { publisher =>
* | Stream.fromPublisher[IO](publisher, chunkSize = 16)
* | }
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
*
* @see the `toStream` extension method added to `Publisher`
*
* @param publisher The [[Publisher]] to consume.
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
* The stream will not emit new element until,
* either the `Chunk` is filled or the publisher finishes.
*/
def fromPublisher[F[_]]: interop.flow.syntax.FromPublisherPartiallyApplied[F] =
interop.flow.fromPublisher

/** Like `emits`, but works for any G that has a `Foldable` instance.
*/
def foldable[F[x] >: Pure[x], G[_]: Foldable, O](os: G[O]): Stream[F, O] =
Expand Down Expand Up @@ -4415,6 +4481,24 @@ object Stream extends StreamLowPriority {
def toVector: Either[Throwable, Vector[O]] = to(Vector)
}

/** Provides syntax for `IO` streams. */
implicit final class IOOps[A](private val self: Stream[IO, A]) extends AnyVal {

/** Creates a [[Publisher]] from this [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisher]] for a safe version that returns a [[Stream]].
*/
def unsafeToPublisher()(implicit
runtime: IORuntime
): Publisher[A] =
interop.flow.unsafeToPublisher(self)
}

/** Projection of a `Stream` providing various ways to get a `Pull` from the `Stream`. */
final class ToPull[F[_], O] private[Stream] (
private val self: Stream[F, O]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private[flow] sealed abstract class StreamPublisher[F[_], A] private (
val subscription = StreamSubscription(stream, subscriber)
subscriber.onSubscribe(subscription)
try
runSubscription(subscription.run)
runSubscription(subscription.run.compile.drain)
catch {
case _: IllegalStateException | _: RejectedExecutionException =>
subscriber.onError(StreamPublisher.CanceledStreamPublisherException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ package fs2
package interop
package flow

import cats.effect.kernel.{Async, Outcome}
import cats.effect.syntax.all._
import cats.effect.kernel.Async
import cats.effect.Resource.ExitCase
import cats.syntax.all._

import java.util.concurrent.CancellationException
Expand Down Expand Up @@ -60,7 +60,7 @@ private[flow] final class StreamSubscription[F[_], A] private (

// This is a def rather than a val, because it is only used once.
// And having fields increase the instantiation cost and delay garbage collection.
def run: F[Unit] = {
def run: Stream[F, Nothing] = {
val subscriptionPipe: Pipe[F, A, A] = in => {
def go(s: Stream[F, A]): Pull[F, A, Unit] =
Pull.eval(F.delay(requests.getAndSet(0))).flatMap { n =>
Expand Down Expand Up @@ -101,8 +101,14 @@ private[flow] final class StreamSubscription[F[_], A] private (
.through(subscriptionPipe)
.chunks
.foreach(chunk => F.delay(chunk.foreach(subscriber.onNext)))
.compile
.drain
.onFinalizeCase {
case ExitCase.Succeeded => F.delay(onComplete())
case ExitCase.Errored(ex) => F.delay(onError(ex))
case ExitCase.Canceled =>
// if the subscriber canceled us, no further action necessary
// if we were externally canceled, this is handled below
F.unit
}

val cancellation = F.asyncCheckAttempt[Unit] { cb =>
F.delay {
Expand All @@ -116,18 +122,12 @@ private[flow] final class StreamSubscription[F[_], A] private (
}

events
.race(cancellation)
.guaranteeCase {
case Outcome.Succeeded(result) =>
result.flatMap {
case Left(()) => F.delay(onComplete()) // Events finished normally.
case Right(()) => F.unit // Events was canceled.
}
case Outcome.Errored(ex) => F.delay(onError(ex))
case Outcome.Canceled() =>
.mergeHaltBoth(Stream.exec(cancellation))
.onFinalizeCase {
case ExitCase.Canceled =>
F.delay(onError(new CancellationException("StreamSubscription.run was canceled")))
case _ => F.unit
}
.void
}

// According to the spec, it's acceptable for a concurrent cancel to not
Expand Down Expand Up @@ -202,9 +202,8 @@ private[flow] object StreamSubscription {
subscriber: Subscriber[A]
)(implicit
F: Async[F]
): F[Unit] =
F.delay(apply(stream, subscriber)).flatMap { subscription =>
F.delay(subscriber.onSubscribe(subscription)) >>
subscription.run
): Stream[F, Nothing] =
Stream.eval(F.delay(apply(stream, subscriber))).flatMap { subscription =>
Stream.eval(F.delay(subscriber.onSubscribe(subscription))) >> subscription.run
}
}
18 changes: 13 additions & 5 deletions core/shared/src/main/scala/fs2/interop/flow/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize}
/** Implementation of the reactive-streams protocol for fs2; based on Java Flow.
*
* @example {{{
* scala> import cats.effect.{IO, Resource}
* scala> import cats.effect.IO
* scala> import fs2.Stream
* scala> import fs2.interop.flow.syntax._
* scala> import java.util.concurrent.Flow.Publisher
* scala>
* scala> val upstream: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]
* scala> val publisher: Resource[IO, Publisher[Int]] = upstream.toPublisher
* scala> val downstream: Stream[IO, Int] = Stream.resource(publisher).flatMap { publisher =>
* | publisher.toStream[IO](chunkSize = 16)
* scala> val publisher: Stream[IO, Publisher[Int]] = upstream.toPublisher
* scala> val downstream: Stream[IO, Int] = publisher.flatMap { publisher =>
* | Stream.fromPublisher[IO](publisher, chunkSize = 16)
* | }
* scala>
* scala> import cats.effect.unsafe.implicits.global
Expand All @@ -48,6 +47,7 @@ import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize}
* }}}
*
* @see [[java.util.concurrent.Flow]]
* @deprecated All syntax has been moved directly onto [[Stream]].
*/
package object flow {

Expand Down Expand Up @@ -189,6 +189,14 @@ package object flow {
)(implicit
F: Async[F]
): F[Unit] =
subscribeAsStream(stream, subscriber).compile.drain

private[fs2] def subscribeAsStream[F[_], A](
stream: Stream[F, A],
subscriber: Subscriber[A]
)(implicit
F: Async[F]
): Stream[F, Nothing] =
StreamSubscription.subscribe(stream, subscriber)

/** A default value for the `chunkSize` argument,
Expand Down
70 changes: 3 additions & 67 deletions core/shared/src/main/scala/fs2/interop/flow/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,93 +23,29 @@ package fs2
package interop
package flow

import cats.effect.IO
import cats.effect.kernel.{Async, Resource}
import cats.effect.unsafe.IORuntime

import java.util.concurrent.Flow.{Publisher, Subscriber}

object syntax {
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {

/** Creates a [[Stream]] from a [[Publisher]].
*
* @example {{{
* scala> import cats.effect.IO
* scala> import fs2.Stream
* scala> import fs2.interop.flow.syntax._
* scala> import java.util.concurrent.Flow.Publisher
* scala>
* scala> def getThirdPartyPublisher(): Publisher[Int] = ???
* scala>
* scala> // Interop with the third party library.
* scala> Stream.eval(IO.delay(getThirdPartyPublisher())).flatMap { publisher =>
* | publisher.toStream[IO](chunkSize = 16)
* | }
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
*
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
* The stream will not emit new element until,
* either the `Chunk` is filled or the publisher finishes.
*/
@deprecated("Use Stream.fromPublisher", "3.9.4")
def toStream[F[_]](chunkSize: Int)(implicit F: Async[F]): Stream[F, A] =
flow.fromPublisher(publisher, chunkSize)
}

implicit final class StreamOps[F[_], A](private val stream: Stream[F, A]) extends AnyVal {

/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
* @see [[subscribe]] for a simpler version that only requires a [[Subscriber]].
*/
@deprecated("Use Stream#toPublisherResource", "3.9.4")
def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] =
flow.toPublisher(stream)

/** Subscribes the provided [[Subscriber]] to this stream.
*
* The returned program will run until
* all the stream elements were consumed.
* Cancelling this program will gracefully shutdown the subscription.
*
* @param subscriber the [[Subscriber]] that will receive the elements of the stream.
*/
@deprecated("Use Stream#subscribe", "3.9.4")
def subscribe(subscriber: Subscriber[A])(implicit F: Async[F]): F[Unit] =
flow.subscribeStream(stream, subscriber)
}

implicit final class StreamIOOps[A](private val stream: Stream[IO, A]) extends AnyVal {

/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisher]] for a safe version that returns a [[Resource]].
*/
def unsafeToPublisher()(implicit
runtime: IORuntime
): Publisher[A] =
flow.unsafeToPublisher(stream)
}

final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
def apply[A](
publisher: Publisher[A],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class CancellationSpec extends Fs2Suite {
val subscriber = new DummySubscriber(flag, program)
IO(StreamSubscription(s, subscriber)).flatMap { subscription =>
(
subscription.run,
subscription.run.compile.drain,
IO(subscriber.onSubscribe(subscription))
).parTupled
} >>
Expand Down

0 comments on commit 70c754d

Please sign in to comment.