diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts index 8eafe2cf37..664e209916 100644 --- a/api_guard/dist/types/operators/index.d.ts +++ b/api_guard/dist/types/operators/index.d.ts @@ -80,8 +80,8 @@ export declare function dematerialize>(): export declare function distinct(keySelector?: (value: T) => K, flushes?: Observable): MonoTypeOperatorFunction; -export declare function distinctUntilChanged(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction; -export declare function distinctUntilChanged(compare: (x: K, y: K) => boolean, keySelector: (x: T) => K): MonoTypeOperatorFunction; +export declare function distinctUntilChanged(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction; +export declare function distinctUntilChanged(comparator: (previous: K, current: K) => boolean, keySelector: (value: T) => K): MonoTypeOperatorFunction; export declare function distinctUntilKeyChanged(key: keyof T): MonoTypeOperatorFunction; export declare function distinctUntilKeyChanged(key: K, compare: (x: T[K], y: T[K]) => boolean): MonoTypeOperatorFunction; diff --git a/spec/operators/distinctUntilChanged-spec.ts b/spec/operators/distinctUntilChanged-spec.ts index 2b22d7e2ba..f90c191ecc 100644 --- a/spec/operators/distinctUntilChanged-spec.ts +++ b/spec/operators/distinctUntilChanged-spec.ts @@ -1,7 +1,7 @@ /** @prettier */ import { expect } from 'chai'; import { distinctUntilChanged, mergeMap, take } from 'rxjs/operators'; -import { of, Observable } from 'rxjs'; +import { of, Observable, Subject } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -298,4 +298,34 @@ describe('distinctUntilChanged', () => { expect(sideEffects).to.deep.equal([0, 1, 2]); }); + + // This test is to cover a corner case where someone might write + // synchronous, reentrant code. At the time this test was authored, + // the operator was written in such a way that it would allow + // the duplicate non-distinct values to be emitted repeatedly. + it('should work properly with reentrant streams', () => { + const subject = new Subject(); + const results: any[] = []; + let count = 0; + + subject.pipe(distinctUntilChanged()).subscribe((n) => { + results.push(n); + + // Protect against an infinite loop in this test. + // That shouldn't happen. + if (++count > 2) { + throw new Error('this should have only been hit once'); + } + + // If we reenter with the same value, it should not + // emit again. + subject.next(1); + }); + + // Start with 1. + subject.next(1); + + // It should only have emitted one value. + expect(results).to.deep.equal([1]); + }); }); diff --git a/src/internal/operators/distinctUntilChanged.ts b/src/internal/operators/distinctUntilChanged.ts index 8f89e2215a..f7fd07b531 100644 --- a/src/internal/operators/distinctUntilChanged.ts +++ b/src/internal/operators/distinctUntilChanged.ts @@ -1,78 +1,181 @@ import { MonoTypeOperatorFunction } from '../types'; +import { identity } from '../util/identity'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; -/* tslint:disable:max-line-length */ -export function distinctUntilChanged(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction; -export function distinctUntilChanged(compare: (x: K, y: K) => boolean, keySelector: (x: T) => K): MonoTypeOperatorFunction; -/* tslint:enable:max-line-length */ - /** - * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item. - * - * If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted. - * The comparator function shourld return true if the values are the same, and false if they are different. + * Returns a result {@link Observable} that emits all values pushed by the source observable if they + * are distinct in comparison to the last value the result observable emitted. * - * If a comparator function is not provided, an equality check is used by default. + * 1. It will always emit the first value from the source. + * 2. For all subsequent values pushed by the source, they will be compared to the previously emitted values + * using the provided `comparator` or an `===` equality check. + * 3. If the value pushed by the source is determined to be unequal by this check, that value is emitted and + * becomes the new "previously emitted value" internally. * * ## Example - * A simple example with numbers + * + * A very basic example with no `comparator`. Note that `1` is emitted more than once, + * because it's distinct in comparison to the _previously emitted_ value, + * not in comparison to _all other emitted values_. + * * ```ts * import { of } from 'rxjs'; * import { distinctUntilChanged } from 'rxjs/operators'; * - * of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4).pipe( - * distinctUntilChanged(), - * ) - * .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4 + * of(1, 1, 1, 2, 2, 2, 1, 1, 3, 3).pipe( + * distinctUntilChanged() + * ) + * .subscribe(console.log); + * // Logs: 1, 2, 1, 3 * ``` * - * An example using a compare function - * ```typescript + * ## Example + * + * With a `comparator`, you can do custom comparisons. Let's say + * you only want to emit a value when all of its components have + * changed: + * + * ```ts * import { of } from 'rxjs'; * import { distinctUntilChanged } from 'rxjs/operators'; * - * interface Person { - * age: number, - * name: string - * } - * - *of( - * { age: 4, name: 'Foo'}, - * { age: 7, name: 'Bar'}, - * { age: 5, name: 'Foo'}, - * { age: 6, name: 'Foo'}, - * ).pipe( - * distinctUntilChanged((p: Person, q: Person) => p.name === q.name), - * ) - * .subscribe(x => console.log(x)); - * - * // displays: - * // { age: 4, name: 'Foo' } - * // { age: 7, name: 'Bar' } - * // { age: 5, name: 'Foo' } + * const totallyDifferentBuilds$ = of( + * { engineVersion: '1.1.0', transmissionVersion: '1.2.0' }, + * { engineVersion: '1.1.0', transmissionVersion: '1.4.0' }, + * { engineVersion: '1.3.0', transmissionVersion: '1.4.0' }, + * { engineVersion: '1.3.0', transmissionVersion: '1.5.0' }, + * { engineVersion: '2.0.0', transmissionVersion: '1.5.0' } + * ).pipe( + * distinctUntilChanged((prev, curr) => { + * return ( + * prev.engineVersion === curr.engineVersion || + * prev.transmissionVersion === curr.transmissionVersion + * ); + * }) + * ); + * + * totallyDifferentBuilds$.subscribe(console.log); + * + * // Logs: + * // {engineVersion: "1.1.0", transmissionVersion: "1.2.0"} + * // {engineVersion: "1.3.0", transmissionVersion: "1.4.0"} + * // {engineVersion: "2.0.0", transmissionVersion: "1.5.0"} + * ``` + * + * ## Example + * + * You can also provide a custom `comparator` to check that emitted + * changes are only in one direction. Let's say you only want to get + * the next record temperature: + * + * ```ts + * import { of } from "rxjs"; + * import { distinctUntilChanged } from "rxjs/operators"; + * + * const temps$ = of(30, 31, 20, 34, 33, 29, 35, 20); + * + * const recordHighs$ = temps$.pipe( + * distinctUntilChanged((prevHigh, temp) => { + * // If the current temp is less than + * // or the same as the previous record, + * // the record hasn't changed. + * return temp <= prevHigh; + * }) + * ); + * + * recordHighs$.subscribe(console.log); + * // Logs: 30, 31, 34, 35 * ``` * - * @see {@link distinct} - * @see {@link distinctUntilKeyChanged} + * @param comparator A function used to compare the previous and current values for + * equality. Defaults to a `===` check. + */ +export function distinctUntilChanged(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction; + +/** + * Returns a result {@link Observable} that emits all values pushed by the source observable if they + * are distinct in comparison to the last value the result observable emitted. + * + * 1. It will always emit the first value from the source. + * 2. The `keySelector` will be run against all values, including the first value. + * 3. For all values after the first, the selected key will be compared against the key selected from + * the previously emitted value using the `comparator`. + * 4. If the keys are determined to be unequal by this check, the value (not the key), is emitted + * and the selected key from that value is saved for future comparisons against other keys. + * + * ## Example + * + * Selecting update events only when the `updatedBy` field shows + * the account changed hands... + * + * ```ts + * // A stream of updates to a given account + * const accountUpdates$ = of( + * { updatedBy: "blesh", data: [] }, + * { updatedBy: "blesh", data: [] }, + * { updatedBy: "ncjamieson", data: [] }, + * { updatedBy: "ncjamieson", data: [] }, + * { updatedBy: "blesh", data: [] } + * ); + * + * // We only want the events where it changed hands + * const changedHands$ = accountUpdates$.pipe( + * distinctUntilChanged(undefined, update => update.updatedBy) + * ); * - * @param {function} [compare] Optional comparison function called to test if an item is distinct from the previous item in the source. - * A return value of true indicates that it is the same, and a return value of false means they are different. - * @return {Observable} An Observable that emits items from the source Observable with distinct values. + * changedHands$.subscribe(console.log); + * // Logs: + * // {updatedBy: "blesh", data: Array[0]} + * // {updatedBy: "ncjamieson", data: Array[0]} + * // {updatedBy: "blesh", data: Array[0]} + * ``` + * + * @param comparator A function used to compare the previous and current keys for + * equality. Defaults to a `===` check. + * @param keySelector Used to select a key value to be passed to the `comparator`. */ -export function distinctUntilChanged(compare?: (a: K, b: K) => boolean, keySelector?: (x: T) => K): MonoTypeOperatorFunction { - compare = compare ?? defaultCompare; +export function distinctUntilChanged( + comparator: (previous: K, current: K) => boolean, + keySelector: (value: T) => K +): MonoTypeOperatorFunction; + +export function distinctUntilChanged( + comparator?: (previous: K, current: K) => boolean, + keySelector: (value: T) => K = identity as (value: T) => K +): MonoTypeOperatorFunction { + // We've been allowing `null` do be passed as the `compare`, so we can't do + // a default value for the parameter, because that will only work + // for `undefined`. + comparator = comparator ?? defaultCompare; + return operate((source, subscriber) => { - let prev: any; + // The previous key, used to compare against keys selected + // from new arrivals to determine "distinctiveness". + let previousKey: K; + // Whether or not this is the first value we've gotten. let first = true; + source.subscribe( new OperatorSubscriber(subscriber, (value) => { - const key: any = keySelector ? keySelector(value) : value; - if (first || !compare!(prev, key)) { + // We always call the key selector. + const currentKey = keySelector(value); + + // If it's the first value, we always emit it. + // Otherwise, we compare this key to the previous key, and + // if the comparer returns false, we emit. + if (first || !comparator!(previousKey, currentKey)) { + // Update our state *before* we emit the value + // as emission can be the source of re-entrant code + // in functional libraries like this. We only really + // need to do this if it's the first value, or if the + // key we're tracking in previous needs to change. + first = false; + previousKey = currentKey; + + // Emit the value! subscriber.next(value); } - prev = key; - first = false; }) ); });