From 0429a697d98e3dafad4528f84c96449cf15e7e7b Mon Sep 17 00:00:00 2001 From: benlesh Date: Mon, 4 Sep 2017 14:14:13 -0700 Subject: [PATCH] feat(distinct): add higher-order lettable version of distinct --- src/operator/distinct.ts | 75 +---------------------- src/operators/distinct.ts | 124 ++++++++++++++++++++++++++++++++++++++ src/operators/index.ts | 1 + 3 files changed, 127 insertions(+), 73 deletions(-) create mode 100644 src/operators/distinct.ts diff --git a/src/operator/distinct.ts b/src/operator/distinct.ts index a781dbd1b4..6182f94766 100644 --- a/src/operator/distinct.ts +++ b/src/operator/distinct.ts @@ -1,11 +1,5 @@ import { Observable } from '../Observable'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { TeardownLogic } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { ISet, Set } from '../util/Set'; +import { distinct as higherOrder } from '../operators'; /** * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. @@ -55,70 +49,5 @@ import { ISet, Set } from '../util/Set'; export function distinct(this: Observable, keySelector?: (value: T) => K, flushes?: Observable): Observable { - return this.lift(new DistinctOperator(keySelector, flushes)); -} - -class DistinctOperator implements Operator { - constructor(private keySelector: (value: T) => K, private flushes: Observable) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new DistinctSubscriber(subscriber, this.keySelector, this.flushes)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class DistinctSubscriber extends OuterSubscriber { - private values: ISet = new Set(); - - constructor(destination: Subscriber, private keySelector: (value: T) => K, flushes: Observable) { - super(destination); - - if (flushes) { - this.add(subscribeToResult(this, flushes)); - } - } - - notifyNext(outerValue: T, innerValue: T, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.values.clear(); - } - - notifyError(error: any, innerSub: InnerSubscriber): void { - this._error(error); - } - - protected _next(value: T): void { - if (this.keySelector) { - this._useKeySelector(value); - } else { - this._finalizeNext(value, value); - } - } - - private _useKeySelector(value: T): void { - let key: K; - const { destination } = this; - try { - key = this.keySelector(value); - } catch (err) { - destination.error(err); - return; - } - this._finalizeNext(key, value); - } - - private _finalizeNext(key: K|T, value: T) { - const { values } = this; - if (!values.has(key)) { - values.add(key); - this.destination.next(value); - } - } - + return higherOrder(keySelector, flushes)(this); } diff --git a/src/operators/distinct.ts b/src/operators/distinct.ts new file mode 100644 index 0000000000..241cf338d2 --- /dev/null +++ b/src/operators/distinct.ts @@ -0,0 +1,124 @@ +import { Observable } from '../Observable'; +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { TeardownLogic } from '../Subscription'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { ISet, Set } from '../util/Set'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. + * + * If a keySelector function is provided, then it will project each value from the source observable into a new value that it will + * check for equality with previously projected values. If a keySelector function is not provided, it will use each value from the + * source observable directly with an equality check against previous values. + * + * In JavaScript runtimes that support `Set`, this operator will use a `Set` to improve performance of the distinct value checking. + * + * In other runtimes, this operator will use a minimal implementation of `Set` that relies on an `Array` and `indexOf` under the + * hood, so performance will degrade as more values are checked for distinction. Even in newer browsers, a long-running `distinct` + * use might result in memory leaks. To help alleviate this in some scenarios, an optional `flushes` parameter is also provided so + * that the internal `Set` can be "flushed", basically clearing it of values. + * + * @example A simple example with numbers + * Observable.of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1) + * .distinct() + * .subscribe(x => console.log(x)); // 1, 2, 3, 4 + * + * @example An example using a keySelector function + * interface Person { + * age: number, + * name: string + * } + * + * Observable.of( + * { age: 4, name: 'Foo'}, + * { age: 7, name: 'Bar'}, + * { age: 5, name: 'Foo'}) + * .distinct((p: Person) => p.name) + * .subscribe(x => console.log(x)); + * + * // displays: + * // { age: 4, name: 'Foo' } + * // { age: 7, name: 'Bar' } + * + * @see {@link distinctUntilChanged} + * @see {@link distinctUntilKeyChanged} + * + * @param {function} [keySelector] Optional function to select which value you want to check as distinct. + * @param {Observable} [flushes] Optional Observable for flushing the internal HashSet of the operator. + * @return {Observable} An Observable that emits items from the source Observable with distinct values. + * @method distinct + * @owner Observable + */ +export function distinct(keySelector?: (value: T) => K, + flushes?: Observable): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new DistinctOperator(keySelector, flushes)); +} + +class DistinctOperator implements Operator { + constructor(private keySelector: (value: T) => K, private flushes: Observable) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new DistinctSubscriber(subscriber, this.keySelector, this.flushes)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +export class DistinctSubscriber extends OuterSubscriber { + private values: ISet = new Set(); + + constructor(destination: Subscriber, private keySelector: (value: T) => K, flushes: Observable) { + super(destination); + + if (flushes) { + this.add(subscribeToResult(this, flushes)); + } + } + + notifyNext(outerValue: T, innerValue: T, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.values.clear(); + } + + notifyError(error: any, innerSub: InnerSubscriber): void { + this._error(error); + } + + protected _next(value: T): void { + if (this.keySelector) { + this._useKeySelector(value); + } else { + this._finalizeNext(value, value); + } + } + + private _useKeySelector(value: T): void { + let key: K; + const { destination } = this; + try { + key = this.keySelector(value); + } catch (err) { + destination.error(err); + return; + } + this._finalizeNext(key, value); + } + + private _finalizeNext(key: K|T, value: T) { + const { values } = this; + if (!values.has(key)) { + values.add(key); + this.destination.next(value); + } + } + +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 9e27aa5e68..78c32c3aca 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -19,6 +19,7 @@ export { defaultIfEmpty } from './defaultIfEmpty'; export { delay } from './delay'; export { delayWhen } from './delayWhen'; export { dematerialize } from './dematerialize'; +export { distinct } from './distinct'; export { distinctUntilChanged } from './distinctUntilChanged'; export { distinctUntilKeyChanged } from './distinctUntilKeyChanged'; export { elementAt } from './elementAt';