diff --git a/spec/operators/concatAll-spec.ts b/spec/operators/concatAll-spec.ts index d4ce61dd03..732244deaa 100644 --- a/spec/operators/concatAll-spec.ts +++ b/spec/operators/concatAll-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { from, throwError, of, Observable } from 'rxjs'; -import { concatAll, take, mergeMap } from 'rxjs/operators'; +import { from, throwError, of, Observable, defer } from 'rxjs'; +import { concatAll, take, mergeMap, finalize, delay } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -58,6 +58,40 @@ describe('concatAll operator', () => { ); }); + it('should finalize before moving to the next observable', () => { + const results: any[] = []; + + const create = (n: number) => defer(() => { + results.push(`init ${n}`); + return of(`next ${n}`).pipe( + delay(100, testScheduler), + finalize(() => { + results.push(`finalized ${n}`) + }) + ); + }); + + of(create(1), create(2), create(3)).pipe( + concatAll() + ).subscribe({ + next: value => results.push(value), + }); + + testScheduler.flush(); + + expect(results).to.deep.equal([ + 'init 1', + 'next 1', + 'finalized 1', + 'init 2', + 'next 2', + 'finalized 2', + 'init 3', + 'next 3', + 'finalized 3' + ]); + }); + it('should concat and raise error from promise', function(done) { this.timeout(2000); diff --git a/spec/operators/concatMap-spec.ts b/spec/operators/concatMap-spec.ts index 8fa3450604..e5ae13055f 100644 --- a/spec/operators/concatMap-spec.ts +++ b/spec/operators/concatMap-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { of, from, Observable } from 'rxjs'; -import { concatMap, mergeMap, map, take } from 'rxjs/operators'; +import { of, from, Observable, defer } from 'rxjs'; +import { concatMap, mergeMap, map, take, finalize, delay } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -677,6 +677,40 @@ describe('Observable.prototype.concatMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); }); + + it('should finalize before moving to the next observable', () => { + const results: any[] = []; + + const create = (n: number) => defer(() => { + results.push(`init ${n}`); + return of(`next ${n}`).pipe( + delay(100, testScheduler), + finalize(() => { + results.push(`finalized ${n}`) + }) + ); + }); + + of(1, 2, 3).pipe( + concatMap(n => create(n)) + ).subscribe({ + next: value => results.push(value), + }); + + testScheduler.flush(); + + expect(results).to.deep.equal([ + 'init 1', + 'next 1', + 'finalized 1', + 'init 2', + 'next 2', + 'finalized 2', + 'init 3', + 'next 3', + 'finalized 3' + ]); + }); function arrayRepeat(value: string, times: number) { let results = []; diff --git a/src/internal/operators/OperatorSubscriber.ts b/src/internal/operators/OperatorSubscriber.ts index 1904c99101..ab75d36754 100644 --- a/src/internal/operators/OperatorSubscriber.ts +++ b/src/internal/operators/OperatorSubscriber.ts @@ -14,15 +14,15 @@ export class OperatorSubscriber extends Subscriber { * and send to the `destination` error handler. * @param onComplete Handles completion notification from the subscription. Any errors that occur in * this handler are sent to the `destination` error handler. - * @param onUnsubscribe Additional teardown logic here. This will only be called on teardown if the - * subscriber itself is not already closed. Called before any additional teardown logic is called. + * @param onFinalize Additional teardown logic here. This will only be called on teardown if the + * subscriber itself is not already closed. This is called after all other teardown logic is executed. */ constructor( destination: Subscriber, onNext?: (value: T) => void, onError?: (err: any) => void, onComplete?: () => void, - private onUnsubscribe?: () => void + private onFinalize?: () => void ) { // It's important - for performance reasons - that all of this class's // members are initialized and that they are always initialized in the same @@ -73,8 +73,9 @@ export class OperatorSubscriber extends Subscriber { } unsubscribe() { - // Execute additional teardown if we have any and we didn't already do so. - !this.closed && this.onUnsubscribe?.(); + const { closed } = this; super.unsubscribe(); + // Execute additional teardown if we have any and we didn't already do so. + !closed && this.onFinalize?.(); } } diff --git a/src/internal/operators/mergeInternals.ts b/src/internal/operators/mergeInternals.ts index 8cba667c5d..b84ad40020 100644 --- a/src/internal/operators/mergeInternals.ts +++ b/src/internal/operators/mergeInternals.ts @@ -28,7 +28,7 @@ export function mergeInternals( additionalTeardown?: () => void ) { // Buffered values, in the event of going over our concurrency limit - let buffer: T[] = []; + const buffer: T[] = []; // The number of active inner subscriptions. let active = 0; // An index to pass to our accumulator function @@ -61,6 +61,11 @@ export function mergeInternals( // against our concurrency limit later. active++; + // A flag used to show that the inner observable completed. + // This is checked during finalization to see if we should + // move to the next item in the buffer, if there is on. + let innerComplete = false; + // Start our inner subscription. innerFrom(project(value, index++)).subscribe( new OperatorSubscriber( @@ -82,23 +87,41 @@ export function mergeInternals( // Errors are passed to the destination. undefined, () => { - // INNER SOURCE COMPLETE - // Decrement the active count to ensure that the next time - // we try to call `doInnerSub`, the number is accurate. - active--; - // If we have more values in the buffer, try to process those - // Note that this call will increment `active` ahead of the - // next conditional, if there were any more inner subscriptions - // to start. - while (buffer.length && active < concurrent) { - const bufferedValue = buffer.shift()!; - // Particularly for `expand`, we need to check to see if a scheduler was provided - // for when we want to start our inner subscription. Otherwise, we just start - // are next inner subscription. - innerSubScheduler ? subscriber.add(innerSubScheduler.schedule(() => doInnerSub(bufferedValue))) : doInnerSub(bufferedValue); + // Flag that we have completed, so we know to check the buffer + // during finalization. + innerComplete = true; + }, + () => { + // During finalization, if the inner completed (it wasn't errored or + // cancelled), then we want to try the next item in the buffer if + // there is one. + if (innerComplete) { + // We have to wrap this in a try/catch because it happens during + // finalization, possibly asynchronously, and we want to pass + // any errors that happen (like in a projection function) to + // the outer Subscriber. + try { + // INNER SOURCE COMPLETE + // Decrement the active count to ensure that the next time + // we try to call `doInnerSub`, the number is accurate. + active--; + // If we have more values in the buffer, try to process those + // Note that this call will increment `active` ahead of the + // next conditional, if there were any more inner subscriptions + // to start. + while (buffer.length && active < concurrent) { + const bufferedValue = buffer.shift()!; + // Particularly for `expand`, we need to check to see if a scheduler was provided + // for when we want to start our inner subscription. Otherwise, we just start + // are next inner subscription. + innerSubScheduler ? subscriber.add(innerSubScheduler.schedule(() => doInnerSub(bufferedValue))) : doInnerSub(bufferedValue); + } + // Check to see if we can complete, and complete if so. + checkComplete(); + } catch (err) { + subscriber.error(err); + } } - // Check to see if we can complete, and complete if so. - checkComplete(); } ) ); @@ -122,8 +145,6 @@ export function mergeInternals( // Additional teardown (for when the destination is torn down). // Other teardown is added implicitly via subscription above. return () => { - // Ensure buffered values are released. - buffer = null!; additionalTeardown?.(); }; }