Skip to content

Commit

Permalink
Pull: turn flatMapOutput into an extension method. (#3016)
Browse files Browse the repository at this point in the history
We may like method calls, even extension methods, better than
functions. Plus, we need fewer type arguments in the calls.
  • Loading branch information
diesalbla authored Oct 16, 2022
1 parent d38a2bd commit 91e7079
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
22 changes: 14 additions & 8 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ object Pull extends PullLowPriority {
* Use `p.void.stream` to explicitly ignore the result of a pull.
*/
def streamNoScope: Stream[F, O] = new Stream(self)

private[fs2] def flatMapOutput[F2[x] >: F[x], O2](
f: O => Pull[F2, O2, Unit]
): Pull[F2, O2, Unit] =
self match {
case a: AlgEffect[F, Unit] => a
case r: Terminal[_] => r
case _ => FlatMapOutput(self, f)
}

}

private[this] val unit: Terminal[Unit] = Succeeded(())
Expand Down Expand Up @@ -1029,7 +1039,7 @@ object Pull extends PullLowPriority {
else {
def go(idx: Int): Pull[G, X, Unit] =
if (idx == chunk.size)
flatMapOutput[G, G, Y, X](tail, fun)
tail.flatMapOutput(fun)
else {
try {
var j = idx
Expand All @@ -1046,8 +1056,7 @@ object Pull extends PullLowPriority {
case Succeeded(_) => go(j + 1)
case Fail(err) => Fail(err)
case interruption @ Interrupted(_, _) =>
val ib = interruptBoundary(tail, interruption)
flatMapOutput[G, G, Y, X](ib, fun)
interruptBoundary(tail, interruption).flatMapOutput(fun)
}
} catch { case NonFatal(e) => Fail(e) }
}
Expand Down Expand Up @@ -1288,15 +1297,12 @@ object Pull extends PullLowPriority {
go(initScope, None, initFk, new OuterRun(init), stream)
}

@deprecated("use the extension method", "3.4.0")
private[fs2] def flatMapOutput[F[_], F2[x] >: F[x], O, O2](
p: Pull[F, O, Unit],
f: O => Pull[F2, O2, Unit]
): Pull[F2, O2, Unit] =
p match {
case a: AlgEffect[F, Unit] => a
case r: Terminal[_] => r
case _ => FlatMapOutput(p, f)
}
p.flatMapOutput(f)

private[fs2] def translate[F[_], G[_], O](
stream: Pull[F, O, Unit],
Expand Down
14 changes: 7 additions & 7 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
queue: Queue[F2, Option[Chunk[O2]]]
): Stream[F2, Nothing] = enqueueNoneTerminatedChunks(queue: QueueSink[F2, Option[Chunk[O2]]])

import Pull.StreamPullOps

/** Alias for `flatMap(o => Stream.eval(f(o)))`.
*
* @example {{{
Expand All @@ -975,8 +977,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* is available, however, with caveats.
*/
def evalMap[F2[x] >: F[x], O2](f: O => F2[O2]): Stream[F2, O2] = {
def evalOut(o: O): Pull[F2, O2, Unit] = Pull.eval(f(o)).flatMap(Pull.output1)
Pull.flatMapOutput[F, F2, O, O2](underlying, evalOut).streamNoScope
def evalOut(o: O) = Pull.eval(f(o)).flatMap(Pull.output1)
underlying.flatMapOutput(evalOut).streamNoScope
}

/** Like `evalMap`, but operates on chunks for performance. This means this operator
Expand Down Expand Up @@ -1194,7 +1196,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
def flatMap[F2[x] >: F[x], O2](
f: O => Stream[F2, O2]
)(implicit ev: NotGiven[O <:< Nothing]): Stream[F2, O2] =
new Stream(Pull.flatMapOutput[F, F2, O, O2](underlying, (o: O) => f(o).underlying))
underlying.flatMapOutput((o: O) => f(o).underlying).streamNoScope

/** Alias for `flatMap(_ => s2)`. */
def >>[F2[x] >: F[x], O2](
Expand Down Expand Up @@ -1285,10 +1287,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* res0: Unit = ()
* }}}
*/
def foreach[F2[x] >: F[x]](f: O => F2[Unit]): Stream[F2, Nothing] = {
def exec(o: O): Pull[F2, Nothing, Unit] = Pull.eval(f(o))
Pull.flatMapOutput[F, F2, O, Nothing](underlying, exec).streamNoScope
}
def foreach[F2[x] >: F[x]](f: O => F2[Unit]): Stream[F2, Nothing] =
underlying.flatMapOutput(o => Pull.eval(f(o))).streamNoScope

/** Partitions the input into a stream of chunks according to a discriminator function.
*
Expand Down

0 comments on commit 91e7079

Please sign in to comment.