-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(distinctUntilChanged): Ensure reentrant code is compared properly #6014
Merged
benlesh
merged 1 commit into
ReactiveX:master
from
benlesh:fix-distinctUntilChanged-reentrancy
Feb 11, 2021
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,78 +1,181 @@ | ||
import { MonoTypeOperatorFunction } from '../types'; | ||
import { identity } from '../util/identity'; | ||
import { operate } from '../util/lift'; | ||
import { OperatorSubscriber } from './OperatorSubscriber'; | ||
|
||
/* tslint:disable:max-line-length */ | ||
export function distinctUntilChanged<T>(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction<T>; | ||
export function distinctUntilChanged<T, K>(compare: (x: K, y: K) => boolean, keySelector: (x: T) => K): MonoTypeOperatorFunction<T>; | ||
/* tslint:enable:max-line-length */ | ||
|
||
/** | ||
* Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item. | ||
* | ||
* If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted. | ||
* The comparator function shourld return true if the values are the same, and false if they are different. | ||
* Returns a result {@link Observable} that emits all values pushed by the source observable if they | ||
* are distinct in comparison to the last value the result observable emitted. | ||
* | ||
* If a comparator function is not provided, an equality check is used by default. | ||
* 1. It will always emit the first value from the source. | ||
* 2. For all subsequent values pushed by the source, they will be compared to the previously emitted values | ||
* using the provided `comparator` or an `===` equality check. | ||
* 3. If the value pushed by the source is determined to be unequal by this check, that value is emitted and | ||
* becomes the new "previously emitted value" internally. | ||
* | ||
* ## Example | ||
* A simple example with numbers | ||
* | ||
* A very basic example with no `comparator`. Note that `1` is emitted more than once, | ||
* because it's distinct in comparison to the _previously emitted_ value, | ||
* not in comparison to _all other emitted values_. | ||
* | ||
* ```ts | ||
* import { of } from 'rxjs'; | ||
* import { distinctUntilChanged } from 'rxjs/operators'; | ||
* | ||
* of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4).pipe( | ||
* distinctUntilChanged(), | ||
* ) | ||
* .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4 | ||
* of(1, 1, 1, 2, 2, 2, 1, 1, 3, 3).pipe( | ||
* distinctUntilChanged() | ||
* ) | ||
* .subscribe(console.log); | ||
* // Logs: 1, 2, 1, 3 | ||
* ``` | ||
* | ||
* An example using a compare function | ||
* ```typescript | ||
* ## Example | ||
* | ||
* With a `comparator`, you can do custom comparisons. Let's say | ||
* you only want to emit a value when all of its components have | ||
* changed: | ||
* | ||
* ```ts | ||
* import { of } from 'rxjs'; | ||
* import { distinctUntilChanged } from 'rxjs/operators'; | ||
* | ||
* interface Person { | ||
* age: number, | ||
* name: string | ||
* } | ||
* | ||
*of( | ||
* { age: 4, name: 'Foo'}, | ||
* { age: 7, name: 'Bar'}, | ||
* { age: 5, name: 'Foo'}, | ||
* { age: 6, name: 'Foo'}, | ||
* ).pipe( | ||
* distinctUntilChanged((p: Person, q: Person) => p.name === q.name), | ||
* ) | ||
* .subscribe(x => console.log(x)); | ||
* | ||
* // displays: | ||
* // { age: 4, name: 'Foo' } | ||
* // { age: 7, name: 'Bar' } | ||
* // { age: 5, name: 'Foo' } | ||
* const totallyDifferentBuilds$ = of( | ||
* { engineVersion: '1.1.0', transmissionVersion: '1.2.0' }, | ||
* { engineVersion: '1.1.0', transmissionVersion: '1.4.0' }, | ||
* { engineVersion: '1.3.0', transmissionVersion: '1.4.0' }, | ||
* { engineVersion: '1.3.0', transmissionVersion: '1.5.0' }, | ||
* { engineVersion: '2.0.0', transmissionVersion: '1.5.0' } | ||
* ).pipe( | ||
* distinctUntilChanged((prev, curr) => { | ||
* return ( | ||
* prev.engineVersion === curr.engineVersion || | ||
* prev.transmissionVersion === curr.transmissionVersion | ||
* ); | ||
* }) | ||
* ); | ||
* | ||
* totallyDifferentBuilds$.subscribe(console.log); | ||
* | ||
* // Logs: | ||
* // {engineVersion: "1.1.0", transmissionVersion: "1.2.0"} | ||
* // {engineVersion: "1.3.0", transmissionVersion: "1.4.0"} | ||
* // {engineVersion: "2.0.0", transmissionVersion: "1.5.0"} | ||
* ``` | ||
* | ||
* ## Example | ||
* | ||
* You can also provide a custom `comparator` to check that emitted | ||
* changes are only in one direction. Let's say you only want to get | ||
* the next record temperature: | ||
* | ||
* ```ts | ||
* import { of } from "rxjs"; | ||
* import { distinctUntilChanged } from "rxjs/operators"; | ||
* | ||
* const temps$ = of(30, 31, 20, 34, 33, 29, 35, 20); | ||
* | ||
* const recordHighs$ = temps$.pipe( | ||
* distinctUntilChanged((prevHigh, temp) => { | ||
* // If the current temp is less than | ||
* // or the same as the previous record, | ||
* // the record hasn't changed. | ||
* return temp <= prevHigh; | ||
* }) | ||
* ); | ||
* | ||
* recordHighs$.subscribe(console.log); | ||
* // Logs: 30, 31, 34, 35 | ||
* ``` | ||
* | ||
* @see {@link distinct} | ||
* @see {@link distinctUntilKeyChanged} | ||
* @param comparator A function used to compare the previous and current values for | ||
* equality. Defaults to a `===` check. | ||
*/ | ||
export function distinctUntilChanged<T>(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction<T>; | ||
|
||
/** | ||
* Returns a result {@link Observable} that emits all values pushed by the source observable if they | ||
* are distinct in comparison to the last value the result observable emitted. | ||
* | ||
* 1. It will always emit the first value from the source. | ||
* 2. The `keySelector` will be run against all values, including the first value. | ||
* 3. For all values after the first, the selected key will be compared against the key selected from | ||
* the previously emitted value using the `comparator`. | ||
* 4. If the keys are determined to be unequal by this check, the value (not the key), is emitted | ||
* and the selected key from that value is saved for future comparisons against other keys. | ||
* | ||
* ## Example | ||
* | ||
* Selecting update events only when the `updatedBy` field shows | ||
* the account changed hands... | ||
* | ||
* ```ts | ||
* // A stream of updates to a given account | ||
* const accountUpdates$ = of( | ||
* { updatedBy: "blesh", data: [] }, | ||
* { updatedBy: "blesh", data: [] }, | ||
* { updatedBy: "ncjamieson", data: [] }, | ||
* { updatedBy: "ncjamieson", data: [] }, | ||
* { updatedBy: "blesh", data: [] } | ||
* ); | ||
* | ||
* // We only want the events where it changed hands | ||
* const changedHands$ = accountUpdates$.pipe( | ||
* distinctUntilChanged(undefined, update => update.updatedBy) | ||
* ); | ||
* | ||
* @param {function} [compare] Optional comparison function called to test if an item is distinct from the previous item in the source. | ||
* A return value of true indicates that it is the same, and a return value of false means they are different. | ||
* @return {Observable} An Observable that emits items from the source Observable with distinct values. | ||
* changedHands$.subscribe(console.log); | ||
* // Logs: | ||
* // {updatedBy: "blesh", data: Array[0]} | ||
* // {updatedBy: "ncjamieson", data: Array[0]} | ||
* // {updatedBy: "blesh", data: Array[0]} | ||
* ``` | ||
* | ||
* @param comparator A function used to compare the previous and current keys for | ||
* equality. Defaults to a `===` check. | ||
* @param keySelector Used to select a key value to be passed to the `comparator`. | ||
*/ | ||
export function distinctUntilChanged<T, K>(compare?: (a: K, b: K) => boolean, keySelector?: (x: T) => K): MonoTypeOperatorFunction<T> { | ||
compare = compare ?? defaultCompare; | ||
export function distinctUntilChanged<T, K>( | ||
comparator: (previous: K, current: K) => boolean, | ||
keySelector: (value: T) => K | ||
): MonoTypeOperatorFunction<T>; | ||
|
||
export function distinctUntilChanged<T, K>( | ||
comparator?: (previous: K, current: K) => boolean, | ||
keySelector: (value: T) => K = identity as (value: T) => K | ||
): MonoTypeOperatorFunction<T> { | ||
// We've been allowing `null` do be passed as the `compare`, so we can't do | ||
// a default value for the parameter, because that will only work | ||
// for `undefined`. | ||
comparator = comparator ?? defaultCompare; | ||
|
||
return operate((source, subscriber) => { | ||
let prev: any; | ||
// The previous key, used to compare against keys selected | ||
// from new arrivals to determine "distinctiveness". | ||
let previousKey: K; | ||
// Whether or not this is the first value we've gotten. | ||
let first = true; | ||
|
||
source.subscribe( | ||
new OperatorSubscriber(subscriber, (value) => { | ||
const key: any = keySelector ? keySelector(value) : value; | ||
if (first || !compare!(prev, key)) { | ||
// We always call the key selector. | ||
const currentKey = keySelector(value); | ||
|
||
// If it's the first value, we always emit it. | ||
// Otherwise, we compare this key to the previous key, and | ||
// if the comparer returns false, we emit. | ||
if (first || !comparator!(previousKey, currentKey)) { | ||
// Update our state *before* we emit the value | ||
// as emission can be the source of re-entrant code | ||
// in functional libraries like this. We only really | ||
// need to do this if it's the first value, or if the | ||
// key we're tracking in previous needs to change. | ||
first = false; | ||
previousKey = currentKey; | ||
|
||
// Emit the value! | ||
subscriber.next(value); | ||
} | ||
prev = key; | ||
first = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These can't happen after we next, it has to happen before. Otherwise the added test will fail. |
||
}) | ||
); | ||
}); | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this comment, because I tried to use a default value parameter here and it was sad. We should probably deprecate this sort of thing at some point, and make this a configuration object, I suppose.