Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(core) - Avoid dispatching an operation for already active cache-first/only operations #1600

Merged
merged 3 commits into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/large-frogs-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@urql/core': minor
---

With the "single-source behavior" the `Client` will now also avoid executing an operation if it's already active, has a previous result available, and is either run with the `cache-first` or `cache-only` request policies. This is similar to a "short circuiting" behavior, where unnecessary work is avoided by not issuing more operations into the exchange pipeline than expected.
134 changes: 121 additions & 13 deletions packages/core/src/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ describe('shared sources behavior', () => {
jest.useRealTimers();
});

it('replays results from prior operation result as needed', async () => {
it('replays results from prior operation result as needed (cache-first)', async () => {
const exchange: Exchange = () => ops$ => {
let i = 0;
return pipe(
Expand Down Expand Up @@ -556,10 +556,111 @@ describe('shared sources behavior', () => {

expect(resultTwo).toHaveBeenCalledWith({
data: 1,
operation: queryOperation,
});

jest.advanceTimersByTime(1);

// With cache-first we don't expect a new operation to be issued
expect(resultTwo).toHaveBeenCalledTimes(1);
});

it('replays results from prior operation result as needed (network-only)', async () => {
const exchange: Exchange = () => ops$ => {
let i = 0;
return pipe(
ops$,
map(op => ({
data: ++i,
operation: op,
})),
delay(1)
);
};

const client = createClient({
url: 'test',
exchanges: [exchange],
});

const operation = makeOperation('query', queryOperation, {
...queryOperation.context,
requestPolicy: 'network-only',
});

const resultOne = jest.fn();
const resultTwo = jest.fn();

pipe(client.executeRequestOperation(operation), subscribe(resultOne));

expect(resultOne).toHaveBeenCalledTimes(0);

jest.advanceTimersByTime(1);

expect(resultOne).toHaveBeenCalledTimes(1);
expect(resultOne).toHaveBeenCalledWith({
data: 1,
operation,
});

pipe(client.executeRequestOperation(operation), subscribe(resultTwo));

expect(resultTwo).toHaveBeenCalledWith({
data: 1,
operation,
stale: true,
});

jest.advanceTimersByTime(1);

// With network-only we expect a new operation to be issued, hence a new result
expect(resultTwo).toHaveBeenCalledTimes(2);

expect(resultTwo).toHaveBeenCalledWith({
data: 2,
operation,
});
});

it('does not replay values from a past subscription', async () => {
const exchange: Exchange = () => ops$ => {
let i = 0;
return pipe(
ops$,
filter(op => op.kind !== 'teardown'),
map(op => ({
data: ++i,
operation: op,
})),
delay(1)
);
};

const client = createClient({
url: 'test',
exchanges: [exchange],
});

// We keep the source in-memory
const source = client.executeRequestOperation(queryOperation);
const resultOne = jest.fn();
let subscription;

subscription = pipe(source, subscribe(resultOne));

expect(resultOne).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(1);

expect(resultOne).toHaveBeenCalledWith({
data: 1,
operation: queryOperation,
});

subscription.unsubscribe();
const resultTwo = jest.fn();
subscription = pipe(source, subscribe(resultTwo));

expect(resultTwo).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(1);

expect(resultTwo).toHaveBeenCalledWith({
Expand All @@ -586,19 +687,21 @@ describe('shared sources behavior', () => {
exchanges: [exchange],
});

const operation = makeOperation('query', queryOperation, {
...queryOperation.context,
requestPolicy: 'network-only',
});

const resultOne = jest.fn();
const resultTwo = jest.fn();

pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne));

pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo));
pipe(client.executeRequestOperation(operation), subscribe(resultOne));
pipe(client.executeRequestOperation(operation), subscribe(resultTwo));

expect(resultOne).toHaveBeenCalledTimes(1);
expect(resultTwo).toHaveBeenCalledTimes(1);

expect(resultTwo).toHaveBeenCalledWith({
data: 1,
operation: queryOperation,
operation,
stale: true,
});
});
Expand Down Expand Up @@ -628,7 +731,7 @@ describe('shared sources behavior', () => {
expect(resultTwo).toHaveBeenCalledTimes(0);
});

it('skips replaying results when a result is emitted immediately', () => {
it('skips replaying results when a result is emitted immediately (network-only)', () => {
const exchange: Exchange = () => ops$ => {
let i = 0;
return pipe(
Expand All @@ -642,26 +745,31 @@ describe('shared sources behavior', () => {
exchanges: [exchange],
});

const operation = makeOperation('query', queryOperation, {
...queryOperation.context,
requestPolicy: 'network-only',
});

const resultOne = jest.fn();
const resultTwo = jest.fn();

pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne));
pipe(client.executeRequestOperation(operation), subscribe(resultOne));

expect(resultOne).toHaveBeenCalledWith({
data: 1,
operation: queryOperation,
operation,
});

pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo));
pipe(client.executeRequestOperation(operation), subscribe(resultTwo));

expect(resultTwo).toHaveBeenCalledWith({
data: 2,
operation: queryOperation,
operation,
});

expect(resultOne).toHaveBeenCalledWith({
data: 2,
operation: queryOperation,
operation,
});
});

Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,14 @@ export class Client {
share
);
} else {
const mode =
operation.context.requestPolicy === 'cache-and-network' ||
operation.context.requestPolicy === 'network-only'
? 'pre'
: 'post';
active = pipe(
result$,
replayOnStart(() => {
replayOnStart(mode, () => {
this.activeOperations.set(operation.key, active!);
this.dispatchOperation(operation);
})
Expand Down
28 changes: 20 additions & 8 deletions packages/core/src/utils/streamUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ export function withPromise<T>(source$: Source<T>): PromisifiedSource<T> {
return source$ as PromisifiedSource<T>;
}

export type ReplayMode = 'pre' | 'post';

export function replayOnStart<T extends OperationResult>(
start?: () => void
mode: ReplayMode,
start: () => void
): Operator<T, T> {
return source$ => {
let replay: T | void;

const shared$ = pipe(
source$,
onEnd(() => {
replay = undefined;
}),
onPush(value => {
replay = value;
}),
Expand All @@ -37,18 +43,24 @@ export function replayOnStart<T extends OperationResult>(
return make<T>(observer => {
const prevReplay = replay;

const subscription = pipe(
return pipe(
shared$,
onEnd(observer.complete),
onStart(() => {
if (start) start();
if (prevReplay !== undefined && prevReplay === replay)
observer.next({ ...prevReplay, stale: true });
if (mode === 'pre') {
start();
}

if (prevReplay !== undefined && prevReplay === replay) {
observer.next(
mode === 'pre' ? { ...prevReplay, stale: true } : prevReplay
);
} else if (mode === 'post') {
start();
}
}),
subscribe(observer.next)
);

return subscription.unsubscribe;
).unsubscribe;
});
};
}