Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature request (possibly) : Dynamic metering on Streams #1270

Closed
TobiasRoland opened this issue Nov 1, 2023 · 2 comments
Closed

Feature request (possibly) : Dynamic metering on Streams #1270

TobiasRoland opened this issue Nov 1, 2023 · 2 comments

Comments

@TobiasRoland
Copy link

In the same manner that you can use a Signal[F, Boolean] to pause consumption, which I've found incredibly useful for code with e.g. dynamic feature flags to turn on/off consumption, I am hoping to throttle consumption.

1. is there currently a baked-in way to have dynamic metering on a stream, other than `evalTap(_ => doSomeDelay())

The limit of this is that someSleep won't have access to when the stream last emitted, how much time has elapsed, etc etc, which leads me to my actual question:

2. I think this is the feature request I'm asking for, if people would find this useful

// ignoring the details of where these signals come from
val targetDelaySignal: Signal[F, FiniteDuration] = createDelaySignal[F]()
val pauseSignal: Signal[F, Boolean] = createPauseSignal[F]()

// I would like to be able to:
someStream.
    .pauseWhen(pauseSignal)
    .meteredBy(targetDelay)
   .flatMap(...etcetc)

I would want the semantics of it to internally keep track of last time it produced a message, and if signal value is emitted that is less than what was last set, AND more time has passed since it was set, then it would immediately emit and then wait for the next duration to elapse. And inversely, if the duration is increased since last, then the time-delta is taken into consideration

So - in BDD format, this is the behaviour that I'm after:

Scenario 1: Initial startup
GIVEN target delay is initially 1 minute
WHEN the stream is started
THEN it emits a message every minute as long as there are more elements to emit

Scenario 2: Delay is changed to be < than it was previously
GIVEN targetDelay is initially 1 minute
AND the stream emits one message
AND 30 seconds pass
WHEN the targetDelay signal is changed to 5 seconds
THEN a message is immediately emitted because (newTime - timePassedSinceLastInvocaton) is negative or 0
AND every 5 seconds after this, a new message is emitted

Scenario 2: Delay is changed to be > than what it was previously
GIVEN targetDelay is initially 1 minute
AND the stream emits one message
AND 30 seconds pass
WHEN the targetDelay signal is changed to 45 seconds
THEN when an additional 15 seconds elapse, a message is immediately emitted
AND every 45 seconds after this, a new message is emitted
@TobiasRoland
Copy link
Author

... well, actually, thinking about it, this might be a better issue for fs2 itself 🤔

@TobiasRoland
Copy link
Author

Closing, moving over to the proper issue tracker

@TobiasRoland TobiasRoland closed this as not planned Won't fix, can't repro, duplicate, stale Nov 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

1 participant