Skip to content

Commit

Permalink
feat(client): connectionParams may return a promise (enisdenjo#71)
Browse files Browse the repository at this point in the history
* feat: begin implementation

* feat: test server wait for connect

* test: should pass the `connectionParams` through

* refactor: the client should be recycled

* fix: should gracefully handle connectionParams errors

* style: adjustments

* docs: generate
  • Loading branch information
enisdenjo authored Nov 12, 2020
1 parent 555c2c3 commit 33f210c
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 17 deletions.
10 changes: 8 additions & 2 deletions docs/interfaces/_client_.clientoptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Interface: ClientOptions

Configuration used for the `create` client function.
Configuration used for the GraphQL over WebSocket client.

## Hierarchy

Expand All @@ -28,12 +28,18 @@ Configuration used for the `create` client function.

### connectionParams

`Optional` **connectionParams**: Record\<string, unknown> \| () => Record\<string, unknown>
`Optional` **connectionParams**: Record\<string, unknown> \| () => Promise\<Record\<string, unknown>> \| Record\<string, unknown>

Optional parameters, passed through the `payload` field with the `ConnectionInit` message,
that the client specifies when establishing a connection with the server. You can use this
for securely passing arguments for authentication.

If you decide to return a promise, keep in mind that the server might kick you off if it
takes too long to resolve! Check the `connectionInitWaitTimeout` on the server for more info.

Throwing an error from within this function will close the socket with the `Error` message
in the close event reason.

___

### generateID
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/_client_.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Name | Type |

**createClient**(`options`: [ClientOptions](../interfaces/_client_.clientoptions.md)): [Client](../interfaces/_client_.client.md)

Creates a disposable GraphQL subscriptions client.
Creates a disposable GraphQL over WebSocket client.

#### Parameters:

Expand Down
45 changes: 32 additions & 13 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,24 @@ export type EventListener<E extends Event> = E extends EventConnecting

type CancellerRef = { current: (() => void) | null };

/** Configuration used for the `create` client function. */
/** Configuration used for the GraphQL over WebSocket client. */
export interface ClientOptions {
/** URL of the GraphQL over WebSocket Protocol compliant server to connect. */
url: string;
/**
* Optional parameters, passed through the `payload` field with the `ConnectionInit` message,
* that the client specifies when establishing a connection with the server. You can use this
* for securely passing arguments for authentication.
*
* If you decide to return a promise, keep in mind that the server might kick you off if it
* takes too long to resolve! Check the `connectionInitWaitTimeout` on the server for more info.
*
* Throwing an error from within this function will close the socket with the `Error` message
* in the close event reason.
*/
connectionParams?: Record<string, unknown> | (() => Record<string, unknown>);
connectionParams?:
| Record<string, unknown>
| (() => Promise<Record<string, unknown>> | Record<string, unknown>);
/**
* Should the connection be established immediately and persisted
* or after the first listener subscribed.
Expand Down Expand Up @@ -128,7 +136,7 @@ export interface Client extends Disposable {
subscribe<T = unknown>(payload: SubscribePayload, sink: Sink<T>): () => void;
}

/** Creates a disposable GraphQL subscriptions client. */
/** Creates a disposable GraphQL over WebSocket client. */
export function createClient(options: ClientOptions): Client {
const {
url,
Expand Down Expand Up @@ -318,23 +326,34 @@ export function createClient(options: ClientOptions): Client {
}
};

// as soon as the socket opens, send the connection initalisation request
// as soon as the socket opens and the connectionParams
// resolve, send the connection initalisation request
socket.onopen = () => {
socket.onopen = null;
if (cancelled) {
socket.close(3499, 'Client cancelled the socket before connecting');
return;
}

socket.send(
stringifyMessage<MessageType.ConnectionInit>({
type: MessageType.ConnectionInit,
payload:
typeof connectionParams === 'function'
? connectionParams()
: connectionParams,
}),
);
(async () => {
try {
socket.send(
stringifyMessage<MessageType.ConnectionInit>({
type: MessageType.ConnectionInit,
payload:
typeof connectionParams === 'function'
? await connectionParams()
: connectionParams,
}),
);
} catch (err) {
// even if not open, call close again to report error
socket.close(
4400,
err instanceof Error ? err.message : new Error(err).message,
);
}
})();
};
});

Expand Down
67 changes: 67 additions & 0 deletions src/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,73 @@ it('should close with error message during connecting issues', async () => {
});
});

it('should pass the `connectionParams` through', async () => {
const server = await startTServer();

let client = createClient({
url: server.url,
lazy: false,
connectionParams: { auth: 'token' },
});
await server.waitForConnect((ctx) => {
expect(ctx.connectionParams).toEqual({ auth: 'token' });
});
await client.dispose();

client = createClient({
url: server.url,
lazy: false,
connectionParams: () => ({ from: 'func' }),
});
await server.waitForConnect((ctx) => {
expect(ctx.connectionParams).toEqual({ from: 'func' });
});
await client.dispose();

client = createClient({
url: server.url,
lazy: false,
connectionParams: () => Promise.resolve({ from: 'promise' }),
});
await server.waitForConnect((ctx) => {
expect(ctx.connectionParams).toEqual({ from: 'promise' });
});
});

it('should close the socket if the `connectionParams` rejects or throws', async () => {
const server = await startTServer();

let client = createClient({
url: server.url,
retryAttempts: 0,
connectionParams: () => {
throw new Error('No auth?');
},
});

let sub = tsubscribe(client, { query: '{ getValue }' });
await sub.waitForError((err) => {
const event = err as CloseEvent;
expect(event.code).toBe(4400);
expect(event.reason).toBe('No auth?');
expect(event.wasClean).toBeTruthy();
});

client = createClient({
url: server.url,
retryAttempts: 0,
connectionParams: () => Promise.reject(new Error('No auth?')),
});

sub = tsubscribe(client, { query: '{ getValue }' });
await sub.waitForError((err) => {
const event = err as CloseEvent;
expect(event.code).toBe(4400);
expect(event.reason).toBe('No auth?');
expect(event.wasClean).toBeTruthy();
});
});

describe('query operation', () => {
it('should execute the query, "next" the result and then complete', async () => {
const { url } = await startTServer();
Expand Down
34 changes: 33 additions & 1 deletion src/tests/fixtures/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { EventEmitter } from 'events';
import WebSocket from 'ws';
import net from 'net';
import http from 'http';
import { createServer, ServerOptions, Server } from '../../server';
import { createServer, ServerOptions, Server, Context } from '../../server';

// distinct server for each test; if you forget to dispose, the fixture wont
const leftovers: Dispose[] = [];
Expand All @@ -32,6 +32,10 @@ export interface TServer {
test?: (client: WebSocket) => void,
expire?: number,
) => Promise<void>;
waitForConnect: (
test?: (ctx: Context) => void,
expire?: number,
) => Promise<void>;
waitForOperation: (test?: () => void, expire?: number) => Promise<void>;
waitForComplete: (test?: () => void, expire?: number) => Promise<void>;
waitForClientClose: (test?: () => void, expire?: number) => Promise<void>;
Expand Down Expand Up @@ -138,6 +142,7 @@ export async function startTServer(
});

// create server and hook up for tracking operations
const pendingConnections: Context[] = [];
let pendingOperations = 0,
pendingCompletes = 0;
const server = await createServer(
Expand All @@ -146,6 +151,12 @@ export async function startTServer(
execute,
subscribe,
...options,
onConnect: async (...args) => {
pendingConnections.push(args[0]);
const permitted = await options?.onConnect?.(...args);
emitter.emit('conn');
return permitted;
},
onOperation: async (ctx, msg, args, result) => {
pendingOperations++;
const maybeResult = await options?.onOperation?.(
Expand Down Expand Up @@ -251,6 +262,27 @@ export async function startTServer(
}
});
},
waitForConnect(test, expire) {
return new Promise((resolve) => {
function done() {
// the on connect listener below will be called before our listener, populating the queue
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const ctx = pendingConnections.shift()!;
test?.(ctx);
resolve();
}
if (pendingConnections.length > 0) {
return done();
}
emitter.once('conn', done);
if (expire) {
setTimeout(() => {
emitter.off('conn', done); // expired
resolve();
}, expire);
}
});
},
waitForOperation(test, expire) {
return new Promise((resolve) => {
function done() {
Expand Down

0 comments on commit 33f210c

Please sign in to comment.