From 611c223471ed3aeee55a80e12d4cdc1a86176b15 Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Thu, 12 May 2022 16:52:30 +0200 Subject: [PATCH] fix(server): Should clean up subscription reservations on abrupt errors without relying on connection close --- src/__tests__/server.ts | 48 +++++++++- src/server.ts | 189 +++++++++++++++++++++------------------- 2 files changed, 145 insertions(+), 92 deletions(-) diff --git a/src/__tests__/server.ts b/src/__tests__/server.ts index 6703a9e0..a519e59b 100644 --- a/src/__tests__/server.ts +++ b/src/__tests__/server.ts @@ -8,7 +8,7 @@ import { ExecutionResult, GraphQLSchema, } from 'graphql'; -import { handleProtocols, makeServer } from '../server'; +import { Context, handleProtocols, makeServer } from '../server'; import { GRAPHQL_TRANSPORT_WS_PROTOCOL, CloseCode, @@ -1658,6 +1658,52 @@ describe('Subscribe', () => { await server.waitForComplete(); }); + + it('should clean up subscription reservations on abrupt errors without relying on close', async (done) => { + let currCtx: Context; + makeServer({ + connectionInitWaitTimeout: 0, // defaults to 3 seconds + schema, + execute: () => { + throw null; + }, + onSubscribe: (ctx) => { + currCtx = ctx; + }, + }).opened( + { + protocol: GRAPHQL_TRANSPORT_WS_PROTOCOL, + send: () => { + /**/ + }, + close: () => { + fail("Shouldn't have closed"); + }, + onMessage: async (cb) => { + await cb(stringifyMessage({ type: MessageType.ConnectionInit })); + + try { + // will throw because of execute impl + await cb( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + query: '{ getValue }', + }, + }), + ); + fail("Subscribe shouldn't have succeeded"); + } catch { + // we dont close the connection but still expect the subscriptions to clean up + expect(Object.entries(currCtx.subscriptions)).toHaveLength(0); + done(); + } + }, + }, + {}, + ); + }); }); describe('Disconnect/close', () => { diff --git a/src/server.ts b/src/server.ts index 5db2a357..684e1b9c 100644 --- a/src/server.ts +++ b/src/server.ts @@ -723,103 +723,110 @@ export function makeServer< }, }; - let execArgs: ExecutionArgs; - const maybeExecArgsOrErrors = await onSubscribe?.(ctx, message); - if (maybeExecArgsOrErrors) { - if (areGraphQLErrors(maybeExecArgsOrErrors)) - return await emit.error(maybeExecArgsOrErrors); - else if (Array.isArray(maybeExecArgsOrErrors)) - throw new Error( - 'Invalid return value from onSubscribe hook, expected an array of GraphQLError objects', + try { + let execArgs: ExecutionArgs; + const maybeExecArgsOrErrors = await onSubscribe?.(ctx, message); + if (maybeExecArgsOrErrors) { + if (areGraphQLErrors(maybeExecArgsOrErrors)) + return await emit.error(maybeExecArgsOrErrors); + else if (Array.isArray(maybeExecArgsOrErrors)) + throw new Error( + 'Invalid return value from onSubscribe hook, expected an array of GraphQLError objects', + ); + // not errors, is exec args + execArgs = maybeExecArgsOrErrors; + } else { + // you either provide a schema dynamically through + // `onSubscribe` or you set one up during the server setup + if (!schema) + throw new Error('The GraphQL schema is not provided'); + + const args = { + operationName: payload.operationName, + document: parse(payload.query), + variableValues: payload.variables, + }; + execArgs = { + ...args, + schema: + typeof schema === 'function' + ? await schema(ctx, message, args) + : schema, + }; + const validationErrors = (validate ?? graphqlValidate)( + execArgs.schema, + execArgs.document, ); - // not errors, is exec args - execArgs = maybeExecArgsOrErrors; - } else { - // you either provide a schema dynamically through - // `onSubscribe` or you set one up during the server setup - if (!schema) - throw new Error('The GraphQL schema is not provided'); - - const args = { - operationName: payload.operationName, - document: parse(payload.query), - variableValues: payload.variables, - }; - execArgs = { - ...args, - schema: - typeof schema === 'function' - ? await schema(ctx, message, args) - : schema, - }; - const validationErrors = (validate ?? graphqlValidate)( - execArgs.schema, + if (validationErrors.length > 0) + return await emit.error(validationErrors); + } + + const operationAST = getOperationAST( execArgs.document, + execArgs.operationName, ); - if (validationErrors.length > 0) - return await emit.error(validationErrors); - } - - const operationAST = getOperationAST( - execArgs.document, - execArgs.operationName, - ); - if (!operationAST) - return await emit.error([ - new GraphQLError('Unable to identify operation'), - ]); - - // if `onSubscribe` didnt specify a rootValue, inject one - if (!('rootValue' in execArgs)) - execArgs.rootValue = roots?.[operationAST.operation]; - - // if `onSubscribe` didn't specify a context, inject one - if (!('contextValue' in execArgs)) - execArgs.contextValue = - typeof context === 'function' - ? await context(ctx, message, execArgs) - : context; - - // the execution arguments have been prepared - // perform the operation and act accordingly - let operationResult; - if (operationAST.operation === 'subscription') - operationResult = await (subscribe ?? graphqlSubscribe)(execArgs); - // operation === 'query' || 'mutation' - else operationResult = await (execute ?? graphqlExecute)(execArgs); - - const maybeResult = await onOperation?.( - ctx, - message, - execArgs, - operationResult, - ); - if (maybeResult) operationResult = maybeResult; - - if (isAsyncIterable(operationResult)) { - /** multiple emitted results */ - if (!(id in ctx.subscriptions)) { - // subscription was completed/canceled before the operation settled - if (isAsyncGenerator(operationResult)) - operationResult.return(undefined); - } else { - ctx.subscriptions[id] = operationResult; - for await (const result of operationResult) { - await emit.next(result, execArgs); + if (!operationAST) + return await emit.error([ + new GraphQLError('Unable to identify operation'), + ]); + + // if `onSubscribe` didnt specify a rootValue, inject one + if (!('rootValue' in execArgs)) + execArgs.rootValue = roots?.[operationAST.operation]; + + // if `onSubscribe` didn't specify a context, inject one + if (!('contextValue' in execArgs)) + execArgs.contextValue = + typeof context === 'function' + ? await context(ctx, message, execArgs) + : context; + + // the execution arguments have been prepared + // perform the operation and act accordingly + let operationResult; + if (operationAST.operation === 'subscription') + operationResult = await (subscribe ?? graphqlSubscribe)( + execArgs, + ); + // operation === 'query' || 'mutation' + else + operationResult = await (execute ?? graphqlExecute)(execArgs); + + const maybeResult = await onOperation?.( + ctx, + message, + execArgs, + operationResult, + ); + if (maybeResult) operationResult = maybeResult; + + if (isAsyncIterable(operationResult)) { + /** multiple emitted results */ + if (!(id in ctx.subscriptions)) { + // subscription was completed/canceled before the operation settled + if (isAsyncGenerator(operationResult)) + operationResult.return(undefined); + } else { + ctx.subscriptions[id] = operationResult; + for await (const result of operationResult) { + await emit.next(result, execArgs); + } } + } else { + /** single emitted result */ + // if the client completed the subscription before the single result + // became available, he effectively canceled it and no data should be sent + if (id in ctx.subscriptions) + await emit.next(operationResult, execArgs); } - } else { - /** single emitted result */ - // if the client completed the subscription before the single result - // became available, he effectively canceled it and no data should be sent - if (id in ctx.subscriptions) - await emit.next(operationResult, execArgs); - } - // lack of subscription at this point indicates that the client - // completed the subscription, he doesnt need to be reminded - await emit.complete(id in ctx.subscriptions); - delete ctx.subscriptions[id]; + // lack of subscription at this point indicates that the client + // completed the subscription, he doesnt need to be reminded + await emit.complete(id in ctx.subscriptions); + } finally { + // whatever happens to the subscription, we finally want to get rid of the reservation + delete ctx.subscriptions[id]; + } return; } case MessageType.Complete: {