Skip to content

Commit 1de0dbb

Browse files
committed
fix(race): concurrent next calls with defer/stream (#2975)
* fix(race): concurrent next calls * refactor test * use invariant * disable eslint error * fix
1 parent 5d6142d commit 1de0dbb

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,22 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) {
244244
return result;
245245
}
246246

247+
async function completeAsync(document: DocumentNode, numCalls: number) {
248+
const schema = new GraphQLSchema({ query });
249+
250+
const result = await execute({ schema, document, rootValue: {} });
251+
252+
assert(isAsyncIterable(result));
253+
254+
const iterator = result[Symbol.asyncIterator]();
255+
256+
const promises = [];
257+
for (let i = 0; i < numCalls; i++) {
258+
promises.push(iterator.next());
259+
}
260+
return Promise.all(promises);
261+
}
262+
247263
describe('Execute: stream directive', () => {
248264
it('Can stream a list field', async () => {
249265
const document = parse('{ scalarList @stream(initialCount: 1) }');
@@ -684,6 +700,60 @@ describe('Execute: stream directive', () => {
684700
},
685701
});
686702
});
703+
it('Can handle concurrent calls to .next() without waiting', async () => {
704+
const document = parse(`
705+
query {
706+
asyncIterableList @stream(initialCount: 2) {
707+
name
708+
id
709+
}
710+
}
711+
`);
712+
const result = await completeAsync(document, 4);
713+
expectJSON(result).toDeepEqual([
714+
{
715+
done: false,
716+
value: {
717+
data: {
718+
asyncIterableList: [
719+
{
720+
name: 'Luke',
721+
id: '1',
722+
},
723+
{
724+
name: 'Han',
725+
id: '2',
726+
},
727+
],
728+
},
729+
hasNext: true,
730+
},
731+
},
732+
{
733+
done: false,
734+
value: {
735+
data: [
736+
{
737+
name: 'Leia',
738+
id: '3',
739+
},
740+
],
741+
path: ['asyncIterableList', 2],
742+
hasNext: true,
743+
},
744+
},
745+
{
746+
done: false,
747+
value: {
748+
hasNext: false,
749+
},
750+
},
751+
{
752+
done: true,
753+
value: undefined,
754+
},
755+
]);
756+
});
687757
it('Handles error thrown in async iterable before initialCount is reached', async () => {
688758
const document = parse(`
689759
query {

src/execution/execute.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1642,7 +1642,18 @@ function yieldSubsequentPayloads(
16421642

16431643
const data = await asyncPayloadRecord.data;
16441644

1645+
if (exeContext.subsequentPayloads.length === 0) {
1646+
// a different call to next has exhausted all payloads
1647+
return { value: undefined, done: true };
1648+
}
1649+
16451650
const index = exeContext.subsequentPayloads.indexOf(asyncPayloadRecord);
1651+
1652+
if (index === -1) {
1653+
// a different call to next has consumed this payload
1654+
return race();
1655+
}
1656+
16461657
exeContext.subsequentPayloads.splice(index, 1);
16471658

16481659
if (asyncPayloadRecord.isCompletedIterator) {

0 commit comments

Comments
 (0)