Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e51ca70
Set `validate_final` in `execute` after removing the last cycle head
MichaReiser May 30, 2025
f573178
Add runaway query repro
MichaReiser May 25, 2025
d7ef838
Add tracing
MichaReiser May 26, 2025
60d9617
Fix part 1
MichaReiser May 27, 2025
6a1181d
Fix `cycle_head_kinds` to always return provisional for memos that ar…
MichaReiser May 27, 2025
88b7350
Fix cycle error
MichaReiser May 27, 2025
358a76f
Documentation
MichaReiser May 29, 2025
ae3c7a8
Fix await for queries depending on initial value
MichaReiser May 29, 2025
ffd437c
correctly initialize queued
MichaReiser May 29, 2025
873a9fb
Cleanup
MichaReiser May 29, 2025
558c65b
Short circuit if entire query runs on single thread
MichaReiser May 29, 2025
b6a83ba
Move parallel code into its own method
MichaReiser May 29, 2025
979f57a
Rename method, add self_key to queued
MichaReiser May 29, 2025
8b4d69c
Revert self-key changes
MichaReiser May 29, 2025
3d9c307
Move check *after* `deep_verify_memo`
MichaReiser May 29, 2025
ebb9c52
Add a test for a cycle with changing cycle heads
MichaReiser May 30, 2025
e888eb5
Short circuit more often
MichaReiser May 30, 2025
b91a002
Consider iteration in `validate_provisional`
MichaReiser May 30, 2025
0c60657
Only yield if all heads result in a cycle. Retry if even just one inn…
MichaReiser May 30, 2025
02a9053
Fix hangs
MichaReiser May 31, 2025
501f6d2
Merge branch 'master' into deeply-nested-error
MichaReiser May 31, 2025
7764e21
Cargo fmt
MichaReiser May 31, 2025
b7105f8
clippy
MichaReiser May 31, 2025
30f6eb4
Fix hang if cycle initial panics
MichaReiser May 31, 2025
08b0fa5
Rename `cycle_head_kind` enable `cycle_a_t1_b_t2_fallback` shuttle test
MichaReiser Jun 1, 2025
2979a54
Cleanup
MichaReiser Jun 1, 2025
0c8420e
Docs
MichaReiser Jun 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions src/active_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt, mem, ops};
use crate::accumulator::accumulated_map::{
AccumulatedMap, AtomicInputAccumulatedValues, InputAccumulatedValues,
};
use crate::cycle::CycleHeads;
use crate::cycle::{CycleHeads, IterationCount};
use crate::durability::Durability;
use crate::hash::FxIndexSet;
use crate::key::DatabaseKeyIndex;
Expand Down Expand Up @@ -61,7 +61,7 @@ pub(crate) struct ActiveQuery {
cycle_heads: CycleHeads,

/// If this query is a cycle head, iteration count of that cycle.
iteration_count: u32,
iteration_count: IterationCount,
}

impl ActiveQuery {
Expand Down Expand Up @@ -147,7 +147,7 @@ impl ActiveQuery {
}
}

pub(super) fn iteration_count(&self) -> u32 {
pub(super) fn iteration_count(&self) -> IterationCount {
self.iteration_count
}

Expand All @@ -161,7 +161,7 @@ impl ActiveQuery {
}

impl ActiveQuery {
fn new(database_key_index: DatabaseKeyIndex, iteration_count: u32) -> Self {
fn new(database_key_index: DatabaseKeyIndex, iteration_count: IterationCount) -> Self {
ActiveQuery {
database_key_index,
durability: Durability::MAX,
Expand Down Expand Up @@ -189,7 +189,7 @@ impl ActiveQuery {
ref mut accumulated,
accumulated_inputs,
ref mut cycle_heads,
iteration_count: _,
iteration_count,
} = self;

let origin = if untracked_read {
Expand All @@ -204,6 +204,7 @@ impl ActiveQuery {
mem::take(accumulated),
mem::take(tracked_struct_ids),
mem::take(cycle_heads),
iteration_count,
);
let accumulated_inputs = AtomicInputAccumulatedValues::new(accumulated_inputs);

Expand Down Expand Up @@ -236,10 +237,14 @@ impl ActiveQuery {
tracked_struct_ids.clear();
accumulated.clear();
*cycle_heads = Default::default();
*iteration_count = 0;
*iteration_count = IterationCount::initial();
}

fn reset_for(&mut self, new_database_key_index: DatabaseKeyIndex, new_iteration_count: u32) {
fn reset_for(
&mut self,
new_database_key_index: DatabaseKeyIndex,
new_iteration_count: IterationCount,
) {
let Self {
database_key_index,
durability,
Expand Down Expand Up @@ -323,7 +328,7 @@ impl QueryStack {
pub(crate) fn push_new_query(
&mut self,
database_key_index: DatabaseKeyIndex,
iteration_count: u32,
iteration_count: IterationCount,
) {
if self.len < self.stack.len() {
self.stack[self.len].reset_for(database_key_index, iteration_count);
Expand Down Expand Up @@ -373,7 +378,7 @@ struct CapturedQuery {
durability: Durability,
changed_at: Revision,
cycle_heads: CycleHeads,
iteration_count: u32,
iteration_count: IterationCount,
}

impl fmt::Debug for CapturedQuery {
Expand Down Expand Up @@ -449,7 +454,7 @@ impl fmt::Display for Backtrace {
write!(fmt, "{idx:>4}: {database_key_index:?}")?;
if full {
write!(fmt, " -> ({changed_at:?}, {durability:#?}")?;
if !cycle_heads.is_empty() || iteration_count > 0 {
if !cycle_heads.is_empty() || !iteration_count.is_initial() {
write!(fmt, ", iteration = {iteration_count:?}")?;
}
write!(fmt, ")")?;
Expand Down
54 changes: 45 additions & 9 deletions src/cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use crate::sync::OnceLock;
/// The maximum number of times we'll fixpoint-iterate before panicking.
///
/// Should only be relevant in case of a badly configured cycle recovery.
pub const MAX_ITERATIONS: u32 = 200;
pub const MAX_ITERATIONS: IterationCount = IterationCount(200);

pub struct UnexpectedCycle(Option<crate::Backtrace>);

Expand Down Expand Up @@ -147,7 +147,33 @@ pub enum CycleRecoveryStrategy {
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct CycleHead {
pub(crate) database_key_index: DatabaseKeyIndex,
pub(crate) iteration_count: u32,
pub(crate) iteration_count: IterationCount,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default)]
pub struct IterationCount(u8);

impl IterationCount {
pub(crate) const fn initial() -> Self {
Self(0)
}

pub(crate) const fn is_initial(self) -> bool {
self.0 == 0
}

pub(crate) const fn increment(self) -> Option<Self> {
let next = Self(self.0 + 1);
if next.0 <= MAX_ITERATIONS.0 {
Some(next)
} else {
None
}
}

pub(crate) const fn as_u32(self) -> u32 {
self.0 as u32
}
}

/// Any provisional value generated by any query in a cycle will track the cycle head(s) (can be
Expand All @@ -164,7 +190,7 @@ impl CycleHeads {
pub(crate) fn initial(database_key_index: DatabaseKeyIndex) -> Self {
Self(thin_vec![CycleHead {
database_key_index,
iteration_count: 0,
iteration_count: IterationCount::initial(),
}])
}

Expand All @@ -190,7 +216,7 @@ impl CycleHeads {
pub(crate) fn update_iteration_count(
&mut self,
cycle_head_index: DatabaseKeyIndex,
new_iteration_count: u32,
new_iteration_count: IterationCount,
) {
if let Some(cycle_head) = self
.0
Expand All @@ -208,11 +234,11 @@ impl CycleHeads {
.iter()
.find(|candidate| candidate.database_key_index == database_key_index)
{
assert_eq!(existing.iteration_count, 0);
assert_eq!(existing.iteration_count, IterationCount::initial());
} else {
self.0.push(CycleHead {
database_key_index,
iteration_count: 0,
iteration_count: IterationCount::initial(),
});
}
}
Expand Down Expand Up @@ -266,8 +292,18 @@ pub(crate) fn empty_cycle_heads() -> &'static CycleHeads {
}

#[derive(Debug, PartialEq, Eq)]
pub enum CycleHeadKind {
Provisional,
NotProvisional,
pub enum ProvisionalStatus {
Provisional { iteration: IterationCount },
Final { iteration: IterationCount },
FallbackImmediate,
}

impl ProvisionalStatus {
pub(crate) const fn iteration(&self) -> Option<IterationCount> {
match self {
ProvisionalStatus::Provisional { iteration } => Some(*iteration),
ProvisionalStatus::Final { iteration } => Some(*iteration),
ProvisionalStatus::FallbackImmediate => None,
}
}
}
3 changes: 2 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::cycle::IterationCount;
use crate::key::DatabaseKeyIndex;
use crate::sync::thread::{self, ThreadId};
use crate::Revision;
Expand Down Expand Up @@ -61,7 +62,7 @@ pub enum EventKind {
WillIterateCycle {
/// The database-key for the cycle head. Implements `Debug`.
database_key: DatabaseKeyIndex,
iteration_count: u32,
iteration_count: IterationCount,
fell_back: bool,
},

Expand Down
67 changes: 43 additions & 24 deletions src/function.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
pub(crate) use maybe_changed_after::VerifyResult;
use std::any::Any;
use std::fmt;
use std::ptr::NonNull;

pub(crate) use maybe_changed_after::VerifyResult;
use std::sync::atomic::Ordering;
pub(crate) use sync::SyncGuard;

use crate::accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues};
use crate::cycle::{CycleHeadKind, CycleHeads, CycleRecoveryAction, CycleRecoveryStrategy};
use crate::cycle::{
empty_cycle_heads, CycleHeads, CycleRecoveryAction, CycleRecoveryStrategy, ProvisionalStatus,
};
use crate::function::delete::DeletedEntries;
use crate::function::sync::{ClaimResult, SyncTable};
use crate::ingredient::Ingredient;
use crate::ingredient::{Ingredient, WaitForResult};
use crate::key::DatabaseKeyIndex;
use crate::plumbing::MemoIngredientMap;
use crate::salsa_struct::SalsaStructInDb;
Expand Down Expand Up @@ -244,30 +247,46 @@ where
self.maybe_changed_after(db, input, revision, cycle_heads)
}

/// True if the input `input` contains a memo that cites itself as a cycle head.
/// This indicates an intermediate value for a cycle that has not yet reached a fixed point.
fn cycle_head_kind(&self, zalsa: &Zalsa, input: Id) -> CycleHeadKind {
let is_provisional = self
.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input))
.is_some_and(|memo| {
memo.cycle_heads()
.into_iter()
.any(|head| head.database_key_index == self.database_key_index(input))
});
if is_provisional {
CycleHeadKind::Provisional
} else if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate {
CycleHeadKind::FallbackImmediate
/// Returns `final` only if the memo has the `verified_final` flag set and the cycle recovery strategy is not `FallbackImmediate`.
///
/// Otherwise, the value is still provisional. For both final and provisional, it also
/// returns the iteration in which this memo was created (always 0 except for cycle heads).
fn provisional_status(&self, zalsa: &Zalsa, input: Id) -> Option<ProvisionalStatus> {
let memo =
self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input))?;

let iteration = memo.revisions.iteration();
let verified_final = memo.revisions.verified_final.load(Ordering::Relaxed);

Some(if verified_final {
if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate {
ProvisionalStatus::FallbackImmediate
} else {
ProvisionalStatus::Final { iteration }
}
} else {
CycleHeadKind::NotProvisional
}
ProvisionalStatus::Provisional { iteration }
})
}

fn cycle_heads<'db>(&self, zalsa: &'db Zalsa, input: Id) -> &'db CycleHeads {
self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input))
.map(|memo| memo.cycle_heads())
.unwrap_or(empty_cycle_heads())
}

/// Attempts to claim `key_index`, returning `false` if a cycle occurs.
fn wait_for(&self, zalsa: &Zalsa, key_index: Id) -> bool {
/// Attempts to claim `key_index` without blocking.
///
/// * [`WaitForResult::Running`] if the `key_index` is running on another thread. It's up to the caller to block on the other thread
/// to wait until the result becomes available.
/// * [`WaitForResult::Available`] It is (or at least was) possible to claim the `key_index`
/// * [`WaitResult::Cycle`] Claiming the `key_index` results in a cycle because it's on the current's thread query stack or
/// running on another thread that is blocked on this thread.
fn wait_for<'me>(&'me self, zalsa: &'me Zalsa, key_index: Id) -> WaitForResult<'me> {
match self.sync_table.try_claim(zalsa, key_index) {
ClaimResult::Retry | ClaimResult::Claimed(_) => true,
ClaimResult::Cycle => false,
ClaimResult::Running(blocked_on) => WaitForResult::Running(blocked_on),
ClaimResult::Cycle { same_thread } => WaitForResult::Cycle { same_thread },
ClaimResult::Claimed(_) => WaitForResult::Available,
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/function/accumulated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ where
db: &'db C::DbView,
key: Id,
) -> (Option<&'db AccumulatedMap>, InputAccumulatedValues) {
let (zalsa, zalsa_local) = db.zalsas();
// NEXT STEP: stash and refactor `fetch` to return an `&Memo` so we can make this work
let memo = self.refresh_memo(db, db.zalsa(), key);
let memo = self.refresh_memo(db, zalsa, zalsa_local, key);
(
memo.revisions.accumulated(),
memo.revisions.accumulated_inputs.load(),
Expand Down
25 changes: 14 additions & 11 deletions src/function/execute.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cycle::{CycleRecoveryStrategy, MAX_ITERATIONS};
use crate::cycle::{CycleRecoveryStrategy, IterationCount};
use crate::function::memo::Memo;
use crate::function::{Configuration, IngredientImpl};
use crate::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -74,7 +74,9 @@ where
// Cycle participants that don't have a fallback will be discarded in
// `validate_provisional()`.
let cycle_heads = std::mem::take(cycle_heads);
let active_query = db.zalsa_local().push_query(database_key_index, 0);
let active_query = db
.zalsa_local()
.push_query(database_key_index, IterationCount::initial());
new_value = C::cycle_initial(db, C::id_to_input(db, id));
revisions = active_query.pop();
// We need to set `cycle_heads` and `verified_final` because it needs to propagate to the callers.
Expand Down Expand Up @@ -125,7 +127,7 @@ where
memo_ingredient_index: MemoIngredientIndex,
) -> (C::Output<'db>, QueryRevisions) {
let database_key_index = active_query.database_key_index;
let mut iteration_count: u32 = 0;
let mut iteration_count = IterationCount::initial();
let mut fell_back = false;

// Our provisional value from the previous iteration, when doing fixpoint iteration.
Expand Down Expand Up @@ -189,12 +191,10 @@ where
match C::recover_from_cycle(
db,
&new_value,
iteration_count,
iteration_count.as_u32(),
C::id_to_input(db, id),
) {
crate::CycleRecoveryAction::Iterate => {
tracing::debug!("{database_key_index:?}: execute: iterate again");
}
crate::CycleRecoveryAction::Iterate => {}
crate::CycleRecoveryAction::Fallback(fallback_value) => {
tracing::debug!(
"{database_key_index:?}: execute: user cycle_fn says to fall back"
Expand All @@ -208,10 +208,9 @@ where
}
// `iteration_count` can't overflow as we check it against `MAX_ITERATIONS`
// which is less than `u32::MAX`.
iteration_count += 1;
if iteration_count > MAX_ITERATIONS {
panic!("{database_key_index:?}: execute: too many cycle iterations");
}
iteration_count = iteration_count.increment().unwrap_or_else(|| {
panic!("{database_key_index:?}: execute: too many cycle iterations")
});
zalsa.event(&|| {
Event::new(EventKind::WillIterateCycle {
database_key: database_key_index,
Expand All @@ -220,6 +219,10 @@ where
})
});
cycle_heads.update_iteration_count(database_key_index, iteration_count);
revisions.update_iteration_count(iteration_count);
tracing::debug!(
"{database_key_index:?}: execute: iterate again, revisions: {revisions:#?}"
);
opt_last_provisional = Some(self.insert_memo(
zalsa,
id,
Expand Down
Loading