Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flow.unsafeToPublisher #3342

Merged
merged 10 commits into from
Nov 24, 2023
55 changes: 44 additions & 11 deletions core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ package fs2
package interop
package flow

import cats.effect.IO
import cats.effect.kernel.{Async, Resource}
import cats.effect.std.Dispatcher
import cats.effect.unsafe.IORuntime

import java.util.Objects.requireNonNull
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
import java.util.concurrent.RejectedExecutionException
import scala.util.control.NoStackTrace

/** Implementation of a [[Publisher]].
Expand All @@ -39,22 +42,24 @@ import scala.util.control.NoStackTrace
*
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]]
*/
private[flow] final class StreamPublisher[F[_], A] private (
stream: Stream[F, A],
startDispatcher: Dispatcher[F]
)(implicit F: Async[F])
extends Publisher[A] {
override def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
private[flow] sealed abstract class StreamPublisher[F[_], A] private (
stream: Stream[F, A]
)(implicit
F: Async[F]
) extends Publisher[A] {
protected def runSubscription(subscribe: F[Unit]): Unit

override final def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
requireNonNull(
subscriber,
"The subscriber provided to subscribe must not be null"
)
try
startDispatcher.unsafeRunAndForget(
runSubscription(
StreamSubscription.subscribe(stream, subscriber)
)
catch {
case _: IllegalStateException =>
case _: IllegalStateException | _: RejectedExecutionException =>
subscriber.onSubscribe(new Subscription {
override def cancel(): Unit = ()
override def request(x$1: Long): Unit = ()
Expand All @@ -65,13 +70,41 @@ private[flow] final class StreamPublisher[F[_], A] private (
}

private[flow] object StreamPublisher {
private final class DispatcherStreamPublisher[F[_], A](
stream: Stream[F, A],
startDispatcher: Dispatcher[F]
)(implicit
F: Async[F]
) extends StreamPublisher[F, A](stream) {
override protected final def runSubscription(subscribe: F[Unit]): Unit =
startDispatcher.unsafeRunAndForget(subscribe)
}

private final class IORuntimeStreamPublisher[A](
stream: Stream[IO, A]
)(implicit
runtime: IORuntime
) extends StreamPublisher[IO, A](stream) {
override protected final def runSubscription(subscribe: IO[Unit]): Unit =
subscribe.unsafeRunAndForget()
}

def apply[F[_], A](
stream: Stream[F, A]
)(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] =
Dispatcher.parallel[F](await = false).map { startDispatcher =>
new StreamPublisher(stream, startDispatcher)
)(implicit
F: Async[F]
): Resource[F, StreamPublisher[F, A]] =
Dispatcher.parallel[F](await = true).map { startDispatcher =>
new DispatcherStreamPublisher(stream, startDispatcher)
}

def unsafe[A](
stream: Stream[IO, A]
)(implicit
runtime: IORuntime
): StreamPublisher[IO, A] =
new IORuntimeStreamPublisher(stream)

private object CanceledStreamPublisherException
extends IllegalStateException(
"This StreamPublisher is not longer accepting subscribers"
Expand Down
28 changes: 14 additions & 14 deletions core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,49 +50,49 @@ private[flow] final class StreamSubscriber[F[_], A] private (
// Subscriber API.

/** Receives a subscription from the upstream reactive-streams system. */
override def onSubscribe(subscription: Subscription): Unit = {
override final def onSubscribe(subscription: Subscription): Unit = {
requireNonNull(
subscription,
"The subscription provided to onSubscribe must not be null"
)
nextState(Subscribe(subscription))
nextState(input = Subscribe(subscription))
}

/** Receives the next record from the upstream reactive-streams system. */
override def onNext(a: A): Unit = {
override final def onNext(a: A): Unit = {
requireNonNull(
a,
"The element provided to onNext must not be null"
)
nextState(Next(a))
nextState(input = Next(a))
}

/** Called by the upstream reactive-streams system when it fails. */
override def onError(ex: Throwable): Unit = {
override final def onError(ex: Throwable): Unit = {
requireNonNull(
ex,
"The throwable provided to onError must not be null"
)
nextState(Error(ex))
nextState(input = Error(ex))
}

/** Called by the upstream reactive-streams system when it has finished sending records. */
override def onComplete(): Unit =
nextState(Complete(canceled = false))
override final def onComplete(): Unit =
nextState(input = Complete(canceled = false))

// Interop API.

/** Creates a downstream [[Stream]] from this [[Subscriber]]. */
private[flow] def stream(subscribe: F[Unit]): Stream[F, A] = {
// Called when downstream has finished consuming records.
val finalize =
F.delay(nextState(Complete(canceled = true)))
F.delay(nextState(input = Complete(canceled = true)))

// Producer for downstream.
val dequeue1 =
F.async[Option[Chunk[Any]]] { cb =>
F.delay {
nextState(Dequeue(cb))
nextState(input = Dequeue(cb))

Some(finalize)
}
Expand All @@ -112,8 +112,8 @@ private[flow] final class StreamSubscriber[F[_], A] private (
private def run(block: => Unit): () => Unit = () => block

/** Runs a single step of the state machine. */
private def step(in: Input): State => (State, () => Unit) =
in match {
private def step(input: Input): State => (State, () => Unit) =
input match {
case Subscribe(s) => {
case Uninitialized(None) =>
Idle(s) -> noop
Expand Down Expand Up @@ -263,9 +263,9 @@ private[flow] final class StreamSubscriber[F[_], A] private (
* + `Error` & `Dequeue`: No matter the order in which they are processed, we will complete the callback with the error.
* + cancellation & any other thing: Worst case, we will lose some data that we not longer care about; and eventually reach `Terminal`.
*/
private def nextState(in: Input): Unit = {
private def nextState(input: Input): Unit = {
val (_, effect) = currentState.updateAndGet { case (state, _) =>
step(in)(state)
step(input)(state)
}
// Only run the effect after the state update took place.
effect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

/** Implementation of a [[Subscription]].
*
* This is used by the [[StreamUnicastPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system.
* This is used by the [[StreamPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system.
*
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]]
*/
Expand All @@ -58,7 +58,9 @@ private[flow] final class StreamSubscription[F[_], A] private (
sub.onComplete()
}

private[flow] def run: F[Unit] = {
// This is a def rather than a val, because it is only used once.
// And having fields increase the instantiation cost and delay garbage collection.
def run: F[Unit] = {
val subscriptionPipe: Pipe[F, A, A] = in => {
def go(s: Stream[F, A]): Pull[F, A, Unit] =
Pull.eval(F.delay(requests.get())).flatMap { n =>
Expand Down Expand Up @@ -133,14 +135,14 @@ private[flow] final class StreamSubscription[F[_], A] private (
// then the request must be a NOOP.
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
override def cancel(): Unit = {
override final def cancel(): Unit = {
val cancelCB = canceled.getAndSet(null)
if (cancelCB ne null) {
cancelCB.apply()
}
}

override def request(n: Long): Unit =
override final def request(n: Long): Unit =
// First, confirm we are not yet cancelled.
if (canceled.get() ne null) {
// Second, ensure we were requested a positive number of elements.
Expand Down
33 changes: 28 additions & 5 deletions core/shared/src/main/scala/fs2/interop/flow/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
package fs2
package interop

import cats.effect.IO
import cats.effect.kernel.{Async, Resource}
import cats.effect.unsafe.IORuntime

import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize}

Expand Down Expand Up @@ -100,7 +102,7 @@ package object flow {
subscriber.stream(subscribe(subscriber))
}

/** Creates a [[Stream]] from an [[Publisher]].
/** Creates a [[Stream]] from a [[Publisher]].
*
* @example {{{
* scala> import cats.effect.IO
Expand All @@ -116,7 +118,7 @@ package object flow {
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @note The publisher will not receive a subscriber until the stream is run.
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
*
* @see the `toStream` extension method added to `Publisher`
*
Expand All @@ -134,12 +136,15 @@ package object flow {
/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This Publisher can be reused for multiple Subscribers,
* each subscription will re-run the [[Stream]] from the beginning.
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
* @see [[subscribeStream]] for a simpler version that only requires a [[Subscriber]].
*
* @param stream The [[Stream]] to transform.
Expand All @@ -151,6 +156,24 @@ package object flow {
): Resource[F, Publisher[A]] =
StreamPublisher(stream)

/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisher]] for a safe version that returns a [[Resource]].
*
* @param stream The [[Stream]] to transform.
*/
def unsafeToPublisher[A](
stream: Stream[IO, A]
)(implicit
runtime: IORuntime
): Publisher[A] =
StreamPublisher.unsafe(stream)

/** Allows subscribing a [[Subscriber]] to a [[Stream]].
*
* The returned program will run until
Expand Down
32 changes: 28 additions & 4 deletions core/shared/src/main/scala/fs2/interop/flow/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ package fs2
package interop
package flow

import cats.effect.IO
import cats.effect.kernel.{Async, Resource}
import cats.effect.unsafe.IORuntime

import java.util.concurrent.Flow.{Publisher, Subscriber}

object syntax {
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {

/** Creates a [[Stream]] from an [[Publisher]].
/** Creates a [[Stream]] from a [[Publisher]].
*
* @example {{{
* scala> import cats.effect.IO
Expand All @@ -47,6 +49,8 @@ object syntax {
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
*
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
Expand All @@ -63,12 +67,15 @@ object syntax {
/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This Publisher can be reused for multiple Subscribers,
* each subscription will re-run the [[Stream]] from the beginning.
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
* @see [[subscribe]] for a simpler version that only requires a [[Subscriber]].
*/
def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] =
Expand All @@ -86,6 +93,23 @@ object syntax {
flow.subscribeStream(stream, subscriber)
}

implicit final class StreamIOOps[A](private val stream: Stream[IO, A]) extends AnyVal {

/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisher]] for a safe version that returns a [[Resource]].
*/
def unsafeToPublisher()(implicit
runtime: IORuntime
): Publisher[A] =
flow.unsafeToPublisher(stream)
}

final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
def apply[A](
publisher: Publisher[A],
Expand Down