Skip to content

Commit

Permalink
fix(throttle): don't signal on complete
Browse files Browse the repository at this point in the history
BREAKING CHANGE: the observable returned by the throttle operator's
duration selector must emit a next notification to end the duration.
Complete notifications no longer end the duration.
  • Loading branch information
cartant authored and benlesh committed Nov 3, 2020
1 parent 95e0b70 commit 4af0227
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 27 deletions.
60 changes: 36 additions & 24 deletions spec/operators/throttle-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ describe('throttle operator', () => {
it('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx-|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2 = cold( '----x ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
Expand All @@ -24,7 +24,7 @@ describe('throttle operator', () => {
it('should simply mirror the source if values are not emitted often enough', () => {
const e1 = hot('-a--------b-----c----|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2 = cold( '----x ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
Expand Down Expand Up @@ -57,7 +57,7 @@ describe('throttle operator', () => {
const e1 = hot('-a-x-y-z-xyz-x-y-z----b--x-x-|');
const unsub = ' ! ';
const e1subs = '^ ! ';
const e2 = cold( '------------------| ');
const e2 = cold( '------------------x ');
const e2subs = ' ^ ! ';
const expected = '-a------------- ';

Expand All @@ -71,7 +71,7 @@ describe('throttle operator', () => {
it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
const e1 = hot('-a-x-y-z-xyz-x-y-z----b--x-x-|');
const e1subs = '^ ! ';
const e2 = cold( '------------------| ');
const e2 = cold( '------------------x ');
const e2subs = ' ^ ! ';
const expected = '-a------------- ';
const unsub = ' ! ';
Expand All @@ -90,7 +90,7 @@ describe('throttle operator', () => {
it('should handle a busy producer emitting a regular repeating sequence', () => {
const e1 = hot('abcdefabcdefabcdefabcdefa|');
const e1subs = '^ !';
const e2 = cold('-----| ');
const e2 = cold('-----x ');
const e2subs = ['^ ! ',
' ^ ! ',
' ^ ! ',
Expand All @@ -105,7 +105,19 @@ describe('throttle operator', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should mirror source if durations are always empty', () => {
it('should mirror source if durations are immediate', () => {
const e1 = hot('abcdefabcdefabcdefabcdefa|');
const e1subs = '^ !';
const e2 = cold('x');
const expected = 'abcdefabcdefabcdefabcdefa|';

const result = e1.pipe(throttle(() => e2));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should mirror source if durations are empty', () => {
const e1 = hot('abcdefabcdefabcdefabcdefa|');
const e1subs = '^ !';
const e2 = cold('|');
Expand Down Expand Up @@ -162,11 +174,11 @@ describe('throttle operator', () => {
it('should throttle using durations of constying lengths', () => {
const e1 = hot('abcdefabcdabcdefghabca| ');
const e1subs = '^ ! ';
const e2 = [cold('-----| '),
cold( '---| '),
cold( '-------| '),
cold( '--| '),
cold( '----|')];
const e2 = [cold('-----x '),
cold( '---x '),
cold( '-------x '),
cold( '--x '),
cold( '----x')];
const e2subs = ['^ ! ',
' ^ ! ',
' ^ ! ',
Expand All @@ -187,8 +199,8 @@ describe('throttle operator', () => {
it('should propagate error from duration Observable', () => {
const e1 = hot('abcdefabcdabcdefghabca| ');
const e1subs = '^ ! ';
const e2 = [cold('-----| '),
cold( '---| '),
const e2 = [cold('-----x '),
cold( '---x '),
cold( '-------# ')];
const e2subs = ['^ ! ',
' ^ ! ',
Expand All @@ -208,7 +220,7 @@ describe('throttle operator', () => {
it('should propagate error thrown from durationSelector function', () => {
const s1 = hot('--^--x--x--x--x--x--x--e--x--x--x--|');
const s1Subs = '^ !';
const n1 = cold( '----|');
const n1 = cold( '----x');
const n1Subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
Expand Down Expand Up @@ -332,7 +344,7 @@ describe('throttle operator', () => {
it('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx------|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2 = cold( '----x ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! ',
Expand All @@ -351,7 +363,7 @@ describe('throttle operator', () => {
it('should work for individual values', () => {
const s1 = hot('-^-x------------------|');
const s1Subs = '^ !';
const n1 = cold( '------------------------|');
const n1 = cold( '------------------------x');
const n1Subs = [' ^ !'];
const exp = '--x------------------|';

Expand All @@ -364,7 +376,7 @@ describe('throttle operator', () => {
it('should emit trailing value after throttle duration when source completes', () => {
const e1 = hot('-a--------xy| ');
const e1subs = '^ ! ';
const e2 = cold( '----| ');
const e2 = cold( '----x ');
const e2subs = [' ^ ! ',
' ^ !'];
const expected = '-a--------x---(y|)';
Expand All @@ -381,7 +393,7 @@ describe('throttle operator', () => {
it('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx------|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2 = cold( '----x ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! ',
Expand All @@ -400,7 +412,7 @@ describe('throttle operator', () => {
it('should work for individual values', () => {
const s1 = hot('-^-x------------------|');
const s1Subs = '^ !';
const n1 = cold( '------------------------|');
const n1 = cold( '------------------------x');
const n1Subs = [' ^ !'];
const exp = '--x------------------|';

Expand All @@ -410,10 +422,10 @@ describe('throttle operator', () => {
expectSubscriptions(n1.subscriptions).toBe(n1Subs);
});

it('should wait for trailing throttle to complete before completing, even if source completes', () => {
it('should wait for trailing throttle before completing, even if source completes', () => {
const source = hot( '-^--x--------y---------|');
const sourceSubs = '^ !';
const duration = cold( '------------------------|');
const duration = cold( '------------------------x');
const durationSubs = ' ^ !';
const exp = '---x-----------------------(y|)';

Expand All @@ -426,7 +438,7 @@ describe('throttle operator', () => {
it('should emit trailing value after throttle duration when source completes', () => {
const e1 = hot('-a--------x| ');
const e1subs = '^ ! ';
const e2 = cold( '----| ');
const e2 = cold( '----x ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ !'];
Expand All @@ -442,7 +454,7 @@ describe('throttle operator', () => {
it('should emit the last trailing value after throttle duration when source completes', () => {
const e1 = hot('-a--------xy| ');
const e1subs = '^ ! ';
const e2 = cold( '----| ');
const e2 = cold( '----x ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ !'];
Expand All @@ -458,7 +470,7 @@ describe('throttle operator', () => {
it('should complete when source completes if no value is available', () => {
const e1 = hot('-a-----|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2 = cold( '----x ');
const e2subs = [' ^ ! ',
' ^ !'];
const expected = '-----a-|';
Expand Down
11 changes: 8 additions & 3 deletions src/internal/operators/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const defaultThrottleConfig: ThrottleConfig = {
* value arrives, it is forwarded to the output Observable, and then the timer
* is enabled by calling the `durationSelector` function with the source value,
* which returns the "duration" Observable. When the duration Observable emits a
* value or completes, the timer is disabled, and this process repeats for the
* value, the timer is disabled, and this process repeats for the
* next source value.
*
* ## Example
Expand Down Expand Up @@ -70,7 +70,7 @@ export function throttle<T>(
let throttled: Subscription | null = null;
let isComplete = false;

const throttlingDone = () => {
const endThrottling = () => {
throttled?.unsubscribe();
throttled = null;
if (trailing) {
Expand All @@ -79,9 +79,14 @@ export function throttle<T>(
}
};

const cleanupThrottling = () => {
throttled = null;
isComplete && subscriber.complete();
};

const startThrottle = (value: T) =>
(throttled = innerFrom(durationSelector(value)).subscribe(
new OperatorSubscriber(subscriber, throttlingDone, undefined, throttlingDone)
new OperatorSubscriber(subscriber, endThrottling, undefined, cleanupThrottling)
));

const send = () => {
Expand Down

0 comments on commit 4af0227

Please sign in to comment.