Skip to content

Commit

Permalink
test: add failing test for ReactiveX#5237
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed Apr 23, 2020
1 parent 259e5cd commit b4c9d83
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
23 changes: 20 additions & 3 deletions spec/helpers/interop-helper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Observable, Subject, Subscriber, Subscription } from 'rxjs';
import { Observable, Operator, Subject, Subscriber, Subscription } from 'rxjs';
import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber';

/**
Expand All @@ -10,16 +10,21 @@ import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscri
export function asInteropObservable<T>(observable: Observable<T>): Observable<T> {
return new Proxy(observable, {
get(target: Observable<T>, key: string | number | symbol) {
if (key === 'lift') {
const { lift } = target;
return interopLift(lift);
}
if (key === 'subscribe') {
const { subscribe } = target;
return interopSubscribe(subscribe);
}
return Reflect.get(target, key);
},
getPrototypeOf(target: Observable<T>) {
const { subscribe, ...rest } = Object.getPrototypeOf(target);
const { lift, subscribe, ...rest } = Object.getPrototypeOf(target);
return {
...rest,
lift: interopLift(lift),
subscribe: interopSubscribe(subscribe)
};
}
Expand Down Expand Up @@ -55,6 +60,18 @@ export function asInteropSubscriber<T>(subscriber: Subscriber<T>): Subscriber<T>
});
}

function interopLift<T, R>(lift: (operator: Operator<T, R>) => Observable<R>) {
return function (this: Observable<T>, operator: Operator<T, R>): Observable<R> {
const observable = lift.call(this, operator);
const { call } = observable.operator!;
observable.operator!.call = function (this: Operator<T, R>, subscriber: Subscriber<R>, source: any) {
return call.call(this, asInteropSubscriber(subscriber), source);
};
observable.source = asInteropObservable(observable.source!);
return asInteropObservable(observable);
};
}

function interopSubscribe<T>(subscribe: (...args: any[]) => Subscription) {
return function (this: Observable<T>, ...args: any[]): Subscription {
const [arg] = args;
Expand All @@ -63,4 +80,4 @@ function interopSubscribe<T>(subscribe: (...args: any[]) => Subscription) {
}
return subscribe.apply(this, args);
};
}
}
13 changes: 12 additions & 1 deletion spec/operators/finalize-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { expect } from 'chai';
import { finalize, map, share } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { of, timer, interval } from 'rxjs';
import { of, timer, interval, NEVER } from 'rxjs';
import { asInteropObservable } from '../helpers/interop-helper';

declare const type: Function;

Expand Down Expand Up @@ -161,4 +162,14 @@ describe('finalize operator', () => {
rxTestScheduler.flush();
expect(executed).to.be.true;
});

it('should handle interop source observables', () => {
// https://github.com/ReactiveX/rxjs/issues/5237
let finalized = false;
const subscription = asInteropObservable(NEVER).pipe(
finalize(() => finalized = true)
).subscribe();
subscription.unsubscribe();
expect(finalized).to.be.true;
});
});

0 comments on commit b4c9d83

Please sign in to comment.