From eef7fb1362d2a47a54a1767b50f52a897fdaeffc Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sun, 25 Oct 2020 13:43:41 +1000 Subject: [PATCH 1/7] fix(audit): don't signal on complete BREAKING CHANGE: the observable returned by the audit operator's duration selector must emit a next notification to end the duration. Complete notifications no longer end the duration. --- spec/operators/audit-spec.ts | 38 ++++++++++++++++----------------- src/internal/operators/audit.ts | 9 ++++++-- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/spec/operators/audit-spec.ts b/spec/operators/audit-spec.ts index 34a89687df..60a49d3edc 100644 --- a/spec/operators/audit-spec.ts +++ b/spec/operators/audit-spec.ts @@ -16,7 +16,7 @@ describe('audit operator', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { const e1 = hot(' -a-xy-----b--x--cxxx-|'); const e1subs = ' ^--------------------!'; - const e2 = cold(' ----| '); + const e2 = cold(' ----x '); const e2subs = [ ' -^---! ', ' ----------^---! ', @@ -36,7 +36,7 @@ describe('audit operator', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { const e1 = hot(' -a--------b-----c----|'); const e1subs = ' ^--------------------!'; - const e2 = cold(' ----| '); + const e2 = cold(' ----x '); const e2subs = [ ' -^---! ', ' ----------^---! ', @@ -122,7 +122,7 @@ describe('audit operator', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { const e1 = hot(' abcdefabcdefabcdefabcdefa| '); const e1subs = ' ^------------------------! '; - const e2 = cold(' -----| '); + const e2 = cold(' -----x '); const e2subs = [ ' ^----! ', ' ------^----! ', @@ -140,11 +140,11 @@ describe('audit operator', () => { }); }); - it('should mirror source if durations are always empty', () => { + it('should mirror source if durations are immediate', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { const e1 = hot(' abcdefabcdefabcdefabcdefa|'); const e1subs = ' ^------------------------!'; - const e2 = cold(' |'); + const e2 = cold(' x'); const expected = 'abcdefabcdefabcdefabcdefa|'; const result = e1.pipe(audit(() => e2)); @@ -154,12 +154,12 @@ describe('audit operator', () => { }); }); - it('should mirror source if durations are EMPTY', () => { + it('should emit no values if durations are EMPTY', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const e1 = hot('abcdefabcdefabcdefabcdefa|'); const e1subs = '^------------------------!'; const e2 = EMPTY; - const expected = 'abcdefabcdefabcdefabcdefa|'; + const expected = '-------------------------|'; const result = e1.pipe(audit(() => e2)); @@ -235,11 +235,11 @@ describe('audit operator', () => { const e1 = hot(' abcdefabcdabcdefghabca| '); const e1subs = ' ^---------------------! '; const e2 = [ - cold(' -----| '), - cold(' ---| '), - cold(' -------| '), - cold(' --| '), - cold(' ----| ') + cold(' -----x '), + cold(' ---x '), + cold(' -------x '), + cold(' --x '), + cold(' ----x ') ]; const e2subs = [ ' ^----! ', @@ -266,8 +266,8 @@ describe('audit operator', () => { const e1 = hot(' abcdefabcdabcdefghabca|'); const e1subs = ' ^----------------! '; const e2 = [ - cold(' -----| '), - cold(' ---| '), + cold(' -----x '), + cold(' ---x '), cold(' -------# ') ]; const e2subs = [ @@ -293,13 +293,13 @@ describe('audit operator', () => { const e1 = hot('abcdefabcdabcdefghabca| '); const e1subs = '^---------! '; const e2 = [ - cold(' -----| '), - cold(' ---| '), - cold(' -------| ') + cold(' -----x '), + cold(' ---x '), + cold(' -------x ') ]; const e2subs = [ ' ^----! ', - ' ------^--! ' + ' ------^--! ' ]; const expected = '-----f---d# '; @@ -458,7 +458,7 @@ describe('audit operator', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { const e1 = hot('-a--------xy| '); const e1subs = ' ^-----------! '; - const e2 = cold(' ----| '); + const e2 = cold(' ----x '); const e2subs = [ ' -^---! ', ' ----------^---!' diff --git a/src/internal/operators/audit.ts b/src/internal/operators/audit.ts index b1ec9c5bc0..5a91fd76bf 100644 --- a/src/internal/operators/audit.ts +++ b/src/internal/operators/audit.ts @@ -23,7 +23,7 @@ import { OperatorSubscriber } from './OperatorSubscriber'; * Initially, the timer is disabled. As soon as the first source value arrives, * the timer is enabled by calling the `durationSelector` function with the * source value, which returns the "duration" Observable. When the duration - * Observable emits a value or completes, the timer is disabled, then the most + * Observable emits a value, the timer is disabled, then the most * recent source value is emitted on the output Observable, and this process * repeats for the next source value. * @@ -69,6 +69,11 @@ export function audit(durationSelector: (value: T) => ObservableInput): isComplete && subscriber.complete(); }; + const cleanupDuration = () => { + durationSubscriber = null; + isComplete && subscriber.complete(); + }; + source.subscribe( new OperatorSubscriber( subscriber, @@ -77,7 +82,7 @@ export function audit(durationSelector: (value: T) => ObservableInput): lastValue = value; if (!durationSubscriber) { innerFrom(durationSelector(value)).subscribe( - (durationSubscriber = new OperatorSubscriber(subscriber, endDuration, undefined, endDuration)) + (durationSubscriber = new OperatorSubscriber(subscriber, endDuration, undefined, cleanupDuration)) ); } }, From 7c4189c405f7406b7ba8ef8ede0b6538f790dc2a Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sun, 25 Oct 2020 13:51:28 +1000 Subject: [PATCH 2/7] fix(sample): don't signal on complete BREAKING CHANGE: the sample operator's notifier observable must emit a next notification to effect a sample. Complete notifications no longer effect a sample. --- spec/operators/sample-spec.ts | 4 ++-- src/internal/operators/sample.ts | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spec/operators/sample-spec.ts b/spec/operators/sample-spec.ts index 00bff3a107..03e15c32ca 100644 --- a/spec/operators/sample-spec.ts +++ b/spec/operators/sample-spec.ts @@ -56,12 +56,12 @@ describe('sample operator', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); - it('should sample when the notifier completes', () => { + it('should not sample when the notifier completes', () => { const e1 = hot('----a-^------b----------|'); const e1subs = '^ !'; const e2 = hot( '-----x-----|'); const e2subs = '^ !'; - const expected = '-----------b------|'; + const expected = '------------------|'; expectObservable(e1.pipe(sample(e2))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); diff --git a/src/internal/operators/sample.ts b/src/internal/operators/sample.ts index a348e4e796..bb7df60166 100644 --- a/src/internal/operators/sample.ts +++ b/src/internal/operators/sample.ts @@ -1,8 +1,8 @@ /** @prettier */ import { Observable } from '../Observable'; - import { MonoTypeOperatorFunction } from '../types'; import { operate } from '../util/lift'; +import { noop } from '../util/noop'; import { OperatorSubscriber } from './OperatorSubscriber'; /** @@ -14,7 +14,7 @@ import { OperatorSubscriber } from './OperatorSubscriber'; * * ![](sample.png) * - * Whenever the `notifier` Observable emits a value or completes, `sample` + * Whenever the `notifier` Observable emits a value, `sample` * looks at the source Observable and emits whichever value it has most recently * emitted since the previous sampling, unless the source has not emitted * anything since the previous sampling. The `notifier` is subscribed to as soon @@ -41,7 +41,7 @@ import { OperatorSubscriber } from './OperatorSubscriber'; * source Observable. * @return {Observable} An Observable that emits the results of sampling the * values emitted by the source Observable whenever the notifier Observable - * emits value or completes. + * emits value. */ export function sample(notifier: Observable): MonoTypeOperatorFunction { return operate((source, subscriber) => { @@ -61,6 +61,6 @@ export function sample(notifier: Observable): MonoTypeOperatorFunction Date: Sun, 25 Oct 2020 14:11:05 +1000 Subject: [PATCH 3/7] fix(throttle): don't signal on complete BREAKING CHANGE: the observable returned by the throttle operator's duration selector must emit a next notification to end the duration. Complete notifications no longer end the duration. --- spec/operators/throttle-spec.ts | 60 ++++++++++++++++++------------ src/internal/operators/throttle.ts | 11 ++++-- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index e4d9f0e44f..a854abdc95 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -8,7 +8,7 @@ describe('throttle operator', () => { it('should immediately emit the first value in each time window', () => { const e1 = hot('-a-xy-----b--x--cxxx-|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ ! ']; @@ -24,7 +24,7 @@ describe('throttle operator', () => { it('should simply mirror the source if values are not emitted often enough', () => { const e1 = hot('-a--------b-----c----|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ ! ']; @@ -57,7 +57,7 @@ describe('throttle operator', () => { const e1 = hot('-a-x-y-z-xyz-x-y-z----b--x-x-|'); const unsub = ' ! '; const e1subs = '^ ! '; - const e2 = cold( '------------------| '); + const e2 = cold( '------------------x '); const e2subs = ' ^ ! '; const expected = '-a------------- '; @@ -71,7 +71,7 @@ describe('throttle operator', () => { it('should not break unsubscription chains when result is unsubscribed explicitly', () => { const e1 = hot('-a-x-y-z-xyz-x-y-z----b--x-x-|'); const e1subs = '^ ! '; - const e2 = cold( '------------------| '); + const e2 = cold( '------------------x '); const e2subs = ' ^ ! '; const expected = '-a------------- '; const unsub = ' ! '; @@ -90,7 +90,7 @@ describe('throttle operator', () => { it('should handle a busy producer emitting a regular repeating sequence', () => { const e1 = hot('abcdefabcdefabcdefabcdefa|'); const e1subs = '^ !'; - const e2 = cold('-----| '); + const e2 = cold('-----x '); const e2subs = ['^ ! ', ' ^ ! ', ' ^ ! ', @@ -105,7 +105,19 @@ describe('throttle operator', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); - it('should mirror source if durations are always empty', () => { + it('should mirror source if durations are immediate', () => { + const e1 = hot('abcdefabcdefabcdefabcdefa|'); + const e1subs = '^ !'; + const e2 = cold('x'); + const expected = 'abcdefabcdefabcdefabcdefa|'; + + const result = e1.pipe(throttle(() => e2)); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should mirror source if durations are empty', () => { const e1 = hot('abcdefabcdefabcdefabcdefa|'); const e1subs = '^ !'; const e2 = cold('|'); @@ -162,11 +174,11 @@ describe('throttle operator', () => { it('should throttle using durations of constying lengths', () => { const e1 = hot('abcdefabcdabcdefghabca| '); const e1subs = '^ ! '; - const e2 = [cold('-----| '), - cold( '---| '), - cold( '-------| '), - cold( '--| '), - cold( '----|')]; + const e2 = [cold('-----x '), + cold( '---x '), + cold( '-------x '), + cold( '--x '), + cold( '----x')]; const e2subs = ['^ ! ', ' ^ ! ', ' ^ ! ', @@ -187,8 +199,8 @@ describe('throttle operator', () => { it('should propagate error from duration Observable', () => { const e1 = hot('abcdefabcdabcdefghabca| '); const e1subs = '^ ! '; - const e2 = [cold('-----| '), - cold( '---| '), + const e2 = [cold('-----x '), + cold( '---x '), cold( '-------# ')]; const e2subs = ['^ ! ', ' ^ ! ', @@ -208,7 +220,7 @@ describe('throttle operator', () => { it('should propagate error thrown from durationSelector function', () => { const s1 = hot('--^--x--x--x--x--x--x--e--x--x--x--|'); const s1Subs = '^ !'; - const n1 = cold( '----|'); + const n1 = cold( '----x'); const n1Subs = [' ^ ! ', ' ^ ! ', ' ^ ! ']; @@ -332,7 +344,7 @@ describe('throttle operator', () => { it('should immediately emit the first value in each time window', () => { const e1 = hot('-a-xy-----b--x--cxxx------|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ ! ', @@ -351,7 +363,7 @@ describe('throttle operator', () => { it('should work for individual values', () => { const s1 = hot('-^-x------------------|'); const s1Subs = '^ !'; - const n1 = cold( '------------------------|'); + const n1 = cold( '------------------------x'); const n1Subs = [' ^ !']; const exp = '--x------------------|'; @@ -364,7 +376,7 @@ describe('throttle operator', () => { it('should emit trailing value after throttle duration when source completes', () => { const e1 = hot('-a--------xy| '); const e1subs = '^ ! '; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ !']; const expected = '-a--------x---(y|)'; @@ -381,7 +393,7 @@ describe('throttle operator', () => { it('should immediately emit the first value in each time window', () => { const e1 = hot('-a-xy-----b--x--cxxx------|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ ! ', @@ -400,7 +412,7 @@ describe('throttle operator', () => { it('should work for individual values', () => { const s1 = hot('-^-x------------------|'); const s1Subs = '^ !'; - const n1 = cold( '------------------------|'); + const n1 = cold( '------------------------x'); const n1Subs = [' ^ !']; const exp = '--x------------------|'; @@ -410,10 +422,10 @@ describe('throttle operator', () => { expectSubscriptions(n1.subscriptions).toBe(n1Subs); }); - it('should wait for trailing throttle to complete before completing, even if source completes', () => { + it('should wait for trailing throttle before completing, even if source completes', () => { const source = hot( '-^--x--------y---------|'); const sourceSubs = '^ !'; - const duration = cold( '------------------------|'); + const duration = cold( '------------------------x'); const durationSubs = ' ^ !'; const exp = '---x-----------------------(y|)'; @@ -426,7 +438,7 @@ describe('throttle operator', () => { it('should emit trailing value after throttle duration when source completes', () => { const e1 = hot('-a--------x| '); const e1subs = '^ ! '; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ !']; @@ -442,7 +454,7 @@ describe('throttle operator', () => { it('should emit the last trailing value after throttle duration when source completes', () => { const e1 = hot('-a--------xy| '); const e1subs = '^ ! '; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ !']; @@ -458,7 +470,7 @@ describe('throttle operator', () => { it('should complete when source completes if no value is available', () => { const e1 = hot('-a-----|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ !']; const expected = '-----a-|'; diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index 9dff66248e..d7c48c848d 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -32,7 +32,7 @@ export const defaultThrottleConfig: ThrottleConfig = { * value arrives, it is forwarded to the output Observable, and then the timer * is enabled by calling the `durationSelector` function with the source value, * which returns the "duration" Observable. When the duration Observable emits a - * value or completes, the timer is disabled, and this process repeats for the + * value, the timer is disabled, and this process repeats for the * next source value. * * ## Example @@ -70,7 +70,7 @@ export function throttle( let throttled: Subscription | null = null; let isComplete = false; - const throttlingDone = () => { + const endThrottling = () => { throttled?.unsubscribe(); throttled = null; if (trailing) { @@ -79,9 +79,14 @@ export function throttle( } }; + const cleanupThrottling = () => { + throttled = null; + isComplete && subscriber.complete(); + }; + const startThrottle = (value: T) => (throttled = innerFrom(durationSelector(value)).subscribe( - new OperatorSubscriber(subscriber, throttlingDone, undefined, throttlingDone) + new OperatorSubscriber(subscriber, endThrottling, undefined, cleanupThrottling) )); const send = () => { From 72468609efa3d5ad1d73c5bda4c974d62e0299a2 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sun, 25 Oct 2020 16:30:51 +1000 Subject: [PATCH 4/7] fix(debounce): don't signal on complete BREAKING CHANGE: the observable returned by the debounce operator's duration selector must emit a next notification to end the duration. Complete notifications no longer end the duration. --- spec/operators/debounce-spec.ts | 58 +++++++++++++++--------------- src/internal/operators/debounce.ts | 6 ++-- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/spec/operators/debounce-spec.ts b/spec/operators/debounce-spec.ts index c6ae33eebf..bb86033e55 100644 --- a/spec/operators/debounce-spec.ts +++ b/spec/operators/debounce-spec.ts @@ -20,7 +20,7 @@ describe('debounce', () => { it('should debounce values by a specified cold Observable', () => { testScheduler.run(({ cold, hot, expectObservable }) => { const e1 = hot(' -a--bc--d---|'); - const e2 = cold(' --| '); + const e2 = cold(' --x '); const expected = '---a---c--d-|'; const result = e1.pipe(debounce(() => e2)); @@ -364,11 +364,11 @@ describe('debounce', () => { }); }); - it('should mirror the source when given an empty selector Observable', () => { + it('should ignore all values except last, when given an empty selector Observable', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { - const e1 = hot(' --------a-x-yz---bxy---z--c--x--y--z|'); - const e1subs = ' ^-----------------------------------!'; - const expected = '--------a-x-yz---bxy---z--c--x--y--z|'; + const e1 = hot(' --------a-x-yz---bxy---z--c--x--y--z| '); + const e1subs = ' ^-----------------------------------! '; + const expected = '------------------------------------(z|)'; function selectorFunction(x: string) { return EMPTY; @@ -394,20 +394,20 @@ describe('debounce', () => { }); }); - it('should delay element by selector observable completes when it does not emits', () => { + it('should not delay by selector observable completes when it does not emits', () => { testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const e1 = hot(' --------a--------b--------c---------|'); - const e1subs = ' ^-----------------------------------!'; - const expected = '---------a---------b---------c------|'; + const e1 = hot(' --------a--------b--------c---------| '); + const e1subs = ' ^-----------------------------------! '; + const expected = '------------------------------------(c|)'; const selector = [ - cold(' -| '), - cold(' --| '), - cold(' ---| '), + cold(' -| '), + cold(' --| '), + cold(' ---| '), ]; const selectorSubs = [ - ' --------^! ', - ' -----------------^-! ', - ' --------------------------^--! ', + ' --------^! ', + ' -----------------^-! ', + ' --------------------------^--! ', ]; expectObservable(e1.pipe(debounce(() => selector.shift()!))).toBe(expected); @@ -418,24 +418,24 @@ describe('debounce', () => { }); }); - it('should debounce by selector observable completes when it does not emits', () => { + it('should not debounce by selector observable completes when it does not emits', () => { testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const e1 = hot(' ----a--b-c---------de-------------|'); - const e1subs = ' ^---------------------------------!'; - const expected = '-----a------c------------e--------|'; + const e1 = hot(' ----a--b-c---------de-------------| '); + const e1subs = ' ^---------------------------------! '; + const expected = '----------------------------------(e|)'; const selector = [ - cold(' -| '), - cold(' --| '), - cold(' ---| '), - cold(' ----| '), - cold(' -----| '), + cold(' -| '), + cold(' --| '), + cold(' ---| '), + cold(' ----| '), + cold(' -----| '), ]; const selectorSubs = [ - ' ----^! ', - ' -------^-! ', - ' ---------^--! ', - ' -------------------^! ', - ' --------------------^----! ', + ' ----^! ', + ' -------^-! ', + ' ---------^--! ', + ' -------------------^! ', + ' --------------------^----! ', ]; expectObservable(e1.pipe(debounce(() => selector.shift()!))).toBe(expected); diff --git a/src/internal/operators/debounce.ts b/src/internal/operators/debounce.ts index 4d6e5b23fb..c4bc0589af 100644 --- a/src/internal/operators/debounce.ts +++ b/src/internal/operators/debounce.ts @@ -1,8 +1,8 @@ /** @prettier */ import { Subscriber } from '../Subscriber'; import { MonoTypeOperatorFunction, ObservableInput } from '../types'; - import { operate } from '../util/lift'; +import { noop } from '../util/noop'; import { OperatorSubscriber } from './OperatorSubscriber'; import { innerFrom } from '../observable/from'; @@ -20,7 +20,7 @@ import { innerFrom } from '../observable/from'; * This operator keeps track of the most recent notification from the source * Observable, and spawns a duration Observable by calling the * `durationSelector` function. The notification is emitted only when the duration - * Observable emits a notification or completes, and if no other notification was emitted on + * Observable emits a next notification, and if no other notification was emitted on * the source Observable since the duration Observable was spawned. If a new * notification appears before the duration Observable emits, the previous notification will * not be emitted and a new duration is scheduled from `durationSelector` is scheduled. @@ -97,7 +97,7 @@ export function debounce(durationSelector: (value: T) => ObservableInput lastValue = value; // Capture our duration subscriber, so we can unsubscribe it when we're notified // and we're going to emit the value. - durationSubscriber = new OperatorSubscriber(subscriber, emit, undefined, emit); + durationSubscriber = new OperatorSubscriber(subscriber, emit, undefined, noop); // Subscribe to the duration. innerFrom(durationSelector(value)).subscribe(durationSubscriber); }, From 3bcdbb7f8847d0c409a956799dfc470976a2a702 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sun, 25 Oct 2020 16:54:21 +1000 Subject: [PATCH 5/7] fix(bufferWhen): don't signal on complete BREAKING CHANGE: the observable returned by the bufferWhen operator's closing selector must emit a next notification to close the buffer. Complete notifications no longer close the buffer. --- spec/operators/bufferWhen-spec.ts | 18 ++++++++---------- src/internal/operators/bufferWhen.ts | 4 ++-- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/spec/operators/bufferWhen-spec.ts b/spec/operators/bufferWhen-spec.ts index 0799ba2834..80f8df1cbd 100644 --- a/spec/operators/bufferWhen-spec.ts +++ b/spec/operators/bufferWhen-spec.ts @@ -87,7 +87,7 @@ describe('bufferWhen operator', () => { }); }); - it('should emit buffers using varying empty delayed closings', () => { + it('should not emit buffers using varying empty delayed closings', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); const subs = ' ^----------------------------------! '; @@ -98,14 +98,12 @@ describe('bufferWhen operator', () => { ]; const closeSubs = [ ' ^--------------! ', - ' ---------------^---------! ', - ' -------------------------^---------! ' + ' ', + ' ', ]; - const expected = ' ---------------x---------y---------(z|)'; + const expected = ' -----------------------------------(x|)'; const values = { - x: ['b', 'c', 'd'], - y: ['e', 'f', 'g'], - z: ['h'] + x: ['b', 'c', 'd', 'e', 'f', 'g', 'h'] }; let i = 0; @@ -359,13 +357,13 @@ describe('bufferWhen operator', () => { }); }); - // bufferWhen is not supposed to handle a factory that returns always empty + // bufferWhen is not supposed to handle a factory that returns always sync // closing Observables, because doing such would constantly recreate a new // buffer in a synchronous infinite loop until the stack overflows. This also // happens with buffer in RxJS 4. - it('should NOT handle hot inner empty', (done: MochaDone) => { + it('should NOT handle synchronous inner', (done: MochaDone) => { const source = of(1, 2, 3, 4, 5, 6, 7, 8, 9); - const closing = EMPTY; + const closing = of(1); const TOO_MANY_INVOCATIONS = 30; source.pipe( diff --git a/src/internal/operators/bufferWhen.ts b/src/internal/operators/bufferWhen.ts index 734b4b820d..8dd4f18c76 100644 --- a/src/internal/operators/bufferWhen.ts +++ b/src/internal/operators/bufferWhen.ts @@ -2,6 +2,7 @@ import { Subscriber } from '../Subscriber'; import { ObservableInput, OperatorFunction } from '../types'; import { operate } from '../util/lift'; +import { noop } from '../util/noop'; import { OperatorSubscriber } from './OperatorSubscriber'; import { innerFrom } from '../observable/from'; @@ -68,8 +69,7 @@ export function bufferWhen(closingSelector: () => ObservableInput): Oper b && subscriber.next(b); // Get a new closing notifier and subscribe to it. - // TODO: We probably want to stop counting `completion` as a notification here. - innerFrom(closingSelector()).subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openBuffer, undefined, openBuffer))); + innerFrom(closingSelector()).subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openBuffer, undefined, noop))); }; // Start the first buffer. From 5558d2f67af5e0116f7c845f47c7a9f859f9f500 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Tue, 27 Oct 2020 10:43:07 +1000 Subject: [PATCH 6/7] fix(bufferToggle): don't signal on complete BREAKING CHANGE: the observable returned by the bufferToggle operator's closing selector must emit a next notification to close the buffer. Complete notifications no longer close the buffer. --- spec/operators/bufferToggle-spec.ts | 115 +++++++++++++------------ src/internal/operators/bufferToggle.ts | 7 +- 2 files changed, 64 insertions(+), 58 deletions(-) diff --git a/spec/operators/bufferToggle-spec.ts b/spec/operators/bufferToggle-spec.ts index f9933d8f43..8fb1766205 100644 --- a/spec/operators/bufferToggle-spec.ts +++ b/spec/operators/bufferToggle-spec.ts @@ -58,20 +58,20 @@ describe('bufferToggle operator', () => { it('should emit buffers using varying cold closings', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { - const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); - const e2 = cold(' --x-----------y--------z---| '); - const subs = ' ^----------------------------------! '; + const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); + const e2 = cold(' --x-----------y--------z---| '); + const subs = ' ^----------------------------------! '; const closings = [ - cold(' ---------------s--| '), - cold(' ----(s|) '), - cold(' ---------------(s|)') + cold(' ---------------s--| '), + cold(' ----(s|) '), + cold(' ---------------(s|)') ]; const closeSubs = [ - ' --^--------------! ', - ' --------------^---! ', - ' -----------------------^-----------! ' + ' --^--------------! ', + ' --------------^---! ', + ' -----------------------^-----------! ' ]; - const expected = '-----------------ij----------------(k|) '; + const expected = ' -----------------ij----------------(k|) '; const values = { i: ['b', 'c', 'd', 'e'], j: ['e'], @@ -100,11 +100,11 @@ describe('bufferToggle operator', () => { sub: ' --^--------------! ' }, { - obs: hot(' -----3----4-------(s|) '), + obs: hot(' -----3----4-------(s|) '), sub: ' --------------^---! ' }, { - obs: hot(' -------3----4-------5----------------s| '), + obs: hot(' -------3----4-------5----------------s|'), sub: ' -----------------------^-----------! ' } ]; @@ -129,18 +129,18 @@ describe('bufferToggle operator', () => { it('should emit buffers using varying empty delayed closings', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { - const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); - const e2 = cold(' --x-----------y--------z---| '); - const subs = ' ^----------------------------------! '; + const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); + const e2 = cold(' --x-----------y--------z---| '); + const subs = ' ^----------------------------------! '; const closings = [ - cold(' ---------------| '), - cold(' ----| '), - cold(' ---------------| ') + cold(' ---------------| '), + cold(' ----| '), + cold(' ---------------| ') ]; - const expected = ' -----------------ij----------------(k|)'; + const expected = ' -----------------------------------(ijk|)'; const values = { - i: ['b', 'c', 'd', 'e'], - j: ['e'], + i: ['b', 'c', 'd', 'e', 'f', 'g', 'h'], + j: ['e', 'f', 'g', 'h'], k: ['g', 'h'] }; @@ -158,9 +158,9 @@ describe('bufferToggle operator', () => { const subs = ' ^---------! '; const e2 = cold(' --x-----------y--------z---| '); const closings = [ - cold(' ---------------s--| '), - cold(' ----(s|) '), - cold(' ---------------(s|) ') + cold(' ---------------s--| '), + cold(' ----(s|) '), + cold(' ---------------(s|)') ]; const csub0 = ' --^-------! '; const expected = ' ----------- '; @@ -182,16 +182,16 @@ describe('bufferToggle operator', () => { it('should not break unsubscription chains when result is unsubscribed explicitly', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { - const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); - const subs = ' ^-----------------! '; - const e2 = cold(' --x-----------y--------z---| '); + const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); + const subs = ' ^-----------------! '; + const e2 = cold(' --x-----------y--------z---| '); const closings = [ - cold(' ---------------s--| '), - cold(' ----(s|) '), - cold(' ---------------(s|)') + cold(' ---------------s--| '), + cold(' ----(s|) '), + cold(' ---------------(s|)') ]; - const expected = ' -----------------i- '; - const unsub = ' ------------------! '; + const expected = ' -----------------i- '; + const unsub = ' ------------------! '; const values = { i: ['b', 'c', 'd', 'e'] }; @@ -210,16 +210,16 @@ describe('bufferToggle operator', () => { it('should propagate error thrown from closingSelector', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { - const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); - const e2 = cold(' --x-----------y--------z---| '); - const subs = ' ^-------------! '; + const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); + const e2 = cold(' --x-----------y--------z---| '); + const subs = ' ^-------------! '; const closings = [ - cold(' ---------------s--| '), - cold(' ----(s|) '), - cold(' ---------------(s|)') + cold(' ---------------s--| '), + cold(' ----(s|) '), + cold(' ---------------(s|)') ]; - const closeSubs0 = '--^-----------! '; - const expected = '--------------# '; + const closeSubs0 = ' --^-----------! '; + const expected = ' --------------# '; let i = 0; const result = e1.pipe( @@ -245,10 +245,10 @@ describe('bufferToggle operator', () => { const e2 = cold(' --x-----------y--------z---| '); const subs = ' ^-------------! '; const closings = [ - cold(' ---------------s--| '), - cold(' # ') + cold(' ---------------s--| '), + cold(' # ') ]; - const closeSubs = [ + const closeSubs = [ ' --^-----------! ', ' --------------(^!) ' ]; @@ -270,8 +270,8 @@ describe('bufferToggle operator', () => { const e2 = cold(' --x-----------y--------z---| '); const subs = ' ^------------------! '; const closings = [ - cold(' ---------------s--| '), - cold(' -----# ') + cold(' ---------------s--| '), + cold(' -----# ') ]; const closeSubs = [ ' --^--------------! ', @@ -298,10 +298,10 @@ describe('bufferToggle operator', () => { const e2 = cold(' --x-----------y--------z---|'); const subs = ' ^------------------! '; const closings = [ - cold(' ---------------s--| '), - cold(' -------s| ') + cold(' ---------------s--| '), + cold(' -------s| ') ]; - const closeSubs = [ + const closeSubs = [ ' --^--------------! ', ' --------------^----! ' ]; @@ -353,6 +353,10 @@ describe('bufferToggle operator', () => { const e1 = hot(' -'); const e2 = cold(' --o-----o------o-----o---o-----|'); const e3 = cold(' --c-|'); + // --c-| + // --c-| + // --c-| + // --c-| const unsub = ' --------------------------------------------!'; const subs = ' ^-------------------------------------------!'; const expected = '----x-----x------x-----x---x-----------------'; @@ -500,14 +504,19 @@ describe('bufferToggle operator', () => { it('should handle empty closing observable', () => { testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => { - const e1 = hot('--a--^---b---c---d---e---f---g---h------|'); - const subs = ' ^----------------------------------!'; - const e2 = cold(' --x-----------y--------z---| '); - const expected = ' --l-----------m--------n-----------|'; + const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); + const subs = ' ^----------------------------------! '; + const e2 = cold(' --x-----------y--------z---| '); + const expected = ' -----------------------------------(ijk|)'; + const values = { + i: ['b', 'c', 'd', 'e', 'f', 'g', 'h'], + j: ['e', 'f', 'g', 'h'], + k: ['g', 'h'] + }; const result = e1.pipe(bufferToggle(e2, () => EMPTY)); - expectObservable(result).toBe(expected, {l: [], m: [], n: []}); + expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(subs); }); }); diff --git a/src/internal/operators/bufferToggle.ts b/src/internal/operators/bufferToggle.ts index 3ed839241c..452b211991 100644 --- a/src/internal/operators/bufferToggle.ts +++ b/src/internal/operators/bufferToggle.ts @@ -69,10 +69,7 @@ export function bufferToggle( // when the closing notifier emits, we can tear it down. const closingSubscription = new Subscription(); - // This is captured here, because we emit on both next or - // if the closing notifier completes without value. - // TODO: We probably want to not have closing notifiers emit!! - const emit = () => { + const emitBuffer = () => { arrRemove(buffers, buffer); subscriber.next(buffer); closingSubscription.unsubscribe(); @@ -80,7 +77,7 @@ export function bufferToggle( // The line below will add the subscription to the parent subscriber *and* the closing subscription. closingSubscription.add( - innerFrom(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emit, undefined, emit)) + innerFrom(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emitBuffer, undefined, noop)) ); }, undefined, From 8fcbc6c142156cb0746228954afd8a5ea7f1db7c Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Tue, 27 Oct 2020 11:17:39 +1000 Subject: [PATCH 7/7] fix(windowToggle): don't signal on complete BREAKING CHANGE: the observable returned by the windowToggle operator's closing selector must emit a next notification to close the window. Complete notifications no longer close the window. Closes #5838 --- spec/operators/windowToggle-spec.ts | 20 ++++++++++---------- src/internal/operators/windowToggle.ts | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/spec/operators/windowToggle-spec.ts b/spec/operators/windowToggle-spec.ts index 4e0e3128ba..95e05ef63d 100644 --- a/spec/operators/windowToggle-spec.ts +++ b/spec/operators/windowToggle-spec.ts @@ -13,9 +13,9 @@ describe('windowToggle', () => { const subs = '^ !'; const e2 = cold( '----w--------w--------w--|'); const e2subs = '^ !'; - const e3 = cold( '-----| '); - // -----(c|) - // -----(c|) + const e3 = cold( '-----x '); + // -----x + // -----x const e3subs = [ ' ^ ! ', // eslint-disable-line array-bracket-spacing ' ^ ! ', ' ^ !']; @@ -121,17 +121,17 @@ describe('windowToggle', () => { expectSubscriptions(closings[2].obs.subscriptions).toBe(closings[2].sub); }); - it('should emit windows using constying empty delayed closings', () => { + it('should emit windows using varying empty delayed closings', () => { const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); const e1subs = '^ ! '; const e2 = cold('--x-----------y--------z---| '); const e2subs = '^ ! '; const close = [cold( '---------------| '), - cold( '----| '), - cold( '---------------|')]; + cold( '----| '), + cold( '---------------|')]; const expected = '--x-----------y--------z-----------| '; - const x = cold( '--b---c---d---e| '); - const y = cold( '--e-| '); + const x = cold( '--b---c---d---e---f---g---h------| '); + const y = cold( '--e---f---g---h------| '); const z = cold( '-g---h------| '); const values = { x, y, z }; @@ -419,8 +419,8 @@ describe('windowToggle', () => { const e2subs = '^ ! '; const e3 = EMPTY; const expected = '---x---------------y---------------|'; - const x = cold( '|'); - const y = cold( '|'); + const x = cold( '-b---c---d---e---f---g---h------|'); + const y = cold( '-f---g---h------|'); const values = { x, y }; const result = e1.pipe(windowToggle(e2, () => e3)); diff --git a/src/internal/operators/windowToggle.ts b/src/internal/operators/windowToggle.ts index eb146b0494..ba2b57b88e 100644 --- a/src/internal/operators/windowToggle.ts +++ b/src/internal/operators/windowToggle.ts @@ -50,7 +50,7 @@ import { arrRemove } from '../util/arrRemove'; * windows. * @param {function(value: O): Observable} closingSelector A function that takes * the value emitted by the `openings` observable and returns an Observable, - * which, when it emits (either `next` or `complete`), signals that the + * which, when it emits a next notification, signals that the * associated window should complete. * @return {Observable>} An observable of windows, which in turn * are Observables. @@ -92,7 +92,7 @@ export function windowToggle( subscriber.next(window.asObservable()); - closingSubscription.add(closingNotifier.subscribe(new OperatorSubscriber(subscriber, closeWindow, handleError, closeWindow))); + closingSubscription.add(closingNotifier.subscribe(new OperatorSubscriber(subscriber, closeWindow, handleError, noop))); }, undefined, noop