Skip to content

Commit

Permalink
Small optimisations: evalMapFilter and evalFilter
Browse files Browse the repository at this point in the history
- evalMapFilter: directly implement with Pull.flatMapOutput
- evalFilter: likewise, implement with Pull.flatMapOutput
- Replace a few uses of `evalMap` with `foreach`
  • Loading branch information
diesalbla committed Oct 16, 2022
1 parent 91e7079 commit 2b623f4
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1032,8 +1032,11 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* res0: List[Int] = List(4, 8)
* }}}
*/
def evalMapFilter[F2[x] >: F[x], O2](f: O => F2[Option[O2]]): Stream[F2, O2] =
evalMap(f).collect { case Some(v) => v }
def evalMapFilter[F2[x] >: F[x], O2](f: O => F2[Option[O2]]): Stream[F2, O2] = {
// Short definition: evalMap(f).collect { case Some(v) => v }
def evalOut(o: O): Pull[F2, O2, Unit] = Pull.eval(f(o)).flatMap(Pull.outputOption1)
underlying.flatMapOutput(evalOut).streamNoScope
}

/** Like `[[Stream#scan]]`, but accepts a function returning an `F[_]`.
*
Expand Down Expand Up @@ -1099,8 +1102,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* Note: The result Stream will consist of chunks that are empty or 1-element-long.
* If you want to operate on chunks after using it, consider buffering, e.g. by using [[buffer]].
*/
def evalFilter[F2[x] >: F[x]](f: O => F2[Boolean]): Stream[F2, O] =
flatMap(o => Stream.eval(f(o)).ifM(Stream.emit(o), Stream.empty))
def evalFilter[F2[x] >: F[x]](f: O => F2[Boolean]): Stream[F2, O] = {
def onElem(o: O): Pull[F2, O, Unit] = Pull.eval(f(o)).ifM(Pull.output(o), Pull.empty)
underlying.flatMapOutput(onElem).streamNoScope
}

/** Like `filter`, but allows filtering based on an effect, with up to `maxConcurrent` concurrently running effects.
* The ordering of emitted elements is unchanged.
Expand Down Expand Up @@ -2196,7 +2201,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
pauseWhenTrue: Stream[F2, Boolean]
): Stream[F2, O] =
Stream.eval(SignallingRef[F2, Boolean](false)).flatMap { pauseSignal =>
def writer = pauseWhenTrue.evalMap(pauseSignal.set).drain
def writer = pauseWhenTrue.foreach(pauseSignal.set)

pauseWhen(pauseSignal).mergeHaltBoth(writer)
}
Expand Down Expand Up @@ -4138,7 +4143,7 @@ object Stream extends StreamLowPriority {

def outcomeJoiner: F[Unit] =
outcomes.stream
.evalMap(identity)
.foreach(identity)
.compile
.drain
.guaranteeCase {
Expand Down

0 comments on commit 2b623f4

Please sign in to comment.