Skip to content

Commit

Permalink
fix(mergeScan): No longer emits state again upon completion. (#5805)
Browse files Browse the repository at this point in the history
* fix(mergeScan): No longer emits state again upon completion.

Fixes an issue were accumulated state would be emitted a second time upon completion.

BREAKING CHANGE: `mergeScan` will no longer emit its inner state again upon completion.

related #5372

* chore(lint): remove unused import

Co-authored-by: Nicholas Jamieson <nicholas@cartant.com>
  • Loading branch information
benlesh and cartant authored Oct 8, 2020
1 parent a331e11 commit 68c2894
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 28 deletions.
21 changes: 9 additions & 12 deletions spec/operators/mergeScan-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ describe('mergeScan', () => {
it('should handle an empty projected Observable', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
const expected = '---------------------(x|)';
const expected = '---------------------|';

const values = { x: <string[]>[] };

const source = e1.pipe(mergeScan((acc, x) => EMPTY, []));
const source = e1.pipe(mergeScan(() => EMPTY, []));

expectObservable(source).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
Expand All @@ -223,15 +223,11 @@ describe('mergeScan', () => {
it('handle empty', () => {
const e1 = cold('|');
const e1subs = '(^!)';
const expected = '(u|)';

const values = {
u: <string[]>[]
};
const expected = '|';

const source = e1.pipe(mergeScan((acc, x) => of(acc.concat(x)), [] as string[]));

expectObservable(source).toBe(expected, values);
expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

Expand Down Expand Up @@ -309,7 +305,7 @@ describe('mergeScan', () => {
it('should emit accumulator if inner completes without value', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
const expected = '---------------------(x|)';
const expected = '---------------------|';

const source = e1.pipe(mergeScan((acc, x) => EMPTY, ['1']));

Expand All @@ -320,13 +316,14 @@ describe('mergeScan', () => {
it('should emit accumulator if inner completes without value after source completes', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
const expected = '---------------------(x|)';
const expected = '-----------------------|';
const inner = cold( '-----|');

const source = e1.pipe(
mergeScan((acc, x) => EMPTY.pipe(delay(50, rxTestScheduler)), ['1'])
mergeScan(() => inner, '1')
);

expectObservable(source).toBe(expected, {x: ['1']});
expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

Expand Down
3 changes: 1 addition & 2 deletions src/internal/operators/expand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ export function expand<T, R>(
project,
concurrent,

// These unused handlers are for `xScan`-type operators
undefined,
// onBeforeNext
undefined,

// Expand-specific
Expand Down
9 changes: 3 additions & 6 deletions src/internal/operators/mergeInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import { OperatorSubscriber } from './OperatorSubscriber';
* @param project The projection function to get our inner sources
* @param concurrent The number of concurrent inner subscriptions
* @param onBeforeNext Additional logic to apply before nexting to our consumer
* @param onBeforeComplete Additional logic to apply before telling the consumer
* we're complete.
* @param expand If `true` this will perform an "expand" strategy, which differs only
* in that it recurses, and the inner subscription must be schedule-able.
* @param innerSubScheduler A scheduler to use to schedule inner subscriptions,
Expand All @@ -26,9 +24,9 @@ export function mergeInternals<T, R>(
project: (value: T, index: number) => ObservableInput<R>,
concurrent: number,
onBeforeNext?: (innerValue: R) => void,
onBeforeComplete?: () => void,
expand?: boolean,
innerSubScheduler?: SchedulerLike
innerSubScheduler?: SchedulerLike,
additionalTeardown?: () => void
) {
// Buffered values, in the event of going over our concurrency limit
let buffer: T[] = [];
Expand All @@ -47,8 +45,6 @@ export function mergeInternals<T, R>(
// and we don't have any active inner subscriptions, then we can
// Emit the state and complete.
if (isComplete && !buffer.length && !active) {
// In the case of `mergeScan`, we need additional handling here.
onBeforeComplete?.();
subscriber.complete();
}
};
Expand Down Expand Up @@ -129,5 +125,6 @@ export function mergeInternals<T, R>(
return () => {
// Ensure buffered values are released.
buffer = null!;
additionalTeardown?.();
};
}
4 changes: 1 addition & 3 deletions src/internal/operators/mergeMap.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/** @prettier */
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { map } from './map';
import { innerFrom } from '../observable/from';
Expand Down Expand Up @@ -87,8 +86,7 @@ export function mergeMap<T, R, O extends ObservableInput<any>>(
): OperatorFunction<T, ObservedValueOf<O> | R> {
if (isFunction(resultSelector)) {
// DEPRECATED PATH
return (source: Observable<T>) =>
source.pipe(mergeMap((a, i) => innerFrom(project(a, i)).pipe(map((b: any, ii: number) => resultSelector(a, b, i, ii))), concurrent));
return mergeMap((a, i) => map((b: any, ii: number) => resultSelector(a, b, i, ii))(innerFrom(project(a, i))), concurrent);
} else if (typeof resultSelector === 'number') {
concurrent = resultSelector;
}
Expand Down
8 changes: 3 additions & 5 deletions src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ export function mergeScan<T, R>(
concurrent = Infinity
): OperatorFunction<T, R> {
return operate((source, subscriber) => {
// Whether or not we have gotten any accumulated state. This is used to
// decide whether or not to emit in the event of an empty result.
let hasState = false;
// The accumulated state.
let state = seed;

Expand All @@ -62,10 +59,11 @@ export function mergeScan<T, R>(
(value, index) => accumulator(state, value, index),
concurrent,
(value) => {
hasState = true;
state = value;
},
() => !hasState && subscriber.next(state)
false,
undefined,
() => (state = null!)
);
});
}

0 comments on commit 68c2894

Please sign in to comment.