Skip to content

Commit

Permalink
fix: chain subscriptions from interop observables for 6.x (#5178)
Browse files Browse the repository at this point in the history
* fix: chain subscriptions from interop observables

* test: add failing tests for interop observables

* fix: check for interop subscriptions

* chore: make subscribeToResult signatures safer

* chore: add test comments

* chore: add implementation comments

* chore: use esnext TypeScript lib for tests

Proxy and Reflect aren't included in the base, es2015-based config.
cartant authored and benlesh committed Dec 11, 2019
1 parent 9f87505 commit cbc7721
Showing 18 changed files with 313 additions and 22 deletions.
19 changes: 19 additions & 0 deletions spec/helpers/interop-helper-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { expect } from 'chai';
import { Observable, of, Subscriber } from 'rxjs';
import { observable as symbolObservable } from 'rxjs/internal/symbol/observable';
import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber';
import { asInteropObservable, asInteropSubscriber } from './interop-helper';

describe('interop helper', () => {
it('should simulate interop observables', () => {
const observable = asInteropObservable(of(42));
expect(observable).to.not.be.instanceOf(Observable);
expect(observable[symbolObservable]).to.be.a('function');
});

it('should simulate interop subscribers', () => {
const subscriber = asInteropSubscriber(new Subscriber());
expect(subscriber).to.not.be.instanceOf(Subscriber);
expect(subscriber[symbolSubscriber]).to.be.undefined;
});
});
57 changes: 57 additions & 0 deletions spec/helpers/interop-helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Observable, Subscriber, Subscription } from 'rxjs';
import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber';

/**
* Returns an observable that will be deemed by this package's implementation
* to be an observable that requires interop. The returned observable will fail
* the `instanceof Observable` test and will deem any `Subscriber` passed to
* its `subscribe` method to be untrusted.
*/
export function asInteropObservable<T>(observable: Observable<T>): Observable<T> {
return new Proxy(observable, {
get(target: Observable<T>, key: string | number | symbol) {
if (key === 'subscribe') {
const { subscribe } = target;
return interopSubscribe(subscribe);
}
return Reflect.get(target, key);
},
getPrototypeOf(target: Observable<T>) {
const { subscribe, ...rest } = Object.getPrototypeOf(target);
return {
...rest,
subscribe: interopSubscribe(subscribe)
};
}
});
}

/**
* Returns a subscriber that will be deemed by this package's implementation to
* be untrusted. The returned subscriber will fail the `instanceof Subscriber`
* test and will not include the symbol that identifies trusted subscribers.
*/
export function asInteropSubscriber<T>(subscriber: Subscriber<T>): Subscriber<T> {
return new Proxy(subscriber, {
get(target: Subscriber<T>, key: string | number | symbol) {
if (key === symbolSubscriber) {
return undefined;
}
return Reflect.get(target, key);
},
getPrototypeOf(target: Subscriber<T>) {
const { [symbolSubscriber]: symbol, ...rest } = Object.getPrototypeOf(target);
return rest;
}
});
}

function interopSubscribe<T>(subscribe: (...args: any[]) => Subscription) {
return function (this: Observable<T>, ...args: any[]): Subscription {
const [arg] = args;
if (arg instanceof Subscriber) {
return subscribe.call(this, asInteropSubscriber(arg));
}
return subscribe.apply(this, args);
};
}
22 changes: 22 additions & 0 deletions spec/operators/catch-spec.ts
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import { TestScheduler } from 'rxjs/testing';
import * as sinon from 'sinon';
import { createObservableInputs } from '../helpers/test-helper';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

@@ -121,6 +122,27 @@ describe('catchError operator', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should unsubscribe from a caught cold caught interop observable when unsubscribed explicitly', () => {
const e1 = hot('-1-2-3-# ');
const e1subs = '^ ! ';
const e2 = cold( '5-6-7-8-9-|');
const e2subs = ' ^ ! ';
const expected = '-1-2-3-5-6-7- ';
const unsub = ' ! ';

// This test is the same as the previous test, but the observable is
// manipulated to make it look like an interop observable - an observable
// from a foreign library. Interop subscribers are treated differently:
// they are wrapped in a safe subscriber. This test ensures that
// unsubscriptions are chained all the way to the interop subscriber.

const result = e1.pipe(catchError(() => asInteropObservable(e2)));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
34 changes: 34 additions & 0 deletions spec/operators/exhaustMap-spec.ts
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/mar
import { concat, defer, Observable, of, from } from 'rxjs';
import { exhaustMap, mergeMap, takeWhile, map } from 'rxjs/operators';
import { expect } from 'chai';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

@@ -202,6 +203,39 @@ describe('exhaustMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
const x = cold( '--a--b--c--| ');
const xsubs = ' ^ ! ';
const y = cold( '--d--e--f--| ');
const ysubs: string[] = [];
const z = cold( '--g--h--i--| ');
const zsubs = ' ^ ! ';
const e1 = hot('---x---------y-----------------z-------------|');
const e1subs = '^ ! ';
const expected = '-----a--b--c---------------------g- ';
const unsub = ' ! ';

const observableLookup = { x: x, y: y, z: z };

// This test is the same as the previous test, but the observable is
// manipulated to make it look like an interop observable - an observable
// from a foreign library. Interop subscribers are treated differently:
// they are wrapped in a safe subscriber. This test ensures that
// unsubscriptions are chained all the way to the interop subscriber.

const result = e1.pipe(
mergeMap(x => of(x)),
exhaustMap(value => asInteropObservable(observableLookup[value])),
mergeMap(x => of(x))
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
expectSubscriptions(z.subscriptions).toBe(zsubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
31 changes: 31 additions & 0 deletions spec/operators/mergeMap-spec.ts
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ import { expect } from 'chai';
import { mergeMap, map } from 'rxjs/operators';
import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { asInteropObservable } from '../helpers/interop-helper';

declare const type: Function;
declare const asDiagram: Function;
@@ -260,6 +261,36 @@ describe('mergeMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
const x = cold( '--a--b--c--d--e--| ');
const xsubs = ' ^ ! ';
const y = cold( '---f---g---h---i--|');
const ysubs = ' ^ ! ';
const e1 = hot('---------x---------y---------| ');
const e1subs = '^ ! ';
const expected = '-----------a--b--c--d- ';
const unsub = ' ! ';

const observableLookup = { x: x, y: y };

// This test manipulates the observable to make it look like an interop
// observable - an observable from a foreign library. Interop subscribers
// are treated differently: they are wrapped in a safe subscriber. This
// test ensures that unsubscriptions are chained all the way to the
// interop subscriber.

const result = e1.pipe(
mergeMap(x => of(x)),
mergeMap(value => asInteropObservable(observableLookup[value])),
mergeMap(x => of(x)),
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should mergeMap many outer to many inner, inner never completes', () => {
const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
const e1 = hot('-a-------b-------c-------d-------| ');
18 changes: 18 additions & 0 deletions spec/operators/onErrorResumeNext-spec.ts
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { onErrorResumeNext, takeWhile } from 'rxjs/operators';
import { concat, defer, throwError, of } from 'rxjs';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

@@ -129,6 +130,23 @@ describe('onErrorResumeNext operator', () => {
expect(sideEffects).to.deep.equal([1, 2]);
});

it('should unsubscribe from an interop observble upon explicit unsubscription', () => {
const source = hot('--a--b--#');
const next = cold( '--c--d--');
const nextSubs = ' ^ !';
const subs = '^ !';
const expected = '--a--b----c--';

// This test manipulates the observable to make it look like an interop
// observable - an observable from a foreign library. Interop subscribers
// are treated differently: they are wrapped in a safe subscriber. This
// test ensures that unsubscriptions are chained all the way to the
// interop subscriber.

expectObservable(source.pipe(onErrorResumeNext(asInteropObservable(next))), subs).toBe(expected);
expectSubscriptions(next.subscriptions).toBe(nextSubs);
});

it('should work with promise', (done: MochaDone) => {
const expected = [1, 2];
const source = concat(of(1), throwError('meh'));
27 changes: 26 additions & 1 deletion spec/operators/skipUntil-spec.ts
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { concat, defer, Observable, of, Subject } from 'rxjs';
import { skipUntil, mergeMap } from 'rxjs/operators';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

@@ -97,6 +98,31 @@ describe('skipUntil', () => {
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
const e1 = hot('--a--b--c--d--e----|');
const e1subs = '^ ! ';
const skip = hot('-------------x--| ');
const skipSubs = '^ ! ';
const expected = ('---------- ');
const unsub = ' ! ';

// This test is the same as the previous test, but the observable is
// manipulated to make it look like an interop observable - an observable
// from a foreign library. Interop subscribers are treated differently:
// they are wrapped in a safe subscriber. This test ensures that
// unsubscriptions are chained all the way to the interop subscriber.

const result = e1.pipe(
mergeMap(x => of(x)),
skipUntil(asInteropObservable(skip)),
mergeMap(x => of(x)),
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should skip all elements when notifier is empty', () => {
const e1 = hot('--a--b--c--d--e--|');
const e1subs = '^ !';
@@ -248,7 +274,6 @@ describe('skipUntil', () => {
});

it('should stop listening to a synchronous notifier after its first nexted value', () => {
// const source = hot('-^-o---o---o---o---o---o---|');
const sideEffects: number[] = [];
const synchronousNotifer = concat(
defer(() => {
31 changes: 31 additions & 0 deletions spec/operators/switchMap-spec.ts
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { switchMap, mergeMap, map, takeWhile } from 'rxjs/operators';
import { concat, defer, of, Observable } from 'rxjs';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

@@ -169,6 +170,36 @@ describe('switchMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
const x = cold( '--a--b--c--d--e--| ');
const xsubs = ' ^ ! ';
const y = cold( '---f---g---h---i--|');
const ysubs = ' ^ ! ';
const e1 = hot('---------x---------y---------| ');
const e1subs = '^ ! ';
const expected = '-----------a--b--c---- ';
const unsub = ' ! ';

const observableLookup = { x: x, y: y };

// This test is the same as the previous test, but the observable is
// manipulated to make it look like an interop observable - an observable
// from a foreign library. Interop subscribers are treated differently:
// they are wrapped in a safe subscriber. This test ensures that
// unsubscriptions are chained all the way to the interop subscriber.

const result = e1.pipe(
mergeMap(x => of(x)),
switchMap(value => asInteropObservable(observableLookup[value])),
mergeMap(x => of(x)),
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
5 changes: 4 additions & 1 deletion spec/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{
"extends": "../tsconfig.json"
"extends": "../tsconfig.json",
"compilerOptions": {
"lib": ["esnext", "dom"]
}
}
2 changes: 1 addition & 1 deletion spec/util/toSubscriber-spec.ts
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ describe('toSubscriber', () => {
expect(sub2.closed).to.be.true;
});

it('should not be closed when other subscriber created with same observer instance completes', () => {
it('should not be closed when other subscriber created with same observer instance completes', () => {
let observer = {
next: function () { /*noop*/ }
};
8 changes: 7 additions & 1 deletion src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
@@ -137,7 +137,13 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, T | R> {
this._unsubscribeAndRecycle();
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult(this, result, undefined, undefined, innerSubscriber);
const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
this.add(innerSubscription);
}
}
}
}
10 changes: 8 additions & 2 deletions src/internal/operators/exhaustMap.ts
Original file line number Diff line number Diff line change
@@ -122,10 +122,16 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(result: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult<T, R>(this, result, value, index, innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, result, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
}

protected _complete(): void {
10 changes: 8 additions & 2 deletions src/internal/operators/mergeMap.ts
Original file line number Diff line number Diff line change
@@ -142,10 +142,16 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
}

protected _complete(): void {
10 changes: 8 additions & 2 deletions src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
@@ -103,10 +103,16 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(ish: any, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
}

protected _complete(): void {
8 changes: 7 additions & 1 deletion src/internal/operators/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -162,7 +162,13 @@ class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult(this, next, undefined, undefined, innerSubscriber);
const innerSubscription = subscribeToResult(this, next, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
} else {
this.destination.complete();
}
9 changes: 8 additions & 1 deletion src/internal/operators/skipUntil.ts
Original file line number Diff line number Diff line change
@@ -74,7 +74,14 @@ class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
this.innerSubscription = innerSubscriber;
subscribeToResult(this, notifier, undefined, undefined, innerSubscriber);
const innerSubscription = subscribeToResult(this, notifier, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
this.add(innerSubscription);
this.innerSubscription = innerSubscription;
}
}

protected _next(value: T) {
10 changes: 8 additions & 2 deletions src/internal/operators/switchMap.ts
Original file line number Diff line number Diff line change
@@ -133,10 +133,16 @@ class SwitchMapSubscriber<T, R> extends OuterSubscriber<T, R> {
if (innerSubscription) {
innerSubscription.unsubscribe();
}
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
this.innerSubscription = subscribeToResult(this, result, value, index, innerSubscriber);
this.innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (this.innerSubscription !== innerSubscriber) {
destination.add(this.innerSubscription);
}
}

protected _complete(): void {
24 changes: 16 additions & 8 deletions src/internal/util/subscribeToResult.ts
Original file line number Diff line number Diff line change
@@ -5,25 +5,33 @@ import { Subscriber } from '../Subscriber';
import { subscribeTo } from './subscribeTo';
import { Observable } from '../Observable';

export function subscribeToResult<T, R>(
outerSubscriber: OuterSubscriber<T, R>,
result: any,
outerValue: undefined,
outerIndex: undefined,
innerSubscriber: InnerSubscriber<T, R>
): Subscription | undefined;

export function subscribeToResult<T, R>(
outerSubscriber: OuterSubscriber<T, R>,
result: any,
outerValue?: T,
outerIndex?: number,
destination?: Subscriber<any>
): Subscription;
outerIndex?: number
): Subscription | undefined;

export function subscribeToResult<T, R>(
outerSubscriber: OuterSubscriber<T, R>,
result: any,
outerValue?: T,
outerIndex?: number,
destination: Subscriber<any> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
): Subscription | void {
if (destination.closed) {
innerSubscriber: Subscriber<R> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
): Subscription | undefined {
if (innerSubscriber.closed) {
return undefined;
}
if (result instanceof Observable) {
return result.subscribe(destination);
return result.subscribe(innerSubscriber);
}
return subscribeTo(result)(destination);
return subscribeTo(result)(innerSubscriber) as Subscription;
}

0 comments on commit cbc7721

Please sign in to comment.