diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d246a767..3d478c0d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ - Enabled Greenkeeper and updated dependencies, includes major version bump of ws [PR #90](https://github.com/apollographql/subscriptions-transport-ws/pull/90) +### 0.6.0 +- Protocol update to support queries, mutations and also subscriptions. [PR #108](https://github.com/apollographql/subscriptions-transport-ws/pull/108) +- Added support in the server for GraphQL Executor. [PR #108](https://github.com/apollographql/subscriptions-transport-ws/pull/108) +- Added support in the server executor for `graphql-js subscribe`. [PR #846](https://github.com/graphql/graphql-js/pull/846) + ### 0.5.5 - Remove dependency on `graphql-tag/printer` per [graphql-tag#54](https://github.com/apollographql/graphql-tag/issues/54) [PR #98](https://github.com/apollographql/subscriptions-transport-ws/pull/98) diff --git a/src/server.ts b/src/server.ts index 1ccdf7461..d0c214b76 100644 --- a/src/server.ts +++ b/src/server.ts @@ -58,9 +58,19 @@ export type ExecuteFunction = ( operationName?: string, ) => Promise; +export type SubscribeFunction = ( + schema: GraphQLSchema, + document: DocumentNode, + rootValue?: any, + contextValue?: any, + variableValues?: {[key: string]: any}, + operationName?: string, +) => AsyncIterator; + export interface Executor { execute?: ExecuteFunction; executeReactive?: ExecuteReactiveFunction; + subscribe?: SubscribeFunction; } export interface ServerOptions { @@ -87,7 +97,7 @@ export interface ServerOptions { } class ExecuteAdapters { - public static executeFromExecute(execute: ExecuteFunction): ExecuteReactiveFunction { + public static executeFromExecute(execute: ExecuteFunction, subscribeFn?: SubscribeFunction): ExecuteReactiveFunction { return (schema: GraphQLSchema, document: DocumentNode, rootValue?: any, @@ -97,7 +107,43 @@ class ExecuteAdapters { ) => ({ subscribe: (observer) => { if (ExecuteAdapters.isASubscriptionOperation(document, operationName)) { - observer.error(new Error('Subscriptions are not supported')); + if (!subscribeFn) { + observer.error(new Error('Subscriptions are not supported')); + } + + let subscription: any; + let immediateAction: any; + + try { + subscription = subscribeFn(schema, document, rootValue, contextValue, variableValues, operationName); + + immediateAction = setImmediate(async () => { + try { + for await (let result of subscription) { + observer.next(result); + } + } catch (e) { + observer.error(e); + } + + observer.complete(); + }); + } catch (e) { + observer.error(e); + } + + return { + unsubscribe: () => { + if (subscription) { + subscription.return() + } + + if (immediateAction) { + clearImmediate(immediateAction); + } + }, + }; + } else { execute(schema, document, rootValue, contextValue, variableValues, operationName) .then((result: ExecutionResult) => { @@ -105,11 +151,11 @@ class ExecuteAdapters { observer.complete(); }, (e) => observer.error(e)); - } - return { - unsubscribe: () => { /* Promises cannot be canceled */ }, - }; + return { + unsubscribe: () => { /* Promises cannot be canceled */ }, + }; + } }, }); } @@ -256,8 +302,8 @@ export class SubscriptionServer { throw new Error('Must provide `subscriptionManager` or `executor` and not both.'); } - if (executor && !executor.execute && !executor.executeReactive) { - throw new Error('Must define at least execute or executeReactive function'); + if (executor && !executor.execute && !executor.executeReactive && !executor.subscribe) { + throw new Error('Must define at least execute, executeReactive or subscribe function'); } if (executor && !schema) { @@ -275,7 +321,7 @@ export class SubscriptionServer { } else if ( executor.executeReactive ) { this.execute = executor.executeReactive.bind(executor); } else { - this.execute = ExecuteAdapters.executeFromExecute(executor.execute.bind(executor)); + this.execute = ExecuteAdapters.executeFromExecute(executor.execute.bind(executor), executor.subscribe.bind(executor)); } } @@ -549,7 +595,7 @@ export class SubscriptionServer { payload, }); - if (parsedMessage) { + if (parsedMessage && connectionContext.socket.readyState === WebSocket.OPEN) { connectionContext.socket.send(JSON.stringify(parsedMessage)); } } diff --git a/src/test/tests.ts b/src/test/tests.ts index 3f46ead33..b17590886 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -153,7 +153,7 @@ const eventsOptions = { const onConnectErrorOptions = { subscriptionManager, - onConnect: (msg: any, connection:any, connectionContext: any) => { + onConnect: (msg: any, connection: any, connectionContext: any) => { connectionContext.isLegacy = true; throw new Error('Error'); },