Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move j.u.c.Flow interop methods onto Stream #3346

Merged
merged 7 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,21 @@ import scala.concurrent.duration._
import cats.{Eval => _, _}
import cats.data.Ior
import cats.effect.Concurrent
import cats.effect.IO
import cats.effect.kernel._
import cats.effect.kernel.implicits._
import cats.effect.std.{Console, CountDownLatch, Queue, QueueSink, QueueSource, Semaphore}
import cats.effect.Resource.ExitCase
import cats.effect.unsafe.IORuntime
import cats.syntax.all._
import fs2.compat._
import fs2.concurrent._
import fs2.internal._
import org.typelevel.scalaccompat.annotation._
import Pull.StreamPullOps

import java.util.concurrent.Flow.{Publisher, Subscriber}

/** A stream producing output of type `O` and which may evaluate `F` effects.
*
* - '''Purely functional''' a value of type `Stream[F, O]` _describes_ an effectful computation.
Expand Down Expand Up @@ -2747,6 +2751,16 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
go(Chunk.empty, this).stream
}

/** Subscribes the provided [[Subscriber]] to this stream.
*
* The returned stream will run until all the stream elements were consumed.
* Canceling this stream will gracefully shutdown the subscription.
*
* @param subscriber the [[Subscriber]] that will receive the elements of the stream.
*/
def subscribe[F2[x] >: F[x]: Async, O2 >: O](subscriber: Subscriber[O2]): Stream[F2, Nothing] =
interop.flow.subscribeAsStream[F2, O2](this, subscriber)

/** Emits all elements of the input except the first one.
*
* @example {{{
Expand Down Expand Up @@ -2824,6 +2838,28 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
.widen[Either[Throwable, Unit]]
)

/** Creates a [[Publisher]] from this [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisherResource]] for a version that returns a [[Resource]]
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
* @see [[subscribe]] for a simpler version that only requires a [[Subscriber]].
*/
def toPublisher[F2[x] >: F[x], O2 >: O](implicit F: Async[F2]): Stream[F2, Publisher[O2]] =
Stream.resource(toPublisherResource[F2, O2])

/** @see [[toPublisher]] */
def toPublisherResource[F2[x] >: F[x]: Async, O2 >: O]: Resource[F2, Publisher[O2]] =
interop.flow.toPublisher(this)

/** Translates effect type from `F` to `G` using the supplied `FunctionK`.
*/
def translate[F2[x] >: F[x], G[_]](u: F2 ~> G): Stream[G, O] =
Expand Down Expand Up @@ -3698,6 +3734,36 @@ object Stream extends StreamLowPriority {
await
}

/** Creates a [[Stream]] from a [[Publisher]].
*
* @example {{{
* scala> import cats.effect.IO
* scala> import java.util.concurrent.Flow.Publisher
* scala>
* scala> def getThirdPartyPublisher(): Publisher[Int] = ???
* scala>
* scala> // Interop with the third party library.
* scala> Stream.eval(IO.delay(getThirdPartyPublisher())).flatMap { publisher =>
* | Stream.fromPublisher[IO](publisher, chunkSize = 16)
* | }
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
*
* @see the `toStream` extension method added to `Publisher`
*
* @param publisher The [[Publisher]] to consume.
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
* The stream will not emit new element until,
* either the `Chunk` is filled or the publisher finishes.
*/
def fromPublisher[F[_]]: interop.flow.syntax.FromPublisherPartiallyApplied[F] =
interop.flow.fromPublisher

/** Like `emits`, but works for any G that has a `Foldable` instance.
*/
def foldable[F[x] >: Pure[x], G[_]: Foldable, O](os: G[O]): Stream[F, O] =
Expand Down Expand Up @@ -4415,6 +4481,24 @@ object Stream extends StreamLowPriority {
def toVector: Either[Throwable, Vector[O]] = to(Vector)
}

/** Provides syntax for `IO` streams. */
implicit final class IOOps[A](private val self: Stream[IO, A]) extends AnyVal {

/** Creates a [[Publisher]] from this [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisher]] for a safe version that returns a [[Stream]].
*/
def unsafeToPublisher()(implicit
runtime: IORuntime
): Publisher[A] =
interop.flow.unsafeToPublisher(self)
}
Comment on lines +4484 to +4500
Copy link
Member Author

@armanbilge armanbilge Nov 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the one I'm most interested in. It is the Stream analog of io.unsafeToFuture().

The motivation is to facilitate interop with other libraries e.g. raquo/Airstream#114:

EventStream.fromPublisher(fs2Stream.unsafeToPublisher())


/** Projection of a `Stream` providing various ways to get a `Pull` from the `Stream`. */
final class ToPull[F[_], O] private[Stream] (
private val self: Stream[F, O]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[flow] sealed abstract class StreamPublisher[F[_], A] private (
)
try
runSubscription(
StreamSubscription.subscribe(stream, subscriber)
StreamSubscription.subscribe(stream, subscriber).compile.drain
)
catch {
case _: IllegalStateException | _: RejectedExecutionException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ package fs2
package interop
package flow

import cats.effect.kernel.{Async, Outcome}
import cats.effect.syntax.all._
import cats.effect.kernel.Async
import cats.effect.Resource.ExitCase
import cats.syntax.all._

import java.util.concurrent.CancellationException
Expand Down Expand Up @@ -60,7 +60,7 @@ private[flow] final class StreamSubscription[F[_], A] private (

// This is a def rather than a val, because it is only used once.
// And having fields increase the instantiation cost and delay garbage collection.
def run: F[Unit] = {
def run: Stream[F, Nothing] = {
val subscriptionPipe: Pipe[F, A, A] = in => {
def go(s: Stream[F, A]): Pull[F, A, Unit] =
Pull.eval(F.delay(requests.get())).flatMap { n =>
Expand Down Expand Up @@ -100,8 +100,14 @@ private[flow] final class StreamSubscription[F[_], A] private (
.through(subscriptionPipe)
.chunks
.foreach(chunk => F.delay(chunk.foreach(sub.onNext)))
.compile
.drain
.onFinalizeCase {
case ExitCase.Succeeded => F.delay(onComplete())
case ExitCase.Errored(ex) => F.delay(onError(ex))
case ExitCase.Canceled =>
// if the subscriber canceled us, no further action necessary
// if we were externally canceled, this is handled below
F.unit
}

val cancellation = F.asyncCheckAttempt[Unit] { cb =>
F.delay {
Expand All @@ -115,18 +121,12 @@ private[flow] final class StreamSubscription[F[_], A] private (
}

events
.race(cancellation)
.guaranteeCase {
case Outcome.Succeeded(result) =>
result.flatMap {
case Left(()) => F.delay(onComplete()) // Events finished normally.
case Right(()) => F.unit // Events was canceled.
}
case Outcome.Errored(ex) => F.delay(onError(ex))
case Outcome.Canceled() =>
.mergeHaltBoth(Stream.exec(cancellation))
.onFinalizeCase {
case ExitCase.Canceled =>
F.delay(onError(new CancellationException("StreamSubscription.run was canceled")))
case _ => F.unit
}
.void
}

// According to the spec, it's acceptable for a concurrent cancel to not
Expand Down Expand Up @@ -192,9 +192,8 @@ private[flow] object StreamSubscription {

def subscribe[F[_], A](stream: Stream[F, A], subscriber: Subscriber[A])(implicit
F: Async[F]
): F[Unit] =
apply(stream, subscriber).flatMap { subscription =>
F.delay(subscriber.onSubscribe(subscription)) >>
subscription.run
): Stream[F, Nothing] =
Stream.eval(apply(stream, subscriber)).flatMap { subscription =>
Stream.eval(F.delay(subscriber.onSubscribe(subscription))) >> subscription.run
Copy link
Contributor

@BalmungSan BalmungSan Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be exec and ++, shouldn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's exec, then subscription.run will never evaluate.

The two options are:

Stream.eval(...) >> subscription.run
Stream.exec(...) ++ subscription.run

The former felt more appropriate to me, it establishes the monadic dependence. Not sure what the subtle differences are.

}
}
18 changes: 13 additions & 5 deletions core/shared/src/main/scala/fs2/interop/flow/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize}
/** Implementation of the reactive-streams protocol for fs2; based on Java Flow.
*
* @example {{{
* scala> import cats.effect.{IO, Resource}
* scala> import cats.effect.IO
* scala> import fs2.Stream
* scala> import fs2.interop.flow.syntax._
* scala> import java.util.concurrent.Flow.Publisher
* scala>
* scala> val upstream: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]
* scala> val publisher: Resource[IO, Publisher[Int]] = upstream.toPublisher
* scala> val downstream: Stream[IO, Int] = Stream.resource(publisher).flatMap { publisher =>
* | publisher.toStream[IO](chunkSize = 16)
* scala> val publisher: Stream[IO, Publisher[Int]] = upstream.toPublisher
* scala> val downstream: Stream[IO, Int] = publisher.flatMap { publisher =>
* | Stream.fromPublisher[IO](publisher, chunkSize = 16)
* | }
* scala>
* scala> import cats.effect.unsafe.implicits.global
Expand All @@ -48,6 +47,7 @@ import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize}
* }}}
*
* @see [[java.util.concurrent.Flow]]
* @deprecated All syntax has been moved directly onto [[Stream]].
*/
package object flow {

Expand Down Expand Up @@ -189,6 +189,14 @@ package object flow {
)(implicit
F: Async[F]
): F[Unit] =
subscribeAsStream(stream, subscriber).compile.drain

private[fs2] def subscribeAsStream[F[_], A](
stream: Stream[F, A],
subscriber: Subscriber[A]
)(implicit
F: Async[F]
): Stream[F, Nothing] =
StreamSubscription.subscribe(stream, subscriber)

/** A default value for the `chunkSize` argument,
Expand Down
70 changes: 3 additions & 67 deletions core/shared/src/main/scala/fs2/interop/flow/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,93 +23,29 @@ package fs2
package interop
package flow

import cats.effect.IO
import cats.effect.kernel.{Async, Resource}
import cats.effect.unsafe.IORuntime

import java.util.concurrent.Flow.{Publisher, Subscriber}

object syntax {
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {

/** Creates a [[Stream]] from a [[Publisher]].
*
* @example {{{
* scala> import cats.effect.IO
* scala> import fs2.Stream
* scala> import fs2.interop.flow.syntax._
* scala> import java.util.concurrent.Flow.Publisher
* scala>
* scala> def getThirdPartyPublisher(): Publisher[Int] = ???
* scala>
* scala> // Interop with the third party library.
* scala> Stream.eval(IO.delay(getThirdPartyPublisher())).flatMap { publisher =>
* | publisher.toStream[IO](chunkSize = 16)
* | }
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
*
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
* The stream will not emit new element until,
* either the `Chunk` is filled or the publisher finishes.
*/
@deprecated("Use Stream.fromPublisher", "3.9.4")
def toStream[F[_]](chunkSize: Int)(implicit F: Async[F]): Stream[F, A] =
flow.fromPublisher(publisher, chunkSize)
}

implicit final class StreamOps[F[_], A](private val stream: Stream[F, A]) extends AnyVal {

/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
* @see [[subscribe]] for a simpler version that only requires a [[Subscriber]].
*/
@deprecated("Use Stream#toPublisherResource", "3.9.4")
def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] =
flow.toPublisher(stream)

/** Subscribes the provided [[Subscriber]] to this stream.
*
* The returned program will run until
* all the stream elements were consumed.
* Cancelling this program will gracefully shutdown the subscription.
*
* @param subscriber the [[Subscriber]] that will receive the elements of the stream.
*/
@deprecated("Use Stream#subscribe", "3.9.4")
def subscribe(subscriber: Subscriber[A])(implicit F: Async[F]): F[Unit] =
flow.subscribeStream(stream, subscriber)
}

implicit final class StreamIOOps[A](private val stream: Stream[IO, A]) extends AnyVal {

/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisher]] for a safe version that returns a [[Resource]].
*/
def unsafeToPublisher()(implicit
runtime: IORuntime
): Publisher[A] =
flow.unsafeToPublisher(stream)
}

final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
def apply[A](
publisher: Publisher[A],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class CancellationSpec extends Fs2Suite {
val subscriber = new DummySubscriber(flag, program)
StreamSubscription(s, subscriber).flatMap { subscription =>
(
subscription.run,
subscription.run.compile.drain,
IO(subscriber.onSubscribe(subscription))
).parTupled
} >>
Expand Down