@@ -52,10 +52,13 @@ class UpdateNotification {
5252 static Stream <UpdateNotification > throttleStream (
5353 Stream <UpdateNotification > input, Duration timeout,
5454 {UpdateNotification ? addOne}) {
55- return _throttleStream (input, timeout, addOne: addOne, throttleFirst: true ,
56- add: (a, b) {
57- return a.union (b);
58- });
55+ return _throttleStream (
56+ input: input,
57+ timeout: timeout,
58+ throttleFirst: true ,
59+ add: (a, b) => a.union (b),
60+ addOne: addOne,
61+ );
5962 }
6063
6164 /// Filter an update stream by specific tables.
@@ -67,62 +70,120 @@ class UpdateNotification {
6770 }
6871}
6972
70- /// Given a broadcast stream, return a singular throttled stream that is throttled.
71- /// This immediately starts listening .
73+ /// Throttles an [input] stream to not emit events more often than with a
74+ /// frequency of 1/ [timeout] .
7275///
73- /// Behaviour:
74- /// If there was no event in "timeout", and one comes in, it is pushed immediately.
75- /// Otherwise, we wait until the timeout is over.
76- Stream <T > _throttleStream <T extends Object >(Stream <T > input, Duration timeout,
77- {bool throttleFirst = false , T Function (T , T )? add, T ? addOne}) async * {
78- var nextPing = Completer <void >();
79- var done = false ;
80- T ? lastData;
81-
82- var listener = input.listen ((data) {
83- if (lastData != null && add != null ) {
84- lastData = add (lastData! , data);
85- } else {
86- lastData = data;
76+ /// When an event is received and no timeout window is active, it is forwarded
77+ /// downstream and a timeout window is started. For events received within a
78+ /// timeout window, [add] is called to fold events. Then when the window
79+ /// expires, pending events are emitted.
80+ /// The subscription to the [input] stream is never paused.
81+ ///
82+ /// When the returned stream is paused, an active timeout window is reset and
83+ /// restarts after the stream is resumed.
84+ ///
85+ /// If [addOne] is not null, that event will always be added when the stream is
86+ /// subscribed to.
87+ /// When [throttleFirst] is true, a timeout window begins immediately after
88+ /// listening (so that the first event, apart from [addOne] , is emitted no
89+ /// earlier than after [timeout] ).
90+ Stream <T > _throttleStream <T extends Object >({
91+ required Stream <T > input,
92+ required Duration timeout,
93+ required bool throttleFirst,
94+ required T Function (T , T ) add,
95+ required T ? addOne,
96+ }) {
97+ return Stream .multi ((listener) {
98+ T ? pendingData;
99+ Timer ? activeTimeoutWindow;
100+ var needsTimeoutWindowAfterResume = false ;
101+
102+ /// Add pending data, bypassing the active timeout window.
103+ ///
104+ /// This is used to forward error and done events immediately.
105+ bool addPendingEvents () {
106+ if (pendingData case final data? ) {
107+ pendingData = null ;
108+ listener.addSync (data);
109+ activeTimeoutWindow? .cancel ();
110+ activeTimeoutWindow = null ;
111+ return true ;
112+ } else {
113+ return false ;
114+ }
87115 }
88- if (! nextPing.isCompleted) {
89- nextPing.complete ();
116+
117+ late void Function () setTimeout;
118+
119+ /// Emits [pendingData] if no timeout window is active, and then starts a
120+ /// timeout window if necessary.
121+ void maybeEmit () {
122+ if (activeTimeoutWindow == null && ! listener.isPaused) {
123+ final didAdd = addPendingEvents ();
124+ if (didAdd) {
125+ // Schedule a pause after resume if the subscription was paused
126+ // directly in response to receiving the event. Otherwise, begin the
127+ // timeout window immediately.
128+ if (listener.isPaused) {
129+ needsTimeoutWindowAfterResume = true ;
130+ } else {
131+ setTimeout ();
132+ }
133+ }
134+ }
90135 }
91- }, onDone: () {
92- if (! nextPing.isCompleted) {
93- nextPing.complete ();
136+
137+ setTimeout = () {
138+ activeTimeoutWindow = Timer (timeout, () {
139+ activeTimeoutWindow = null ;
140+ maybeEmit ();
141+ });
142+ };
143+
144+ void onData (T data) {
145+ pendingData = switch (pendingData) {
146+ null => data,
147+ final pending => add (pending, data),
148+ };
149+ maybeEmit ();
94150 }
95151
96- done = true ;
97- });
152+ void onError (Object error, StackTrace trace) {
153+ addPendingEvents ();
154+ listener.addErrorSync (error, trace);
155+ }
156+
157+ void onDone () {
158+ addPendingEvents ();
159+ listener.closeSync ();
160+ }
161+
162+ final subscription = input.listen (onData, onError: onError, onDone: onDone);
163+
164+ listener.onPause = () {
165+ needsTimeoutWindowAfterResume = activeTimeoutWindow != null ;
166+ activeTimeoutWindow? .cancel ();
167+ activeTimeoutWindow = null ;
168+ };
169+ listener.onResume = () {
170+ if (needsTimeoutWindowAfterResume) {
171+ setTimeout ();
172+ } else {
173+ maybeEmit ();
174+ }
175+ };
176+ listener.onCancel = () async {
177+ activeTimeoutWindow? .cancel ();
178+ return subscription.cancel ();
179+ };
98180
99- try {
100181 if (addOne != null ) {
101- yield addOne;
182+ // This must not be sync, we're doing this directly in onListen
183+ listener.add (addOne);
102184 }
103185 if (throttleFirst) {
104- await Future .delayed (timeout);
105- }
106- while (! done) {
107- // If a value is available now, we'll use it immediately.
108- // If not, this waits for it.
109- await nextPing.future;
110- if (done) break ;
111-
112- // Capture any new values coming in while we wait.
113- nextPing = Completer <void >();
114- T data = lastData as T ;
115- // Clear before we yield, so that we capture new changes while yielding
116- lastData = null ;
117- yield data;
118- // Wait a minimum of this duration between tasks
119- await Future .delayed (timeout);
186+ setTimeout ();
120187 }
121- } finally {
122- if (lastData case final data? ) {
123- yield data;
124- }
125-
126- await listener.cancel ();
127- }
188+ });
128189}
0 commit comments