Skip to content

Commit

Permalink
fix(exhaustMap): stop listening to a synchronous inner-obervable when…
Browse files Browse the repository at this point in the history
… unsubscribed
  • Loading branch information
peaBerberian committed Aug 19, 2018
1 parent 260d52a commit ee1a339
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
29 changes: 27 additions & 2 deletions spec/operators/exhaustMap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { Observable, of, from } from 'rxjs';
import { exhaustMap, mergeMap } from 'rxjs/operators';
import { concat, defer, Observable, of, from } from 'rxjs';
import { exhaustMap, mergeMap, takeWhile } from 'rxjs/operators';
import { expect } from 'chai';

declare function asDiagram(arg: string): Function;
Expand Down Expand Up @@ -202,6 +202,31 @@ describe('exhaustMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

of(null).pipe(
exhaustMap(() => synchronousObservable),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should switch inner cold observables, inner never completes', () => {
const x = cold( '--a--b--c--| ');
const xsubs = ' ^ ! ';
Expand Down
17 changes: 12 additions & 5 deletions src/internal/operators/exhaustMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,22 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private tryNext(value: T): void {
let result: ObservableInput<R>;
const index = this.index++;
const destination = this.destination;
try {
const result = this.project(value, index);
this.hasSubscription = true;
this.add(subscribeToResult(this, result, value, index));
result = this.project(value, index);
} catch (err) {
destination.error(err);
this.destination.error(err);
return;
}
this.hasSubscription = true;
this._innerSub(result, value, index);
}

private _innerSub(result: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult<T, R>(this, result, value, index, innerSubscriber);
}

protected _complete(): void {
Expand Down

0 comments on commit ee1a339

Please sign in to comment.