Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(distinctUntilChanged): Ensure reentrant code is compared properly #6014

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api_guard/dist/types/operators/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ export declare function dematerialize<N extends ObservableNotification<any>>():

export declare function distinct<T, K>(keySelector?: (value: T) => K, flushes?: Observable<any>): MonoTypeOperatorFunction<T>;

export declare function distinctUntilChanged<T>(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction<T>;
export declare function distinctUntilChanged<T, K>(compare: (x: K, y: K) => boolean, keySelector: (x: T) => K): MonoTypeOperatorFunction<T>;
export declare function distinctUntilChanged<T>(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction<T>;
export declare function distinctUntilChanged<T, K>(comparator: (previous: K, current: K) => boolean, keySelector: (value: T) => K): MonoTypeOperatorFunction<T>;

export declare function distinctUntilKeyChanged<T>(key: keyof T): MonoTypeOperatorFunction<T>;
export declare function distinctUntilKeyChanged<T, K extends keyof T>(key: K, compare: (x: T[K], y: T[K]) => boolean): MonoTypeOperatorFunction<T>;
Expand Down
32 changes: 31 additions & 1 deletion spec/operators/distinctUntilChanged-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/** @prettier */
import { expect } from 'chai';
import { distinctUntilChanged, mergeMap, take } from 'rxjs/operators';
import { of, Observable } from 'rxjs';
import { of, Observable, Subject } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -298,4 +298,34 @@ describe('distinctUntilChanged', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

// This test is to cover a corner case where someone might write
// synchronous, reentrant code. At the time this test was authored,
// the operator was written in such a way that it would allow
// the duplicate non-distinct values to be emitted repeatedly.
it('should work properly with reentrant streams', () => {
const subject = new Subject<number | undefined>();
const results: any[] = [];
let count = 0;

subject.pipe(distinctUntilChanged()).subscribe((n) => {
results.push(n);

// Protect against an infinite loop in this test.
// That shouldn't happen.
if (++count > 2) {
throw new Error('this should have only been hit once');
}

// If we reenter with the same value, it should not
// emit again.
subject.next(1);
});

// Start with 1.
subject.next(1);

// It should only have emitted one value.
expect(results).to.deep.equal([1]);
});
});
199 changes: 151 additions & 48 deletions src/internal/operators/distinctUntilChanged.ts
Original file line number Diff line number Diff line change
@@ -1,78 +1,181 @@
import { MonoTypeOperatorFunction } from '../types';
import { identity } from '../util/identity';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';

/* tslint:disable:max-line-length */
export function distinctUntilChanged<T>(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction<T>;
export function distinctUntilChanged<T, K>(compare: (x: K, y: K) => boolean, keySelector: (x: T) => K): MonoTypeOperatorFunction<T>;
/* tslint:enable:max-line-length */

/**
* Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.
*
* If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.
* The comparator function shourld return true if the values are the same, and false if they are different.
* Returns a result {@link Observable} that emits all values pushed by the source observable if they
* are distinct in comparison to the last value the result observable emitted.
*
* If a comparator function is not provided, an equality check is used by default.
* 1. It will always emit the first value from the source.
* 2. For all subsequent values pushed by the source, they will be compared to the previously emitted values
* using the provided `comparator` or an `===` equality check.
* 3. If the value pushed by the source is determined to be unequal by this check, that value is emitted and
* becomes the new "previously emitted value" internally.
*
* ## Example
* A simple example with numbers
*
* A very basic example with no `comparator`. Note that `1` is emitted more than once,
* because it's distinct in comparison to the _previously emitted_ value,
* not in comparison to _all other emitted values_.
*
* ```ts
* import { of } from 'rxjs';
* import { distinctUntilChanged } from 'rxjs/operators';
*
* of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4).pipe(
* distinctUntilChanged(),
* )
* .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4
* of(1, 1, 1, 2, 2, 2, 1, 1, 3, 3).pipe(
* distinctUntilChanged()
* )
* .subscribe(console.log);
* // Logs: 1, 2, 1, 3
* ```
*
* An example using a compare function
* ```typescript
* ## Example
*
* With a `comparator`, you can do custom comparisons. Let's say
* you only want to emit a value when all of its components have
* changed:
*
* ```ts
* import { of } from 'rxjs';
* import { distinctUntilChanged } from 'rxjs/operators';
*
* interface Person {
* age: number,
* name: string
* }
*
*of(
* { age: 4, name: 'Foo'},
* { age: 7, name: 'Bar'},
* { age: 5, name: 'Foo'},
* { age: 6, name: 'Foo'},
* ).pipe(
* distinctUntilChanged((p: Person, q: Person) => p.name === q.name),
* )
* .subscribe(x => console.log(x));
*
* // displays:
* // { age: 4, name: 'Foo' }
* // { age: 7, name: 'Bar' }
* // { age: 5, name: 'Foo' }
* const totallyDifferentBuilds$ = of(
* { engineVersion: '1.1.0', transmissionVersion: '1.2.0' },
* { engineVersion: '1.1.0', transmissionVersion: '1.4.0' },
* { engineVersion: '1.3.0', transmissionVersion: '1.4.0' },
* { engineVersion: '1.3.0', transmissionVersion: '1.5.0' },
* { engineVersion: '2.0.0', transmissionVersion: '1.5.0' }
* ).pipe(
* distinctUntilChanged((prev, curr) => {
* return (
* prev.engineVersion === curr.engineVersion ||
* prev.transmissionVersion === curr.transmissionVersion
* );
* })
* );
*
* totallyDifferentBuilds$.subscribe(console.log);
*
* // Logs:
* // {engineVersion: "1.1.0", transmissionVersion: "1.2.0"}
* // {engineVersion: "1.3.0", transmissionVersion: "1.4.0"}
* // {engineVersion: "2.0.0", transmissionVersion: "1.5.0"}
* ```
*
* ## Example
*
* You can also provide a custom `comparator` to check that emitted
* changes are only in one direction. Let's say you only want to get
* the next record temperature:
*
* ```ts
* import { of } from "rxjs";
* import { distinctUntilChanged } from "rxjs/operators";
*
* const temps$ = of(30, 31, 20, 34, 33, 29, 35, 20);
*
* const recordHighs$ = temps$.pipe(
* distinctUntilChanged((prevHigh, temp) => {
* // If the current temp is less than
* // or the same as the previous record,
* // the record hasn't changed.
* return temp <= prevHigh;
* })
* );
*
* recordHighs$.subscribe(console.log);
* // Logs: 30, 31, 34, 35
* ```
*
* @see {@link distinct}
* @see {@link distinctUntilKeyChanged}
* @param comparator A function used to compare the previous and current values for
* equality. Defaults to a `===` check.
*/
export function distinctUntilChanged<T>(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction<T>;

/**
* Returns a result {@link Observable} that emits all values pushed by the source observable if they
* are distinct in comparison to the last value the result observable emitted.
*
* 1. It will always emit the first value from the source.
* 2. The `keySelector` will be run against all values, including the first value.
* 3. For all values after the first, the selected key will be compared against the key selected from
* the previously emitted value using the `comparator`.
* 4. If the keys are determined to be unequal by this check, the value (not the key), is emitted
* and the selected key from that value is saved for future comparisons against other keys.
*
* ## Example
*
* Selecting update events only when the `updatedBy` field shows
* the account changed hands...
*
* ```ts
* // A stream of updates to a given account
* const accountUpdates$ = of(
* { updatedBy: "blesh", data: [] },
* { updatedBy: "blesh", data: [] },
* { updatedBy: "ncjamieson", data: [] },
* { updatedBy: "ncjamieson", data: [] },
* { updatedBy: "blesh", data: [] }
* );
*
* // We only want the events where it changed hands
* const changedHands$ = accountUpdates$.pipe(
* distinctUntilChanged(undefined, update => update.updatedBy)
* );
*
* @param {function} [compare] Optional comparison function called to test if an item is distinct from the previous item in the source.
* A return value of true indicates that it is the same, and a return value of false means they are different.
* @return {Observable} An Observable that emits items from the source Observable with distinct values.
* changedHands$.subscribe(console.log);
* // Logs:
* // {updatedBy: "blesh", data: Array[0]}
* // {updatedBy: "ncjamieson", data: Array[0]}
* // {updatedBy: "blesh", data: Array[0]}
* ```
*
* @param comparator A function used to compare the previous and current keys for
* equality. Defaults to a `===` check.
* @param keySelector Used to select a key value to be passed to the `comparator`.
*/
export function distinctUntilChanged<T, K>(compare?: (a: K, b: K) => boolean, keySelector?: (x: T) => K): MonoTypeOperatorFunction<T> {
compare = compare ?? defaultCompare;
export function distinctUntilChanged<T, K>(
comparator: (previous: K, current: K) => boolean,
keySelector: (value: T) => K
): MonoTypeOperatorFunction<T>;

export function distinctUntilChanged<T, K>(
comparator?: (previous: K, current: K) => boolean,
keySelector: (value: T) => K = identity as (value: T) => K
): MonoTypeOperatorFunction<T> {
// We've been allowing `null` do be passed as the `compare`, so we can't do
// a default value for the parameter, because that will only work
// for `undefined`.
comparator = comparator ?? defaultCompare;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this comment, because I tried to use a default value parameter here and it was sad. We should probably deprecate this sort of thing at some point, and make this a configuration object, I suppose.


return operate((source, subscriber) => {
let prev: any;
// The previous key, used to compare against keys selected
// from new arrivals to determine "distinctiveness".
let previousKey: K;
// Whether or not this is the first value we've gotten.
let first = true;

source.subscribe(
new OperatorSubscriber(subscriber, (value) => {
const key: any = keySelector ? keySelector(value) : value;
if (first || !compare!(prev, key)) {
// We always call the key selector.
const currentKey = keySelector(value);

// If it's the first value, we always emit it.
// Otherwise, we compare this key to the previous key, and
// if the comparer returns false, we emit.
if (first || !comparator!(previousKey, currentKey)) {
// Update our state *before* we emit the value
// as emission can be the source of re-entrant code
// in functional libraries like this. We only really
// need to do this if it's the first value, or if the
// key we're tracking in previous needs to change.
first = false;
previousKey = currentKey;

// Emit the value!
subscriber.next(value);
}
prev = key;
first = false;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can't happen after we next, it has to happen before. Otherwise the added test will fail.

})
);
});
Expand Down