diff --git a/PROTOCOL.md b/PROTOCOL.md
index c21c8c45..b3c8f83c 100644
--- a/PROTOCOL.md
+++ b/PROTOCOL.md
@@ -195,11 +195,19 @@ _The client and the server has already gone through [successful connection initi
1. _Client_ generates a unique ID for the following operation
1. _Client_ dispatches the `Subscribe` message with the generated ID through the `id` field and the requested operation passed through the `payload` field
_All future communication is linked through this unique ID_
-1. _Server_ triggers the `onSubscribe` callback, if specified, and uses the returned `ExecutionArgs` for the operation
-1. _Server_ validates the request and executes the single result GraphQL operation
-1. _Server_ dispatches the `Next` message with the execution result
+1. _Server_ triggers the `onSubscribe` callback
+
+ - If `ExecutionArgs` are **not** returned, the arguments will be formed and validated using the payload
+ - If `ExecutionArgs` are returned, they will be used directly
+
+1. _Server_ executes the single result GraphQL operation using the arguments provided above
+1. _Server_ triggers the `onNext` callback
+
+ - If `ExecutionResult` is **not** returned, the direct result from the operation will be dispatched with the `Next` message
+ - If `ExecutionResult` is returned, it will be dispatched with the `Next` message
+
+1. _Server_ triggers the `onComplete` callback
1. _Server_ dispatches the `Complete` message indicating that the execution has completed
-1. _Server_ triggers the `onComplete` callback, if specified
### Streaming operation
@@ -208,14 +216,24 @@ _The client and the server has already gone through [successful connection initi
_The client and the server has already gone through [successful connection initialisation](#successful-connection-initialisation)._
1. _Client_ generates a unique ID for the following operation
-1. _Client_ dispatches the `Subscribe` message with the generated ID through the `id` field and the requested streaming operation passed through the `payload` field
+1. _Client_ dispatches the `Subscribe` message with the generated ID through the `id` field and the requested operation passed through the `payload` field
_All future communication is linked through this unique ID_
-1. _Server_ triggers the `onSubscribe` callback, if specified, and uses the returned `ExecutionArgs` for the operation
-1. _Server_ validates the request and executes the streaming GraphQL operation
+1. _Server_ triggers the `onSubscribe` callback
+
+ - If `ExecutionArgs` are **not** returned, the arguments will be formed and validated using the payload
+ - If `ExecutionArgs` are returned, they will be used directly
+
+1. _Server_ executes the streaming GraphQL operation using the arguments provided above
1. _Server_ checks if the generated ID is unique across active streaming subscriptions
+
- If **not** unique, the _server_ will close the socket with the event `4409: Subscriber for already exists`
- If unique, continue...
-1. _Server_ dispatches `Next` messages for every event in the source stream
+
+1. _Server_ triggers the `onNext` callback
+
+ - If `ExecutionResult` is **not** returned, the direct events from the source stream will be dispatched with the `Next` message
+ - If `ExecutionResult` is returned, it will be dispatched with the `Next` message instead of every event from the source stram
+
1. - _Client_ stops the subscription by dispatching a `Complete` message
- _Server_ completes the source stream
_or_
diff --git a/README.md b/README.md
index 398e2aef..d538a128 100644
--- a/README.md
+++ b/README.md
@@ -400,29 +400,20 @@ const server = https.createServer(function weServeSocketsOnly(_, res) {
createServer(
{
schema,
- execute: async (args) => {
- console.log('Execute', args);
- const result = await execute(args);
- console.debug('Execute result', result);
- return result;
- },
- subscribe: async (args) => {
- console.log('Subscribe', args);
- const subscription = await subscribe(args);
- // NOTE: `subscribe` can sometimes return a single result, I dont consider it here for sake of simplicity
- return (async function* () {
- for await (const result of subscription) {
- console.debug('Subscribe yielded result', { args, result });
- yield result;
- }
- })();
- },
onConnect: (ctx) => {
console.log('Connect', ctx);
- return true; // default behaviour - permit all connection attempts
+ },
+ onSubscribe: (ctx, msg) => {
+ console.log('Subscribe', { ctx, msg });
+ },
+ onNext: (ctx, msg, args, result) => {
+ console.debug('Next', { ctx, msg, args, result });
+ },
+ onError: (ctx, msg, errors) => {
+ console.error('Error', { ctx, msg, errors });
},
onComplete: (ctx, msg) => {
- console.debug('Complete', { ctx, msg });
+ console.log('Complete', { ctx, msg });
},
},
{
@@ -498,25 +489,58 @@ server.listen(443);
-Server usage with a custom GraphQL context
+Server usage with custom static GraphQL arguments
```typescript
-import { execute, subscribe } from 'graphql';
+import { validate, execute, subscribe } from 'graphql';
import { createServer } from 'graphql-transport-ws';
-import { schema } from 'my-graphql-schema';
+import { schema, roots, getStaticContext } from 'my-graphql';
createServer(
{
+ context: getStaticContext(),
schema,
+ roots,
execute,
subscribe,
- onSubscribe: (ctx, msg, args) => {
- return [
- {
- ...args,
- contextValue: getCustomContext(ctx, msg, args),
- },
- ];
+ },
+ {
+ server,
+ path: '/graphql',
+ },
+);
+```
+
+
+
+
+Server usage with custom dynamic GraphQL arguments and validation
+
+```typescript
+import { parse, validate, execute, subscribe } from 'graphql';
+import { createServer } from 'graphql-transport-ws';
+import { schema, getDynamicContext, myValidationRules } from 'my-graphql';
+
+createServer(
+ {
+ execute,
+ subscribe,
+ onSubscribe: (ctx, msg) => {
+ const args = {
+ schema,
+ contextValue: getDynamicContext(ctx, msg),
+ operationName: msg.payload.operationName,
+ document: parse(msg.payload.operationName),
+ variableValues: msg.payload.variables,
+ };
+
+ // dont forget to validate when returning custom execution args!
+ const errors = validate(args.schema, args.document, myValidationRules);
+ if (errors.length > 0) {
+ return errors; // return `GraphQLError[]` to send `ErrorMessage` and stop subscription
+ }
+
+ return args;
},
},
{
@@ -528,6 +552,88 @@ createServer(
+
+Server and client usage with persisted queries
+
+```typescript
+// 🛸 server
+
+import { parse, execute, subscribe } from 'graphql';
+import { createServer } from 'graphql-transport-ws';
+import { schema } from 'my-graphql-schema';
+
+type QueryID = string;
+
+const queriesStore: Record = {
+ iWantTheGreetings: {
+ schema, // you may even provide different schemas in the queries store
+ document: parse('subscription Greetings { greetings }'),
+ },
+};
+
+createServer(
+ {
+ execute,
+ subscribe,
+ onSubscribe: (_ctx, msg) => {
+ // search using `SubscriptionPayload.query` as QueryID
+ // check the client example below for better understanding
+ const hit = queriesStore[msg.payload.query];
+ if (hit) {
+ return {
+ ...hit,
+ variableValues: msg.payload.variables, // use the variables from the client
+ };
+ }
+ // if no hit, execute as usual
+ return {
+ schema,
+ operationName: msg.payload.operationName,
+ document: parse(msg.payload.operationName),
+ variableValues: msg.payload.variables,
+ };
+ },
+ },
+ {
+ server,
+ path: '/graphql',
+ },
+);
+```
+
+```typescript
+// 📺 client
+
+import { createClient } from 'graphql-transport-ws';
+
+const client = createClient({
+ url: 'wss://persisted.graphql/queries',
+});
+
+(async () => {
+ const onNext = () => {
+ /**/
+ };
+
+ await new Promise((resolve, reject) => {
+ client.subscribe(
+ {
+ query: 'iWantTheGreetings',
+ },
+ {
+ next: onNext,
+ error: reject,
+ complete: resolve,
+ },
+ );
+ });
+
+ expect(onNext).toBeCalledTimes(5); // greetings in 5 languages
+})();
+```
+
+
+
## [Documentation](docs/)
Check the [docs folder](docs/) out for [TypeDoc](https://typedoc.org) generated documentation.
diff --git a/docs/interfaces/_server_.serveroptions.md b/docs/interfaces/_server_.serveroptions.md
index 73d3449d..77e3b075 100644
--- a/docs/interfaces/_server_.serveroptions.md
+++ b/docs/interfaces/_server_.serveroptions.md
@@ -15,15 +15,16 @@
* [connectionInitWaitTimeout](_server_.serveroptions.md#connectioninitwaittimeout)
* [context](_server_.serveroptions.md#context)
* [execute](_server_.serveroptions.md#execute)
-* [formatExecutionResult](_server_.serveroptions.md#formatexecutionresult)
* [keepAlive](_server_.serveroptions.md#keepalive)
* [onComplete](_server_.serveroptions.md#oncomplete)
* [onConnect](_server_.serveroptions.md#onconnect)
+* [onError](_server_.serveroptions.md#onerror)
+* [onNext](_server_.serveroptions.md#onnext)
+* [onOperation](_server_.serveroptions.md#onoperation)
* [onSubscribe](_server_.serveroptions.md#onsubscribe)
* [roots](_server_.serveroptions.md#roots)
* [schema](_server_.serveroptions.md#schema)
* [subscribe](_server_.serveroptions.md#subscribe)
-* [validationRules](_server_.serveroptions.md#validationrules)
## Properties
@@ -31,8 +32,6 @@
• `Optional` **connectionInitWaitTimeout**: undefined \| number
-**`default`** 3 * 1000 (3 seconds)
-
The amount of time for which the
server will wait for `ConnectionInit` message.
@@ -43,37 +42,30 @@ has not sent the `ConnectionInit` message,
the server will terminate the socket by
dispatching a close event `4408: Connection initialisation timeout`
+**`default`** 3 * 1000 (3 seconds)
+
___
### context
-• `Optional` **context**: SubscriptionArgs[\"contextValue\"]
+• `Optional` **context**: unknown
A value which is provided to every resolver and holds
important contextual information like the currently
logged in user, or access to a database.
-Related operation context value will be injected to the
-`ExecutionArgs` BEFORE the `onSubscribe` callback.
+
+If you return from the `onSubscribe` callback, this
+context value will NOT be injected. You should add it
+in the returned `ExecutionArgs` from the callback.
___
### execute
-• **execute**: (args: ExecutionArgs) => Promise\ \| ExecutionResult> \| AsyncIterableIterator\ \| ExecutionResult
+• **execute**: (args: ExecutionArgs) => [OperationResult](../modules/_server_.md#operationresult)
Is the `execute` function from GraphQL which is
-used to execute the query/mutation operation.
-
-___
-
-### formatExecutionResult
-
-• `Optional` **formatExecutionResult**: [ExecutionResultFormatter](../modules/_server_.md#executionresultformatter)
-
-Format the operation execution results
-if the implementation requires an adjusted
-result. This formatter is run BEFORE the
-`onConnect` scoped formatter.
+used to execute the query and mutation operations.
___
@@ -92,17 +84,17 @@ ___
### onComplete
-• `Optional` **onComplete**: undefined \| (ctx: [Context](_server_.context.md), message: [CompleteMessage](_message_.completemessage.md)) => void
+• `Optional` **onComplete**: undefined \| (ctx: [Context](_server_.context.md), message: [CompleteMessage](_message_.completemessage.md)) => Promise\ \| void
The complete callback is executed after the
-operation has completed or the subscription
-has been closed.
+operation has completed right before sending
+the complete message to the client.
___
### onConnect
-• `Optional` **onConnect**: undefined \| (ctx: [Context](_server_.context.md)) => Promise\ \| boolean
+• `Optional` **onConnect**: undefined \| (ctx: [Context](_server_.context.md)) => Promise\ \| boolean \| void
Is the connection callback called when the
client requests the connection initialisation
@@ -110,7 +102,7 @@ through the message `ConnectionInit`. The message
payload (`connectionParams` on the client) is
present in the `Context.connectionParams`.
-- Returning `true` from the callback will
+- Returning `true` or nothing from the callback will
allow the client to connect.
- Returning `false` from the callback will
@@ -125,17 +117,79 @@ thrown `Error`.
___
+### onError
+
+• `Optional` **onError**: undefined \| (ctx: [Context](_server_.context.md), message: [ErrorMessage](_message_.errormessage.md), errors: readonly GraphQLError[]) => Promise\ \| readonly GraphQLError[] \| void
+
+Executed after an error occured right before it
+has been dispatched to the client.
+
+Use this callback to format the outgoing GraphQL
+errors before they reach the client.
+
+Returned result will be injected in the error message payload.
+
+___
+
+### onNext
+
+• `Optional` **onNext**: undefined \| (ctx: [Context](_server_.context.md), message: [NextMessage](_message_.nextmessage.md), args: ExecutionArgs, result: ExecutionResult) => Promise\ \| ExecutionResult \| void
+
+Executed after an operation has emitted a result right before
+that result has been sent to the client. Results from both
+single value and streaming operations will appear in this callback.
+
+Use this callback if you want to format the execution result
+before it reaches the client.
+
+Returned result will be injected in the next message payload.
+
+___
+
+### onOperation
+
+• `Optional` **onOperation**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs, result: [OperationResult](../modules/_server_.md#operationresult)) => Promise\<[OperationResult](../modules/_server_.md#operationresult) \| void> \| [OperationResult](../modules/_server_.md#operationresult) \| void
+
+Executed after the operation call resolves. For streaming
+operations, triggering this callback does not necessarely
+mean that there is already a result available - it means
+that the subscription process for the stream has resolved
+and that the client is now subscribed.
+
+The `OperationResult` argument is the result of operation
+execution. It can be an iterator or already value.
+
+If you want the single result and the events from a streaming
+operation, use the `onNext` callback.
+
+Use this callback to listen for subscribe operation and
+execution result manipulation.
+
+___
+
### onSubscribe
-• `Optional` **onSubscribe**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md), args: Optional\) => Promise\<[ExecutionArgs, undefined \| [ExecutionResultFormatter](../modules/_server_.md#executionresultformatter)]> \| [ExecutionArgs, undefined \| [ExecutionResultFormatter](../modules/_server_.md#executionresultformatter)]
+• `Optional` **onSubscribe**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md)) => Promise\ \| ExecutionArgs \| readonly GraphQLError[] \| void
-The subscribe callback executed before
-the actual operation execution. Useful
-for manipulating the execution arguments
-before the doing the operation. As a second
-item in the array, you can pass in a scoped
-execution result formatter. This formatter
-is run AFTER the root `formatExecutionResult`.
+The subscribe callback executed right after
+acknowledging the request before any payload
+processing has been performed.
+
+If you return `ExecutionArgs` from the callback,
+it will be used instead of trying to build one
+internally. In this case, you are responsible
+for providing a ready set of arguments which will
+be directly plugged in the operation execution.
+
+To report GraphQL errors simply return an array
+of them from the callback, they will be reported
+to the client through the error message.
+
+Useful for preparing the execution arguments
+following a custom logic. A typical use case are
+persisted queries, you can identify the query from
+the subscribe message and create the GraphQL operation
+execution args which are then returned by the function.
___
@@ -146,8 +200,10 @@ ___
The GraphQL root fields or resolvers to go
alongside the schema. Learn more about them
here: https://graphql.org/learn/execution/#root-fields-resolvers.
-Related operation root value will be injected to the
-`ExecutionArgs` BEFORE the `onSubscribe` callback.
+
+If you return from the `onSubscribe` callback, the
+root field value will NOT be injected. You should add it
+in the returned `ExecutionArgs` from the callback.
___
@@ -156,25 +212,17 @@ ___
• `Optional` **schema**: GraphQLSchema
The GraphQL schema on which the operations
-will be executed and validated against. If
-the schema is left undefined, one must be
-provided by in the resulting `ExecutionArgs`
-from the `onSubscribe` callback.
+will be executed and validated against.
+
+If the schema is left undefined, you're trusted to
+provide one in the returned `ExecutionArgs` from the
+`onSubscribe` callback.
___
### subscribe
-• **subscribe**: (args: ExecutionArgs) => Promise\ \| ExecutionResult>
+• **subscribe**: (args: ExecutionArgs) => [OperationResult](../modules/_server_.md#operationresult)
Is the `subscribe` function from GraphQL which is
used to execute the subscription operation.
-
-___
-
-### validationRules
-
-• `Optional` **validationRules**: readonly ValidationRule[]
-
-Custom validation rules overriding all
-validation rules defined by the GraphQL spec.
diff --git a/docs/modules/_message_.md b/docs/modules/_message_.md
index c6178acf..728ca393 100644
--- a/docs/modules/_message_.md
+++ b/docs/modules/_message_.md
@@ -24,6 +24,12 @@
* [Message](_message_.md#message)
+### Functions
+
+* [isMessage](_message_.md#ismessage)
+* [parseMessage](_message_.md#parsemessage)
+* [stringifyMessage](_message_.md#stringifymessage)
+
## Type aliases
### Message
@@ -35,3 +41,57 @@
Name | Type | Default |
------ | ------ | ------ |
`T` | [MessageType](../enums/_message_.messagetype.md) | MessageType |
+
+## Functions
+
+### isMessage
+
+â–¸ **isMessage**(`val`: unknown): val is Message
+
+Checks if the provided value is a message.
+
+#### Parameters:
+
+Name | Type |
+------ | ------ |
+`val` | unknown |
+
+**Returns:** val is Message
+
+___
+
+### parseMessage
+
+â–¸ **parseMessage**(`data`: unknown): [Message](_message_.md#message)
+
+Parses the raw websocket message data to a valid message.
+
+#### Parameters:
+
+Name | Type |
+------ | ------ |
+`data` | unknown |
+
+**Returns:** [Message](_message_.md#message)
+
+___
+
+### stringifyMessage
+
+â–¸ **stringifyMessage**\(`msg`: [Message](_message_.md#message)\): string
+
+Stringifies a valid message ready to be sent through the socket.
+
+#### Type parameters:
+
+Name | Type |
+------ | ------ |
+`T` | [MessageType](../enums/_message_.messagetype.md) |
+
+#### Parameters:
+
+Name | Type |
+------ | ------ |
+`msg` | [Message](_message_.md#message)\ |
+
+**Returns:** string
diff --git a/docs/modules/_server_.md b/docs/modules/_server_.md
index f0525037..96f722e5 100644
--- a/docs/modules/_server_.md
+++ b/docs/modules/_server_.md
@@ -14,7 +14,7 @@
### Type aliases
-* [ExecutionResultFormatter](_server_.md#executionresultformatter)
+* [OperationResult](_server_.md#operationresult)
### Functions
@@ -22,9 +22,9 @@
## Type aliases
-### ExecutionResultFormatter
+### OperationResult
-Ƭ **ExecutionResultFormatter**: (ctx: [Context](../interfaces/_server_.context.md), result: ExecutionResult) => Promise\ \| ExecutionResult
+Ƭ **OperationResult**: Promise\ \| ExecutionResult> \| AsyncIterableIterator\ \| ExecutionResult
## Functions
diff --git a/src/message.ts b/src/message.ts
index 8249a5e7..b8b45c57 100644
--- a/src/message.ts
+++ b/src/message.ts
@@ -7,10 +7,10 @@
import { GraphQLError, ExecutionResult, DocumentNode } from 'graphql';
import {
isObject,
+ areGraphQLErrors,
hasOwnProperty,
hasOwnObjectProperty,
hasOwnStringProperty,
- hasOwnArrayProperty,
} from './utils';
/** Types of messages allowed to be sent by the client/server over the WS protocol. */
@@ -78,11 +78,11 @@ export type Message<
? CompleteMessage
: never;
-/** @ignore */
+/** Checks if the provided value is a message. */
export function isMessage(val: unknown): val is Message {
if (isObject(val)) {
// all messages must have the `type` prop
- if (!hasOwnProperty(val, 'type')) {
+ if (!hasOwnStringProperty(val, 'type')) {
return false;
}
// validate other properties depending on the `type`
@@ -102,7 +102,7 @@ export function isMessage(val: unknown): val is Message {
hasOwnObjectProperty(val, 'payload') &&
(!hasOwnProperty(val.payload, 'operationName') ||
hasOwnStringProperty(val.payload, 'operationName')) &&
- (hasOwnStringProperty(val.payload, 'query') || // string query
+ (hasOwnStringProperty(val.payload, 'query') || // string query or persisted query id
hasOwnObjectProperty(val.payload, 'query')) && // document node query
(!hasOwnProperty(val.payload, 'variables') ||
hasOwnObjectProperty(val.payload, 'variables'))
@@ -116,12 +116,7 @@ export function isMessage(val: unknown): val is Message {
hasOwnObjectProperty(val.payload, 'errors'))
);
case MessageType.Error:
- return (
- hasOwnStringProperty(val, 'id') &&
- // GraphQLError
- hasOwnArrayProperty(val, 'payload') &&
- val.payload.length > 0 // must be at least one error
- );
+ return hasOwnStringProperty(val, 'id') && areGraphQLErrors(val.payload);
case MessageType.Complete:
return hasOwnStringProperty(val, 'id');
default:
@@ -131,7 +126,7 @@ export function isMessage(val: unknown): val is Message {
return false;
}
-/** @ignore */
+/** Parses the raw websocket message data to a valid message. */
export function parseMessage(data: unknown): Message {
if (isMessage(data)) {
return data;
@@ -146,10 +141,7 @@ export function parseMessage(data: unknown): Message {
return message;
}
-/**
- * @ignore
- * Helps stringifying a valid message ready to be sent through the socket.
- */
+/** Stringifies a valid message ready to be sent through the socket. */
export function stringifyMessage(
msg: Message,
): string {
diff --git a/src/server.ts b/src/server.ts
index 05bec429..4e69ceb7 100644
--- a/src/server.ts
+++ b/src/server.ts
@@ -9,62 +9,68 @@ import * as WebSocket from 'ws';
import {
OperationTypeNode,
GraphQLSchema,
- ValidationRule,
- ExecutionResult,
ExecutionArgs,
parse,
validate,
getOperationAST,
GraphQLError,
SubscriptionArgs,
+ ExecutionResult,
} from 'graphql';
import { Disposable } from './types';
import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from './protocol';
import {
Message,
MessageType,
+ stringifyMessage,
parseMessage,
SubscribeMessage,
+ NextMessage,
+ ErrorMessage,
CompleteMessage,
- stringifyMessage,
} from './message';
import {
- Optional,
isObject,
isAsyncIterable,
hasOwnObjectProperty,
hasOwnStringProperty,
+ areGraphQLErrors,
} from './utils';
import { ID } from './types';
-export type ExecutionResultFormatter = (
- ctx: Context,
- result: ExecutionResult,
-) => Promise | ExecutionResult;
+export type OperationResult =
+ | Promise | ExecutionResult>
+ | AsyncIterableIterator
+ | ExecutionResult;
export interface ServerOptions {
/**
* The GraphQL schema on which the operations
- * will be executed and validated against. If
- * the schema is left undefined, one must be
- * provided by in the resulting `ExecutionArgs`
- * from the `onSubscribe` callback.
+ * will be executed and validated against.
+ *
+ * If the schema is left undefined, you're trusted to
+ * provide one in the returned `ExecutionArgs` from the
+ * `onSubscribe` callback.
*/
schema?: GraphQLSchema;
/**
* A value which is provided to every resolver and holds
* important contextual information like the currently
* logged in user, or access to a database.
- * Related operation context value will be injected to the
- * `ExecutionArgs` BEFORE the `onSubscribe` callback.
+ *
+ * If you return from the `onSubscribe` callback, this
+ * context value will NOT be injected. You should add it
+ * in the returned `ExecutionArgs` from the callback.
*/
- context?: SubscriptionArgs['contextValue'];
+ context?: unknown;
/**
* The GraphQL root fields or resolvers to go
* alongside the schema. Learn more about them
* here: https://graphql.org/learn/execution/#root-fields-resolvers.
- * Related operation root value will be injected to the
- * `ExecutionArgs` BEFORE the `onSubscribe` callback.
+ *
+ * If you return from the `onSubscribe` callback, the
+ * root field value will NOT be injected. You should add it
+ * in the returned `ExecutionArgs` from the callback.
*/
roots?: {
[operation in OperationTypeNode]?: Record<
@@ -74,21 +80,37 @@ export interface ServerOptions {
};
/**
* Is the `execute` function from GraphQL which is
- * used to execute the query/mutation operation.
+ * used to execute the query and mutation operations.
*/
- execute: (
- args: ExecutionArgs,
- ) =>
- | Promise | ExecutionResult>
- | AsyncIterableIterator
- | ExecutionResult;
+ execute: (args: ExecutionArgs) => OperationResult;
/**
* Is the `subscribe` function from GraphQL which is
* used to execute the subscription operation.
*/
- subscribe: (
- args: ExecutionArgs,
- ) => Promise | ExecutionResult>;
+ subscribe: (args: ExecutionArgs) => OperationResult;
+ /**
+ * The amount of time for which the
+ * server will wait for `ConnectionInit` message.
+ *
+ * Set the value to `Infinity`, '', 0, null or undefined to skip waiting.
+ *
+ * If the wait timeout has passed and the client
+ * has not sent the `ConnectionInit` message,
+ * the server will terminate the socket by
+ * dispatching a close event `4408: Connection initialisation timeout`
+ *
+ * @default 3 * 1000 (3 seconds)
+ */
+ connectionInitWaitTimeout?: number;
+ /**
+ * The timout between dispatched keep-alive messages. Internally the lib
+ * uses the [WebSocket Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets)) to check that the link between
+ * the clients and the server is operating and to prevent the link from being broken due to idling.
+ * Set to nullish value to disable.
+ *
+ * @default 12 * 1000 (12 seconds)
+ */
+ keepAlive?: number;
/**
* Is the connection callback called when the
* client requests the connection initialisation
@@ -96,7 +118,7 @@ export interface ServerOptions {
* payload (`connectionParams` on the client) is
* present in the `Context.connectionParams`.
*
- * - Returning `true` from the callback will
+ * - Returning `true` or nothing from the callback will
* allow the client to connect.
*
* - Returning `false` from the callback will
@@ -109,64 +131,94 @@ export interface ServerOptions {
* the `` is the message of the
* thrown `Error`.
*/
- onConnect?: (ctx: Context) => Promise | boolean;
+ onConnect?: (ctx: Context) => Promise | boolean | void;
/**
- * @default 3 * 1000 (3 seconds)
+ * The subscribe callback executed right after
+ * acknowledging the request before any payload
+ * processing has been performed.
*
- * The amount of time for which the
- * server will wait for `ConnectionInit` message.
+ * If you return `ExecutionArgs` from the callback,
+ * it will be used instead of trying to build one
+ * internally. In this case, you are responsible
+ * for providing a ready set of arguments which will
+ * be directly plugged in the operation execution.
*
- * Set the value to `Infinity`, '', 0, null or undefined to skip waiting.
+ * To report GraphQL errors simply return an array
+ * of them from the callback, they will be reported
+ * to the client through the error message.
*
- * If the wait timeout has passed and the client
- * has not sent the `ConnectionInit` message,
- * the server will terminate the socket by
- * dispatching a close event `4408: Connection initialisation timeout`
+ * Useful for preparing the execution arguments
+ * following a custom logic. A typical use case are
+ * persisted queries, you can identify the query from
+ * the subscribe message and create the GraphQL operation
+ * execution args which are then returned by the function.
*/
- connectionInitWaitTimeout?: number;
+ onSubscribe?: (
+ ctx: Context,
+ message: SubscribeMessage,
+ ) =>
+ | Promise
+ | ExecutionArgs
+ | readonly GraphQLError[]
+ | void;
/**
- * Custom validation rules overriding all
- * validation rules defined by the GraphQL spec.
+ * Executed after the operation call resolves. For streaming
+ * operations, triggering this callback does not necessarely
+ * mean that there is already a result available - it means
+ * that the subscription process for the stream has resolved
+ * and that the client is now subscribed.
+ *
+ * The `OperationResult` argument is the result of operation
+ * execution. It can be an iterator or already value.
+ *
+ * If you want the single result and the events from a streaming
+ * operation, use the `onNext` callback.
+ *
+ * Use this callback to listen for subscribe operation and
+ * execution result manipulation.
*/
- validationRules?: readonly ValidationRule[];
+ onOperation?: (
+ ctx: Context,
+ message: SubscribeMessage,
+ args: ExecutionArgs,
+ result: OperationResult,
+ ) => Promise | OperationResult | void;
/**
- * Format the operation execution results
- * if the implementation requires an adjusted
- * result. This formatter is run BEFORE the
- * `onConnect` scoped formatter.
+ * Executed after an error occured right before it
+ * has been dispatched to the client.
+ *
+ * Use this callback to format the outgoing GraphQL
+ * errors before they reach the client.
+ *
+ * Returned result will be injected in the error message payload.
*/
- formatExecutionResult?: ExecutionResultFormatter;
+ onError?: (
+ ctx: Context,
+ message: ErrorMessage,
+ errors: readonly GraphQLError[],
+ ) => Promise | readonly GraphQLError[] | void;
/**
- * The subscribe callback executed before
- * the actual operation execution. Useful
- * for manipulating the execution arguments
- * before the doing the operation. As a second
- * item in the array, you can pass in a scoped
- * execution result formatter. This formatter
- * is run AFTER the root `formatExecutionResult`.
+ * Executed after an operation has emitted a result right before
+ * that result has been sent to the client. Results from both
+ * single value and streaming operations will appear in this callback.
+ *
+ * Use this callback if you want to format the execution result
+ * before it reaches the client.
+ *
+ * Returned result will be injected in the next message payload.
*/
- onSubscribe?: (
+ onNext?: (
ctx: Context,
- message: SubscribeMessage,
- args: Optional,
- ) =>
- | Promise<[ExecutionArgs, ExecutionResultFormatter?]>
- | [ExecutionArgs, ExecutionResultFormatter?];
+ message: NextMessage,
+ args: ExecutionArgs,
+ result: ExecutionResult,
+ ) => Promise | ExecutionResult | void;
/**
* The complete callback is executed after the
- * operation has completed or the subscription
- * has been closed.
- */
- onComplete?: (ctx: Context, message: CompleteMessage) => void;
- /**
- * The timout between dispatched keep-alive messages. Internally the lib
- * uses the [WebSocket Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets)) to check that the link between
- * the clients and the server is operating and to prevent the link from being broken due to idling.
- * Set to nullish value to disable.
- *
- * @default 12 * 1000 (12 seconds)
+ * operation has completed right before sending
+ * the complete message to the client.
*/
- keepAlive?: number;
+ onComplete?: (ctx: Context, message: CompleteMessage) => Promise | void;
}
export interface Context {
@@ -227,13 +279,14 @@ export function createServer(
roots,
execute,
subscribe,
- onConnect,
connectionInitWaitTimeout = 3 * 1000, // 3 seconds
- validationRules,
- formatExecutionResult,
+ keepAlive = 12 * 1000, // 12 seconds
+ onConnect,
onSubscribe,
+ onOperation,
+ onNext,
+ onError,
onComplete,
- keepAlive = 12 * 1000, // 12 seconds
} = options;
const webSocketServer = isWebSocketServer(websocketOptionsOrServer)
? websocketOptionsOrServer
@@ -246,8 +299,7 @@ export function createServer(
(Array.isArray(socket.protocol) &&
socket.protocol.indexOf(GRAPHQL_TRANSPORT_WS_PROTOCOL) === -1)
) {
- socket.close(1002, 'Protocol Error');
- return;
+ return socket.close(1002, 'Protocol Error');
}
const ctxRef: { current: Context } = {
@@ -296,7 +348,6 @@ export function createServer(
}
});
- // issue a ping to the client
socket.ping();
}
}, keepAlive);
@@ -341,33 +392,21 @@ export function createServer(
});
// Sends through a message only if the socket is open.
- function sendMessage(
+ async function sendMessage(
ctx: Context,
message: Message,
- callback?: (err?: Error) => void,
) {
- return new Promise((resolve, reject) => {
- if (ctx.socket.readyState === WebSocket.OPEN) {
- try {
- ctx.socket.send(stringifyMessage(message), (err) => {
- if (callback) callback(err);
- if (err) {
- return reject(err);
- }
- return resolve();
- });
- } catch (err) {
- reject(err);
- }
- } else {
- if (callback) callback();
- resolve();
- }
- });
+ if (ctx.socket.readyState === WebSocket.OPEN) {
+ return new Promise((resolve, reject) => {
+ ctx.socket.send(stringifyMessage(message), (err) =>
+ err ? reject(err) : resolve(),
+ );
+ });
+ }
}
function makeOnMessage(ctx: Context) {
- return async function (event: WebSocket.MessageEvent) {
+ return async function onMessage(event: WebSocket.MessageEvent) {
try {
const message = parseMessage(event.data);
switch (message.type) {
@@ -383,7 +422,7 @@ export function createServer(
if (onConnect) {
const permitted = await onConnect(ctx);
- if (!permitted) {
+ if (permitted === false) {
return ctx.socket.close(4403, 'Forbidden');
}
}
@@ -400,91 +439,131 @@ export function createServer(
return ctx.socket.close(4401, 'Unauthorized');
}
- const operation = message.payload;
- const document =
- typeof operation.query === 'string'
- ? parse(operation.query)
- : operation.query;
- const operationAST = getOperationAST(
- document,
- operation.operationName,
- );
- if (!operationAST) {
- throw new Error('Unable to get operation AST');
- }
-
- let execArgsMaybeSchema: Optional = {
- contextValue: context,
- schema,
- operationName: operation.operationName,
- document,
- variableValues: operation.variables,
+ const emit = {
+ next: async (result: ExecutionResult, args: ExecutionArgs) => {
+ let nextMessage: NextMessage = {
+ id: message.id,
+ type: MessageType.Next,
+ payload: result,
+ };
+ if (onNext) {
+ const maybeResult = await onNext(
+ ctx,
+ nextMessage,
+ args,
+ result,
+ );
+ if (maybeResult) {
+ nextMessage = {
+ ...nextMessage,
+ payload: maybeResult,
+ };
+ }
+ }
+ await sendMessage(ctx, nextMessage);
+ },
+ error: async (errors: readonly GraphQLError[]) => {
+ let errorMessage: ErrorMessage = {
+ id: message.id,
+ type: MessageType.Error,
+ payload: errors,
+ };
+ if (onError) {
+ const maybeResult = await onError(ctx, errorMessage, errors);
+ if (maybeResult) {
+ errorMessage = {
+ ...errorMessage,
+ payload: maybeResult,
+ };
+ }
+ }
+ await sendMessage(ctx, errorMessage);
+ },
+ complete: async () => {
+ const completeMessage: CompleteMessage = {
+ id: message.id,
+ type: MessageType.Complete,
+ };
+ await onComplete?.(ctx, completeMessage);
+ await sendMessage(ctx, completeMessage);
+ },
};
- // if roots are provided, inject the coresponding operation root
- if (roots) {
- execArgsMaybeSchema.rootValue = roots[operationAST.operation];
- }
+ let execArgs: ExecutionArgs;
+ const maybeExecArgsOrErrors = await onSubscribe?.(ctx, message);
+ if (maybeExecArgsOrErrors) {
+ if (areGraphQLErrors(maybeExecArgsOrErrors)) {
+ return await emit.error(maybeExecArgsOrErrors);
+ }
+ execArgs = maybeExecArgsOrErrors as ExecutionArgs; // because not graphql errors
+ } else {
+ if (!schema) {
+ // you either provide a schema dynamically through
+ // `onSubscribe` or you set one up during the server setup
+ return webSocketServer.emit(
+ 'error',
+ new Error('The GraphQL schema is not provided'),
+ );
+ }
- let onSubscribeFormatter: ExecutionResultFormatter | undefined;
- if (onSubscribe) {
- [execArgsMaybeSchema, onSubscribeFormatter] = await onSubscribe(
- ctx,
- message,
- execArgsMaybeSchema,
- );
- }
- if (!execArgsMaybeSchema.schema) {
- // not providing a schema is a fatal server error
- return webSocketServer.emit(
- 'error',
- new Error('The GraphQL schema is not provided'),
+ const { operationName, query, variables } = message.payload;
+ const document = typeof query === 'string' ? parse(query) : query;
+ execArgs = {
+ contextValue: context,
+ schema,
+ operationName,
+ document,
+ variableValues: variables,
+ };
+
+ const validationErrors = validate(
+ execArgs.schema,
+ execArgs.document,
);
+ if (validationErrors.length > 0) {
+ return await emit.error(validationErrors);
+ }
}
- // the execution arguments should be complete now
- const execArgs = execArgsMaybeSchema as ExecutionArgs;
-
- // validate
- const validationErrors = validate(
- execArgs.schema,
+ const operationAST = getOperationAST(
execArgs.document,
- validationRules,
+ execArgs.operationName,
);
- if (validationErrors.length > 0) {
- return await sendMessage(ctx, {
- id: message.id,
- type: MessageType.Error,
- payload: validationErrors,
- });
+ if (!operationAST) {
+ return await emit.error([
+ new GraphQLError('Unable to identify operation'),
+ ]);
}
- // formats the incoming result and emits it to the subscriber
- const emitResult = async (result: ExecutionResult) => {
- // use the root formater first
- if (formatExecutionResult) {
- result = await formatExecutionResult(ctx, result);
- }
- // then use the subscription specific formatter
- if (onSubscribeFormatter) {
- result = await onSubscribeFormatter(ctx, result);
- }
- await sendMessage(ctx, {
- id: message.id,
- type: MessageType.Next,
- payload: result,
- });
- };
+ // if you've provided your own root through
+ // `onSubscribe`, prefer that over the root's root
+ if (!execArgs.rootValue) {
+ execArgs.rootValue = roots?.[operationAST.operation];
+ }
- // perform
- let iterableOrResult;
+ // the execution arguments have been prepared
+ // perform the operation and act accordingly
+ let operationResult;
if (operationAST.operation === 'subscription') {
- iterableOrResult = await subscribe(execArgs);
+ operationResult = await subscribe(execArgs);
} else {
- // operationAST.operation === 'query' || 'mutation'
- iterableOrResult = await execute(execArgs);
+ // operation === 'query' || 'mutation'
+ operationResult = await execute(execArgs);
+ }
+
+ if (onOperation) {
+ const maybeResult = await onOperation(
+ ctx,
+ message,
+ execArgs,
+ operationResult,
+ );
+ if (maybeResult) {
+ operationResult = maybeResult;
+ }
}
- if (isAsyncIterable(iterableOrResult)) {
+
+ if (isAsyncIterable(operationResult)) {
/** multiple emitted results */
// iterable subscriptions are distinct on ID
@@ -494,57 +573,24 @@ export function createServer(
`Subscriber for ${message.id} already exists`,
);
}
- ctx.subscriptions[message.id] = iterableOrResult;
+ ctx.subscriptions[message.id] = operationResult;
- try {
- for await (const result of iterableOrResult) {
- await emitResult(result);
- }
- // source stream completed
- const completeMessage: CompleteMessage = {
- id: message.id,
- type: MessageType.Complete,
- };
- await sendMessage(ctx, completeMessage);
- if (onComplete) {
- onComplete(ctx, completeMessage);
- }
- } catch (err) {
- await sendMessage(ctx, {
- id: message.id,
- type: MessageType.Error,
- payload: [
- new GraphQLError(
- err instanceof Error
- ? err.message
- : new Error(err).message,
- ),
- ],
- });
- } finally {
- delete ctx.subscriptions[message.id];
+ // only case where this might fail is if the socket is broken
+ for await (const result of operationResult) {
+ await emit.next(result, execArgs);
}
+ await emit.complete();
+ delete ctx.subscriptions[message.id];
} else {
/** single emitted result */
- await emitResult(iterableOrResult);
-
- // resolved
- const completeMessage: CompleteMessage = {
- id: message.id,
- type: MessageType.Complete,
- };
- await sendMessage(ctx, completeMessage);
- if (onComplete) {
- onComplete(ctx, completeMessage);
- }
+ await emit.next(operationResult, execArgs);
+ await emit.complete();
}
break;
}
case MessageType.Complete: {
- if (ctx.subscriptions[message.id]) {
- await ctx.subscriptions[message.id].return?.();
- }
+ await ctx.subscriptions[message.id]?.return?.();
break;
}
default:
diff --git a/src/tests/client.ts b/src/tests/client.ts
index 1e04716e..51ca9fb9 100644
--- a/src/tests/client.ts
+++ b/src/tests/client.ts
@@ -8,7 +8,7 @@ import { Server } from '../server';
import { createClient, EventListener } from '../client';
// Just does nothing
-export function noop(): void {
+function noop(): void {
/**/
}
diff --git a/src/tests/server.ts b/src/tests/server.ts
index 7f4fbb33..19314842 100644
--- a/src/tests/server.ts
+++ b/src/tests/server.ts
@@ -1,7 +1,19 @@
import WebSocket from 'ws';
-import { parse, buildSchema, execute, subscribe } from 'graphql';
+import {
+ parse,
+ buildSchema,
+ execute,
+ subscribe,
+ GraphQLError,
+ ExecutionArgs,
+} from 'graphql';
import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../protocol';
-import { MessageType, parseMessage, stringifyMessage } from '../message';
+import {
+ MessageType,
+ parseMessage,
+ stringifyMessage,
+ SubscribePayload,
+} from '../message';
import { startServer, url, schema, pong } from './fixtures/simple';
let forgottenDispose: (() => Promise) | undefined;
@@ -367,7 +379,7 @@ describe('Connect', () => {
});
});
- it('should acknowledge connection if not implemented or returning `true`', async () => {
+ it('should acknowledge connection if not implemented, returning `true` or nothing', async () => {
async function test() {
const client = await createTClient();
client.ws.send(
@@ -381,18 +393,26 @@ describe('Connect', () => {
}
// no implementation
- const [, dispose] = await makeServer();
+ let [, dispose] = await makeServer();
await test();
-
await dispose();
// returns true
- await makeServer({
+ [, dispose] = await makeServer({
onConnect: () => {
return true;
},
});
await test();
+ await dispose();
+
+ // returns nothing
+ await makeServer({
+ onConnect: () => {
+ /**/
+ },
+ });
+ await test();
});
it('should pass in the `connectionParams` through the context and have other flags correctly set', async (done) => {
@@ -575,16 +595,135 @@ describe('Subscribe', () => {
});
});
- it('should pick up the schema from `onSubscribe`', async () => {
+ it('should close the socket with errors thrown from any callback', async () => {
+ const error = new Error('Stop');
+
+ // onConnect
+ let [, dispose] = await makeServer({
+ onConnect: () => {
+ throw error;
+ },
+ });
+ const client = await createTClient();
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+ await client.waitForClose((event) => {
+ expect(event.code).toBe(4400);
+ expect(event.reason).toBe(error.message);
+ expect(event.wasClean).toBeTruthy();
+ });
+ await dispose();
+
+ async function test(
+ payload: SubscribePayload = {
+ query: `query { getValue }`,
+ },
+ ) {
+ const client = await createTClient();
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
+ client.ws.send(
+ stringifyMessage({
+ id: '1',
+ type: MessageType.Subscribe,
+ payload,
+ }),
+ );
+ });
+
+ await client.waitForClose((event) => {
+ expect(event.code).toBe(4400);
+ expect(event.reason).toBe(error.message);
+ expect(event.wasClean).toBeTruthy();
+ });
+ }
+
+ // onSubscribe
+ [, dispose] = await makeServer({
+ onSubscribe: () => {
+ throw error;
+ },
+ });
+ await test();
+ await dispose();
+
+ [, dispose] = await makeServer({
+ onOperation: () => {
+ throw error;
+ },
+ });
+ await test();
+ await dispose();
+
+ // execute
+ [, dispose] = await makeServer({
+ execute: () => {
+ throw error;
+ },
+ });
+ await test();
+ await dispose();
+
+ // subscribe
+ [, dispose] = await makeServer({
+ subscribe: () => {
+ throw error;
+ },
+ });
+ await test({ query: 'subscription { greetings }' });
+ await dispose();
+
+ // onNext
+ [, dispose] = await makeServer({
+ onNext: () => {
+ throw error;
+ },
+ });
+ await test();
+ await dispose();
+
+ // onError
+ [, dispose] = await makeServer({
+ onError: () => {
+ throw error;
+ },
+ });
+ await test({ query: 'query { noExisto }' });
+ await dispose();
+
+ // onComplete
+ [, dispose] = await makeServer({
+ onComplete: () => {
+ throw error;
+ },
+ });
+ await test();
+ await dispose();
+ });
+
+ it('should directly use the execution arguments returned from `onSubscribe`', async () => {
+ const nopeArgs = {
+ schema,
+ operationName: 'Nope',
+ document: parse(`query Nope { getValue }`),
+ };
await makeServer({
schema: undefined,
- onSubscribe: (_ctx, _message, args) => {
- return [
- {
- ...args,
- schema,
- },
- ];
+ execute: (args) => {
+ expect(args).toBe(nopeArgs);
+ return execute(args);
+ },
+ onSubscribe: (_ctx, _message) => {
+ return nopeArgs;
},
});
@@ -603,16 +742,18 @@ describe('Subscribe', () => {
id: '1',
type: MessageType.Subscribe,
payload: {
- operationName: 'TestString',
- query: `query TestString {
- getValue
- }`,
+ operationName: 'Ping',
+ query: `subscribe Ping {
+ ping
+ }`,
variables: {},
},
}),
);
});
+ // because onsubscribe changed the request
+
await client.waitForMessage(({ data }) => {
expect(parseMessage(data)).toEqual({
id: '1',
@@ -626,6 +767,207 @@ describe('Subscribe', () => {
}, 30);
});
+ it('should report the graphql errors returned from `onSubscribe`', async () => {
+ await makeServer({
+ onSubscribe: (_ctx, _message) => {
+ return [new GraphQLError('Report'), new GraphQLError('Me')];
+ },
+ });
+
+ const client = await createTClient();
+
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
+ client.ws.send(
+ stringifyMessage({
+ id: '1',
+ type: MessageType.Subscribe,
+ payload: {
+ operationName: 'Ping',
+ query: `subscribe Ping {
+ ping
+ }`,
+ variables: {},
+ },
+ }),
+ );
+ });
+
+ // because onsubscribe changed the request
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data)).toEqual({
+ id: '1',
+ type: MessageType.Error,
+ payload: [{ message: 'Report' }, { message: 'Me' }],
+ });
+ });
+
+ await client.waitForClose(() => {
+ fail('Shouldt have closed');
+ }, 30);
+ });
+
+ it('should use the execution result returned from `onNext`', async () => {
+ await makeServer({
+ onNext: (_ctx, _message) => {
+ return {
+ data: { hey: 'there' },
+ };
+ },
+ });
+
+ const client = await createTClient();
+
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
+ client.ws.send(
+ stringifyMessage({
+ id: '1',
+ type: MessageType.Subscribe,
+ payload: {
+ query: `subscription {
+ greetings
+ }`,
+ variables: {},
+ },
+ }),
+ );
+ });
+
+ // because onnext changed the result
+
+ for (let i = 0; i < 5; i++) {
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data)).toEqual({
+ id: '1',
+ type: MessageType.Next,
+ payload: { data: { hey: 'there' } },
+ });
+ });
+ }
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data)).toEqual({
+ id: '1',
+ type: MessageType.Complete,
+ });
+ });
+
+ await client.waitForClose(() => {
+ fail('Shouldt have closed');
+ }, 30);
+ });
+
+ it('should use the graphql errors returned from `onError`', async () => {
+ await makeServer({
+ onError: (_ctx, _message) => {
+ return [new GraphQLError('Itsa me!'), new GraphQLError('Anda me!')];
+ },
+ });
+
+ const client = await createTClient();
+
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
+ client.ws.send(
+ stringifyMessage({
+ id: '1',
+ type: MessageType.Subscribe,
+ payload: {
+ query: `query {
+ nogql
+ }`,
+ variables: {},
+ },
+ }),
+ );
+ });
+
+ // because onnext changed the result
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data)).toEqual({
+ id: '1',
+ type: MessageType.Error,
+ payload: [{ message: 'Itsa me!' }, { message: 'Anda me!' }],
+ });
+ });
+
+ await client.waitForClose(() => {
+ fail('Shouldt have closed');
+ }, 30);
+ });
+
+ it('should use the operation result returned from `onOperation`', async () => {
+ await makeServer({
+ onOperation: (_ctx, _message) => {
+ return (async function* () {
+ for (let i = 0; i < 3; i++) {
+ yield { data: { replaced: 'with me' } };
+ }
+ })();
+ },
+ });
+
+ const client = await createTClient();
+
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
+ client.ws.send(
+ stringifyMessage({
+ id: '1',
+ type: MessageType.Subscribe,
+ payload: {
+ query: `query {
+ getValue
+ }`,
+ variables: {},
+ },
+ }),
+ );
+ });
+
+ // because onoperation changed the execution result
+
+ for (let i = 0; i < 3; i++) {
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data)).toEqual({
+ id: '1',
+ type: MessageType.Next,
+ payload: { data: { replaced: 'with me' } },
+ });
+ });
+ }
+
+ await client.waitForClose(() => {
+ fail('Shouldt have closed');
+ }, 30);
+ });
+
it('should execute the query of `string` type, "next" the result and then "complete"', async () => {
await makeServer({
schema,
@@ -996,6 +1338,55 @@ describe('Subscribe', () => {
expect(event.wasClean).toBeTruthy();
});
});
+
+ it('should support persisted queries', async () => {
+ const queriesStore: Record = {
+ iWantTheValue: {
+ schema,
+ document: parse('query GetValue { getValue }'),
+ },
+ };
+
+ await makeServer({
+ onSubscribe: (_ctx, msg) => {
+ // search using `SubscriptionPayload.query` as QueryID
+ // check the client example below for better understanding
+ const hit = queriesStore[msg.payload.query as string];
+ return {
+ ...hit,
+ variableValues: msg.payload.variables, // use the variables from the client
+ };
+ },
+ });
+
+ const client = await createTClient();
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
+ client.ws.send(
+ stringifyMessage({
+ id: '1',
+ type: MessageType.Subscribe,
+ payload: {
+ query: 'iWantTheValue',
+ },
+ }),
+ );
+ });
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data)).toEqual({
+ id: '1',
+ type: MessageType.Next,
+ payload: { data: { getValue: 'value' } },
+ });
+ });
+ });
});
describe('Keep-Alive', () => {
diff --git a/src/utils.ts b/src/utils.ts
index f593b935..4f95081c 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -3,24 +3,28 @@
* utils
*
*/
-
-export type Optional = Pick> &
- Partial>;
+import { GraphQLError } from 'graphql';
export function isObject(val: unknown): val is Record {
return typeof val === 'object' && val !== null;
}
-export function isArray(val: unknown): val is unknown[] {
- return typeof val === 'object' && val !== null && Array.isArray(val);
-}
-
export function isAsyncIterable(
val: unknown,
): val is AsyncIterableIterator {
return typeof Object(val)[Symbol.asyncIterator] === 'function';
}
+export function areGraphQLErrors(obj: unknown): obj is GraphQLError[] {
+ return (
+ Array.isArray(obj) &&
+ // must be at least one error
+ obj.length > 0 &&
+ // error has at least a message
+ obj.every((ob) => 'message' in ob)
+ );
+}
+
export function hasOwnProperty<
O extends Record,
P extends PropertyKey
@@ -39,7 +43,9 @@ export function hasOwnArrayProperty<
O extends Record,
P extends PropertyKey
>(obj: O, prop: P): obj is O & Record