From 90c6c5184fb5ca4421248f7fc7b446f98d135ae8 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Sun, 17 Apr 2016 13:11:30 -0700 Subject: [PATCH] fix(scan): accumulator passes current index closes #1614 --- spec/operators/scan-spec.ts | 25 +++++++++++++++++++++++++ src/operator/scan.ts | 20 ++++++++++---------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/spec/operators/scan-spec.ts b/spec/operators/scan-spec.ts index 637b1fd13f..ef8eeb25e8 100644 --- a/spec/operators/scan-spec.ts +++ b/spec/operators/scan-spec.ts @@ -1,3 +1,4 @@ +import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; @@ -176,4 +177,28 @@ describe('Observable.prototype.scan', () => { expectObservable(source, unsub).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should pass current index to accumulator', () => { + const values = { + a: 1, b: 3, c: 5, + x: 1, y: 4, z: 9 + }; + let idx = [0, 1, 2]; + + const e1 = hot('--a--b--c--|', values); + const e1subs = '^ !'; + const expected = '--x--y--z--|'; + + const scanFunction = (o: number, value: number, index: number) => { + expect(index).to.equal(idx.shift()); + return o + value; + }; + + const scan = e1.scan(scanFunction, 0).finally(() => { + expect(idx).to.be.empty; + }); + + expectObservable(scan).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); diff --git a/src/operator/scan.ts b/src/operator/scan.ts index 4dc15fa2b2..61a7e1e80b 100644 --- a/src/operator/scan.ts +++ b/src/operator/scan.ts @@ -32,23 +32,23 @@ import {Subscriber} from '../Subscriber'; * @see {@link mergeScan} * @see {@link reduce} * - * @param {function(acc: R, value: T): R} accumulator The accumulator function - * called on each source value. + * @param {function(acc: R, value: T, index: number): R} accumulator + * The accumulator function called on each source value. * @param {T|R} [seed] The initial accumulation value. * @return {Observable} An observable of the accumulated values. * @method scan * @owner Observable */ -export function scan(accumulator: (acc: R, value: T) => R, seed?: T | R): Observable { +export function scan(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): Observable { return this.lift(new ScanOperator(accumulator, seed)); } export interface ScanSignature { - (accumulator: (acc: R, value: T) => R, seed?: T | R): Observable; + (accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): Observable; } class ScanOperator implements Operator { - constructor(private accumulator: (acc: R, value: T) => R, private seed?: T | R) { + constructor(private accumulator: (acc: R, value: T, index: number) => R, private seed?: T | R) { } call(subscriber: Subscriber, source: any): any { @@ -62,6 +62,8 @@ class ScanOperator implements Operator { * @extends {Ignored} */ class ScanSubscriber extends Subscriber { + private index: number = 0; + private accumulatorSet: boolean = false; private _seed: T | R; get seed(): T | R { @@ -73,12 +75,9 @@ class ScanSubscriber extends Subscriber { this._seed = value; } - private accumulatorSet: boolean = false; - - constructor(destination: Subscriber, private accumulator: (acc: R, value: T) => R, seed?: T|R) { + constructor(destination: Subscriber, private accumulator: (acc: R, value: T, index: number) => R, seed?: T|R) { super(destination); this.seed = seed; - this.accumulator = accumulator; this.accumulatorSet = typeof seed !== 'undefined'; } @@ -92,9 +91,10 @@ class ScanSubscriber extends Subscriber { } private _tryNext(value: T): void { + const index = this.index++; let result: any; try { - result = this.accumulator(this.seed, value); + result = this.accumulator(this.seed, value, index); } catch (err) { this.destination.error(err); }