
Timed Pull #2062

Merged
merged 76 commits into from
Oct 23, 2020

Conversation

SystemFw
Collaborator

@SystemFw SystemFw commented Oct 2, 2020

Timed Pulls aim to cover a gap in our current api: aggregation with timeouts.

Examples:

  • The current implementation of groupWithin, which is very complex
  • groupedWeightedWithin, which was requested in Add groupedWeightedWithin #2060
  • In the past I've needed groupAdjacentWithin for some Kafka code
  • Things like posting a metric if the stream is not "advancing" in time

Since the demise of AsyncPull, uncons cannot be timed out, and the alternative approach needed to implement this set of combinators is complex: you need queues, a mechanism for resettable timeouts, a mechanism to avoid losing messages, and so on.

Timed Pull brings a much simpler, Pull-like api to the problem, but without requiring any "kernel mode" operations.

This is still very WIP, but I'd like to get some feedback on the api before I go implement the various combinators and the related tests and docs.
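To make the shape of the proposed api concrete, here is a sketch using the draft `uncons`/`startTimer` names from this PR (which may well change). `logIfStalled` is a hypothetical combinator, not part of the PR: it re-emits the source and runs an `onStall` action whenever no chunk arrives within `d`, resetting the timeout after every chunk.

```scala
import scala.concurrent.duration.FiniteDuration
import fs2.{Chunk, INothing, Pull}

// Draft interface from this PR, reproduced for context.
trait TimedPull[F[_], A] {
  type Timeout
  def uncons: Pull[F, INothing, Option[(Either[Timeout, Chunk[A]], TimedPull[F, A])]]
  def startTimer(t: FiniteDuration): Pull[F, INothing, Unit]
}

// Hypothetical combinator, for illustration only.
def logIfStalled[F[_], A](d: FiniteDuration, onStall: F[Unit])(
    tp: TimedPull[F, A]
): Pull[F, A, Unit] = {
  def go(tp: TimedPull[F, A]): Pull[F, A, Unit] =
    tp.uncons.flatMap {
      case None => Pull.done
      case Some((Right(chunk), next)) => // data arrived in time: emit and reset the timer
        Pull.output(chunk) >> next.startTimer(d) >> go(next)
      case Some((Left(_), next)) => // the timeout fired first
        Pull.eval(onStall) >> next.startTimer(d) >> go(next)
    }

  tp.startTimer(d) >> go(tp)
}
```

Note how the `Either[Timeout, Chunk[A]]` result lets the consumer treat "data arrived" and "timed out" as two branches of an ordinary, sequential Pull, with no queues or concurrency in user code.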

@SystemFw SystemFw requested a review from mpilquist October 2, 2020 22:32
def startTimer(t: FiniteDuration): Pull[F, INothing, Unit]
}
object TimedPull {
def go[F[_]: Temporal, A, B](pull: TimedPull[F, A] => Pull[F, B, Unit]): Pipe[F, A, B] = { source =>
Member

@mpilquist mpilquist Oct 3, 2020


Instead of this signature, what do you think of:

// On ToPull[F[_], O]
def timed[B](pull: TimedPull[F, O] => Pull[F, B, Unit])(implicit t: Temporal[F]): Pull[F, B, Unit]

Usage like:

s.pull.timed { tp => ... }.stream

Collaborator Author


I was thinking of doing both, kinda like Broadcast does it.
I prefer yours for normal usage tbh, my only "issue" with it is that if you go to look at TimedPull, there's nothing there showing you how to use it (and you need to refer to ToPull)

Collaborator Author


The alternative is to put TimedPull inside Stream, which is how StepLeg does it, and have the signature you propose exclusively. I don't mind that.

Collaborator Author


Yeah I'm going with your suggestion for sure, since it allows reusing things like repeatPull

Member

@mpilquist mpilquist left a comment


Looks pretty cool. One question: how much of the pull creation API would need to be duplicated on TimedPull -- e.g., how do you get an unconsN and unconsLimit that are time aware?

@SystemFw
Collaborator Author

SystemFw commented Oct 3, 2020

how do you get a unconsN and unconsLimit that's time aware?

That's a great question, one I do not have a great answer to yet. My instinct is to be light on duplication, but perhaps the two you have mentioned should be there.

My instinct is to be light on duplication,

Or rather, to be guided by the timed combinators we want to add, rather than starting out by duplicating big swathes of the Pull api
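As an example of that approach, a time-aware unconsN could plausibly be derived from timed uncons alone, assuming the draft TimedPull trait from this PR's diff is in scope. This is a sketch with hypothetical names; splitting off the excess when a chunk overshoots `n` is elided.

```scala
import fs2.{Chunk, INothing, Pull}

// Hypothetical derivation, not part of this PR: accumulate chunks until
// either `n` elements are buffered or the timeout fires, then return
// whatever was collected.
def unconsN[F[_], A](
    tp: TimedPull[F, A],
    n: Int
): Pull[F, INothing, Option[(Chunk[A], TimedPull[F, A])]] = {
  def go(
      tp: TimedPull[F, A],
      acc: Chunk[A]
  ): Pull[F, INothing, Option[(Chunk[A], TimedPull[F, A])]] =
    tp.uncons.flatMap {
      case None =>
        Pull.pure(if (acc.isEmpty) None else Some((acc, tp)))
      case Some((Left(_), next)) => // timeout: give back what we have so far
        Pull.pure(Some((acc, next)))
      case Some((Right(c), next)) =>
        val newAcc = acc ++ c
        if (newAcc.size >= n) Pull.pure(Some((newAcc, next))) // excess handling elided
        else go(next, newAcc)
    }

  go(tp, Chunk.empty[A])
}
```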

@SystemFw
Collaborator Author

SystemFw commented Oct 3, 2020

e.g., how do you get a unconsN and unconsLimit that's time aware?

Btw, if Pull were left abstract (more like an interface than a sealed trait), there is a scary/intriguing possibility of transforming existing Pulls into timed versions, but it would need to be able to reinterpret Pull.pure as well. Not quite sure whether that's a fantastic or terrible idea :)

@mpilquist
Member

We need explicit signaling of the timeout starts though, right? E.g. startTimeout then uncons 2 chunks versus timeout uncons timeout uncons? Or would the transformed pulls just add a timeout to each uncons?

@SystemFw
Collaborator Author

SystemFw commented Oct 3, 2020

Or would the transformed pulls just add a timeout to each uncons?

I mean, it's a pretty far out idea, but I was thinking each uncons would handle a potential timeout and each "return" would start/reset one. You need to know which Pull you're reinterpreting though, because for some "return" is pure, and for others it's output. But overall I'd like to get this api (the manual one) in regardless, given how hard it is to implement the equivalent functionality right now, and then think longer about Pull introspection

Comment on lines 32 to 36
trait TimedPull[F[_], A] {
type Timeout
def uncons: Pull[F, INothing, Option[(Either[Timeout, Chunk[A]], TimedPull[F, A])]]
def startTimer(t: FiniteDuration): Pull[F, INothing, Unit]
}
Contributor


This all makes sense to me, it's a nice mirror of the Pull api 👍


// if we express timedUncons and startTimer as `F`, that guarantee is hard to achieve
// cause they could be concurrent. We could express both as Pulls, forcing a sync loop.
// need to add a note that the `unconsTimed` `Pull`, unlike `uncons`, is not idempotent
Contributor


Can you elaborate on this? It's not obvious to me what the cause or implication of this is

Collaborator Author

@SystemFw SystemFw Oct 3, 2020


sure, I need some time to formulate a response, it's a bit subtle (which is why I noted it down first thing).
As a start, try and get familiar with the stale timeout check in the old groupWithin

Collaborator Author


ah wait, do you mean the idempotency comment? That's not true anymore: it is idempotent now, since it's expressed in terms of uncons1. I thought you meant why I'm using Pull in TimedPull rather than F[Unit] and F[Option[Either[Timeout, Chunk[A]]]] -- that's the subtle bit, but it's settled now

Contributor


I was referring to the idempotency comment, yes

Collaborator Author

@SystemFw SystemFw Oct 3, 2020


Right, so I knew I wanted to use Pull to forbid concurrency in the api (that's the thing I thought you were asking). At first I was thinking of having an explicit queue and doing uncons = Pull.eval(dequeue1), which would mean, however, that uncons >> uncons would get different elements, whereas the normal Pull.uncons is idempotent. I thought it didn't matter too much anyway, since it's generally used linearly, but I was happier when I found out I could avoid the explicit queue altogether and rely on merge only instead.
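The point about the queue-based encoding can be seen in miniature with cats-effect 3's Queue (a self-contained illustration, not code from this PR): an uncons defined as Pull.eval(dequeue1) consumes an element on every execution, so sequencing it twice observes two different elements, whereas re-running the normal uncons does not advance the stream.

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global

// A "queue-backed uncons" is just `q.take`: running it twice consumes
// two elements, i.e. it is not idempotent.
val demo: IO[(Int, Int)] =
  for {
    q <- Queue.unbounded[IO, Int]
    _ <- q.offer(1)
    _ <- q.offer(2)
    a <- q.take
    b <- q.take // running "uncons" again yields a different element
  } yield (a, b)

// demo.unsafeRunSync() == (1, 2)
```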

@@ -706,6 +706,9 @@ class StreamCombinatorsSuite extends Fs2Suite {
}

group("groupWithin") {
// TODO more tests:
// with no waiting, it should be equal to chunkN
Contributor


done

.dropWhile(_.id == initial.id)
.switchMap { timeout =>
Stream.sleep(timeout.d).as(timeout.id)
}
Collaborator Author

@SystemFw SystemFw Oct 20, 2020


I have a question on this @mpilquist @djspiewak . I think only data can give a definite answer but I just want to gauge what you think first. Look at the diff for the implementation of timeouts.
The problem is implementing resettable timeouts.
The first uses only one extra fiber, but it works only if calls to resetTimer never try to replace a timeout which hasn't triggered yet with a shorter one.
The second is fully general, but it switches fibers at every timeout reset.

My question is, do you think the first (based on your sensibility) would offer a significant advantage in perf and/or memory utilisation? If so, do you think it's worth trying to introduce it as a "fast-path" for when resetTimer is called in the right conditions (and only fallback to switchMap if not), or is that too much trouble for little gain?
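For reference, the fully general strategy is essentially the switchMap shape shown in the diff above. In this simplified sketch, `Timeout` is a hypothetical stand-in for the PR's internal request type, carrying a unique token plus the requested duration:

```scala
import scala.concurrent.duration.FiniteDuration
import cats.effect.Temporal
import fs2.Stream

// Hypothetical stand-in for the PR's internal timeout request type.
final case class Timeout[Token](id: Token, d: FiniteDuration)

// Each new request interrupts the previous sleeping fiber via switchMap,
// so only the latest request can ever fire: that interruption is what
// makes the timeout resettable.
def timeouts[F[_]: Temporal, Token](
    requests: Stream[F, Timeout[Token]]
): Stream[F, Token] =
  requests.switchMap { t =>
    Stream.sleep(t.d).as(t.id)
  }
```

The resettable behaviour comes for free from switchMap's semantics, at the cost of a fiber switch per reset, which is the trade-off being discussed here.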

Member


Interesting question. Pondering…

One thought here is that fibers are extremely cheap. The context shift around start doesn't even involve memory barriers by default, since it just plops the new fiber onto the same worker thread, so you can pretty safely spam that function. The allocation itself creates some memory pressure, but the resident size of IOFiber is not enormous. It's basically analogous to the magnitude of allocations you need to perform if you interpret .pull.uncons1 on a small (~16) chunk, but with fewer objects. Specifically, five objects get allocated in addition to the IOFiber when you call start, two of which are small arrays. The object header of IOFiber itself is the largest memory footprint in the whole menagerie.

Tldr, I suspect that vomiting out fibers is going to be fine. It's not something I would want to be doing in a hot path, but I also wouldn't want to be hitting Stream chunk boundaries within a hot path either (for the same reason), and it's around the same order of magnitude cost.

Collaborator Author


Cool. Just note that switchMap is a lot more than just creating a fiber (queues, semaphores etc. are involved), but I think the memory pressure concern would be the same, and it sounds like it shouldn't be too much of an issue

* See below for detailed descriptions of `timeout` and `uncons`, and
* look at the [[ToPull.timed]] scaladoc for an example of usage.
*/
trait TimedPull[F[_], O] {
Member


Thoughts on defining this as Pull.Timed instead of Stream.TimedPull?

Collaborator Author


oh interesting. It is closer to ToPull in spirit, but I do like Pull.Timed and it's probably more discoverable, so if you're definitely 👍 on Pull.Timed, I'll move it there :)

@SystemFw SystemFw marked this pull request as ready for review October 23, 2020 01:27
@SystemFw
Collaborator Author

Great, there is a failure in MemoryLeakSpec for groupWithin, although I have a vague memory that you've seen it recently (but before this change) @mpilquist?

@SystemFw
Collaborator Author

oh yeah #2056 (comment), I'll restart

@SystemFw
Collaborator Author

I ran the groupWithin 2 memory leak test locally a couple of times and it passes with no problems (manual inspection with visualvm agrees)

@mpilquist mpilquist merged commit e355955 into typelevel:develop Oct 23, 2020