Skip to content

Commit

Permalink
do not initiate multiple streams at the same path
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jan 17, 2023
1 parent f16840b commit 1440ac2
Showing 1 changed file with 31 additions and 21 deletions.
52 changes: 31 additions & 21 deletions spec/Section 6 -- Execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,22 @@ ExecuteQuery(query, schema, variableValues, initialValue):

- Let {subsequentPayloads} be an empty list.
- Initialize {branches} to the empty set.
- Initialize {streams} to the empty set.
- Let {queryType} be the root Query type in {schema}.
- Assert: {queryType} is an Object type.
- Let {selectionSet} be the top level Selection Set in {query}.
- Let {groupedFieldSet} and {newDeferDepth} be the result of
{CollectRootFields(queryType, selectionSet, variableValues)}.
- Let {data} be the result of running {ExecuteGroupedFieldSet(groupedFieldSet,
queryType, initialValue, variableValues, subsequentPayloads, branches)}
_normally_ (allowing parallelization).
queryType, initialValue, variableValues, subsequentPayloads, branches,
streams)} _normally_ (allowing parallelization).
- Let {errors} be the list of all _field error_ raised while executing the
selection set.
- If {newDeferDepth} is defined and {ShouldBranch(branches, groupedFieldSet,
path)} is {true}:
- Call {ExecuteDeferredFragment(objectType, objectValue, groupedFieldSet,
path, newDeferDepth, variableValues, asyncRecord, subsequentPayloads,
branches)}
branches, streams)}
- If {subsequentPayloads} is empty:
- Return an unordered map containing {data} and {errors}.
- If {subsequentPayloads} is not empty:
Expand Down Expand Up @@ -177,21 +178,22 @@ ExecuteMutation(mutation, schema, variableValues, initialValue):

- Let {subsequentPayloads} be an empty list.
- Initialize {branches} to the empty set.
- Initialize {streams} to the empty set.
- Let {mutationType} be the root Mutation type in {schema}.
- Assert: {mutationType} is an Object type.
- Let {selectionSet} be the top level Selection Set in {mutation}.
- Let {groupedFieldSet} and {newDeferDepth} be the result of
{CollectRootFields(queryType, selectionSet, variableValues)}.
- Let {data} be the result of running {ExecuteGroupedFieldSet(groupedFieldSet,
mutationType, initialValue, variableValues, subsequentPayloads, branches)}
_serially_.
mutationType, initialValue, variableValues, subsequentPayloads, branches,
streams)} _serially_.
- Let {errors} be the list of all _field error_ raised while executing the
selection set.
- If {newDeferDepth} is defined and {ShouldBranch(branches, groupedFieldSet,
path)} is {true}:
- Call {ExecuteDeferredFragment(objectType, objectValue,
deferredGroupFieldSet, path, newDeferDepth, variableValues, asyncRecord,
subsequentPayloads, branches)}
subsequentPayloads, branches, streams)}
- If {subsequentPayloads} is empty:
- Return an unordered map containing {data} and {errors}.
- If {subsequentPayloads} is not empty:
Expand Down Expand Up @@ -429,7 +431,7 @@ Each represented field in the grouped field set produces an entry into a
response map.

ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, variableValues,
path, subsequentPayloads, branches, asyncRecord):
path, subsequentPayloads, branches, streams, asyncRecord):

- If {path} is not provided, initialize it to an empty list, unique for this
execution.
Expand All @@ -445,7 +447,7 @@ path, subsequentPayloads, branches, asyncRecord):
- If {fieldType} is defined:
- If {ShouldExecute(fieldGroup, asyncRecord)} is {true}:
- Let {responseValue} be {ExecuteField(objectType, objectValue, fieldType,
fieldGroup, variableValues, path, subsequentPayloads, branches,
fieldGroup, variableValues, path, subsequentPayloads, branches, streams,
asyncRecord)}.
- Set {responseValue} as the value for {responseKey} in {resultMap}.
- Return {resultMap}.
Expand Down Expand Up @@ -807,7 +809,8 @@ All Async Payload Records are structures containing:
#### Execute Deferred Fragment

ExecuteDeferredFragment(objectType, objectValue, groupedFieldSet, path,
deferDepth, variableValues, parentRecord, subsequentPayloads, branches):
deferDepth, variableValues, parentRecord, subsequentPayloads, branches,
streams):

- Let {deferRecord} be an async payload record created from {path} and
{deferDepth}.
Expand All @@ -825,7 +828,7 @@ deferDepth, variableValues, parentRecord, subsequentPayloads, branches):
- If {fieldType} is defined:
- Let {responseValue} be {ExecuteField(objectType, objectValue, fieldType,
fieldGroup, variableValues, path, subsequentPayloads, asyncRecord,
branches)}.
branches, streams)}.
- Set {responseValue} as the value for {responseKey} in {resultMap}.
- Append any encountered field errors to {errors}.
- If {parentRecord} is defined:
Expand Down Expand Up @@ -890,7 +893,7 @@ finally completes that value either by recursively executing another selection
set or coercing a scalar value.

ExecuteField(objectType, objectValue, fieldType, fieldGroup, variableValues,
path, subsequentPayloads, branches, asyncRecord):
path, subsequentPayloads, branches, streams, asyncRecord):

- Let {taggedField} be the value of the first entry in {fieldGroup}.
- Let {field} be the corresponding entry within {taggedField}.
Expand All @@ -903,7 +906,7 @@ path, subsequentPayloads, branches, asyncRecord):
argumentValues)}.
- Let {result} be the result of calling {CompleteValue(fieldType, fieldGroup,
resolvedValue, variableValues, fieldPath, subsequentPayloads, branches,
asyncRecord)}.
streams, asyncRecord)}.
- Return {result}.

### Coercing Field Arguments
Expand Down Expand Up @@ -1007,7 +1010,7 @@ yielded items satisfies `initialCount` specified on the `@stream` directive.
#### Execute Stream Field

ExecuteStreamField(iterator, index, fieldGroup, innerType, path, deferDepth,
parentRecord, variableValues, subsequentPayloads, branches):
parentRecord, variableValues, subsequentPayloads, branches, streams):

- If {parentRecord} is defined:
- Let {deferDepth} be equal to the corresponding entry on {asyncRecord}.
Expand All @@ -1028,12 +1031,12 @@ parentRecord, variableValues, subsequentPayloads, branches):
- Otherwise:
- Let {item} be the item retrieved from {iterator}.
- Let {data} be the result of calling {CompleteValue(innerType, fieldGroup,
item, variableValues, itemPath, subsequentPayloads, branches,
item, variableValues, itemPath, subsequentPayloads, branches, streams,
parentRecord)}.
- Append any encountered field errors to {errors}.
- Increment {index}.
- Call {ExecuteStreamField(iterator, index, fieldGroup, innerType, path,
streamRecord, variableValues, subsequentPayloads, branches)}.
streamRecord, variableValues, subsequentPayloads, branches, streams)}.
- If a field error was raised, causing a {null} to be propagated to {data},
and {innerType} is a Non-Nullable type:
- Add an entry to {payload} named `items` with the value {null}.
Expand All @@ -1050,7 +1053,7 @@ parentRecord, variableValues, subsequentPayloads, branches):
- Append {streamRecord} to {subsequentPayloads}.

CompleteValue(fieldType, fieldGroup, result, variableValues, path,
subsequentPayloads, asyncRecord, branches):
subsequentPayloads, asyncRecord, branches, streams):

- If the {fieldType} is a Non-Null type:
- Let {innerType} be the inner type of {fieldType}.
Expand Down Expand Up @@ -1080,8 +1083,9 @@ subsequentPayloads, asyncRecord, branches):
- While {result} is not closed:
- If {streamDirective} is defined and {index} is greater than or equal to
{initialCount}:
- Call {ExecuteStreamField(iterator, index, fieldGroup, innerType, path,
asyncRecord, subsequentPayloads, branches)}.
- If {ShouldStream(streams, fieldGroup)}:
- Call {ExecuteStreamField(iterator, index, fieldGroup, innerType, path,
asyncRecord, subsequentPayloads, branches, streams)}.
- Return {items}.
- Otherwise:
- Wait for the next item from {result} via the {iterator}.
Expand All @@ -1091,7 +1095,7 @@ subsequentPayloads, asyncRecord, branches):
{index} appended.
- Let {resolvedItem} be the result of calling {CompleteValue(innerType,
fieldGroup, resultItem, variableValues, itemPath, subsequentPayloads,
branches, asyncRecord)}.
branches, streams, asyncRecord)}.
- Append {resolvedItem} to {items}.
- Increment {index}.
- Return {items}.
Expand All @@ -1107,12 +1111,12 @@ subsequentPayloads, asyncRecord, branches):
- Let {resultMap} be the result of evaluating
{ExecuteGroupedFieldSet(groupedSubfieldSet, deferredGroupedSubfieldsList
objectType, result, variableValues, path, subsequentPayloads, branches,
asyncRecord)} _normally_ (allowing for parallelization).
streams, asyncRecord)} _normally_ (allowing for parallelization).
- If {newDeferDepth} is defined and {ShouldBranch(branches, groupedFieldSet,
path)} is {true}:
- Call {ExecuteDeferredFragment(objectType, objectValue, groupedFieldSet,
path, newDeferDepth, variableValues, asyncRecord, subsequentPayloads,
branches)},
branches, streams)},
- Return {resultMap}.

ShouldBranch(branches, path):
Expand All @@ -1121,6 +1125,12 @@ ShouldBranch(branches, path):
- Add {path} to {branches}.
- Return {true}.

ShouldStream(streams, path):

- If {streams} contains {path}, return {false}.
- Add {path} to {streams}.
- Return {true}.

ShouldExecute(fieldGroup, asyncPayloadRecord):

- Let {hasDepth} equal {false}.
Expand Down

0 comments on commit 1440ac2

Please sign in to comment.