Skip to content

Commit 336eaa1

Browse files
committed
Short circuit if entire query runs on single thread
1 parent 96231f9 commit 336eaa1

File tree

4 files changed

+48
-28
lines changed

4 files changed

+48
-28
lines changed

src/function/accumulated.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,9 @@ where
9595
db: &'db C::DbView,
9696
key: Id,
9797
) -> (Option<&'db AccumulatedMap>, InputAccumulatedValues) {
98+
let (zalsa, zalsa_local) = db.zalsas();
9899
// NEXT STEP: stash and refactor `fetch` to return an `&Memo` so we can make this work
99-
let memo = self.refresh_memo(db, db.zalsa(), key);
100+
let memo = self.refresh_memo(db, zalsa, zalsa_local, key);
100101
(
101102
memo.revisions.accumulated.as_deref(),
102103
memo.revisions.accumulated_inputs.load(),

src/function/fetch.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::function::memo::Memo;
33
use crate::function::sync::ClaimResult;
44
use crate::function::{Configuration, IngredientImpl, VerifyResult};
55
use crate::zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase};
6-
use crate::zalsa_local::QueryRevisions;
6+
use crate::zalsa_local::{QueryRevisions, ZalsaLocal};
77
use crate::Id;
88

99
impl<C> IngredientImpl<C>
@@ -18,7 +18,7 @@ where
1818
#[cfg(debug_assertions)]
1919
let _span = tracing::debug_span!("fetch", query = ?database_key_index).entered();
2020

21-
let memo = self.refresh_memo(db, zalsa, id);
21+
let memo = self.refresh_memo(db, zalsa, zalsa_local, id);
2222
// SAFETY: We just refreshed the memo so it is guaranteed to contain a value now.
2323
let memo_value = unsafe { memo.value.as_ref().unwrap_unchecked() };
2424

@@ -41,13 +41,16 @@ where
4141
&'db self,
4242
db: &'db C::DbView,
4343
zalsa: &'db Zalsa,
44+
zalsa_local: &'db ZalsaLocal,
4445
id: Id,
4546
) -> &'db Memo<C::Output<'db>> {
4647
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id);
4748
loop {
4849
if let Some(memo) = self
4950
.fetch_hot(zalsa, id, memo_ingredient_index)
50-
.or_else(|| self.fetch_cold_with_retry(zalsa, db, id, memo_ingredient_index))
51+
.or_else(|| {
52+
self.fetch_cold_with_retry(zalsa, zalsa_local, db, id, memo_ingredient_index)
53+
})
5154
{
5255
return memo;
5356
}
@@ -84,11 +87,12 @@ where
8487
fn fetch_cold_with_retry<'db>(
8588
&'db self,
8689
zalsa: &'db Zalsa,
90+
zalsa_local: &'db ZalsaLocal,
8791
db: &'db C::DbView,
8892
id: Id,
8993
memo_ingredient_index: MemoIngredientIndex,
9094
) -> Option<&'db Memo<C::Output<'db>>> {
91-
let memo = self.fetch_cold(zalsa, db, id, memo_ingredient_index)?;
95+
let memo = self.fetch_cold(zalsa, zalsa_local, db, id, memo_ingredient_index)?;
9296

9397
// If we get back a provisional cycle memo, and it's provisional on any cycle heads
9498
// that are claimed by a different thread, we can't propagate the provisional memo
@@ -98,7 +102,7 @@ where
98102
// That is only correct for fixpoint cycles, though: `FallbackImmediate` cycles
99103
// never have provisional entries.
100104
if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate
101-
|| !memo.provisional_retry(zalsa, self.database_key_index(id))
105+
|| !memo.provisional_retry(zalsa, zalsa_local, self.database_key_index(id))
102106
{
103107
Some(memo)
104108
} else {
@@ -109,6 +113,7 @@ where
109113
fn fetch_cold<'db>(
110114
&'db self,
111115
zalsa: &'db Zalsa,
116+
zalsa_local: &'db ZalsaLocal,
112117
db: &'db C::DbView,
113118
id: Id,
114119
memo_ingredient_index: MemoIngredientIndex,
@@ -161,7 +166,7 @@ where
161166
tracing::debug!(
162167
"hit a `FallbackImmediate` cycle at {database_key_index:#?}"
163168
);
164-
let active_query = db.zalsa_local().push_query(database_key_index, 0);
169+
let active_query = zalsa_local.push_query(database_key_index, 0);
165170
let fallback_value = C::cycle_initial(db, C::id_to_input(db, id));
166171
let mut revisions = active_query.pop();
167172
revisions.cycle_heads = CycleHeads::initial(database_key_index);
@@ -202,7 +207,7 @@ where
202207
&& old_memo.may_be_provisional()
203208
&& old_memo.verified_at.load() == zalsa.current_revision()
204209
{
205-
old_memo.await_heads(zalsa);
210+
old_memo.await_heads(zalsa, zalsa_local);
206211

207212
// It's possible that one of the cycle heads replaced the memo for this ingredient
208213
// with fixpoint initial. We ignore that memo because we know it's only a temporary memo
@@ -227,7 +232,7 @@ where
227232

228233
let memo = self.execute(
229234
db,
230-
db.zalsa_local().push_query(database_key_index, 0),
235+
zalsa_local.push_query(database_key_index, 0),
231236
opt_old_memo,
232237
);
233238

src/function/maybe_changed_after.rs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@ use crate::function::memo::Memo;
44
use crate::function::sync::ClaimResult;
55
use crate::function::{Configuration, IngredientImpl};
66
use crate::key::DatabaseKeyIndex;
7-
use crate::plumbing::ZalsaLocal;
87
use crate::sync::atomic::Ordering;
98
use crate::zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase};
10-
use crate::zalsa_local::{QueryEdge, QueryOrigin};
9+
use crate::zalsa_local::{QueryEdge, QueryOrigin, ZalsaLocal};
1110
use crate::{AsDynDatabase as _, Id, Revision};
1211

1312
/// Result of memo validation.
@@ -303,20 +302,7 @@ where
303302
memo = memo.tracing_debug()
304303
);
305304

306-
let cycle_heads = &memo.revisions.cycle_heads;
307-
if cycle_heads.is_empty() {
308-
return true;
309-
}
310-
311-
zalsa_local.with_query_stack(|stack| {
312-
cycle_heads.iter().all(|cycle_head| {
313-
stack
314-
.iter()
315-
.rev()
316-
.find(|query| query.database_key_index == cycle_head.database_key_index)
317-
.is_some_and(|query| query.iteration_count() == cycle_head.iteration_count)
318-
})
319-
})
305+
memo.validate_same_iteration(zalsa_local)
320306
}
321307

322308
/// VerifyResult::Unchanged if the memo's value and `changed_at` time is up-to-date in the

src/function/memo.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::revision::AtomicRevision;
1111
use crate::sync::atomic::Ordering;
1212
use crate::table::memo::MemoTableWithTypesMut;
1313
use crate::zalsa::{MemoIngredientIndex, Zalsa};
14-
use crate::zalsa_local::{QueryOrigin, QueryRevisions};
14+
use crate::zalsa_local::{QueryOrigin, QueryRevisions, ZalsaLocal};
1515
use crate::{Event, EventKind, Id, Revision};
1616

1717
impl<C: Configuration> IngredientImpl<C> {
@@ -133,6 +133,7 @@ impl<V> Memo<V> {
133133
pub(super) fn provisional_retry(
134134
&self,
135135
zalsa: &Zalsa,
136+
zalsa_local: &ZalsaLocal,
136137
database_key_index: DatabaseKeyIndex,
137138
) -> bool {
138139
if self.revisions.cycle_heads.is_empty() {
@@ -143,7 +144,7 @@ impl<V> Memo<V> {
143144
return false;
144145
};
145146

146-
if self.await_heads(zalsa) {
147+
if self.await_heads(zalsa, zalsa_local) {
147148
// If we get here, we are a provisional value of
148149
// the cycle head (either initial value, or from a later iteration) and should be
149150
// returned to caller to allow fixpoint iteration to proceed.
@@ -162,7 +163,14 @@ impl<V> Memo<V> {
162163
///
163164
/// Returns `true` if awaiting the cycle heads resulted in a cycle.
164165
#[inline(never)]
165-
pub(super) fn await_heads(&self, zalsa: &Zalsa) -> bool {
166+
pub(super) fn await_heads(&self, zalsa: &Zalsa, zalsa_local: &ZalsaLocal) -> bool {
167+
// The most common case is that the entire cycle is running in the same thread.
168+
// If that's the case, short circuit and return `true` immediately.
169+
if self.validate_same_iteration(zalsa_local) {
170+
return true;
171+
}
172+
173+
// Otherwise, await all cycle heads, recursively.
166174
let mut queue: Vec<_> = self
167175
.revisions
168176
.cycle_heads
@@ -220,6 +228,26 @@ impl<V> Memo<V> {
220228
hit_cycle
221229
}
222230

231+
/// If this is a provisional memo, validate that it was cached in the same iteration of the
232+
/// same cycle(s) that we are still executing. If so, it is valid for reuse. This avoids
233+
/// runaway re-execution of the same queries within a fixpoint iteration.
234+
pub(super) fn validate_same_iteration(&self, zalsa_local: &ZalsaLocal) -> bool {
235+
let cycle_heads = &self.revisions.cycle_heads;
236+
if cycle_heads.is_empty() {
237+
return true;
238+
}
239+
240+
zalsa_local.with_query_stack(|stack| {
241+
cycle_heads.iter().all(|cycle_head| {
242+
stack
243+
.iter()
244+
.rev()
245+
.find(|query| query.database_key_index == cycle_head.database_key_index)
246+
.is_some_and(|query| query.iteration_count() == cycle_head.iteration_count)
247+
})
248+
})
249+
}
250+
223251
/// Cycle heads that should be propagated to dependent queries.
224252
#[inline(always)]
225253
pub(super) fn cycle_heads(&self) -> &CycleHeads {

0 commit comments

Comments
 (0)