From bff18272dca23938a5f5b57cec6eb8d8be5bfddf Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 30 Jul 2020 10:28:14 -0500 Subject: [PATCH] refactor: Reduce memory consumption and simplify inner and outer subscription (#5610) * refactor: simplify inner and outer subscription - Reduces memory pressure by no longer capturing outer values where they are not needed. For example, every inner subscription in mergeMap was retaining the outer value, if the outer value was a large array, that could add up - Makes subscribeToResult less ridiculous, I am not sure what I was thinking there - Leaves a few operators with more complex inner subscriptions, to be refactored later. - Removes some redundant code that removed inner subscriptions manually, this was necessary in the early days of RxJS 5, but has not been necessary in a long time * chore: rename innies-and-outies :( * chore: rename interface to SimpleOuterSubscriberLike * refactor: don't retain unused outerValues in combineLatest and race --- spec/util/subscribeToResult-spec.ts | 197 -------------------- src/internal/InnerSubscriber.ts | 29 --- src/internal/OuterSubscriber.ts | 23 --- src/internal/innerSubscribe.ts | 114 +++++++++++ src/internal/observable/combineLatest.ts | 15 +- src/internal/observable/race.ts | 19 +- src/internal/observable/zip.ts | 20 +- src/internal/operators/audit.ts | 9 +- src/internal/operators/buffer.ts | 12 +- src/internal/operators/bufferToggle.ts | 18 +- src/internal/operators/bufferWhen.ts | 12 +- src/internal/operators/catchError.ts | 10 +- src/internal/operators/debounce.ts | 12 +- src/internal/operators/delayWhen.ts | 19 +- src/internal/operators/distinct.ts | 14 +- src/internal/operators/exhaust.ts | 21 +-- src/internal/operators/exhaustMap.ts | 58 +++--- src/internal/operators/expand.ts | 34 ++-- src/internal/operators/mergeMap.ts | 49 ++--- src/internal/operators/mergeScan.ts | 28 +-- src/internal/operators/onErrorResumeNext.ts | 17 +- src/internal/operators/repeatWhen.ts | 15 +- src/internal/operators/retryWhen.ts | 12 +- src/internal/operators/sample.ts | 12 +- src/internal/operators/skipUntil.ts | 21 +-- src/internal/operators/switchMap.ts | 33 ++-- src/internal/operators/takeUntil.ts | 18 +- src/internal/operators/throttle.ts | 23 +-- src/internal/operators/timeoutWith.ts | 15 +- src/internal/operators/window.ts | 16 +- src/internal/operators/windowToggle.ts | 14 +- src/internal/operators/windowWhen.ts | 21 +-- src/internal/operators/withLatestFrom.ts | 15 +- src/internal/util/subscribeToResult.ts | 37 ---- 34 files changed, 331 insertions(+), 651 deletions(-) delete mode 100644 spec/util/subscribeToResult-spec.ts delete mode 100644 src/internal/InnerSubscriber.ts delete mode 100644 src/internal/OuterSubscriber.ts create mode 100644 src/internal/innerSubscribe.ts delete mode 100644 src/internal/util/subscribeToResult.ts diff --git a/spec/util/subscribeToResult-spec.ts b/spec/util/subscribeToResult-spec.ts deleted file mode 100644 index d5661f87fa..0000000000 --- a/spec/util/subscribeToResult-spec.ts +++ /dev/null @@ -1,197 +0,0 @@ -import { expect } from 'chai'; -import { OuterSubscriber } from 'rxjs/internal/OuterSubscriber'; -import { subscribeToResult } from 'rxjs/internal/util/subscribeToResult'; -import { iterator } from 'rxjs/internal/symbol/iterator'; -import { observable as $$symbolObservable } from 'rxjs/internal/symbol/observable'; -import { of, range, throwError } from 'rxjs'; - -describe('subscribeToResult', () => { - it('should synchronously complete when subscribed to scalarObservable', () => { - const result = of(42); - let expected: number; - const subscriber = new OuterSubscriber((x) => expected = x); - - const subscription = subscribeToResult(subscriber, result); - - expect(expected!).to.be.equal(42); - expect(subscription!.closed).to.be.true; - }); - - it('should subscribe to observables that are an instanceof Observable', (done) => { - const expected = [1, 2, 3]; - const result = range(1, 3); - - const subscriber = new OuterSubscriber(x => { - expect(expected.shift()).to.be.equal(x); - }, () => { - done(new Error('should not be called')); - }, () => { - expect(expected).to.be.empty; - done(); - }); - - subscribeToResult(subscriber, result); - }); - - it('should emit error when observable emits error', (done) => { - const result = throwError(new Error('error')); - const subscriber = new OuterSubscriber(x => { - done(new Error('should not be called')); - }, (err) => { - expect(err).to.be.an('error', 'error'); - done(); - }, () => { - done(new Error('should not be called')); - }); - - subscribeToResult(subscriber, result); - }); - - it('should subscribe to an array and emit synchronously', () => { - const result = [1, 2, 3]; - const expected: number[] = []; - - const subscriber = new OuterSubscriber(x => expected.push(x)); - - subscribeToResult(subscriber, result); - - expect(expected).to.be.deep.equal(result); - }); - - it('should subscribe to an array-like and emit synchronously', () => { - const result = { 0: 0, 1: 1, 2: 2, length: 3 }; - const expected: number[] = []; - - const subscriber = new OuterSubscriber(x => expected.push(x)); - - subscribeToResult(subscriber, result); - - expect(expected).to.be.deep.equal([0, 1, 2]); - }); - - it('should subscribe to a promise', (done) => { - const result = Promise.resolve(42); - - const subscriber = new OuterSubscriber(x => { - expect(x).to.be.equal(42); - }, () => { - done(new Error('should not be called')); - }, done); - - subscribeToResult(subscriber, result); - }); - - it('should emits error when the promise rejects', (done) => { - const result = Promise.reject(42); - - const subscriber = new OuterSubscriber(x => { - done(new Error('should not be called')); - }, (x) => { - expect(x).to.be.equal(42); - done(); - }, () => { - done(new Error('should not be called')); - }); - - subscribeToResult(subscriber, result); - }); - - it('should subscribe an iterable and emit results synchronously', () => { - let expected: number; - const iteratorResults = [ - { value: 42, done: false }, - { done: true } - ]; - - const iterable = { - [iterator]: () => { - return { - next: () => { - return iteratorResults.shift(); - } - }; - } - }; - - const subscriber = new OuterSubscriber((x: number) => expected = x); - - subscribeToResult(subscriber, iterable); - expect(expected!).to.be.equal(42); - }); - - // NOTE: From https://github.com/ReactiveX/rxjs/issues/5436 - it('should pass along errors from an iterable', () => { - const generator = function* () { - yield 1; - yield 2; - yield 3; - throw 'bad'; - }; - - const results: any[] = []; - let foundError: any = null; - - const subscriber = new OuterSubscriber({ - next: x => results.push(x), - error: err => foundError = err - }); - - subscribeToResult(subscriber, generator()); - expect(results).to.deep.equal([1, 2, 3]); - expect(foundError).to.equal('bad'); - }); - - it('should subscribe to to an object that implements Symbol.observable', (done) => { - const observableSymbolObject = { [$$symbolObservable]: () => of(42) }; - - const subscriber = new OuterSubscriber(x => { - expect(x).to.be.equal(42); - }, () => { - done(new Error('should not be called')); - }, done); - - subscribeToResult(subscriber, observableSymbolObject); - }); - - it('should throw an error if value returned by Symbol.observable call is not ' + - 'a valid observable', () => { - const observableSymbolObject = { [$$symbolObservable]: () => ({}) }; - - const subscriber = new OuterSubscriber(x => { - throw new Error('should not be called'); - }, (x) => { - throw new Error('should not be called'); - }, () => { - throw new Error('should not be called'); - }); - - expect(() => subscribeToResult(subscriber, observableSymbolObject)) - .to.throw(TypeError, 'Provided object does not correctly implement Symbol.observable'); - }); - - it('should emit an error when trying to subscribe to an unknown type of object', () => { - const subscriber = new OuterSubscriber(x => { - throw new Error('should not be called'); - }, (x) => { - throw new Error('should not be called'); - }, () => { - throw new Error('should not be called'); - }); - - expect(() => subscribeToResult(subscriber, {})) - .to.throw(TypeError, 'You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.'); - }); - - it('should emit an error when trying to subscribe to a non-object', () => { - const subscriber = new OuterSubscriber(x => { - throw new Error('should not be called'); - }, (x) => { - throw new Error('should not be called'); - }, () => { - throw new Error('should not be called'); - }); - - expect(() => subscribeToResult(subscriber, null)) - .to.throw(TypeError, `You provided 'null' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.`); - }); -}); diff --git a/src/internal/InnerSubscriber.ts b/src/internal/InnerSubscriber.ts deleted file mode 100644 index 048e9a3761..0000000000 --- a/src/internal/InnerSubscriber.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { Subscriber } from './Subscriber'; -import { OuterSubscriber } from './OuterSubscriber'; - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class InnerSubscriber extends Subscriber { - private index = 0; - - constructor(private parent: OuterSubscriber, public outerValue: T, public outerIndex: number) { - super(); - } - - protected _next(value: R): void { - this.parent.notifyNext(this.outerValue, value, this.outerIndex, this.index++, this); - } - - protected _error(error: any): void { - this.parent.notifyError(error, this); - this.unsubscribe(); - } - - protected _complete(): void { - this.parent.notifyComplete(this); - this.unsubscribe(); - } -} diff --git a/src/internal/OuterSubscriber.ts b/src/internal/OuterSubscriber.ts deleted file mode 100644 index b0513217de..0000000000 --- a/src/internal/OuterSubscriber.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { Subscriber } from './Subscriber'; -import { InnerSubscriber } from './InnerSubscriber'; - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class OuterSubscriber extends Subscriber { - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.destination.next(innerValue); - } - - notifyError(error: any, innerSub: InnerSubscriber): void { - this.destination.error(error); - } - - notifyComplete(innerSub: InnerSubscriber): void { - this.destination.complete(); - } -} diff --git a/src/internal/innerSubscribe.ts b/src/internal/innerSubscribe.ts new file mode 100644 index 0000000000..53311c5241 --- /dev/null +++ b/src/internal/innerSubscribe.ts @@ -0,0 +1,114 @@ +/** @prettier */ +import { Subscription } from './Subscription'; +import { Subscriber } from './Subscriber'; +import { Observable } from './Observable'; +import { subscribeTo } from './util/subscribeTo'; + +interface SimpleOuterSubscriberLike { + /** + * A handler for inner next notifications from the inner subscription + * @param innerValue the value nexted by the inner producer + */ + notifyNext(innerValue: T): void; + /** + * A handler for inner error notifications from the inner subscription + * @param err the error from the inner producer + */ + notifyError(err: any): void; + /** + * A handler for inner complete notifications from the inner subscription. + */ + notifyComplete(): void; +} + +export class SimpleInnerSubscriber extends Subscriber { + constructor(private parent: SimpleOuterSubscriberLike) { + super(); + } + + protected _next(value: T): void { + this.parent.notifyNext(value); + } + + protected _error(error: any): void { + this.parent.notifyError(error); + this.unsubscribe(); + } + + protected _complete(): void { + this.parent.notifyComplete(); + this.unsubscribe(); + } +} + +export class ComplexInnerSubscriber extends Subscriber { + constructor(private parent: ComplexOuterSubscriber, public outerValue: T, public outerIndex: number) { + super(); + } + + protected _next(value: R): void { + this.parent.notifyNext(this.outerValue, value, this.outerIndex, this); + } + + protected _error(error: any): void { + this.parent.notifyError(error); + this.unsubscribe(); + } + + protected _complete(): void { + this.parent.notifyComplete(this); + this.unsubscribe(); + } +} + +export class SimpleOuterSubscriber extends Subscriber implements SimpleOuterSubscriberLike { + notifyNext(innerValue: R): void { + this.destination.next(innerValue); + } + + notifyError(err: any): void { + this.destination.error(err); + } + + notifyComplete(): void { + this.destination.complete(); + } +} + +/** + * DO NOT USE (formerly "OuterSubscriber") + * TODO: We want to refactor this and remove it. It is retaining values it shouldn't for long + * periods of time. + */ +export class ComplexOuterSubscriber extends Subscriber { + /** + * @param _outerValue Used by: bufferToggle, delayWhen, windowToggle + * @param innerValue Used by: subclass default, combineLatest, race, bufferToggle, windowToggle, withLatestFrom + * @param _outerIndex Used by: combineLatest, race, withLatestFrom + * @param _innerSub Used by: delayWhen + */ + notifyNext(_outerValue: T, innerValue: R, _outerIndex: number, _innerSub: ComplexInnerSubscriber): void { + this.destination.next(innerValue); + } + + notifyError(error: any): void { + this.destination.error(error); + } + + /** + * @param _innerSub Used by: race, bufferToggle, delayWhen, windowToggle, windowWhen + */ + notifyComplete(_innerSub: ComplexInnerSubscriber): void { + this.destination.complete(); + } +} + +export function innerSubscribe(result: any, innerSubscriber: Subscriber): Subscription | undefined { + if (innerSubscriber.closed) { + return undefined; + } + if (result instanceof Observable) { + return result.subscribe(innerSubscriber); + } + return subscribeTo(result)(innerSubscriber) as Subscription; +} diff --git a/src/internal/observable/combineLatest.ts b/src/internal/observable/combineLatest.ts index f03e9b2805..14841a636d 100644 --- a/src/internal/observable/combineLatest.ts +++ b/src/internal/observable/combineLatest.ts @@ -3,11 +3,9 @@ import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types'; import { isScheduler } from '../util/isScheduler'; import { isArray } from '../util/isArray'; import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; +import { ComplexOuterSubscriber, ComplexInnerSubscriber, innerSubscribe } from '../innerSubscribe'; import { Operator } from '../Operator'; -import { InnerSubscriber } from '../InnerSubscriber'; import { isObject } from '../util/isObject'; -import { subscribeToResult } from '../util/subscribeToResult'; import { fromArray } from './fromArray'; import { lift } from '../util/lift'; @@ -284,7 +282,7 @@ export class CombineLatestOperator implements Operator { * @ignore * @extends {Ignored} */ -export class CombineLatestSubscriber extends OuterSubscriber { +export class CombineLatestSubscriber extends ComplexOuterSubscriber { private active: number = 0; private values: any[] = []; private observables: any[] = []; @@ -309,20 +307,19 @@ export class CombineLatestSubscriber extends OuterSubscriber { this.toRespond = len; for (let i = 0; i < len; i++) { const observable = observables[i]; - this.add(subscribeToResult(this, observable, observable, i)); + this.add(innerSubscribe(observable, new ComplexInnerSubscriber(this, null, i))); } } } - notifyComplete(unused: Subscriber): void { + notifyComplete(): void { if ((this.active -= 1) === 0) { this.destination.complete(); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(_outerValue: T, innerValue: R, + outerIndex: number): void { const values = this.values; const oldVal = values[outerIndex]; const toRespond = !this.toRespond diff --git a/src/internal/observable/race.ts b/src/internal/observable/race.ts index 70540181e5..df8c10d61d 100644 --- a/src/internal/observable/race.ts +++ b/src/internal/observable/race.ts @@ -6,9 +6,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { TeardownLogic, ObservableInput, ObservedValueUnionFromArray } from '../types'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { ComplexOuterSubscriber, innerSubscribe, ComplexInnerSubscriber } from '../innerSubscribe'; import { lift } from '../util/lift'; export function race[]>(observables: A): Observable>; @@ -80,7 +78,7 @@ export class RaceOperator implements Operator { * @ignore * @extends {Ignored} */ -export class RaceSubscriber extends OuterSubscriber { +export class RaceSubscriber extends ComplexOuterSubscriber { private hasFirst: boolean = false; private observables: Observable[] = []; private subscriptions: Subscription[] = []; @@ -102,7 +100,7 @@ export class RaceSubscriber extends OuterSubscriber { } else { for (let i = 0; i < len && !this.hasFirst; i++) { let observable = observables[i]; - let subscription = subscribeToResult(this, observable, observable as any, i); + const subscription = innerSubscribe(observable, new ComplexInnerSubscriber(this, null, i)); if (this.subscriptions) { this.subscriptions.push(subscription!); @@ -113,9 +111,8 @@ export class RaceSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: T, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(_outerValue: T, innerValue: T, + outerIndex: number): void { if (!this.hasFirst) { this.hasFirst = true; @@ -134,13 +131,13 @@ export class RaceSubscriber extends OuterSubscriber { this.destination.next(innerValue); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(innerSub: ComplexInnerSubscriber): void { this.hasFirst = true; super.notifyComplete(innerSub); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(error: any): void { this.hasFirst = true; - super.notifyError(error, innerSub); + super.notifyError(error); } } diff --git a/src/internal/observable/zip.ts b/src/internal/observable/zip.ts index 938ac2fabb..41d4fbecd9 100644 --- a/src/internal/observable/zip.ts +++ b/src/internal/observable/zip.ts @@ -5,11 +5,9 @@ import { Operator } from '../Operator'; import { ObservableInput, PartialObserver, ObservedValueOf } from '../types'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { iterator as Symbol_iterator } from '../../internal/symbol/iterator'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /* tslint:disable:max-line-length */ /** @deprecated resultSelector is no longer supported, pipe to map instead */ @@ -108,7 +106,6 @@ export class ZipOperator implements Operator { * @extends {Ignored} */ export class ZipSubscriber extends Subscriber { - private values: any; private resultSelector?: (...values: Array) => R; private iterators: LookAheadIterator[] = []; private active = 0; @@ -118,7 +115,6 @@ export class ZipSubscriber extends Subscriber { values: any = Object.create(null)) { super(destination); this.resultSelector = resultSelector; - this.values = values; } protected _next(value: any) { @@ -148,7 +144,7 @@ export class ZipSubscriber extends Subscriber { let iterator: ZipBufferIterator = iterators[i]; if (iterator.stillUnsubscribed) { const destination = this.destination as Subscription; - destination.add(iterator.subscribe(iterator, i)); + destination.add(iterator.subscribe()); } else { this.active--; // not an observable } @@ -258,7 +254,7 @@ class StaticArrayIterator implements LookAheadIterator { return this; } - next(value?: any): IteratorResult { + next(): IteratorResult { const i = this.index++; const array = this.array; return i < this.length ? { value: array[i], done: false } : { value: null, done: true }; @@ -278,7 +274,7 @@ class StaticArrayIterator implements LookAheadIterator { * @ignore * @extends {Ignored} */ -class ZipBufferIterator extends OuterSubscriber implements LookAheadIterator { +class ZipBufferIterator extends SimpleOuterSubscriber implements LookAheadIterator { stillUnsubscribed = true; buffer: T[] = []; isComplete = false; @@ -321,14 +317,12 @@ class ZipBufferIterator extends OuterSubscriber implements LookAhead } } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(innerValue: any): void { this.buffer.push(innerValue); this.parent.checkIterators(); } - subscribe(value: any, index: number) { - return subscribeToResult(this, this.observable, this, index); + subscribe() { + return innerSubscribe(this.observable, new SimpleInnerSubscriber(this)); } } diff --git a/src/internal/operators/audit.ts b/src/internal/operators/audit.ts index 704d4d5047..f48d97634b 100644 --- a/src/internal/operators/audit.ts +++ b/src/internal/operators/audit.ts @@ -4,9 +4,8 @@ import { Observable } from '../Observable'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /** * Ignores source values for a duration determined by another Observable, then @@ -73,7 +72,7 @@ class AuditOperator implements Operator { * @ignore * @extends {Ignored} */ -class AuditSubscriber extends OuterSubscriber { +class AuditSubscriber extends SimpleOuterSubscriber { private value: T | null = null; private hasValue: boolean = false; @@ -95,7 +94,7 @@ class AuditSubscriber extends OuterSubscriber { } catch (err) { return this.destination.error(err); } - const innerSubscription = subscribeToResult(this, duration); + const innerSubscription = innerSubscribe(duration, new SimpleInnerSubscriber(this)); if (!innerSubscription || innerSubscription.closed) { this.clearThrottle(); } else { @@ -118,7 +117,7 @@ class AuditSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(): void { this.clearThrottle(); } diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 4cf80c78c0..2e068d6cab 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -1,11 +1,9 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; import { lift } from '../util/lift'; +import { SimpleInnerSubscriber, SimpleOuterSubscriber, innerSubscribe } from '../innerSubscribe'; /** * Buffers the source Observable values until `closingNotifier` emits. @@ -67,21 +65,19 @@ class BufferOperator implements Operator { * @ignore * @extends {Ignored} */ -class BufferSubscriber extends OuterSubscriber { +class BufferSubscriber extends SimpleOuterSubscriber { private buffer: T[] = []; constructor(destination: Subscriber, closingNotifier: Observable) { super(destination); - this.add(subscribeToResult(this, closingNotifier)); + this.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this))); } protected _next(value: T) { this.buffer.push(value); } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { const buffer = this.buffer; this.buffer = []; this.destination.next(buffer); diff --git a/src/internal/operators/bufferToggle.ts b/src/internal/operators/bufferToggle.ts index 1a8770f033..2666f353db 100644 --- a/src/internal/operators/bufferToggle.ts +++ b/src/internal/operators/bufferToggle.ts @@ -2,9 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subscription } from '../Subscription'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; +import { ComplexOuterSubscriber, ComplexInnerSubscriber, innerSubscribe } from '../innerSubscribe'; import { OperatorFunction, SubscribableOrPromise } from '../types'; import { lift } from '../util/lift'; @@ -83,14 +81,14 @@ interface BufferContext { * @ignore * @extends {Ignored} */ -class BufferToggleSubscriber extends OuterSubscriber { +class BufferToggleSubscriber extends ComplexOuterSubscriber { private contexts: Array> = []; constructor(destination: Subscriber, - private openings: SubscribableOrPromise, + openings: SubscribableOrPromise, private closingSelector: (value: O) => SubscribableOrPromise | void) { super(destination); - this.add(subscribeToResult(this, openings)); + this.add(innerSubscribe(openings, new ComplexInnerSubscriber(this, undefined, 0))) } protected _next(value: T): void { @@ -126,13 +124,11 @@ class BufferToggleSubscriber extends OuterSubscriber { super._complete(); } - notifyNext(outerValue: any, innerValue: O, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(outerValue: any, innerValue: O): void { outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(innerSub: ComplexInnerSubscriber): void { this.closeBuffer(( innerSub).context); } @@ -168,7 +164,7 @@ class BufferToggleSubscriber extends OuterSubscriber { const context = { buffer, subscription }; contexts.push(context); - const innerSubscription = subscribeToResult(this, closingNotifier, context); + const innerSubscription = innerSubscribe(closingNotifier, new ComplexInnerSubscriber(this, context, 0)); if (!innerSubscription || innerSubscription.closed) { this.closeBuffer(context); diff --git a/src/internal/operators/bufferWhen.ts b/src/internal/operators/bufferWhen.ts index 9e61aeb66b..9279c38ebb 100644 --- a/src/internal/operators/bufferWhen.ts +++ b/src/internal/operators/bufferWhen.ts @@ -2,11 +2,9 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Buffers the source Observable values, using a factory function of closing @@ -70,7 +68,7 @@ class BufferWhenOperator implements Operator { * @ignore * @extends {Ignored} */ -class BufferWhenSubscriber extends OuterSubscriber { +class BufferWhenSubscriber extends SimpleOuterSubscriber { private buffer: T[] | undefined; private subscribing: boolean = false; private closingSubscription: Subscription | undefined; @@ -98,9 +96,7 @@ class BufferWhenSubscriber extends OuterSubscriber { this.subscribing = false; } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.openBuffer(); } @@ -138,7 +134,7 @@ class BufferWhenSubscriber extends OuterSubscriber { this.closingSubscription = closingSubscription; this.add(closingSubscription); this.subscribing = true; - closingSubscription.add(subscribeToResult(this, closingNotifier)); + closingSubscription.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this))); this.subscribing = false; } } diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 56b6c83c4d..c60fa130bc 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -2,11 +2,9 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function catchError>(selector: (err: any, caught: Observable) => O): OperatorFunction>; @@ -131,7 +129,7 @@ class CatchOperator implements Operator { * @ignore * @extends {Ignored} */ -class CatchSubscriber extends OuterSubscriber { +class CatchSubscriber extends SimpleOuterSubscriber { constructor(destination: Subscriber, private selector: (err: any, caught: Observable) => ObservableInput, private caught: Observable) { @@ -153,9 +151,9 @@ class CatchSubscriber extends OuterSubscriber { return; } this._unsubscribeAndRecycle(); - const innerSubscriber = new InnerSubscriber(this, undefined, undefined!); + const innerSubscriber = new SimpleInnerSubscriber(this); this.add(innerSubscriber); - subscribeToResult(this, result, undefined, undefined, innerSubscriber); + innerSubscribe(result, innerSubscriber); } } } diff --git a/src/internal/operators/debounce.ts b/src/internal/operators/debounce.ts index 3d699435b5..d0f4d9465f 100644 --- a/src/internal/operators/debounce.ts +++ b/src/internal/operators/debounce.ts @@ -4,10 +4,8 @@ import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /** * Emits a notification from the source Observable only after a particular time span @@ -85,7 +83,7 @@ class DebounceOperator implements Operator { * @ignore * @extends {Ignored} */ -class DebounceSubscriber extends OuterSubscriber { +class DebounceSubscriber extends SimpleOuterSubscriber { private value: T | null = null; private hasValue: boolean = false; private durationSubscription: Subscription | null | undefined = null; @@ -121,15 +119,13 @@ class DebounceSubscriber extends OuterSubscriber { this.remove(subscription); } - subscription = subscribeToResult(this, duration); + subscription = innerSubscribe(duration, new SimpleInnerSubscriber(this)); if (subscription && !subscription.closed) { this.add(this.durationSubscription = subscription); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.emitValue(); } diff --git a/src/internal/operators/delayWhen.ts b/src/internal/operators/delayWhen.ts index 6caf7e6702..48628f89cc 100644 --- a/src/internal/operators/delayWhen.ts +++ b/src/internal/operators/delayWhen.ts @@ -2,9 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { ComplexOuterSubscriber, ComplexInnerSubscriber, innerSubscribe } from '../innerSubscribe'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { lift } from '../util/lift'; @@ -96,7 +94,7 @@ class DelayWhenOperator implements Operator { * @ignore * @extends {Ignored} */ -class DelayWhenSubscriber extends OuterSubscriber { +class DelayWhenSubscriber extends ComplexOuterSubscriber { private completed: boolean = false; private delayNotifierSubscriptions: Array = []; private index: number = 0; @@ -106,19 +104,18 @@ class DelayWhenSubscriber extends OuterSubscriber { super(destination); } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(outerValue: T, _innerValue: any, + _outerIndex: number, innerSub: ComplexInnerSubscriber): void { this.destination.next(outerValue); this.removeSubscription(innerSub); this.tryComplete(); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(error: any): void { this._error(error); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(innerSub: ComplexInnerSubscriber): void { const value = this.removeSubscription(innerSub); if (value) { this.destination.next(value); @@ -144,7 +141,7 @@ class DelayWhenSubscriber extends OuterSubscriber { this.unsubscribe(); } - private removeSubscription(subscription: InnerSubscriber): T { + private removeSubscription(subscription: ComplexInnerSubscriber): T { subscription.unsubscribe(); const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription); @@ -156,7 +153,7 @@ class DelayWhenSubscriber extends OuterSubscriber { } private tryDelay(delayNotifier: Observable, value: T): void { - const notifierSubscription = subscribeToResult(this, delayNotifier, value); + const notifierSubscription = innerSubscribe(delayNotifier, new ComplexInnerSubscriber(this, value, 0)); if (notifierSubscription && !notifierSubscription.closed) { const destination = this.destination as Subscription; diff --git a/src/internal/operators/distinct.ts b/src/internal/operators/distinct.ts index 18cd0b6145..e0cba04ec9 100644 --- a/src/internal/operators/distinct.ts +++ b/src/internal/operators/distinct.ts @@ -1,11 +1,9 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. @@ -93,24 +91,22 @@ class DistinctOperator implements Operator { * @ignore * @extends {Ignored} */ -export class DistinctSubscriber extends OuterSubscriber { +export class DistinctSubscriber extends SimpleOuterSubscriber { private values = new Set(); constructor(destination: Subscriber, private keySelector?: (value: T) => K, flushes?: Observable) { super(destination); if (flushes) { - this.add(subscribeToResult(this, flushes)); + this.add(innerSubscribe(flushes, new SimpleInnerSubscriber(this))); } } - notifyNext(outerValue: T, innerValue: T, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.values.clear(); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(error: any): void { this._error(error); } diff --git a/src/internal/operators/exhaust.ts b/src/internal/operators/exhaust.ts index 3ea6a6ce40..045e36776f 100644 --- a/src/internal/operators/exhaust.ts +++ b/src/internal/operators/exhaust.ts @@ -2,10 +2,9 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, TeardownLogic } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; export function exhaust(): OperatorFunction, T>; export function exhaust(): OperatorFunction; @@ -68,31 +67,29 @@ class SwitchFirstOperator implements Operator { * @ignore * @extends {Ignored} */ -class SwitchFirstSubscriber extends OuterSubscriber { - private hasCompleted: boolean = false; - private hasSubscription: boolean = false; +class SwitchFirstSubscriber extends SimpleOuterSubscriber { + private hasCompleted = false; + private innerSubscription?: Subscription; constructor(destination: Subscriber) { super(destination); } protected _next(value: T): void { - if (!this.hasSubscription) { - this.hasSubscription = true; - this.add(subscribeToResult(this, value)); + if (!this.innerSubscription) { + this.innerSubscription = this.add(innerSubscribe(value, new SimpleInnerSubscriber(this))); } } protected _complete(): void { this.hasCompleted = true; - if (!this.hasSubscription) { + if (!this.innerSubscription) { this.destination.complete(); } } - notifyComplete(innerSub: Subscription): void { - this.remove(innerSub); - this.hasSubscription = false; + notifyComplete(): void { + this.innerSubscription = undefined; if (this.hasCompleted) { this.destination.complete(); } diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 86385dd5a0..ac3ad16124 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -2,13 +2,11 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function exhaustMap>(project: (value: T, index: number) => O): OperatorFunction>; @@ -92,53 +90,42 @@ class ExhaustMapOperator implements Operator { * @ignore * @extends {Ignored} */ -class ExhaustMapSubscriber extends OuterSubscriber { - private hasSubscription = false; +class ExhaustMapSubscriber extends SimpleOuterSubscriber { + private innerSubscription?: Subscription; private hasCompleted = false; private index = 0; - constructor(destination: Subscriber, + constructor(protected destination: Subscriber, private project: (value: T, index: number) => ObservableInput) { super(destination); } protected _next(value: T): void { - if (!this.hasSubscription) { - this.tryNext(value); + if (!this.innerSubscription) { + let result: ObservableInput; + const index = this.index++; + try { + result = this.project(value, index); + } catch (err) { + this.destination.error(err); + return; + } + const innerSubscriber = new SimpleInnerSubscriber(this); + const destination = this.destination; + destination.add(innerSubscriber); + this.innerSubscription = innerSubscribe(result, innerSubscriber); } } - private tryNext(value: T): void { - let result: ObservableInput; - const index = this.index++; - try { - result = this.project(value, index); - } catch (err) { - this.destination.error(err); - return; - } - this.hasSubscription = true; - this._innerSub(result, value, index); - } - - private _innerSub(result: ObservableInput, value: T, index: number): void { - const innerSubscriber = new InnerSubscriber(this, value, index); - const destination = this.destination as Subscription; - destination.add(innerSubscriber); - subscribeToResult(this, result, undefined, undefined, innerSubscriber); - } - protected _complete(): void { this.hasCompleted = true; - if (!this.hasSubscription) { + if (!this.innerSubscription) { this.destination.complete(); } this.unsubscribe(); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(innerValue: R): void { this.destination.next(innerValue); } @@ -146,11 +133,8 @@ class ExhaustMapSubscriber extends OuterSubscriber { this.destination.error(err); } - notifyComplete(innerSub: Subscription): void { - const destination = this.destination as Subscription; - destination.remove(innerSub); - - this.hasSubscription = false; + notifyComplete(): void { + this.innerSubscription = undefined; if (this.hasCompleted) { this.destination.complete(); } diff --git a/src/internal/operators/expand.ts b/src/internal/operators/expand.ts index ab0baef1fb..0214fbf27c 100644 --- a/src/internal/operators/expand.ts +++ b/src/internal/operators/expand.ts @@ -1,12 +1,9 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, OperatorFunction, ObservableInput, SchedulerLike } from '../types'; import { lift } from '../util/lift'; +import { SimpleInnerSubscriber, SimpleOuterSubscriber, innerSubscribe } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function expand(project: (value: T, index: number) => ObservableInput, concurrent?: number, scheduler?: SchedulerLike): OperatorFunction; @@ -86,8 +83,6 @@ export class ExpandOperator implements Operator { interface DispatchArg { subscriber: ExpandSubscriber; result: ObservableInput; - value: any; - index: number; } /** @@ -95,13 +90,13 @@ interface DispatchArg { * @ignore * @extends {Ignored} */ -export class ExpandSubscriber extends OuterSubscriber { +export class ExpandSubscriber extends SimpleOuterSubscriber { private index: number = 0; private active: number = 0; private hasCompleted: boolean = false; private buffer: any[] | undefined; - constructor(destination: Subscriber, + constructor(protected destination: Subscriber, private project: (value: T, index: number) => ObservableInput, private concurrent: number, private scheduler?: SchedulerLike) { @@ -112,8 +107,8 @@ export class ExpandSubscriber extends OuterSubscriber { } private static dispatch(arg: DispatchArg): void { - const {subscriber, result, value, index} = arg; - subscriber.subscribeToProjection(result, value, index); + const {subscriber, result} = arg; + subscriber.subscribeToProjection(result); } protected _next(value: any): void { @@ -132,10 +127,10 @@ export class ExpandSubscriber extends OuterSubscriber { const { project } = this; const result = project(value, index); if (!this.scheduler) { - this.subscribeToProjection(result, value, index); + this.subscribeToProjection(result); } else { - const state: DispatchArg = { subscriber: this, result, value, index }; - const destination = this.destination as Subscription; + const state: DispatchArg = { subscriber: this, result }; + const destination = this.destination; destination.add(this.scheduler.schedule>( ExpandSubscriber.dispatch as any, 0, @@ -150,9 +145,8 @@ export class ExpandSubscriber extends OuterSubscriber { } } - private subscribeToProjection(result: any, value: T, index: number): void { - const destination = this.destination as Subscription; - destination.add(subscribeToResult(this, result, value, index)); + private subscribeToProjection(result: any): void { + this.destination.add(innerSubscribe(result, new SimpleInnerSubscriber(this))); } protected _complete(): void { @@ -163,16 +157,12 @@ export class ExpandSubscriber extends OuterSubscriber { this.unsubscribe(); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(innerValue: R): void { this._next(innerValue); } - notifyComplete(innerSub: Subscription): void { + notifyComplete(): void { const buffer = this.buffer; - const destination = this.destination as Subscription; - destination.remove(innerSub); this.active--; if (buffer && buffer.length > 0) { this._next(buffer.shift()); diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 60ac8877a4..c1d4afc8c0 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -2,13 +2,11 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; import { lift } from '../util/lift'; +import { innerSubscribe, SimpleOuterSubscriber, SimpleInnerSubscriber } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function mergeMap>(project: (value: T, index: number) => O, concurrent?: number): OperatorFunction>; @@ -107,13 +105,13 @@ export class MergeMapOperator implements Operator { * @ignore * @extends {Ignored} */ -export class MergeMapSubscriber extends OuterSubscriber { +export class MergeMapSubscriber extends SimpleOuterSubscriber { private hasCompleted: boolean = false; private buffer: T[] = []; private active: number = 0; protected index: number = 0; - constructor(destination: Subscriber, + constructor(protected destination: Subscriber, private project: (value: T, index: number) => ObservableInput, private concurrent: number = Infinity) { super(destination); @@ -121,32 +119,24 @@ export class MergeMapSubscriber extends OuterSubscriber { protected _next(value: T): void { if (this.active < this.concurrent) { - this._tryNext(value); + let result: ObservableInput; + const index = this.index++; + try { + result = this.project(value, index); + } catch (err) { + this.destination.error(err); + return; + } + this.active++; + const innerSubscriber = new SimpleInnerSubscriber(this); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); + innerSubscribe(result, innerSubscriber); } else { this.buffer.push(value); } } - protected _tryNext(value: T) { - let result: ObservableInput; - const index = this.index++; - try { - result = this.project(value, index); - } catch (err) { - this.destination.error(err); - return; - } - this.active++; - this._innerSub(result, value, index); - } - - private _innerSub(ish: ObservableInput, value: T, index: number): void { - const innerSubscriber = new InnerSubscriber(this, value, index); - const destination = this.destination as Subscription; - destination.add(innerSubscriber); - subscribeToResult(this, ish, undefined, undefined, innerSubscriber); - } - protected _complete(): void { this.hasCompleted = true; if (this.active === 0 && this.buffer.length === 0) { @@ -155,15 +145,12 @@ export class MergeMapSubscriber extends OuterSubscriber { this.unsubscribe(); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(innerValue: R): void { this.destination.next(innerValue); } - notifyComplete(innerSub: Subscription): void { + notifyComplete(): void { const buffer = this.buffer; - this.remove(innerSub); this.active--; if (buffer.length > 0) { this._next(buffer.shift()!); diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 1aa1804d73..f7163279c6 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -1,12 +1,9 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; import { ObservableInput, OperatorFunction } from '../types'; import { lift } from '../util/lift'; +import { SimpleInnerSubscriber, SimpleOuterSubscriber, innerSubscribe } from '../innerSubscribe'; /** * Applies an accumulator function over the source Observable where the @@ -70,14 +67,14 @@ export class MergeScanOperator implements Operator { * @ignore * @extends {Ignored} */ -export class MergeScanSubscriber extends OuterSubscriber { +export class MergeScanSubscriber extends SimpleOuterSubscriber { private hasValue: boolean = false; private hasCompleted: boolean = false; private buffer: Observable[] = []; private active: number = 0; protected index: number = 0; - constructor(destination: Subscriber, + constructor(protected destination: Subscriber, private accumulator: (acc: R, value: T, index: number) => ObservableInput, private acc: R, private concurrent: number) { @@ -96,17 +93,16 @@ export class MergeScanSubscriber extends OuterSubscriber { return destination.error(e); } this.active++; - this._innerSub(ish, value, index); + this._innerSub(ish); } else { this.buffer.push(value); } } - private _innerSub(ish: any, value: T, index: number): void { - const innerSubscriber = new InnerSubscriber(this, value, index); - const destination = this.destination as Subscription; - destination.add(innerSubscriber); - subscribeToResult(this, ish, undefined, undefined, innerSubscriber); + private _innerSub(ish: any): void { + const innerSubscriber = new SimpleInnerSubscriber(this); + this.destination.add(innerSubscriber); + innerSubscribe(ish, innerSubscriber); } protected _complete(): void { @@ -120,19 +116,15 @@ export class MergeScanSubscriber extends OuterSubscriber { this.unsubscribe(); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(innerValue: R): void { const { destination } = this; this.acc = innerValue; this.hasValue = true; destination.next(innerValue); } - notifyComplete(innerSub: Subscription): void { + notifyComplete(): void { const buffer = this.buffer; - const destination = this.destination as Subscription; - destination.remove(innerSub); this.active--; if (buffer.length > 0) { this._next(buffer.shift()); diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index 262e6544c7..f25541e84e 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -2,13 +2,10 @@ import { Observable } from '../Observable'; import { from } from '../observable/from'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; import { isArray } from '../util/isArray'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function onErrorResumeNext(): OperatorFunction; @@ -132,17 +129,17 @@ class OnErrorResumeNextOperator implements Operator { } } -class OnErrorResumeNextSubscriber extends OuterSubscriber { +class OnErrorResumeNextSubscriber extends SimpleOuterSubscriber { constructor(protected destination: Subscriber, private nextSources: Array>) { super(destination); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(): void { this.subscribeToNextSource(); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(): void { this.subscribeToNextSource(); } @@ -159,10 +156,10 @@ class OnErrorResumeNextSubscriber extends OuterSubscriber { private subscribeToNextSource(): void { const next = this.nextSources.shift(); if (!!next) { - const innerSubscriber = new InnerSubscriber(this, undefined, undefined!); - const destination = this.destination as Subscription; + const innerSubscriber = new SimpleInnerSubscriber(this); + const destination = this.destination; destination.add(innerSubscriber); - subscribeToResult(this, next, undefined, undefined, innerSubscriber); + innerSubscribe(next, innerSubscriber); } else { this.destination.complete(); } diff --git a/src/internal/operators/repeatWhen.ts b/src/internal/operators/repeatWhen.ts index fa7530de1f..69ecdd52d3 100644 --- a/src/internal/operators/repeatWhen.ts +++ b/src/internal/operators/repeatWhen.ts @@ -4,12 +4,9 @@ import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; - import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Returns an Observable that mirrors the source Observable with the exception of a `complete`. If the source @@ -58,7 +55,7 @@ class RepeatWhenOperator implements Operator { * @ignore * @extends {Ignored} */ -class RepeatWhenSubscriber extends OuterSubscriber { +class RepeatWhenSubscriber extends SimpleOuterSubscriber { private notifications: Subject | null = null; private retries: Observable | null = null; @@ -71,14 +68,12 @@ class RepeatWhenSubscriber extends OuterSubscriber { super(destination); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.sourceIsBeingSubscribedTo = true; this.source.subscribe(this); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(): void { if (this.sourceIsBeingSubscribedTo === false) { return super.complete(); } @@ -135,6 +130,6 @@ class RepeatWhenSubscriber extends OuterSubscriber { return super.complete(); } this.retries = retries; - this.retriesSubscription = subscribeToResult(this, retries); + this.retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this)); } } diff --git a/src/internal/operators/retryWhen.ts b/src/internal/operators/retryWhen.ts index 2a8bda0780..5368ea8611 100644 --- a/src/internal/operators/retryWhen.ts +++ b/src/internal/operators/retryWhen.ts @@ -4,12 +4,10 @@ import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable @@ -82,7 +80,7 @@ class RetryWhenOperator implements Operator { * @ignore * @extends {Ignored} */ -class RetryWhenSubscriber extends OuterSubscriber { +class RetryWhenSubscriber extends SimpleOuterSubscriber { private errors: Subject | null = null; private retries: Observable | null = null; @@ -109,7 +107,7 @@ class RetryWhenSubscriber extends OuterSubscriber { } catch (e) { return super.error(e); } - retriesSubscription = subscribeToResult(this, retries); + retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this)); } else { this.errors = null; this.retriesSubscription = null; @@ -139,9 +137,7 @@ class RetryWhenSubscriber extends OuterSubscriber { this.retries = null; } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { const { _unsubscribe } = this; this._unsubscribe = null!; diff --git a/src/internal/operators/sample.ts b/src/internal/operators/sample.ts index 86afa79b8b..9622ac212b 100644 --- a/src/internal/operators/sample.ts +++ b/src/internal/operators/sample.ts @@ -1,12 +1,10 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /** * Emits the most recently emitted value from the source Observable whenever @@ -58,7 +56,7 @@ class SampleOperator implements Operator { call(subscriber: Subscriber, source: any): TeardownLogic { const sampleSubscriber = new SampleSubscriber(subscriber); const subscription = source.subscribe(sampleSubscriber); - subscription.add(subscribeToResult(sampleSubscriber, this.notifier)); + subscription.add(innerSubscribe(this.notifier, new SimpleInnerSubscriber(sampleSubscriber))); return subscription; } } @@ -68,7 +66,7 @@ class SampleOperator implements Operator { * @ignore * @extends {Ignored} */ -class SampleSubscriber extends OuterSubscriber { +class SampleSubscriber extends SimpleOuterSubscriber { private value: T | undefined; private hasValue: boolean = false; @@ -77,9 +75,7 @@ class SampleSubscriber extends OuterSubscriber { this.hasValue = true; } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.emitValue(); } diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index 60b2aa1b22..a08c5814df 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -1,12 +1,10 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic, ObservableInput } from '../types'; import { Subscription } from '../Subscription'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. @@ -64,29 +62,26 @@ class SkipUntilOperator implements Operator { * @ignore * @extends {Ignored} */ -class SkipUntilSubscriber extends OuterSubscriber { - - private hasValue: boolean = false; +class SkipUntilSubscriber extends SimpleOuterSubscriber { + private isTaking = false; private innerSubscription: Subscription | undefined; constructor(destination: Subscriber, notifier: ObservableInput) { super(destination); - const innerSubscriber = new InnerSubscriber(this, undefined, undefined!); + const innerSubscriber = new SimpleInnerSubscriber(this); this.add(innerSubscriber); this.innerSubscription = innerSubscriber; - subscribeToResult(this, notifier, undefined, undefined, innerSubscriber); + innerSubscribe(notifier, innerSubscriber); } protected _next(value: T) { - if (this.hasValue) { + if (this.isTaking) { super._next(value); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.hasValue = true; + notifyNext(): void { + this.isTaking = true; if (this.innerSubscription) { this.innerSubscription.unsubscribe(); } diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index e05fa29149..3fd4bc90b4 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -2,13 +2,11 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; import { lift } from '../util/lift'; +import { SimpleInnerSubscriber, innerSubscribe, SimpleOuterSubscriber } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function switchMap>(project: (value: T, index: number) => O): OperatorFunction>; @@ -107,11 +105,11 @@ class SwitchMapOperator implements Operator { * @ignore * @extends {Ignored} */ -class SwitchMapSubscriber extends OuterSubscriber { +class SwitchMapSubscriber extends SimpleOuterSubscriber { private index: number = 0; - private innerSubscription: Subscription | null | undefined; + private innerSubscription?: Subscription; - constructor(destination: Subscriber, + constructor(protected destination: Subscriber, private project: (value: T, index: number) => ObservableInput) { super(destination); } @@ -125,18 +123,13 @@ class SwitchMapSubscriber extends OuterSubscriber { this.destination.error(error); return; } - this._innerSub(result, value, index); - } - - private _innerSub(result: ObservableInput, value: T, index: number) { const innerSubscription = this.innerSubscription; if (innerSubscription) { innerSubscription.unsubscribe(); } - const innerSubscriber = new InnerSubscriber(this, value, index); - const destination = this.destination as Subscription; - destination.add(innerSubscriber); - this.innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); + const innerSubscriber = new SimpleInnerSubscriber(this); + this.destination.add(innerSubscriber); + this.innerSubscription = innerSubscribe(result, innerSubscriber); } protected _complete(): void { @@ -148,21 +141,17 @@ class SwitchMapSubscriber extends OuterSubscriber { } protected _unsubscribe() { - this.innerSubscription = null!; + this.innerSubscription = undefined; } - notifyComplete(innerSub: Subscription): void { - const destination = this.destination as Subscription; - destination.remove(innerSub); - this.innerSubscription = null!; + notifyComplete(): void { + this.innerSubscription = undefined; if (this.isStopped) { super._complete(); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(innerValue: R): void { this.destination.next(innerValue); } } diff --git a/src/internal/operators/takeUntil.ts b/src/internal/operators/takeUntil.ts index 629ca024ad..6bee857af4 100644 --- a/src/internal/operators/takeUntil.ts +++ b/src/internal/operators/takeUntil.ts @@ -2,12 +2,10 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /** * Emits the values emitted by the source Observable until a `notifier` @@ -58,8 +56,8 @@ class TakeUntilOperator implements Operator { call(subscriber: Subscriber, source: any): TeardownLogic { const takeUntilSubscriber = new TakeUntilSubscriber(subscriber); - const notifierSubscription = subscribeToResult(takeUntilSubscriber, this.notifier); - if (notifierSubscription && !takeUntilSubscriber.seenValue) { + const notifierSubscription = innerSubscribe(this.notifier, new SimpleInnerSubscriber(takeUntilSubscriber)); + if (notifierSubscription && !takeUntilSubscriber.notifierHasNotified) { takeUntilSubscriber.add(notifierSubscription); return source.subscribe(takeUntilSubscriber); } @@ -72,17 +70,15 @@ class TakeUntilOperator implements Operator { * @ignore * @extends {Ignored} */ -class TakeUntilSubscriber extends OuterSubscriber { - seenValue = false; +class TakeUntilSubscriber extends SimpleOuterSubscriber { + notifierHasNotified = false; constructor(destination: Subscriber, ) { super(destination); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.seenValue = true; + notifyNext(): void { + this.notifierHasNotified = true; this.complete(); } diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index d912fc99ab..5bb43abf95 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -3,12 +3,10 @@ import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; export interface ThrottleConfig { leading?: boolean; @@ -88,7 +86,7 @@ class ThrottleOperator implements Operator { * @ignore * @extends {Ignored} */ -class ThrottleSubscriber extends OuterSubscriber { +class ThrottleSubscriber extends SimpleOuterSubscriber { private _throttled: Subscription | null | undefined; private _sendValue: T | null = null; private _hasValue = false; @@ -124,19 +122,14 @@ class ThrottleSubscriber extends OuterSubscriber { } private throttle(value: T): void { - const duration = this.tryDurationSelector(value); - if (!!duration) { - this.add(this._throttled = subscribeToResult(this, duration)); - } - } - - private tryDurationSelector(value: T): SubscribableOrPromise | null { + let result: SubscribableOrPromise; try { - return this.durationSelector(value); + result = this.durationSelector(value); } catch (err) { this.destination.error(err); - return null; + return } + this.add(this._throttled = innerSubscribe(result, new SimpleInnerSubscriber(this))); } private throttlingDone() { @@ -151,9 +144,7 @@ class ThrottleSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.throttlingDone(); } diff --git a/src/internal/operators/timeoutWith.ts b/src/internal/operators/timeoutWith.ts index 946d4feb5f..0c4253103b 100644 --- a/src/internal/operators/timeoutWith.ts +++ b/src/internal/operators/timeoutWith.ts @@ -3,10 +3,9 @@ import { Subscriber } from '../Subscriber'; import { async } from '../scheduler/async'; import { Observable } from '../Observable'; import { isValidDate } from '../util/isDate'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function timeoutWith(due: number | Date, withObservable: ObservableInput, scheduler?: SchedulerLike): OperatorFunction; @@ -93,7 +92,7 @@ class TimeoutWithOperator implements Operator { * @ignore * @extends {Ignored} */ -class TimeoutWithSubscriber extends OuterSubscriber { +class TimeoutWithSubscriber extends SimpleOuterSubscriber { private action: SchedulerAction> | null = null; @@ -108,8 +107,8 @@ class TimeoutWithSubscriber extends OuterSubscriber { private static dispatchTimeout(subscriber: TimeoutWithSubscriber): void { const { withObservable } = subscriber; - ( subscriber)._unsubscribeAndRecycle(); - subscriber.add(subscribeToResult(subscriber, withObservable)); + subscriber._unsubscribeAndRecycle(); + subscriber.add(innerSubscribe(withObservable, new SimpleInnerSubscriber(subscriber))); } private scheduleTimeout(): void { @@ -120,11 +119,11 @@ class TimeoutWithSubscriber extends OuterSubscriber { // VirtualActions are immutable, so they create and return a clone. In this // case, we need to set the action reference to the most recent VirtualAction, // to ensure that's the one we clone from next time. - this.action = (>> action.schedule(this, this.waitFor)); + this.action = action.schedule(this, this.waitFor) as SchedulerAction>; } else { - this.add(this.action = (>> this.scheduler.schedule>( + this.add(this.action = (this.scheduler.schedule( TimeoutWithSubscriber.dispatchTimeout as any, this.waitFor, this - ))); + ) as SchedulerAction>)); } } diff --git a/src/internal/operators/window.ts b/src/internal/operators/window.ts index bb71085fe8..d0f6e89ff1 100644 --- a/src/internal/operators/window.ts +++ b/src/internal/operators/window.ts @@ -2,11 +2,9 @@ import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; import { Subject } from '../Subject'; import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { Operator } from '../Operator'; import { lift } from '../util/lift'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Branch out the source Observable values as a nested Observable whenever @@ -65,7 +63,7 @@ class WindowOperator implements Operator> { const windowSubscriber = new WindowSubscriber(subscriber); const sourceSubscription = source.subscribe(windowSubscriber); if (!sourceSubscription.closed) { - windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries)); + windowSubscriber.add(innerSubscribe(this.windowBoundaries, new SimpleInnerSubscriber(windowSubscriber))); } return sourceSubscription; } @@ -76,7 +74,7 @@ class WindowOperator implements Operator> { * @ignore * @extends {Ignored} */ -class WindowSubscriber extends OuterSubscriber { +class WindowSubscriber extends SimpleOuterSubscriber { private window: Subject = new Subject(); @@ -85,17 +83,15 @@ class WindowSubscriber extends OuterSubscriber { destination.next(this.window); } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.openWindow(); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(error: any): void { this._error(error); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(): void { this._complete(); } diff --git a/src/internal/operators/windowToggle.ts b/src/internal/operators/windowToggle.ts index 79fb1cde25..588c0617f0 100644 --- a/src/internal/operators/windowToggle.ts +++ b/src/internal/operators/windowToggle.ts @@ -3,9 +3,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { ComplexOuterSubscriber, ComplexInnerSubscriber, innerSubscribe } from '../innerSubscribe'; import { OperatorFunction } from '../types'; import { lift } from '../util/lift'; @@ -84,7 +82,7 @@ interface WindowContext { * @ignore * @extends {Ignored} */ -class WindowToggleSubscriber extends OuterSubscriber { +class WindowToggleSubscriber extends ComplexOuterSubscriber { private contexts: WindowContext[] = []; private openSubscription: Subscription | undefined; @@ -92,7 +90,7 @@ class WindowToggleSubscriber extends OuterSubscriber { private openings: Observable, private closingSelector: (openValue: O) => Observable) { super(destination); - this.add(this.openSubscription = subscribeToResult(this, openings, openings as any)); + this.add(this.openSubscription = innerSubscribe(openings, new ComplexInnerSubscriber(this, openings, 0))); } protected _next(value: T) { @@ -154,9 +152,7 @@ class WindowToggleSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: any, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(outerValue: any, innerValue: any): void { if (outerValue === this.openings) { let closingNotifier; @@ -171,7 +167,7 @@ class WindowToggleSubscriber extends OuterSubscriber { const subscription = new Subscription(); const context = { window, subscription }; this.contexts.push(context); - const innerSubscription = subscribeToResult(this, closingNotifier, context as any); + const innerSubscription = innerSubscribe(closingNotifier, new ComplexInnerSubscriber(this, context, 0)); if (innerSubscription!.closed) { this.closeWindow(this.contexts.length - 1); diff --git a/src/internal/operators/windowWhen.ts b/src/internal/operators/windowWhen.ts index a479d4a471..5f55ab5259 100644 --- a/src/internal/operators/windowWhen.ts +++ b/src/internal/operators/windowWhen.ts @@ -3,12 +3,9 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { ComplexOuterSubscriber, ComplexInnerSubscriber, innerSubscribe } from '../innerSubscribe'; import { OperatorFunction } from '../types'; import { lift } from '../util/lift'; - /** * Branch out the source Observable values as a nested Observable using a * factory function of closing Observables to determine when to start a new @@ -73,7 +70,7 @@ class WindowOperator implements Operator> { * @ignore * @extends {Ignored} */ -class WindowSubscriber extends OuterSubscriber { +class WindowSubscriber extends ComplexOuterSubscriber { private window: Subject | undefined; private closingNotification: Subscription | undefined; @@ -83,17 +80,17 @@ class WindowSubscriber extends OuterSubscriber { this.openWindow(); } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(_outerValue: T, _innerValue: any, + _outerIndex: number, + innerSub: ComplexInnerSubscriber): void { this.openWindow(innerSub); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(error: any): void { this._error(error); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(innerSub: ComplexInnerSubscriber): void { this.openWindow(innerSub); } @@ -119,7 +116,7 @@ class WindowSubscriber extends OuterSubscriber { } } - private openWindow(innerSub: InnerSubscriber | null = null): void { + private openWindow(innerSub: ComplexInnerSubscriber | null = null): void { if (innerSub) { this.remove(innerSub); innerSub.unsubscribe(); @@ -142,6 +139,6 @@ class WindowSubscriber extends OuterSubscriber { this.window.error(e); return; } - this.add(this.closingNotification = subscribeToResult(this, closingNotifier)); + this.add(this.closingNotification = innerSubscribe(closingNotifier, new ComplexInnerSubscriber(this, undefined, 0))); } } diff --git a/src/internal/operators/withLatestFrom.ts b/src/internal/operators/withLatestFrom.ts index 4413c873f2..6de8a5e972 100644 --- a/src/internal/operators/withLatestFrom.ts +++ b/src/internal/operators/withLatestFrom.ts @@ -1,9 +1,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { ComplexOuterSubscriber, innerSubscribe, ComplexInnerSubscriber } from '../innerSubscribe'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { lift } from '../util/lift'; @@ -94,12 +92,12 @@ class WithLatestFromOperator implements Operator { * @ignore * @extends {Ignored} */ -class WithLatestFromSubscriber extends OuterSubscriber { +class WithLatestFromSubscriber extends ComplexOuterSubscriber { private values: any[]; private toRespond: number[] = []; constructor(destination: Subscriber, - private observables: Observable[], + observables: Observable[], private project?: (...values: any[]) => Observable) { super(destination); const len = observables.length; @@ -111,13 +109,12 @@ class WithLatestFromSubscriber extends OuterSubscriber { for (let i = 0; i < len; i++) { let observable = observables[i]; - this.add(subscribeToResult(this, observable, observable, i)); + this.add(innerSubscribe(observable, new ComplexInnerSubscriber(this, undefined, i))); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(_outerValue: T, innerValue: R, + outerIndex: number): void { this.values[outerIndex] = innerValue; const toRespond = this.toRespond; if (toRespond.length > 0) { diff --git a/src/internal/util/subscribeToResult.ts b/src/internal/util/subscribeToResult.ts deleted file mode 100644 index fdd0769340..0000000000 --- a/src/internal/util/subscribeToResult.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { Subscription } from '../Subscription'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { Subscriber } from '../Subscriber'; -import { subscribeTo } from './subscribeTo'; -import { Observable } from '../Observable'; - -export function subscribeToResult( - outerSubscriber: OuterSubscriber, - result: any, - outerValue: undefined, - outerIndex: undefined, - innerSubscriber: InnerSubscriber -): Subscription | undefined; - -export function subscribeToResult( - outerSubscriber: OuterSubscriber, - result: any, - outerValue?: T, - outerIndex?: number -): Subscription | undefined; - -export function subscribeToResult( - outerSubscriber: OuterSubscriber, - result: any, - outerValue?: T, - outerIndex?: number, - innerSubscriber: Subscriber = new InnerSubscriber(outerSubscriber, outerValue, outerIndex!) -): Subscription | undefined { - if (innerSubscriber.closed) { - return undefined; - } - if (result instanceof Observable) { - return result.subscribe(innerSubscriber); - } - return subscribeTo(result)(innerSubscriber) as Subscription; -}