Skip to content

Commit

Permalink
fix(client): Dispose of subscription on complete or error messages (#23)
Browse files Browse the repository at this point in the history
* fix: subscription should be canceled on complete

* docs: update readme

* fix: actually dispose on error or complete
  • Loading branch information
enisdenjo authored Oct 1, 2020
1 parent c04cff1 commit fb4d8e9
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 18 deletions.
23 changes: 7 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,14 @@ const client = createClient({
(async () => {
const result = await new Promise((resolve, reject) => {
let result;
const dispose = client.subscribe(
client.subscribe(
{
query: '{ hello }',
},
{
next: (data) => (result = data),
error: reject,
complete: () => {
dispose();
resolve(result);
},
complete: () => resolve(result),
},
);
});
Expand All @@ -114,17 +111,14 @@ const client = createClient({
};

await new Promise((resolve, reject) => {
const dispose = client.subscribe(
client.subscribe(
{
query: 'subscription { greetings }',
},
{
next: onNext,
error: reject,
complete: () => {
dispose();
resolve();
},
complete: resolve,
},
);
});
Expand All @@ -148,13 +142,10 @@ const client = createClient({
async function execute<T>(payload: SubscribePayload) {
return new Promise((resolve, reject) => {
let result: T;
const dispose = client.subscribe<T>(payload, {
client.subscribe<T>(payload, {
next: (data) => (result = data),
error: reject,
complete: () => {
dispose();
resolve(result);
},
complete: () => resolve(result),
});
});
}
Expand All @@ -165,7 +156,7 @@ async function execute<T>(payload: SubscribePayload) {
const result = await execute({
query: '{ hello }',
});
// complete and dispose
// complete
// next = result = { data: { hello: 'Hello World!' } }
} catch (err) {
// error
Expand Down
15 changes: 13 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ export function createClient(options: ClientOptions): Client {
on: emitter.on,
subscribe(payload, sink) {
const id = generateID();
const cancellerRef: CancellerRef = { current: null };

const messageHandler = ({ data }: MessageEvent) => {
const message = memoParseMessage(data);
Expand All @@ -438,19 +439,28 @@ export function createClient(options: ClientOptions): Client {
case MessageType.Error: {
if (message.id === id) {
sink.error(message.payload);
// the canceller must be set at this point
// because you cannot receive a message
// if there is no existing connection
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
cancellerRef.current!();
}
return;
}
case MessageType.Complete: {
if (message.id === id) {
sink.complete();
// the canceller must be set at this point
// because you cannot receive a message
// if there is no existing connection
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
cancellerRef.current!();
}
return;
}
}
};

const cancellerRef: CancellerRef = { current: null };
(async () => {
for (;;) {
try {
Expand Down Expand Up @@ -511,7 +521,8 @@ export function createClient(options: ClientOptions): Client {
}
})()
.catch(sink.error)
.then(sink.complete); // resolves on cancel or normal closure
.then(sink.complete) // resolves on cancel or normal closure
.finally(() => (cancellerRef.current = null)); // when this promise settles there is nothing to cancel

return () => {
if (cancellerRef.current) {
Expand Down
50 changes: 50 additions & 0 deletions src/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,56 @@ describe('subscription operation', () => {

expect(generateIDFn).toBeCalled();
});

it('should dispose of the subscription on complete', async () => {
const client = createClient({ url });

const completeFn = jest.fn();
client.subscribe(
{
query: `{
getValue
}`,
},
{
next: noop,
error: () => {
fail(`Unexpected error call`);
},
complete: completeFn,
},
);
await wait(20);

expect(completeFn).toBeCalled();

await wait(20);
expect(server.webSocketServer.clients.size).toBe(0);
});

it('should dispose of the subscription on error', async () => {
const client = createClient({ url });

const errorFn = jest.fn();
client.subscribe(
{
query: `{
iDontExist
}`,
},
{
next: noop,
error: errorFn,
complete: noop,
},
);
await wait(20);

expect(errorFn).toBeCalled();

await wait(20);
expect(server.webSocketServer.clients.size).toBe(0);
});
});

describe('"concurrency"', () => {
Expand Down

0 comments on commit fb4d8e9

Please sign in to comment.