Skip to content

Commit

Permalink
feat(count): add predicate support in count()
Browse files Browse the repository at this point in the history
Add a predicate function argument (and a thisArg argument) to count()
operator. Also add respective tests that mirror RxJS 4 tests.

Resolves issue #425.
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 3, 2015
1 parent e773b5e commit 42d1add
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 9 deletions.
103 changes: 103 additions & 0 deletions spec/operators/count-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,107 @@ describe('count', function () {

expectObservable(e1.count()).toBe(expected, null, new Error('too bad'));
});

it('should handle an always-true predicate on an empty hot observable', function () {
var e1 = hot('-x-^---|');
var expected = '----(w|)';
var predicate = function () {
return true;
};

expectObservable(e1.count(predicate)).toBe(expected, { w: 0 });
});

it('should handle an always-false predicate on an empty hot observable', function () {
var e1 = hot('-x-^---|');
var expected = '----(w|)';
var predicate = function () {
return false;
};

expectObservable(e1.count(predicate)).toBe(expected, { w: 0 });
});

it('should handle an always-true predicate on a simple hot observable', function () {
var e1 = hot('-x-^-a-|');
var expected = '----(w|)';
var predicate = function () {
return true;
};

expectObservable(e1.count(predicate)).toBe(expected, { w: 1 });
});

it('should handle an always-false predicate on a simple hot observable', function () {
var e1 = hot('-x-^-a-|');
var expected = '----(w|)';
var predicate = function () {
return false;
};

expectObservable(e1.count(predicate)).toBe(expected, { w: 0 });
});

it('should handle a match-all predicate on observable with many values', function () {
var e1 = hot('-1-^-2--3--4-|');
var expected = '----------(w|)';
var predicate = function (value) {
return parseInt(value) < 10;
};

expectObservable(e1.count(predicate)).toBe(expected, { w: 3 });
});

it('should handle a match-none predicate on observable with many values', function () {
var e1 = hot('-1-^-2--3--4-|');
var expected = '----------(w|)';
var predicate = function (value) {
return parseInt(value) > 10;
};

expectObservable(e1.count(predicate)).toBe(expected, { w: 0 });
});

it('should handle an always-true predicate on observable that throws', function () {
var e1 = hot('-1-^---#');
var expected = '----#';
var predicate = function () {
return true;
};

expectObservable(e1.count(predicate)).toBe(expected);
});

it('should handle an always-false predicate on observable that throws', function () {
var e1 = hot('-1-^---#');
var expected = '----#';
var predicate = function () {
return false;
};

expectObservable(e1.count(predicate)).toBe(expected);
});

it('should handle an always-true predicate on a hot never-observable', function () {
var e1 = hot('-x-^----');
var expected = '-----';
var predicate = function () {
return true;
};

expectObservable(e1.count(predicate)).toBe(expected);
});

it('should handle a predicate that throws, on observable with many values', function () {
var e1 = hot('-1-^-2--3--|');
var expected = '-----#';
var predicate = function (value) {
if (value === '3') {
throw 'error';
}
return true;
};

expectObservable(e1.count(predicate)).toBe(expected);
});
});
51 changes: 42 additions & 9 deletions src/operators/count.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,60 @@
import Observable from '../Observable';
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';

export default function count() {
return this.lift(new CountOperator());
import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function count<T>(predicate?: (value: T,
index: number,
source: Observable<T>) => boolean,
thisArg?: any): Observable<T> {
return this.lift(new CountOperator(predicate, thisArg, this));
}

class CountOperator<T, R> implements Operator<T, R> {
call(subscriber: Subscriber<number>): Subscriber<T> {
return new CountSubscriber<T>(subscriber);
constructor(private predicate?: (value: T, index: number, source: Observable<T>) => boolean,
private thisArg?: any,
private source?: Observable<T>) {
}

call(subscriber: Subscriber<R>): Subscriber<T> {
return new CountSubscriber<T>(
subscriber, this.predicate, this.thisArg, this.source
);
}
}

class CountSubscriber<T> extends Subscriber<T> {
private predicate: Function;
private count: number = 0;
private index: number = 0;

count: number = 0;

constructor(destination: Subscriber<number>) {
constructor(destination: Observer<T>,
predicate?: (value: T, index: number, source: Observable<T>) => boolean,
private thisArg?: any,
private source?: Observable<T>) {
super(destination);
if (typeof predicate === 'function') {
this.predicate = bindCallback(predicate, thisArg, 3);
}
}

_next(x) {
this.count += 1;
_next(value: T) {
const predicate = this.predicate;
let passed: any = true;
if (predicate) {
passed = tryCatch(predicate)(value, this.index++, this.source);
if (passed === errorObject) {
this.destination.error(passed.e);
return;
}
}
if (passed) {
this.count += 1;
}
}

_complete() {
Expand Down

0 comments on commit 42d1add

Please sign in to comment.