Skip to content

Commit

Permalink
core: Fix off-by-one in message log
Browse files Browse the repository at this point in the history
When rotating a segment, the max_offset got incremented, leading to a
gap of one between the segment's min offset and the offset of the first
commit (when viewed as a commit log).

This means that offset calculations for truncation or suffix iterators
were off by one.
  • Loading branch information
kim committed Nov 13, 2023
1 parent fa94402 commit fa0d2e5
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 51 deletions.
6 changes: 3 additions & 3 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,13 @@ mod tests {

// No slicing yet, so offsets on segment boundaries yield an additional
// COMMITS_PER_SEGMENT.
let commits = view.iter_from(20_001).map(Result::unwrap).count();
let commits = view.iter_from(20_000).map(Result::unwrap).count();
assert_eq!(9999, commits);

let commits = view.iter_from(10_001).map(Result::unwrap).count();
let commits = view.iter_from(10_000).map(Result::unwrap).count();
assert_eq!(19_999, commits);

let commits = view.iter_from(10_000).map(Result::unwrap).count();
let commits = view.iter_from(9_999).map(Result::unwrap).count();
assert_eq!(29_999, commits);
}
}
130 changes: 82 additions & 48 deletions crates/core/src/db/message_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl MessageLog {
if end_size > self.options.max_segment_size {
self.flush()?;
self.segments.push(Segment {
min_offset: self.open_segment_max_offset + 1,
min_offset: self.open_segment_max_offset,
size: 0,
});

Expand Down Expand Up @@ -359,7 +359,7 @@ impl MessageLog {
///
/// [POSIX]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html#tag_15_09_07
pub fn reset_to(&mut self, offset: u64) -> io::Result<()> {
log::debug!("Resetting message log to offset {offset}");
log::info!("Resetting message log to offset {offset}");
if offset == 0 {
fs::remove_dir_all(&self.root)?;
*self = self
Expand Down Expand Up @@ -391,28 +391,25 @@ impl MessageLog {
file: BufReader::new(file),
};

let to_retain = self.open_segment_max_offset - offset;
let mut retained = 0;
for message in iter.by_ref().take(to_retain as usize) {
log::debug!("segment min_offset={}", segment.min_offset);
for i in segment.min_offset..offset {
let message = iter.by_ref().next().ok_or_else(|| {
io::Error::new(
io::ErrorKind::UnexpectedEof,
format!("Open segment shorter than expected: {i}"),
)
})?;
// Give up on I/O errors while reading the next message.
let _ = message?;
retained += 1;
}
// We maintain that:
//
// segment.min_offset <= offset <= self.open_segment_max_offset
//
// `iter` yielding fewer elements thus breaks our invariants.
assert_eq!(
to_retain, retained,
"Open segment shorter than expected: {retained} instead of {to_retain}"
);
segment.size - iter.bytes_read()
iter.bytes_read()
};

log::debug!("new segment size {new_segment_size}");
// Truncate file to byte offset.
let mut file = File::options().read(true).write(true).open(path)?;
file.set_len(new_segment_size)?;
file.sync_all()?;
file.seek(SeekFrom::End(0))?;

self.total_size -= segment.size;
Expand Down Expand Up @@ -488,9 +485,11 @@ impl TryFrom<SegmentView> for IterSegment {

fn try_from(view: SegmentView) -> Result<Self, Self::Error> {
let segment = view.offset();
File::try_from(view)
.map(BufReader::new)
.map(|file| IterSegment { segment, read: 0, file })
File::try_from(view).map(|file| IterSegment {
segment,
read: 0,
file: BufReader::new(file),
})
}
}

Expand Down Expand Up @@ -592,7 +591,7 @@ mod tests {

use std::path::Path;

use super::MessageLog;
use super::{MessageLog, Segment, HEADER_SIZE};
use spacetimedb_lib::error::ResultTest;
use tempfile::{self, TempDir};

Expand Down Expand Up @@ -663,13 +662,16 @@ mod tests {
let segments = message_log.segments_from(1_000_000).count();
assert_eq!(0, segments);

let segments = message_log.segments_from(20_001).count();
let segments = message_log.segments_from(20_000).count();
assert_eq!(1, segments);

let segments = message_log.segments_from(10_001).count();
let segments = message_log.segments_from(10_000).count();
assert_eq!(2, segments);

let segments = message_log.segments_from(10_000).count();
let segments = message_log.segments_from(9_999).count();
assert_eq!(3, segments);

let segments = message_log.segments_from(0).count();
assert_eq!(3, segments);

Ok(())
Expand Down Expand Up @@ -704,10 +706,25 @@ mod tests {
const SEGMENTS: usize = 3;
const MESSAGES_PER_SEGMENT: usize = 10_000;

#[derive(Debug)]
struct Snapshot {
segments: Vec<Segment>,
total_size: u64,
max_offset: u64,
}

impl From<&MessageLog> for Snapshot {
fn from(mlog: &MessageLog) -> Self {
Self {
segments: mlog.segments.clone(),
total_size: mlog.total_size,
max_offset: mlog.open_segment_max_offset,
}
}
}

fn go(mlog: &mut MessageLog, offset: u64) {
let last_segments_len = mlog.segments.len();
let last_max_offset = mlog.open_segment_max_offset;
let last_open_segment_size = mlog.open_segment().size;
let snapshot = Snapshot::from(&*mlog);

mlog.reset_to(offset).unwrap();
assert_eq!(
Expand All @@ -718,36 +735,53 @@ mod tests {
assert_eq!(
mlog.total_size,
mlog.segments.iter().map(|s| s.size).sum::<u64>(),
"total size must be the sum of segment sizes\n{:#?}",
"reset to {}: total size must be the sum of segment sizes\n{:#?}",
offset,
mlog
);
let entries_delta = snapshot.max_offset - offset;
let size_delta = entries_delta * (MESSAGE.len() + HEADER_SIZE) as u64;
assert_eq!(
mlog.total_size,
snapshot.total_size - size_delta,
"reset to {}: size should have been reduced by {} entries / {} bytes\n{:#?}\n{:#?}",
offset,
entries_delta,
size_delta,
snapshot,
mlog
);

if offset == 0 {
assert_eq!(1, mlog.segments.len(), "one segment must exist");
assert_eq!(0, mlog.open_segment().min_offset);
assert_eq!(0, mlog.open_segment().size)
} else {
let on_segment_boundary = (offset % MESSAGES_PER_SEGMENT as u64) == 0;
if !on_segment_boundary {
let entries_delta = last_max_offset - offset;
let size_delta = entries_delta * (MESSAGE.len() + super::HEADER_SIZE) as u64;
assert_eq!(
mlog.open_segment().size,
last_open_segment_size - size_delta,
"open segment should have been truncated by {} entries, {} bytes\n{:#?}",
entries_delta,
size_delta,
mlog
);
} else {
assert_eq!(
last_segments_len - 1,
mlog.segments.len(),
"last segment should be gone\n{:#?}",
mlog
);
}
}

let retained_segments = snapshot.segments.iter().filter(|segment| segment.min_offset <= offset);
let retained_segments_count = retained_segments.clone().count();
assert_eq!(
mlog.segments.len(),
retained_segments_count,
"reset to {}: {} segments should have been retained\n{:#?}\n{:#?}",
offset,
retained_segments_count,
snapshot,
mlog
);

let last_segment_size = retained_segments
.take(retained_segments_count - 1)
.fold(mlog.total_size, |acc, segment| acc - segment.size);
assert_eq!(
mlog.segments.last().unwrap().size,
last_segment_size,
"reset to {}: last segment should have size {}\n{:#?}\n{:#?}",
offset,
last_segment_size,
snapshot,
mlog,
);
}

let mut mlog = fill_log(tmp.path(), SEGMENTS, MESSAGES_PER_SEGMENT, MESSAGE)?;
Expand Down

0 comments on commit fa0d2e5

Please sign in to comment.