Skip to content

Commit

Permalink
fix(concurrency): ensure responses resolve in order (#753)
Browse files Browse the repository at this point in the history
Co-authored-by: François Chalifour <francoischalifour@users.noreply.github.com>
  • Loading branch information
sarahdayan and francoischalifour authored Oct 8, 2021
1 parent 82d1c20 commit d15c404
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 79 deletions.
2 changes: 1 addition & 1 deletion bundlesize.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
},
{
"path": "packages/autocomplete-js/dist/umd/index.production.js",
"maxSize": "15.75 kB"
"maxSize": "16 kB"
},
{
"path": "packages/autocomplete-preset-algolia/dist/umd/index.production.js",
Expand Down
59 changes: 49 additions & 10 deletions packages/autocomplete-core/src/__tests__/concurrency.test.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,38 @@
import userEvent from '@testing-library/user-event';

import { AutocompleteState } from '..';
import { createSource, defer } from '../../../../test/utils';
import { createAutocomplete } from '../createAutocomplete';

describe.skip('concurrency', () => {
type Item = {
label: string;
};

describe('concurrency', () => {
test('resolves the responses in order from getSources', async () => {
// These delays make the second query come back after the third one.
const delays = [100, 300, 200];
let deferCount = -1;
const sourcesDelays = [100, 150, 200];
const itemsDelays = [0, 150, 0];
let deferSourcesCount = -1;
let deferItemsCount = -1;

const getSources = ({ query }) => {
deferCount++;
deferSourcesCount++;

return defer(() => {
return [
createSource({
getItems() {
return [{ label: query }];
deferItemsCount++;

return defer(
() => [{ label: query }],
itemsDelays[deferItemsCount]
);
},
}),
];
}, delays[deferCount]);
}, sourcesDelays[deferSourcesCount]);
};
const onStateChange = jest.fn();
const autocomplete = createAutocomplete({ getSources, onStateChange });
Expand All @@ -33,11 +45,18 @@ describe.skip('concurrency', () => {
userEvent.type(input, 'b');
userEvent.type(input, 'c');

await defer(() => {}, Math.max(...delays));
const timeout = Math.max(
...sourcesDelays.map((delay, index) => delay + itemsDelays[index])
);

await defer(() => {}, timeout);

const itemsHistory: Array<{ label: string }> = (onStateChange.mock
.calls as any).flatMap((x) =>
x[0].state.collections.flatMap((x) => x.items)
let stateHistory: Array<
AutocompleteState<Item>
> = onStateChange.mock.calls.flatMap((x) => x[0].state);

const itemsHistory: Item[] = stateHistory.flatMap(({ collections }) =>
collections.flatMap((x) => x.items)
);

// The first query should have brought results.
Expand All @@ -50,6 +69,26 @@ describe.skip('concurrency', () => {
expect.objectContaining({ label: 'abc' })
);

expect(stateHistory[stateHistory.length - 1]).toEqual(
expect.objectContaining({ isOpen: true })
);

userEvent.type(input, '{backspace}'.repeat(3));

await defer(() => {}, timeout);

stateHistory = onStateChange.mock.calls.flatMap((x) => x[0].state);

// The collections are empty despite late resolving promises.
expect(stateHistory[stateHistory.length - 1].collections).toEqual([
expect.objectContaining({ items: [] }),
]);

// The panel closes despite late resolving promises.
expect(stateHistory[stateHistory.length - 1]).toEqual(
expect.objectContaining({ isOpen: false })
);

document.body.removeChild(input);
});
});
140 changes: 76 additions & 64 deletions packages/autocomplete-core/src/onInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
BaseItem,
InternalAutocompleteOptions,
} from './types';
import { getActiveItem } from './utils';
import { createConcurrentSafePromise, getActiveItem } from './utils';

let lastStalledId: number | null = null;

Expand All @@ -27,6 +27,8 @@ interface OnInputParams<TItem extends BaseItem>
store: AutocompleteStore<TItem>;
}

const runConcurrentSafePromise = createConcurrentSafePromise();

export function onInput<TItem extends BaseItem>({
event,
nextState = {},
Expand All @@ -52,18 +54,22 @@ export function onInput<TItem extends BaseItem>({
setActiveItemId(props.defaultActiveItemId);

if (!query && props.openOnFocus === false) {
const collections = store.getState().collections.map((collection) => ({
...collection,
items: [],
}));

setStatus('idle');
setCollections(
store.getState().collections.map((collection) => ({
...collection,
items: [],
}))
);
setCollections(collections);
setIsOpen(
nextState.isOpen ?? props.shouldPanelOpen({ state: store.getState() })
);

return Promise.resolve();
// We make sure to update the latest resolved value of the tracked
// promises to keep late resolving promises from "cancelling" the state
// updates performed in this code path.
// We chain with a void promise to respect `onInput`'s expected return type.
return runConcurrentSafePromise(collections).then(() => Promise.resolve());
}

setStatus('loading');
Expand All @@ -72,67 +78,73 @@ export function onInput<TItem extends BaseItem>({
setStatus('stalled');
}, props.stallThreshold);

return props
.getSources({
query,
refresh,
state: store.getState(),
...setters,
})
.then((sources) => {
setStatus('loading');

return Promise.all(
sources.map((source) => {
return Promise.resolve(
source.getItems({
query,
refresh,
state: store.getState(),
...setters,
})
).then((itemsOrDescription) =>
preResolve<TItem>(itemsOrDescription, source.sourceId)
);
})
)
.then(resolve)
.then((responses) => postResolve(responses, sources))
.then((collections) =>
reshape({ collections, props, state: store.getState() })
// We track the entire promise chain triggered by `onInput` before mutating
// the Autocomplete state to make sure that any state manipulation is based on
// fresh data regardless of when promises individually resolve.
// We don't track nested promises and only rely on the full chain resolution,
// meaning we should only ever manipulate the state once this concurrent-safe
// promise is resolved.
return runConcurrentSafePromise(
props
.getSources({
query,
refresh,
state: store.getState(),
...setters,
})
.then((sources) => {
return Promise.all(
sources.map((source) => {
return Promise.resolve(
source.getItems({
query,
refresh,
state: store.getState(),
...setters,
})
).then((itemsOrDescription) =>
preResolve<TItem>(itemsOrDescription, source.sourceId)
);
})
)
.then((collections) => {
setStatus('idle');
setCollections(collections as any);
const isPanelOpen = props.shouldPanelOpen({
state: store.getState(),
});
setIsOpen(
nextState.isOpen ??
((props.openOnFocus && !query && isPanelOpen) || isPanelOpen)
.then(resolve)
.then((responses) => postResolve(responses, sources))
.then((collections) =>
reshape({ collections, props, state: store.getState() })
);
})
)
.then((collections) => {
setStatus('idle');
setCollections(collections as any);
const isPanelOpen = props.shouldPanelOpen({
state: store.getState(),
});
setIsOpen(
nextState.isOpen ??
((props.openOnFocus && !query && isPanelOpen) || isPanelOpen)
);

const highlightedItem = getActiveItem(store.getState());
const highlightedItem = getActiveItem(store.getState());

if (store.getState().activeItemId !== null && highlightedItem) {
const { item, itemInputValue, itemUrl, source } = highlightedItem;
if (store.getState().activeItemId !== null && highlightedItem) {
const { item, itemInputValue, itemUrl, source } = highlightedItem;

source.onActive({
event,
item,
itemInputValue,
itemUrl,
refresh,
source,
state: store.getState(),
...setters,
});
}
})
.finally(() => {
if (lastStalledId) {
props.environment.clearTimeout(lastStalledId);
}
source.onActive({
event,
item,
itemInputValue,
itemUrl,
refresh,
source,
state: store.getState(),
...setters,
});
}
})
.finally(() => {
if (lastStalledId) {
props.environment.clearTimeout(lastStalledId);
}
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import { MaybePromise } from '@algolia/autocomplete-shared';
* This is useful to prevent older promises to resolve after a newer promise,
* otherwise resulting in stale resolved values.
*/
export function createConcurrentSafePromise<TValue>() {
export function createConcurrentSafePromise() {
let basePromiseId = -1;
let latestResolvedId = -1;
let latestResolvedValue: TValue | undefined = undefined;
let latestResolvedValue: unknown = undefined;

return function runConcurrentSafePromise(promise: MaybePromise<TValue>) {
return function runConcurrentSafePromise<TValue>(
promise: MaybePromise<TValue>
) {
basePromiseId++;
const currentPromiseId = basePromiseId;

Expand All @@ -30,7 +32,7 @@ export function createConcurrentSafePromise<TValue>() {
// | run(3) +--------> R3 |
// +----------------------------------+
if (latestResolvedValue && currentPromiseId < latestResolvedId) {
return latestResolvedValue;
return latestResolvedValue as TValue;
}

latestResolvedId = currentPromiseId;
Expand Down

0 comments on commit d15c404

Please sign in to comment.