Skip to content

Commit

Permalink
Add TODOs
Browse files Browse the repository at this point in the history
  • Loading branch information
SystemFw committed Oct 2, 2020
1 parent e976c36 commit 0d380d9
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions core/shared/src/main/scala/fs2/concurrent/TimedPull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ object tp {
def startTimer(t: FiniteDuration): Pull[F, INothing, Unit]
}
object TimedPull {
// newTimeout
// change Token in TimedPull
// move filter of stale timeouts in the Pull
def go[F[_]: Temporal, A, B](pull: TimedPull[F, A] => Pull[F, B, Unit]): Pipe[F, A, B] = { source =>
def now = Temporal[F].monotonic

Expand All @@ -52,7 +49,7 @@ object tp {
def nextAfter(t: Timeout): Stream[F, Timeout] =
time.discrete.unNone.dropWhile(_.id == t.id).head

// is the initial time.get.unNone fine or does it spin?
// TODO is the initial time.get.unNone fine or does it spin?
def timeouts: Stream[F, Token] =
Stream.eval(time.get).unNone.flatMap { timeout =>
Stream.eval(timeout.asOfNow).flatMap { t =>
Expand All @@ -65,7 +62,8 @@ object tp {
timeouts
.map(_.asLeft)
.mergeHaltR(source.chunks.map(_.asRight))
.evalMapFilter {
.evalMapFilter { // TODO filter of stale timeouts in uncons

case Right(c) => c.asRight[Token].some.pure[F]
case Left(id) => time.get.map(t => t.filter(_.id != id).as(id.asLeft[Chunk[A]]))
}
Expand Down Expand Up @@ -97,6 +95,7 @@ object tp {
if (c.size < n) s -> c
else {
val (unit, rest) = c.splitAt(n)
// TODO use Chunq.queue here instead of aggregating via emission?
resize(rest, s >> Pull.output1(unit))
}

Expand Down

0 comments on commit 0d380d9

Please sign in to comment.