From 10f6843481aef4e3eb0b061ed32b74012e0d9a4a Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Tue, 9 Feb 2021 13:12:34 -0500 Subject: [PATCH 1/4] Support for experimental @defer & @stream (#583) --- integrationTests/ts/package.json | 2 +- package-lock.json | 6 +- package.json | 4 +- src/__tests__/http-test.ts | 524 +++++++++++++++++++++++++- src/__tests__/isAsyncIterable-test.ts | 20 + src/index.ts | 179 ++++++++- src/isAsyncIterable.ts | 8 + 7 files changed, 715 insertions(+), 28 deletions(-) create mode 100644 src/__tests__/isAsyncIterable-test.ts create mode 100644 src/isAsyncIterable.ts diff --git a/integrationTests/ts/package.json b/integrationTests/ts/package.json index a28fc835..85af3156 100644 --- a/integrationTests/ts/package.json +++ b/integrationTests/ts/package.json @@ -6,7 +6,7 @@ "dependencies": { "@types/node": "14.0.13", "express-graphql": "file:../express-graphql.tgz", - "graphql": "14.7.0", + "graphql": "15.4.0-experimental-stream-defer.1", "typescript-3.4": "npm:typescript@3.4.x", "typescript-3.5": "npm:typescript@3.5.x", "typescript-3.6": "npm:typescript@3.6.x", diff --git a/package-lock.json b/package-lock.json index 928abda2..70d8098f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3055,9 +3055,9 @@ } }, "graphql": { - "version": "15.4.0", - "resolved": "https://registry.npmjs.org/graphql/-/graphql-15.4.0.tgz", - "integrity": "sha512-EB3zgGchcabbsU9cFe1j+yxdzKQKAbGUWRb13DsrsMN1yyfmmIq+2+L5MqVWcDCE4V89R5AyUOi7sMOGxdsYtA==", + "version": "15.4.0-experimental-stream-defer.1", + "resolved": "https://registry.npmjs.org/graphql/-/graphql-15.4.0-experimental-stream-defer.1.tgz", + "integrity": "sha512-zlGgY7aLlIofjO0CfTpCYK/tMccnj+5jvjnkTnW5qOxYhgEltuCvpMNYOJ67gz6L1flTIigt5BVEM8JExgtW3w==", "dev": true }, "graphql-language-service-interface": { diff --git a/package.json b/package.json index ef48d894..0807e9fe 100644 --- a/package.json +++ b/package.json @@ -85,7 +85,7 @@ "express": "4.17.1", "graphiql": "1.0.6", "graphiql-subscriptions-fetcher": "0.0.2", - "graphql": "15.4.0", + "graphql": "15.4.0-experimental-stream-defer.1", "mocha": "8.2.1", "multer": "1.4.2", "nyc": "15.1.0", @@ -102,6 +102,6 @@ "unfetch": "4.2.0" }, "peerDependencies": { - "graphql": "^14.7.0 || ^15.3.0" + "graphql": "^14.7.0 || ^15.3.0 || 15.4.0-experimental-stream-defer.1" } } diff --git a/src/__tests__/http-test.ts b/src/__tests__/http-test.ts index 6a2b3218..03f5094d 100644 --- a/src/__tests__/http-test.ts +++ b/src/__tests__/http-test.ts @@ -1,4 +1,5 @@ import zlib from 'zlib'; +import type http from 'http'; import type { Server as Restify } from 'restify'; import connect from 'connect'; @@ -25,6 +26,7 @@ import { } from 'graphql'; import { graphqlHTTP } from '../index'; +import { isAsyncIterable } from '../isAsyncIterable'; type Middleware = (req: any, res: any, next: () => void) => unknown; type Server = () => { @@ -80,6 +82,12 @@ function urlString(urlParams?: { [param: string]: string }): string { return string; } +function sleep(ms = 1) { + return new Promise((r) => { + setTimeout(r, ms); + }); +} + describe('GraphQL-HTTP tests for connect', () => { runTests(() => { const app = connect(); @@ -1027,6 +1035,60 @@ function runTests(server: Server) { errors: [{ message: 'Must provide query string.' }], }); }); + + it('allows for streaming results with @defer', async () => { + const app = server(); + const fakeFlush = sinon.fake(); + + app.use((_, res, next) => { + res.flush = fakeFlush; + next(); + }); + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ ...frag @defer(label: "deferLabel") } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(fakeFlush.callCount).to.equal(2); + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 26', + '', + '{"data":{},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 78', + '', + '{"data":{"test":"Hello World"},"path":[],"label":"deferLabel","hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); }); describe('Pretty printing', () => { @@ -1109,6 +1171,62 @@ function runTests(server: Server) { expect(unprettyResponse.text).to.equal('{"data":{"test":"Hello World"}}'); }); + it('supports pretty printing async iterable requests', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + pretty: true, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 35', + '', + ['{', ' "data": {},', ' "hasNext": true', '}'].join('\n'), + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 79', + '', + [ + '{', + ' "data": {', + ' "test": "Hello World"', + ' },', + ' "path": [],', + ' "hasNext": false', + '}', + ].join('\n'), + '', + '-----', + '', + ].join('\r\n'), + ); + }); }); it('will send request and response when using thunk', async () => { @@ -1260,6 +1378,108 @@ function runTests(server: Server) { }); }); + it('allows for custom error formatting in initial payload of async iterator', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + customFormatErrorFn(error) { + return { message: 'Custom error format: ' + error.message }; + }, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ thrower, ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 94', + '', + '{"errors":[{"message":"Custom error format: Throws!"}],"data":{"thrower":null},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 57', + '', + '{"data":{"test":"Hello World"},"path":[],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + + it('allows for custom error formatting in subsequent payloads of async iterator', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + customFormatErrorFn(error) { + return { message: 'Custom error format: ' + error.message }; + }, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ test(who: "World"), ...frag @defer } fragment frag on QueryRoot { thrower }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 46', + '', + '{"data":{"test":"Hello World"},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 105', + '', + '{"data":{"thrower":null},"path":[],"errors":[{"message":"Custom error format: Throws!"}],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + it('allows for custom error formatting to elaborate', async () => { const app = server(); @@ -2127,6 +2347,10 @@ function runTests(server: Server) { async customExecuteFn(args) { seenExecuteArgs = args; const result = await Promise.resolve(execute(args)); + // istanbul ignore if this test query will never return an async iterable + if (isAsyncIterable(result)) { + return result; + } return { ...result, data: { @@ -2166,6 +2390,215 @@ function runTests(server: Server) { '{"errors":[{"message":"I did something wrong"}]}', ); }); + + it('catches first error thrown from custom execute function that returns an AsyncIterable', async () => { + const app = server(); + + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + customExecuteFn() { + return { + [Symbol.asyncIterator]: () => ({ + next: () => Promise.reject(new Error('I did something wrong')), + }), + }; + }, + })), + ); + + const response = await app.request().get(urlString({ query: '{test}' })); + expect(response.status).to.equal(400); + expect(response.text).to.equal( + '{"errors":[{"message":"I did something wrong"}]}', + ); + }); + + it('catches subsequent errors thrown from custom execute function that returns an AsyncIterable', async () => { + const app = server(); + + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + async *customExecuteFn() { + await sleep(); + yield { + data: { + test2: 'Modification', + }, + hasNext: true, + }; + throw new Error('I did something wrong'); + }, + })), + ); + + const response = await app + .request() + .get(urlString({ query: '{test}' })) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + expect(response.status).to.equal(200); + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 48', + '', + '{"data":{"test2":"Modification"},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 64', + '', + '{"errors":[{"message":"I did something wrong"}],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + + it('calls return on underlying async iterable when connection is closed', async () => { + const app = server(); + const fakeReturn = sinon.fake(); + + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + // custom iterable keeps yielding until return is called + customExecuteFn() { + let returned = false; + return { + [Symbol.asyncIterator]: () => ({ + next: async () => { + await sleep(); + if (returned) { + return { value: undefined, done: true }; + } + return { + value: { data: { test: 'Hello, World' }, hasNext: true }, + done: false, + }; + }, + return: () => { + returned = true; + fakeReturn(); + return Promise.resolve({ value: undefined, done: true }); + }, + }), + }; + }, + })), + ); + + let text = ''; + const request = app + .request() + .get(urlString({ query: '{test}' })) + .parse((res, cb) => { + res.on('data', (data) => { + text = `${text}${data.toString('utf8') as string}`; + ((res as unknown) as http.IncomingMessage).destroy(); + cb(new Error('Aborted connection'), null); + }); + }); + + try { + await request; + } catch (e: unknown) { + // ignore aborted error + } + // sleep to allow time for return function to be called + await sleep(2); + expect(text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 47', + '', + '{"data":{"test":"Hello, World"},"hasNext":true}', + '', + ].join('\r\n'), + ); + expect(fakeReturn.callCount).to.equal(1); + }); + + it('handles return function on async iterable that throws', async () => { + const app = server(); + + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + // custom iterable keeps yielding until return is called + customExecuteFn() { + let returned = false; + return { + [Symbol.asyncIterator]: () => ({ + next: async () => { + await sleep(); + if (returned) { + return { value: undefined, done: true }; + } + return { + value: { data: { test: 'Hello, World' }, hasNext: true }, + done: false, + }; + }, + return: () => { + returned = true; + return Promise.reject(new Error('Throws!')); + }, + }), + }; + }, + })), + ); + + let text = ''; + const request = app + .request() + .get(urlString({ query: '{test}' })) + .parse((res, cb) => { + res.on('data', (data) => { + text = `${text}${data.toString('utf8') as string}`; + ((res as unknown) as http.IncomingMessage).destroy(); + cb(new Error('Aborted connection'), null); + }); + }); + + try { + await request; + } catch (e: unknown) { + // ignore aborted error + } + // sleep to allow return function to be called + await sleep(2); + expect(text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 47', + '', + '{"data":{"test":"Hello, World"},"hasNext":true}', + '', + ].join('\r\n'), + ); + }); }); describe('Custom parse function', () => { @@ -2280,6 +2713,57 @@ function runTests(server: Server) { }); }); + it('allows for custom extensions in initial and subsequent payloads of async iterator', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + extensions({ result }) { + return { preservedResult: { ...result } }; + }, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ hello: test(who: "Rob"), ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 124', + '', + '{"data":{"hello":"Hello Rob"},"hasNext":true,"extensions":{"preservedResult":{"data":{"hello":"Hello Rob"},"hasNext":true}}}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 148', + '', + '{"data":{"test":"Hello World"},"path":[],"hasNext":false,"extensions":{"preservedResult":{"data":{"test":"Hello World"},"path":[],"hasNext":false}}}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + it('extension function may be async', async () => { const app = server(); @@ -2320,12 +2804,44 @@ function runTests(server: Server) { const response = await app .request() - .get(urlString({ query: '{test}', raw: '' })) - .set('Accept', 'text/html'); + .get( + urlString({ + query: + '{ hello: test(who: "Rob"), ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + raw: '', + }), + ) + .set('Accept', 'text/html') + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); expect(response.status).to.equal(200); - expect(response.type).to.equal('application/json'); - expect(response.text).to.equal('{"data":{"test":"Hello World"}}'); + expect(response.type).to.equal('multipart/mixed'); + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 45', + '', + '{"data":{"hello":"Hello Rob"},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 57', + '', + '{"data":{"test":"Hello World"},"path":[],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); }); }); } diff --git a/src/__tests__/isAsyncIterable-test.ts b/src/__tests__/isAsyncIterable-test.ts new file mode 100644 index 00000000..6c5c2696 --- /dev/null +++ b/src/__tests__/isAsyncIterable-test.ts @@ -0,0 +1,20 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { isAsyncIterable } from '../isAsyncIterable'; + +describe('isAsyncIterable', () => { + it('returns false for null', () => { + expect(isAsyncIterable(null)).to.equal(false); + }); + it('returns false for non-object', () => { + expect(isAsyncIterable(1)).to.equal(false); + }); + it('returns true for async generator function', () => { + // istanbul ignore next: test function + // eslint-disable-next-line @typescript-eslint/no-empty-function + const myGen = async function* () {}; + const result = myGen(); + expect(isAsyncIterable(result)).to.equal(true); + }); +}); diff --git a/src/index.ts b/src/index.ts index 51910376..b67928ec 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ -import type { IncomingMessage, ServerResponse } from 'http'; +import type { IncomingMessage } from 'http'; +import { ServerResponse } from 'http'; import type { ASTVisitor, @@ -8,6 +9,9 @@ import type { ExecutionArgs, ExecutionResult, FormattedExecutionResult, + ExecutionPatchResult, + FormattedExecutionPatchResult, + AsyncExecutionResult, GraphQLSchema, GraphQLFieldResolver, GraphQLTypeResolver, @@ -30,12 +34,16 @@ import { import type { GraphiQLOptions, GraphiQLData } from './renderGraphiQL'; import { parseBody } from './parseBody'; +import { isAsyncIterable } from './isAsyncIterable'; import { renderGraphiQL } from './renderGraphiQL'; // `url` is always defined for IncomingMessage coming from http.Server type Request = IncomingMessage & { url: string }; -type Response = ServerResponse & { json?: (data: unknown) => void }; +type Response = ServerResponse & { + json?: (data: unknown) => void; + flush?: () => void; +}; type MaybePromise = Promise | T; /** @@ -94,7 +102,9 @@ export interface OptionsData { * An optional function which will be used to execute instead of default `execute` * from `graphql-js`. */ - customExecuteFn?: (args: ExecutionArgs) => MaybePromise; + customExecuteFn?: ( + args: ExecutionArgs, + ) => MaybePromise>; /** * An optional function which will be used to format any errors produced by @@ -172,7 +182,7 @@ export interface RequestInfo { /** * The result of executing the operation. */ - result: FormattedExecutionResult; + result: AsyncExecutionResult; /** * A value to pass as the context to the graphql() function. @@ -198,8 +208,12 @@ export function graphqlHTTP(options: Options): Middleware { let showGraphiQL = false; let graphiqlOptions; let formatErrorFn = formatError; + let extensionsFn; let pretty = false; + let documentAST: DocumentNode; + let executeResult; let result: ExecutionResult; + let finishedIterable = false; try { // Parse the Request to get GraphQL request parameters. @@ -227,7 +241,6 @@ export function graphqlHTTP(options: Options): Middleware { const fieldResolver = optionsData.fieldResolver; const typeResolver = optionsData.typeResolver; const graphiql = optionsData.graphiql ?? false; - const extensionsFn = optionsData.extensions; const context = optionsData.context ?? request; const parseFn = optionsData.customParseFn ?? parse; const executeFn = optionsData.customExecuteFn ?? execute; @@ -258,6 +271,25 @@ export function graphqlHTTP(options: Options): Middleware { graphiqlOptions = graphiql; } + // Collect and apply any metadata extensions if a function was provided. + // https://graphql.github.io/graphql-spec/#sec-Response-Format + if (optionsData.extensions) { + extensionsFn = (payload: AsyncExecutionResult) => { + /* istanbul ignore else: condition not reachable, required for typescript */ + if (optionsData.extensions) { + return optionsData.extensions({ + document: documentAST, + variables, + operationName, + result: payload, + context, + }); + } + /* istanbul ignore next: condition not reachable, required for typescript */ + return undefined; + }; + } + // If there is no query, but GraphiQL will be displayed, do not produce // a result, otherwise return a 400: Bad Request. if (query == null) { @@ -277,7 +309,6 @@ export function graphqlHTTP(options: Options): Middleware { } // Parse source to AST, reporting any syntax error. - let documentAST; try { documentAST = parseFn(new Source(query, 'GraphQL request')); } catch (syntaxError: unknown) { @@ -323,7 +354,7 @@ export function graphqlHTTP(options: Options): Middleware { // Perform the execution, reporting any errors creating the context. try { - result = await executeFn({ + executeResult = await executeFn({ schema, document: documentAST, rootValue, @@ -333,6 +364,35 @@ export function graphqlHTTP(options: Options): Middleware { fieldResolver, typeResolver, }); + + if (isAsyncIterable(executeResult)) { + // Get first payload from AsyncIterator. http status will reflect status + // of this payload. + const asyncIterator = getAsyncIterator( + executeResult, + ); + + response.on('close', () => { + if ( + !finishedIterable && + typeof asyncIterator.return === 'function' + ) { + asyncIterator.return().then(null, (rawError: unknown) => { + const graphqlError = getGraphQlError(rawError); + sendPartialResponse(pretty, response, { + data: undefined, + errors: [formatErrorFn(graphqlError)], + hasNext: false, + }); + }); + } + }); + + const { value } = await asyncIterator.next(); + result = value; + } else { + result = executeResult; + } } catch (contextError: unknown) { // Return 400: Bad Request if any execution context errors exist. throw httpError(400, 'GraphQL execution context error.', { @@ -340,16 +400,8 @@ export function graphqlHTTP(options: Options): Middleware { }); } - // Collect and apply any metadata extensions if a function was provided. - // https://graphql.github.io/graphql-spec/#sec-Response-Format if (extensionsFn) { - const extensions = await extensionsFn({ - document: documentAST, - variables, - operationName, - result, - context, - }); + const extensions = await extensionsFn(result); if (extensions != null) { result = { ...result, extensions }; @@ -363,6 +415,7 @@ export function graphqlHTTP(options: Options): Middleware { rawError instanceof Error ? rawError : String(rawError), ); + // eslint-disable-next-line require-atomic-updates response.statusCode = error.status; const { headers } = error; @@ -381,9 +434,12 @@ export function graphqlHTTP(options: Options): Middleware { undefined, error, ); - result = { data: undefined, errors: [graphqlError] }; + executeResult = result = { data: undefined, errors: [graphqlError] }; } else { - result = { data: undefined, errors: error.graphqlErrors }; + executeResult = result = { + data: undefined, + errors: error.graphqlErrors, + }; } } @@ -393,6 +449,7 @@ export function graphqlHTTP(options: Options): Middleware { // the resulting JSON payload. // https://graphql.github.io/graphql-spec/#sec-Data if (response.statusCode === 200 && result.data == null) { + // eslint-disable-next-line require-atomic-updates response.statusCode = 500; } @@ -402,6 +459,41 @@ export function graphqlHTTP(options: Options): Middleware { errors: result.errors?.map(formatErrorFn), }; + if (isAsyncIterable(executeResult)) { + response.setHeader('Content-Type', 'multipart/mixed; boundary="-"'); + sendPartialResponse(pretty, response, formattedResult); + try { + for await (let payload of executeResult) { + // Collect and apply any metadata extensions if a function was provided. + // https://graphql.github.io/graphql-spec/#sec-Response-Format + if (extensionsFn) { + const extensions = await extensionsFn(payload); + + if (extensions != null) { + payload = { ...payload, extensions }; + } + } + const formattedPayload: FormattedExecutionPatchResult = { + // first payload is already consumed, all subsequent payloads typed as ExecutionPatchResult + ...(payload as ExecutionPatchResult), + errors: payload.errors?.map(formatErrorFn), + }; + sendPartialResponse(pretty, response, formattedPayload); + } + } catch (rawError: unknown) { + const graphqlError = getGraphQlError(rawError); + sendPartialResponse(pretty, response, { + data: undefined, + errors: [formatErrorFn(graphqlError)], + hasNext: false, + }); + } + response.write('\r\n-----\r\n'); + response.end(); + finishedIterable = true; + return; + } + // If allowed to show GraphiQL, present it instead of JSON. if (showGraphiQL) { return respondWithGraphiQL( @@ -522,6 +614,36 @@ function canDisplayGraphiQL(request: Request, params: GraphQLParams): boolean { return !params.raw && accepts(request).types(['json', 'html']) === 'html'; } +/** + * Helper function for sending part of a multi-part response using only the core Node server APIs. + */ +function sendPartialResponse( + pretty: boolean, + response: Response, + result: FormattedExecutionResult | FormattedExecutionPatchResult, +): void { + const json = JSON.stringify(result, null, pretty ? 2 : 0); + const chunk = Buffer.from(json, 'utf8'); + const data = [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: ' + String(chunk.length), + '', + chunk, + '', + ].join('\r\n'); + response.write(data); + // flush response if compression middleware is used + if ( + typeof response.flush === 'function' && + // @ts-expect-error deprecated flush method is implemented on ServerResponse but not typed + response.flush !== ServerResponse.prototype.flush + ) { + response.flush(); + } +} + /** * Helper function for sending a response using only the core Node server APIs. */ @@ -546,3 +668,24 @@ function devAssert(condition: unknown, message: string): void { throw new TypeError(message); } } + +function getAsyncIterator( + asyncIterable: AsyncIterable, +): AsyncIterator { + const method = asyncIterable[Symbol.asyncIterator]; + return method.call(asyncIterable); +} + +function getGraphQlError(rawError: unknown) { + /* istanbul ignore next: Thrown by underlying library. */ + const error = + rawError instanceof Error ? rawError : new Error(String(rawError)); + return new GraphQLError( + error.message, + undefined, + undefined, + undefined, + undefined, + error, + ); +} diff --git a/src/isAsyncIterable.ts b/src/isAsyncIterable.ts new file mode 100644 index 00000000..b5c8ce65 --- /dev/null +++ b/src/isAsyncIterable.ts @@ -0,0 +1,8 @@ +export function isAsyncIterable( + maybeAsyncIterable: any, +): maybeAsyncIterable is AsyncIterable { + if (maybeAsyncIterable == null || typeof maybeAsyncIterable !== 'object') { + return false; + } + return typeof maybeAsyncIterable[Symbol.asyncIterator] === 'function'; +} From 012249a532dc7907874174ad8427d0908dfac9ad Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Fri, 20 Nov 2020 14:20:03 +0200 Subject: [PATCH 2/4] 0.12.0-experimental-stream-defer.1 --- package-lock.json | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index 70d8098f..e09bd2e2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "express-graphql", - "version": "0.12.0", + "version": "0.12.0-experimental-stream-defer.1", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 0807e9fe..82919dec 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "express-graphql", - "version": "0.12.0", + "version": "0.12.0-experimental-stream-defer.1", "description": "Production ready GraphQL HTTP middleware.", "license": "MIT", "private": true, From 39df1661ffdc7e5436836039d7b40cdf06f27b6a Mon Sep 17 00:00:00 2001 From: Marais Rossouw Date: Thu, 3 Dec 2020 03:09:42 +1000 Subject: [PATCH 3/4] [incremental-delivery]: Remove the content-length requirement (#728) This removes the need for `Content-Length`, and handles the boundaries such that the next boundary is flushed as soon as possible. References: - https://github.com/graphql/graphql-over-http/pull/152 - fmg https://github.com/relay-tools/fetch-multipart-graphql/pull/22 - [meros](https://github.com/maraisr/meros) --- src/__tests__/http-test.ts | 34 ++-------------------------------- src/index.ts | 20 +++++++++----------- 2 files changed, 11 insertions(+), 43 deletions(-) diff --git a/src/__tests__/http-test.ts b/src/__tests__/http-test.ts index 03f5094d..dc7030bb 100644 --- a/src/__tests__/http-test.ts +++ b/src/__tests__/http-test.ts @@ -1074,16 +1074,12 @@ function runTests(server: Server) { '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 26', '', '{"data":{},"hasNext":true}', - '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 78', '', '{"data":{"test":"Hello World"},"path":[],"label":"deferLabel","hasNext":false}', - '', '-----', '', ].join('\r\n'), @@ -1204,13 +1200,10 @@ function runTests(server: Server) { '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 35', '', ['{', ' "data": {},', ' "hasNext": true', '}'].join('\n'), - '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 79', '', [ '{', @@ -1221,7 +1214,6 @@ function runTests(server: Server) { ' "hasNext": false', '}', ].join('\n'), - '', '-----', '', ].join('\r\n'), @@ -1413,16 +1405,12 @@ function runTests(server: Server) { '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 94', '', '{"errors":[{"message":"Custom error format: Throws!"}],"data":{"thrower":null},"hasNext":true}', - '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 57', '', '{"data":{"test":"Hello World"},"path":[],"hasNext":false}', - '', '-----', '', ].join('\r\n'), @@ -1464,16 +1452,12 @@ function runTests(server: Server) { '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 46', '', '{"data":{"test":"Hello World"},"hasNext":true}', - '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 105', '', '{"data":{"thrower":null},"path":[],"errors":[{"message":"Custom error format: Throws!"}],"hasNext":false}', - '', '-----', '', ].join('\r\n'), @@ -2453,16 +2437,12 @@ function runTests(server: Server) { '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 48', '', '{"data":{"test2":"Modification"},"hasNext":true}', - '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 64', '', '{"errors":[{"message":"I did something wrong"}],"hasNext":false}', - '', '-----', '', ].join('\r\n'), @@ -2527,10 +2507,9 @@ function runTests(server: Server) { '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 47', '', '{"data":{"test":"Hello, World"},"hasNext":true}', - '', + '---\r\n', ].join('\r\n'), ); expect(fakeReturn.callCount).to.equal(1); @@ -2592,10 +2571,9 @@ function runTests(server: Server) { '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 47', '', '{"data":{"test":"Hello, World"},"hasNext":true}', - '', + '---\r\n', ].join('\r\n'), ); }); @@ -2748,16 +2726,12 @@ function runTests(server: Server) { '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 124', '', '{"data":{"hello":"Hello Rob"},"hasNext":true,"extensions":{"preservedResult":{"data":{"hello":"Hello Rob"},"hasNext":true}}}', - '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 148', '', '{"data":{"test":"Hello World"},"path":[],"hasNext":false,"extensions":{"preservedResult":{"data":{"test":"Hello World"},"path":[],"hasNext":false}}}', - '', '-----', '', ].join('\r\n'), @@ -2828,16 +2802,12 @@ function runTests(server: Server) { '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 45', '', '{"data":{"hello":"Hello Rob"},"hasNext":true}', - '', '---', 'Content-Type: application/json; charset=utf-8', - 'Content-Length: 57', '', '{"data":{"test":"Hello World"},"path":[],"hasNext":false}', - '', '-----', '', ].join('\r\n'), diff --git a/src/index.ts b/src/index.ts index b67928ec..8ce8b184 100644 --- a/src/index.ts +++ b/src/index.ts @@ -461,6 +461,7 @@ export function graphqlHTTP(options: Options): Middleware { if (isAsyncIterable(executeResult)) { response.setHeader('Content-Type', 'multipart/mixed; boundary="-"'); + response.write('\r\n---\r\n'); sendPartialResponse(pretty, response, formattedResult); try { for await (let payload of executeResult) { @@ -488,7 +489,6 @@ export function graphqlHTTP(options: Options): Middleware { hasNext: false, }); } - response.write('\r\n-----\r\n'); response.end(); finishedIterable = true; return; @@ -624,16 +624,14 @@ function sendPartialResponse( ): void { const json = JSON.stringify(result, null, pretty ? 2 : 0); const chunk = Buffer.from(json, 'utf8'); - const data = [ - '', - '---', - 'Content-Type: application/json; charset=utf-8', - 'Content-Length: ' + String(chunk.length), - '', - chunk, - '', - ].join('\r\n'); - response.write(data); + const data = ['Content-Type: application/json; charset=utf-8', '', chunk]; + // @ts-expect-error + if (result.hasNext === true) { + data.push('---\r\n'); + } else { + data.push('-----\r\n'); + } + response.write(data.join('\r\n')); // flush response if compression middleware is used if ( typeof response.flush === 'function' && From 9acd26470a7d1df6304c60b44e8e38e4b9022238 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Wed, 9 Dec 2020 20:20:58 -0500 Subject: [PATCH 4/4] fix flaky tests --- src/__tests__/http-test.ts | 255 +++++++++++++++++------------ src/__tests__/simplePubSub-test.ts | 83 ++++++++++ src/__tests__/simplePubSub.ts | 75 +++++++++ src/index.ts | 9 +- 4 files changed, 306 insertions(+), 116 deletions(-) create mode 100644 src/__tests__/simplePubSub-test.ts create mode 100644 src/__tests__/simplePubSub.ts diff --git a/src/__tests__/http-test.ts b/src/__tests__/http-test.ts index dc7030bb..61aadfee 100644 --- a/src/__tests__/http-test.ts +++ b/src/__tests__/http-test.ts @@ -1,5 +1,6 @@ import zlib from 'zlib'; -import type http from 'http'; +import http from 'http'; +import type { AddressInfo } from 'net'; import type { Server as Restify } from 'restify'; import connect from 'connect'; @@ -7,7 +8,11 @@ import express from 'express'; import supertest from 'supertest'; import bodyParser from 'body-parser'; -import type { ASTVisitor, ValidationContext } from 'graphql'; +import type { + ASTVisitor, + ValidationContext, + AsyncExecutionResult, +} from 'graphql'; import sinon from 'sinon'; import multer from 'multer'; // cSpell:words mimetype originalname import { expect } from 'chai'; @@ -28,8 +33,11 @@ import { import { graphqlHTTP } from '../index'; import { isAsyncIterable } from '../isAsyncIterable'; +import SimplePubSub from './simplePubSub'; + type Middleware = (req: any, res: any, next: () => void) => unknown; type Server = () => { + listener: http.Server | http.RequestListener; request: () => supertest.SuperTest; use: (middleware: Middleware) => unknown; get: (path: string, middleware: Middleware) => unknown; @@ -82,10 +90,51 @@ function urlString(urlParams?: { [param: string]: string }): string { return string; } -function sleep(ms = 1) { - return new Promise((r) => { - setTimeout(r, ms); +async function streamRequest( + server: Server, + customExecuteFn?: () => AsyncIterable, +): Promise<{ + req: http.ClientRequest; + responseStream: http.IncomingMessage; +}> { + const app = server(); + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + customExecuteFn, + })), + ); + + let httpServer: http.Server; + if (typeof app.listener === 'function') { + httpServer = http.createServer(app.listener); + } else { + httpServer = app.listener; + } + await new Promise((resolve) => { + httpServer.listen(resolve); + }); + + const addr = httpServer.address() as AddressInfo; + const req = http.get({ + method: 'GET', + path: urlString({ query: '{test}' }), + port: addr.port, }); + + const res: http.IncomingMessage = await new Promise((resolve) => { + req.on('response', resolve); + }); + + req.on('close', () => { + httpServer.close(); + }); + + return { + req, + responseStream: res, + }; } describe('GraphQL-HTTP tests for connect', () => { @@ -99,6 +148,7 @@ describe('GraphQL-HTTP tests for connect', () => { }); return { + listener: app, request: () => supertest(app), use: app.use.bind(app), // Connect only likes using app.use. @@ -123,6 +173,7 @@ describe('GraphQL-HTTP tests for express', () => { }); return { + listener: app, request: () => supertest(app), use: app.use.bind(app), get: app.get.bind(app), @@ -148,6 +199,7 @@ describe('GraphQL-HTTP tests for restify', () => { }); return { + listener: app.server, request: () => supertest(app), use: app.use.bind(app), get: app.get.bind(app), @@ -2406,8 +2458,8 @@ function runTests(server: Server) { urlString(), graphqlHTTP(() => ({ schema: TestSchema, + // eslint-disable-next-line @typescript-eslint/require-await async *customExecuteFn() { - await sleep(); yield { data: { test2: 'Modification', @@ -2450,132 +2502,117 @@ function runTests(server: Server) { }); it('calls return on underlying async iterable when connection is closed', async () => { - const app = server(); - const fakeReturn = sinon.fake(); + const executePubSub = new SimplePubSub(); + const executeIterable = executePubSub.getSubscriber(); + const spy = sinon.spy(executeIterable[Symbol.asyncIterator](), 'return'); + expect( + executePubSub.emit({ data: { test: 'Hello, World 1' }, hasNext: true }), + ).to.equal(true); - app.get( - urlString(), - graphqlHTTP(() => ({ - schema: TestSchema, - // custom iterable keeps yielding until return is called - customExecuteFn() { - let returned = false; - return { - [Symbol.asyncIterator]: () => ({ - next: async () => { - await sleep(); - if (returned) { - return { value: undefined, done: true }; - } - return { - value: { data: { test: 'Hello, World' }, hasNext: true }, - done: false, - }; - }, - return: () => { - returned = true; - fakeReturn(); - return Promise.resolve({ value: undefined, done: true }); - }, - }), - }; - }, - })), + const { req, responseStream } = await streamRequest( + server, + () => executeIterable, ); + const iterator = responseStream[Symbol.asyncIterator](); - let text = ''; - const request = app - .request() - .get(urlString({ query: '{test}' })) - .parse((res, cb) => { - res.on('data', (data) => { - text = `${text}${data.toString('utf8') as string}`; - ((res as unknown) as http.IncomingMessage).destroy(); - cb(new Error('Aborted connection'), null); - }); - }); - - try { - await request; - } catch (e: unknown) { - // ignore aborted error - } - // sleep to allow time for return function to be called - await sleep(2); - expect(text).to.equal( + const { value: firstResponse } = await iterator.next(); + expect(firstResponse.toString('utf8')).to.equal( [ + '\r\n---', + 'Content-Type: application/json; charset=utf-8', '', - '---', + '{"data":{"test":"Hello, World 1"},"hasNext":true}', + '---\r\n', + ].join('\r\n'), + ); + + expect( + executePubSub.emit({ data: { test: 'Hello, World 2' }, hasNext: true }), + ).to.equal(true); + const { value: secondResponse } = await iterator.next(); + expect(secondResponse.toString('utf8')).to.equal( + [ 'Content-Type: application/json; charset=utf-8', '', - '{"data":{"test":"Hello, World"},"hasNext":true}', + '{"data":{"test":"Hello, World 2"},"hasNext":true}', '---\r\n', ].join('\r\n'), ); - expect(fakeReturn.callCount).to.equal(1); + + req.destroy(); + + // wait for server to call return on underlying iterable + await executeIterable.next(); + + expect(spy.calledOnce).to.equal(true); + // emit returns false because `return` cleaned up subscribers + expect( + executePubSub.emit({ data: { test: 'Hello, World 3' }, hasNext: true }), + ).to.equal(false); }); it('handles return function on async iterable that throws', async () => { - const app = server(); + const executePubSub = new SimplePubSub(); + const executeIterable = executePubSub.getSubscriber(); + const executeIterator = executeIterable[Symbol.asyncIterator](); + const originalReturn = executeIterator.return.bind(executeIterator); + const fake = sinon.fake(async () => { + await originalReturn(); + throw new Error('Throws!'); + }); + sinon.replace(executeIterator, 'return', fake); + expect( + executePubSub.emit({ + data: { test: 'Hello, World 1' }, + hasNext: true, + }), + ).to.equal(true); - app.get( - urlString(), - graphqlHTTP(() => ({ - schema: TestSchema, - // custom iterable keeps yielding until return is called - customExecuteFn() { - let returned = false; - return { - [Symbol.asyncIterator]: () => ({ - next: async () => { - await sleep(); - if (returned) { - return { value: undefined, done: true }; - } - return { - value: { data: { test: 'Hello, World' }, hasNext: true }, - done: false, - }; - }, - return: () => { - returned = true; - return Promise.reject(new Error('Throws!')); - }, - }), - }; - }, - })), + const { req, responseStream } = await streamRequest( + server, + () => executeIterable, ); + const iterator = responseStream[Symbol.asyncIterator](); - let text = ''; - const request = app - .request() - .get(urlString({ query: '{test}' })) - .parse((res, cb) => { - res.on('data', (data) => { - text = `${text}${data.toString('utf8') as string}`; - ((res as unknown) as http.IncomingMessage).destroy(); - cb(new Error('Aborted connection'), null); - }); - }); - - try { - await request; - } catch (e: unknown) { - // ignore aborted error - } - // sleep to allow return function to be called - await sleep(2); - expect(text).to.equal( + const { value: firstResponse } = await iterator.next(); + expect(firstResponse.toString('utf8')).to.equal( [ + '\r\n---', + 'Content-Type: application/json; charset=utf-8', '', - '---', + '{"data":{"test":"Hello, World 1"},"hasNext":true}', + '---\r\n', + ].join('\r\n'), + ); + + expect( + executePubSub.emit({ + data: { test: 'Hello, World 2' }, + hasNext: true, + }), + ).to.equal(true); + const { value: secondResponse } = await iterator.next(); + expect(secondResponse.toString('utf8')).to.equal( + [ 'Content-Type: application/json; charset=utf-8', '', - '{"data":{"test":"Hello, World"},"hasNext":true}', + '{"data":{"test":"Hello, World 2"},"hasNext":true}', '---\r\n', ].join('\r\n'), ); + req.destroy(); + + // wait for server to call return on underlying iterable + await executeIterable.next(); + + expect(fake.calledOnce).to.equal(true); + // emit returns false because `return` cleaned up subscribers + expect( + executePubSub.emit({ + data: { test: 'Hello, World 3' }, + hasNext: true, + }), + ).to.equal(false); }); }); diff --git a/src/__tests__/simplePubSub-test.ts b/src/__tests__/simplePubSub-test.ts new file mode 100644 index 00000000..b47f337d --- /dev/null +++ b/src/__tests__/simplePubSub-test.ts @@ -0,0 +1,83 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import SimplePubSub from './simplePubSub'; + +describe('SimplePubSub', () => { + it('subscribe async-iterator mock', async () => { + const pubsub = new SimplePubSub(); + const iterator = pubsub.getSubscriber(); + + // Queue up publishes + expect(pubsub.emit('Apple')).to.equal(true); + expect(pubsub.emit('Banana')).to.equal(true); + + // Read payloads + expect(await iterator.next()).to.deep.equal({ + done: false, + value: 'Apple', + }); + expect(await iterator.next()).to.deep.equal({ + done: false, + value: 'Banana', + }); + + // Read ahead + const i3 = iterator.next().then((x) => x); + const i4 = iterator.next().then((x) => x); + + // Publish + expect(pubsub.emit('Coconut')).to.equal(true); + expect(pubsub.emit('Durian')).to.equal(true); + + // Await out of order to get correct results + expect(await i4).to.deep.equal({ done: false, value: 'Durian' }); + expect(await i3).to.deep.equal({ done: false, value: 'Coconut' }); + + // Read ahead + const i5 = iterator.next().then((x) => x); + + // Terminate queue + await iterator.return(); + + // Publish is not caught after terminate + expect(pubsub.emit('Fig')).to.equal(false); + + // Find that cancelled read-ahead got a "done" result + expect(await i5).to.deep.equal({ done: true, value: undefined }); + + // And next returns empty completion value + expect(await iterator.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + it('empties queue when thrown', async () => { + const pubsub = new SimplePubSub(); + const iterator = pubsub.getSubscriber(); + + expect(pubsub.emit('Apple')).to.equal(true); + + // Read payloads + expect(await iterator.next()).to.deep.equal({ + done: false, + value: 'Apple', + }); + + // Terminate queue + try { + await iterator.throw(new Error('Thrown!')); + } catch (e: unknown) { + // ignore thrown error + } + + // Publish is not caught after terminate + expect(pubsub.emit('Fig')).to.equal(false); + + // And next returns empty completion value + expect(await iterator.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); +}); diff --git a/src/__tests__/simplePubSub.ts b/src/__tests__/simplePubSub.ts new file mode 100644 index 00000000..64dfa303 --- /dev/null +++ b/src/__tests__/simplePubSub.ts @@ -0,0 +1,75 @@ +/** + * Create an AsyncIterator from an EventEmitter. Useful for mocking a + * PubSub system for tests. + */ +export default class SimplePubSub { + subscribers: Set<(arg0: T) => void>; + + constructor() { + this.subscribers = new Set(); + } + + emit(event: T): boolean { + for (const subscriber of this.subscribers) { + subscriber(event); + } + return this.subscribers.size > 0; + } + + // Use custom return type to avoid checking for optional `return` method + getSubscriber(): AsyncGenerator { + type EventResolve = (arg0: IteratorResult) => void; + + const pullQueue: Array = []; + const pushQueue: Array = []; + let listening = true; + this.subscribers.add(pushValue); + + const emptyQueue = () => { + listening = false; + this.subscribers.delete(pushValue); + for (const resolve of pullQueue) { + resolve({ value: undefined, done: true }); + } + pullQueue.length = 0; + pushQueue.length = 0; + }; + + return { + next(): Promise> { + if (!listening) { + return Promise.resolve({ value: undefined, done: true }); + } + + if (pushQueue.length > 0) { + return Promise.resolve({ + value: pushQueue.shift() as T, + done: false, + }); + } + return new Promise((resolve: EventResolve) => { + pullQueue.push(resolve); + }); + }, + return() { + emptyQueue(); + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error: unknown) { + emptyQueue(); + return Promise.reject(error); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + + function pushValue(value: T): void { + if (pullQueue.length > 0) { + (pullQueue.shift() as EventResolve)({ value, done: false }); + } else { + pushQueue.push(value); + } + } + } +} diff --git a/src/index.ts b/src/index.ts index 8ce8b184..0785688c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -377,13 +377,8 @@ export function graphqlHTTP(options: Options): Middleware { !finishedIterable && typeof asyncIterator.return === 'function' ) { - asyncIterator.return().then(null, (rawError: unknown) => { - const graphqlError = getGraphQlError(rawError); - sendPartialResponse(pretty, response, { - data: undefined, - errors: [formatErrorFn(graphqlError)], - hasNext: false, - }); + asyncIterator.return().catch(() => { + // can't do anything with the error, connection is already closed }); } });