From da0c17c0b44311ea66b821dac2790c9c9f83f5d6 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Wed, 9 Dec 2020 20:20:58 -0500 Subject: [PATCH] fix flaky tests --- src/__tests__/http-test.ts | 255 +++++++++++++++++------------ src/__tests__/simplePubSub-test.ts | 83 ++++++++++ src/__tests__/simplePubSub.ts | 75 +++++++++ src/index.ts | 9 +- 4 files changed, 306 insertions(+), 116 deletions(-) create mode 100644 src/__tests__/simplePubSub-test.ts create mode 100644 src/__tests__/simplePubSub.ts diff --git a/src/__tests__/http-test.ts b/src/__tests__/http-test.ts index dc7030bb..7cb418d3 100644 --- a/src/__tests__/http-test.ts +++ b/src/__tests__/http-test.ts @@ -1,5 +1,6 @@ import zlib from 'zlib'; -import type http from 'http'; +import http from 'http'; +import type { AddressInfo } from 'net'; import type { Server as Restify } from 'restify'; import connect from 'connect'; @@ -7,7 +8,11 @@ import express from 'express'; import supertest from 'supertest'; import bodyParser from 'body-parser'; -import type { ASTVisitor, ValidationContext } from 'graphql'; +import type { + ASTVisitor, + ValidationContext, + AsyncExecutionResult, +} from 'graphql'; import sinon from 'sinon'; import multer from 'multer'; // cSpell:words mimetype originalname import { expect } from 'chai'; @@ -28,8 +33,11 @@ import { import { graphqlHTTP } from '../index'; import { isAsyncIterable } from '../isAsyncIterable'; +import SimplePubSub from './simplePubSub'; + type Middleware = (req: any, res: any, next: () => void) => unknown; type Server = () => { + listener: http.Server | http.RequestListener; request: () => supertest.SuperTest; use: (middleware: Middleware) => unknown; get: (path: string, middleware: Middleware) => unknown; @@ -82,10 +90,51 @@ function urlString(urlParams?: { [param: string]: string }): string { return string; } -function sleep(ms = 1) { - return new Promise((r) => { - setTimeout(r, ms); +async function streamRequest( + server: Server, + customExecuteFn?: () => AsyncIterable, +): Promise<{ + req: http.ClientRequest; + responseStream: http.IncomingMessage; +}> { + const app = server(); + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + customExecuteFn, + })), + ); + + let httpServer: http.Server; + if (typeof app.listener === 'function') { + httpServer = http.createServer(app.listener); + } else { + httpServer = app.listener; + } + await new Promise((resolve) => { + httpServer.listen(resolve); + }); + + const addr = httpServer.address() as AddressInfo; + const req = http.get({ + method: 'GET', + path: urlString({ query: '{test}' }), + port: addr.port, }); + + const res: http.IncomingMessage = await new Promise((resolve) => { + req.on('response', resolve); + }); + + res.on('close', () => { + httpServer.close(); + }); + + return { + req, + responseStream: res, + }; } describe('GraphQL-HTTP tests for connect', () => { @@ -99,6 +148,7 @@ describe('GraphQL-HTTP tests for connect', () => { }); return { + listener: app, request: () => supertest(app), use: app.use.bind(app), // Connect only likes using app.use. @@ -123,6 +173,7 @@ describe('GraphQL-HTTP tests for express', () => { }); return { + listener: app, request: () => supertest(app), use: app.use.bind(app), get: app.get.bind(app), @@ -148,6 +199,7 @@ describe('GraphQL-HTTP tests for restify', () => { }); return { + listener: app.server, request: () => supertest(app), use: app.use.bind(app), get: app.get.bind(app), @@ -2406,8 +2458,8 @@ function runTests(server: Server) { urlString(), graphqlHTTP(() => ({ schema: TestSchema, + // eslint-disable-next-line @typescript-eslint/require-await async *customExecuteFn() { - await sleep(); yield { data: { test2: 'Modification', @@ -2450,132 +2502,117 @@ function runTests(server: Server) { }); it('calls return on underlying async iterable when connection is closed', async () => { - const app = server(); - const fakeReturn = sinon.fake(); + const executePubSub = new SimplePubSub(); + const executeIterable = executePubSub.getSubscriber(); + const spy = sinon.spy(executeIterable[Symbol.asyncIterator](), 'return'); + expect( + executePubSub.emit({ data: { test: 'Hello, World 1' }, hasNext: true }), + ).to.equal(true); - app.get( - urlString(), - graphqlHTTP(() => ({ - schema: TestSchema, - // custom iterable keeps yielding until return is called - customExecuteFn() { - let returned = false; - return { - [Symbol.asyncIterator]: () => ({ - next: async () => { - await sleep(); - if (returned) { - return { value: undefined, done: true }; - } - return { - value: { data: { test: 'Hello, World' }, hasNext: true }, - done: false, - }; - }, - return: () => { - returned = true; - fakeReturn(); - return Promise.resolve({ value: undefined, done: true }); - }, - }), - }; - }, - })), + const { req, responseStream } = await streamRequest( + server, + () => executeIterable, ); + const iterator = responseStream[Symbol.asyncIterator](); - let text = ''; - const request = app - .request() - .get(urlString({ query: '{test}' })) - .parse((res, cb) => { - res.on('data', (data) => { - text = `${text}${data.toString('utf8') as string}`; - ((res as unknown) as http.IncomingMessage).destroy(); - cb(new Error('Aborted connection'), null); - }); - }); - - try { - await request; - } catch (e: unknown) { - // ignore aborted error - } - // sleep to allow time for return function to be called - await sleep(2); - expect(text).to.equal( + const { value: firstResponse } = await iterator.next(); + expect(firstResponse.toString('utf8')).to.equal( [ + '\r\n---', + 'Content-Type: application/json; charset=utf-8', '', - '---', + '{"data":{"test":"Hello, World 1"},"hasNext":true}', + '---\r\n', + ].join('\r\n'), + ); + + expect( + executePubSub.emit({ data: { test: 'Hello, World 2' }, hasNext: true }), + ).to.equal(true); + const { value: secondResponse } = await iterator.next(); + expect(secondResponse.toString('utf8')).to.equal( + [ 'Content-Type: application/json; charset=utf-8', '', - '{"data":{"test":"Hello, World"},"hasNext":true}', + '{"data":{"test":"Hello, World 2"},"hasNext":true}', '---\r\n', ].join('\r\n'), ); - expect(fakeReturn.callCount).to.equal(1); + + req.destroy(); + + // wait for server to call return on underlying iterable + await executeIterable.next(); + + expect(spy.calledOnce).to.equal(true); + // emit returns false because `return` cleaned up subscribers + expect( + executePubSub.emit({ data: { test: 'Hello, World 3' }, hasNext: true }), + ).to.equal(false); }); it('handles return function on async iterable that throws', async () => { - const app = server(); + const executePubSub = new SimplePubSub(); + const executeIterable = executePubSub.getSubscriber(); + const executeIterator = executeIterable[Symbol.asyncIterator](); + const originalReturn = executeIterator.return.bind(executeIterator); + const fake = sinon.fake(async () => { + await originalReturn(); + throw new Error('Throws!'); + }); + sinon.replace(executeIterator, 'return', fake); + expect( + executePubSub.emit({ + data: { test: 'Hello, World 1' }, + hasNext: true, + }), + ).to.equal(true); - app.get( - urlString(), - graphqlHTTP(() => ({ - schema: TestSchema, - // custom iterable keeps yielding until return is called - customExecuteFn() { - let returned = false; - return { - [Symbol.asyncIterator]: () => ({ - next: async () => { - await sleep(); - if (returned) { - return { value: undefined, done: true }; - } - return { - value: { data: { test: 'Hello, World' }, hasNext: true }, - done: false, - }; - }, - return: () => { - returned = true; - return Promise.reject(new Error('Throws!')); - }, - }), - }; - }, - })), + const { req, responseStream } = await streamRequest( + server, + () => executeIterable, ); + const iterator = responseStream[Symbol.asyncIterator](); - let text = ''; - const request = app - .request() - .get(urlString({ query: '{test}' })) - .parse((res, cb) => { - res.on('data', (data) => { - text = `${text}${data.toString('utf8') as string}`; - ((res as unknown) as http.IncomingMessage).destroy(); - cb(new Error('Aborted connection'), null); - }); - }); - - try { - await request; - } catch (e: unknown) { - // ignore aborted error - } - // sleep to allow return function to be called - await sleep(2); - expect(text).to.equal( + const { value: firstResponse } = await iterator.next(); + expect(firstResponse.toString('utf8')).to.equal( [ + '\r\n---', + 'Content-Type: application/json; charset=utf-8', '', - '---', + '{"data":{"test":"Hello, World 1"},"hasNext":true}', + '---\r\n', + ].join('\r\n'), + ); + + expect( + executePubSub.emit({ + data: { test: 'Hello, World 2' }, + hasNext: true, + }), + ).to.equal(true); + const { value: secondResponse } = await iterator.next(); + expect(secondResponse.toString('utf8')).to.equal( + [ 'Content-Type: application/json; charset=utf-8', '', - '{"data":{"test":"Hello, World"},"hasNext":true}', + '{"data":{"test":"Hello, World 2"},"hasNext":true}', '---\r\n', ].join('\r\n'), ); + req.destroy(); + + // wait for server to call return on underlying iterable + await executeIterable.next(); + + expect(fake.calledOnce).to.equal(true); + // emit returns false because `return` cleaned up subscribers + expect( + executePubSub.emit({ + data: { test: 'Hello, World 3' }, + hasNext: true, + }), + ).to.equal(false); }); }); diff --git a/src/__tests__/simplePubSub-test.ts b/src/__tests__/simplePubSub-test.ts new file mode 100644 index 00000000..b47f337d --- /dev/null +++ b/src/__tests__/simplePubSub-test.ts @@ -0,0 +1,83 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import SimplePubSub from './simplePubSub'; + +describe('SimplePubSub', () => { + it('subscribe async-iterator mock', async () => { + const pubsub = new SimplePubSub(); + const iterator = pubsub.getSubscriber(); + + // Queue up publishes + expect(pubsub.emit('Apple')).to.equal(true); + expect(pubsub.emit('Banana')).to.equal(true); + + // Read payloads + expect(await iterator.next()).to.deep.equal({ + done: false, + value: 'Apple', + }); + expect(await iterator.next()).to.deep.equal({ + done: false, + value: 'Banana', + }); + + // Read ahead + const i3 = iterator.next().then((x) => x); + const i4 = iterator.next().then((x) => x); + + // Publish + expect(pubsub.emit('Coconut')).to.equal(true); + expect(pubsub.emit('Durian')).to.equal(true); + + // Await out of order to get correct results + expect(await i4).to.deep.equal({ done: false, value: 'Durian' }); + expect(await i3).to.deep.equal({ done: false, value: 'Coconut' }); + + // Read ahead + const i5 = iterator.next().then((x) => x); + + // Terminate queue + await iterator.return(); + + // Publish is not caught after terminate + expect(pubsub.emit('Fig')).to.equal(false); + + // Find that cancelled read-ahead got a "done" result + expect(await i5).to.deep.equal({ done: true, value: undefined }); + + // And next returns empty completion value + expect(await iterator.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + it('empties queue when thrown', async () => { + const pubsub = new SimplePubSub(); + const iterator = pubsub.getSubscriber(); + + expect(pubsub.emit('Apple')).to.equal(true); + + // Read payloads + expect(await iterator.next()).to.deep.equal({ + done: false, + value: 'Apple', + }); + + // Terminate queue + try { + await iterator.throw(new Error('Thrown!')); + } catch (e: unknown) { + // ignore thrown error + } + + // Publish is not caught after terminate + expect(pubsub.emit('Fig')).to.equal(false); + + // And next returns empty completion value + expect(await iterator.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); +}); diff --git a/src/__tests__/simplePubSub.ts b/src/__tests__/simplePubSub.ts new file mode 100644 index 00000000..64dfa303 --- /dev/null +++ b/src/__tests__/simplePubSub.ts @@ -0,0 +1,75 @@ +/** + * Create an AsyncIterator from an EventEmitter. Useful for mocking a + * PubSub system for tests. + */ +export default class SimplePubSub { + subscribers: Set<(arg0: T) => void>; + + constructor() { + this.subscribers = new Set(); + } + + emit(event: T): boolean { + for (const subscriber of this.subscribers) { + subscriber(event); + } + return this.subscribers.size > 0; + } + + // Use custom return type to avoid checking for optional `return` method + getSubscriber(): AsyncGenerator { + type EventResolve = (arg0: IteratorResult) => void; + + const pullQueue: Array = []; + const pushQueue: Array = []; + let listening = true; + this.subscribers.add(pushValue); + + const emptyQueue = () => { + listening = false; + this.subscribers.delete(pushValue); + for (const resolve of pullQueue) { + resolve({ value: undefined, done: true }); + } + pullQueue.length = 0; + pushQueue.length = 0; + }; + + return { + next(): Promise> { + if (!listening) { + return Promise.resolve({ value: undefined, done: true }); + } + + if (pushQueue.length > 0) { + return Promise.resolve({ + value: pushQueue.shift() as T, + done: false, + }); + } + return new Promise((resolve: EventResolve) => { + pullQueue.push(resolve); + }); + }, + return() { + emptyQueue(); + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error: unknown) { + emptyQueue(); + return Promise.reject(error); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + + function pushValue(value: T): void { + if (pullQueue.length > 0) { + (pullQueue.shift() as EventResolve)({ value, done: false }); + } else { + pushQueue.push(value); + } + } + } +} diff --git a/src/index.ts b/src/index.ts index 9cc5c51f..7425cf1d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -378,13 +378,8 @@ export function graphqlHTTP(options: Options): Middleware { !finishedIterable && typeof asyncIterator.return === 'function' ) { - asyncIterator.return().then(null, (rawError: unknown) => { - const graphqlError = getGraphQlError(rawError); - sendPartialResponse(pretty, response, { - data: undefined, - errors: [formatErrorFn(graphqlError)], - hasNext: false, - }); + asyncIterator.return().catch(() => { + // can't do anything with the error, connection is already closed }); } });