Skip to content

Commit 19be33e

Browse files
committed
fix(race): concurrent next calls with defer/stream (#2975)
* fix(race): concurrent next calls * refactor test * use invariant * disable eslint error * fix
1 parent 05f1903 commit 19be33e

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

src/execution/__tests__/stream-test.ts

+70
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,22 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) {
244244
return result;
245245
}
246246

247+
async function completeAsync(document: DocumentNode, numCalls: number) {
248+
const schema = new GraphQLSchema({ query });
249+
250+
const result = await execute({ schema, document, rootValue: {} });
251+
252+
assert(isAsyncIterable(result));
253+
254+
const iterator = result[Symbol.asyncIterator]();
255+
256+
const promises = [];
257+
for (let i = 0; i < numCalls; i++) {
258+
promises.push(iterator.next());
259+
}
260+
return Promise.all(promises);
261+
}
262+
247263
describe('Execute: stream directive', () => {
248264
it('Can stream a list field', async () => {
249265
const document = parse('{ scalarList @stream(initialCount: 1) }');
@@ -684,6 +700,60 @@ describe('Execute: stream directive', () => {
684700
},
685701
});
686702
});
703+
it('Can handle concurrent calls to .next() without waiting', async () => {
704+
const document = parse(`
705+
query {
706+
asyncIterableList @stream(initialCount: 2) {
707+
name
708+
id
709+
}
710+
}
711+
`);
712+
const result = await completeAsync(document, 4);
713+
expectJSON(result).toDeepEqual([
714+
{
715+
done: false,
716+
value: {
717+
data: {
718+
asyncIterableList: [
719+
{
720+
name: 'Luke',
721+
id: '1',
722+
},
723+
{
724+
name: 'Han',
725+
id: '2',
726+
},
727+
],
728+
},
729+
hasNext: true,
730+
},
731+
},
732+
{
733+
done: false,
734+
value: {
735+
data: [
736+
{
737+
name: 'Leia',
738+
id: '3',
739+
},
740+
],
741+
path: ['asyncIterableList', 2],
742+
hasNext: true,
743+
},
744+
},
745+
{
746+
done: false,
747+
value: {
748+
hasNext: false,
749+
},
750+
},
751+
{
752+
done: true,
753+
value: undefined,
754+
},
755+
]);
756+
});
687757
it('Handles error thrown in async iterable before initialCount is reached', async () => {
688758
const document = parse(`
689759
query {

src/execution/execute.ts

+11
Original file line numberDiff line numberDiff line change
@@ -1818,7 +1818,18 @@ function yieldSubsequentPayloads(
18181818

18191819
const data = await asyncPayloadRecord.data;
18201820

1821+
if (exeContext.subsequentPayloads.length === 0) {
1822+
// a different call to next has exhausted all payloads
1823+
return { value: undefined, done: true };
1824+
}
1825+
18211826
const index = exeContext.subsequentPayloads.indexOf(asyncPayloadRecord);
1827+
1828+
if (index === -1) {
1829+
// a different call to next has consumed this payload
1830+
return race();
1831+
}
1832+
18221833
exeContext.subsequentPayloads.splice(index, 1);
18231834

18241835
if (asyncPayloadRecord.isCompletedIterator) {

0 commit comments

Comments
 (0)