Skip to content

Commit

Permalink
fix(scan): proper indexes when seed is not supplied
Browse files Browse the repository at this point in the history
- Smaller implementation
- Improved docs
- Adds comments
- Adds a test
- Fixes weird adjustment in `reduce`.

Closes #4348
Closes #3879
  • Loading branch information
benlesh committed Sep 11, 2020
1 parent 6416935 commit f93fb9c
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 77 deletions.
10 changes: 10 additions & 0 deletions spec/operators/scan-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ describe('scan operator', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should provide the proper index if seed is skipped', () => {
const expected = [1, 2];
of(3, 3, 3).pipe(
scan((_: any, __, i) => {
expect(i).to.equal(expected.shift());
return null;
})
).subscribe();
});

it('should scan with a seed of undefined', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export function reduce<V, A>(accumulator: (acc: V | A, value: V, index: number)
}
return function reduceOperatorFunction(source: Observable<V>): Observable<V | A> {
return pipe(
scan<V, V | A>((acc, value, index) => accumulator(acc, value, index + 1)),
scan<V, V | A>((acc, value, index) => accumulator(acc, value, index)),
takeLast(1),
)(source);
};
Expand Down
178 changes: 102 additions & 76 deletions src/internal/operators/scan.ts
Original file line number Diff line number Diff line change
@@ -1,111 +1,137 @@
import { Operator } from '../Operator';
/** @prettier */
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { OperatorFunction, TeardownLogic } from '../types';
import { OperatorFunction } from '../types';
import { lift } from '../util/lift';

/* tslint:disable:max-line-length */
export function scan<V, A = V>(accumulator: (acc: A|V, value: V, index: number) => A): OperatorFunction<V, V|A>;
export function scan<V, A = V>(accumulator: (acc: A | V, value: V, index: number) => A): OperatorFunction<V, V | A>;
export function scan<V, A>(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction<V, A>;
export function scan<V, A, S>(accumulator: (acc: A|S, value: V, index: number) => A, seed: S): OperatorFunction<V, A>;
/* tslint:enable:max-line-length */
export function scan<V, A, S>(accumulator: (acc: A | S, value: V, index: number) => A, seed: S): OperatorFunction<V, A>;

// TODO: link to a "redux pattern" section in the guide (location TBD)

/**
* Applies an accumulator function over the source Observable, and returns each
* intermediate result, with an optional seed value.
* Useful for encapsulating and managing state. Applies an accumulator (or "reducer function")
* to each value from the source after an initial state is established -- either via
* a `seed` value (second argument), or from the first value from the source.
*
* <span class="informal">It's like {@link reduce}, but emits the current
* accumulation whenever the source emits a value.</span>
* accumulation state after each update</span>
*
* ![](scan.png)
*
* Combines together all values emitted on the source, using an accumulator
* function that knows how to join a new source value into the accumulation from
* the past. Is similar to {@link reduce}, but emits the intermediate
* accumulations.
* This operator maintains an internal state and emits it after processing each value as follows:
*
* 1. First value arrives
* - If a `seed` value was supplied (as the second argument to `scan`), let `state = seed` and `value = firstValue`.
* - If NO `seed` value was supplied (no second argument), let `state = firstValue` and go to 3.
* 2. Let `state = accumulator(state, value)`.
* - If an error is thrown by `accumulator`, notify the consumer of an error. The process ends.
* 3. Emit `state`.
* 4. Next value arrives, let `value = nextValue`, go to 2.
*
* ## Example
*
* An average of previous numbers. This example shows how
* not providing a `seed` can prime the stream with the
* first value from the source.
*
* ```ts
* import { interval } from 'rxjs';
* import { scan, map } from 'rxjs/operators';
*
* Returns an Observable that applies a specified `accumulator` function to each
* item emitted by the source Observable. If a `seed` value is specified, then
* that value will be used as the initial value for the accumulator. If no seed
* value is specified, the first item of the source is used as the seed.
* numbers$
* .pipe(
* // Get the sum of the numbers coming in.
* scan((total, n) => total + n),
* // Get the average by dividing the sum by the total number
* // received so var (which is 1 more than the zero-based index).
* map((sum, index) => sum / (index + 1))
* )
* .subscribe(console.log);
* ```
*
* ## Example
* Count the number of click events
*
* The Fibonacci sequence. This example shows how you can use
* a seed to prime accumulation process. Also... you know... Fibinacci.
* So important to like, computers and stuff that its whiteboarded
* in job interviews. Now you can show them the Rx version! (Please don't, haha)
*
* ```ts
* import { fromEvent } from 'rxjs';
* import { scan, mapTo } from 'rxjs/operators';
*
* const clicks = fromEvent(document, 'click');
* const ones = clicks.pipe(mapTo(1));
* const seed = 0;
* const count = ones.pipe(scan((acc, one) => acc + one, seed));
* count.subscribe(x => console.log(x));
* import { interval } from 'rxjs';
* import { scan, map, startWith } from 'rxjs/operators';
*
* const firstTwoFibs = [0, 1];
* // An endless stream of Fibonnaci numbers.
* const fibonnaci$ = interval(1000).pipe(
* // Scan to get the fibonnaci numbers (after 0, 1)
* scan(([a, b]) => [b, a + b], firstTwoFibs),
* // Get the second number in the tuple, it's the one you calculated
* map(([, n]) => n),
* // Start with our first two digits :)
* startWith(...firstTwoFibs)
* );
*
* fibonnaci$.subscribe(console.log);
* ```
*
*
* @see {@link expand}
* @see {@link mergeScan}
* @see {@link reduce}
*
* @param {function(acc: A, value: V, index: number): A} accumulator
* The accumulator function called on each source value.
* @param {V|A} [seed] The initial accumulation value.
* @return {Observable<A>} An observable of the accumulated values.
* @name scan
* @param accumulator A "reducer function". This will be called for each value after an initial state is
* acquired.
* @param seed The initial state. If this is not provided, the first value from the source will
* be used as the initial state, and emitted without going through the accumulator. All subsequent values
* will be processed by the accumulator function. If this is provided, all values will go through
* the accumulator function.
*/
export function scan<V, A, S>(accumulator: (acc: V|A|S, value: V, index: number) => A, seed?: S): OperatorFunction<V, V|A> {
let hasSeed = false;
export function scan<V, A, S>(accumulator: (acc: V | A | S, value: V, index: number) => A, seed?: S): OperatorFunction<V, V | A> {
// providing a seed of `undefined` *should* be valid and trigger
// hasSeed! so don't use `seed !== undefined` checks!
// For this reason, we have to check it here at the original call site
// otherwise inside Operator/Subscriber we won't know if `undefined`
// means they didn't provide anything or if they literally provided `undefined`
if (arguments.length >= 2) {
hasSeed = true;
}
const hasSeed = arguments.length >= 2;

return function scanOperatorFunction(source: Observable<V>) {
return lift(source, new ScanOperator(accumulator, seed, hasSeed));
return (source: Observable<V>) => {
return lift(source, function (this: Subscriber<any>, source: Observable<V>) {
const subscriber = this;
let hasState = hasSeed;
let state: any = hasSeed ? seed! : null!;
let index = 0;
source.subscribe(
new ScanSubscriber(subscriber, (value) => {
const i = index++;
if (!hasState) {
// If a seed was not passed, we use the first value from the source
// as the initial state. That means we also pass it through, and the
// accumulator (reducer) does not get executed.
hasState = true;
state = value;
} else {
// Otherwise, if we have a seed, or we already have state, we try
// to execute the accumulator, and we handle the error appropriately.
try {
state = accumulator(state, value, i);
} catch (err) {
// An error occurred in the user-provided function, forward it
// to the consumer via error notification.
subscriber.error(err);
return;
}
}
subscriber.next(state);
})
);
});
};
}

class ScanOperator<V, A, S> implements Operator<V, A> {
constructor(private accumulator: (acc: V|A|S, value: V, index: number) => A, private seed?: S, private hasSeed: boolean = false) {}

call(subscriber: Subscriber<A>, source: any): TeardownLogic {
return source.subscribe(new ScanSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class ScanSubscriber<V, A> extends Subscriber<V> {
private index: number = 0;

constructor(destination: Subscriber<A>, private accumulator: (acc: V|A, value: V, index: number) => A, private _state: any,
private _hasState: boolean) {
class ScanSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<any>, protected _next: (value: T) => void) {
super(destination);
}

protected _next(value: V): void {
const { destination } = this;
if (!this._hasState) {
this._state = value;
this._hasState = true;
destination.next(value);
} else {
const index = this.index++;
let result: A;
try {
result = this.accumulator(this._state, value, index);
} catch (err) {
destination.error(err);
return;
}
this._state = result;
destination.next(result);
}
}
}

0 comments on commit f93fb9c

Please sign in to comment.