From f5970bf48d82d7eb28635a814b2bd44e04d64412 Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Sun, 19 Mar 2017 15:27:42 +0100 Subject: [PATCH] fix(mergeAll): add source subscription to composite before actually subscribing Add subscriptions for source Observables to mergeAll composite subscription before actually subscribing to any of these Observables, so that if source Observable emits synchronously and consumer of mergeAll unsubscribes at that moment (for example `take` operator), subscription to source is unsubscribed as well and Observable stops emitting. Closes #2476 --- spec/operators/mergeAll-spec.ts | 32 +++++++++++++++++++++++++- src/internal/operators/mergeMap.ts | 4 +++- src/internal/util/subscribeToResult.ts | 29 ++++++++++++++--------- 3 files changed, 52 insertions(+), 13 deletions(-) diff --git a/spec/operators/mergeAll-spec.ts b/spec/operators/mergeAll-spec.ts index bb3ab73e5e..ca28f8e846 100644 --- a/spec/operators/mergeAll-spec.ts +++ b/spec/operators/mergeAll-spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { mergeAll, mergeMap } from 'rxjs/operators'; +import { mergeAll, mergeMap, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { throwError, from, of, Observable } from 'rxjs'; @@ -413,6 +413,36 @@ describe('mergeAll oeprator', () => { () => { done(new Error('should not be called')); }); }); + it('should finalize generators when merged if the subscription ends', () => { + const iterable = { + finalized: false, + next() { + return {value: 'duck', done: false}; + }, + return() { + this.finalized = true; + }, + [Symbol.iterator]() { + return this; + } + }; + + const results: string[] = []; + + const iterableObservable = from(iterable as any); + of(iterableObservable).pipe( + mergeAll(), + take(3) + ).subscribe( + x => results.push(x), + null, + () => results.push('GOOSE!') + ); + + expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']); + expect(iterable.finalized).to.be.true; + }); + type(() => { /* tslint:disable:no-unused-variable */ const source1 = of(1, 2, 3); diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 336a484161..2c16de0a53 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -139,7 +139,9 @@ export class MergeMapSubscriber extends OuterSubscriber { } private _innerSub(ish: ObservableInput, value: T, index: number): void { - this.add(subscribeToResult(this, ish, value, index)); + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + subscribeToResult(this, ish, value, index, innerSubscriber); } protected _complete(): void { diff --git a/src/internal/util/subscribeToResult.ts b/src/internal/util/subscribeToResult.ts index ba977b2ad9..4a8df42117 100644 --- a/src/internal/util/subscribeToResult.ts +++ b/src/internal/util/subscribeToResult.ts @@ -1,19 +1,26 @@ - import { ObservableInput } from '../types'; import { Subscription } from '../Subscription'; import { InnerSubscriber } from '../InnerSubscriber'; import { OuterSubscriber } from '../OuterSubscriber'; +import { Subscriber } from '../Subscriber'; import { subscribeTo } from './subscribeTo'; -export function subscribeToResult(outerSubscriber: OuterSubscriber, - result: any, - outerValue?: T, - outerIndex?: number): Subscription; -export function subscribeToResult(outerSubscriber: OuterSubscriber, - result: ObservableInput, - outerValue?: T, - outerIndex?: number): Subscription | void { - const destination = new InnerSubscriber(outerSubscriber, outerValue, outerIndex); - +export function subscribeToResult( + outerSubscriber: OuterSubscriber, + result: any, + outerValue?: T, + outerIndex?: number, + destination?: Subscriber +): Subscription; +export function subscribeToResult( + outerSubscriber: OuterSubscriber, + result: any, + outerValue?: T, + outerIndex?: number, + destination: Subscriber = new InnerSubscriber(outerSubscriber, outerValue, outerIndex) +): Subscription | void { + if (destination.closed) { + return; + } return subscribeTo(result)(destination); }