Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commitlog: Improve skipping behavior of traversals #1902

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 70 additions & 8 deletions crates/commitlog/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,49 @@ impl Commit {
///
/// The supplied [`Decoder`] is responsible for extracting individual
/// transactions from the `records` buffer.
///
/// `version` is the log format version of the current segment, and gets
/// passed to [`Decoder::decode_record`].
///
/// `from_offset` is the transaction offset within the current commit from
/// which to start decoding. That is:
///
/// * if the tx offset within the commit is smaller than `from_offset`,
/// [`Decoder::skip_record`] is called.
///
/// The iterator does not yield a value, unless `skip_record` returns an
/// error.
///
/// * if the tx offset within the commit is greater of equal to `from_offset`,
/// [`Decoder::decode_record`] is called.
///
/// The iterator yields the result of this call.
///
/// * if `from_offset` doesn't fall into the current commit, the iterator
/// yields nothing.
///
pub fn into_transactions<D: Decoder>(
self,
version: u8,
from_offset: u64,
de: &D,
) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
let records = Cursor::new(self.records);
(self.min_tx_offset..(self.min_tx_offset + self.n as u64)).scan(records, move |recs, offset| {
let mut cursor = &*recs;
let tx = de
.decode_record(version, offset, &mut cursor)
.map(|txdata| Transaction { offset, txdata });
Some(tx)
})
(self.min_tx_offset..(self.min_tx_offset + self.n as u64))
.scan(records, move |recs, offset| {
let mut cursor = &*recs;
let ret = if offset < from_offset {
de.skip_record(version, offset, &mut cursor).err().map(Err)
} else {
let tx = de
.decode_record(version, offset, &mut cursor)
.map(|txdata| Transaction { offset, txdata });
Some(tx)
};

Some(ret)
})
.flatten()
}
}

Expand Down Expand Up @@ -216,9 +246,10 @@ impl StoredCommit {
pub fn into_transactions<D: Decoder>(
self,
version: u8,
from_offset: u64,
de: &D,
) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
Commit::from(self).into_transactions(version, de)
Commit::from(self).into_transactions(version, from_offset, de)
}
}

Expand Down Expand Up @@ -272,6 +303,7 @@ mod tests {
use proptest::prelude::*;

use super::*;
use crate::{payload::ArrayDecoder, tests::helpers::enable_logging, DEFAULT_LOG_FORMAT_VERSION};

#[test]
fn commit_roundtrip() {
Expand All @@ -289,6 +321,36 @@ mod tests {
assert_eq!(commit, commit2);
}

#[test]
fn into_transactions_can_skip_txs() {
enable_logging();

let commit = Commit {
min_tx_offset: 0,
n: 4,
records: vec![0; 128],
};

let txs = commit
.into_transactions(DEFAULT_LOG_FORMAT_VERSION, 2, &ArrayDecoder::<32>)
.collect::<Result<Vec<_>, _>>()
.unwrap();

assert_eq!(
txs,
vec![
Transaction {
offset: 2,
txdata: [0u8; 32]
},
Transaction {
offset: 3,
txdata: [0; 32]
}
]
)
}

proptest! {
#[test]
fn bitflip(pos in Header::LEN..512, mask in any::<NonZeroU8>()) {
Expand Down
170 changes: 91 additions & 79 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -
if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
offsets = offsets.split_off(pos);
}
let next_offset = offsets.first().cloned().unwrap_or(offset);
let segments = Segments {
offs: offsets.into_iter(),
repo,
Expand All @@ -315,7 +314,7 @@ pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -
Ok(Commits {
inner: None,
segments,
last_commit: CommitInfo::Initial { next_offset },
last_commit: CommitInfo::Initial { next_offset: offset },
last_error: None,
})
}
Expand Down Expand Up @@ -359,10 +358,9 @@ where
{
commits
.map(|x| x.map_err(D::Error::from))
.map_ok(move |(version, commit)| commit.into_transactions(version, de))
.map_ok(move |(version, commit)| commit.into_transactions(version, offset, de))
.flatten_ok()
.map(|x| x.and_then(|y| y))
.skip_while(move |x| x.as_ref().map(|tx| tx.offset < offset).unwrap_or(false))
}

fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D, from: u64) -> Result<(), D::Error>
Expand Down Expand Up @@ -507,48 +505,75 @@ impl<R: Repo> Commits<R> {
CommitsWithVersion { inner: self }
}

/// Helper to handle a successfully extracted commit in [`Self::next`].
/// Advance the current-segment iterator to yield the next commit.
///
/// Checks that the offset sequence is contiguous.
fn next_commit(&mut self, commit: StoredCommit) -> Option<Result<StoredCommit, error::Traversal>> {
// Pop the last error. Either we'll return it below, or it's no longer
// interesting.
let prev_error = self.last_error.take();

// Skip entries before the initial commit.
if self.last_commit.adjust_initial_offset(&commit) {
self.next()
// Same offset: ignore if duplicate (same crc), else report a "fork".
} else if self.last_commit.same_offset_as(&commit) {
if !self.last_commit.same_checksum_as(&commit) {
warn!(
"forked: commit={:?} last-error={:?} last-crc={:?}",
commit,
prev_error,
self.last_commit.checksum()
);
Some(Err(error::Traversal::Forked {
offset: commit.min_tx_offset,
}))
} else {
self.next()
}
// Not the expected offset: report out-of-order.
} else if self.last_commit.expected_offset() != &commit.min_tx_offset {
warn!("out-of-order: commit={:?} last-error={:?}", commit, prev_error);
Some(Err(error::Traversal::OutOfOrder {
expected_offset: *self.last_commit.expected_offset(),
actual_offset: commit.min_tx_offset,
prev_error: prev_error.map(Box::new),
}))
// Seems legit, record info.
} else {
self.last_commit = CommitInfo::LastSeen {
tx_range: commit.tx_range(),
checksum: commit.checksum,
};
/// Checks that the offset sequence is contiguous, and may skip commits
/// until the requested offset.
///
/// Returns `None` if the segment iterator is exhausted or returns an error.
fn next_commit(&mut self) -> Option<Result<StoredCommit, error::Traversal>> {
loop {
match self.inner.as_mut()?.next()? {
Ok(commit) => {
// Pop the last error. Either we'll return it below, or it's no longer
// interesting.
let prev_error = self.last_error.take();

// Skip entries before the initial commit.
if self.last_commit.adjust_initial_offset(&commit) {
trace!("adjust initial offset");
continue;
// Same offset: ignore if duplicate (same crc), else report a "fork".
} else if self.last_commit.same_offset_as(&commit) {
if !self.last_commit.same_checksum_as(&commit) {
warn!(
"forked: commit={:?} last-error={:?} last-crc={:?}",
commit,
prev_error,
self.last_commit.checksum()
);
return Some(Err(error::Traversal::Forked {
offset: commit.min_tx_offset,
}));
} else {
trace!("ignore duplicate");
continue;
}
// Not the expected offset: report out-of-order.
} else if self.last_commit.expected_offset() != &commit.min_tx_offset {
warn!("out-of-order: commit={:?} last-error={:?}", commit, prev_error);
return Some(Err(error::Traversal::OutOfOrder {
expected_offset: *self.last_commit.expected_offset(),
actual_offset: commit.min_tx_offset,
prev_error: prev_error.map(Box::new),
}));
// Seems legit, record info.
} else {
self.last_commit = CommitInfo::LastSeen {
tx_range: commit.tx_range(),
checksum: commit.checksum,
};

return Some(Ok(commit));
}
}

Some(Ok(commit))
Err(e) => {
warn!("error reading next commit: {e}");
// Stop traversing this segment here.
//
// If this is just a partial write at the end of the segment,
// we may be able to obtain a commit with right offset from
// the next segment.
//
// If we don't, the error here is likely more helpful, but
// would be clobbered by `OutOfOrder`. Therefore we store it
// here.
self.set_last_error(e);

return None;
}
}
}
}

Expand All @@ -569,31 +594,33 @@ impl<R: Repo> Commits<R> {
};
self.last_error = Some(last_error);
}

/// If we're still looking for the initial commit, try to use the offset
/// index to advance the segment reader.
fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader<R::Segment>) {
if let CommitInfo::Initial { next_offset } = &self.last_commit {
let _ = self
.segments
.repo
.get_offset_index(segment.min_tx_offset)
.map_err(Into::into)
.and_then(|index_file| segment.seek_to_offset(&index_file, *next_offset))
.inspect_err(|e| {
warn!(
"commitlog offset index is not used at segment {}: {}",
segment.min_tx_offset, e
);
});
}
}
}

impl<R: Repo> Iterator for Commits<R> {
type Item = Result<StoredCommit, error::Traversal>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(commits) = self.inner.as_mut() {
if let Some(commit) = commits.next() {
match commit {
Ok(commit) => return self.next_commit(commit),
Err(e) => {
warn!("error reading next commit: {e}");
// Fall through to peek at next segment.
//
// If this is just a partial write at the end of a
// segment, we may be able to obtain a commit with the
// right offset from the next segment.
//
// However, the error here may be more helpful and would
// be clobbered by `OutOfOrder`, and so we store it
// until we recurse below.
self.set_last_error(e);
}
}
}
if let Some(item) = self.next_commit() {
return Some(item);
}

match self.segments.next() {
Expand All @@ -602,22 +629,7 @@ impl<R: Repo> Iterator for Commits<R> {
Some(segment) => segment.map_or_else(
|e| Some(Err(e.into())),
|mut segment| {
// Try to use offset index to advance segment to Intial commit
if let CommitInfo::Initial { next_offset } = self.last_commit {
let _ = self
.segments
.repo
.get_offset_index(segment.min_tx_offset)
.map_err(Into::into)
.and_then(|index_file| segment.seek_to_offset(&index_file, next_offset))
.inspect_err(|e| {
warn!(
"commitlog offset index is not used: {}, at: segment {}",
e, segment.min_tx_offset
);
});
}

self.try_seek_to_initial_offset(&mut segment);
self.inner = Some(segment.commits());
self.next()
},
Expand Down
3 changes: 1 addition & 2 deletions crates/commitlog/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ impl<const N: usize> Decoder for ArrayDecoder<N> {
tx_offset: u64,
reader: &mut R,
) -> Result<(), Self::Error> {
self.decode_record(version, tx_offset, reader)?;
Ok(())
self.decode_record(version, tx_offset, reader).map(drop)
}
}
7 changes: 5 additions & 2 deletions crates/commitlog/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,12 @@ impl<R: io::Read + io::Seek> Reader<R> {
self.commits()
.with_log_format_version()
.map(|x| x.map_err(Into::into))
.map_ok(move |(version, commit)| commit.into_transactions(version, de))
.flatten_ok()
.map_ok(move |(version, commit)| {
let start = commit.min_tx_offset;
commit.into_transactions(version, start, de)
})
.flatten_ok()
.map(|x| x.and_then(|y| y))
}

#[cfg(test)]
Expand Down
Loading