Skip to content

Commit

Permalink
feat(operator): add every operator
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj authored and benlesh committed Oct 1, 2015
1 parent 8342430 commit d11f32e
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 0 deletions.
26 changes: 26 additions & 0 deletions perf/micro/immediate-scheduler/operators/every-predicate-this.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(x) {
return x < 50;
}

var testThis = {};

var oldEveryPredicateThisArgs = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).every(predicate, testThis);
var newEveryPredicateThisArgs = RxNew.Observable.range(0, 25).every(predicate, testThis);

return suite
.add('old every(predicate, thisArg) with immediate scheduler', function () {
oldEveryPredicateThisArgs.subscribe(_next, _error, _complete);
})
.add('new every(predicate, thisArg) with immediate scheduler', function () {
newEveryPredicateThisArgs.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
24 changes: 24 additions & 0 deletions perf/micro/immediate-scheduler/operators/every-predicate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(x) {
return x < 50;
}

var oldEveryPredicateArgs = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).every(predicate);
var newEveryPredicateArgs = RxNew.Observable.range(0, 25).every(predicate);

return suite
.add('old every(predicate) with immediate scheduler', function () {
oldEveryPredicateArgs.subscribe(_next, _error, _complete);
})
.add('new every(predicate) with immediate scheduler', function () {
newEveryPredicateArgs.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
96 changes: 96 additions & 0 deletions spec/operators/every-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* globals describe, it, expect, expectObservable, hot */
var Rx = require('../../dist/cjs/Rx');

describe('Observable.prototype.every()', function() {
function truePredicate(x) {
return true;
}

function predicate(x) {
return x % 5 === 0;
}

it('should emit true if source is empty', function() {
var source = hot('-----|');
var expected = '-----(x|)';

expectObservable(source.every(predicate)).toBe(expected, {x: true});
});

it('should emit false if single source of element does not match with predicate', function() {
var source = hot('--a--|');
var expected = '--(x|)';

expectObservable(source.every(predicate)).toBe(expected, {x: false});
});

it('should emit false if none of element does not match with predicate', function() {
var source = hot('--a--b--c--d--e--|');
var expected = '--(x|)';

expectObservable(source.every(predicate)).toBe(expected, {x: false});
});

it('should return false if only some of element matches with predicate', function() {
var source = hot('--a--b--c--d--e--|', {a: 5, b: 10, c: 15});
var expected = '-----------(x|)';

expectObservable(source.every(predicate)).toBe(expected, {x: false});
});

it('should emit true if single source element match with predicate', function() {
var source = hot('--a--|', {a: 5});
var expected = '-----(x|)';

expectObservable(source.every(predicate)).toBe(expected, {x: true});
});

it('should emit true if all source element matches with predicate', function() {
var source = hot('--a--b--c--d--e--|', {a: 5, b: 10, c: 15, d: 20, e: 25});
var expected = '-----------------(x|)';

expectObservable(source.every(predicate)).toBe(expected, {x: true});
});

it('should raise error if source raises error', function() {
var source = hot('--#');
var expected = '--#';

expectObservable(source.every(truePredicate)).toBe(expected);
});

it('should not completes if source never emits', function() {
var expected = '-';

expectObservable(Rx.Observable.never().every(truePredicate)).toBe(expected);
});

it('should emit true if source element matches with predicate after subscription', function() {
var source = hot('--z--^--a--b--c--d--e--|', {a: 5, b: 10, c: 15, d: 20, e: 25});
var expected = '------------------(x|)';

expectObservable(source.every(predicate)).toBe(expected, {x: true});
});

it('should emit false if source element does not match with predicate after subscription', function() {
var source = hot('--z--^--b--c--z--d--|', {a: 5, b: 10, c: 15, d: 20});
var expected = '---------(x|)';

expectObservable(source.every(predicate)).toBe(expected, {x: false});
});

it('should raise error if source raises error after subscription', function() {
var source = hot('--z--^--#');
var expected = '---#';

expectObservable(source.every(truePredicate)).toBe(expected);
});

it('should emit true if source does not emit after subscription', function() {
var source = hot('--z--^-----|');
var expected = '------(x|)';

expectObservable(source.every(predicate)).toBe(expected, {x: true});
});

});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export interface CoreOperators<T> {
ignoreElements?: () => Observable<T>;
isEmpty?: () => Observable<boolean>;
last?: <R>(predicate?: (value: T, index:number) => boolean, resultSelector?: (value: T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable<T>;
every?: (predicate: (value: T, index:number) => boolean, thisArg?: any) => Observable<T>;
map?: <T, R>(project: (x: T, ix?: number) => R, thisArg?: any) => Observable<R>;
mapTo?: <R>(value: R) => Observable<R>;
materialize?: () => Observable<any>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ observableProto.ignoreElements = ignoreElements;
import isEmpty from './operators/isEmpty';
observableProto.isEmpty = isEmpty;

import every from './operators/every';
observableProto.every = every;

import last from './operators/last';
observableProto.last = last;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ observableProto.ignoreElements = ignoreElements;
import isEmpty from './operators/isEmpty';
observableProto.isEmpty = isEmpty;

import every from './operators/every';
observableProto.every = every;

import last from './operators/last';
observableProto.last = last;

Expand Down
61 changes: 61 additions & 0 deletions src/operators/every.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Observable from '../Observable';
import Subscriber from '../Subscriber';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function every<T>(predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any): Observable<T>{
return this.lift(new EveryOperator(predicate, thisArg, this));
}

class EveryOperator<T, R> implements Operator<T, R> {
constructor(private predicate: (value: T, index: number, source: Observable<T>) => boolean,
private thisArg?: any, private source?: Observable<T>) {

}

call(observer: Subscriber<R>): Subscriber<T> {
return new EverySubscriber(observer, this.predicate, this.thisArg, this.source);
}
}

class EverySubscriber<T> extends Subscriber<T> {
private predicate: Function = undefined;
private index: number = 0;

constructor(destination: Observer<T>, predicate?: (value: T, index: number, source: Observable<T>) => boolean,
private thisArg?: any, private source?: Observable<T>) {
super(destination);

if(typeof predicate === 'function') {
this.predicate = bindCallback(predicate, thisArg, 3);
}
}

private notifyComplete(everyValueMatch: boolean): void {
this.destination.next(everyValueMatch);
this.destination.complete();
}

_next(value: T) {
const predicate = this.predicate;

if (predicate === undefined) {
this.destination.error(new TypeError('predicate must be a function'));
}

let result = tryCatch(predicate)(value, this.index++, this.source);
if (result === errorObject) {
this.destination.error(result.e);
} else if (!result) {
this.notifyComplete(false);
}
}

_complete() {
this.notifyComplete(true);
}
}

0 comments on commit d11f32e

Please sign in to comment.