From 31698618a81be8de65a14d6f2e706d95064b2cc0 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 10 Dec 2024 11:43:39 +0100 Subject: [PATCH] commitlog: Provide `segment_len` method for segments (#2042) --- crates/commitlog/src/repo/fs.rs | 17 +++++++++++++++-- crates/commitlog/src/repo/mem.rs | 13 +++++++++++-- crates/commitlog/src/repo/mod.rs | 17 ++++++++++++++++- crates/commitlog/src/tests/partial.rs | 8 +++++++- 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 424a8f1e967..b767fd52bad 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -1,10 +1,10 @@ use std::fs::{self, File}; -use std::io; +use std::io::{self, Seek}; use log::{debug, warn}; use spacetimedb_paths::server::{CommitLogDir, SegmentFile}; -use super::{Repo, TxOffset, TxOffsetIndex, TxOffsetIndexMut}; +use super::{Repo, Segment, TxOffset, TxOffsetIndex, TxOffsetIndexMut}; const SEGMENT_FILE_EXT: &str = ".stdb.log"; @@ -57,6 +57,19 @@ impl Fs { } } +impl Segment for File { + fn segment_len(&mut self) -> io::Result { + let old_pos = self.stream_position()?; + let len = self.seek(io::SeekFrom::End(0))?; + // If we're already at the end of the file, avoid seeking. + if old_pos != len { + self.seek(io::SeekFrom::Start(old_pos))?; + } + + Ok(len) + } +} + impl Repo for Fs { type Segment = File; diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index 06efbf3868b..0d61dfdf673 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -28,6 +28,7 @@ impl Segment { pub fn len(&self) -> usize { self.buf.read().unwrap().len() } + pub fn is_empty(&self) -> bool { self.len() == 0 } @@ -39,6 +40,12 @@ impl From for Segment { } } +impl super::Segment for Segment { + fn segment_len(&mut self) -> io::Result { + Ok(Segment::len(self) as u64) + } +} + impl FileLike for Segment { fn fsync(&mut self) -> io::Result<()> { Ok(()) @@ -118,8 +125,10 @@ impl Repo for Memory { let mut inner = self.0.write().unwrap(); match inner.entry(offset) { btree_map::Entry::Occupied(entry) => { - if entry.get().read().unwrap().len() == 0 { - Ok(Segment::from(Arc::clone(entry.get()))) + let entry = entry.get(); + let read_guard = entry.read().unwrap(); + if read_guard.len() == 0 { + Ok(Segment::from(Arc::clone(entry))) } else { Err(io::Error::new( io::ErrorKind::AlreadyExists, diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 59eaa079384..be045a632e8 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -22,13 +22,28 @@ pub type TxOffset = u64; pub type TxOffsetIndexMut = IndexFileMut; pub type TxOffsetIndex = IndexFile; +pub trait Segment: FileLike + io::Read + io::Write + io::Seek + Send + Sync { + /// Determine the length in bytes of the segment. + /// + /// This method does not rely on metadata `fsync`, and may use up to three + /// `seek` operations. + /// + /// If the method returns successfully, the seek position before the call is + /// restored. However, if it returns an error, the seek position is + /// unspecified. + // + // TODO: Replace with `Seek::stream_len` if / when stabilized: + // https://github.com/rust-lang/rust/issues/59359 + fn segment_len(&mut self) -> io::Result; +} + /// A repository of log segments. /// /// This is mainly an internal trait to allow testing against an in-memory /// representation. pub trait Repo: Clone { /// The type of log segments managed by this repo, which must behave like a file. - type Segment: io::Read + io::Write + FileLike + io::Seek + Send + Sync + 'static; + type Segment: Segment + 'static; /// Create a new segment with the minimum transaction offset `offset`. /// diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs index cb883510a49..44351e5dc0c 100644 --- a/crates/commitlog/src/tests/partial.rs +++ b/crates/commitlog/src/tests/partial.rs @@ -9,7 +9,7 @@ use log::debug; use crate::{ commitlog, error, payload, - repo::{self, Repo}, + repo::{self, Repo, Segment}, segment::FileLike, tests::helpers::enable_logging, Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION, @@ -160,6 +160,12 @@ struct ShortSegment { max_len: u64, } +impl Segment for ShortSegment { + fn segment_len(&mut self) -> io::Result { + self.inner.segment_len() + } +} + impl FileLike for ShortSegment { fn fsync(&mut self) -> std::io::Result<()> { self.inner.fsync()