Skip to content

Commit

Permalink
Allow unsubscribing stale connections on reobserve (#9791)
Browse files Browse the repository at this point in the history
  • Loading branch information
javier-garcia-meteologica authored Jun 10, 2022
1 parent 8ec45a9 commit 3d039f8
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
- Ensure `useQuery(query, { skip: true }).called === false` rather than always returning `called` as `true`. <br/>
[@KucharskiPiotr](https://github.com/KucharskiPiotr) in [#9798](https://github.com/apollographql/apollo-client/pull/9798)

- Allow abandoned `reobserve` requests to unsubscribe from their underlying `Observable`. <br/>
[@javier-garcia-meteologica](https://github.com/javier-garcia-meteologica) in [#9791](https://github.com/apollographql/apollo-client/pull/9791)

## Apollo Client 3.6.7 (2022-06-10)

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion src/core/ObservableQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ Did you mean to call refetch(variables) instead of refetch({ variables })?`);
// because we just want to ignore the old observable, not prematurely shut
// it down, since other consumers may be awaiting this.concast.promise.
if (this.concast && this.observer) {
this.concast.removeObserver(this.observer, true);
this.concast.removeObserver(this.observer);
}

this.concast = concast;
Expand Down
116 changes: 111 additions & 5 deletions src/core/__tests__/QueryManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { QueryManager } from '../../QueryManager';
import { ApolloError } from '../../../errors';

// testing utils
import { waitFor } from '@testing-library/react';
import wrap from '../../../testing/core/wrap';
import observableToPromise, {
observableToPromiseAndSubscription,
Expand Down Expand Up @@ -507,10 +508,12 @@ describe('QueryManager', () => {
return new Observable(observer => {
onRequestSubscribe();

// Delay (100ms) must be bigger than unsubscribe await (5ms)
// to show clearly that the connection was aborted before completing
const timer = setTimeout(() => {
observer.next(mockedResponse.result);
observer.complete();
}, 0);
}, 100);

return () => {
onRequestUnsubscribe();
Expand Down Expand Up @@ -543,16 +546,119 @@ describe('QueryManager', () => {
subscription.unsubscribe();

return new Promise(
// Unsubscribing from the link happens after a microtask
// (Promise.resolve().then) delay, so we need to wait at least that
// long before verifying onRequestUnsubscribe was called.
resolve => setTimeout(resolve, 0)
// Unsubscribing from the link requires around 5ms to take effect
resolve => setTimeout(resolve, 5)
).then(() => {
expect(onRequestSubscribe).toHaveBeenCalledTimes(1);
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(1);
}).then(resolve, reject);
});

itAsync('causes link unsubscription after reobserve', (resolve, reject) => {
const expResult = {
data: {
allPeople: {
people: [
{
name: 'Luke Skywalker',
},
],
},
},
};

const request = {
query: gql`
query people ($offset: Int) {
allPeople(first: $offset) {
people {
name
}
}
}
`,
variables: undefined
};

const mockedResponse = {
request,
result: expResult
};

const onRequestSubscribe = jest.fn();
const onRequestUnsubscribe = jest.fn();

const mockedSingleLink = new ApolloLink(() => {
return new Observable(observer => {
onRequestSubscribe();

// Delay (100ms) must be bigger than sum of reobserve and unsubscribe awaits (5ms each)
// to show clearly that the connection was aborted before completing
const timer = setTimeout(() => {
observer.next(mockedResponse.result);
observer.complete();
}, 100);

return () => {
onRequestUnsubscribe();
clearTimeout(timer);
};
});
});

const mockedQueryManger = new QueryManager({
link: mockedSingleLink,
cache: new InMemoryCache({ addTypename: false }),
defaultOptions: {
watchQuery: {
fetchPolicy: 'cache-and-network',
returnPartialData: false,
partialRefetch: true,
notifyOnNetworkStatusChange: true
},
query: {
fetchPolicy: 'network-only'
}
},
queryDeduplication: false
});

const observableQuery = mockedQueryManger.watchQuery<
(typeof expResult)['data'],
{ offset?: number | undefined }
>({
query: request.query,
variables: request.variables
});

const observerCallback = wrap(reject, () => {
reject(new Error('Link subscription should have been cancelled'));
});

const subscription = observableQuery.subscribe({
next: observerCallback,
error: observerCallback,
complete: observerCallback
});

expect(onRequestSubscribe).toHaveBeenCalledTimes(1);

// This is the most important part of this test
// Check that reobserve cancels the previous connection while watchQuery remains active
observableQuery.reobserve({ variables: { offset: 20 } });

return waitFor(() => {
// Verify that previous connection was aborted by reobserve
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(1);
}).then(() => {
subscription.unsubscribe();
return waitFor(() => {
expect(onRequestSubscribe).toHaveBeenCalledTimes(2);
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(2);
});
}).then(resolve, reject);
});

itAsync('supports interoperability with other Observable implementations like RxJS', (resolve, reject) => {
const expResult = {
data: {
Expand Down

0 comments on commit 3d039f8

Please sign in to comment.