Skip to content
This repository has been archived by the owner on Apr 14, 2023. It is now read-only.

Commit

Permalink
feat(NA): added support into server executor for graphql-js subscribe.
Browse files Browse the repository at this point in the history
docs(NA): changelog update.

docs(NA): changelog update.

docs(NA): changelog update.

feat(NA): add subscribe from graphql.

fix(NA): check for connection state inside sendMessage.

fix(NA): merge executeFromSubscribe with executeFromExecute.

feat(NA): changelog update.

fix(NA): clear immediate action on unsubscribe.

fix(NA): removed typo from condition.
  • Loading branch information
mistic committed May 12, 2017
1 parent 6b33b99 commit 23cc224
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

- Enabled Greenkeeper and updated dependencies, includes major version bump of ws [PR #90](https://github.com/apollographql/subscriptions-transport-ws/pull/90)

### 0.6.0
- Protocol update to support queries, mutations and also subscriptions. [PR #108](https://github.com/apollographql/subscriptions-transport-ws/pull/108)
- Added support in the server for GraphQL Executor. [PR #108](https://github.com/apollographql/subscriptions-transport-ws/pull/108)
- Added support in the server executor for `graphql-js subscribe`. [PR #846](https://github.com/graphql/graphql-js/pull/846)

### 0.5.5

- Remove dependency on `graphql-tag/printer` per [graphql-tag#54](https://github.com/apollographql/graphql-tag/issues/54) [PR #98](https://github.com/apollographql/subscriptions-transport-ws/pull/98)
Expand Down
66 changes: 56 additions & 10 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,19 @@ export type ExecuteFunction = (
operationName?: string,
) => Promise<ExecutionResult>;

export type SubscribeFunction = (
schema: GraphQLSchema,
document: DocumentNode,
rootValue?: any,
contextValue?: any,
variableValues?: {[key: string]: any},
operationName?: string,
) => AsyncIterator<ExecutionResult>;

export interface Executor {
execute?: ExecuteFunction;
executeReactive?: ExecuteReactiveFunction;
subscribe?: SubscribeFunction;
}

export interface ServerOptions {
Expand All @@ -87,7 +97,7 @@ export interface ServerOptions {
}

class ExecuteAdapters {
public static executeFromExecute(execute: ExecuteFunction): ExecuteReactiveFunction {
public static executeFromExecute(execute: ExecuteFunction, subscribeFn?: SubscribeFunction): ExecuteReactiveFunction {
return (schema: GraphQLSchema,
document: DocumentNode,
rootValue?: any,
Expand All @@ -97,19 +107,55 @@ class ExecuteAdapters {
) => ({
subscribe: (observer) => {
if (ExecuteAdapters.isASubscriptionOperation(document, operationName)) {
observer.error(new Error('Subscriptions are not supported'));
if (!subscribeFn) {
observer.error(new Error('Subscriptions are not supported'));
}

let subscription: any;
let immediateAction: any;

try {
subscription = subscribeFn(schema, document, rootValue, contextValue, variableValues, operationName);

immediateAction = setImmediate(async () => {
try {
for await (let result of subscription) {
observer.next(result);
}
} catch (e) {
observer.error(e);
}

observer.complete();
});
} catch (e) {
observer.error(e);
}

return {
unsubscribe: () => {
if (subscription) {
subscription.return()
}

if (immediateAction) {
clearImmediate(immediateAction);
}
},
};

} else {
execute(schema, document, rootValue, contextValue, variableValues, operationName)
.then((result: ExecutionResult) => {
observer.next(result);
observer.complete();
},
(e) => observer.error(e));
}

return {
unsubscribe: () => { /* Promises cannot be canceled */ },
};
return {
unsubscribe: () => { /* Promises cannot be canceled */ },
};
}
},
});
}
Expand Down Expand Up @@ -256,8 +302,8 @@ export class SubscriptionServer {
throw new Error('Must provide `subscriptionManager` or `executor` and not both.');
}

if (executor && !executor.execute && !executor.executeReactive) {
throw new Error('Must define at least execute or executeReactive function');
if (executor && !executor.execute && !executor.executeReactive && !executor.subscribe) {
throw new Error('Must define at least execute, executeReactive or subscribe function');
}

if (executor && !schema) {
Expand All @@ -275,7 +321,7 @@ export class SubscriptionServer {
} else if ( executor.executeReactive ) {
this.execute = executor.executeReactive.bind(executor);
} else {
this.execute = ExecuteAdapters.executeFromExecute(executor.execute.bind(executor));
this.execute = ExecuteAdapters.executeFromExecute(executor.execute.bind(executor), executor.subscribe.bind(executor));
}
}

Expand Down Expand Up @@ -549,7 +595,7 @@ export class SubscriptionServer {
payload,
});

if (parsedMessage) {
if (parsedMessage && connectionContext.socket.readyState === WebSocket.OPEN) {
connectionContext.socket.send(JSON.stringify(parsedMessage));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ const eventsOptions = {

const onConnectErrorOptions = {
subscriptionManager,
onConnect: (msg: any, connection:any, connectionContext: any) => {
onConnect: (msg: any, connection: any, connectionContext: any) => {
connectionContext.isLegacy = true;
throw new Error('Error');
},
Expand Down

0 comments on commit 23cc224

Please sign in to comment.