Skip to content

Commit 562d787

Browse files
committed
fix(race): concurrent next calls with defer/stream (#2975)
* fix(race): concurrent next calls * refactor test * use invariant * disable eslint error * fix
1 parent 5a794b8 commit 562d787

File tree

2 files changed

+79
-0
lines changed

2 files changed

+79
-0
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,22 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) {
193193
return result;
194194
}
195195

196+
async function completeAsync(document: DocumentNode, numCalls: number) {
197+
const schema = new GraphQLSchema({ query });
198+
199+
const result = await execute({ schema, document, rootValue: {} });
200+
201+
invariant(isAsyncIterable(result));
202+
203+
const iterator = result[Symbol.asyncIterator]();
204+
205+
const promises = [];
206+
for (let i = 0; i < numCalls; i++) {
207+
promises.push(iterator.next());
208+
}
209+
return Promise.all(promises);
210+
}
211+
196212
describe('Execute: stream directive', () => {
197213
it('Can stream a list field', async () => {
198214
const document = parse('{ scalarList @stream(initialCount: 1) }');
@@ -613,6 +629,58 @@ describe('Execute: stream directive', () => {
613629
},
614630
});
615631
});
632+
it('Can handle concurrent calls to .next() without waiting', async () => {
633+
const document = parse(`
634+
query {
635+
asyncIterableList @stream(initialCount: 2) {
636+
name
637+
id
638+
}
639+
}
640+
`);
641+
const result = await completeAsync(document, 4);
642+
expectJSON(result).toDeepEqual([
643+
{
644+
done: false,
645+
value: {
646+
data: {
647+
asyncIterableList: [
648+
{
649+
name: 'Luke',
650+
id: '1',
651+
},
652+
{
653+
name: 'Han',
654+
id: '2',
655+
},
656+
],
657+
},
658+
hasNext: true,
659+
},
660+
},
661+
{
662+
done: false,
663+
value: {
664+
data: {
665+
name: 'Leia',
666+
id: '3',
667+
},
668+
path: ['asyncIterableList', 2],
669+
hasNext: true,
670+
},
671+
},
672+
{
673+
done: false,
674+
value: {
675+
hasNext: false,
676+
},
677+
},
678+
{
679+
done: true,
680+
value: undefined,
681+
},
682+
]);
683+
});
616684
it('Handles error thrown in async iterable before initialCount is reached', async () => {
617685
const document = parse(`
618686
query {

src/execution/execute.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1596,7 +1596,18 @@ function yieldSubsequentPayloads(
15961596

15971597
const data = await asyncPayloadRecord.data;
15981598

1599+
if (exeContext.subsequentPayloads.length === 0) {
1600+
// a different call to next has exhausted all payloads
1601+
return { value: undefined, done: true };
1602+
}
1603+
15991604
const index = exeContext.subsequentPayloads.indexOf(asyncPayloadRecord);
1605+
1606+
if (index === -1) {
1607+
// a different call to next has consumed this payload
1608+
return race();
1609+
}
1610+
16001611
exeContext.subsequentPayloads.splice(index, 1);
16011612

16021613
if (asyncPayloadRecord.isCompletedIterator) {

0 commit comments

Comments
 (0)