Skip to content

Commit

Permalink
fix(mergeAll): add source subscription to composite before actually s…
Browse files Browse the repository at this point in the history
…ubscribing (#2479)

Add subscriptions for source Observables to mergeAll composite subscription
before actually subscribing to any of these Observables, so that if
source Observable emits synchronously and consumer of mergeAll unsubscribes
at that moment (for example `take` operator), subscription to source is
unsubscribed as well and Observable stops emitting.

Closes #2476
  • Loading branch information
mpodlasin authored and benlesh committed Jul 26, 2018
1 parent 0979d31 commit 40852ff
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 13 deletions.
32 changes: 31 additions & 1 deletion spec/operators/mergeAll-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { mergeAll, mergeMap } from 'rxjs/operators';
import { mergeAll, mergeMap, take } from 'rxjs/operators';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { throwError, from, of, Observable } from 'rxjs';

Expand Down Expand Up @@ -413,6 +413,36 @@ describe('mergeAll oeprator', () => {
() => { done(new Error('should not be called')); });
});

it('should finalize generators when merged if the subscription ends', () => {
const iterable = {
finalized: false,
next() {
return {value: 'duck', done: false};
},
return() {
this.finalized = true;
},
[Symbol.iterator]() {
return this;
}
};

const results: string[] = [];

const iterableObservable = from<string>(iterable as any);
of(iterableObservable).pipe(
mergeAll(),
take(3)
).subscribe(
x => results.push(x),
null,
() => results.push('GOOSE!')
);

expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']);
expect(iterable.finalized).to.be.true;
});

type(() => {
/* tslint:disable:no-unused-variable */
const source1 = of(1, 2, 3);
Expand Down
4 changes: 3 additions & 1 deletion src/internal/operators/mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
this.add(subscribeToResult<T, R>(this, ish, value, index));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
}

protected _complete(): void {
Expand Down
29 changes: 18 additions & 11 deletions src/internal/util/subscribeToResult.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@

import { ObservableInput } from '../types';
import { Subscription } from '../Subscription';
import { InnerSubscriber } from '../InnerSubscriber';
import { OuterSubscriber } from '../OuterSubscriber';
import { Subscriber } from '../Subscriber';
import { subscribeTo } from './subscribeTo';

export function subscribeToResult<T, R>(outerSubscriber: OuterSubscriber<T, R>,
result: any,
outerValue?: T,
outerIndex?: number): Subscription;
export function subscribeToResult<T>(outerSubscriber: OuterSubscriber<any, any>,
result: ObservableInput<T>,
outerValue?: T,
outerIndex?: number): Subscription | void {
const destination = new InnerSubscriber(outerSubscriber, outerValue, outerIndex);

export function subscribeToResult<T, R>(
outerSubscriber: OuterSubscriber<T, R>,
result: any,
outerValue?: T,
outerIndex?: number,
destination?: Subscriber<any>
): Subscription;
export function subscribeToResult<T, R>(
outerSubscriber: OuterSubscriber<T, R>,
result: any,
outerValue?: T,
outerIndex?: number,
destination: Subscriber<any> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
): Subscription | void {
if (destination.closed) {
return;
}
return subscribeTo(result)(destination);
}

0 comments on commit 40852ff

Please sign in to comment.