Skip to content

Commit 30dd894

Browse files
committed
feat(operator): Add count operator.
1 parent abe9a24 commit 30dd894

File tree

6 files changed

+64
-10
lines changed

6 files changed

+64
-10
lines changed

spec/operators/count-spec.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/* globals describe, it, expect */
2+
var Rx = require('../../dist/cjs/Rx');
3+
var Observable = Rx.Observable;
4+
5+
describe('count', function () {
6+
it('should count the values of an observable', function (done) {
7+
Observable.fromArray([1, 2, 3])
8+
.count()
9+
.subscribe(function (total) {
10+
expect(total).toEqual(3);
11+
}, null, done);
12+
});
13+
});

spec/operators/takeUntil-spec.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ describe('Observable.prototype.takeUntil()', function () {
88
var i = 0;
99
var nextSpy = jasmine.createSpy('nextSpy');
1010

11-
Observable.timer(0, 16)
12-
.takeUntil(Observable.timer(81))
11+
Observable.timer(0, 100)
12+
.takeUntil(Observable.timer(450))
1313
.subscribe(nextSpy, null, function () {
1414
expect(nextSpy.calls.count()).toBe(5);
1515
expected.forEach(function (v) {

src/Observable.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ export default class Observable<T> {
129129
map: <T, R>(project: (x: T, ix?: number) => R, thisArg?: any) => Observable<R>;
130130
mapTo: <R>(value: R) => Observable<R>;
131131
toArray: () => Observable<T[]>;
132+
count: () => Observable<number>;
132133
scan: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
133134
reduce: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
134135
startWith: <T>(x: T) => Observable<T>;

src/Rx.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ import _do from './operators/do';
7979
import map from './operators/map';
8080
import mapTo from './operators/mapTo';
8181
import toArray from './operators/toArray';
82+
import count from './operators/count';
8283
import scan from './operators/scan';
8384
import reduce from './operators/reduce';
8485
import startWith from './operators/startWith';
@@ -87,6 +88,7 @@ observableProto.do = _do;
8788
observableProto.map = map;
8889
observableProto.mapTo = mapTo;
8990
observableProto.toArray = toArray;
91+
observableProto.count = count;
9092
observableProto.scan = scan;
9193
observableProto.reduce = reduce;
9294
observableProto.startWith = startWith;

src/observables/RangeObservable.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ import Observable from '../Observable';
33

44
export default class RangeObservable<T> extends Observable<T> {
55

6-
static create(start: number = 0, count: number = 0, scheduler?: Scheduler) {
7-
return new RangeObservable(start, count, scheduler);
6+
static create(start: number = 0, end: number = 0, scheduler?: Scheduler) {
7+
return new RangeObservable(start, end, scheduler);
88
}
99

1010
static dispatch(state) {
1111

12-
const { start, index, count, subscriber } = state;
12+
const { start, index, end, subscriber } = state;
1313

14-
if (index >= count) {
14+
if (index >= end) {
1515
subscriber.complete();
1616
return;
1717
}
@@ -28,24 +28,31 @@ export default class RangeObservable<T> extends Observable<T> {
2828
(<any> this).schedule(state);
2929
}
3030

31-
constructor(private start: number, private count: number, private scheduler?: Scheduler) {
31+
private start: number;
32+
private end: number;
33+
private scheduler: Scheduler;
34+
35+
constructor(start: number, end: number, scheduler?: Scheduler) {
3236
super();
37+
this.start = start;
38+
this.end = end;
39+
this.scheduler = scheduler;
3340
}
3441

3542
_subscribe(subscriber) {
3643

3744
let index = 0;
3845
let start = this.start;
39-
const count = this.count;
46+
const end = this.end;
4047
const scheduler = this.scheduler;
4148

4249
if (scheduler) {
4350
subscriber.add(scheduler.schedule(0, {
44-
index, count, start, subscriber
51+
index, end, start, subscriber
4552
}, RangeObservable.dispatch));
4653
} else {
4754
do {
48-
if (index++ >= count) {
55+
if (index++ >= end) {
4956
subscriber.complete();
5057
break;
5158
}

src/operators/count.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import Operator from '../Operator';
2+
import Observer from '../Observer';
3+
import Subscriber from '../Subscriber';
4+
5+
export default function count() {
6+
return this.lift(new CountOperator());
7+
}
8+
9+
export class CountOperator<T, R> extends Operator<T, R> {
10+
call(observer: Observer<number>): Observer<T> {
11+
return new CountSubscriber<T>(observer);
12+
}
13+
}
14+
15+
export class CountSubscriber<T> extends Subscriber<T> {
16+
17+
count: number = 0;
18+
19+
constructor(destination: Observer<number>) {
20+
super(destination);
21+
}
22+
23+
_next(x) {
24+
this.count += 1;
25+
}
26+
27+
_complete() {
28+
this.destination.next(this.count);
29+
this.destination.complete();
30+
}
31+
}

0 commit comments

Comments
 (0)