From 1c7595e20e340c71422f517e112a258278457428 Mon Sep 17 00:00:00 2001 From: "Diego E. Alonso" Date: Sun, 16 Oct 2022 17:15:02 +0100 Subject: [PATCH] Small optimisations: evalMapFilter - evalMapFilter: directly implement with Pull.flatMapOutput - Replace a few uses of `evalMap` with `foreach` --- core/shared/src/main/scala/fs2/Stream.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 46b0a5d24d..0d590a92a8 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1032,8 +1032,11 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * res0: List[Int] = List(4, 8) * }}} */ - def evalMapFilter[F2[x] >: F[x], O2](f: O => F2[Option[O2]]): Stream[F2, O2] = - evalMap(f).collect { case Some(v) => v } + def evalMapFilter[F2[x] >: F[x], O2](f: O => F2[Option[O2]]): Stream[F2, O2] = { + // Short definition: evalMap(f).collect { case Some(v) => v } + def evalOut(o: O): Pull[F2, O2, Unit] = Pull.eval(f(o)).flatMap(Pull.outputOption1) + underlying.flatMapOutput(evalOut).streamNoScope + } /** Like `[[Stream#scan]]`, but accepts a function returning an `F[_]`. * @@ -2196,7 +2199,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, pauseWhenTrue: Stream[F2, Boolean] ): Stream[F2, O] = Stream.eval(SignallingRef[F2, Boolean](false)).flatMap { pauseSignal => - def writer = pauseWhenTrue.evalMap(pauseSignal.set).drain + def writer = pauseWhenTrue.foreach(pauseSignal.set) pauseWhen(pauseSignal).mergeHaltBoth(writer) } @@ -4138,7 +4141,7 @@ object Stream extends StreamLowPriority { def outcomeJoiner: F[Unit] = outcomes.stream - .evalMap(identity) + .foreach(identity) .compile .drain .guaranteeCase {