diff --git a/crates/core/src/db/commit_log.rs b/crates/core/src/db/commit_log.rs
index 8bb1349ea97..7b165b11c26 100644
--- a/crates/core/src/db/commit_log.rs
+++ b/crates/core/src/db/commit_log.rs
@@ -282,8 +282,9 @@ impl From<&CommitLog> for CommitLogView {
     }
 }
 
+/// Iterator over a single [`MessageLog`] segment, yielding [`Commit`]s.
 #[must_use = "iterators are lazy and do nothing unless consumed"]
-struct IterSegment {
+pub struct IterSegment {
     inner: message_log::IterSegment,
 }
 
@@ -311,6 +312,12 @@ impl Iterator for IterSegment {
     }
 }
 
+impl From<message_log::IterSegment> for IterSegment {
+    fn from(inner: message_log::IterSegment) -> Self {
+        Self { inner }
+    }
+}
+
 /// Iterator over a [`CommitLogView`], yielding [`Commit`]s.
 ///
 /// Created by [`CommitLogView::iter`] and [`CommitLogView::iter_from`]
@@ -429,13 +436,13 @@ mod tests {
 
         // No slicing yet, so offsets on segment boundaries yield an additional
        // COMMITS_PER_SEGMENT.
-        let commits = view.iter_from(20_001).map(Result::unwrap).count();
+        let commits = view.iter_from(20_000).map(Result::unwrap).count();
        assert_eq!(9999, commits);
 
-        let commits = view.iter_from(10_001).map(Result::unwrap).count();
+        let commits = view.iter_from(10_000).map(Result::unwrap).count();
         assert_eq!(19_999, commits);
 
-        let commits = view.iter_from(10_000).map(Result::unwrap).count();
+        let commits = view.iter_from(9_999).map(Result::unwrap).count();
         assert_eq!(29_999, commits);
     }
 }
diff --git a/crates/core/src/db/message_log.rs b/crates/core/src/db/message_log.rs
index f979ef904b3..bd9c322adaf 100644
--- a/crates/core/src/db/message_log.rs
+++ b/crates/core/src/db/message_log.rs
@@ -6,15 +6,40 @@ use std::{
 
 #[cfg(target_family = "unix")]
 use std::os::unix::fs::FileExt;
-
-use anyhow::{anyhow, Context};
-
-use crate::error::DBError;
 #[cfg(target_family = "windows")]
 use std::os::windows::fs::FileExt;
 
+use derive_more::Display;
+use thiserror::Error;
+
+use super::messages::commit::Commit;
+
 const HEADER_SIZE: usize = 4;
 
+/// Error returned by [`OpenOptions::open`] or [`MessageLog::open`].
+#[derive(Debug, Error)]
+#[error("Failed to open message log at `{path}`: {kind}")]
+pub struct OpenError {
+    kind: OpenErrorKind,
+    path: String,
+    #[source]
+    source: Option<io::Error>,
+}
+
+#[derive(Debug, Display)]
+pub enum OpenErrorKind {
+    #[display(fmt = "Could not create root directory")]
+    CreateRoot,
+    #[display(fmt = "Could not read directory or directory entry")]
+    ReadDir,
+    #[display(fmt = "Could not parse file name as offset")]
+    ParseOffset,
+    #[display(fmt = "Could not open segment file")]
+    OpenSegment,
+    #[display(fmt = "Failed to read segment at byte offset {offset}")]
+    ReadSegment { offset: u64 },
+}
+
 /// Options for opening a [`MessageLog`], similar to [`fs::OpenOptions`].
 #[derive(Clone, Copy, Debug)]
 pub struct OpenOptions {
@@ -33,27 +58,40 @@ impl OpenOptions {
     /// Open the [`MessageLog`] at `path` with the options in self.
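+    ///
+    /// A usage sketch (illustrative only; the path is made up, the builder
+    /// call shape is taken from the tests further down in this diff):
+    ///
+    /// ```ignore
+    /// let mlog = MessageLog::options()
+    ///     .max_segment_size(1024 * 1024)
+    ///     .open("/var/db/mlog")?;
+    /// ```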
     #[tracing::instrument(skip_all)]
-    pub fn open(&self, path: impl AsRef<Path>) -> Result<MessageLog, DBError> {
+    pub fn open(&self, path: impl AsRef<Path>) -> Result<MessageLog, OpenError> {
+        use OpenErrorKind::*;
+
+        fn err<S>(kind: OpenErrorKind, path: &Path) -> impl FnOnce(S) -> OpenError
+        where
+            S: Into<Option<io::Error>>,
+        {
+            let path = path.display().to_string();
+            move |source| OpenError {
+                kind,
+                path,
+                source: source.into(),
+            }
+        }
+
         let root = path.as_ref();
-        fs::create_dir_all(root).with_context(|| format!("could not create root directory: {}", root.display()))?;
+        fs::create_dir_all(root).map_err(err(CreateRoot, root))?;
 
         let mut segments = Vec::new();
         let mut total_size = 0;
-        for file in fs::read_dir(root).with_context(|| format!("unable to read root directory: {}", root.display()))? {
-            let dir_entry = file?;
+        for file in fs::read_dir(root).map_err(err(ReadDir, root))? {
+            let dir_entry = file.map_err(err(ReadDir, root))?;
             let path = dir_entry.path();
             if let Some(ext) = path.extension() {
                 if ext != "log" {
                     continue;
                 }
 
-                let file_stem = path
+                let offset = path
                     .file_stem()
-                    .map(|os| os.to_string_lossy())
-                    .ok_or_else(|| anyhow!("unexpected .log file: {}", path.display()))?;
-                let offset = file_stem
+                    .unwrap_or_default()
+                    .to_string_lossy()
                     .parse::<u64>()
-                    .with_context(|| format!("could not parse log offset from: {}", path.display()))?;
-                let size = dir_entry.metadata()?.len();
+                    .map_err(|_| err(ParseOffset, &path)(None))?;
+                let size = dir_entry.metadata().map_err(err(ReadDir, &path))?.len();
 
                 total_size += size;
                 segments.push(Segment {
@@ -75,16 +113,19 @@
             .read(true)
             .append(true)
             .create(true)
-            .open(&last_segment_path)?;
+            .open(&last_segment_path)
+            .map_err(err(OpenSegment, &last_segment_path))?;
 
         let mut max_offset = last_segment.min_offset;
         let mut cursor: u64 = 0;
         while cursor < last_segment.size {
             let mut buf = [0; HEADER_SIZE];
             #[cfg(target_family = "windows")]
-            file.seek_read(&mut buf, cursor)?;
+            file.seek_read(&mut buf, cursor)
+                .map_err(err(ReadSegment { offset: cursor }, &last_segment_path))?;
             #[cfg(target_family = "unix")]
-            file.read_exact_at(&mut buf, cursor)?;
+            file.read_exact_at(&mut buf, cursor)
+                .map_err(err(ReadSegment { offset: cursor }, &last_segment_path))?;
 
             let message_len = u32::from_le_bytes(buf);
             max_offset += 1;
@@ -151,7 +192,7 @@ impl std::fmt::Debug for MessageLog {
 
 // TODO: do we build the concept of batches into the message log?
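+// Note on the on-disk format, as read back by `OpenOptions::open` above:
+// every message is framed by a `HEADER_SIZE` (4 byte) little-endian `u32`
+// length prefix, followed by exactly that many payload bytes.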
 impl MessageLog {
     #[tracing::instrument(skip(path))]
-    pub fn open(path: impl AsRef<Path>) -> Result<Self, DBError> {
+    pub fn open(path: impl AsRef<Path>) -> Result<Self, OpenError> {
         OpenOptions::default().open(path)
     }
 
@@ -160,7 +201,7 @@ impl MessageLog {
     }
 
     #[tracing::instrument(skip_all)]
-    pub fn append(&mut self, message: impl AsRef<[u8]>) -> Result<(), DBError> {
+    pub fn append(&mut self, message: impl AsRef<[u8]>) -> io::Result<()> {
         let message = message.as_ref();
         let mess_size = message.len() as u32;
         let size: u32 = mess_size + HEADER_SIZE as u32;
@@ -169,7 +210,7 @@ impl MessageLog {
         if end_size > self.options.max_segment_size {
             self.flush()?;
             self.segments.push(Segment {
-                min_offset: self.open_segment_max_offset + 1,
+                min_offset: self.open_segment_max_offset,
                 size: 0,
             });
 
@@ -202,7 +243,7 @@ impl MessageLog {
     // https://stackoverflow.com/questions/42442387/is-write-safe-to-be-called-from-multiple-threads-simultaneously/42442926#42442926
     // https://github.com/facebook/rocksdb/wiki/WAL-Performance
     #[tracing::instrument]
-    pub fn flush(&mut self) -> Result<(), DBError> {
+    pub fn flush(&mut self) -> io::Result<()> {
         self.open_segment_file.flush()?;
         Ok(())
     }
@@ -212,7 +253,7 @@ impl MessageLog {
     // to be for sure durably written.
     // SEE: https://stackoverflow.com/questions/69819990/whats-the-difference-between-flush-and-sync-all
     #[tracing::instrument]
-    pub fn sync_all(&mut self) -> Result<(), DBError> {
+    pub fn sync_all(&mut self) -> io::Result<()> {
         log::trace!("fsync log file");
         self.flush()?;
         let file = self.open_segment_file.get_ref();
@@ -228,6 +269,13 @@ impl MessageLog {
         self.options.max_segment_size
     }
 
+    /// Total number of segments currently comprising the log.
+    ///
+    /// Equivalent to `self.segments().count()`, but more efficient.
+    pub fn total_segments(&self) -> usize {
+        self.segments.len()
+    }
+
     pub fn get_root(&self) -> PathBuf {
         self.root.clone()
     }
@@ -310,10 +358,14 @@ impl MessageLog {
     /// atomic, to the extent required by [POSIX].
     ///
     /// [POSIX]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html#tag_15_09_07
-    pub fn reset_to(&mut self, offset: u64) -> Result<(), DBError> {
+    pub fn reset_to(&mut self, offset: u64) -> io::Result<()> {
+        log::info!("Resetting message log to offset {offset}");
         if offset == 0 {
             fs::remove_dir_all(&self.root)?;
-            *self = self.options.open(&self.root)?;
+            *self = self
+                .options
+                .open(&self.root)
+                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
             return Ok(());
         }
 
@@ -339,28 +391,25 @@ impl MessageLog {
             file: BufReader::new(file),
         };
 
-        let to_retain = self.open_segment_max_offset - offset;
-        let mut retained = 0;
-        for message in iter.by_ref().take(to_retain as usize) {
+        log::debug!("segment min_offset={}", segment.min_offset);
+        for i in segment.min_offset..offset {
+            let message = iter.by_ref().next().ok_or_else(|| {
+                io::Error::new(
+                    io::ErrorKind::UnexpectedEof,
+                    format!("Open segment shorter than expected: {i}"),
+                )
+            })?;
             // Give up on I/O errors while reading the next message.
             let _ = message?;
-            retained += 1;
         }
 
-        // We maintain that:
-        //
-        //      segment.min_offset <= offset <= self.open_segment_max_offset
-        //
-        // `iter` yielding fewer elements thus breaks our invariants.
-        assert_eq!(
-            to_retain, retained,
-            "Open segment shorter than expected: {retained} instead of {to_retain}"
-        );
-
-            segment.size - iter.bytes_read()
+            iter.bytes_read()
         };
+        log::debug!("new segment size {new_segment_size}");
 
         // Truncate file to byte offset.
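+        // (`new_segment_size` is the byte count of the messages we retained,
+        // so `set_len` below drops everything after them, and the added
+        // `sync_all` makes the truncation durable before appends resume.)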
         let mut file = File::options().read(true).write(true).open(path)?;
         file.set_len(new_segment_size)?;
+        file.sync_all()?;
         file.seek(SeekFrom::End(0))?;
 
         self.total_size -= segment.size;
@@ -418,6 +467,13 @@ impl SegmentView {
         self.try_into()
     }
 
+    /// Obtain an iterator over the [`Commit`]s the segment contains.
+    ///
+    /// Convenience for `self.try_into_iter().map(|i| i.commits())`.
+    pub fn try_into_iter_commits(self) -> io::Result<impl Iterator<Item = io::Result<Commit>>> {
+        self.try_into_iter().map(|i| i.commits())
+    }
+
     /// Turn this [`SegmentView`] into a [`Read`]able [`File`].
     pub fn try_into_file(self) -> io::Result<File> {
         self.try_into()
     }
@@ -429,9 +485,11 @@ impl TryFrom<SegmentView> for IterSegment {
 
     fn try_from(view: SegmentView) -> Result<Self, Self::Error> {
         let segment = view.offset();
-        File::try_from(view)
-            .map(BufReader::new)
-            .map(|file| IterSegment { segment, read: 0, file })
+        File::try_from(view).map(|file| IterSegment {
+            segment,
+            read: 0,
+            file: BufReader::new(file),
+        })
     }
 }
 
@@ -467,12 +525,10 @@ impl IterSegment {
         self.read
     }
 
-    fn read_exact_or_none(&mut self, buf: &mut [u8]) -> Option<io::Result<()>> {
-        match self.file.read_exact(buf) {
-            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
-            Err(e) => Some(Err(e)),
-            Ok(()) => Some(Ok(())),
-        }
-    }
+    /// Turn this iterator into an iterator yielding [`Commit`]s
+    /// instead of raw bytes.
+    pub fn commits(self) -> impl Iterator<Item = io::Result<Commit>> {
+        super::commit_log::IterSegment::from(self)
+    }
 }
 
@@ -481,14 +537,18 @@ impl Iterator for IterSegment {
 
     fn next(&mut self) -> Option<Self::Item> {
         let mut buf = [0; HEADER_SIZE];
-        if let Err(e) = self.read_exact_or_none(&mut buf)? {
-            return Some(Err(e));
+        if let Err(e) = self.file.read_exact(&mut buf) {
+            if e.kind() == io::ErrorKind::UnexpectedEof {
+                return None;
+            } else {
+                return Some(Err(e));
+            }
         }
         self.read += HEADER_SIZE as u64;
 
         let message_len = u32::from_le_bytes(buf);
         let mut buf = vec![0; message_len as usize];
-        if let Err(e) = self.read_exact_or_none(&mut buf)? {
+        if let Err(e) = self.file.read_exact(&mut buf) {
             return Some(Err(e));
         }
         self.read += message_len as u64;
@@ -531,7 +591,7 @@ mod tests {
 
     use std::path::Path;
 
-    use super::MessageLog;
+    use super::{MessageLog, Segment, HEADER_SIZE};
     use spacetimedb_lib::error::ResultTest;
     use tempfile::{self, TempDir};
 
@@ -602,13 +662,16 @@ mod tests {
         let segments = message_log.segments_from(1_000_000).count();
         assert_eq!(0, segments);
 
-        let segments = message_log.segments_from(20_001).count();
+        let segments = message_log.segments_from(20_000).count();
         assert_eq!(1, segments);
 
-        let segments = message_log.segments_from(10_001).count();
+        let segments = message_log.segments_from(10_000).count();
         assert_eq!(2, segments);
 
-        let segments = message_log.segments_from(10_000).count();
+        let segments = message_log.segments_from(9_999).count();
+        assert_eq!(3, segments);
+
+        let segments = message_log.segments_from(0).count();
         assert_eq!(3, segments);
 
         Ok(())
@@ -643,10 +706,25 @@ mod tests {
         const SEGMENTS: usize = 3;
         const MESSAGES_PER_SEGMENT: usize = 10_000;
 
+        #[derive(Debug)]
+        struct Snapshot {
+            segments: Vec<Segment>,
+            total_size: u64,
+            max_offset: u64,
+        }
+
+        impl From<&MessageLog> for Snapshot {
+            fn from(mlog: &MessageLog) -> Self {
+                Self {
+                    segments: mlog.segments.clone(),
+                    total_size: mlog.total_size,
+                    max_offset: mlog.open_segment_max_offset,
+                }
+            }
+        }
+
         fn go(mlog: &mut MessageLog, offset: u64) {
-            let last_segments_len = mlog.segments.len();
-            let last_max_offset = mlog.open_segment_max_offset;
-            let last_open_segment_size = mlog.open_segment().size;
+            let snapshot = Snapshot::from(&*mlog);
 
             mlog.reset_to(offset).unwrap();
 
             assert_eq!(
@@ -657,7 +735,20 @@ mod tests {
             assert_eq!(
                 mlog.total_size,
                 mlog.segments.iter().map(|s| s.size).sum::<u64>(),
-                "total size must be the sum of segment sizes\n{:#?}",
+                "reset to {}: total size must be the sum of segment sizes\n{:#?}",
+                offset,
+                mlog
+            );
+            let entries_delta = snapshot.max_offset - offset;
+            let size_delta = entries_delta * (MESSAGE.len() + HEADER_SIZE) as u64;
+            assert_eq!(
+                mlog.total_size,
+                snapshot.total_size - size_delta,
+                "reset to {}: size should have been reduced by {} entries / {} bytes\n{:#?}\n{:#?}",
+                offset,
+                entries_delta,
+                size_delta,
+                snapshot,
                 mlog
             );
 
@@ -665,28 +756,32 @@ mod tests {
             if offset == 0 {
                 assert_eq!(1, mlog.segments.len(), "one segment must exist");
                 assert_eq!(0, mlog.open_segment().min_offset);
                 assert_eq!(0, mlog.open_segment().size)
-            } else {
-                let on_segment_boundary = (offset % MESSAGES_PER_SEGMENT as u64) == 0;
-                if !on_segment_boundary {
-                    let entries_delta = last_max_offset - offset;
-                    let size_delta = entries_delta * (MESSAGE.len() + super::HEADER_SIZE) as u64;
-                    assert_eq!(
-                        mlog.open_segment().size,
-                        last_open_segment_size - size_delta,
-                        "open segment should have been truncated by {} entries, {} bytes\n{:#?}",
-                        entries_delta,
-                        size_delta,
-                        mlog
-                    );
-                } else {
-                    assert_eq!(
-                        last_segments_len - 1,
-                        mlog.segments.len(),
-                        "last segment should be gone\n{:#?}",
-                        mlog
-                    );
-                }
             }
+
+            let retained_segments = snapshot.segments.iter().filter(|segment| segment.min_offset <= offset);
+            let retained_segments_count = retained_segments.clone().count();
+            assert_eq!(
+                mlog.segments.len(),
+                retained_segments_count,
+                "reset to {}: {} segments should have been retained\n{:#?}\n{:#?}",
+                offset,
+                retained_segments_count,
+                snapshot,
+                mlog
+            );
+
+            let last_segment_size = retained_segments
+                .take(retained_segments_count - 1)
+                .fold(mlog.total_size, |acc, segment| acc - segment.size);
+            assert_eq!(
+                mlog.segments.last().unwrap().size,
+                last_segment_size,
+                "reset to {}: last segment should have size {}\n{:#?}\n{:#?}",
+                offset,
+                last_segment_size,
+                snapshot,
+                mlog,
+            );
         }
 
         let mut mlog = fill_log(tmp.path(), SEGMENTS, MESSAGES_PER_SEGMENT, MESSAGE)?;
diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 74b1127493e..5c02361a69e 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -5,23 +5,23 @@ use super::message_log::MessageLog;
 use super::ostorage::memory_object_db::MemoryObjectDB;
 use super::relational_operators::Relation;
 use crate::address::Address;
-use crate::db::commit_log;
 use crate::db::db_metrics::DB_METRICS;
 use crate::db::messages::commit::Commit;
 use crate::db::ostorage::hashmap_object_db::HashMapObjectDB;
 use crate::db::ostorage::ObjectDB;
-use crate::error::{DBError, DatabaseError, IndexError, TableError};
+use crate::error::{DBError, DatabaseError, IndexError, LogReplayError, TableError};
 use crate::execution_context::ExecutionContext;
+use crate::hash::Hash;
 use fs2::FileExt;
 use nonempty::NonEmpty;
 use spacetimedb_lib::PrimaryKey;
 use spacetimedb_primitives::{ColId, ColumnIndexAttribute, IndexId, SequenceId, TableId};
 use spacetimedb_sats::data_key::ToDataKey;
 use spacetimedb_sats::db::def::{IndexDef, SequenceDef, TableDef, TableSchema};
-use spacetimedb_sats::hash::Hash;
 use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
 use std::borrow::Cow;
 use std::fs::{create_dir_all, File};
+use std::io;
 use std::ops::RangeBounds;
 use std::path::Path;
 use std::sync::{Arc, Mutex};
@@ -94,33 +94,112 @@ impl RelationalDB {
         let mut last_commit_offset = None;
         let mut last_hash: Option<Hash> = None;
         if let Some(message_log) = &message_log {
-            let message_log = message_log.lock().unwrap();
+            let mut message_log = message_log.lock().unwrap();
             let max_offset = message_log.open_segment_max_offset;
-            for commit in commit_log::Iter::from(message_log.segments()) {
-                let commit = commit?;
-
-                segment_index += 1;
-                last_hash = commit.parent_commit_hash;
-                last_commit_offset = Some(commit.commit_offset);
-                for transaction in commit.transactions {
-                    transaction_offset += 1;
-                    // NOTE: Although I am creating a blobstore transaction in a
-                    // one to one fashion for each message log transaction, this
-                    // is just to reduce memory usage while inserting. We don't
-                    // really care about inserting these transactionally as long
-                    // as all of the writes get inserted.
-                    datastore.replay_transaction(&transaction, odb.clone())?;
-
-                    let percentage = f64::floor((segment_index as f64 / max_offset as f64) * 100.0) as i32;
-                    if percentage > last_logged_percentage && percentage % 10 == 0 {
-                        last_logged_percentage = percentage;
-                        log::debug!(
-                            "[{}] Loaded {}% ({}/{})",
-                            address,
-                            percentage,
-                            transaction_offset,
-                            max_offset
-                        );
+            let total_segments = message_log.total_segments();
+
+            'replay: for (segment_offset, segment) in message_log.segments().enumerate() {
+                for commit in segment.try_into_iter_commits()? {
+                    let commit = match commit {
+                        Ok(commit) => {
+                            // We may decode a commit just fine, and detect
+                            // that it's not well-formed only later when
+                            // trying to decode the payload.
+                            // As a cheaper alternative to verifying the
+                            // parent hash, let's check if the commit
+                            // sequence is contiguous.
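+                            // E.g. after commit offset 41 the next decoded
+                            // commit must carry offset 42; any other value is
+                            // treated as corruption.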
+                            let expected = last_commit_offset.map(|last| last + 1).unwrap_or(0);
+                            if commit.commit_offset != expected {
+                                let msg =
+                                    format!("Out-of-order commit {}, expected {}", commit.commit_offset, expected);
+                                log::warn!("{msg}");
+
+                                // If this is not at the end of the log,
+                                // report as corrupted but leave files as is
+                                // for forensics.
+                                if segment_offset < total_segments - 1 {
+                                    return Err(LogReplayError::TrailingSegments {
+                                        segment_offset,
+                                        total_segments,
+                                        commit_offset: last_commit_offset.unwrap_or_default(),
+                                        source: io::Error::new(io::ErrorKind::Other, msg),
+                                    }
+                                    .into());
+                                }
+
+                                let reset_offset = expected.saturating_sub(1);
+                                message_log
+                                    .reset_to(reset_offset)
+                                    .map_err(|source| LogReplayError::Reset {
+                                        offset: reset_offset,
+                                        source,
+                                    })?;
+                                break 'replay;
+                            } else {
+                                commit
+                            }
+                        }
+                        Err(e) if matches!(e.kind(), io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof) => {
+                            log::warn!("Corrupt commit after offset {}", last_commit_offset.unwrap_or_default());
+
+                            // We expect that partial writes can occur at
+                            // the end of the log, but a corrupted segment
+                            // in the middle indicates something else is
+                            // wrong.
+                            if segment_offset < total_segments - 1 {
+                                return Err(LogReplayError::TrailingSegments {
+                                    segment_offset,
+                                    total_segments,
+                                    commit_offset: last_commit_offset.unwrap_or_default(),
+                                    source: e,
+                                }
+                                .into());
+                            }
+
+                            // Else, truncate the current segment. This
+                            // allows us to continue appending to the log
+                            // without hitting the above condition.
+                            let reset_offset = last_commit_offset.unwrap_or_default();
+                            message_log
+                                .reset_to(reset_offset)
+                                .map_err(|source| LogReplayError::Reset {
+                                    offset: reset_offset,
+                                    source,
+                                })?;
+                            break 'replay;
+                        }
+                        Err(e) => {
+                            return Err(LogReplayError::Io {
+                                segment_offset,
+                                commit_offset: last_commit_offset.unwrap_or_default(),
+                                source: e,
+                            }
+                            .into())
+                        }
+                    };
+                    segment_index += 1;
+                    last_hash = commit.parent_commit_hash;
+                    last_commit_offset = Some(commit.commit_offset);
+                    for transaction in commit.transactions {
+                        transaction_offset += 1;
+                        // NOTE: Although I am creating a blobstore transaction in a
+                        // one to one fashion for each message log transaction, this
+                        // is just to reduce memory usage while inserting. We don't
+                        // really care about inserting these transactionally as long
+                        // as all of the writes get inserted.
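+                        // (Despite its name, `segment_index` counts commits,
+                        // so the progress percentage below is commits
+                        // replayed over `max_offset`.)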
+                        datastore.replay_transaction(&transaction, odb.clone())?;
+
+                        let percentage = f64::floor((segment_index as f64 / max_offset as f64) * 100.0) as i32;
+                        if percentage > last_logged_percentage && percentage % 10 == 0 {
+                            last_logged_percentage = percentage;
+                            log::debug!(
+                                "[{}] Loaded {}% ({}/{})",
+                                address,
+                                percentage,
+                                transaction_offset,
+                                max_offset
+                            );
+                        }
                     }
                 }
             }
@@ -637,7 +716,9 @@ pub fn open_db(path: impl AsRef<Path>, in_memory: bool, fsync: bool) -> Result<RelationalDB, DBError> {
 
 pub fn open_log(path: impl AsRef<Path>) -> Result<Arc<Mutex<MessageLog>>, DBError> {
     let path = path.as_ref().to_path_buf();
-    Ok(Arc::new(Mutex::new(MessageLog::open(path.join("mlog"))?)))
+    Ok(Arc::new(Mutex::new(
+        MessageLog::open(path.join("mlog")).map_err(|e| DBError::Other(e.into()))?,
+    )))
 }
 
 #[cfg(test)]
@@ -669,17 +752,35 @@ pub(crate) mod tests_utils {
 mod tests {
     #![allow(clippy::disallowed_macros)]
 
-    use super::*;
-
+    use nonempty::NonEmpty;
     use spacetimedb_lib::error::ResultTest;
+    use spacetimedb_primitives::{ColId, TableId};
     use spacetimedb_sats::db::auth::{StAccess, StTableType};
     use spacetimedb_sats::db::def::{ColumnDef, IndexDef, IndexType, TableDef};
+    use std::fs::File;
+    use std::io::{self, Seek, SeekFrom, Write};
+    use std::ops::Range;
+    use std::path::Path;
     use std::sync::{Arc, Mutex};
+    use tempfile::TempDir;
 
-    use crate::db::datastore::system_tables::*;
-    use crate::db::message_log::MessageLog;
+    use super::RelationalDB;
+    use crate::address::Address;
+    use crate::db::datastore::locking_tx_datastore::IterByColEq;
+    use crate::db::datastore::system_tables::StIndexRow;
+    use crate::db::datastore::system_tables::StSequenceRow;
+    use crate::db::datastore::system_tables::StTableRow;
+    use crate::db::datastore::system_tables::ST_INDEXES_ID;
+    use crate::db::datastore::system_tables::ST_SEQUENCES_ID;
+    use crate::db::message_log::{MessageLog, SegmentView};
+    use crate::db::ostorage::sled_object_db::SledObjectDB;
+    use crate::db::ostorage::ObjectDB;
+    use crate::db::relational_db::make_default_ostorage;
     use crate::db::relational_db::tests_utils::make_test_db;
     use crate::db::relational_db::{open_db, ST_TABLES_ID};
+    use crate::error::{DBError, DatabaseError, IndexError, LogReplayError};
+    use crate::execution_context::ExecutionContext;
+    use spacetimedb_lib::{AlgebraicType, AlgebraicValue, ProductType};
     use spacetimedb_sats::product;
 
     fn column(name: &str, ty: AlgebraicType) -> ColumnDef {
@@ -1434,4 +1535,155 @@ mod tests {
 
     //     Ok(())
     // }
+
+    #[test]
+    fn test_replay_corrupted_log() -> ResultTest<()> {
+        let tmp = TempDir::with_prefix("stdb_test")?;
+        let mlog_path = tmp.path().join("mlog");
+
+        const NUM_TRANSACTIONS: usize = 10_000;
+        // At 64 KiB per segment, this should create about 11 segments.
+        const MAX_SEGMENT_SIZE: u64 = 64 * 1024;
+
+        let mlog = MessageLog::options()
+            .max_segment_size(MAX_SEGMENT_SIZE)
+            .open(&mlog_path)
+            .map(Mutex::new)
+            .map(Arc::new)?;
+        let odb = SledObjectDB::open(tmp.path().join("odb"))
+            .map(|odb| Box::new(odb) as Box<dyn ObjectDB + Send>)
+            .map(Mutex::new)
+            .map(Arc::new)?;
+        let reopen_db = || RelationalDB::open(tmp.path(), Some(mlog.clone()), odb.clone(), Address::zero(), false);
+        let db = reopen_db()?;
+        let ctx = ExecutionContext::default();
+
+        let table_id = db.with_auto_commit(&ctx, |tx| {
+            db.create_table(
+                tx,
+                table(
+                    "Account",
+                    vec![ColumnDef {
+                        is_autoinc: true,
+                        ..column("deposit", AlgebraicType::U64)
+                    }],
+                    vec![],
+                ),
+            )
+        })?;
+
+        fn balance(ctx: &ExecutionContext, db: &RelationalDB, table_id: TableId) -> ResultTest<u64> {
+            let balance = db.with_auto_commit(ctx, |tx| -> ResultTest<u64> {
+                let last = db
+                    .iter(ctx, tx, table_id)?
+                    .last()
+                    .map(|row| row.view().field_as_u64(0, None))
+                    .transpose()?
+                    .unwrap_or_default();
+                Ok(last)
+            })?;
+
+            Ok(balance)
+        }
+
+        // Invalidate a segment by shrinking the file by one byte.
+        fn invalidate_shrink(mlog_path: &Path, segment: SegmentView) -> io::Result<()> {
+            let segment_file = File::options().write(true).open(
+                mlog_path
+                    .join(format!("{:0>20}", segment.offset()))
+                    .with_extension("log"),
+            )?;
+            let len = segment_file.metadata()?.len();
+            eprintln!("shrink segment={segment:?} len={len}");
+            segment_file.set_len(len - 1)?;
+            segment_file.sync_all()
+        }
+
+        // Invalidate a segment by overwriting some portion of the file.
+        fn invalidate_overwrite(mlog_path: &Path, segment: SegmentView) -> io::Result<()> {
+            let mut segment_file = File::options().write(true).open(
+                mlog_path
+                    .join(format!("{:0>20}", segment.offset()))
+                    .with_extension("log"),
+            )?;
+
+            let len = segment_file.metadata()?.len();
+            let ofs = len / 2;
+            eprintln!("overwrite segment={segment:?} len={len} ofs={ofs}");
+            segment_file.seek(SeekFrom::Start(ofs))?;
+            segment_file.write_all(&[255, 255, 255, 255])?;
+            segment_file.sync_all()
+        }
+
+        // Create transactions.
+        for _ in 0..NUM_TRANSACTIONS {
+            db.with_auto_commit(&ctx, |tx| db.insert(tx, table_id, product![AlgebraicValue::U64(0)]))?;
+        }
+        assert_eq!(NUM_TRANSACTIONS as u64, balance(&ctx, &db, table_id)?);
+
+        drop(db);
+        odb.lock().unwrap().sync_all()?;
+        mlog.lock().unwrap().sync_all()?;
+
+        // The state must be the same after reopening the db.
+        let db = reopen_db()?;
+        assert_eq!(NUM_TRANSACTIONS as u64, balance(&ctx, &db, table_id)?);
+
+        let total_segments = mlog.lock().unwrap().total_segments();
+        assert!(total_segments > 3, "expected more than 3 segments");
+
+        // Close the db and pop a byte from the end of the message log.
+        drop(db);
+        let last_segment = mlog.lock().unwrap().segments().last().unwrap();
+        invalidate_shrink(&mlog_path, last_segment.clone())?;
+
+        // Assert that the final tx is lost.
+        let db = reopen_db()?;
+        assert_eq!((NUM_TRANSACTIONS - 1) as u64, balance(&ctx, &db, table_id)?);
+        assert_eq!(
+            total_segments,
+            mlog.lock().unwrap().total_segments(),
+            "no segment should have been removed"
+        );
+
+        // Overwrite some portion of the last segment.
+        drop(db);
+        let segment_range = Range {
+            start: last_segment.offset(),
+            end: (NUM_TRANSACTIONS - 1) as u64,
+        };
+        invalidate_overwrite(&mlog_path, last_segment)?;
+        let db = reopen_db()?;
+        let balance = balance(&ctx, &db, table_id)?;
+        assert!(
+            segment_range.contains(&balance),
+            "balance {balance} should fall within {segment_range:?}"
+        );
+        assert_eq!(
+            total_segments,
+            mlog.lock().unwrap().total_segments(),
+            "no segment should have been removed"
+        );
+
+        // Now, let's poke a segment somewhere in the middle of the log.
+        drop(db);
+        let segment = mlog.lock().unwrap().segments().nth(5).unwrap();
+        invalidate_shrink(&mlog_path, segment)?;
+
+        let res = reopen_db();
+        if !matches!(res, Err(DBError::LogReplay(LogReplayError::TrailingSegments { .. }))) {
+            panic!("Expected replay error but got: {res:?}")
+        }
+
+        // The same should happen if we overwrite instead of shrink.
+        let segment = mlog.lock().unwrap().segments().nth(5).unwrap();
+        invalidate_overwrite(&mlog_path, segment)?;
+
+        let res = reopen_db();
+        if !matches!(res, Err(DBError::LogReplay(LogReplayError::TrailingSegments { .. }))) {
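+            // A corrupt segment in the *middle* of the log must surface as
+            // `TrailingSegments` rather than being silently truncated away.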
+            panic!("Expected replay error but got: {res:?}")
+        }
+
+        Ok(())
+    }
 }
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index b04bf116cb1..bafce05f8b1 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -1,3 +1,4 @@
+use std::io;
 use std::num::ParseIntError;
 use std::path::PathBuf;
 use std::sync::{MutexGuard, PoisonError};
@@ -173,6 +174,8 @@ pub enum DBError {
     },
     #[error("SqlError: {error}, executing: `{sql}`")]
     Plan { sql: String, error: PlanError },
+    #[error("Error replaying the commit log: {0}")]
+    LogReplay(#[from] LogReplayError),
     #[error(transparent)]
     Other(#[from] anyhow::Error),
 }
@@ -212,6 +215,42 @@ impl<'a, T: ?Sized + 'a> From<PoisonError<MutexGuard<'a, T>>> for DBError {
     }
 }
 
+#[derive(Debug, Error)]
+pub enum LogReplayError {
+    #[error(
+        "Error reading segment {}/{} at commit {}: {}",
+        .segment_offset,
+        .total_segments,
+        .commit_offset,
+        .source
+    )]
+    TrailingSegments {
+        segment_offset: usize,
+        total_segments: usize,
+        commit_offset: u64,
+        #[source]
+        source: io::Error,
+    },
+    #[error("Could not reset log to offset {}: {}", .offset, .source)]
+    Reset {
+        offset: u64,
+        #[source]
+        source: io::Error,
+    },
+    #[error(
+        "Unexpected I/O error reading commit {} from segment {}: {}",
+        .commit_offset,
+        .segment_offset,
+        .source
+    )]
+    Io {
+        segment_offset: usize,
+        commit_offset: u64,
+        #[source]
+        source: io::Error,
+    },
+}
+
 #[derive(Error, Debug)]
 pub enum NodesError {
     #[error("Failed to decode row: {0}")]
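
Reviewer note (not part of the patch): the sketch below shows how the new per-segment commit iterator is meant to compose, assuming a `MessageLog` that was opened elsewhere. The function `count_commits` is a made-up name for illustration, not an API added by this diff.

    // Illustrative sketch only, not part of this diff.
    use std::io;

    use crate::db::message_log::MessageLog;

    /// Walk every segment of `mlog` and count the commits it contains,
    /// surfacing the same io::Error values the replay loop above handles.
    fn count_commits(mlog: &MessageLog) -> io::Result<u64> {
        let mut n = 0;
        for segment in mlog.segments() {
            // SegmentView::try_into_iter_commits() yields
            // io::Result<impl Iterator<Item = io::Result<Commit>>>.
            for commit in segment.try_into_iter_commits()? {
                let _commit = commit?;
                n += 1;
            }
        }
        Ok(n)
    }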