Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timed Pull #2062

Merged
merged 76 commits into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from 66 commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
cae950a
Gather ideas for generalised groupWithin
SystemFw Oct 2, 2020
c171a75
Add key implementation notes for TimedPull
SystemFw Oct 2, 2020
d2dcf76
Sketch out TimedPull without explicit queues
SystemFw Oct 2, 2020
a9b2632
Add roughly working prototype of groupWithin based on TimedPull
SystemFw Oct 2, 2020
8cc3109
Cleanup implementation of TimedPull
SystemFw Oct 2, 2020
87cedfd
Make Timeout existential in TimedPull
SystemFw Oct 2, 2020
15ded9b
Remove stale code
SystemFw Oct 2, 2020
6cfb7a4
Add TODOs
SystemFw Oct 2, 2020
9ac3843
Start the initial timer in groupWithin with TimedPull
SystemFw Oct 2, 2020
9936e76
Add test for equivalence between groupWithin and chunkN
SystemFw Oct 10, 2020
2c99bbd
Sketch strategy for testing stale timeouts
SystemFw Oct 11, 2020
904fd93
Identity groupWithin corner case on first chunk
SystemFw Oct 11, 2020
5cfa6cf
Detail groupWithin bug
SystemFw Oct 11, 2020
e6de3b7
Add test case for chunk manipulation in groupWithin
SystemFw Oct 11, 2020
3ef1dca
Fix compile error in dotty
SystemFw Oct 11, 2020
7b20b07
Better logic to filter stale timeouts
SystemFw Oct 12, 2020
b420b74
Clean up logic for emitting non empty chunks only
SystemFw Oct 12, 2020
eb3449d
Prototype groupWithin semantics for timeout with empty accumulator
SystemFw Oct 12, 2020
365fc6d
Remove stale code
SystemFw Oct 12, 2020
1ad6e8e
Work towards reproducing the timeout reset bug in groupWithin
SystemFw Oct 12, 2020
f718fa6
Add groupWithin timeout reset test
SystemFw Oct 12, 2020
6a7ff93
Remove stale code
SystemFw Oct 13, 2020
3a45fcb
Stub TimedPullSuite
SystemFw Oct 13, 2020
41b8bc3
Add TimedPull testing ideas
SystemFw Oct 13, 2020
0e7d20c
Move TimedPull within ToPull syntax
SystemFw Oct 13, 2020
16ec2ce
Implement groupWithin with TimedPull
SystemFw Oct 13, 2020
302ab13
Do not reset timeout in groupWithin if nothing is emitted
SystemFw Oct 13, 2020
82ad529
Fix ambiguous implicit in Scala 2
SystemFw Oct 14, 2020
ea58068
Introduce TimedPull.apply to avoid AnyVal restrictions
SystemFw Oct 14, 2020
baf4494
Do not hardcode return type in TimedPull
SystemFw Oct 14, 2020
d7d8dd8
Introduce time resetting helper in groupWithin
SystemFw Oct 14, 2020
419fb21
Split hasTimedOut into its own case
SystemFw Oct 14, 2020
c1ac498
Never emit empty chunks in groupWithin
SystemFw Oct 14, 2020
f1616fb
Add test for conditions check in groupWithin
SystemFw Oct 14, 2020
c15ad19
Empty chunks cannot be emitted
SystemFw Oct 14, 2020
ac950f6
Cosmetics
SystemFw Oct 14, 2020
08013cf
Add simple test for TimedPull
SystemFw Oct 17, 2020
9d19924
More happy path tests for TimedPull
SystemFw Oct 18, 2020
c53aada
Add test for simple timeout
SystemFw Oct 18, 2020
81046b1
Identity problem with Signal and unNone
SystemFw Oct 18, 2020
d7c90f8
Sketch solution to initial busy wait in TimedPull
SystemFw Oct 19, 2020
2da164b
Do not rely on unNone in TimedPull timeout handling
SystemFw Oct 19, 2020
7237f6b
Fix busy waiting when spinning up TimedPulls
SystemFw Oct 19, 2020
c7a9584
Cosmetics
SystemFw Oct 19, 2020
ed4627f
Use ticked in test previously affected by busy waiting
SystemFw Oct 19, 2020
edc9958
reduce test runtime
SystemFw Oct 19, 2020
54d0649
Add time annotations for 2.13
SystemFw Oct 19, 2020
9c1ab1e
Add timeout reset test
SystemFw Oct 19, 2020
bf5124c
Remove dead code
SystemFw Oct 19, 2020
0dc0748
Attempt test for timeout in flight reset
SystemFw Oct 19, 2020
cf35e36
Fix numeric widening
SystemFw Oct 19, 2020
eda9939
Fix timeout reset test - TimedPull is strongly pull-based
SystemFw Oct 19, 2020
2d47cbc
Cosmetics
SystemFw Oct 19, 2020
ee60a16
More timeout tests
SystemFw Oct 20, 2020
bbab038
Test for stale timeouts
SystemFw Oct 20, 2020
f1c5cb3
Specify the ability to shorten timeouts
SystemFw Oct 20, 2020
f0b89dc
Redesign timeouts in TimedPull to allow shortening of timeouts
SystemFw Oct 20, 2020
7044f85
Eliminate custom Timeout class in TimedPull
SystemFw Oct 20, 2020
0d5dd13
Move definition of TimedPull into timed method
SystemFw Oct 20, 2020
f44178d
Remove standalone TimedPull
SystemFw Oct 20, 2020
c95e4ef
Rename TimedPull.startTimer to timeout
SystemFw Oct 21, 2020
ae13e33
Refactor TimedPullSuite
SystemFw Oct 21, 2020
b21ad2a
Add ability to cancel a timeout without having to start another one
SystemFw Oct 21, 2020
e7e93a8
Add TimedPull example
SystemFw Oct 21, 2020
4fa9e24
Add scaladoc for pull.timed
SystemFw Oct 21, 2020
c00946a
Add scaladoc for TimedPull
SystemFw Oct 21, 2020
70bcf9b
TimedPull --> Pull.Timed
SystemFw Oct 22, 2020
ab6bf2b
Fix scaladoc links
SystemFw Oct 22, 2020
2eab6b2
Complete scaladoc for Pull.Timed
SystemFw Oct 23, 2020
8ae0947
Revamp scaladoc for groupWithin
SystemFw Oct 23, 2020
b3c8203
Remove playground
SystemFw Oct 23, 2020
92c3767
Formatting
SystemFw Oct 23, 2020
5249926
Merge branch 'develop' into timed-pull
SystemFw Oct 23, 2020
7802b8b
Formatting
SystemFw Oct 23, 2020
099a13e
Fix exhaustivity warning
SystemFw Oct 23, 2020
e59a02c
Ignore the secondary groupWithin leak test
SystemFw Oct 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 150 additions & 84 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1472,97 +1472,56 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* Empty groups, which can occur if no elements can be pulled from upstream
* in a given time window, will not be emitted.
*
* Note: a time window starts each time downstream pulls.
* Note: a time window starts each time downstream pulls. TODO: correct this
*/
def groupWithin[F2[x] >: F[x]](
n: Int,
d: FiniteDuration
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] =
Stream
.eval {
Queue
.synchronousNoneTerminated[F2, Either[Token, Chunk[O]]]
.product(F.ref(F.unit -> false))
}
.flatMap { case (q, currentTimeout) =>
def startTimeout: Stream[F2, Token] =
Stream.eval(Token[F2]).evalTap { token =>
val timeout = F.sleep(d) >> q.enqueue1(token.asLeft.some)

// We need to cancel outstanding timeouts to avoid leaks
// on interruption, but using `Stream.bracket` or
// derivatives causes a memory leak due to all the
// finalisers accumulating. Therefore we dispose of them
// manually, with a cooperative strategy between a single
// stream finaliser, and F finalisers on each timeout.
//
// Note that to avoid races, the correctness of the
// algorithm does not depend on timely cancellation of
// previous timeouts, but uses a versioning scheme to
// ensure stale timeouts are no-ops.
timeout.start
.bracket(_ => F.unit) { fiber =>
// note the this is in a `release` action, and therefore uninterruptible
currentTimeout.modify { case st @ (cancelInFlightTimeout, streamTerminated) =>
if (streamTerminated)
// the stream finaliser will cancel the in flight
// timeout, we need to cancel the timeout we have
// just started
st -> fiber.cancel
else
// The stream finaliser hasn't run, so we cancel
// the in flight timeout and store the finaliser for
// the timeout we have just started
(fiber.cancel, streamTerminated) -> cancelInFlightTimeout
}.flatten
}
}

def producer =
this.chunks.map(_.asRight.some).through(q.enqueue).onFinalize(q.enqueue1(None))

def emitNonEmpty(c: Chunk.Queue[O]): Stream[F2, Chunk[O]] =
if (c.size > 0) Stream.emit(c.toChunk)
else Stream.empty

def resize(c: Chunk[O], s: Stream[F2, Chunk[O]]): (Stream[F2, Chunk[O]], Chunk[O]) =
if (c.size < n) s -> c
else {
val (unit, rest) = c.splitAt(n)
resize(rest, s ++ Stream.emit(unit))
}
this.covary[F2].pull.timed { timedPull =>
def resize(c: Chunk[O], s: Pull[F2, Chunk[O], Unit]): (Pull[F2, Chunk[O], Unit], Chunk[O]) =
if (c.size < n) s -> c
else {
val (unit, rest) = c.splitAt(n)
resize(rest, s >> Pull.output1(unit))
}

def go(acc: Chunk.Queue[O], currentTimeout: Token): Stream[F2, Chunk[O]] =
Stream.eval(q.dequeue1).flatMap {
case None => emitNonEmpty(acc)
case Some(e) =>
e match {
case Left(t) if t == currentTimeout =>
emitNonEmpty(acc) ++ startTimeout.flatMap { newTimeout =>
go(Chunk.Queue.empty, newTimeout)
}
case Left(_) => go(acc, currentTimeout)
case Right(c) =>
val newAcc = acc :+ c
if (newAcc.size < n)
go(newAcc, currentTimeout)
else {
val (toEmit, rest) = resize(newAcc.toChunk, Stream.empty)
toEmit ++ startTimeout.flatMap { newTimeout =>
go(Chunk.Queue(rest), newTimeout)
}
}
}
}
// Invariants:
// acc.size < n, always
// hasTimedOut == true iff a timeout has been received, and acc.isEmpty
def go(acc: Chunk.Queue[O], timedPull: Stream.TimedPull[F2, O], hasTimedOut: Boolean = false): Pull[F2, Chunk[O], Unit] =
timedPull.uncons.flatMap {
case None =>
Pull.output1(acc.toChunk).whenA(acc.nonEmpty)
case Some((e, next)) =>
def resetTimerAndGo(q: Chunk.Queue[O]) =
timedPull.timeout(d) >> go(q, next)

e match {
case Left(_) =>
if (acc.nonEmpty)
Pull.output1(acc.toChunk) >> resetTimerAndGo(Chunk.Queue.empty)
else
go(Chunk.Queue.empty, next, hasTimedOut = true)
case Right(c) if hasTimedOut =>
// it has timed out without reset, so acc is empty
val (toEmit, rest) =
if (c.size < n) Pull.output1(c) -> Chunk.empty
else resize(c, Pull.done)
toEmit >> resetTimerAndGo(Chunk.Queue(rest))
case Right(c) if !hasTimedOut =>
val newAcc = acc :+ c
if (newAcc.size < n)
go(newAcc, next)
else {
val (toEmit, rest) = resize(newAcc.toChunk, Pull.done)
toEmit >> resetTimerAndGo(Chunk.Queue(rest))
}
}
}

startTimeout
.flatMap(t => go(Chunk.Queue.empty, t).concurrently(producer))
.onFinalize {
currentTimeout
.getAndSet(F.unit -> true)
.flatMap { case (cancelInFlightTimeout, _) => cancelInFlightTimeout }
}
}
timedPull.timeout(d) >> go(Chunk.Queue.empty, timedPull)
}.stream

/**
* If `this` terminates with `Stream.raiseError(e)`, invoke `h(e)`.
Expand Down Expand Up @@ -4289,6 +4248,80 @@ object Stream extends StreamLowPriority {
Pull.output(pfx) >> Pull.pure(Some(tl.cons(sfx)))
}
}

/**
* Allows expressing `Pull` computations whose `uncons` can receive
* a user-controlled, resettable `timeout`.
* See [[TimedPull]] for more info on timed `uncons` and `timeout`.
*
* As a quick example, let's write a timed pull which emits the
* string "late!" whenever a chunk of the stream is not emitted
* within 150 milliseconds:
*
* @example {{{
* scala> import cats.effect.IO
* scala> import cats.effect.unsafe.implicits.global
* scala> import scala.concurrent.duration._
* scala> val s = (Stream("elem") ++ Stream.sleep_[IO](200.millis)).repeat.take(3)
* scala> s.pull
* | .timed { timedPull =>
* | def go(timedPull: Stream.TimedPull[IO, String]): Pull[IO, String, Unit] =
* | timedPull.timeout(150.millis) >> // starts new timeout and stops the previous one
* | timedPull.uncons.flatMap {
* | case Some((Right(elems), next)) => Pull.output(elems) >> go(next)
* | case Some((Left(_), next)) => Pull.output1("late!") >> go(next)
* | case None => Pull.done
* | }
* | go(timedPull)
* | }.stream.compile.toVector.unsafeRunSync()
* res0: Vector[String] = Vector(elem, late!, elem, late!, elem)
* }}}
*
* For a more complex example, look at the implementation of [[Stream.groupWithin]].
*/
def timed[O2, R](pull: Stream.TimedPull[F, O] => Pull[F, O2, R])(implicit F: Temporal[F]): Pull[F, O2, R] =
Pull
.eval { Token[F].mproduct(id => SignallingRef.of(id -> 0.millis)) }
.flatMap { case (initial, time) =>

def timeouts: Stream[F, Token] =
time
.discrete
.dropWhile { case (id, _) => id == initial }
.switchMap { case (id, duration) =>
// We cannot move this check into a `filter`:
// we want `switchMap` to execute and cancel the previous timeout
if (duration != 0.nanos)
Stream.sleep(duration).as(id)
else
Stream.empty
}

def output: Stream[F, Either[Token, Chunk[O]]] =
timeouts.map(_.asLeft)
.mergeHaltR(self.chunks.map(_.asRight))
.flatMap {
case chunk @ Right(_) => Stream.emit(chunk)
case timeout @ Left(id) =>
Stream
.eval(time.get)
.collect { case (currentTimeout, _) if currentTimeout == id => timeout }
}

def toTimedPull(s: Stream[F, Either[Token, Chunk[O]]]): TimedPull[F, O] = new TimedPull[F, O] {
type Timeout = Token

def uncons: Pull[F, INothing, Option[(Either[Timeout, Chunk[O]], TimedPull[F, O])]] =
s.pull.uncons1
.map( _.map { case (r, next) => r -> toTimedPull(next) })

def timeout(t: FiniteDuration): Pull[F, INothing, Unit] = Pull.eval {
Token[F].tupleRight(t).flatMap(time.set)
}
}

pull(toTimedPull(output))
}
}

/** Projection of a `Stream` providing various ways to compile a `Stream[F,O]` to a `G[...]`. */
Expand Down Expand Up @@ -4560,6 +4593,39 @@ object Stream extends StreamLowPriority {
def toVector: G[Vector[O]] = to(Vector)
}

/**
* An abstraction for writing `Pull` computations that can timeout
* while reading from a `Stream`.
*
* A `TimedPull` is not created or intepreted directly, but by calling [[timed]].
* {{{
* yourStream.pull.timed(tp => ...).stream
* }}}
*
* The argument to `timed` is a `TimedPull[F, O] => Pull[F, O2, R]`
* function which describes the pulling logic, and is often recursive, with shape:
*
* {{{
* def go(tp: TimedPull[F, A]): Pull[F, B, Unit] =
* tp.uncons.flatMap {
* case Some((Right(chunk), next)) => doSomething >> go(next)
* case Some((Left(_), next)) => doSomethingElse >> go(next)
* case None => Pull.done
* }
* }}}
*
* Where `doSomething` and `doSomethingElse` are `Pull` computations
* such as `Pull.output`, in addition to `TimedPull.timeout`.
*
* See below for detailed descriptions of `timeout` and `uncons`, and
* look at the [[ToPull.timed]] scaladoc for an example of usage.
*/
trait TimedPull[F[_], O] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on defining this as Pull.Timed instead of Stream.TimedPull?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh interesting. It is closer to ToPull in spirit, but I do like Pull.Timed and it's probably more discoverable, so if you're definitely 👍 on Pull.Timed, I'll move it there :)

type Timeout
def uncons: Pull[F, INothing, Option[(Either[Timeout, Chunk[O]], TimedPull[F, O])]]
def timeout(t: FiniteDuration): Pull[F, INothing, Unit]
}

/**
* When merging multiple streams, this represents step of one leg.
*
Expand Down
47 changes: 47 additions & 0 deletions core/shared/src/main/scala/fs2/ex.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2


object ex {

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

val emitThenWait = Stream("elem") ++ Stream.sleep_[IO](200.millis)
val s = emitThenWait.repeat.take(3)

def a = s.pull
.timed { timedPull =>
def go(timedPull: Stream.TimedPull[IO, String]): Pull[IO, String, Unit] =
timedPull.timeout(150.millis) >> // starts a timeout timer, resets the previous one
timedPull.uncons.flatMap {
case Some((Right(elems), next)) => Pull.output(elems) >> go(next)
case Some((Left(_), next)) => Pull.output1("late!") >> go(next)
case None => Pull.done
}

go(timedPull)
}.stream.compile.toVector.unsafeRunSync()
//val res0: Vector[String] = Vector(elem, late!, elem, late!, elem)
}
Loading