// Patch summary. Commentary only: this text precedes the first `diff --git` header, which patch
// tools are expected to skip as leading text (verify with your tooling before applying).
//
// crates/core/src/db/commit_log.rs (tests)
//   * The `iter_from` offsets used by the test move down by one (20_001 -> 20_000,
//     10_001 -> 10_000, 10_000 -> 9_999); the asserted commit counts stay the same.
//
// crates/core/src/db/message_log.rs
//   * When a new segment is pushed because `end_size > max_segment_size`, its `min_offset` is now
//     `open_segment_max_offset` instead of `open_segment_max_offset + 1`.
//   * `reset_to` logs the reset at `info` instead of `debug`, and gains two `debug` logs
//     (the segment's `min_offset` and the new segment size).
//   * `reset_to` now reads one message for each offset in `segment.min_offset..offset`. If the
//     iterator ends early it returns an `io::Error` of kind `UnexpectedEof` through `?`, replacing
//     the removed `assert_eq!` on the retained count.
//   * The truncation length becomes `iter.bytes_read()` (was `segment.size - iter.bytes_read()`),
//     and `file.sync_all()` is called right after `file.set_len(..)`.
//   * `IterSegment`'s `try_from` wraps the file in `BufReader::new` inside a single `map` closure
//     instead of a separate `.map(BufReader::new)` step.
//   * Tests: the `segments_from` offsets shift down by one and a check at offset 0 is added. The
//     `reset_to` helper `go` captures a `Snapshot` (segments, total_size, max_offset) before the
//     reset and asserts the total size reduction, the number of retained segments (those with
//     `min_offset <= offset`), and the size of the last segment; the old
//     `on_segment_boundary` branch is removed.
//
// NOTE(review): generic arguments appear to be missing in this copy (`impl TryFrom for IterSegment`,
// `Vec,`, `sum::()`, `Result`) and the hunk lines are run together -- confirm against the original
// patch before applying.
diff --git a/crates/core/src/db/commit_log.rs b/crates/core/src/db/commit_log.rs index 227fa8f2ee..7b165b11c2 100644 --- a/crates/core/src/db/commit_log.rs +++ b/crates/core/src/db/commit_log.rs @@ -436,13 +436,13 @@ mod tests { // No slicing yet, so offsets on segment boundaries yield an additional // COMMITS_PER_SEGMENT. - let commits = view.iter_from(20_001).map(Result::unwrap).count(); + let commits = view.iter_from(20_000).map(Result::unwrap).count(); assert_eq!(9999, commits); - let commits = view.iter_from(10_001).map(Result::unwrap).count(); + let commits = view.iter_from(10_000).map(Result::unwrap).count(); assert_eq!(19_999, commits); - let commits = view.iter_from(10_000).map(Result::unwrap).count(); + let commits = view.iter_from(9_999).map(Result::unwrap).count(); assert_eq!(29_999, commits); } } diff --git a/crates/core/src/db/message_log.rs b/crates/core/src/db/message_log.rs index f42f8b32e6..bd9c322ada 100644 --- a/crates/core/src/db/message_log.rs +++ b/crates/core/src/db/message_log.rs @@ -210,7 +210,7 @@ impl MessageLog { if end_size > self.options.max_segment_size { self.flush()?; self.segments.push(Segment { - min_offset: self.open_segment_max_offset + 1, + min_offset: self.open_segment_max_offset, size: 0, }); @@ -359,7 +359,7 @@ impl MessageLog { /// /// [POSIX]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html#tag_15_09_07 pub fn reset_to(&mut self, offset: u64) -> io::Result<()> { - log::debug!("Resetting message log to offset {offset}"); + log::info!("Resetting message log to offset {offset}"); if offset == 0 { fs::remove_dir_all(&self.root)?; *self = self @@ -391,28 +391,25 @@ impl MessageLog { file: BufReader::new(file), }; - let to_retain = self.open_segment_max_offset - offset; - let mut retained = 0; - for message in iter.by_ref().take(to_retain as usize) { + log::debug!("segment min_offset={}", segment.min_offset); + for i in segment.min_offset..offset { + let message = iter.by_ref().next().ok_or_else(|| { 
io::Error::new( + io::ErrorKind::UnexpectedEof, + format!("Open segment shorter than expected: {i}"), + ) + })?; // Give up on I/O errors while reading the next message. let _ = message?; - retained += 1; } - // We maintain that: - // - // segment.min_offset <= offset <= self.open_segment_max_offset - // - // `iter` yielding fewer elements thus breaks our invariants. - assert_eq!( - to_retain, retained, - "Open segment shorter than expected: {retained} instead of {to_retain}" - ); - segment.size - iter.bytes_read() + iter.bytes_read() }; + log::debug!("new segment size {new_segment_size}"); // Truncate file to byte offset. let mut file = File::options().read(true).write(true).open(path)?; file.set_len(new_segment_size)?; + file.sync_all()?; file.seek(SeekFrom::End(0))?; self.total_size -= segment.size; @@ -488,9 +485,11 @@ impl TryFrom for IterSegment { fn try_from(view: SegmentView) -> Result { let segment = view.offset(); - File::try_from(view) - .map(BufReader::new) - .map(|file| IterSegment { segment, read: 0, file }) + File::try_from(view).map(|file| IterSegment { + segment, + read: 0, + file: BufReader::new(file), + }) } } @@ -592,7 +591,7 @@ mod tests { use std::path::Path; - use super::MessageLog; + use super::{MessageLog, Segment, HEADER_SIZE}; use spacetimedb_lib::error::ResultTest; use tempfile::{self, TempDir}; @@ -663,13 +662,16 @@ mod tests { let segments = message_log.segments_from(1_000_000).count(); assert_eq!(0, segments); - let segments = message_log.segments_from(20_001).count(); + let segments = message_log.segments_from(20_000).count(); assert_eq!(1, segments); - let segments = message_log.segments_from(10_001).count(); + let segments = message_log.segments_from(10_000).count(); assert_eq!(2, segments); - let segments = message_log.segments_from(10_000).count(); + let segments = message_log.segments_from(9_999).count(); + assert_eq!(3, segments); + + let segments = message_log.segments_from(0).count(); assert_eq!(3, segments); Ok(()) @@ 
-704,10 +706,25 @@ mod tests { const SEGMENTS: usize = 3; const MESSAGES_PER_SEGMENT: usize = 10_000; + #[derive(Debug)] + struct Snapshot { + segments: Vec, + total_size: u64, + max_offset: u64, + } + + impl From<&MessageLog> for Snapshot { + fn from(mlog: &MessageLog) -> Self { + Self { + segments: mlog.segments.clone(), + total_size: mlog.total_size, + max_offset: mlog.open_segment_max_offset, + } + } + } + fn go(mlog: &mut MessageLog, offset: u64) { - let last_segments_len = mlog.segments.len(); - let last_max_offset = mlog.open_segment_max_offset; - let last_open_segment_size = mlog.open_segment().size; + let snapshot = Snapshot::from(&*mlog); mlog.reset_to(offset).unwrap(); assert_eq!( @@ -718,7 +735,20 @@ mod tests { assert_eq!( mlog.total_size, mlog.segments.iter().map(|s| s.size).sum::(), - "total size must be the sum of segment sizes\n{:#?}", + "reset to {}: total size must be the sum of segment sizes\n{:#?}", + offset, + mlog + ); + let entries_delta = snapshot.max_offset - offset; + let size_delta = entries_delta * (MESSAGE.len() + HEADER_SIZE) as u64; + assert_eq!( + mlog.total_size, + snapshot.total_size - size_delta, + "reset to {}: size should have been reduced by {} entries / {} bytes\n{:#?}\n{:#?}", + offset, + entries_delta, + size_delta, + snapshot, mlog ); @@ -726,28 +756,32 @@ mod tests { assert_eq!(1, mlog.segments.len(), "one segment must exist"); assert_eq!(0, mlog.open_segment().min_offset); assert_eq!(0, mlog.open_segment().size) - } else { - let on_segment_boundary = (offset % MESSAGES_PER_SEGMENT as u64) == 0; - if !on_segment_boundary { - let entries_delta = last_max_offset - offset; - let size_delta = entries_delta * (MESSAGE.len() + super::HEADER_SIZE) as u64; - assert_eq!( - mlog.open_segment().size, - last_open_segment_size - size_delta, - "open segment should have been truncated by {} entries, {} bytes\n{:#?}", - entries_delta, - size_delta, - mlog - ); - } else { - assert_eq!( - last_segments_len - 1, - mlog.segments.len(), - 
"last segment should be gone\n{:#?}", - mlog - ); - } } + + let retained_segments = snapshot.segments.iter().filter(|segment| segment.min_offset <= offset); + let retained_segments_count = retained_segments.clone().count(); + assert_eq!( + mlog.segments.len(), + retained_segments_count, + "reset to {}: {} segments should have been retained\n{:#?}\n{:#?}", + offset, + retained_segments_count, + snapshot, + mlog + ); + + let last_segment_size = retained_segments + .take(retained_segments_count - 1) + .fold(mlog.total_size, |acc, segment| acc - segment.size); + assert_eq!( + mlog.segments.last().unwrap().size, + last_segment_size, + "reset to {}: last segment should have size {}\n{:#?}\n{:#?}", + offset, + last_segment_size, + snapshot, + mlog, + ); } let mut mlog = fill_log(tmp.path(), SEGMENTS, MESSAGES_PER_SEGMENT, MESSAGE)?;