diff --git a/spec/Subscriber-spec.ts b/spec/Subscriber-spec.ts index bdac644988..a885fffaf3 100644 --- a/spec/Subscriber-spec.ts +++ b/spec/Subscriber-spec.ts @@ -23,7 +23,8 @@ describe('Subscriber', () => { it('should accept subscribers as a destination if they meet the proper criteria', () => { const fakeSubscriber = { [rxSubscriber](this: any) { return this; }, - _addParentTeardownLogic() { /* noop */ } + add() { /* noop */ }, + syncErrorThrowable: false }; const subscriber = new Subscriber(fakeSubscriber as any); diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index 05ca08168a..cb21aff151 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { mergeMap, map } from 'rxjs/operators'; -import { asapScheduler, defer, Observable, from, of } from 'rxjs'; +import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; declare const type: Function; @@ -717,7 +717,7 @@ describe('mergeMap', () => { // Added as a failing test when investigating: // https://github.com/ReactiveX/rxjs/issues/4071 - const results: any[] = []; + const results: (number | string)[] = []; of(1).pipe( mergeMap(() => defer(() => @@ -744,7 +744,7 @@ describe('mergeMap', () => { // Added as a failing test when investigating: // https://github.com/ReactiveX/rxjs/issues/4071 - const results: any[] = []; + const results: (number | string)[] = []; of(1).pipe( mergeMap(() => @@ -764,6 +764,30 @@ describe('mergeMap', () => { }, 0); }); + it('should support wrapped sources', (done: MochaDone) => { + + // Added as a failing test when investigating: + // https://github.com/ReactiveX/rxjs/issues/4095 + + const results: (number | string)[] = []; + + const wrapped = new Observable(subscriber => { + const subscription = timer(0, asapScheduler).subscribe(subscriber); + return () => subscription.unsubscribe(); + }); + wrapped.pipe( + mergeMap(() => timer(0, asapScheduler)) + ).subscribe({ + next(value) { results.push(value); }, + complete() { results.push('done'); } + }); + + setTimeout(() => { + expect(results).to.deep.equal([0, 'done']); + done(); + }, 0); + }); + type('should support type signatures', () => { let o: Observable; diff --git a/spec/operators/observeOn-spec.ts b/spec/operators/observeOn-spec.ts index ed218db90c..a0771ef08f 100644 --- a/spec/operators/observeOn-spec.ts +++ b/spec/operators/observeOn-spec.ts @@ -103,23 +103,21 @@ describe('observeOn operator', () => { .pipe(observeOn(asapScheduler)) .subscribe( x => { - const observeOnSubscriber = subscription._subscriptions[0]; - expect(observeOnSubscriber._subscriptions.length).to.equal(2); // one for the consumer, and one for the notification - expect(observeOnSubscriber._subscriptions[1].state.notification.kind) - .to.equal('N'); - expect(observeOnSubscriber._subscriptions[1].state.notification.value) - .to.equal(x); + // see #4106 - inner subscriptions are now added to destinations + // so the subscription will contain an ObserveOnSubscriber and a subscription for the scheduled action + expect(subscription._subscriptions.length).to.equal(2); + const actionSubscription = subscription._subscriptions[1]; + expect(actionSubscription.state.notification.kind).to.equal('N'); + expect(actionSubscription.state.notification.value).to.equal(x); results.push(x); }, err => done(err), () => { // now that the last nexted value is done, there should only be a complete notification scheduled // the consumer will have been unsubscribed via Subscriber#_parentSubscription - const observeOnSubscriber = subscription._subscriptions[0]; - expect(observeOnSubscriber._subscriptions.length).to.equal(1); // one for the complete notification - // only this completion notification should remain. - expect(observeOnSubscriber._subscriptions[0].state.notification.kind) - .to.equal('C'); + expect(subscription._subscriptions.length).to.equal(1); + const actionSubscription = subscription._subscriptions[0]; + expect(actionSubscription.state.notification.kind).to.equal('C'); // After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this. expect(results).to.deep.equal([1, 2, 3]); done(); diff --git a/spec/operators/switch-spec.ts b/spec/operators/switch-spec.ts index 6c1a9cb72b..b6bf4c4cab 100644 --- a/spec/operators/switch-spec.ts +++ b/spec/operators/switch-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { Observable, of, NEVER, queueScheduler, Subject } from 'rxjs'; -import { switchAll } from 'rxjs/operators'; +import { map, switchAll } from 'rxjs/operators'; declare function asDiagram(arg: string): Function; declare const type: Function; @@ -222,9 +222,9 @@ describe('switchAll', () => { it('should not leak when child completes before each switch (prevent memory leaks #2355)', () => { let iStream: Subject; const oStreamControl = new Subject(); - const oStream = oStreamControl.map(() => { - return (iStream = new Subject()); - }); + const oStream = oStreamControl.pipe( + map(() => (iStream = new Subject())) + ); const switcher = oStream.pipe(switchAll()); const result: number[] = []; let sub = switcher.subscribe((x) => result.push(x)); @@ -242,9 +242,9 @@ describe('switchAll', () => { it('should not leak if we switch before child completes (prevent memory leaks #2355)', () => { const oStreamControl = new Subject(); - const oStream = oStreamControl.map(() => { - return (new Subject()); - }); + const oStream = oStreamControl.pipe( + map(() => new Subject()) + ); const switcher = oStream.pipe(switchAll()); const result: number[] = []; let sub = switcher.subscribe((x) => result.push(x)); @@ -252,9 +252,14 @@ describe('switchAll', () => { [0, 1, 2, 3, 4].forEach((n) => { oStreamControl.next(n); // creates inner }); - // Expect two children of switch(): The oStream and the first inner + // Expect one child of switch(): The oStream expect( (sub as any)._subscriptions[0]._subscriptions.length + ).to.equal(1); + // Expect two children of subscribe(): The destination and the first inner + // See #4106 - inner subscriptions are now added to destinations + expect( + (sub as any)._subscriptions.length ).to.equal(2); sub.unsubscribe(); }); diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 78b7506190..01669975c5 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -199,7 +199,7 @@ export class Observable implements Subscribable { if (operator) { operator.call(sink, this.source); } else { - sink._addParentTeardownLogic( + sink.add( this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ? this._subscribe(sink) : this._trySubscribe(sink) diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index 7f81c5814b..86c824ea41 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -45,7 +45,7 @@ export class Subscriber extends Subscription implements Observer { /** @internal */ syncErrorThrowable: boolean = false; protected isStopped: boolean = false; - protected destination: PartialObserver; // this `any` is the escape hatch to erase extra type param (e.g. R) + protected destination: PartialObserver | Subscriber; // this `any` is the escape hatch to erase extra type param (e.g. R) private _parentSubscription: Subscription | null = null; @@ -78,7 +78,7 @@ export class Subscriber extends Subscription implements Observer { const trustedSubscriber = destinationOrNext[rxSubscriberSymbol]() as Subscriber; this.syncErrorThrowable = trustedSubscriber.syncErrorThrowable; this.destination = trustedSubscriber; - trustedSubscriber._addParentTeardownLogic(this); + trustedSubscriber.add(this); } else { this.syncErrorThrowable = true; this.destination = new SafeSubscriber(this, > destinationOrNext); @@ -116,7 +116,6 @@ export class Subscriber extends Subscription implements Observer { if (!this.isStopped) { this.isStopped = true; this._error(err); - this._unsubscribeParentSubscription(); } } @@ -130,7 +129,6 @@ export class Subscriber extends Subscription implements Observer { if (!this.isStopped) { this.isStopped = true; this._complete(); - this._unsubscribeParentSubscription(); } } @@ -156,20 +154,6 @@ export class Subscriber extends Subscription implements Observer { this.unsubscribe(); } - /** @deprecated This is an internal implementation detail, do not use. */ - _addParentTeardownLogic(parentTeardownLogic: TeardownLogic) { - if (parentTeardownLogic !== this) { - this._parentSubscription = this.add(parentTeardownLogic); - } - } - - /** @deprecated This is an internal implementation detail, do not use. */ - _unsubscribeParentSubscription() { - if (this._parentSubscription !== null) { - this._parentSubscription.unsubscribe(); - } - } - /** @deprecated This is an internal implementation detail, do not use. */ _unsubscribeAndRecycle(): Subscriber { const { _parent, _parents } = this; @@ -326,5 +310,5 @@ export class SafeSubscriber extends Subscriber { } export function isTrustedSubscriber(obj: any) { - return obj instanceof Subscriber || ('_addParentTeardownLogic' in obj && obj[rxSubscriberSymbol]); + return obj instanceof Subscriber || ('syncErrorThrowable' in obj && obj[rxSubscriberSymbol]); } diff --git a/src/internal/Subscription.ts b/src/internal/Subscription.ts index 52380c0317..3faac56f87 100644 --- a/src/internal/Subscription.ts +++ b/src/internal/Subscription.ts @@ -45,7 +45,6 @@ export class Subscription implements SubscriptionLike { constructor(unsubscribe?: () => void) { if (unsubscribe) { ( this)._unsubscribe = unsubscribe; - } } diff --git a/src/internal/observable/zip.ts b/src/internal/observable/zip.ts index cea051a3d1..ddae0fa9c3 100644 --- a/src/internal/observable/zip.ts +++ b/src/internal/observable/zip.ts @@ -4,6 +4,7 @@ import { isArray } from '../util/isArray'; import { Operator } from '../Operator'; import { ObservableInput, PartialObserver } from '../types'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; @@ -126,6 +127,8 @@ export class ZipSubscriber extends Subscriber { const iterators = this.iterators; const len = iterators.length; + this.unsubscribe(); + if (len === 0) { this.destination.complete(); return; @@ -135,7 +138,8 @@ export class ZipSubscriber extends Subscriber { for (let i = 0; i < len; i++) { let iterator: ZipBufferIterator = iterators[i]; if (iterator.stillUnsubscribed) { - this.add(iterator.subscribe(iterator, i)); + const destination = this.destination as Subscription; + destination.add(iterator.subscribe(iterator, i)); } else { this.active--; // not an observable } diff --git a/src/internal/operators/delay.ts b/src/internal/operators/delay.ts index 6ef71a4b64..3689e60f01 100644 --- a/src/internal/operators/delay.ts +++ b/src/internal/operators/delay.ts @@ -2,6 +2,7 @@ import { async } from '../scheduler/async'; import { isDate } from '../util/isDate'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { Notification } from '../Notification'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, PartialObserver, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; @@ -110,7 +111,8 @@ class DelaySubscriber extends Subscriber { private _schedule(scheduler: SchedulerLike): void { this.active = true; - this.add(scheduler.schedule>(DelaySubscriber.dispatch, this.delay, { + const destination = this.destination as Subscription; + destination.add(scheduler.schedule>(DelaySubscriber.dispatch, this.delay, { source: this, destination: this.destination, scheduler: scheduler })); } @@ -137,10 +139,12 @@ class DelaySubscriber extends Subscriber { this.errored = true; this.queue = []; this.destination.error(err); + this.unsubscribe(); } protected _complete() { this.scheduleNotification(Notification.createComplete()); + this.unsubscribe(); } } diff --git a/src/internal/operators/delayWhen.ts b/src/internal/operators/delayWhen.ts index e8c071b890..d22f26bad4 100644 --- a/src/internal/operators/delayWhen.ts +++ b/src/internal/operators/delayWhen.ts @@ -132,6 +132,7 @@ class DelayWhenSubscriber extends OuterSubscriber { protected _complete(): void { this.completed = true; this.tryComplete(); + this.unsubscribe(); } private removeSubscription(subscription: InnerSubscriber): T { @@ -149,7 +150,8 @@ class DelayWhenSubscriber extends OuterSubscriber { const notifierSubscription = subscribeToResult(this, delayNotifier, value); if (notifierSubscription && !notifierSubscription.closed) { - this.add(notifierSubscription); + const destination = this.destination as Subscription; + destination.add(notifierSubscription); this.delayNotifierSubscriptions.push(notifierSubscription); } } @@ -199,6 +201,7 @@ class SubscriptionDelaySubscriber extends Subscriber { } protected _complete() { + this.unsubscribe(); this.subscribeToSource(); } diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 67a6ee8bab..37b4e113cf 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -120,7 +120,8 @@ class ExhaustMapSubscriber extends OuterSubscriber { private _innerSub(result: ObservableInput, value: T, index: number): void { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); subscribeToResult(this, result, value, index, innerSubscriber); } @@ -129,6 +130,7 @@ class ExhaustMapSubscriber extends OuterSubscriber { if (!this.hasSubscription) { this.destination.complete(); } + this.unsubscribe(); } notifyNext(outerValue: T, innerValue: R, @@ -142,7 +144,8 @@ class ExhaustMapSubscriber extends OuterSubscriber { } notifyComplete(innerSub: Subscription): void { - this.remove(innerSub); + const destination = this.destination as Subscription; + destination.remove(innerSub); this.hasSubscription = false; if (this.hasCompleted) { diff --git a/src/internal/operators/expand.ts b/src/internal/operators/expand.ts index 80c425a96a..33dde3bb38 100644 --- a/src/internal/operators/expand.ts +++ b/src/internal/operators/expand.ts @@ -133,7 +133,8 @@ export class ExpandSubscriber extends OuterSubscriber { this.subscribeToProjection(result, value, index); } else { const state: DispatchArg = { subscriber: this, result, value, index }; - this.add(this.scheduler.schedule>(ExpandSubscriber.dispatch, 0, state)); + const destination = this.destination as Subscription; + destination.add(this.scheduler.schedule>(ExpandSubscriber.dispatch, 0, state)); } } else { this.buffer.push(value); @@ -142,7 +143,8 @@ export class ExpandSubscriber extends OuterSubscriber { private subscribeToProjection(result: any, value: T, index: number): void { this.active++; - this.add(subscribeToResult(this, result, value, index)); + const destination = this.destination as Subscription; + destination.add(subscribeToResult(this, result, value, index)); } protected _complete(): void { @@ -150,6 +152,7 @@ export class ExpandSubscriber extends OuterSubscriber { if (this.hasCompleted && this.active === 0) { this.destination.complete(); } + this.unsubscribe(); } notifyNext(outerValue: T, innerValue: R, @@ -160,7 +163,8 @@ export class ExpandSubscriber extends OuterSubscriber { notifyComplete(innerSub: Subscription): void { const buffer = this.buffer; - this.remove(innerSub); + const destination = this.destination as Subscription; + destination.remove(innerSub); this.active--; if (buffer && buffer.length > 0) { this._next(buffer.shift()); diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 3a2b9bc950..d843dcdc97 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -140,7 +140,8 @@ export class MergeMapSubscriber extends OuterSubscriber { private _innerSub(ish: ObservableInput, value: T, index: number): void { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); subscribeToResult(this, ish, value, index, innerSubscriber); } @@ -149,6 +150,7 @@ export class MergeMapSubscriber extends OuterSubscriber { if (this.active === 0 && this.buffer.length === 0) { this.destination.complete(); } + this.unsubscribe(); } notifyNext(outerValue: T, innerValue: R, diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 5c34d03a31..42be3d81c2 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -101,7 +101,8 @@ export class MergeScanSubscriber extends OuterSubscriber { private _innerSub(ish: any, value: T, index: number): void { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); subscribeToResult(this, ish, value, index, innerSubscriber); } @@ -113,6 +114,7 @@ export class MergeScanSubscriber extends OuterSubscriber { } this.destination.complete(); } + this.unsubscribe(); } notifyNext(outerValue: T, innerValue: R, @@ -126,7 +128,8 @@ export class MergeScanSubscriber extends OuterSubscriber { notifyComplete(innerSub: Subscription): void { const buffer = this.buffer; - this.remove(innerSub); + const destination = this.destination as Subscription; + destination.remove(innerSub); this.active--; if (buffer.length > 0) { this._next(buffer.shift()); diff --git a/src/internal/operators/observeOn.ts b/src/internal/operators/observeOn.ts index 3813ce5c76..d06fe3c4d4 100644 --- a/src/internal/operators/observeOn.ts +++ b/src/internal/operators/observeOn.ts @@ -1,6 +1,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { Notification } from '../Notification'; import { MonoTypeOperatorFunction, PartialObserver, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; @@ -88,7 +89,8 @@ export class ObserveOnSubscriber extends Subscriber { } private scheduleMessage(notification: Notification): void { - this.add(this.scheduler.schedule( + const destination = this.destination as Subscription; + destination.add(this.scheduler.schedule( ObserveOnSubscriber.dispatch, this.delay, new ObserveOnMessage(notification, this.destination) @@ -101,10 +103,12 @@ export class ObserveOnSubscriber extends Subscriber { protected _error(err: any): void { this.scheduleMessage(Notification.createError(err)); + this.unsubscribe(); } protected _complete(): void { this.scheduleMessage(Notification.createComplete()); + this.unsubscribe(); } } diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index d3197e5698..7c37de54c9 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -2,6 +2,7 @@ import { Observable } from '../Observable'; import { from } from '../observable/from'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { isArray } from '../util/isArray'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; @@ -143,17 +144,20 @@ class OnErrorResumeNextSubscriber extends OuterSubscriber { protected _error(err: any): void { this.subscribeToNextSource(); + this.unsubscribe(); } protected _complete(): void { this.subscribeToNextSource(); + this.unsubscribe(); } private subscribeToNextSource(): void { const next = this.nextSources.shift(); if (next) { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); subscribeToResult(this, next, undefined, undefined, innerSubscriber); } else { this.destination.complete(); diff --git a/src/internal/operators/sequenceEqual.ts b/src/internal/operators/sequenceEqual.ts index 100d9d5985..117b42bdb6 100644 --- a/src/internal/operators/sequenceEqual.ts +++ b/src/internal/operators/sequenceEqual.ts @@ -1,6 +1,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { tryCatch } from '../util/tryCatch'; import { errorObject } from '../util/errorObject'; @@ -89,7 +90,7 @@ export class SequenceEqualSubscriber extends Subscriber { private compareTo: Observable, private comparor: (a: T, b: T) => boolean) { super(destination); - this.add(compareTo.subscribe(new SequenceEqualCompareToSubscriber(destination, this))); + (this.destination as Subscription).add(compareTo.subscribe(new SequenceEqualCompareToSubscriber(destination, this))); } protected _next(value: T): void { @@ -107,6 +108,7 @@ export class SequenceEqualSubscriber extends Subscriber { } else { this._oneComplete = true; } + this.unsubscribe(); } checkValues() { @@ -143,6 +145,14 @@ export class SequenceEqualSubscriber extends Subscriber { this.checkValues(); } } + + completeB() { + if (this._oneComplete) { + this.emit(this._a.length === 0 && this._b.length === 0); + } else { + this._oneComplete = true; + } + } } class SequenceEqualCompareToSubscriber extends Subscriber { @@ -156,9 +166,11 @@ class SequenceEqualCompareToSubscriber extends Subscriber { protected _error(err: any): void { this.parent.error(err); + this.unsubscribe(); } protected _complete(): void { - this.parent._complete(); + this.parent.completeB(); + this.unsubscribe(); } } diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index ee83978c08..0008c379ef 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -114,7 +114,8 @@ class SwitchMapSubscriber extends OuterSubscriber { innerSubscription.unsubscribe(); } const innerSubscriber = new InnerSubscriber(this, undefined, undefined); - this.add(innerSubscriber); + const destination = this.destination as Subscription; + destination.add(innerSubscriber); this.innerSubscription = subscribeToResult(this, result, value, index, innerSubscriber); } @@ -123,6 +124,7 @@ class SwitchMapSubscriber extends OuterSubscriber { if (!innerSubscription || innerSubscription.closed) { super._complete(); } + this.unsubscribe(); } protected _unsubscribe() { @@ -130,7 +132,8 @@ class SwitchMapSubscriber extends OuterSubscriber { } notifyComplete(innerSub: Subscription): void { - this.remove(innerSub); + const destination = this.destination as Subscription; + destination.remove(innerSub); this.innerSubscription = null; if (this.isStopped) { super._complete();