Skip to content

Commit 2d00feb

Browse files
committed
Fix cycle error
1 parent 21f7424 commit 2d00feb

File tree

2 files changed

+74
-15
lines changed

2 files changed

+74
-15
lines changed

src/function/fetch.rs

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -179,24 +179,53 @@ where
179179
};
180180

181181
// Now that we've claimed the item, check again to see if there's a "hot" value.
182-
let opt_old_memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
182+
let mut opt_old_memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
183+
184+
// If this is a provisional memo from the same revision. Await all cycle heads because they could be
185+
// running on a different thread.
186+
if let Some(mut old_memo) = opt_old_memo {
187+
if old_memo.value.is_some() {
188+
if old_memo.may_be_provisional()
189+
&& old_memo.verified_at.load() == zalsa.current_revision()
190+
{
191+
opt_old_memo = loop {
192+
old_memo.await_heads(zalsa, self.database_key_index(id));
193+
194+
let new_old =
195+
self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
196+
match new_old {
197+
None => unreachable!("Expected memo to be present"),
198+
// If the new memo is the same as the old, then this means that this is still the "latest" memo for this
199+
Some(new_old) if std::ptr::eq(new_old, old_memo) => {
200+
break Some(new_old);
201+
}
202+
Some(new_old) => {
203+
tracing::debug!("Memo has been updated while waiting for cycle heads");
204+
old_memo = new_old;
205+
}
206+
}
207+
};
208+
}
209+
}
210+
}
211+
183212
if let Some(old_memo) = opt_old_memo {
184213
if old_memo.value.is_some() {
185-
let mut cycle_heads = CycleHeads::default();
186-
if let VerifyResult::Unchanged(_) = self.deep_verify_memo(
187-
db,
188-
zalsa,
189-
old_memo,
190-
self.database_key_index(id),
191-
&mut cycle_heads,
192-
) {
193-
if cycle_heads.is_empty() {
194-
// SAFETY: memo is present in memo_map and we have verified that it is
195-
// still valid for the current revision.
196-
return unsafe { Some(self.extend_memo_lifetime(old_memo)) };
197-
}
214+
let mut cycle_heads = CycleHeads::default();
215+
if let VerifyResult::Unchanged(_) = self.deep_verify_memo(
216+
db,
217+
zalsa,
218+
old_memo,
219+
self.database_key_index(id),
220+
&mut cycle_heads,
221+
) {
222+
if cycle_heads.is_empty() {
223+
// SAFETY: memo is present in memo_map and we have verified that it is
224+
// still valid for the current revision.
225+
return unsafe { Some(self.extend_memo_lifetime(old_memo)) };
198226
}
199227
}
228+
}
200229
}
201230

202231
let memo = self.execute(

src/function/memo.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ impl<V> Memo<V> {
155155
for head in cycle_heads {
156156
let head_index = head.database_key_index;
157157

158-
159158
let ingredient = zalsa.lookup_ingredient(head_index.ingredient_index());
160159
let cycle_head_kind = ingredient.cycle_head_kind(zalsa, head_index.key_index());
161160
if matches!(
@@ -200,6 +199,37 @@ impl<V> Memo<V> {
200199
}
201200
}
202201

202+
#[inline(always)]
203+
pub(super) fn await_heads(&self, zalsa: &Zalsa, database_key_index: DatabaseKeyIndex) {
204+
for head in &self.revisions.cycle_heads {
205+
let head_index = head.database_key_index;
206+
207+
if database_key_index == head_index {
208+
continue;
209+
}
210+
211+
let ingredient = zalsa.lookup_ingredient(head_index.ingredient_index());
212+
let cycle_head_kind = ingredient.cycle_head_kind(zalsa, head_index.key_index());
213+
214+
if matches!(
215+
cycle_head_kind,
216+
CycleHeadKind::NotProvisional | CycleHeadKind::FallbackImmediate
217+
) {
218+
// This cycle is already finalized, so we don't need to wait on it;
219+
// keep looping through cycle heads.
220+
tracing::trace!("Dependent cycle head {head_index:?} has been finalized.");
221+
} else if ingredient.wait_for(zalsa, head_index.key_index()) {
222+
tracing::trace!(
223+
"Dependent cycle head {head_index:?} has been released"
224+
);
225+
} else {
226+
// We hit a cycle blocking on the cycle head; this means it's in
227+
// our own active query stack and we are responsible to resolve the
228+
// cycle
229+
}
230+
}
231+
}
232+
203233
/// Cycle heads that should be propagated to dependent queries.
204234
#[inline(always)]
205235
pub(super) fn cycle_heads(&self) -> &CycleHeads {

0 commit comments

Comments
 (0)