Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(mergeMap/mergeScan): unify logic #5754

Merged
merged 3 commits into from
Sep 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions src/internal/operators/mergeInternals.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/** @prettier */
import { Observable } from '../Observable';
import { from } from '../observable/from';
import { Subscriber } from '../Subscriber';
import { ObservableInput } from '../types';
import { OperatorSubscriber } from './OperatorSubscriber';

/**
* A process embodying the general "merge" strategy. This is used in
* `mergeMap` and `mergeScan` because the logic is otherwise nearly identical.
* @param source The original source observable
* @param subscriber The consumer subscriber
* @param project The projection function to get our inner sources
* @param concurrent The number of concurrent inner subscriptions
* @param onBeforeNext Additional logic to apply before nexting to our consumer
* @param onBeforeComplete Additional logic to apply before telling the consumer we're complete.
*/
export function mergeInternals<T, R>(
source: Observable<T>,
subscriber: Subscriber<R>,
project: (value: T, index: number) => ObservableInput<R>,
concurrent: number,
onBeforeNext?: (innerValue: R) => void,
onBeforeComplete?: () => void
) {
// Buffered values, in the event of going over our concurrency limit
let buffer: T[] = [];
// The number of active inner subscriptions.
let active = 0;
// An index to pass to our accumulator function
let index = 0;
// Whether or not the outer source has completed.
let isComplete = false;

/**
* Checks to see if we can complete our result or not.
*/
const checkComplete = () => {
// If the outer has completed, and nothing is left in the buffer,
// and we don't have any active inner subscriptions, then we can
// Emit the state and complete.
if (isComplete && !buffer.length && !active) {
// In the case of `mergeScan`, we need additional handling here.
onBeforeComplete?.();
subscriber.complete();
}
};

const doInnerSub = (value: T) => {
active++;
from(project(value, index++)).subscribe(
new OperatorSubscriber(
subscriber,
(innerValue) => {
// `mergeScan` has additional handling here. For example
// taking the inner value and updating state.
onBeforeNext?.(innerValue);
subscriber.next(innerValue);
},
// Errors are passed to the destination.
undefined,
() => {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
doInnerSub(buffer.shift()!);
}
// Check to see if we can complete, and complete if so.
checkComplete();
}
)
);
};

// Subscribe to our source observable.
source.subscribe(
new OperatorSubscriber(
subscriber,
// If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait.
(value) => (active < concurrent ? doInnerSub(value) : buffer.push(value)),
// Errors are passed through
undefined,
() => {
// Outer completed, make a note of it, and check to see if we can complete everything.
isComplete = true;
checkComplete();
}
)
);

// Additional teardown (for when the destination is torn down).
// Other teardown is added implicitly via subscription above.
return () => {
// Ensure buffered values are released.
buffer = null!;
};
}
97 changes: 2 additions & 95 deletions src/internal/operators/mergeMap.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
/** @prettier */
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { map } from './map';
import { from } from '../observable/from';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { mergeInternals } from './mergeInternals';

/* tslint:disable:max-line-length */
export function mergeMap<T, O extends ObservableInput<any>>(
Expand Down Expand Up @@ -92,100 +91,8 @@ export function mergeMap<T, R, O extends ObservableInput<any>>(
} else if (typeof resultSelector === 'number') {
concurrent = resultSelector;
}
return operate((source, subscriber) => {
// Whether or not the outer subscription is complete
let isComplete = false;
// The number of active inner subscriptions
let active = 0;
// The index of the value from source (used for projection)
let index = 0;
// The buffered values from the source (used for concurrency)
let buffer: T[] = [];

/**
* Called to check to see if we can complete, and completes the result if
* nothing is active.
*/
const checkComplete = () => isComplete && !active && subscriber.complete();

/**
* Attempts to start an inner subscription from a buffered value,
* so long as we don't have more active inner subscriptions than
* the concurrency limit allows.
*/
const tryInnerSub = () => {
while (active < concurrent && buffer.length > 0) {
doInnerSub(buffer.shift()!);
}
};

/**
* Creates an inner observable and subscribes to it with the
* given outer value.
* @param value the value to process
*/
const doInnerSub = (value: T) => {
// Subscribe to the inner source
active++;
subscriber.add(
from(project(value, index++)).subscribe(
new OperatorSubscriber(
subscriber,
// INNER SOURCE NEXT
// We got a value from the inner source, emit it from the result.
(innerValue) => subscriber.next(innerValue),
// Errors are sent to the consumer.
undefined,
() => {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
buffer.length && tryInnerSub();
// Check to see if we can complete, and complete if so.
checkComplete();
}
)
)
);
};

let outerSubs: Subscription;
outerSubs = source.subscribe(
new OperatorSubscriber(
subscriber,
// OUTER SOURCE NEXT
// If we are under our concurrency limit, start the inner subscription with the value
// right away. Otherwise, push it onto the buffer and wait.
(value) => (active < concurrent ? doInnerSub(value) : buffer.push(value)),
// Let errors pass through.
undefined,
() => {
// OUTER SOURCE COMPLETE
// We don't necessarily stop here. If have any pending inner subscriptions
// we need to wait for those to be done first. That includes buffered inners
// that we haven't even subscribed to yet.
isComplete = true;
// If nothing is active, and nothing in the buffer, with no hope of getting any more
// we can complete the result
checkComplete();
// Be sure to teardown the outer subscription ASAP, in any case.
outerSubs?.unsubscribe();
}
)
);

// Additional teardown. Called when the destination is torn down.
// Other teardown is registered implicitly above during subscription.
return () => {
// Release buffered values
buffer = null!;
};
});
return operate((source, subscriber) => mergeInternals(source, subscriber, project, concurrent));
}

/**
Expand Down
95 changes: 11 additions & 84 deletions src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
/** @prettier */
import { ObservableInput, OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { from } from '../observable/from';
import { mergeInternals } from './mergeInternals';

/**
* Applies an accumulator function over the source Observable where the
Expand Down Expand Up @@ -48,94 +47,22 @@ export function mergeScan<T, R>(
concurrent = Infinity
): OperatorFunction<T, R> {
return operate((source, subscriber) => {
// Buffered values, in the event of going over our concurrency limit
let buffer: T[] = [];
// The number of active inner subscriptions.
let active = 0;
// Whether or not we have gotten any accumulated state. This is used to
// decide whether or not to emit in the event of an empty result.
let hasState = false;
// The accumulated state.
let state = seed;
// An index to pass to our accumulator function
let index = 0;
// Whether or not the outer source has completed.
let isComplete = false;

/**
* Checks to see if we can complete our result or not.
*/
const checkComplete = () => {
// If the outer has completed, and nothing is left in the buffer,
// and we don't have any active inner subscriptions, then we can
// Emit the state and complete.
if (isComplete && !buffer.length && !active) {
// TODO: This seems like it might result in a double emission, perhaps bad behavior?
// maybe we should change this in an upcoming major?
!hasState && subscriber.next(state);
subscriber.complete();
}
};

const doInnerSub = (value: T) => {
active++;
from(accumulator(state!, value, index++)).subscribe(
new OperatorSubscriber(
subscriber,
(innerValue) => {
hasState = true;
// Intentially terse. Set the state, then emit it.
subscriber.next((state = innerValue));
},
// Errors are passed to the destination.
undefined,

// TODO: Much of this code is duplicated from mergeMap. Perhaps
// look into a way to unify this.

() => {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
buffer.length && tryInnerSub();
// Check to see if we can complete, and complete if so.
checkComplete();
}
)
);
};

const tryInnerSub = () => {
while (buffer.length && active < concurrent) {
doInnerSub(buffer.shift()!);
}
};

source.subscribe(
new OperatorSubscriber(
subscriber,
// If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait.
(value) => (active < concurrent ? doInnerSub(value) : buffer.push(value)),
// Errors are passed through
undefined,
() => {
// Outer completed, make a note of it, and check to see if we can complete everything.
isComplete = true;
checkComplete();
}
)
return mergeInternals(
source,
subscriber,
(value, index) => accumulator(state, value, index),
concurrent,
(value) => {
hasState = true;
state = value;
},
() => !hasState && subscriber.next(state)
);

// Additional teardown (for when the destination is torn down).
// Other teardown is added implicitly via subscription above.
return () => {
// Ensure buffered values are released.
buffer = null!;
};
});
}