Skip to content

Commit

Permalink
fix: concat/merge operators will finalize inners before moving to the…
Browse files Browse the repository at this point in the history
… next (#6010)

Resolves an issue where inner observables would not finalize before the next inner observable got subscribed to. This happened in concat variants, and merge variants with concurrency limits, and could be surprising behavior to some users.

fixes #3338
  • Loading branch information
benlesh authored Feb 10, 2021
1 parent 91b5688 commit 5249a23
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 28 deletions.
38 changes: 36 additions & 2 deletions spec/operators/concatAll-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { from, throwError, of, Observable } from 'rxjs';
import { concatAll, take, mergeMap } from 'rxjs/operators';
import { from, throwError, of, Observable, defer } from 'rxjs';
import { concatAll, take, mergeMap, finalize, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -58,6 +58,40 @@ describe('concatAll operator', () => {
);
});

it('should finalize before moving to the next observable', () => {
const results: any[] = [];

const create = (n: number) => defer(() => {
results.push(`init ${n}`);
return of(`next ${n}`).pipe(
delay(100, testScheduler),
finalize(() => {
results.push(`finalized ${n}`)
})
);
});

of(create(1), create(2), create(3)).pipe(
concatAll()
).subscribe({
next: value => results.push(value),
});

testScheduler.flush();

expect(results).to.deep.equal([
'init 1',
'next 1',
'finalized 1',
'init 2',
'next 2',
'finalized 2',
'init 3',
'next 3',
'finalized 3'
]);
});

it('should concat and raise error from promise', function(done) {
this.timeout(2000);

Expand Down
38 changes: 36 additions & 2 deletions spec/operators/concatMap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { of, from, Observable } from 'rxjs';
import { concatMap, mergeMap, map, take } from 'rxjs/operators';
import { of, from, Observable, defer } from 'rxjs';
import { concatMap, mergeMap, map, take, finalize, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -677,6 +677,40 @@ describe('Observable.prototype.concatMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should finalize before moving to the next observable', () => {
const results: any[] = [];

const create = (n: number) => defer(() => {
results.push(`init ${n}`);
return of(`next ${n}`).pipe(
delay(100, testScheduler),
finalize(() => {
results.push(`finalized ${n}`)
})
);
});

of(1, 2, 3).pipe(
concatMap(n => create(n))
).subscribe({
next: value => results.push(value),
});

testScheduler.flush();

expect(results).to.deep.equal([
'init 1',
'next 1',
'finalized 1',
'init 2',
'next 2',
'finalized 2',
'init 3',
'next 3',
'finalized 3'
]);
});

function arrayRepeat(value: string, times: number) {
let results = [];
Expand Down
11 changes: 6 additions & 5 deletions src/internal/operators/OperatorSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
* and send to the `destination` error handler.
* @param onComplete Handles completion notification from the subscription. Any errors that occur in
* this handler are sent to the `destination` error handler.
* @param onUnsubscribe Additional teardown logic here. This will only be called on teardown if the
* subscriber itself is not already closed. Called before any additional teardown logic is called.
* @param onFinalize Additional teardown logic here. This will only be called on teardown if the
* subscriber itself is not already closed. This is called after all other teardown logic is executed.
*/
constructor(
destination: Subscriber<any>,
onNext?: (value: T) => void,
onError?: (err: any) => void,
onComplete?: () => void,
private onUnsubscribe?: () => void
private onFinalize?: () => void
) {
// It's important - for performance reasons - that all of this class's
// members are initialized and that they are always initialized in the same
Expand Down Expand Up @@ -73,8 +73,9 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
}

unsubscribe() {
// Execute additional teardown if we have any and we didn't already do so.
!this.closed && this.onUnsubscribe?.();
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
}
}
59 changes: 40 additions & 19 deletions src/internal/operators/mergeInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export function mergeInternals<T, R>(
additionalTeardown?: () => void
) {
// Buffered values, in the event of going over our concurrency limit
let buffer: T[] = [];
const buffer: T[] = [];
// The number of active inner subscriptions.
let active = 0;
// An index to pass to our accumulator function
Expand Down Expand Up @@ -61,6 +61,11 @@ export function mergeInternals<T, R>(
// against our concurrency limit later.
active++;

// A flag used to show that the inner observable completed.
// This is checked during finalization to see if we should
// move to the next item in the buffer, if there is on.
let innerComplete = false;

// Start our inner subscription.
innerFrom(project(value, index++)).subscribe(
new OperatorSubscriber(
Expand All @@ -82,23 +87,41 @@ export function mergeInternals<T, R>(
// Errors are passed to the destination.
undefined,
() => {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
innerSubScheduler ? subscriber.add(innerSubScheduler.schedule(() => doInnerSub(bufferedValue))) : doInnerSub(bufferedValue);
// Flag that we have completed, so we know to check the buffer
// during finalization.
innerComplete = true;
},
() => {
// During finalization, if the inner completed (it wasn't errored or
// cancelled), then we want to try the next item in the buffer if
// there is one.
if (innerComplete) {
// We have to wrap this in a try/catch because it happens during
// finalization, possibly asynchronously, and we want to pass
// any errors that happen (like in a projection function) to
// the outer Subscriber.
try {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
innerSubScheduler ? subscriber.add(innerSubScheduler.schedule(() => doInnerSub(bufferedValue))) : doInnerSub(bufferedValue);
}
// Check to see if we can complete, and complete if so.
checkComplete();
} catch (err) {
subscriber.error(err);
}
}
// Check to see if we can complete, and complete if so.
checkComplete();
}
)
);
Expand All @@ -122,8 +145,6 @@ export function mergeInternals<T, R>(
// Additional teardown (for when the destination is torn down).
// Other teardown is added implicitly via subscription above.
return () => {
// Ensure buffered values are released.
buffer = null!;
additionalTeardown?.();
};
}

0 comments on commit 5249a23

Please sign in to comment.