Skip to content

Commit

Permalink
Fix value table iteration vs log processing race (#230)
Browse files Browse the repository at this point in the history
* Fix value table iteration vs log processing race

* fmt

* Add value table iteration to fuzz test

* fmt
  • Loading branch information
arkpar authored Oct 18, 2023
1 parent 2629139 commit 70e0929
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
5 changes: 5 additions & 0 deletions fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum Action<O: Debug> {
Restart,
IterPrev,
IterNext,
IterValues,
}

#[derive(Clone)]
Expand Down Expand Up @@ -331,6 +332,10 @@ pub trait DbSimulator {
);
}
},
Action::IterValues =>
if db.iter.is_none() {
db.db.iter_column_while(0, |_| true).unwrap();
},
}
retry_operation(|| Self::check_db_and_model_are_equals(&db.db, &layers)).unwrap();
}
Expand Down
5 changes: 5 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ struct DbInner {
flush_worker_wait: Arc<WaitCondvar<bool>>,
cleanup_worker_wait: WaitCondvar<bool>,
cleanup_queue_wait: WaitCondvar<bool>,
iteration_lock: Mutex<()>,
last_enacted: AtomicU64,
next_reindex: AtomicU64,
bg_err: Mutex<Option<Arc<Error>>>,
Expand Down Expand Up @@ -229,6 +230,7 @@ impl DbInner {
flush_worker_wait: Arc::new(WaitCondvar::new()),
cleanup_worker_wait: WaitCondvar::new(),
cleanup_queue_wait: WaitCondvar::new(),
iteration_lock: Mutex::new(()),
next_reindex: AtomicU64::new(1),
last_enacted: AtomicU64::new(last_enacted),
bg_err: Mutex::new(None),
Expand Down Expand Up @@ -568,6 +570,7 @@ impl DbInner {
}

fn enact_logs(&self, validation_mode: bool) -> Result<bool> {
let _iteration_lock = self.iteration_lock.lock();
let cleared = {
let reader = match self.log.read_next(validation_mode) {
Ok(reader) => reader,
Expand Down Expand Up @@ -871,13 +874,15 @@ impl DbInner {
}

fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
let _lock = self.iteration_lock.lock();
match &self.columns[c as usize] {
Column::Hash(column) => column.iter_values(&self.log, f),
Column::Tree(_) => unimplemented!(),
}
}

fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
let _lock = self.iteration_lock.lock();
match &self.columns[c as usize] {
Column::Hash(column) => column.iter_index(&self.log, f),
Column::Tree(_) => unimplemented!(),
Expand Down
17 changes: 11 additions & 6 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ pub struct ValueTable {
pub id: TableId,
pub entry_size: u16,
file: crate::file::TableFile,
filled: AtomicU64,
filled: AtomicU64, // Number of entries from the POV of the log overlay.
written: AtomicU64, // Actual number of entries on disk.
last_removed: AtomicU64,
dirty_header: AtomicBool,
multipart: bool,
Expand Down Expand Up @@ -423,6 +424,7 @@ impl ValueTable {
entry_size,
file,
filled: AtomicU64::new(filled),
written: AtomicU64::new(filled),
last_removed: AtomicU64::new(last_removed),
dirty_header: AtomicBool::new(false),
multipart,
Expand Down Expand Up @@ -928,6 +930,7 @@ impl ValueTable {
let mut header = Header::default();
log.read(&mut header.0)?;
self.file.write_at(&header.0, 0)?;
self.written.store(header.filled(), Ordering::Relaxed);
log::trace!(target: "parity-db", "{}: Enacted header, {} filled", self.id, header.filled());
return Ok(())
}
Expand Down Expand Up @@ -992,6 +995,7 @@ impl ValueTable {
}
self.last_removed.store(last_removed, Ordering::Relaxed);
self.filled.store(filled, Ordering::Relaxed);
self.written.store(filled, Ordering::Relaxed);
Ok(())
}

Expand Down Expand Up @@ -1028,8 +1032,8 @@ impl ValueTable {
log: &impl LogQuery,
mut f: impl FnMut(u64, u32, Vec<u8>, bool) -> bool,
) -> Result<()> {
let filled = self.filled.load(Ordering::Relaxed);
for index in 1..filled {
let written = self.written.load(Ordering::Relaxed);
for index in 1..written {
let mut result = Vec::new();
// expect only indexed key.
let mut _fetch_key = Default::default();
Expand Down Expand Up @@ -1084,14 +1088,14 @@ impl ValueTable {

/// Validate free records sequence.
pub fn check_free_refs(&self) -> Result<u64> {
let filled = self.filled.load(Ordering::Relaxed);
let written = self.written.load(Ordering::Relaxed);
let mut next = self.last_removed.load(Ordering::Relaxed);
let mut len = 0;
while next != 0 {
if next >= filled {
if next >= written {
return Err(crate::error::Error::Corruption(format!(
"Bad removed ref {} out of {}",
next, filled
next, written
)))
}
let mut buf = PartialEntry::new_uninit();
Expand Down Expand Up @@ -1491,6 +1495,7 @@ mod test {
for (k, v) in &entries {
table.write_insert_plan(k, &v, writer, compressed).unwrap();
}
table.complete_plan(writer).unwrap();
});

let mut res = Vec::new();
Expand Down

0 comments on commit 70e0929

Please sign in to comment.