diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 68345542ec..d66c14c2eb 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -27,10 +27,12 @@ import scala.concurrent.duration._ import cats.{Eval => _, _} import cats.data.Ior import cats.effect.Concurrent +import cats.effect.IO import cats.effect.kernel._ import cats.effect.kernel.implicits._ import cats.effect.std.{Console, CountDownLatch, Queue, QueueSink, QueueSource, Semaphore} import cats.effect.Resource.ExitCase +import cats.effect.unsafe.IORuntime import cats.syntax.all._ import fs2.compat._ import fs2.concurrent._ @@ -38,6 +40,8 @@ import fs2.internal._ import org.typelevel.scalaccompat.annotation._ import Pull.StreamPullOps +import java.util.concurrent.Flow.{Publisher, Subscriber} + /** A stream producing output of type `O` and which may evaluate `F` effects. * * - '''Purely functional''' a value of type `Stream[F, O]` _describes_ an effectful computation. @@ -2747,6 +2751,16 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, go(Chunk.empty, this).stream } + /** Subscribes the provided [[Subscriber]] to this stream. + * + * The returned stream will run until all the stream elements were consumed. + * Canceling this stream will gracefully shutdown the subscription. + * + * @param subscriber the [[Subscriber]] that will receive the elements of the stream. + */ + def subscribe[F2[x] >: F[x]: Async, O2 >: O](subscriber: Subscriber[O2]): Stream[F2, Nothing] = + interop.flow.subscribeAsStream[F2, O2](this, subscriber) + /** Emits all elements of the input except the first one. * * @example {{{ @@ -2824,6 +2838,28 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, .widen[Either[Throwable, Unit]] ) + /** Creates a [[Publisher]] from this [[Stream]]. + * + * The stream is only ran when elements are requested. + * Ending the [[Stream]] means not accepting new subscriptions, + * but waiting for all active ones to finish consuming. + * Canceling the [[Stream]] means gracefully shutting down all active subscriptions. + * Thus, no more elements will be published. + * + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. + * + * @see [[toPublisherResource]] for a version that returns a [[Resource]] + * @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]]. + * @see [[subscribe]] for a simpler version that only requires a [[Subscriber]]. + */ + def toPublisher[F2[x] >: F[x], O2 >: O](implicit F: Async[F2]): Stream[F2, Publisher[O2]] = + Stream.resource(toPublisherResource[F2, O2]) + + /** @see [[toPublisher]] */ + def toPublisherResource[F2[x] >: F[x]: Async, O2 >: O]: Resource[F2, Publisher[O2]] = + interop.flow.toPublisher(this) + /** Translates effect type from `F` to `G` using the supplied `FunctionK`. */ def translate[F2[x] >: F[x], G[_]](u: F2 ~> G): Stream[G, O] = @@ -3698,6 +3734,36 @@ object Stream extends StreamLowPriority { await } + /** Creates a [[Stream]] from a [[Publisher]]. + * + * @example {{{ + * scala> import cats.effect.IO + * scala> import java.util.concurrent.Flow.Publisher + * scala> + * scala> def getThirdPartyPublisher(): Publisher[Int] = ??? + * scala> + * scala> // Interop with the third party library. + * scala> Stream.eval(IO.delay(getThirdPartyPublisher())).flatMap { publisher => + * | Stream.fromPublisher[IO](publisher, chunkSize = 16) + * | } + * res0: Stream[IO, Int] = Stream(..) + * }}} + * + * @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run. + * + * @see the `toStream` extension method added to `Publisher` + * + * @param publisher The [[Publisher]] to consume. + * @param chunkSize setup the number of elements asked each time from the [[Publisher]]. + * A high number may be useful if the publisher is triggering from IO, + * like requesting elements from a database. + * A high number will also lead to more elements in memory. + * The stream will not emit new element until, + * either the `Chunk` is filled or the publisher finishes. + */ + def fromPublisher[F[_]]: interop.flow.syntax.FromPublisherPartiallyApplied[F] = + interop.flow.fromPublisher + /** Like `emits`, but works for any G that has a `Foldable` instance. */ def foldable[F[x] >: Pure[x], G[_]: Foldable, O](os: G[O]): Stream[F, O] = @@ -4415,6 +4481,24 @@ object Stream extends StreamLowPriority { def toVector: Either[Throwable, Vector[O]] = to(Vector) } + /** Provides syntax for `IO` streams. */ + implicit final class IOOps[A](private val self: Stream[IO, A]) extends AnyVal { + + /** Creates a [[Publisher]] from this [[Stream]]. + * + * The stream is only ran when elements are requested. + * + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. + * + * @see [[toPublisher]] for a safe version that returns a [[Stream]]. + */ + def unsafeToPublisher()(implicit + runtime: IORuntime + ): Publisher[A] = + interop.flow.unsafeToPublisher(self) + } + /** Projection of a `Stream` providing various ways to get a `Pull` from the `Stream`. */ final class ToPull[F[_], O] private[Stream] ( private val self: Stream[F, O] diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala index 2b6822ffbd..b93ce35a82 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala @@ -57,7 +57,7 @@ private[flow] sealed abstract class StreamPublisher[F[_], A] private ( val subscription = StreamSubscription(stream, subscriber) subscriber.onSubscribe(subscription) try - runSubscription(subscription.run) + runSubscription(subscription.run.compile.drain) catch { case _: IllegalStateException | _: RejectedExecutionException => subscriber.onError(StreamPublisher.CanceledStreamPublisherException) diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala index b2ced427d1..a076752b73 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala @@ -23,8 +23,8 @@ package fs2 package interop package flow -import cats.effect.kernel.{Async, Outcome} -import cats.effect.syntax.all._ +import cats.effect.kernel.Async +import cats.effect.Resource.ExitCase import cats.syntax.all._ import java.util.concurrent.CancellationException @@ -60,7 +60,7 @@ private[flow] final class StreamSubscription[F[_], A] private ( // This is a def rather than a val, because it is only used once. // And having fields increase the instantiation cost and delay garbage collection. - def run: F[Unit] = { + def run: Stream[F, Nothing] = { val subscriptionPipe: Pipe[F, A, A] = in => { def go(s: Stream[F, A]): Pull[F, A, Unit] = Pull.eval(F.delay(requests.getAndSet(0))).flatMap { n => @@ -101,8 +101,14 @@ private[flow] final class StreamSubscription[F[_], A] private ( .through(subscriptionPipe) .chunks .foreach(chunk => F.delay(chunk.foreach(subscriber.onNext))) - .compile - .drain + .onFinalizeCase { + case ExitCase.Succeeded => F.delay(onComplete()) + case ExitCase.Errored(ex) => F.delay(onError(ex)) + case ExitCase.Canceled => + // if the subscriber canceled us, no further action necessary + // if we were externally canceled, this is handled below + F.unit + } val cancellation = F.asyncCheckAttempt[Unit] { cb => F.delay { @@ -116,18 +122,12 @@ private[flow] final class StreamSubscription[F[_], A] private ( } events - .race(cancellation) - .guaranteeCase { - case Outcome.Succeeded(result) => - result.flatMap { - case Left(()) => F.delay(onComplete()) // Events finished normally. - case Right(()) => F.unit // Events was canceled. - } - case Outcome.Errored(ex) => F.delay(onError(ex)) - case Outcome.Canceled() => + .mergeHaltBoth(Stream.exec(cancellation)) + .onFinalizeCase { + case ExitCase.Canceled => F.delay(onError(new CancellationException("StreamSubscription.run was canceled"))) + case _ => F.unit } - .void } // According to the spec, it's acceptable for a concurrent cancel to not @@ -202,9 +202,8 @@ private[flow] object StreamSubscription { subscriber: Subscriber[A] )(implicit F: Async[F] - ): F[Unit] = - F.delay(apply(stream, subscriber)).flatMap { subscription => - F.delay(subscriber.onSubscribe(subscription)) >> - subscription.run + ): Stream[F, Nothing] = + Stream.eval(F.delay(apply(stream, subscriber))).flatMap { subscription => + Stream.eval(F.delay(subscriber.onSubscribe(subscription))) >> subscription.run } } diff --git a/core/shared/src/main/scala/fs2/interop/flow/package.scala b/core/shared/src/main/scala/fs2/interop/flow/package.scala index 25eba9329f..32932c5cd9 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/package.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/package.scala @@ -31,15 +31,14 @@ import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize} /** Implementation of the reactive-streams protocol for fs2; based on Java Flow. * * @example {{{ - * scala> import cats.effect.{IO, Resource} + * scala> import cats.effect.IO * scala> import fs2.Stream - * scala> import fs2.interop.flow.syntax._ * scala> import java.util.concurrent.Flow.Publisher * scala> * scala> val upstream: Stream[IO, Int] = Stream(1, 2, 3).covary[IO] - * scala> val publisher: Resource[IO, Publisher[Int]] = upstream.toPublisher - * scala> val downstream: Stream[IO, Int] = Stream.resource(publisher).flatMap { publisher => - * | publisher.toStream[IO](chunkSize = 16) + * scala> val publisher: Stream[IO, Publisher[Int]] = upstream.toPublisher + * scala> val downstream: Stream[IO, Int] = publisher.flatMap { publisher => + * | Stream.fromPublisher[IO](publisher, chunkSize = 16) * | } * scala> * scala> import cats.effect.unsafe.implicits.global @@ -48,6 +47,7 @@ import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize} * }}} * * @see [[java.util.concurrent.Flow]] + * @deprecated All syntax has been moved directly onto [[Stream]]. */ package object flow { @@ -189,6 +189,14 @@ package object flow { )(implicit F: Async[F] ): F[Unit] = + subscribeAsStream(stream, subscriber).compile.drain + + private[fs2] def subscribeAsStream[F[_], A]( + stream: Stream[F, A], + subscriber: Subscriber[A] + )(implicit + F: Async[F] + ): Stream[F, Nothing] = StreamSubscription.subscribe(stream, subscriber) /** A default value for the `chunkSize` argument, diff --git a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala index 5c6ee97004..ae6c367064 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala @@ -23,93 +23,29 @@ package fs2 package interop package flow -import cats.effect.IO import cats.effect.kernel.{Async, Resource} -import cats.effect.unsafe.IORuntime import java.util.concurrent.Flow.{Publisher, Subscriber} object syntax { implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal { - /** Creates a [[Stream]] from a [[Publisher]]. - * - * @example {{{ - * scala> import cats.effect.IO - * scala> import fs2.Stream - * scala> import fs2.interop.flow.syntax._ - * scala> import java.util.concurrent.Flow.Publisher - * scala> - * scala> def getThirdPartyPublisher(): Publisher[Int] = ??? - * scala> - * scala> // Interop with the third party library. - * scala> Stream.eval(IO.delay(getThirdPartyPublisher())).flatMap { publisher => - * | publisher.toStream[IO](chunkSize = 16) - * | } - * res0: Stream[IO, Int] = Stream(..) - * }}} - * - * @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run. - * - * @param chunkSize setup the number of elements asked each time from the [[Publisher]]. - * A high number may be useful if the publisher is triggering from IO, - * like requesting elements from a database. - * A high number will also lead to more elements in memory. - * The stream will not emit new element until, - * either the `Chunk` is filled or the publisher finishes. - */ + @deprecated("Use Stream.fromPublisher", "3.9.4") def toStream[F[_]](chunkSize: Int)(implicit F: Async[F]): Stream[F, A] = flow.fromPublisher(publisher, chunkSize) } implicit final class StreamOps[F[_], A](private val stream: Stream[F, A]) extends AnyVal { - /** Creates a [[Publisher]] from a [[Stream]]. - * - * The stream is only ran when elements are requested. - * Closing the [[Resource]] means not accepting new subscriptions, - * but waiting for all active ones to finish consuming. - * Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions. - * Thus, no more elements will be published. - * - * @note This [[Publisher]] can be reused for multiple [[Subscribers]], - * each [[Subscription]] will re-run the [[Stream]] from the beginning. - * - * @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]]. - * @see [[subscribe]] for a simpler version that only requires a [[Subscriber]]. - */ + @deprecated("Use Stream#toPublisherResource", "3.9.4") def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] = flow.toPublisher(stream) - /** Subscribes the provided [[Subscriber]] to this stream. - * - * The returned program will run until - * all the stream elements were consumed. - * Cancelling this program will gracefully shutdown the subscription. - * - * @param subscriber the [[Subscriber]] that will receive the elements of the stream. - */ + @deprecated("Use Stream#subscribe", "3.9.4") def subscribe(subscriber: Subscriber[A])(implicit F: Async[F]): F[Unit] = flow.subscribeStream(stream, subscriber) } - implicit final class StreamIOOps[A](private val stream: Stream[IO, A]) extends AnyVal { - - /** Creates a [[Publisher]] from a [[Stream]]. - * - * The stream is only ran when elements are requested. - * - * @note This [[Publisher]] can be reused for multiple [[Subscribers]], - * each [[Subscription]] will re-run the [[Stream]] from the beginning. - * - * @see [[toPublisher]] for a safe version that returns a [[Resource]]. - */ - def unsafeToPublisher()(implicit - runtime: IORuntime - ): Publisher[A] = - flow.unsafeToPublisher(stream) - } - final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { def apply[A]( publisher: Publisher[A], diff --git a/core/shared/src/test/scala/fs2/interop/flow/CancellationSpec.scala b/core/shared/src/test/scala/fs2/interop/flow/CancellationSpec.scala index d7e6283488..b76dc7a933 100644 --- a/core/shared/src/test/scala/fs2/interop/flow/CancellationSpec.scala +++ b/core/shared/src/test/scala/fs2/interop/flow/CancellationSpec.scala @@ -54,7 +54,7 @@ class CancellationSpec extends Fs2Suite { val subscriber = new DummySubscriber(flag, program) IO(StreamSubscription(s, subscriber)).flatMap { subscription => ( - subscription.run, + subscription.run.compile.drain, IO(subscriber.onSubscribe(subscription)) ).parTupled } >>