Skip to content

GraphQL Subscriptions #7227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: alpha
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions src/Controllers/ParseGraphQLController.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ParseGraphQLController {
`ParseGraphQLController requires a "databaseController" to be instantiated.`
);
this.cacheController = params.cacheController;
this.isMounted = !!params.mountGraphQL;
this.isMounted = !!params.mountGraphQL || !!params.mountSubscriptions;
this.configCacheKey = GraphQLConfigKey;
}

Expand Down Expand Up @@ -145,7 +145,14 @@ class ParseGraphQLController {
if (!isValidSimpleObject(classConfig)) {
return 'it must be a valid object';
} else {
const { className, type = null, query = null, mutation = null, ...invalidKeys } = classConfig;
const {
className,
type = null,
query = null,
mutation = null,
subscription = null,
...invalidKeys
} = classConfig;
if (Object.keys(invalidKeys).length) {
return `"invalidKeys" [${Object.keys(invalidKeys)}] should not be present`;
}
Expand Down Expand Up @@ -287,6 +294,20 @@ class ParseGraphQLController {
return `"mutation" must be a valid object`;
}
}
if (subscription !== null) {
if (isValidSimpleObject(subscription)) {
const { enabled = null, alias = null, ...invalidKeys } = query;
if (Object.keys(invalidKeys).length) {
return `"subscription" contains invalid keys, [${Object.keys(invalidKeys)}]`;
} else if (enabled !== null && typeof enabled !== 'boolean') {
return `"subscription.enabled" must be a boolean`;
} else if (alias !== null && typeof alias !== 'string') {
return `"subscription.alias" must be a string`;
}
} else {
return `"subscription" must be a valid object`;
}
}
}
}
}
Expand Down Expand Up @@ -355,6 +376,11 @@ export interface ParseGraphQLClassConfig {
updateAlias: ?String,
destroyAlias: ?String,
};
/* The `subscription` object contains options for which class subscriptions are generated */
subscription: ?{
enabled: ?boolean,
alias: ?String,
};
}

export default ParseGraphQLController;
Expand Down
1 change: 1 addition & 0 deletions src/Controllers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export function getParseGraphQLController(
): ParseGraphQLController {
return new ParseGraphQLController({
mountGraphQL: options.mountGraphQL,
mountSubscriptions: options.mountSubscriptions,
...controllerDeps,
});
}
Expand Down
24 changes: 24 additions & 0 deletions src/GraphQL/ParseGraphQLSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as defaultGraphQLTypes from './loaders/defaultGraphQLTypes';
import * as parseClassTypes from './loaders/parseClassTypes';
import * as parseClassQueries from './loaders/parseClassQueries';
import * as parseClassMutations from './loaders/parseClassMutations';
import * as parseClassSubscriptions from './loaders/parseClassSubscriptions';
import * as defaultGraphQLQueries from './loaders/defaultGraphQLQueries';
import * as defaultGraphQLMutations from './loaders/defaultGraphQLMutations';
import ParseGraphQLController, { ParseGraphQLConfig } from '../Controllers/ParseGraphQLController';
Expand Down Expand Up @@ -58,6 +59,7 @@ const RESERVED_GRAPHQL_MUTATION_NAMES = [
'updateClass',
'deleteClass',
];
const RESERVED_GRAPHQL_SUBSCRIPTION_NAMES = [];

class ParseGraphQLSchema {
databaseController: DatabaseController;
Expand All @@ -66,6 +68,7 @@ class ParseGraphQLSchema {
log: any;
appId: string;
graphQLCustomTypeDefs: ?(string | GraphQLSchema | DocumentNode | GraphQLNamedType[]);
liveQueryClassNames: any;

constructor(
params: {
Expand All @@ -74,6 +77,7 @@ class ParseGraphQLSchema {
log: any,
appId: string,
graphQLCustomTypeDefs: ?(string | GraphQLSchema | DocumentNode | GraphQLNamedType[]),
liveQueryClassNames: any,
} = {}
) {
this.parseGraphQLController =
Expand All @@ -85,6 +89,7 @@ class ParseGraphQLSchema {
this.log = params.log || requiredParameter('You must provide a log instance!');
this.graphQLCustomTypeDefs = params.graphQLCustomTypeDefs;
this.appId = params.appId || requiredParameter('You must provide the appId!');
this.liveQueryClassNames = params.liveQueryClassNames;
}

async load() {
Expand Down Expand Up @@ -132,6 +137,9 @@ class ParseGraphQLSchema {
parseClassTypes.load(this, parseClass, parseClassConfig);
parseClassQueries.load(this, parseClass, parseClassConfig);
parseClassMutations.load(this, parseClass, parseClassConfig);
if (this.liveQueryClassNames && this.liveQueryClassNames.includes(parseClass.className)) {
parseClassSubscriptions.load(this, parseClass, parseClassConfig);
}
}
);

Expand Down Expand Up @@ -342,6 +350,22 @@ class ParseGraphQLSchema {
return field;
}

addGraphQLSubscription(fieldName, field, throwError = false, ignoreReserved = false) {
if (
(!ignoreReserved && RESERVED_GRAPHQL_SUBSCRIPTION_NAMES.includes(fieldName)) ||
this.graphQLSubscriptions[fieldName]
) {
const message = `Subscription ${fieldName} could not be added to the auto schema because it collided with an existing field.`;
if (throwError) {
throw new Error(message);
}
this.log.warn(message);
return undefined;
}
this.graphQLSubscriptions[fieldName] = field;
return field;
}

handleError(error) {
if (error instanceof Parse.Error) {
this.log.error('Parse error: ', error);
Expand Down
128 changes: 126 additions & 2 deletions src/GraphQL/ParseGraphQLServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import { SubscriptionServer } from 'subscriptions-transport-ws';
import { handleParseErrors, handleParseHeaders } from '../middlewares';
import requiredParameter from '../requiredParameter';
import defaultLogger from '../logger';
import { ParseLiveQueryServer } from '../LiveQuery/ParseLiveQueryServer';
import { ParseGraphQLSchema } from './ParseGraphQLSchema';
import ParseGraphQLController, { ParseGraphQLConfig } from '../Controllers/ParseGraphQLController';
import { WSSAdapter } from '../Adapters/WebSocketServer/WSSAdapter';

class ParseGraphQLServer {
parseGraphQLController: ParseGraphQLController;
Expand All @@ -29,6 +31,8 @@ class ParseGraphQLServer {
log: this.log,
graphQLCustomTypeDefs: this.config.graphQLCustomTypeDefs,
appId: this.parseServer.config.appId,
liveQueryClassNames:
this.parseServer.config.liveQuery && this.parseServer.config.liveQuery.classNames,
});
}

Expand Down Expand Up @@ -114,12 +118,132 @@ class ParseGraphQLServer {
}

createSubscriptions(server) {
const wssAdapter = new WSSAdapter();

new ParseLiveQueryServer(
undefined,
{
...this.parseServer.config.liveQueryServerOptions,
wssAdapter,
},
this.parseServer.config
);

SubscriptionServer.create(
{
execute,
subscribe,
onOperation: async (_message, params, webSocket) =>
Object.assign({}, params, await this._getGraphQLOptions(webSocket.upgradeReq)),
onConnect: async connectionParams => {
const keyPairs = {
applicationId: connectionParams['X-Parse-Application-Id'],
sessionToken: connectionParams['X-Parse-Session-Token'],
masterKey: connectionParams['X-Parse-Master-Key'],
installationId: connectionParams['X-Parse-Installation-Id'],
clientKey: connectionParams['X-Parse-Client-Key'],
javascriptKey: connectionParams['X-Parse-Javascript-Key'],
windowsKey: connectionParams['X-Parse-Windows-Key'],
restAPIKey: connectionParams['X-Parse-REST-API-Key'],
};

const listeners = [];

let connectResolve, connectReject;
let connectIsResolved = false;
const connectPromise = new Promise((resolve, reject) => {
connectResolve = resolve;
connectReject = reject;
});

const liveQuery = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really clever!

OPEN: 'OPEN',
readyState: 'OPEN',
on: () => {},
ping: () => {},
onmessage: () => {},
onclose: () => {},
send: message => {
message = JSON.parse(message);
if (message.op === 'connected') {
connectResolve();
connectIsResolved = true;
return;
} else if (message.op === 'error' && !connectIsResolved) {
connectReject({
code: message.code,
message: message.error,
});
return;
}
const requestId = message && message.requestId;
if (
requestId &&
typeof requestId === 'number' &&
requestId > 0 &&
requestId <= listeners.length
) {
const listener = listeners[requestId - 1];
if (listener) {
listener(message);
}
}
},
subscribe: async (query, sessionToken, listener) => {
await connectPromise;
listeners.push(listener);
liveQuery.onmessage(
JSON.stringify({
op: 'subscribe',
requestId: listeners.length,
query,
sessionToken,
})
);
},
unsubscribe: async listener => {
await connectPromise;
const index = listeners.indexOf(listener);
if (index > 0) {
liveQuery.onmessage(
JSON.stringify({
op: 'unsubscribe',
requestId: index + 1,
})
);
listeners[index] = null;
}
},
};

wssAdapter.onConnection(liveQuery);

liveQuery.onmessage(
JSON.stringify({
op: 'connect',
...keyPairs,
})
);

await connectPromise;

return { liveQuery, keyPairs };
},
onDisconnect: (_webSocket, context) => {
const { liveQuery } = context;

if (liveQuery) {
liveQuery.onclose();
}
},
onOperation: async (_message, params) => {
return {
...params,
schema: await this.parseGraphQLSchema.load(),
formatError: error => {
// Allow to console.log here to debug
return error;
},
};
},
},
{
server,
Expand Down
27 changes: 27 additions & 0 deletions src/GraphQL/loaders/defaultGraphQLTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,29 @@ const loadArrayResult = (parseGraphQLSchema, parseClasses) => {
parseGraphQLSchema.graphQLTypes.push(ARRAY_RESULT);
};

const EVENT_KIND = new GraphQLEnumType({
name: 'EventKind',
description: 'The EventKind enum type is used in subscriptions to identify listened events.',
values: {
create: { value: 'create' },
enter: { value: 'enter' },
update: { value: 'update' },
leave: { value: 'leave' },
delete: { value: 'delete' },
all: { value: 'all' },
},
});

const EVENT_KIND_ATT = {
description: 'The event kind that was fired.',
type: new GraphQLNonNull(EVENT_KIND),
};

const EVENT_KINDS_ATT = {
description: 'The event kinds to be listened in the subscription.',
type: new GraphQLNonNull(new GraphQLList(new GraphQLNonNull(EVENT_KIND))),
};

const load = parseGraphQLSchema => {
parseGraphQLSchema.addGraphQLType(GraphQLUpload, true);
parseGraphQLSchema.addGraphQLType(ANY, true);
Expand Down Expand Up @@ -1266,6 +1289,7 @@ const load = parseGraphQLSchema => {
parseGraphQLSchema.addGraphQLType(PUBLIC_ACL, true);
parseGraphQLSchema.addGraphQLType(SUBQUERY_INPUT, true);
parseGraphQLSchema.addGraphQLType(SELECT_INPUT, true);
parseGraphQLSchema.addGraphQLType(EVENT_KIND, true);
};

export {
Expand Down Expand Up @@ -1358,6 +1382,9 @@ export {
USER_ACL,
ROLE_ACL,
PUBLIC_ACL,
EVENT_KIND,
EVENT_KIND_ATT,
EVENT_KINDS_ATT,
load,
loadArrayResult,
};
Loading