From 94a034d6900bf728eb65587f8e1b4fd89bd24990 Mon Sep 17 00:00:00 2001 From: Justin Woo Date: Wed, 13 Jan 2016 15:52:09 +0900 Subject: [PATCH] feat(distinct): add distinct operator --- doc/index.md | 1 + spec/operators/distinct-spec.js | 226 ++++++++++++++++++++++++++++++++ src/Rx.KitchenSink.ts | 2 + src/add/operator/distinct.ts | 12 ++ src/operator/distinct.ts | 111 ++++++++++++++++ 5 files changed, 352 insertions(+) create mode 100644 spec/operators/distinct-spec.js create mode 100644 src/add/operator/distinct.ts create mode 100644 src/operator/distinct.ts diff --git a/doc/index.md b/doc/index.md index 8a4fcef4c0..de6bae78dd 100644 --- a/doc/index.md +++ b/doc/index.md @@ -41,6 +41,7 @@ - [debounce](function/index.html#static-function-debounce) - [defaultIfEmpty](function/index.html#static-function-defaultIfEmpty) - [delay](function/index.html#static-function-delay) +- [distinct](function/index.html#static-function-distinct) - [distinctUntilChanged](function/index.html#static-function-distinctUntilChanged) - [distinctUntilKeyChanged](function/index.html#static-function-distinctUntilKeyChanged) - [do](function/index.html#static-function-do) diff --git a/spec/operators/distinct-spec.js b/spec/operators/distinct-spec.js new file mode 100644 index 0000000000..7420a4c28c --- /dev/null +++ b/spec/operators/distinct-spec.js @@ -0,0 +1,226 @@ +/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.distinct()', function () { + it('should distinguish between values', function () { + var e1 = hot('--a--a--a--b--b--a--|'); + var e1subs = '^ !'; + var expected = '--a--------b--------|'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should distinguish between values and does not completes', function () { + var e1 = hot('--a--a--a--b--b--a-'); + var e1subs = '^ '; + var expected = '--a--------b-------'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not completes if source never completes', function () { + var e1 = cold('-'); + var e1subs = '^'; + var expected = '-'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not completes if source does not completes', function () { + var e1 = hot('-'); + var e1subs = '^'; + var expected = '-'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should complete if source is empty', function () { + var e1 = cold('|'); + var e1subs = '(^!)'; + var expected = '|'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should complete if source does not emit', function () { + var e1 = hot('------|'); + var e1subs = '^ !'; + var expected = '------|'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should emit if source emits single element only', function () { + var e1 = hot('--a--|'); + var e1subs = '^ !'; + var expected = '--a--|'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should emit if source is scalar', function () { + var e1 = Observable.of('a'); + var expected = '(a|)'; + + expectObservable(e1.distinct()).toBe(expected); + }); + + it('should raises error if source raises error', function () { + var e1 = hot('--a--a--#'); + var e1subs = '^ !'; + var expected = '--a-----#'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should raises error if source throws', function () { + var e1 = cold('#'); + var e1subs = '(^!)'; + var expected = '#'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not omit if source elements are all different', function () { + var e1 = hot('--a--b--c--d--e--f--|'); + var e1subs = '^ !'; + var expected = '--a--b--c--d--e--f--|'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow unsubscribing early and explicitly', function () { + var e1 = hot('--a--b--b--d--a--f--|'); + var e1subs = '^ ! '; + var expected = '--a--b----- '; + var unsub = ' ! '; + + var result = e1.distinct(); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not break unsubscription chains when unsubscribed explicitly', function () { + var e1 = hot('--a--b--b--d--a--f--|'); + var e1subs = '^ ! '; + var expected = '--a--b----- '; + var unsub = ' ! '; + + var result = e1 + .mergeMap(function (x) { return Observable.of(x); }) + .distinct() + .mergeMap(function (x) { return Observable.of(x); }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should emit once if source elements are all same', function () { + var e1 = hot('--a--a--a--a--a--a--|'); + var e1subs = '^ !'; + var expected = '--a-----------------|'; + + expectObservable(e1.distinct()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should emit once if comparer returns true always regardless of source emits', function () { + var e1 = hot('--a--b--c--d--e--f--|'); + var e1subs = '^ !'; + var expected = '--a-----------------|'; + + expectObservable(e1.distinct(function () { return true; })).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should emit all if comparer returns false always regardless of source emits', function () { + var e1 = hot('--a--a--a--a--a--a--|'); + var e1subs = '^ !'; + var expected = '--a--a--a--a--a--a--|'; + + expectObservable(e1.distinct(function () { return false; })).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should distinguish values by selector', function () { + var e1 = hot('--a--b--c--d--e--f--|', {a: 1, b: 2, c: 3, d: 4, e: 5, f: 6}); + var e1subs = '^ !'; + var expected = '--a-----c-----e-----|'; + var selector = function (x, y) { + return y % 2 === 0; + }; + + expectObservable(e1.distinct(selector)).toBe(expected, {a: 1, c: 3, e: 5}); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should raises error when comparer throws', function () { + var e1 = hot('--a--b--c--d--e--f--|'); + var e1subs = '^ ! '; + var expected = '--a--b--c--# '; + var selector = function (x, y) { + if (y === 'd') { + throw 'error'; + } + return x === y; + }; + + expectObservable(e1.distinct(selector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should support a flushing stream', function () { + var e1 = hot('--a--b--a--b--a--b--|'); + var e1subs = '^ !'; + var e2 = hot('-----------x--------|'); + var e2subs = '^ !'; + var expected = '--a--b--------a--b--|'; + var selector = function (x, y) { + return x === y; + }; + + expectObservable(e1.distinct(selector, e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should unsubscribe from the flushing stream when the main stream is unsubbed', function () { + var e1 = hot('--a--b--a--b--a--b--|'); + var e1subs = '^ ! '; + var e2 = hot('-----------x--------|'); + var e2subs = '^ ! '; + var unsub = ' ! '; + var expected = '--a--b------'; + var selector = function (x, y) { + return x === y; + }; + + expectObservable(e1.distinct(selector, e2), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should allow opting in to default comparator with flush', function () { + var e1 = hot('--a--b--a--b--a--b--|'); + var e1subs = '^ !'; + var e2 = hot('-----------x--------|'); + var e2subs = '^ !'; + var expected = '--a--b--------a--b--|'; + + expectObservable(e1.distinct(null, e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); +}); \ No newline at end of file diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 0ae4ad79a1..8fe26b95dc 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -8,6 +8,7 @@ import {Scheduler as IScheduler} from './Scheduler'; export interface KitchenSinkOperators extends CoreOperators { isEmpty?: () => Observable; elementAt?: (index: number, defaultValue?: any) => Observable; + distinct?: (compare?: (x: T, y: T) => boolean, flushes?: Observable) => Observable; distinctUntilKeyChanged?: (key: string, compare?: (x: any, y: any) => boolean) => Observable; find?: (predicate: (value: T, index: number, source: Observable) => boolean, thisArg?: any) => Observable; findIndex?: (predicate: (value: T, index: number, source: Observable) => boolean, thisArg?: any) => Observable; @@ -63,6 +64,7 @@ import './add/operator/debounce'; import './add/operator/debounceTime'; import './add/operator/defaultIfEmpty'; import './add/operator/delay'; +import './add/operator/distinct'; import './add/operator/distinctUntilChanged'; import './add/operator/distinctUntilKeyChanged'; import './add/operator/do'; diff --git a/src/add/operator/distinct.ts b/src/add/operator/distinct.ts new file mode 100644 index 0000000000..d4475718b8 --- /dev/null +++ b/src/add/operator/distinct.ts @@ -0,0 +1,12 @@ +/** + * Everything in this file is generated by the 'tools/generate-operator-patches.ts' script. + * Any manual edits to this file will be lost next time the script is run. + **/ +import {Observable} from '../../Observable'; +import {distinct} from '../../operator/distinct'; +import {KitchenSinkOperators} from '../../Rx.KitchenSink'; + +const observableProto = (>Observable.prototype); +observableProto.distinct = distinct; + +export var _void: void; \ No newline at end of file diff --git a/src/operator/distinct.ts b/src/operator/distinct.ts new file mode 100644 index 0000000000..c0118bc2b3 --- /dev/null +++ b/src/operator/distinct.ts @@ -0,0 +1,111 @@ +import {Observable} from '../Observable'; +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; +import {tryCatch} from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; + +/** + * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. + * If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted. + * If a comparator function is not provided, an equality check is used by default. + * As the internal HashSet of this operator grows larger and larger, care should be taken in the domain of inputs this operator may see. + * An optional paramter is also provided such that an Observable can be provided to queue the internal HashSet to flush the values it holds. + * @param {function} [compare] optional comparison function called to test if an item is distinct from previous items in the source. + * @param {Observable} [flushes] optional Observable for flushing the internal HashSet of the operator. + * @returns {Observable} an Observable that emits items from the source Observable with distinct values. + */ +export function distinct(compare?: (x: T, y: T) => boolean, flushes?: Observable) { + return this.lift(new DistinctOperator(compare, flushes)); +} + +class DistinctOperator implements Operator { + constructor(private compare: (x: T, y: T) => boolean, private flushes: Observable) { + } + + call(subscriber: Subscriber): Subscriber { + return new DistinctSubscriber(subscriber, this.compare, this.flushes); + } +} + +class HashSet { + private set: Array = []; + + constructor(private compare: (x: T, y: T) => boolean) { + } + + private has(item: T): boolean { + for (var i = 0; i < this.set.length; i++) { + if (this.compare(this.set[i], item)) { + return true; + } + } + + return false; + } + + push(item: T): boolean { + if (this.has(item)) { + return false; + } else { + this.set.push(item); + return true; + } + } + + flush(): void { + this.set = []; + } +} + +class DistinctSubscriber extends Subscriber { + private hashSet: HashSet; + private flushSubscription: Subscription; + + constructor(destination: Subscriber, compare: (x: T, y: T) => boolean, flushes: Observable) { + super(destination); + if (typeof compare === 'function') { + this.compare = compare; + } + this.hashSet = new HashSet(this.compare); + + if (flushes) { + this.flushSubscription = flushes.subscribe(() => this.hashSet.flush()); + } + } + + private compare(x: T, y: T): boolean { + return x === y; + } + + private disposeFlushSubscription(): void { + if (this.flushSubscription) { + this.flushSubscription.unsubscribe(); + } + } + + protected _next(value: T): void { + let result: any = false; + + result = tryCatch(this.hashSet.push.bind(this.hashSet))(value); + if (result === errorObject) { + this.destination.error(errorObject.e); + return; + } + + if (result) { + this.destination.next(value); + } + } + + protected _complete(): void { + this.disposeFlushSubscription(); + super._complete(); + } + + unsubscribe(): void { + this.disposeFlushSubscription(); + super.unsubscribe(); + } + +}