Skip to content

Commit

Permalink
Batch link can cancel operations that are in queue or in flight
Browse files Browse the repository at this point in the history
After an operation has been subscribed to, and so queued, it is possible
to unsubscribe from it, and it will be removed from the queue.

Unsubscribing will not impact the debounce, so other operations, if any, will
not be delayed by an unsubscription.

If a batch of operation is already in flight, and all operations are unsubscribed
then the entire XHR will be cancelled. If only some operations are unsubscribed
the XHR will be left untouched.
  • Loading branch information
PowerKiKi committed Dec 27, 2021
1 parent b34638b commit 0cd5331
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 82 deletions.
2 changes: 1 addition & 1 deletion src/link/batch-http/__tests__/batchHttpLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ describe('BatchHttpLink', () => {
},
batchInterval: 1,
//if batchKey does not work, then the batch size would be 3
batchMax: 3,
batchMax: 2,
batchKey,
}),
]);
Expand Down
201 changes: 175 additions & 26 deletions src/link/batch/__tests__/batchLink.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import gql from 'graphql-tag';
import { print } from 'graphql';

import { ApolloLink } from '../../core/ApolloLink';
import { execute } from '../../core/execute';
import { Operation, FetchResult, GraphQLRequest } from '../../core/types';
import { Observable } from '../../../utilities/observables/Observable';
import {
BatchLink,
OperationBatcher,
BatchHandler,
BatchableRequest,
} from '../batchLink';
import { itAsync } from '../../../testing';
import {print} from 'graphql';

import {ApolloLink} from '../../core/ApolloLink';
import {execute} from '../../core/execute';
import {FetchResult, GraphQLRequest, Operation} from '../../core/types';
import {Observable, ObservableSubscription} from '../../../utilities/observables/Observable';
import {BatchableRequest, BatchHandler, BatchLink, OperationBatcher,} from '../batchLink';
import {itAsync} from '../../../testing';

interface MockedResponse {
request: GraphQLRequest;
Expand All @@ -27,7 +22,7 @@ function getKey(operation: GraphQLRequest) {
return JSON.stringify([operationName, query, variables]);
}

export function createOperation(
function createOperation(
starting: any,
operation: GraphQLRequest,
): Operation {
Expand Down Expand Up @@ -188,9 +183,9 @@ describe('OperationBatcher', () => {

expect(batcher.queuedRequests.get('')).toBeUndefined();
batcher.enqueueRequest(request).subscribe({});
expect(batcher.queuedRequests.get('')!.length).toBe(1);
expect(batcher.queuedRequests.get('')!.requests.length).toBe(1);
batcher.enqueueRequest(request).subscribe({});
expect(batcher.queuedRequests.get('')!.length).toBe(2);
expect(batcher.queuedRequests.get('')!.requests.length).toBe(2);
});

describe('request queue', () => {
Expand Down Expand Up @@ -304,7 +299,7 @@ describe('OperationBatcher', () => {
});

try {
expect(myBatcher.queuedRequests.get('')!.length).toBe(2);
expect(myBatcher.queuedRequests.get('')!.requests.length).toBe(2);
const observables: (
| Observable<FetchResult>
| undefined)[] = myBatcher.consumeQueue()!;
Expand Down Expand Up @@ -346,22 +341,22 @@ describe('OperationBatcher', () => {
myBatcher.enqueueRequest({ operation }).subscribe({});
myBatcher.enqueueRequest({ operation }).subscribe({});
myBatcher.enqueueRequest({ operation }).subscribe({});
expect(myBatcher.queuedRequests.get('')!.length).toEqual(3);
expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(3);

// 2. Run the timer halfway.
jest.advanceTimersByTime(batchInterval / 2);
expect(myBatcher.queuedRequests.get('')!.length).toEqual(3);
expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(3);

// 3. Queue a 4th request, causing the timer to reset.
myBatcher.enqueueRequest({ operation }).subscribe({});
expect(myBatcher.queuedRequests.get('')!.length).toEqual(4);
expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(4);

// 4. Run the timer to batchInterval + 1, at this point, if debounce were
// not set, the original 3 requests would have fired, but we expect
// instead that the queries will instead fire at
// (batchInterval + batchInterval / 2).
jest.advanceTimersByTime(batchInterval / 2 + 1);
expect(myBatcher.queuedRequests.get('')!.length).toEqual(4);
expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(4);

// 5. Finally, run the timer to (batchInterval + batchInterval / 2) +1,
// and expect the queue to be empty.
Expand Down Expand Up @@ -396,7 +391,7 @@ describe('OperationBatcher', () => {

batcher.enqueueRequest({ operation }).subscribe({});
try {
expect(batcher.queuedRequests.get('')!.length).toBe(1);
expect(batcher.queuedRequests.get('')!.requests.length).toBe(1);
} catch (e) {
reject(e);
}
Expand All @@ -409,6 +404,107 @@ describe('OperationBatcher', () => {
);
});

itAsync('should cancel single query in queue when unsubscribing', (resolve, reject) => {
const data = {
lastName: 'Ever',
firstName: 'Greatest',
};
const batcher = new OperationBatcher({
batchInterval: 10,
batchHandler: () =>
new Observable(observer => {
observer.next([{data}]);
setTimeout(observer.complete.bind(observer));
}),
});
const query = gql`
query {
author {
firstName
lastName
}
}
`;
const operation: Operation = createOperation({}, {query});

batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called')).unsubscribe();

expect(batcher.queuedRequests.get('')).toBeUndefined();
resolve();
});

itAsync('should cancel single query in queue with multiple subscriptions', (resolve, reject) => {
const data = {
lastName: 'Ever',
firstName: 'Greatest',
};
const batcher = new OperationBatcher({
batchInterval: 10,
batchHandler: () =>
new Observable(observer => {
observer.next([{data}]);
setTimeout(observer.complete.bind(observer));
}),
});
const query = gql`
query {
author {
firstName
lastName
}
}
`;
const operation: Operation = createOperation({}, {query});

const observable = batcher.enqueueRequest({operation});
const sub1 = observable.subscribe(() => reject('next should never be called'));
expect(batcher.queuedRequests.get('')).not.toBeUndefined();
expect(batcher.queuedRequests.get('')?.requests[0].subscribers).toBe(1);

const sub2 = observable.subscribe(() => reject('next should never be called'));
expect(batcher.queuedRequests.get('')).not.toBeUndefined();
expect(batcher.queuedRequests.get('')?.requests[0].subscribers).toBe(2);

sub1.unsubscribe();
expect(batcher.queuedRequests.get('')).not.toBeUndefined();
expect(batcher.queuedRequests.get('')?.requests[0].subscribers).toBe(1);

sub2.unsubscribe();
expect(batcher.queuedRequests.get('')).toBeUndefined();
resolve();
});

itAsync('should cancel single query in flight when unsubscribing', (resolve, reject) => {
let subscription: ObservableSubscription | undefined;

const batcher = new OperationBatcher({
batchInterval: 10,
batchHandler: () =>
new Observable(() => {
// Instead of typically starting an XHR, we trigger the unsubscription from outside
setTimeout(() => subscription?.unsubscribe(), 5);

return () => {
expect(batcher.queuedRequests.get('')).toBeUndefined();
resolve();
}
}),
});

const query = gql`
query {
author {
firstName
lastName
}
}
`;

const operation: Operation = createOperation({}, {query});

subscription = batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called'));
});

itAsync('should correctly batch multiple queries', (resolve, reject) => {
const data = {
lastName: 'Ever',
Expand Down Expand Up @@ -441,7 +537,7 @@ describe('OperationBatcher', () => {
batcher.enqueueRequest({ operation }).subscribe({});
batcher.enqueueRequest({ operation: operation2 }).subscribe({});
try {
expect(batcher.queuedRequests.get('')!.length).toBe(2);
expect(batcher.queuedRequests.get('')!.requests.length).toBe(2);
} catch (e) {
reject(e);
}
Expand All @@ -450,7 +546,7 @@ describe('OperationBatcher', () => {
// The batch shouldn't be fired yet, so we can add one more request.
batcher.enqueueRequest({ operation: operation3 }).subscribe({});
try {
expect(batcher.queuedRequests.get('')!.length).toBe(3);
expect(batcher.queuedRequests.get('')!.requests.length).toBe(3);
} catch (e) {
reject(e);
}
Expand All @@ -465,6 +561,59 @@ describe('OperationBatcher', () => {
);
});

itAsync('should cancel multiples queries in queue when unsubscribing and let pass still subscribed one', (resolve, reject) => {
const data2 = {
lastName: 'Hauser',
firstName: 'Evans',
};

const batcher = new OperationBatcher({
batchInterval: 10,
batchHandler: () =>
new Observable(observer => {
observer.next([{data: data2}]);
setTimeout(observer.complete.bind(observer));
}),
});

const query = gql`
query {
author {
firstName
lastName
}
}
`;

const operation: Operation = createOperation({}, {query});
const operation2: Operation = createOperation({}, {query});
const operation3: Operation = createOperation({}, {query});

const sub1 = batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called'));
batcher.enqueueRequest({operation: operation2}).subscribe(result => {
expect(result.data).toBe(data2);

// The batch should've been fired by now.
expect(batcher.queuedRequests.get('')).toBeUndefined();

resolve();
});

expect(batcher.queuedRequests.get('')!.requests.length).toBe(2);

sub1.unsubscribe();
expect(batcher.queuedRequests.get('')!.requests.length).toBe(1);

setTimeout(() => {
// The batch shouldn't be fired yet, so we can add one more request.
const sub3 = batcher.enqueueRequest({operation: operation3}).subscribe(() => reject('next should never be called'));
expect(batcher.queuedRequests.get('')!.requests.length).toBe(2);

sub3.unsubscribe();
expect(batcher.queuedRequests.get('')!.requests.length).toBe(1);
}, 5);
});

itAsync('should reject the promise if there is a network error', (resolve, reject) => {
const query = gql`
query {
Expand Down Expand Up @@ -765,14 +914,14 @@ describe('BatchLink', () => {
new BatchLink({
batchInterval: 1,
//if batchKey does not work, then the batch size would be 3
batchMax: 3,
batchMax: 2,
batchHandler,
batchKey,
}),
]);

let count = 0;
[1, 2, 3, 4].forEach(x => {
[1, 2, 3, 4].forEach(() => {
execute(link, {
query,
}).subscribe({
Expand Down
Loading

0 comments on commit 0cd5331

Please sign in to comment.