From ab54ee077c917a497f9559e196a4d2ae9eb55f7e Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 19 Mar 2021 16:33:31 +0200 Subject: [PATCH 1/5] fix(race): concurrent next calls --- src/execution/__tests__/stream-test.ts | 66 +++++++++++++++++++ src/execution/execute.ts | 90 +++++++++++++++----------- 2 files changed, 118 insertions(+), 38 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index d951d0d5e9..41e1f8ffd6 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -437,6 +437,72 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Can stream a field that returns an async iterable', async () => { + const document = parse(` + query { + asyncIterableList @stream(initialCount: 2) { + name + id + } + } + `); + const schema = new GraphQLSchema({ query }); + + const result = await execute({ schema, document, rootValue: {} }); + + const results = []; + if (isAsyncIterable(result)) { + const asyncResults = await Promise.all([ + result.next(), + result.next(), + result.next(), + result.next(), + ]); + results.push(...asyncResults); + } + + expect(results).to.deep.equal([ + { + done: false, + value: { + data: { + asyncIterableList: [ + { + name: 'Luke', + id: '1', + }, + { + name: 'Han', + id: '2', + }, + ], + }, + hasNext: true, + }, + }, + { + done: false, + value: { + data: { + name: 'Leia', + id: '3', + }, + path: ['asyncIterableList', 2], + hasNext: true, + }, + }, + { + done: false, + value: { + hasNext: false, + }, + }, + { + done: true, + value: undefined, + }, + ]); + }); it('Handles error thrown in async iterable before initialCount is reached', async () => { const document = parse(` query { diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 71424f20b9..1d93e74a94 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1784,47 +1784,61 @@ export class Dispatcher { done: false, }); } - return new Promise<{ - promise: Promise>; - }>((resolve) => { + return new Promise((resolve) => { + let resolved = false; this._subsequentPayloads.forEach((promise) => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - promise.then(() => { - // resolve with actual promise, not resolved value of promise so we can remove it from this._subsequentPayloads - resolve({ promise }); - }); - }); - }) - .then(({ promise }) => { - this._subsequentPayloads.splice( - this._subsequentPayloads.indexOf(promise), - 1, - ); - return promise; - }) - .then(({ value, done }) => { - if (done && this._subsequentPayloads.length === 0) { - // async iterable resolver just finished and no more pending payloads - return { - value: { - hasNext: false, - }, - done: false, + promise.then((payload) => { + if (resolved) { + return; + } + + resolved = true; + + if (this._subsequentPayloads.length === 0) { + // a different call to next has exhausted all payloads + resolve({ value: undefined, done: true }); + return; + } + + const index = this._subsequentPayloads.indexOf(promise); + + if (index === -1) { + // a different call to next has consumed this payload + resolve(this._race()); + return; + } + + this._subsequentPayloads.splice(index, 1); + + const { value, done } = payload; + + if (done && this._subsequentPayloads.length === 0) { + // async iterable resolver just finished and no more pending payloads + resolve({ + value: { + hasNext: false, + }, + done: false, + }); + return; + } else if (done) { + // async iterable resolver just finished but there are pending payloads + // return the next one + resolve(this._race()); + return; + } + + const returnValue: ExecutionPatchResult = { + ...value, + hasNext: this._subsequentPayloads.length > 0, }; - } else if (done) { - // async iterable resolver just finished but there are pending payloads - // return the next one - return this._race(); - } - const returnValue: ExecutionPatchResult = { - ...value, - hasNext: this._subsequentPayloads.length > 0, - }; - return { - value: returnValue, - done: false, - }; + resolve({ + value: returnValue, + done: false, + }); + }); }); + }); } _next(): Promise> { From 39f0c2860668829bf968f9e849dc6dfdf942f6d3 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 21 Mar 2021 15:30:59 +0200 Subject: [PATCH 2/5] refactor test --- src/execution/__tests__/stream-test.ts | 33 +++++++++++++------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 41e1f8ffd6..c0d5a7f3d6 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -133,6 +133,21 @@ async function complete(document: DocumentNode) { return result; } +async function completeAsync(document, numCalls) { + const schema = new GraphQLSchema({ query }); + + const result = await execute({ schema, document, rootValue: {} }); + + if (isAsyncIterable(result)) { + const promises = []; + for (let i = 0; i < numCalls; i++) { + promises.push(result.next()); + } + return Promise.all(promises); + } + return result; +} + describe('Execute: stream directive', () => { it('Can stream a list field', async () => { const document = parse('{ scalarList @stream(initialCount: 1) }'); @@ -446,22 +461,8 @@ describe('Execute: stream directive', () => { } } `); - const schema = new GraphQLSchema({ query }); - - const result = await execute({ schema, document, rootValue: {} }); - - const results = []; - if (isAsyncIterable(result)) { - const asyncResults = await Promise.all([ - result.next(), - result.next(), - result.next(), - result.next(), - ]); - results.push(...asyncResults); - } - - expect(results).to.deep.equal([ + const result = await completeAsync(document, 4); + expect(result).to.deep.equal([ { done: false, value: { From 1eaed52eaa413f079127371dd8f73b3366da9f1e Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 21 Mar 2021 15:57:30 +0200 Subject: [PATCH 3/5] use invariant --- src/execution/__tests__/stream-test.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index c0d5a7f3d6..bab10697b9 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -138,14 +138,13 @@ async function completeAsync(document, numCalls) { const result = await execute({ schema, document, rootValue: {} }); - if (isAsyncIterable(result)) { - const promises = []; - for (let i = 0; i < numCalls; i++) { - promises.push(result.next()); - } - return Promise.all(promises); + invariant(isAsyncIterable(result)); + + const promises = []; + for (let i = 0; i < numCalls; i++) { + promises.push(result.next()); } - return result; + return Promise.all(promises); } describe('Execute: stream directive', () => { From f1bb199bf5c64cb38b8e7b7f93ceac3c30a4f32d Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Wed, 2 Jun 2021 17:20:54 +0300 Subject: [PATCH 4/5] disable eslint error --- src/execution/execute.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 1d93e74a94..61f256da55 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1787,6 +1787,7 @@ export class Dispatcher { return new Promise((resolve) => { let resolved = false; this._subsequentPayloads.forEach((promise) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises promise.then((payload) => { if (resolved) { return; From c3de71a6b4653467002d590d3032445746f6acdf Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Wed, 2 Jun 2021 17:27:20 +0300 Subject: [PATCH 5/5] fix --- src/execution/__tests__/stream-test.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index bab10697b9..38d194c336 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -133,16 +133,18 @@ async function complete(document: DocumentNode) { return result; } -async function completeAsync(document, numCalls) { +async function completeAsync(document: DocumentNode, numCalls: number) { const schema = new GraphQLSchema({ query }); const result = await execute({ schema, document, rootValue: {} }); invariant(isAsyncIterable(result)); + const iterator = result[Symbol.asyncIterator](); + const promises = []; for (let i = 0; i < numCalls; i++) { - promises.push(result.next()); + promises.push(iterator.next()); } return Promise.all(promises); }