diff --git a/perf/micro/immediate-scheduler/operators/every-predicate-this.js b/perf/micro/immediate-scheduler/operators/every-predicate-this.js new file mode 100644 index 0000000000..0c0a2600b0 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/every-predicate-this.js @@ -0,0 +1,26 @@ +var RxOld = require("rx"); +var RxNew = require("../../../../index"); + +module.exports = function (suite) { + + var predicate = function(x) { + return x < 50; + } + + var testThis = {}; + + var oldEveryPredicateThisArgs = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).every(predicate, testThis); + var newEveryPredicateThisArgs = RxNew.Observable.range(0, 25).every(predicate, testThis); + + return suite + .add('old every(predicate, thisArg) with immediate scheduler', function () { + oldEveryPredicateThisArgs.subscribe(_next, _error, _complete); + }) + .add('new every(predicate, thisArg) with immediate scheduler', function () { + newEveryPredicateThisArgs.subscribe(_next, _error, _complete); + }); + + function _next(x) { } + function _error(e){ } + function _complete(){ } +}; \ No newline at end of file diff --git a/perf/micro/immediate-scheduler/operators/every-predicate.js b/perf/micro/immediate-scheduler/operators/every-predicate.js new file mode 100644 index 0000000000..75eb12ab97 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/every-predicate.js @@ -0,0 +1,24 @@ +var RxOld = require("rx"); +var RxNew = require("../../../../index"); + +module.exports = function (suite) { + + var predicate = function(x) { + return x < 50; + } + + var oldEveryPredicateArgs = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).every(predicate); + var newEveryPredicateArgs = RxNew.Observable.range(0, 25).every(predicate); + + return suite + .add('old every(predicate) with immediate scheduler', function () { + oldEveryPredicateArgs.subscribe(_next, _error, _complete); + }) + .add('new every(predicate) with immediate scheduler', function () { + newEveryPredicateArgs.subscribe(_next, _error, _complete); + }); + + function _next(x) { } + function _error(e){ } + function _complete(){ } +}; \ No newline at end of file diff --git a/spec/operators/every-spec.js b/spec/operators/every-spec.js new file mode 100644 index 0000000000..e395a48455 --- /dev/null +++ b/spec/operators/every-spec.js @@ -0,0 +1,96 @@ +/* globals describe, it, expect, expectObservable, hot */ +var Rx = require('../../dist/cjs/Rx'); + +describe('Observable.prototype.every()', function() { + function truePredicate(x) { + return true; + } + + function predicate(x) { + return x % 5 === 0; + } + + it('should emit true if source is empty', function() { + var source = hot('-----|'); + var expected = '-----(x|)'; + + expectObservable(source.every(predicate)).toBe(expected, {x: true}); + }); + + it('should emit false if single source of element does not match with predicate', function() { + var source = hot('--a--|'); + var expected = '--(x|)'; + + expectObservable(source.every(predicate)).toBe(expected, {x: false}); + }); + + it('should emit false if none of element does not match with predicate', function() { + var source = hot('--a--b--c--d--e--|'); + var expected = '--(x|)'; + + expectObservable(source.every(predicate)).toBe(expected, {x: false}); + }); + + it('should return false if only some of element matches with predicate', function() { + var source = hot('--a--b--c--d--e--|', {a: 5, b: 10, c: 15}); + var expected = '-----------(x|)'; + + expectObservable(source.every(predicate)).toBe(expected, {x: false}); + }); + + it('should emit true if single source element match with predicate', function() { + var source = hot('--a--|', {a: 5}); + var expected = '-----(x|)'; + + expectObservable(source.every(predicate)).toBe(expected, {x: true}); + }); + + it('should emit true if all source element matches with predicate', function() { + var source = hot('--a--b--c--d--e--|', {a: 5, b: 10, c: 15, d: 20, e: 25}); + var expected = '-----------------(x|)'; + + expectObservable(source.every(predicate)).toBe(expected, {x: true}); + }); + + it('should raise error if source raises error', function() { + var source = hot('--#'); + var expected = '--#'; + + expectObservable(source.every(truePredicate)).toBe(expected); + }); + + it('should not completes if source never emits', function() { + var expected = '-'; + + expectObservable(Rx.Observable.never().every(truePredicate)).toBe(expected); + }); + + it('should emit true if source element matches with predicate after subscription', function() { + var source = hot('--z--^--a--b--c--d--e--|', {a: 5, b: 10, c: 15, d: 20, e: 25}); + var expected = '------------------(x|)'; + + expectObservable(source.every(predicate)).toBe(expected, {x: true}); + }); + + it('should emit false if source element does not match with predicate after subscription', function() { + var source = hot('--z--^--b--c--z--d--|', {a: 5, b: 10, c: 15, d: 20}); + var expected = '---------(x|)'; + + expectObservable(source.every(predicate)).toBe(expected, {x: false}); + }); + + it('should raise error if source raises error after subscription', function() { + var source = hot('--z--^--#'); + var expected = '---#'; + + expectObservable(source.every(truePredicate)).toBe(expected); + }); + + it('should emit true if source does not emit after subscription', function() { + var source = hot('--z--^-----|'); + var expected = '------(x|)'; + + expectObservable(source.every(predicate)).toBe(expected, {x: true}); + }); + +}); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 52b605103b..f920f89823 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -33,6 +33,7 @@ export interface CoreOperators { ignoreElements?: () => Observable; isEmpty?: () => Observable; last?: (predicate?: (value: T, index:number) => boolean, resultSelector?: (value: T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable; + every?: (predicate: (value: T, index:number) => boolean, thisArg?: any) => Observable; map?: (project: (x: T, ix?: number) => R, thisArg?: any) => Observable; mapTo?: (value: R) => Observable; materialize?: () => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 020c08e9cd..5e50eb51e5 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -155,6 +155,9 @@ observableProto.ignoreElements = ignoreElements; import isEmpty from './operators/isEmpty'; observableProto.isEmpty = isEmpty; +import every from './operators/every'; +observableProto.every = every; + import last from './operators/last'; observableProto.last = last; diff --git a/src/Rx.ts b/src/Rx.ts index a1e05005cd..a959519777 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -135,6 +135,9 @@ observableProto.ignoreElements = ignoreElements; import isEmpty from './operators/isEmpty'; observableProto.isEmpty = isEmpty; +import every from './operators/every'; +observableProto.every = every; + import last from './operators/last'; observableProto.last = last; diff --git a/src/operators/every.ts b/src/operators/every.ts new file mode 100644 index 0000000000..bd0c30e581 --- /dev/null +++ b/src/operators/every.ts @@ -0,0 +1,61 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Observable from '../Observable'; +import Subscriber from '../Subscriber'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function every(predicate: (value: T, index: number, source: Observable) => boolean, thisArg?: any): Observable{ + return this.lift(new EveryOperator(predicate, thisArg, this)); +} + +class EveryOperator implements Operator { + constructor(private predicate: (value: T, index: number, source: Observable) => boolean, + private thisArg?: any, private source?: Observable) { + + } + + call(observer: Subscriber): Subscriber { + return new EverySubscriber(observer, this.predicate, this.thisArg, this.source); + } +} + +class EverySubscriber extends Subscriber { + private predicate: Function = undefined; + private index: number = 0; + + constructor(destination: Observer, predicate?: (value: T, index: number, source: Observable) => boolean, + private thisArg?: any, private source?: Observable) { + super(destination); + + if(typeof predicate === 'function') { + this.predicate = bindCallback(predicate, thisArg, 3); + } + } + + private notifyComplete(everyValueMatch: boolean): void { + this.destination.next(everyValueMatch); + this.destination.complete(); + } + + _next(value: T) { + const predicate = this.predicate; + + if (predicate === undefined) { + this.destination.error(new TypeError('predicate must be a function')); + } + + let result = tryCatch(predicate)(value, this.index++, this.source); + if (result === errorObject) { + this.destination.error(result.e); + } else if (!result) { + this.notifyComplete(false); + } + } + + _complete() { + this.notifyComplete(true); + } +} \ No newline at end of file