Skip to content

Commit

Permalink
feat(distinctKey): add distinctKey operator
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwoo authored and benlesh committed Jan 27, 2016
1 parent 94a034d commit fe4d57f
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 0 deletions.
1 change: 1 addition & 0 deletions doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
- [defaultIfEmpty](function/index.html#static-function-defaultIfEmpty)
- [delay](function/index.html#static-function-delay)
- [distinct](function/index.html#static-function-distinct)
- [distinctKey](function/index.html#static-function-distinctKey)
- [distinctUntilChanged](function/index.html#static-function-distinctUntilChanged)
- [distinctUntilKeyChanged](function/index.html#static-function-distinctUntilKeyChanged)
- [do](function/index.html#static-function-do)
Expand Down
272 changes: 272 additions & 0 deletions spec/operators/distinctKey-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.distinctKey()', function () {
it.asDiagram('distinctKey(\'k\')')('should distinguish between values', function () {
var values = {a: {k: 1}, b: {k: 2}, c: {k: 3}};
var e1 = hot('-a--b-b----a-c-|', values);
var expected = '-a--b--------c-|';

var result = e1.distinctKey('k');

expectObservable(result).toBe(expected, values);
});

it('should distinguish between values', function () {
var values = {a: {val: 1}, b: {val: 2}};
var e1 = hot('--a--a--a--b--b--a--|', values);
var e1subs = '^ !';
var expected = '--a--------b--------|';

expectObservable(e1.distinctKey('val')).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should distinguish between values and does not completes', function () {
var values = {a: {val: 1}, b: {val: 2}};
var e1 = hot('--a--a--a--b--b--a-', values);
var e1subs = '^ ';
var expected = '--a--------b-------';

expectObservable(e1.distinctKey('val')).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should distinguish between values with key', function () {
var values = {a: {val: 1}, b: {valOther: 1}, c: {valOther: 3}, d: {val: 1}, e: {val: 5}};
var e1 = hot('--a--b--c--d--e--|', values);
var e1subs = '^ !';
var expected = '--a--b--------e--|';

expectObservable(e1.distinctKey('val')).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not compare if source does not have element with key', function () {
var values = {a: {valOther: 1}, b: {valOther: 1}, c: {valOther: 3}, d: {valOther: 1}, e: {valOther: 5}};
var e1 = hot('--a--b--c--d--e--|', values);
var e1subs = '^ !';
var expected = '--a--------------|';

expectObservable(e1.distinctKey('val')).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not completes if source never completes', function () {
var e1 = cold('-');
var e1subs = '^';
var expected = '-';

expectObservable(e1.distinctKey('val')).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not completes if source does not completes', function () {
var e1 = hot('-');
var e1subs = '^';
var expected = '-';

expectObservable(e1.distinctKey('val')).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should complete if source is empty', function () {
var e1 = cold('|');
var e1subs = '(^!)';
var expected = '|';

expectObservable(e1.distinctKey('val')).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should complete if source does not emit', function () {
var e1 = hot('------|');
var e1subs = '^ !';
var expected = '------|';

expectObservable(e1.distinctKey('val')).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit if source emits single element only', function () {
var values = {a: {val: 1}};
var e1 = hot('--a--|', values);
var e1subs = '^ !';
var expected = '--a--|';

expectObservable(e1.distinctKey('val')).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit if source is scalar', function () {
var values = {a: {val: 1}};
var e1 = Observable.of(values.a);
var expected = '(a|)';

expectObservable(e1.distinctKey('val')).toBe(expected, values);
});

it('should raises error if source raises error', function () {
var values = {a: {val: 1}};
var e1 = hot('--a--a--#', values);
var e1subs = '^ !';
var expected = '--a-----#';

expectObservable(e1.distinctKey('val')).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raises error if source throws', function () {
var e1 = cold('#');
var e1subs = '(^!)';
var expected = '#';

expectObservable(e1.distinctKey('val')).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not omit if source elements are all different', function () {
var values = {a: {val: 1}, b: {val: 2}, c: {val: 3}, d: {val: 4}, e: {val: 5}};
var e1 = hot('--a--b--c--d--e--|', values);
var e1subs = '^ !';
var expected = '--a--b--c--d--e--|';

expectObservable(e1.distinctKey('val')).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow unsubscribing early and explicitly', function () {
var values = {a: {val: 1}, b: {val: 2}, c: {val: 3}, d: {val: 4}, e: {val: 5}};
var e1 = hot('--a--b--b--d--a--e--|', values);
var e1subs = '^ ! ';
var expected = '--a--b----- ';
var unsub = ' ! ';

var result = e1.distinctKey('val');

expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains when unsubscribed explicitly', function () {
var values = {a: {val: 1}, b: {val: 2}, c: {val: 3}, d: {val: 4}, e: {val: 5}};
var e1 = hot('--a--b--b--d--a--e--|', values);
var e1subs = '^ ! ';
var expected = '--a--b----- ';
var unsub = ' ! ';

var result = e1
.mergeMap(function (x) { return Observable.of(x); })
.distinctKey('val')
.mergeMap(function (x) { return Observable.of(x); });

expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit once if source elements are all same', function () {
var values = {a: {val: 1}};
var e1 = hot('--a--a--a--a--a--a--|', values);
var e1subs = '^ !';
var expected = '--a-----------------|';

expectObservable(e1.distinctKey('val')).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit once if comparer returns true always regardless of source emits', function () {
var values = {a: {val: 1}, b: {val: 2}, c: {val: 3}, d: {val: 4}, e: {val: 5}};
var e1 = hot('--a--b--c--d--e--|', values);
var e1subs = '^ !';
var expected = '--a--------------|';

expectObservable(e1.distinctKey('val', function () { return true; })).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit all if comparer returns false always regardless of source emits', function () {
var values = {a: {val: 1}};
var e1 = hot('--a--a--a--a--a--a--|', values);
var e1subs = '^ !';
var expected = '--a--a--a--a--a--a--|';

expectObservable(e1.distinctKey('val', function () { return false; })).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should distinguish values by selector', function () {
var values = {a: {val: 1}, b: {val: 2}, c: {val: 3}, d: {val: 4}, e: {val: 5}};
var e1 = hot('--a--b--c--d--e--|', values);
var e1subs = '^ !';
var expected = '--a-----c-----e--|';
var selector = function (x, y) {
return y % 2 === 0;
};

expectObservable(e1.distinctKey('val', selector)).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raises error when comparer throws', function () {
var values = {a: {val: 1}, b: {val: 2}, c: {val: 3}, d: {val: 4}, e: {val: 5}};
var e1 = hot('--a--b--c--d--e--|', values);
var e1subs = '^ ! ';
var expected = '--a--b--c--# ';
var selector = function (x, y) {
if (y === 4) {
throw 'error';
}
return x === y;
};

expectObservable(e1.distinctKey('val', selector)).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should support a flushing stream', function () {
var values = {a: {val: 1}, b: {val: 2}, c: {val: 3}, d: {val: 4}, e: {val: 5}};
var e1 = hot('--a--b--a--b--a--b--|', values);
var e1subs = '^ !';
var e2 = hot('-----------x--------|');
var e2subs = '^ !';
var expected = '--a--b--------a--b--|';
var selector = function (x, y) {
return x === y;
};

expectObservable(e1.distinct(selector, e2)).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should unsubscribe from the flushing stream when the main stream is unsubbed', function () {
var values = {a: {val: 1}, b: {val: 2}, c: {val: 3}, d: {val: 4}, e: {val: 5}};
var e1 = hot('--a--b--a--b--a--b--|', values);
var e1subs = '^ ! ';
var e2 = hot('-----------x--------|');
var e2subs = '^ ! ';
var unsub = ' ! ';
var expected = '--a--b------';
var selector = function (x, y) {
return x === y;
};

expectObservable(e1.distinct(selector, e2), unsub).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should allow opting in to default comparator with flush', function () {
var values = {a: {val: 1}, b: {val: 2}, c: {val: 3}, d: {val: 4}, e: {val: 5}};
var e1 = hot('--a--b--a--b--a--b--|', values);
var e1subs = '^ !';
var e2 = hot('-----------x--------|');
var e2subs = '^ !';
var expected = '--a--b--------a--b--|';

expectObservable(e1.distinct(null, e2)).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
2 changes: 2 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface KitchenSinkOperators<T> extends CoreOperators<T> {
isEmpty?: () => Observable<boolean>;
elementAt?: (index: number, defaultValue?: any) => Observable<T>;
distinct?: (compare?: (x: T, y: T) => boolean, flushes?: Observable<any>) => Observable<T>;
distinctKey?: (key: string, compare?: (x: T, y: T) => boolean, flushes?: Observable<any>) => Observable<T>;
distinctUntilKeyChanged?: (key: string, compare?: (x: any, y: any) => boolean) => Observable<T>;
find?: (predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any) => Observable<T>;
findIndex?: (predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any) => Observable<number>;
Expand Down Expand Up @@ -65,6 +66,7 @@ import './add/operator/debounceTime';
import './add/operator/defaultIfEmpty';
import './add/operator/delay';
import './add/operator/distinct';
import './add/operator/distinctKey';
import './add/operator/distinctUntilChanged';
import './add/operator/distinctUntilKeyChanged';
import './add/operator/do';
Expand Down
12 changes: 12 additions & 0 deletions src/add/operator/distinctKey.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* Everything in this file is generated by the 'tools/generate-operator-patches.ts' script.
* Any manual edits to this file will be lost next time the script is run.
**/
import {Observable} from '../../Observable';
import {distinctKey} from '../../operator/distinctKey';
import {KitchenSinkOperators} from '../../Rx.KitchenSink';

const observableProto = (<KitchenSinkOperators<any>>Observable.prototype);
observableProto.distinctKey = distinctKey;

export var _void: void;
23 changes: 23 additions & 0 deletions src/operator/distinctKey.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import {distinct} from './distinct';
import {Observable} from '../Observable';

/**
* Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items,
* using a property accessed by using the key provided to check if the two items are distinct.
* If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.
* If a comparator function is not provided, an equality check is used by default.
* As the internal HashSet of this operator grows larger and larger, care should be taken in the domain of inputs this operator may see.
* An optional paramter is also provided such that an Observable can be provided to queue the internal HashSet to flush the values it holds.
* @param {string} key string key for object property lookup on each item.
* @param {function} [compare] optional comparison function called to test if an item is distinct from previous items in the source.
* @param {Observable} [flushes] optional Observable for flushing the internal HashSet of the operator.
* @returns {Observable} an Observable that emits items from the source Observable with distinct values.
*/
export function distinctKey<T>(key: string, compare?: (x: T, y: T) => boolean, flushes?: Observable<any>): Observable<T> {
return distinct.call(this, function(x: T, y: T) {
if (compare) {
return compare(x[key], y[key]);
}
return x[key] === y[key];
}, flushes);
}

0 comments on commit fe4d57f

Please sign in to comment.