Skip to content

Commit 15e650a

Browse files
committed
Nits
1 parent d7e0774 commit 15e650a

File tree

6 files changed

+37
-15
lines changed

6 files changed

+37
-15
lines changed

src/cycle.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ impl IterationCount {
130130
self.0 == 0
131131
}
132132

133+
pub(crate) const fn panicked() -> Self {
134+
Self(u8::MAX)
135+
}
136+
133137
pub(crate) const fn increment(self) -> Option<Self> {
134138
let next = Self(self.0 + 1);
135139
if next.0 <= MAX_ITERATIONS.0 {
@@ -226,7 +230,7 @@ impl CycleHeads {
226230

227231
pub(crate) fn contains(&self, value: &DatabaseKeyIndex) -> bool {
228232
self.into_iter()
229-
.any(|head| head.database_key_index == *value)
233+
.any(|head| head.database_key_index == *value && !head.removed.load(Ordering::Relaxed))
230234
}
231235

232236
pub(crate) fn clear_except(&self, except: DatabaseKeyIndex) {

src/function/execute.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,27 @@ where
187187

188188
let mut cycle_heads = std::mem::take(cycle_heads);
189189

190+
// Recursively resolve all cycle heads that this head depends on.
191+
// This isn't required in a single-threaded execution but it's not guaranteed that all nested cycles are listed
192+
// in cycle heads in a multi-threaded execution:
193+
//
194+
// t1: a -> b
195+
// t2: c -> b (blocks on t1)
196+
// t1: a -> b -> c (cycle, returns fixpoint initial with c(0) in heads)
197+
// t1: a -> b (completes b, b has c(0) in its cycle heads, releases `b`, which resumes `t2`, and `retry_provisional` blocks on `c` (t2))
198+
// t2: c -> a (cycle, returns fixpoint initial for a with a(0) in heads)
199+
// t2: completes c, `provisional_retry` blocks on `a` (t2)
200+
// t1: a (complets `b` with `c` in heads)
201+
//
202+
// Note how `a` only depends on `c` but not `a`. This is because `a` only saw the initial value of `c` and wasn't updated when `c` completed.
203+
// That's why we need to resolve the cycle heads recursively to `cycle_heads` contains all cycle heads at the moment this query completed.
190204
let mut queue: SmallVec<[DatabaseKeyIndex; 4]> = cycle_heads
191205
.iter()
192206
.map(|head| head.database_key_index)
193207
.filter(|head| *head != database_key_index)
194208
.collect();
195209

210+
// TODO: Can we also resolve whether the cycles have converged here?
196211
while let Some(head) = queue.pop() {
197212
let ingredient = zalsa.lookup_ingredient(head.ingredient_index());
198213
let nested_heads = ingredient.cycle_heads(zalsa, head.key_index());
@@ -230,10 +245,9 @@ where
230245
memo.value.as_ref()
231246
};
232247

233-
234248
let last_provisional_value = last_provisional_value.expect(
235-
"`fetch_cold_cycle` should have inserted a provisional memo with Cycle::initial",
236-
);
249+
"`fetch_cold_cycle` should have inserted a provisional memo with Cycle::initial",
250+
);
237251
crate::tracing::debug!(
238252
"{database_key_index:?}: execute: \
239253
I am a cycle head, comparing last provisional value with new value"
@@ -487,6 +501,10 @@ impl<C: Configuration> Drop for ClearCycleHeadIfPanicking<'_, C> {
487501
if std::thread::panicking() {
488502
let revisions =
489503
QueryRevisions::fixpoint_initial(self.ingredient.database_key_index(self.id));
504+
revisions.update_iteration_count_mut(
505+
self.ingredient.database_key_index(self.id),
506+
IterationCount::panicked(),
507+
);
490508

491509
let memo = Memo::new(None, self.zalsa.current_revision(), revisions);
492510
self.ingredient

src/function/fetch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ where
232232
}
233233
}
234234

235-
let memo = self.execute(db,zalsa, zalsa_local, database_key_index, opt_old_memo);
235+
let memo = self.execute(db, zalsa, zalsa_local, database_key_index, opt_old_memo);
236236

237237
Some(memo)
238238
}

src/function/maybe_changed_after.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ where
446446
/// If this is a provisional memo, validate that it was cached in the same iteration of the
447447
/// same cycle(s) that we are still executing. If so, it is valid for reuse. This avoids
448448
/// runaway re-execution of the same queries within a fixpoint iteration.
449+
#[inline]
449450
fn validate_same_iteration(
450451
&self,
451452
zalsa: &Zalsa,

src/zalsa_local.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,10 @@ struct QueryRevisionsExtraInner {
568568

569569
cycle_converged: bool,
570570

571-
#[cfg_attr(feature = "persistence", serde(with = "crate::zalsa_local::persistence::atomic_bool"))]
571+
#[cfg_attr(
572+
feature = "persistence",
573+
serde(with = "crate::zalsa_local::persistence::atomic_bool")
574+
)]
572575
nested_cycle: AtomicBool,
573576
}
574577

@@ -1265,7 +1268,7 @@ pub(crate) mod persistence {
12651268
}
12661269
}
12671270
}
1268-
1271+
12691272
// A workaround the fact that `shuttle` atomic types do not implement `serde::{Serialize, Deserialize}`.
12701273
pub(super) mod verified_final {
12711274
use crate::sync::atomic::{AtomicBool, Ordering};
@@ -1302,5 +1305,4 @@ pub(crate) mod persistence {
13021305
serde::Deserialize::deserialize(deserializer).map(AtomicBool::new)
13031306
}
13041307
}
1305-
13061308
}

tests/parallel/cycle_nested_deep_conditional_changed.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,34 +108,31 @@ fn the_test() {
108108
}
109109

110110
let t1 = thread::spawn(move || {
111-
let _span = tracing::debug_span!("t1", thread_id = ?thread::current().id()).entered();
111+
let _span = tracing::info_span!("t1", thread_id = ?thread::current().id()).entered();
112112
let (db, input) = get_db(|db, input| {
113113
query_a(db, input);
114114
});
115115

116-
let _span = tracing::debug_span!("t1", thread_id = ?thread::current().id()).entered();
117-
118116
query_a(&db, input)
119117
});
120118
let t2 = thread::spawn(move || {
121-
let _span = tracing::debug_span!("t2", thread_id = ?thread::current().id()).entered();
119+
let _span = tracing::info_span!("t2", thread_id = ?thread::current().id()).entered();
122120
let (db, input) = get_db(|db, input| {
123121
query_b(db, input);
124122
});
125123

126-
let _span = tracing::debug_span!("t4", thread_id = ?thread::current().id()).entered();
127124
query_b(&db, input)
128125
});
129126
let t3 = thread::spawn(move || {
130-
let _span = tracing::debug_span!("t3", thread_id = ?thread::current().id()).entered();
127+
let _span = tracing::info_span!("t3", thread_id = ?thread::current().id()).entered();
131128
let (db, input) = get_db(|db, input| {
132129
query_d(db, input);
133130
});
134131

135132
query_d(&db, input)
136133
});
137134
let t4 = thread::spawn(move || {
138-
let _span = tracing::debug_span!("t4", thread_id = ?thread::current().id()).entered();
135+
let _span = tracing::info_span!("t4", thread_id = ?thread::current().id()).entered();
139136

140137
let (db, input) = get_db(|db, input| {
141138
query_e(db, input);

0 commit comments

Comments
 (0)