diff --git a/spec/operators/mergeAll-spec.ts b/spec/operators/mergeAll-spec.ts index bb3ab73e5e..ca28f8e846 100644 --- a/spec/operators/mergeAll-spec.ts +++ b/spec/operators/mergeAll-spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { mergeAll, mergeMap } from 'rxjs/operators'; +import { mergeAll, mergeMap, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { throwError, from, of, Observable } from 'rxjs'; @@ -413,6 +413,36 @@ describe('mergeAll oeprator', () => { () => { done(new Error('should not be called')); }); }); + it('should finalize generators when merged if the subscription ends', () => { + const iterable = { + finalized: false, + next() { + return {value: 'duck', done: false}; + }, + return() { + this.finalized = true; + }, + [Symbol.iterator]() { + return this; + } + }; + + const results: string[] = []; + + const iterableObservable = from(iterable as any); + of(iterableObservable).pipe( + mergeAll(), + take(3) + ).subscribe( + x => results.push(x), + null, + () => results.push('GOOSE!') + ); + + expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']); + expect(iterable.finalized).to.be.true; + }); + type(() => { /* tslint:disable:no-unused-variable */ const source1 = of(1, 2, 3); diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 336a484161..2c16de0a53 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -139,7 +139,9 @@ export class MergeMapSubscriber extends OuterSubscriber { } private _innerSub(ish: ObservableInput, value: T, index: number): void { - this.add(subscribeToResult(this, ish, value, index)); + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + subscribeToResult(this, ish, value, index, innerSubscriber); } protected _complete(): void { diff --git a/src/internal/util/subscribeToResult.ts b/src/internal/util/subscribeToResult.ts index ba977b2ad9..4a8df42117 100644 --- a/src/internal/util/subscribeToResult.ts +++ b/src/internal/util/subscribeToResult.ts @@ -1,19 +1,26 @@ - import { ObservableInput } from '../types'; import { Subscription } from '../Subscription'; import { InnerSubscriber } from '../InnerSubscriber'; import { OuterSubscriber } from '../OuterSubscriber'; +import { Subscriber } from '../Subscriber'; import { subscribeTo } from './subscribeTo'; -export function subscribeToResult(outerSubscriber: OuterSubscriber, - result: any, - outerValue?: T, - outerIndex?: number): Subscription; -export function subscribeToResult(outerSubscriber: OuterSubscriber, - result: ObservableInput, - outerValue?: T, - outerIndex?: number): Subscription | void { - const destination = new InnerSubscriber(outerSubscriber, outerValue, outerIndex); - +export function subscribeToResult( + outerSubscriber: OuterSubscriber, + result: any, + outerValue?: T, + outerIndex?: number, + destination?: Subscriber +): Subscription; +export function subscribeToResult( + outerSubscriber: OuterSubscriber, + result: any, + outerValue?: T, + outerIndex?: number, + destination: Subscriber = new InnerSubscriber(outerSubscriber, outerValue, outerIndex) +): Subscription | void { + if (destination.closed) { + return; + } return subscribeTo(result)(destination); }