Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move j.u.c.Flow interop methods onto Stream #3346

Merged
merged 7 commits into from
Jan 17, 2024

Conversation

armanbilge
Copy link
Member

@armanbilge armanbilge commented Nov 24, 2023

So that it Just Works™️ without the annoying import fs2.interop.flow.syntax._, which is now deprecated. cc @BalmungSan

subscription.run
): Stream[F, Nothing] =
Stream.eval(apply(stream, subscriber)).flatMap { subscription =>
Stream.eval(F.delay(subscriber.onSubscribe(subscription))) >> subscription.run
Copy link
Contributor

@BalmungSan BalmungSan Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be exec and ++, shouldn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's exec, then subscription.run will never evaluate.

The two options are:

Stream.eval(...) >> subscription.run
Stream.exec(...) ++ subscription.run

The former felt more appropriate to me, it establishes the monadic dependence. Not sure what the subtle differences are.

Comment on lines +4484 to +4500
/** Provides syntax for `IO` streams. */
implicit final class IOOps[A](private val self: Stream[IO, A]) extends AnyVal {

/** Creates a [[Publisher]] from this [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisher]] for a safe version that returns a [[Stream]].
*/
def unsafeToPublisher()(implicit
runtime: IORuntime
): Publisher[A] =
interop.flow.unsafeToPublisher(self)
}
Copy link
Member Author

@armanbilge armanbilge Nov 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the one I'm most interested in. It is the Stream analog of io.unsafeToFuture().

The motivation is to facilitate interop with other libraries e.g. raquo/Airstream#114:

EventStream.fromPublisher(fs2Stream.unsafeToPublisher())

@BalmungSan
Copy link
Contributor

A couple of random thoughts against this:

  1. This is interop with a different "library" so it makes sense to be on the interop package.
  2. Is an elimination form, and folks think Stream => IO is the way, plus this one produces an "impure" thing.
  3. It requires Java 9+.
  4. We are already used to syntax imports even if we hate them.

A couple of random thoughts in favor of this:

  1. I really dislike syntax imports; this is one of the reasons I prefer to use IO over F.
  2. API discoverability.
  3. These may be very useful / common for folks doing interop with other libraries.
  4. Monix had these as part of their Obersavle API.

@armanbilge
Copy link
Member Author

I think those are really good points, which is why I asked Luis to post them here. I think a reasonable compromise could be to keep Stream#unsafeToPublisher and Stream.fromPublisher since those are the easiest to use and cover the most important use-cases. The interop.flow package can continue to be used for everything else.

@armanbilge armanbilge merged commit 70c754d into typelevel:main Jan 17, 2024
15 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants