Skip to content

Commit 2a09313

Browse files
committed
Documentation
1 parent f7bb021 commit 2a09313

File tree

2 files changed

+59
-124
lines changed

2 files changed

+59
-124
lines changed

src/function/fetch.rs

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,11 @@ where
113113
id: Id,
114114
memo_ingredient_index: MemoIngredientIndex,
115115
) -> Option<&'db Memo<C::Output<'db>>> {
116+
let database_key_index = self.database_key_index(id);
116117
// Try to claim this query: if someone else has claimed it already, go back and start again.
117118
let _claim_guard = match self.sync_table.try_claim(zalsa, id) {
118119
ClaimResult::Retry => return None,
119120
ClaimResult::Cycle => {
120-
let database_key_index = self.database_key_index(id);
121121
// check if there's a provisional value for this query
122122
// Note we don't `validate_may_be_provisional` the memo here as we want to reuse an
123123
// existing provisional memo if it exists
@@ -180,31 +180,33 @@ where
180180
};
181181

182182
// Now that we've claimed the item, check again to see if there's a "hot" value.
183-
let mut opt_old_memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
184-
185-
// If this is a provisional memo from the same revision. Await all cycle heads because they could be
186-
// running on a different thread.
187-
if let Some(mut old_memo) = opt_old_memo {
183+
let opt_old_memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
184+
185+
// If this is a provisional memo from the same revision, await all its cycle heads because
186+
// we need to ensure that only one thread is iterating on a cycle at a given time.
187+
// For example, if we have a nested cycle like so:
188+
// ```
189+
// a -> b -> c -> b
190+
// -> a
191+
//
192+
// d -> b
193+
// ```
194+
// thread 1 calls `a` and `a` completes the inner cycle `b -> c` but hasn't finished the outer cycle `a` yet.
195+
// thread 2 now calls `b`. We don't want that thread 2 iterates `b` while thread 1 is iterating `a` at the same time
196+
// because it can result in thread b overriding provisional memos that thread a has accessed already and still relies upon.
197+
//
198+
// By waiting, we ensure that thread 1 completes a (based on a provisional value for `b`) and `b`
199+
// becomes the new outer cycle, which thread 2 drives to completion.
200+
if let Some(old_memo) = opt_old_memo {
188201
if old_memo.value.is_some()
189202
&& old_memo.may_be_provisional()
190203
&& old_memo.verified_at.load() == zalsa.current_revision()
191204
{
192-
opt_old_memo = loop {
193-
old_memo.await_heads(zalsa, self.database_key_index(id));
194-
195-
let new_old = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
196-
match new_old {
197-
None => unreachable!("Expected memo to be present"),
198-
// If the new memo is the same as the old, then this means that this is still the "latest" memo for this
199-
Some(new_old) if std::ptr::eq(new_old, old_memo) => {
200-
break Some(new_old);
201-
}
202-
Some(new_old) => {
203-
tracing::debug!("Provisional memo has been updated by another thread while waiting for its cycle heads");
204-
old_memo = new_old;
205-
}
206-
}
207-
};
205+
old_memo.await_heads(zalsa, database_key_index);
206+
207+
// It's possible that one of the cycle heads replaced the memo for this ingredient
208+
// with fixpoint initial. We ignore that memo because we know it's only a temporary memo
209+
// and instead continue with the memo we already have (acquired).
208210
}
209211
}
210212

@@ -215,7 +217,7 @@ where
215217
db,
216218
zalsa,
217219
old_memo,
218-
self.database_key_index(id),
220+
database_key_index,
219221
&mut cycle_heads,
220222
) {
221223
if cycle_heads.is_empty() {
@@ -229,7 +231,7 @@ where
229231

230232
let memo = self.execute(
231233
db,
232-
db.zalsa_local().push_query(self.database_key_index(id), 0),
234+
db.zalsa_local().push_query(database_key_index, 0),
233235
opt_old_memo,
234236
);
235237

Lines changed: 33 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,10 @@
1-
//! Test a nested-cycle scenario across three threads:
1+
//! Test a deeply nested-cycle scenario across multiple threads.
22
//!
3-
//! ```text
4-
//! Thread T1 Thread T2 Thread T3
5-
//! --------- --------- ---------
6-
//! | | |
7-
//! v | |
8-
//! query_a() | |
9-
//! ^ | v |
10-
//! | +------------> query_b() |
11-
//! | ^ | v
12-
//! | | +------------> query_c()
13-
//! | | |
14-
//! +------------------+--------------------+
3+
//! The trick is that different threads call into the same cycle from different entry queries.
154
//!
16-
//! ```
5+
//! * Thread 1: `a` -> b -> c (which calls back into d, e, b, a)
6+
//! * Thread 2: `d` -> `c`
7+
//! * Thread 3: `e` -> `c`
178
use crate::sync::thread;
189
use crate::{Knobs, KnobsDatabase};
1910

@@ -25,10 +16,6 @@ struct CycleValue(u32);
2516
const MIN: CycleValue = CycleValue(0);
2617
const MAX: CycleValue = CycleValue(3);
2718

28-
// Signal 1: T1 has entered `query_a`
29-
// Signal 2: T2 has entered `query_b`
30-
// Signal 3: T3 has entered `query_c`
31-
3219
#[salsa::tracked(cycle_fn=cycle_fn, cycle_initial=initial)]
3320
fn query_a(db: &dyn KnobsDatabase) -> CycleValue {
3421
query_b(db)
@@ -74,86 +61,32 @@ fn initial(_db: &dyn KnobsDatabase) -> CycleValue {
7461

7562
#[test_log::test]
7663
fn the_test() {
77-
// shuttle::replay(
78-
crate::sync::check(
79-
|| {
80-
tracing::info!("New run");
81-
let db_t1 = Knobs::default();
82-
let db_t2 = db_t1.clone();
83-
let db_t3 = db_t1.clone();
84-
85-
let t1 = thread::spawn(move || {
86-
let _span =
87-
tracing::debug_span!("t1", thread_id = ?thread::current().id()).entered();
88-
let result = query_a(&db_t1);
89-
db_t1.signal(1);
90-
result
91-
});
92-
let t2 = thread::spawn(move || {
93-
let _span =
94-
tracing::debug_span!("t2", thread_id = ?thread::current().id()).entered();
95-
db_t2.wait_for(1);
96-
query_d(&db_t2)
97-
});
98-
let t3 = thread::spawn(move || {
99-
let _span =
100-
tracing::debug_span!("t3", thread_id = ?thread::current().id()).entered();
101-
db_t3.wait_for(1);
102-
query_e(&db_t3)
103-
});
104-
105-
let r_t1 = t1.join().unwrap();
106-
let r_t2 = t2.join().unwrap();
107-
let r_t3 = t3.join().unwrap();
108-
109-
assert_eq!((r_t1, r_t2, r_t3), (MAX, MAX, MAX));
110-
111-
tracing::info!("Complete");
112-
}, // ,
113-
// "
114-
// 9102ac21f5a392c8f88cc0a27300000000004092244992244992244992244992244992244992
115-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
116-
// 9224499224499224491248c23664dbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66d
117-
// dbb66ddbb66d9324499224499224499224499224499224499224499224499224494a92244992
118-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
119-
// 9224499224499224499224499224499224499224499224499224499224499224499224499224
120-
// 499224499224499224499224499224499224499224c91629dbb66ddbb66d922449922449d9b6
121-
// 6ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddb364992942449922449924892
122-
// 244992244992244992244992244992244992b429dab66ddbb66d4b922449922449b46ddbb66d
123-
// dbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb6
124-
// 6ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddb
125-
// b66ddbb66ddbb66ddbb66ddbb66ddbb66ddbb66ddb2449922449922449922449922449922449
126-
// 922449922449922449d2b66ddbb66ddbb66ddbb66ddbb66ddbb66ddb962c4992244992244992
127-
// 2449922449922449b66ddbb64d49922449922449922449922449922449922449922449922449
128-
// 922449922449924892244992244992244992246ddbb624499224499224499224499224499224
129-
// 4992244992244992244992244992244992244992244992244992244992244992244992244992
130-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
131-
// 9224499224499224499224499224499224499224499224499224499224499224499224499224
132-
// 4992244992244992244992244992244992244992244992244992244992244992244992244992
133-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
134-
// 9224499224499224499224499224499224499224499224499224499224499224499224499224
135-
// 4992244992244992244992244992244992244992244992244992244992244992244992244992
136-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
137-
// 9224499224499224499224499224499224499224499224499224499224499224499224499224
138-
// 4992244992244992244992244992244992244992244992244992244992244992244992244992
139-
// 244992244992244992244992244992244992489124499224498a244992244992244992244992
140-
// 2449922449922449922449922449922449922449922449922449529224499224492249922449
141-
// 9224499224499224499224499224499224499224499224499224499224499224499224499224
142-
// 4992244992244992244992244992244992244992244992244992244992244992244992244992
143-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
144-
// 9224499224499224499224499224499224499224499224499224499224499224499224499224
145-
// 4992244992244992244992244992244992244992244992244992244992244992244992244992
146-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
147-
// 9224499224499224499224499224499224499224499224499224499224499224499224499224
148-
// 4992244992244992244992244992244992244992244992244992244992244992244992244992
149-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
150-
// 9224499224499224499224499224499224499224499224499224499224499224499224499224
151-
// 4992244992244992244992244992244992244992244992244992244992244992244992244992
152-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
153-
// 9224499224499224499224499224499224499224499224499224499224499224499224499224
154-
// 4992244992244992244992244992244992244992244992244992244992244992244992244992
155-
// 2449922449922449922449922449922449922449922449922449922449922449922449922449
156-
// 922449922449922449922449922449922409
157-
// "
158-
);
64+
crate::sync::check(|| {
65+
let db_t1 = Knobs::default();
66+
let db_t2 = db_t1.clone();
67+
let db_t3 = db_t1.clone();
68+
69+
let t1 = thread::spawn(move || {
70+
let _span = tracing::debug_span!("t1", thread_id = ?thread::current().id()).entered();
71+
let result = query_a(&db_t1);
72+
db_t1.signal(1);
73+
result
74+
});
75+
let t2 = thread::spawn(move || {
76+
let _span = tracing::debug_span!("t2", thread_id = ?thread::current().id()).entered();
77+
db_t2.wait_for(1);
78+
query_d(&db_t2)
79+
});
80+
let t3 = thread::spawn(move || {
81+
let _span = tracing::debug_span!("t3", thread_id = ?thread::current().id()).entered();
82+
db_t3.wait_for(1);
83+
query_e(&db_t3)
84+
});
85+
86+
let r_t1 = t1.join().unwrap();
87+
let r_t2 = t2.join().unwrap();
88+
let r_t3 = t3.join().unwrap();
89+
90+
assert_eq!((r_t1, r_t2, r_t3), (MAX, MAX, MAX));
91+
});
15992
}

0 commit comments

Comments
 (0)