Skip to content

Commit

Permalink
fix(throttleTime): ensure the spacing between throttles is always at …
Browse files Browse the repository at this point in the history
…least the throttled amount

Works to align the behavior with expectations set by lodash's throttle

- Updates tests
- Ensures trailing throttle will wait to notify and then complete
- Ensures that every time we emit a value a new throttle period starts

fixes ReactiveX#3712
related ReactiveX#4864
fixes ReactiveX#2727
closes ReactiveX#4727
related ReactiveX#4429
  • Loading branch information
benlesh committed Sep 3, 2020
1 parent 02c3a1b commit 6481175
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 200 deletions.
338 changes: 196 additions & 142 deletions spec/operators/throttleTime-spec.ts
Original file line number Diff line number Diff line change
@@ -1,200 +1,254 @@
/** @prettier */
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions, time } from '../helpers/marble-testing';
import { throttleTime, take, map, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { of, concat, timer } from 'rxjs';

declare const rxTestScheduler: TestScheduler;
import { observableMatcher } from '../helpers/observableMatcher';

/** @test {throttleTime} */
describe('throttleTime operator', () => {
it('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-x-y----b---x-cx---|');
const subs = '^ !';
const expected = '-a--------b-----c----|';

const result = e1.pipe(throttleTime(50, rxTestScheduler));
let rxTest: TestScheduler;

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
beforeEach(() => {
rxTest = new TestScheduler(observableMatcher);
});

it('should throttle events by 50 time units', (done: MochaDone) => {
of(1, 2, 3).pipe(throttleTime(50))
.subscribe((x: number) => {
expect(x).to.equal(1);
}, null, done);
});
describe('defailt behavior { leading: true, trailing: false }', () => {
it('should immediately emit the first value in each time window', () => {
rxTest.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a-x-y----b---x-cx---|');
// ----| ----| ----|
const expected = '-a--------b-----c----|';
const subs = ' ^--------------------!';

it('should throttle events multiple times', () => {
const expected = ['1-0', '2-0'];
concat(
timer(0, 10, rxTestScheduler).pipe(take(3), map((x: number) => '1-' + x)),
timer(80, 10, rxTestScheduler).pipe(take(5), map((x: number) => '2-' + x))
).pipe(
throttleTime(50, rxTestScheduler)
).subscribe((x: string) => {
expect(x).to.equal(expected.shift());
});
const result = e1.pipe(throttleTime(5, rxTest));

rxTestScheduler.flush();
});
expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should simply mirror the source if values are not emitted often enough', () => {
const e1 = hot('-a--------b-----c----|');
const subs = '^ !';
const expected = '-a--------b-----c----|';
it('should throttle events by 5 time units', (done: MochaDone) => {
of(1, 2, 3)
.pipe(throttleTime(5))
.subscribe(
(x: number) => {
expect(x).to.equal(1);
},
null,
done
);
});

expectObservable(e1.pipe(throttleTime(50, rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
it('should throttle events multiple times', () => {
const expected = ['1-0', '2-0'];
concat(
timer(0, 1, rxTest).pipe(
take(3),
map((x: number) => '1-' + x)
),
timer(8, 1, rxTest).pipe(
take(5),
map((x: number) => '2-' + x)
)
)
.pipe(throttleTime(5, rxTest))
.subscribe((x: string) => {
expect(x).to.equal(expected.shift());
});

rxTest.flush();
});

it('should handle a busy producer emitting a regular repeating sequence', () => {
const e1 = hot('abcdefabcdefabcdefabcdefa|');
const subs = '^ !';
const expected = 'a-----a-----a-----a-----a|';
it('should simply mirror the source if values are not emitted often enough', () => {
rxTest.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a--------b-----c----|');
const subs = ' ^--------------------!';
const expected = '-a--------b-----c----|';

expectObservable(e1.pipe(throttleTime(50, rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
expectObservable(e1.pipe(throttleTime(5, rxTest))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should complete when source does not emit', () => {
const e1 = hot('-----|');
const subs = '^ !';
const expected = '-----|';
it('should handle a busy producer emitting a regular repeating sequence', () => {
rxTest.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' abcdefabcdefabcdefabcdefa|');
const subs = ' ^------------------------!';
const expected = 'a-----a-----a-----a-----a|';

expectObservable(e1.pipe(throttleTime(50, rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
expectObservable(e1.pipe(throttleTime(5, rxTest))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should raise error when source does not emit and raises error', () => {
const e1 = hot('-----#');
const subs = '^ !';
const expected = '-----#';
it('should complete when source does not emit', () => {
rxTest.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -----|');
const subs = ' ^----!';
const expected = '-----|';

expectObservable(e1.pipe(throttleTime(10, rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
expectObservable(e1.pipe(throttleTime(5, rxTest))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should handle an empty source', () => {
const e1 = cold('|');
const subs = '(^!)';
const expected = '|';
it('should raise error when source does not emit and raises error', () => {
rxTest.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -----#');
const subs = ' ^----!';
const expected = '-----#';

expectObservable(e1.pipe(throttleTime(30, rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
expectObservable(e1.pipe(throttleTime(10, rxTest))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should handle a never source', () => {
const e1 = cold('-');
const subs = '^';
const expected = '-';
it('should handle an empty source', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const e1 = cold(' |');
const subs = ' (^!)';
const expected = '|';

expectObservable(e1.pipe(throttleTime(30, rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
expectObservable(e1.pipe(throttleTime(30, rxTest))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should handle a throw source', () => {
const e1 = cold('#');
const subs = '(^!)';
const expected = '#';
it('should handle a never source', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const e1 = cold(' -');
const subs = ' ^';
const expected = '-';

expectObservable(e1.pipe(throttleTime(30, rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
expectObservable(e1.pipe(throttleTime(30, rxTest))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should throttle and does not complete when source does not completes', () => {
const e1 = hot('-a--(bc)-------d----------------');
const unsub = ' !';
const subs = '^ !';
const expected = '-a-------------d----------------';
it('should handle a throw source', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const e1 = cold(' #');
const subs = ' (^!)';
const expected = '#';

expectObservable(e1.pipe(throttleTime(50, rxTestScheduler)), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
expectObservable(e1.pipe(throttleTime(30, rxTest))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
const e1 = hot('-a--(bc)-------d----------------');
const subs = '^ !';
const expected = '-a-------------d----------------';
const unsub = ' !';
it('should throttle and does not complete when source does not completes', () => {
rxTest.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a--(bc)-------d----------------');
const unsub = ' -------------------------------!';
const subs = ' ^------------------------------!';
const expected = '-a-------------d----------------';

const result = e1.pipe(
mergeMap((x: string) => of(x)),
throttleTime(50, rxTestScheduler),
mergeMap((x: string) => of(x))
);
expectObservable(e1.pipe(throttleTime(5, rxTest)), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
rxTest.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a--(bc)-------d----------------');
const subs = ' ^------------------------------!';
const expected = '-a-------------d----------------';
const unsub = ' -------------------------------!';

const result = e1.pipe(
mergeMap((x: string) => of(x)),
throttleTime(5, rxTest),
mergeMap((x: string) => of(x))
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should throttle values until source raises error', () => {
const e1 = hot('-a--(bc)-------d---------------#');
const subs = '^ !';
const expected = '-a-------------d---------------#';
it('should throttle values until source raises error', () => {
rxTest.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a--(bc)-------d---------------#');
const subs = ' ^------------------------------!';
const expected = '-a-------------d---------------#';

expectObservable(e1.pipe(throttleTime(50, rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
expectObservable(e1.pipe(throttleTime(5, rxTest))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});
});

describe('throttleTime(fn, { leading: true, trailing: true })', () => {
it('should immediately emit the first and last values in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const t = time( '----| ');
const expected = '-a---y----b---x-c---x-|';

const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: true, trailing: true }));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
it('should immediately emit the first and last values in each time window', () => {
rxTest.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a-xy-----b--x--cxxx--|');
const e1subs = ' ^---------------------!';
const t = time(' ----| ');
// ----|----|---|---|
const expected = '-a---y----b---x---x---(x|)';

const result = e1.pipe(throttleTime(t, rxTest, { leading: true, trailing: true }));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should emit the value if only a single one is given', () => {
const e1 = hot('-a--------------------|');
const t = time('----| ');
const expected = '-a--------------------|';
rxTest.run(({ hot, time, expectObservable }) => {
const e1 = hot(' -a--------------------|');
const t = time(' ----| ');
const expected = '-a--------------------|';

const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: true, trailing: true }));
const result = e1.pipe(throttleTime(t, rxTest, { leading: true, trailing: true }));

expectObservable(result).toBe(expected);
expectObservable(result).toBe(expected);
});
});
});

describe('throttleTime(fn, { leading: false, trailing: true })', () => {
it('should immediately emit the last value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const t = time( '----| ');
const expected = '-----y--------x-----x-|';

const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: false, trailing: true }));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
it('should immediately emit the last value in each time window', () => {
rxTest.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a-xy-----b--x--cxxx--|');
const e1subs = ' ^---------------------!';
const t = time(' ----| ');
// ----|---|----|---|---|
const expected = '-----y--------x---x---(x|)';

const result = e1.pipe(throttleTime(t, rxTest, { leading: false, trailing: true }));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should emit the last throttled value when complete', () => {
const e1 = hot('-a-xy-----b--x--cxx|');
const e1subs = '^ !';
const t = time('----| ');
const expected = '-----y--------x----(x|)';
rxTest.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a-xy-----b--x--cxx|');
const e1subs = ' ^------------------!';
const t = time(' ----| ');
// ----|---|----|---|---|
const expected = '-----y--------x---x---|';

const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: false, trailing: true }));
const result = e1.pipe(throttleTime(t, rxTest, { leading: false, trailing: true }));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should emit the value if only a single one is given', () => {
const e1 = hot('-a--------------------|');
const t = time('----| ');
const expected = '-----a----------------|';
rxTest.run(({ hot, time, expectObservable }) => {
const e1 = hot(' -a--------------------|');
const t = time(' ----| ');
const expected = '-----a----------------|';

const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: false, trailing: true }));
const result = e1.pipe(throttleTime(t, rxTest, { leading: false, trailing: true }));

expectObservable(result).toBe(expected);
expectObservable(result).toBe(expected);
});
});
});
});
Loading

0 comments on commit 6481175

Please sign in to comment.