Skip to content

Commit

Permalink
fix: chain subscriptions for interop with finalize (#5239)
Browse files Browse the repository at this point in the history
* test: add failing test for #5237

* fix: check for interop subscriptions

Closes #5237

* refactor: use subscribeWith util
  • Loading branch information
cartant authored Apr 29, 2020
1 parent 4d73d1c commit 04ba662
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
23 changes: 20 additions & 3 deletions spec/helpers/interop-helper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Observable, Subject, Subscriber, Subscription } from 'rxjs';
import { Observable, Operator, Subject, Subscriber, Subscription } from 'rxjs';
import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber';

/**
Expand All @@ -10,16 +10,21 @@ import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscri
export function asInteropObservable<T>(observable: Observable<T>): Observable<T> {
return new Proxy(observable, {
get(target: Observable<T>, key: string | number | symbol) {
if (key === 'lift') {
const { lift } = target;
return interopLift(lift);
}
if (key === 'subscribe') {
const { subscribe } = target;
return interopSubscribe(subscribe);
}
return Reflect.get(target, key);
},
getPrototypeOf(target: Observable<T>) {
const { subscribe, ...rest } = Object.getPrototypeOf(target);
const { lift, subscribe, ...rest } = Object.getPrototypeOf(target);
return {
...rest,
lift: interopLift(lift),
subscribe: interopSubscribe(subscribe)
};
}
Expand Down Expand Up @@ -55,6 +60,18 @@ export function asInteropSubscriber<T>(subscriber: Subscriber<T>): Subscriber<T>
});
}

function interopLift<T, R>(lift: (operator: Operator<T, R>) => Observable<R>) {
return function (this: Observable<T>, operator: Operator<T, R>): Observable<R> {
const observable = lift.call(this, operator);
const { call } = observable.operator!;
observable.operator!.call = function (this: Operator<T, R>, subscriber: Subscriber<R>, source: any) {
return call.call(this, asInteropSubscriber(subscriber), source);
};
observable.source = asInteropObservable(observable.source!);
return asInteropObservable(observable);
};
}

function interopSubscribe<T>(subscribe: (...args: any[]) => Subscription) {
return function (this: Observable<T>, ...args: any[]): Subscription {
const [arg] = args;
Expand All @@ -63,4 +80,4 @@ function interopSubscribe<T>(subscribe: (...args: any[]) => Subscription) {
}
return subscribe.apply(this, args);
};
}
}
13 changes: 12 additions & 1 deletion spec/operators/finalize-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { expect } from 'chai';
import { finalize, map, share } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { of, timer, interval } from 'rxjs';
import { of, timer, interval, NEVER } from 'rxjs';
import { asInteropObservable } from '../helpers/interop-helper';

declare const type: Function;

Expand Down Expand Up @@ -161,4 +162,14 @@ describe('finalize operator', () => {
rxTestScheduler.flush();
expect(executed).to.be.true;
});

it('should handle interop source observables', () => {
// https://github.com/ReactiveX/rxjs/issues/5237
let finalized = false;
const subscription = asInteropObservable(NEVER).pipe(
finalize(() => finalized = true)
).subscribe();
subscription.unsubscribe();
expect(finalized).to.be.true;
});
});
3 changes: 2 additions & 1 deletion src/internal/operators/finalize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
import { subscribeWith } from '../util/subscribeWith';

/**
* Returns an Observable that mirrors the source Observable, but will call a specified function when
Expand Down Expand Up @@ -68,7 +69,7 @@ class FinallyOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new FinallySubscriber(subscriber, this.callback));
return subscribeWith(source, new FinallySubscriber(subscriber, this.callback));
}
}

Expand Down

0 comments on commit 04ba662

Please sign in to comment.