diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index e1aa483e46..cde168b2c9 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2856,6 +2856,55 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, .widen[Either[Throwable, Unit]] ) + /** Fails this stream with a `TimeoutException` if it does not emit a new chunk within the + * given `timeout` after it is requested. + */ + def timeoutOnPull[F2[x] >: F[x]: Temporal](timeout: FiniteDuration): Stream[F2, O] = + timeoutOnPullTo( + timeout, + Stream.raiseError[F2](new TimeoutException(s"Timed out waiting for pull after $timeout")) + ) + + /** Stops pulling from this stream if it does not emit a new chunk within the + * given `timeout` after it is requested, and starts pulling from the `onTimeout` stream instead. + * + * @example {{{ + * scala> import cats.effect.IO + * scala> import cats.effect.unsafe.implicits.global + * scala> import scala.concurrent.duration._ + * scala> val s = Stream(1) ++ Stream.sleep_[IO](100.millis) ++ Stream(2).repeat.meteredStartImmediately[IO](200.millis) + * scala> s.timeoutOnPullTo(150.millis, Stream(3)).compile.toVector.unsafeRunSync() + * res0: Vector[Int] = Vector(1, 2, 3) + * }}} + */ + def timeoutOnPullTo[F2[x] >: F[x]: Temporal, O2 >: O]( + timeout: FiniteDuration, + onTimeout: => Stream[F2, O2] + ): Stream[F2, O2] = + timeoutOnPullWith[F2, O2](timeout)(_ => onTimeout) + + /** Applies the pipe `f` if this stream does not emit a new chunk within the given `timeout` after + * it is requested. + * + * @example {{{ + * scala> import cats.effect._ + * scala> import cats.effect.unsafe.implicits.global + * scala> import scala.concurrent.duration._ + * scala> val s = Stream(1) ++ Stream.sleep_[IO](100.millis) ++ Stream(2).repeat.meteredStartImmediately[IO](200.millis) + * scala> Ref[IO].of(0).flatTap { lateCount => + * | s.take(4).timeoutOnPullWith(150.millis)(Stream.exec(lateCount.update(_ + 1)) ++ _).compile.drain + * | }.flatMap(_.get).unsafeRunSync() + * res0: Int = 2 + * }}} + */ + def timeoutOnPullWith[F2[x] >: F[x]: Temporal, O2 >: O](timeout: FiniteDuration)( + f: Pipe[F2, O2, O2] + ): Stream[F2, O2] = this + .covaryAll[F2, O2] + .pull + .timeoutWith(timeout)(_.stream.through(f).pull.echo) + .stream + /** Creates a [[Publisher]] from this [[Stream]]. * * The stream is only ran when elements are requested. @@ -4955,6 +5004,31 @@ object Stream extends StreamLowPriority { pull(toTimedPull(output)) } + + /** Transforms this pull with the function `f` whenever an element is not emitted within + * the duration `t`. + * @example {{{ + * scala> import cats.effect.IO + * scala> import cats.effect.unsafe.implicits.global + * scala> import scala.concurrent.duration._ + * scala> val s = (Stream("elem") ++ Stream.sleep_[IO](600.millis)).repeat.take(3) + * scala> s.pull.timeoutWith(450.millis)(Pull.output1("late!") >> _).stream.compile.toVector.unsafeRunSync() + * res0: Vector[String] = Vector(elem, late!, elem, late!, elem) + * }}} + */ + def timeoutWith[O2 >: O](t: FiniteDuration)(f: Pull[F, O2, Unit] => Pull[F, O2, Unit])(implicit + F: Temporal[F] + ): Pull[F, O2, Unit] = + timed { timedPull => + def go(timedPull: Pull.Timed[F, O]): Pull[F, O2, Unit] = + timedPull.timeout(t) >> + timedPull.uncons.flatMap { + case Some((Right(elems), next)) => Pull.output(elems) >> go(next) + case Some((Left(_), next)) => f(go(next)) + case None => Pull.done + } + go(timedPull) + } } /** Projection of a `Stream` providing various ways to compile a `Stream[F,O]` to a `G[...]`. */