From 4439308100da474aeaa5fc05dd2cbdea9780c677 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Fri, 5 Jun 2020 22:12:40 +1000 Subject: [PATCH 1/4] test: add failing interop subscriber test --- spec/Subscriber-spec.ts | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/spec/Subscriber-spec.ts b/spec/Subscriber-spec.ts index 5a61e04398..ab5efae64d 100644 --- a/spec/Subscriber-spec.ts +++ b/spec/Subscriber-spec.ts @@ -1,6 +1,7 @@ import { expect } from 'chai'; import { SafeSubscriber } from 'rxjs/internal/Subscriber'; -import { Subscriber } from 'rxjs'; +import { Subscriber, Observable } from 'rxjs'; +import { asInteropSubscriber } from './helpers/interop-helper'; /** @test {Subscriber} */ describe('Subscriber', () => { @@ -97,4 +98,22 @@ describe('Subscriber', () => { expect(argument).to.have.lengthOf(0); }); + + it('should chain interop unsubscriptions', () => { + let observableUnsubscribed = false; + let subscriberUnsubscribed = false; + let subscriptionUnsubscribed = false; + + const subscriber = new Subscriber(); + subscriber.add(() => subscriberUnsubscribed = true); + + const source = new Observable(() => () => observableUnsubscribed = true); + const subscription = source.subscribe(asInteropSubscriber(subscriber)); + subscription.add(() => subscriptionUnsubscribed = true); + subscriber.unsubscribe(); + + expect(observableUnsubscribed).to.be.true; + expect(subscriberUnsubscribed).to.be.true; + expect(subscriptionUnsubscribed).to.be.true; + }); }); From a9eaae80da789f817a4c9e8e1e771fbddf24fa6f Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 6 Jun 2020 08:39:23 +1000 Subject: [PATCH 2/4] fix: chain safe subscribers to interop subscribers Closes #5469 #5311 #2675 --- src/internal/Subscriber.ts | 6 +++--- src/internal/Subscription.ts | 6 ++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index d6d76ea464..9b609385e1 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -1,7 +1,7 @@ import { isFunction } from './util/isFunction'; import { empty as emptyObserver } from './Observer'; import { Observer, PartialObserver } from './types'; -import { Subscription } from './Subscription'; +import { Subscription, isSubscription } from './Subscription'; import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber'; import { config } from './config'; import { hostReportError } from './util/hostReportError'; @@ -187,8 +187,8 @@ export class SafeSubscriber extends Subscriber { complete = (> observerOrNext).complete; if (observerOrNext !== emptyObserver) { context = Object.create(observerOrNext); - if (isFunction(context.unsubscribe)) { - this.add(<() => void> context.unsubscribe.bind(context)); + if (isSubscription(observerOrNext)) { + observerOrNext.add(this.unsubscribe.bind(this)); } context.unsubscribe = this.unsubscribe.bind(this); } diff --git a/src/internal/Subscription.ts b/src/internal/Subscription.ts index 0d7456a171..c040006d05 100644 --- a/src/internal/Subscription.ts +++ b/src/internal/Subscription.ts @@ -206,6 +206,12 @@ export class Subscription implements SubscriptionLike { } } +export function isSubscription(value: object): value is Subscription { + return value && + typeof (value as any).add === 'function' && + typeof (value as any).unsubscribe === 'function'; +} + function flattenUnsubscriptionErrors(errors: any[]) { return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []); } From 4bd71f0d481e45390d543bde6d869ff8b566d987 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 6 Jun 2020 08:43:09 +1000 Subject: [PATCH 3/4] refactor: remove subscribeWith --- spec/util/subscribeWith-spec.ts | 20 --------- src/internal/operators/finalize.ts | 3 +- src/internal/util/subscribeToObservable.ts | 3 +- src/internal/util/subscribeWith.ts | 47 ---------------------- 4 files changed, 2 insertions(+), 71 deletions(-) delete mode 100644 spec/util/subscribeWith-spec.ts delete mode 100644 src/internal/util/subscribeWith.ts diff --git a/spec/util/subscribeWith-spec.ts b/spec/util/subscribeWith-spec.ts deleted file mode 100644 index 656863a7e9..0000000000 --- a/spec/util/subscribeWith-spec.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { expect } from 'chai'; -import { of, Subscriber } from 'rxjs'; -import { subscribeWith } from 'rxjs/internal/util/subscribeWith'; -import { asInteropObservable, asInteropSubscriber } from '../helpers/interop-helper'; - -describe('subscribeWith', () => { - it('should return the subscriber for interop observables', () => { - const observable = asInteropObservable(of(42)); - const subscriber = new Subscriber(); - const subscription = subscribeWith(observable, subscriber); - expect(subscription).to.equal(subscriber); - }); - - it('should return the subscriber for interop subscribers', () => { - const observable = of(42); - const subscriber = asInteropSubscriber(new Subscriber()); - const subscription = subscribeWith(observable, subscriber); - expect(subscription).to.equal(subscriber); - }); -}); \ No newline at end of file diff --git a/src/internal/operators/finalize.ts b/src/internal/operators/finalize.ts index 11febe684d..ebeadb79c2 100644 --- a/src/internal/operators/finalize.ts +++ b/src/internal/operators/finalize.ts @@ -2,7 +2,6 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; -import { subscribeWith } from '../util/subscribeWith'; /** * Returns an Observable that mirrors the source Observable, but will call a specified function when @@ -68,7 +67,7 @@ class FinallyOperator implements Operator { } call(subscriber: Subscriber, source: any): TeardownLogic { - const subscription = subscribeWith(source, subscriber); + const subscription = source.subscribe(subscriber); subscription.add(this.callback); return subscription; } diff --git a/src/internal/util/subscribeToObservable.ts b/src/internal/util/subscribeToObservable.ts index 5cf469aecc..36b7cb283f 100644 --- a/src/internal/util/subscribeToObservable.ts +++ b/src/internal/util/subscribeToObservable.ts @@ -1,6 +1,5 @@ import { Subscriber } from '../Subscriber'; import { observable as Symbol_observable } from '../symbol/observable'; -import { subscribeWith } from './subscribeWith'; /** * Subscribes to an object that implements Symbol.observable with the given @@ -13,6 +12,6 @@ export const subscribeToObservable = (obj: any) => (subscriber: Subscriber // Should be caught by observable subscribe function error handling. throw new TypeError('Provided object does not correctly implement Symbol.observable'); } else { - return subscribeWith(obs, subscriber); + return obs.subscribe(subscriber); } }; diff --git a/src/internal/util/subscribeWith.ts b/src/internal/util/subscribeWith.ts deleted file mode 100644 index a149581cb6..0000000000 --- a/src/internal/util/subscribeWith.ts +++ /dev/null @@ -1,47 +0,0 @@ -import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; - -/** - * Subscribes a subscriber to an observable and ensures the subscriber is - * returned as the subscription. - * - * When an instance of a subscriber from within this package is passed to the - * subscribe method of an observable from within this package, the returned - * subscription is the subscriber instance. - * - * Operator implementations depend upon this behaviour, so it's important - * that interop subscribers and observables behave in a similar manner. If - * they do not, unsubscription chains can be broken. - * - * This function ensures that if the subscription returned from the subscribe - * call is not the subscriber itself, the subscription is added to the - * subscriber and the subscriber is returned. Doing so will ensure that the - * unsubscription chain is not broken. - * - * This function needs to be used wherever an interop observable or - * subscriber could be encountered. There are two such places: - * - within `subscribeToObservable`; and - * - within the `call` method of each operator's `Operator` class. - * - * Within `subscribeToObservable` the observables are almost always going to - * be interop - as they're obtained via the `Symbol.observable` property. - * - * Within the `call` method, the operator's subscriber will be interop - - * relative to the source observable - if the operator is imported from a - * package that uses a different version of RxJS. - * - * @param observable the observable to subscribe to - * @param subscriber the subscriber to be subscribed - * @returns the passed-in subscriber (as the subscription) - */ -export function subscribeWith( - observable: Observable, - subscriber: Subscriber -): Subscription { - const subscription = observable.subscribe(subscriber); - if (subscription !== subscriber) { - subscriber.add(subscription); - } - return subscriber; -} \ No newline at end of file From 1d84d2e29b9a57e9db1c3f6341ce776764b5b633 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 9 Jun 2020 12:54:47 -0500 Subject: [PATCH 4/4] refactor: minor change to types to clean up code. --- src/internal/Subscription.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/internal/Subscription.ts b/src/internal/Subscription.ts index c040006d05..b4591ca7df 100644 --- a/src/internal/Subscription.ts +++ b/src/internal/Subscription.ts @@ -206,10 +206,10 @@ export class Subscription implements SubscriptionLike { } } -export function isSubscription(value: object): value is Subscription { +export function isSubscription(value: any): value is Subscription { return value && - typeof (value as any).add === 'function' && - typeof (value as any).unsubscribe === 'function'; + typeof value.add === 'function' && + typeof value.unsubscribe === 'function'; } function flattenUnsubscriptionErrors(errors: any[]) {