From 1213b2a6ec56359a83f5770d85622ee588621b7a Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 24 Oct 2024 17:12:07 +0200 Subject: [PATCH] commitlog: Improve skipping behavior of traversals The `*_from` style traversals have historically yielded commits or transactions before the given from-offset, leaving it to downstream consumers to handle skipping. While folding handles it internally, this behavior is not great for transaction iterators, due to the statefulness of decoding -- it is usually necessary to call `Decoder::skip_record` until the desired offset is found. We would also yield all commits from the start of the nearest segment boundary, which can be quite confusing when using the commit iterators directly. This patch fixes the situation by: * Setting the desired offset as the inital next offset in the `Commits` iterator, instead of the nearest segment boundary. * Looping instead of recursing in the `Commits` iterator while skipping commits, so we can skip until the initial offset without blowing the stack. * Passing the desired offset to `Commit::into_transactions`, such that `Decoder::skip_record` can be called if the offset doesn't lie on the commit boundary. --- crates/commitlog/src/commit.rs | 78 ++++++++++++-- crates/commitlog/src/commitlog.rs | 170 ++++++++++++++++-------------- crates/commitlog/src/payload.rs | 3 +- crates/commitlog/src/segment.rs | 7 +- 4 files changed, 167 insertions(+), 91 deletions(-) diff --git a/crates/commitlog/src/commit.rs b/crates/commitlog/src/commit.rs index b9b26546669..4cbab2a7975 100644 --- a/crates/commitlog/src/commit.rs +++ b/crates/commitlog/src/commit.rs @@ -123,19 +123,49 @@ impl Commit { /// /// The supplied [`Decoder`] is responsible for extracting individual /// transactions from the `records` buffer. + /// + /// `version` is the log format version of the current segment, and gets + /// passed to [`Decoder::decode_record`]. + /// + /// `from_offset` is the transaction offset within the current commit from + /// which to start decoding. That is: + /// + /// * if the tx offset within the commit is smaller than `from_offset`, + /// [`Decoder::skip_record`] is called. + /// + /// The iterator does not yield a value, unless `skip_record` returns an + /// error. + /// + /// * if the tx offset within the commit is greater of equal to `from_offset`, + /// [`Decoder::decode_record`] is called. + /// + /// The iterator yields the result of this call. + /// + /// * if `from_offset` doesn't fall into the current commit, the iterator + /// yields nothing. + /// pub fn into_transactions( self, version: u8, + from_offset: u64, de: &D, ) -> impl Iterator, D::Error>> + '_ { let records = Cursor::new(self.records); - (self.min_tx_offset..(self.min_tx_offset + self.n as u64)).scan(records, move |recs, offset| { - let mut cursor = &*recs; - let tx = de - .decode_record(version, offset, &mut cursor) - .map(|txdata| Transaction { offset, txdata }); - Some(tx) - }) + (self.min_tx_offset..(self.min_tx_offset + self.n as u64)) + .scan(records, move |recs, offset| { + let mut cursor = &*recs; + let ret = if offset < from_offset { + de.skip_record(version, offset, &mut cursor).err().map(Err) + } else { + let tx = de + .decode_record(version, offset, &mut cursor) + .map(|txdata| Transaction { offset, txdata }); + Some(tx) + }; + + Some(ret) + }) + .flatten() } } @@ -216,9 +246,10 @@ impl StoredCommit { pub fn into_transactions( self, version: u8, + from_offset: u64, de: &D, ) -> impl Iterator, D::Error>> + '_ { - Commit::from(self).into_transactions(version, de) + Commit::from(self).into_transactions(version, from_offset, de) } } @@ -272,6 +303,7 @@ mod tests { use proptest::prelude::*; use super::*; + use crate::{payload::ArrayDecoder, tests::helpers::enable_logging, DEFAULT_LOG_FORMAT_VERSION}; #[test] fn commit_roundtrip() { @@ -289,6 +321,36 @@ mod tests { assert_eq!(commit, commit2); } + #[test] + fn into_transactions_can_skip_txs() { + enable_logging(); + + let commit = Commit { + min_tx_offset: 0, + n: 4, + records: vec![0; 128], + }; + + let txs = commit + .into_transactions(DEFAULT_LOG_FORMAT_VERSION, 2, &ArrayDecoder::<32>) + .collect::, _>>() + .unwrap(); + + assert_eq!( + txs, + vec![ + Transaction { + offset: 2, + txdata: [0u8; 32] + }, + Transaction { + offset: 3, + txdata: [0; 32] + } + ] + ) + } + proptest! { #[test] fn bitflip(pos in Header::LEN..512, mask in any::()) { diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index a7282f2a88b..b8faa6634bd 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -306,7 +306,6 @@ pub fn commits_from(repo: R, max_log_format_version: u8, offset: u64) - if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) { offsets = offsets.split_off(pos); } - let next_offset = offsets.first().cloned().unwrap_or(offset); let segments = Segments { offs: offsets.into_iter(), repo, @@ -315,7 +314,7 @@ pub fn commits_from(repo: R, max_log_format_version: u8, offset: u64) - Ok(Commits { inner: None, segments, - last_commit: CommitInfo::Initial { next_offset }, + last_commit: CommitInfo::Initial { next_offset: offset }, last_error: None, }) } @@ -359,10 +358,9 @@ where { commits .map(|x| x.map_err(D::Error::from)) - .map_ok(move |(version, commit)| commit.into_transactions(version, de)) + .map_ok(move |(version, commit)| commit.into_transactions(version, offset, de)) .flatten_ok() .map(|x| x.and_then(|y| y)) - .skip_while(move |x| x.as_ref().map(|tx| tx.offset < offset).unwrap_or(false)) } fn fold_transactions_internal(mut commits: CommitsWithVersion, de: D, from: u64) -> Result<(), D::Error> @@ -507,48 +505,75 @@ impl Commits { CommitsWithVersion { inner: self } } - /// Helper to handle a successfully extracted commit in [`Self::next`]. + /// Advance the current-segment iterator to yield the next commit. /// - /// Checks that the offset sequence is contiguous. - fn next_commit(&mut self, commit: StoredCommit) -> Option> { - // Pop the last error. Either we'll return it below, or it's no longer - // interesting. - let prev_error = self.last_error.take(); - - // Skip entries before the initial commit. - if self.last_commit.adjust_initial_offset(&commit) { - self.next() - // Same offset: ignore if duplicate (same crc), else report a "fork". - } else if self.last_commit.same_offset_as(&commit) { - if !self.last_commit.same_checksum_as(&commit) { - warn!( - "forked: commit={:?} last-error={:?} last-crc={:?}", - commit, - prev_error, - self.last_commit.checksum() - ); - Some(Err(error::Traversal::Forked { - offset: commit.min_tx_offset, - })) - } else { - self.next() - } - // Not the expected offset: report out-of-order. - } else if self.last_commit.expected_offset() != &commit.min_tx_offset { - warn!("out-of-order: commit={:?} last-error={:?}", commit, prev_error); - Some(Err(error::Traversal::OutOfOrder { - expected_offset: *self.last_commit.expected_offset(), - actual_offset: commit.min_tx_offset, - prev_error: prev_error.map(Box::new), - })) - // Seems legit, record info. - } else { - self.last_commit = CommitInfo::LastSeen { - tx_range: commit.tx_range(), - checksum: commit.checksum, - }; + /// Checks that the offset sequence is contiguous, and may skip commits + /// until the requested offset. + /// + /// Returns `None` if the segment iterator is exhausted or returns an error. + fn next_commit(&mut self) -> Option> { + loop { + match self.inner.as_mut()?.next()? { + Ok(commit) => { + // Pop the last error. Either we'll return it below, or it's no longer + // interesting. + let prev_error = self.last_error.take(); + + // Skip entries before the initial commit. + if self.last_commit.adjust_initial_offset(&commit) { + trace!("adjust initial offset"); + continue; + // Same offset: ignore if duplicate (same crc), else report a "fork". + } else if self.last_commit.same_offset_as(&commit) { + if !self.last_commit.same_checksum_as(&commit) { + warn!( + "forked: commit={:?} last-error={:?} last-crc={:?}", + commit, + prev_error, + self.last_commit.checksum() + ); + return Some(Err(error::Traversal::Forked { + offset: commit.min_tx_offset, + })); + } else { + trace!("ignore duplicate"); + continue; + } + // Not the expected offset: report out-of-order. + } else if self.last_commit.expected_offset() != &commit.min_tx_offset { + warn!("out-of-order: commit={:?} last-error={:?}", commit, prev_error); + return Some(Err(error::Traversal::OutOfOrder { + expected_offset: *self.last_commit.expected_offset(), + actual_offset: commit.min_tx_offset, + prev_error: prev_error.map(Box::new), + })); + // Seems legit, record info. + } else { + self.last_commit = CommitInfo::LastSeen { + tx_range: commit.tx_range(), + checksum: commit.checksum, + }; + + return Some(Ok(commit)); + } + } - Some(Ok(commit)) + Err(e) => { + warn!("error reading next commit: {e}"); + // Stop traversing this segment here. + // + // If this is just a partial write at the end of the segment, + // we may be able to obtain a commit with right offset from + // the next segment. + // + // If we don't, the error here is likely more helpful, but + // would be clobbered by `OutOfOrder`. Therefore we store it + // here. + self.set_last_error(e); + + return None; + } + } } } @@ -569,31 +594,33 @@ impl Commits { }; self.last_error = Some(last_error); } + + /// If we're still looking for the initial commit, try to use the offset + /// index to advance the segment reader. + fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader) { + if let CommitInfo::Initial { next_offset } = &self.last_commit { + let _ = self + .segments + .repo + .get_offset_index(segment.min_tx_offset) + .map_err(Into::into) + .and_then(|index_file| segment.seek_to_offset(&index_file, *next_offset)) + .inspect_err(|e| { + warn!( + "commitlog offset index is not used at segment {}: {}", + segment.min_tx_offset, e + ); + }); + } + } } impl Iterator for Commits { type Item = Result; fn next(&mut self) -> Option { - if let Some(commits) = self.inner.as_mut() { - if let Some(commit) = commits.next() { - match commit { - Ok(commit) => return self.next_commit(commit), - Err(e) => { - warn!("error reading next commit: {e}"); - // Fall through to peek at next segment. - // - // If this is just a partial write at the end of a - // segment, we may be able to obtain a commit with the - // right offset from the next segment. - // - // However, the error here may be more helpful and would - // be clobbered by `OutOfOrder`, and so we store it - // until we recurse below. - self.set_last_error(e); - } - } - } + if let Some(item) = self.next_commit() { + return Some(item); } match self.segments.next() { @@ -602,22 +629,7 @@ impl Iterator for Commits { Some(segment) => segment.map_or_else( |e| Some(Err(e.into())), |mut segment| { - // Try to use offset index to advance segment to Intial commit - if let CommitInfo::Initial { next_offset } = self.last_commit { - let _ = self - .segments - .repo - .get_offset_index(segment.min_tx_offset) - .map_err(Into::into) - .and_then(|index_file| segment.seek_to_offset(&index_file, next_offset)) - .inspect_err(|e| { - warn!( - "commitlog offset index is not used: {}, at: segment {}", - e, segment.min_tx_offset - ); - }); - } - + self.try_seek_to_initial_offset(&mut segment); self.inner = Some(segment.commits()); self.next() }, diff --git a/crates/commitlog/src/payload.rs b/crates/commitlog/src/payload.rs index 82610ad76da..5661eba672f 100644 --- a/crates/commitlog/src/payload.rs +++ b/crates/commitlog/src/payload.rs @@ -124,7 +124,6 @@ impl Decoder for ArrayDecoder { tx_offset: u64, reader: &mut R, ) -> Result<(), Self::Error> { - self.decode_record(version, tx_offset, reader)?; - Ok(()) + self.decode_record(version, tx_offset, reader).map(drop) } } diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 5c6202d98c8..8405cc23557 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -353,9 +353,12 @@ impl Reader { self.commits() .with_log_format_version() .map(|x| x.map_err(Into::into)) - .map_ok(move |(version, commit)| commit.into_transactions(version, de)) - .flatten_ok() + .map_ok(move |(version, commit)| { + let start = commit.min_tx_offset; + commit.into_transactions(version, start, de) + }) .flatten_ok() + .map(|x| x.and_then(|y| y)) } #[cfg(test)]