Skip to content

Commit a87be5d

Browse files
committed
refactor: extract incremental graph to separate file
Replicates graphql/graphql-js@cb43c83
1 parent 8d6fd0b commit a87be5d

File tree

3 files changed

+270
-175
lines changed

3 files changed

+270
-175
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
"""Incremental Graphs."""
2+
3+
from __future__ import annotations
4+
5+
from asyncio import Event, Task, ensure_future
6+
from typing import (
7+
TYPE_CHECKING,
8+
Any,
9+
Awaitable,
10+
Iterator,
11+
Sequence,
12+
cast,
13+
)
14+
15+
from graphql.execution.types import (
16+
is_deferred_fragment_record,
17+
is_deferred_grouped_field_set_record,
18+
)
19+
20+
from ..pyutils.is_awaitable import is_awaitable
21+
22+
if TYPE_CHECKING:
23+
from graphql.execution.types import (
24+
DeferredFragmentRecord,
25+
DeferredGroupedFieldSetResult,
26+
IncrementalDataRecord,
27+
IncrementalDataRecordResult,
28+
ReconcilableDeferredGroupedFieldSetResult,
29+
StreamItemsRecord,
30+
StreamItemsResult,
31+
SubsequentResultRecord,
32+
)
33+
34+
__all__ = ["IncrementalGraph"]
35+
36+
37+
class IncrementalGraph:
38+
"""Helper class to execute incremental Graphs.
39+
40+
For internal use only.
41+
"""
42+
43+
_pending: dict[SubsequentResultRecord, None]
44+
_new_pending: dict[SubsequentResultRecord, None]
45+
_completed_result_queue: list[IncrementalDataRecordResult]
46+
47+
_resolve: Event | None
48+
_tasks: set[Task[Any]]
49+
50+
def __init__(self) -> None:
51+
"""Initialize the IncrementalGraph."""
52+
self._pending = {}
53+
self._new_pending = {}
54+
self._completed_result_queue = []
55+
self._resolve = None # lazy initialization
56+
self._tasks = set()
57+
58+
def add_incremental_data_records(
59+
self, incremental_data_records: Sequence[IncrementalDataRecord]
60+
) -> None:
61+
"""Add incremental data records."""
62+
for incremental_data_record in incremental_data_records:
63+
if is_deferred_grouped_field_set_record(incremental_data_record):
64+
for (
65+
deferred_fragment_record
66+
) in incremental_data_record.deferred_fragment_records:
67+
deferred_fragment_record.expected_reconcilable_results += 1
68+
self._add_deferred_fragment_record(deferred_fragment_record)
69+
70+
deferred_result = incremental_data_record.result
71+
if is_awaitable(deferred_result):
72+
73+
async def enqueue_deferred(
74+
deferred_result: Awaitable[DeferredGroupedFieldSetResult],
75+
) -> None:
76+
self._enqueue_completed_deferred_grouped_field_set(
77+
await deferred_result
78+
)
79+
80+
self._add_task(enqueue_deferred(deferred_result))
81+
else:
82+
self._enqueue_completed_deferred_grouped_field_set(
83+
deferred_result, # type: ignore
84+
)
85+
continue
86+
87+
incremental_data_record = cast("StreamItemsRecord", incremental_data_record)
88+
stream_record = incremental_data_record.stream_record
89+
if stream_record.id is None:
90+
self._new_pending[stream_record] = None
91+
92+
stream_result = incremental_data_record.result
93+
if is_awaitable(stream_result):
94+
95+
async def enqueue_stream(
96+
stream_result: Awaitable[StreamItemsResult],
97+
) -> None:
98+
self._enqueue_completed_stream_items(await stream_result)
99+
100+
self._add_task(enqueue_stream(stream_result))
101+
else:
102+
self._enqueue_completed_stream_items(stream_result) # type: ignore
103+
104+
def get_new_pending(self) -> list[SubsequentResultRecord]:
105+
"""Get new pending subsequent result records."""
106+
maybe_empty_new_pending = self._new_pending
107+
pending = self._pending
108+
add_non_empty_new_pending = self._add_non_empty_new_pending
109+
new_pending: list[SubsequentResultRecord] = []
110+
append_new_pending = new_pending.append
111+
for node in maybe_empty_new_pending:
112+
if is_deferred_fragment_record(node):
113+
if node.expected_reconcilable_results:
114+
pending[node] = None
115+
append_new_pending(node)
116+
continue
117+
for child in node.children:
118+
add_non_empty_new_pending(child, new_pending)
119+
else:
120+
pending[node] = None
121+
append_new_pending(node)
122+
self._new_pending.clear()
123+
return new_pending
124+
125+
def completed_results(self) -> Iterator[IncrementalDataRecordResult]:
126+
"""Yield completed incremental data record results."""
127+
queue = self._completed_result_queue
128+
while queue:
129+
completed_result = queue.pop(0)
130+
yield completed_result
131+
132+
def has_next(self) -> bool:
133+
"""Check if there are more results to process."""
134+
return bool(self._pending)
135+
136+
def complete_deferred_fragment(
137+
self,
138+
deferred_fragment_record: DeferredFragmentRecord,
139+
) -> list[ReconcilableDeferredGroupedFieldSetResult] | None:
140+
"""Complete a deferred fragment."""
141+
reconcilable_results = deferred_fragment_record.reconcilable_results
142+
if deferred_fragment_record.expected_reconcilable_results != len(
143+
reconcilable_results
144+
):
145+
return None
146+
del self._pending[deferred_fragment_record]
147+
new_pending = self._new_pending
148+
extend = self._completed_result_queue.extend
149+
for child in deferred_fragment_record.children:
150+
new_pending[child] = None
151+
extend(child.results)
152+
return reconcilable_results
153+
154+
def remove_subsequent_result_record(
155+
self,
156+
subsequent_result_record: SubsequentResultRecord,
157+
) -> None:
158+
"""Remove a subsequent result record as no longer pending."""
159+
del self._pending[subsequent_result_record]
160+
161+
def _add_deferred_fragment_record(
162+
self, deferred_fragment_record: DeferredFragmentRecord
163+
) -> None:
164+
"""Add deferred fragment record."""
165+
parent = deferred_fragment_record.parent
166+
if parent is None:
167+
if deferred_fragment_record.id is not None:
168+
return
169+
self._new_pending[deferred_fragment_record] = None
170+
return
171+
if deferred_fragment_record in parent.children:
172+
return
173+
parent.children[deferred_fragment_record] = None
174+
self._add_deferred_fragment_record(parent)
175+
176+
def _add_non_empty_new_pending(
177+
self,
178+
deferred_fragment_record: DeferredFragmentRecord,
179+
new_pending: list[SubsequentResultRecord],
180+
) -> None:
181+
"""Add non-empty new pending deferred fragment record."""
182+
if deferred_fragment_record.expected_reconcilable_results:
183+
self._pending[deferred_fragment_record] = None
184+
new_pending.append(deferred_fragment_record)
185+
return
186+
add = self._add_non_empty_new_pending # pragma: no cover
187+
for child in deferred_fragment_record.children: # pragma: no cover
188+
add(child, new_pending)
189+
190+
def _enqueue_completed_deferred_grouped_field_set(
191+
self, result: DeferredGroupedFieldSetResult
192+
) -> None:
193+
"""Enqueue completed deferred grouped field set result."""
194+
has_pending_parent = False
195+
for deferred_fragment_record in result.deferred_fragment_records:
196+
if deferred_fragment_record.id is not None:
197+
has_pending_parent = True
198+
deferred_fragment_record.results.append(result)
199+
append = self._completed_result_queue.append
200+
if has_pending_parent:
201+
append(result)
202+
self._trigger()
203+
204+
def _enqueue_completed_stream_items(self, result: StreamItemsResult) -> None:
205+
"""Enqueue completed stream items result."""
206+
self._completed_result_queue.append(result)
207+
self._trigger()
208+
209+
def _trigger(self) -> None:
210+
"""Trigger the resolve event."""
211+
resolve = self._resolve
212+
if resolve is not None:
213+
resolve.set()
214+
self._resolve = Event()
215+
216+
async def new_completed_result_available(self) -> None:
217+
"""Get an awaitable that resolves when a new completed result is available."""
218+
resolve = self._resolve
219+
if resolve is None:
220+
self._resolve = resolve = Event()
221+
await resolve.wait()
222+
223+
def _add_task(self, awaitable: Awaitable[Any]) -> None:
224+
"""Add the given task to the tasks set for later execution."""
225+
tasks = self._tasks
226+
task = ensure_future(awaitable)
227+
tasks.add(task)
228+
task.add_done_callback(tasks.discard)

0 commit comments

Comments
 (0)