diff --git a/core/shared/src/main/scala/fs2/concurrent/TimedPull.scala b/core/shared/src/main/scala/fs2/concurrent/TimedPull.scala index eec57e3e56..17bd38fa11 100644 --- a/core/shared/src/main/scala/fs2/concurrent/TimedPull.scala +++ b/core/shared/src/main/scala/fs2/concurrent/TimedPull.scala @@ -35,9 +35,6 @@ object tp { def startTimer(t: FiniteDuration): Pull[F, INothing, Unit] } object TimedPull { - // newTimeout - // change Token in TimedPull - // move filter of stale timeouts in the Pull def go[F[_]: Temporal, A, B](pull: TimedPull[F, A] => Pull[F, B, Unit]): Pipe[F, A, B] = { source => def now = Temporal[F].monotonic @@ -52,7 +49,7 @@ object tp { def nextAfter(t: Timeout): Stream[F, Timeout] = time.discrete.unNone.dropWhile(_.id == t.id).head - // is the initial time.get.unNone fine or does it spin? + // TODO is the initial time.get.unNone fine or does it spin? def timeouts: Stream[F, Token] = Stream.eval(time.get).unNone.flatMap { timeout => Stream.eval(timeout.asOfNow).flatMap { t => @@ -65,7 +62,8 @@ object tp { timeouts .map(_.asLeft) .mergeHaltR(source.chunks.map(_.asRight)) - .evalMapFilter { + .evalMapFilter { // TODO filter of stale timeouts in uncons + case Right(c) => c.asRight[Token].some.pure[F] case Left(id) => time.get.map(t => t.filter(_.id != id).as(id.asLeft[Chunk[A]])) } @@ -97,6 +95,7 @@ object tp { if (c.size < n) s -> c else { val (unit, rest) = c.splitAt(n) + // TODO use Chunq.queue here instead of aggregating via emission? resize(rest, s >> Pull.output1(unit)) }