@@ -3,38 +3,65 @@ import {Operator} from '../Operator';
3
3
import { Subscriber } from '../Subscriber' ;
4
4
5
5
/**
6
- * Returns an Observable that applies a specified accumulator function to the first item emitted by a source Observable,
7
- * then feeds the result of that function along with the second item emitted by the source Observable into the same
8
- * function, and so on until all items have been emitted by the source Observable, and emits the final result from
9
- * the final call to your function as its sole item.
10
- * This technique, which is called "reduce" here, is sometimes called "aggregate," "fold," "accumulate," "compress," or
11
- * "inject" in other programming contexts.
6
+ * Applies an accumulator function over the source Observable, and returns the
7
+ * accumulated result when the source completes, given an optional seed value.
8
+ *
9
+ * <span class="informal">Combines together all values emitted on the source,
10
+ * using an accumulator function that knows how to join a new source value into
11
+ * the accumulation from the past.</span>
12
12
*
13
13
* <img src="./img/reduce.png" width="100%">
14
14
*
15
- * @param {initialValue } the initial (seed) accumulator value
16
- * @param {accumulator } an accumulator function to be invoked on each item emitted by the source Observable, the
17
- * result of which will be used in the next accumulator call.
18
- * @return {Observable } an Observable that emits a single item that is the result of accumulating the output from the
19
- * items emitted by the source Observable.
15
+ * Like
16
+ * [Array.prototype.reduce()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/reduce),
17
+ * `reduce` applies an `accumulator` function against an accumulation and each
18
+ * value of the source Observable (from the past) to reduce it to a single
19
+ * value, emitted on the output Observable. Note that `reduce` will only emit
20
+ * one value, only when the source Observable completes. It is equivalent to
21
+ * applying operator {@link scan} followed by operator {@link last}.
22
+ *
23
+ * Returns an Observable that applies a specified `accumulator` function to each
24
+ * item emitted by the source Observable. If a `seed` value is specified, then
25
+ * that value will be used as the initial value for the accumulator. If no seed
26
+ * value is specified, the first item of the source is used as the seed.
27
+ *
28
+ * @example <caption>Count the number of click events that happened in 5 seconds</caption>
29
+ * var clicksInFiveSeconds = Rx.Observable.fromEvent(document, 'click')
30
+ * .takeUntil(Rx.Observable.interval(5000));
31
+ * var ones = clicksInFiveSeconds.mapTo(1);
32
+ * var seed = 0;
33
+ * var count = ones.reduce((acc, one) => acc + one, seed);
34
+ * count.subscribe(x => console.log(x));
35
+ *
36
+ * @see {@link count }
37
+ * @see {@link expand }
38
+ * @see {@link mergeScan }
39
+ * @see {@link scan }
40
+ *
41
+ * @param {function(acc: R, value: T): R } accumulator The accumulator function
42
+ * called on each source value.
43
+ * @param {R } [seed] The initial accumulation value.
44
+ * @return {Observable<R> } An observable of the accumulated values.
45
+ * @return {Observable<R> } An Observable that emits a single value that is the
46
+ * result of accumulating the values emitted by the source Observable.
20
47
* @method reduce
21
48
* @owner Observable
22
49
*/
23
- export function reduce < T , R > ( project : ( acc : R , value : T ) => R , seed ?: R ) : Observable < R > {
24
- return this . lift ( new ReduceOperator ( project , seed ) ) ;
50
+ export function reduce < T , R > ( accumulator : ( acc : R , value : T ) => R , seed ?: R ) : Observable < R > {
51
+ return this . lift ( new ReduceOperator ( accumulator , seed ) ) ;
25
52
}
26
53
27
54
export interface ReduceSignature < T > {
28
- < R > ( project : ( acc : R , value : T ) => R , seed ?: R ) : Observable < R > ;
55
+ < R > ( accumulator : ( acc : R , value : T ) => R , seed ?: R ) : Observable < R > ;
29
56
}
30
57
31
58
export class ReduceOperator < T , R > implements Operator < T , R > {
32
59
33
- constructor ( private project : ( acc : R , value : T ) => R , private seed ?: R ) {
60
+ constructor ( private accumulator : ( acc : R , value : T ) => R , private seed ?: R ) {
34
61
}
35
62
36
63
call ( subscriber : Subscriber < R > , source : any ) : any {
37
- return source . _subscribe ( new ReduceSubscriber ( subscriber , this . project , this . seed ) ) ;
64
+ return source . _subscribe ( new ReduceSubscriber ( subscriber , this . accumulator , this . seed ) ) ;
38
65
}
39
66
}
40
67
@@ -48,12 +75,13 @@ export class ReduceSubscriber<T, R> extends Subscriber<T> {
48
75
acc : T | R ;
49
76
hasSeed : boolean ;
50
77
hasValue : boolean = false ;
51
- project : ( acc : R , value : T ) => R ;
52
78
53
- constructor ( destination : Subscriber < R > , project : ( acc : R , value : T ) => R , seed ?: R ) {
79
+ constructor ( destination : Subscriber < R > ,
80
+ private accumulator : ( acc : R , value : T ) => R ,
81
+ seed ?: R ) {
54
82
super ( destination ) ;
55
83
this . acc = seed ;
56
- this . project = project ;
84
+ this . accumulator = accumulator ;
57
85
this . hasSeed = typeof seed !== 'undefined' ;
58
86
}
59
87
@@ -69,7 +97,7 @@ export class ReduceSubscriber<T, R> extends Subscriber<T> {
69
97
private _tryReduce ( value : T ) {
70
98
let result : any ;
71
99
try {
72
- result = this . project ( < R > this . acc , value ) ;
100
+ result = this . accumulator ( < R > this . acc , value ) ;
73
101
} catch ( err ) {
74
102
this . destination . error ( err ) ;
75
103
return ;
0 commit comments