diff --git a/node/src/block_ranges.rs b/node/src/block_ranges.rs new file mode 100644 index 00000000..0d37afaa --- /dev/null +++ b/node/src/block_ranges.rs @@ -0,0 +1,982 @@ +//! Ranges utilities + +use std::borrow::Borrow; +use std::fmt::{Debug, Display}; +use std::ops::{Add, RangeInclusive, Sub}; + +use serde::Serialize; +use smallvec::SmallVec; + +/// Type alias to `RangeInclusive`. +pub type BlockRange = RangeInclusive; + +/// Errors that can be produced by `BlockRanges`. +#[derive(Debug, thiserror::Error, PartialEq, Eq)] +pub enum BlockRangesError { + /// Block ranges must be sorted. + #[error("Block ranges are not sorted")] + UnsortedBlockRanges, + + /// Not a valid block range. + #[error("Invalid block range: {}", .0.display())] + InvalidBlockRange(BlockRange), + + /// Provided range overlaps with another one. + #[error("Insertion constrain not satisfied: Provided range ({}) overlaps with {}", .0.display(), .1.display())] + BlockRangeOverlap(BlockRange, BlockRange), + + /// Provided range do not have any adjacent neightbors. 
+ #[error( + "Insertion constrain not satified: Provided range ({}) do not have any adjacent neighbors", .0.display() + )] + NoAdjacentNeighbors(BlockRange), +} + +type Result = std::result::Result; + +pub(crate) trait BlockRangeExt { + fn display(&self) -> BlockRangeDisplay; + fn validate(&self) -> Result<()>; + fn len(&self) -> u64; + fn is_adjacent(&self, other: &BlockRange) -> bool; + fn is_overlapping(&self, other: &BlockRange) -> bool; + fn left_of(&self, other: &BlockRange) -> bool; +} + +pub(crate) struct BlockRangeDisplay<'a>(&'a RangeInclusive); + +impl<'a> Display for BlockRangeDisplay<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}-{}", self.0.start(), self.0.end()) + } +} + +impl BlockRangeExt for BlockRange { + fn display(&self) -> BlockRangeDisplay { + BlockRangeDisplay(self) + } + + fn validate(&self) -> Result<()> { + if *self.start() > 0 && self.start() <= self.end() { + Ok(()) + } else { + Err(BlockRangesError::InvalidBlockRange(self.to_owned())) + } + } + + fn len(&self) -> u64 { + match self.end().checked_sub(*self.start()) { + Some(difference) => difference + 1, + None => 0, + } + } + + fn is_adjacent(&self, other: &BlockRange) -> bool { + debug_assert!(self.validate().is_ok()); + debug_assert!(other.validate().is_ok()); + + // End of `self` touches start of `other` + // + // self: |------| + // other: |------| + if *self.end() == other.start().saturating_sub(1) { + return true; + } + + // Start of `self` touches end of `other` + // + // self: |------| + // other: |------| + if self.start().saturating_sub(1) == *other.end() { + return true; + } + + false + } + + fn is_overlapping(&self, other: &BlockRange) -> bool { + debug_assert!(self.validate().is_ok()); + debug_assert!(other.validate().is_ok()); + + // `self` overlaps with start of `other` + // + // self: |------| + // other: |------| + if self.start() < other.start() && other.contains(self.end()) { + return true; + } + + // `self` overlaps with 
end of `other` + // + // self: |------| + // other: |------| + if self.end() > other.end() && other.contains(self.start()) { + return true; + } + + // `self` is subset of `other` + // + // self: |--| + // other: |------| + if self.start() >= other.start() && self.end() <= other.end() { + return true; + } + + // `self` is superset of `other` + // + // self: |------| + // other: |--| + if self.start() <= other.start() && self.end() >= other.end() { + return true; + } + + false + } + + /// Returns `true` if the whole range of `self` is on the left of `other`. + fn left_of(&self, other: &BlockRange) -> bool { + debug_assert!(self.validate().is_ok()); + debug_assert!(other.validate().is_ok()); + self.end() < other.start() + } +} + +/// Represents possibly multiple non-overlapping, sorted ranges of header heights +#[derive(Debug, Clone, PartialEq, Default, Serialize)] +#[serde(transparent)] +pub struct BlockRanges(SmallVec<[BlockRange; 2]>); + +/// Custom `Deserialize` that validates `BlockRanges`. +impl<'de> serde::Deserialize<'de> for BlockRanges { + fn deserialize(deserializer: D) -> Result + where + D: serde::de::Deserializer<'de>, + { + let raw_ranges = SmallVec::<[RangeInclusive; 2]>::deserialize(deserializer)?; + let ranges = BlockRanges::from_vec(raw_ranges) + .map_err(|e| serde::de::Error::custom(e.to_string()))?; + Ok(ranges) + } +} + +impl BlockRanges { + /// Create a new, empty `BlockRanges`. + pub fn new() -> BlockRanges { + BlockRanges(SmallVec::new()) + } + + /// Create a `BlockRanges` from a [`SmallVec`]. + pub fn from_vec(ranges: SmallVec<[BlockRange; 2]>) -> Result { + let mut prev: Option<&RangeInclusive> = None; + + for range in &ranges { + range.validate()?; + + if let Some(prev) = prev { + if range.start() <= prev.end() { + return Err(BlockRangesError::UnsortedBlockRanges); + } + } + + prev = Some(range); + } + + Ok(BlockRanges(ranges)) + } + + /// Returns internal representation. 
+ pub fn into_inner(self) -> SmallVec<[BlockRange; 2]> { + self.0 + } + + /// Return whether `BlockRanges` contains provided height. + pub fn contains(&self, height: u64) -> bool { + self.0.iter().any(|r| r.contains(&height)) + } + + /// Return whether range is empty. + pub fn is_empty(&self) -> bool { + self.0.iter().all(|r| r.is_empty()) + } + + /// Return highest height in the range. + pub fn head(&self) -> Option { + self.0.last().map(|r| *r.end()) + } + + /// Return lowest height in the range. + pub fn tail(&self) -> Option { + self.0.first().map(|r| *r.start()) + } + + /// Returns first and last index of ranges overlapping or touching provided `range`. + fn find_affected_ranges(&self, range: impl Borrow) -> Option<(usize, usize)> { + let range = range.borrow(); + debug_assert!(range.validate().is_ok()); + + let mut start_idx = None; + let mut end_idx = None; + + for (i, r) in self.0.iter().enumerate() { + if r.is_overlapping(range) || r.is_adjacent(range) { + if start_idx.is_none() { + start_idx = Some(i); + } + + end_idx = Some(i); + } else if end_idx.is_some() { + // Ranges are sorted, we can skip checking the rest. + break; + } + } + + Some((start_idx?, end_idx?)) + } + + /// Checks if `to_insert` is valid range to be inserted into the header store. + /// + /// This *must* be used when implementing [`Store::insert`]. + /// + /// The constraints are as follows: + /// + /// * Insertion range must be valid. + /// * Insertion range must not overlap with any of the existing ranges. + /// * Insertion range must be appended to the left or to the right of an existing range. + /// * Insertion is always allowed on empty `BlockRanges`. + /// * New HEAD range can be created at the front if it doesn't overlap with existing one. + /// + /// Returns: + /// + /// * `Err(_)` if constraints are not met. + /// * `Ok(true, false)` if `to_insert` range is going to be merged with its left neighbor. 
+ /// * `Ok(false, true)` if `to_insert` range is going to be merged with the right neighbor. + /// * `Ok(true, true)` if `to_insert` range is going to be merged with both neighbors. + /// * `Ok(false, false)` if `to_insert` range is going to be inserted as a new HEAD range (without merging). + /// + /// [`Store::insert`]: crate::store::Store::insert + pub fn check_insertion_constraints( + &self, + to_insert: impl Borrow, + ) -> Result<(bool, bool)> { + let to_insert = to_insert.borrow(); + to_insert.validate()?; + + let Some(head_range) = self.0.last() else { + // Allow insersion on empty store. + return Ok((false, false)); + }; + + if head_range.left_of(to_insert) { + // Allow adding a new HEAD + let prev_exists = head_range.is_adjacent(to_insert); + return Ok((prev_exists, false)); + } + + let Some((first_idx, last_idx)) = self.find_affected_ranges(to_insert) else { + return Err(BlockRangesError::NoAdjacentNeighbors(to_insert.to_owned())); + }; + + let first = &self.0[first_idx]; + let last = &self.0[last_idx]; + let num_of_ranges = last_idx - first_idx + 1; + + match num_of_ranges { + 0 => unreachable!(), + 1 => { + if first.is_overlapping(to_insert) { + Err(BlockRangesError::BlockRangeOverlap( + to_insert.to_owned(), + calc_overlap(to_insert, first, last), + )) + } else if first.left_of(to_insert) { + Ok((true, false)) + } else { + Ok((false, true)) + } + } + 2 => { + if first.is_adjacent(to_insert) && last.is_adjacent(to_insert) { + Ok((true, true)) + } else { + Err(BlockRangesError::BlockRangeOverlap( + to_insert.to_owned(), + calc_overlap(to_insert, first, last), + )) + } + } + _ => Err(BlockRangesError::BlockRangeOverlap( + to_insert.to_owned(), + calc_overlap(to_insert, first, last), + )), + } + } + + /// Returns the head height and removes it from the ranges. 
+ pub fn pop_head(&mut self) -> Option { + let last = self.0.last_mut()?; + let head = *last.end(); + + if last.len() == 1 { + self.0.remove(self.0.len() - 1); + } else { + *last = *last.start()..=*last.end() - 1; + } + + Some(head) + } + + /// Insert a new range. + /// + /// This fails only if `range` is not valid. It allows inserting an overlapping range. + pub fn insert_relaxed(&mut self, range: impl Borrow) -> Result<()> { + let range = range.borrow(); + range.validate()?; + + match self.find_affected_ranges(range) { + // `range` must be merged with other ranges + Some((start_idx, end_idx)) => { + let start = *self.0[start_idx].start().min(range.start()); + let end = *self.0[end_idx].end().max(range.end()); + + self.0.drain(start_idx..=end_idx); + self.0.insert(start_idx, start..=end); + } + // `range` can not be merged with other ranges + None => { + for (i, r) in self.0.iter().enumerate() { + if range.end() < r.start() { + self.0.insert(i, range.to_owned()); + return Ok(()); + } + } + + self.0.push(range.to_owned()); + } + } + + Ok(()) + } + + /// Remove a range. + /// + /// This fails only if `range` is not valid. It allows removing non-existing range. 
+ pub fn remove_relaxed(&mut self, range: impl Borrow) -> Result<()> { + let range = range.borrow(); + range.validate()?; + + let Some((start_idx, end_idx)) = self.find_affected_ranges(range) else { + // Nothing to remove + return Ok(()); + }; + + // Remove old ranges + let old_ranges = self + .0 + .drain(start_idx..=end_idx) + .collect::>(); + + // ranges: |-----| |----| |----| + // remove: |--------------| + // after remove: |---| |--| + let first_range = old_ranges.first().expect("non empty"); + let last_range = old_ranges.last().expect("non empty"); + + if range.end() < last_range.end() { + // Add the right range + self.0 + .insert(start_idx, *range.end() + 1..=*last_range.end()); + } + + if first_range.start() < range.start() { + // Add the left range + self.0 + .insert(start_idx, *first_range.start()..=*range.start() - 1); + } + + Ok(()) + } +} + +impl AsRef<[RangeInclusive]> for BlockRanges { + fn as_ref(&self) -> &[RangeInclusive] { + &self.0 + } +} + +impl Add for BlockRanges { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + self.add(&rhs) + } +} + +impl Add<&BlockRanges> for BlockRanges { + type Output = Self; + + fn add(mut self, rhs: &BlockRanges) -> Self::Output { + for range in rhs.0.iter() { + self.insert_relaxed(range) + .expect("BlockRanges always holds valid ranges"); + } + self + } +} + +impl Sub for BlockRanges { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + self.sub(&rhs) + } +} + +impl Sub<&BlockRanges> for BlockRanges { + type Output = Self; + + fn sub(mut self, rhs: &BlockRanges) -> Self::Output { + for range in rhs.0.iter() { + self.remove_relaxed(range) + .expect("BlockRanges always holds valid ranges"); + } + self + } +} + +impl Display for BlockRanges { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[")?; + for (idx, range) in self.0.iter().enumerate() { + if idx == 0 { + write!(f, "{}", range.display())?; + } else { + write!(f, ", {}", range.display())?; 
+ } + } + write!(f, "]") + } +} + +fn calc_overlap( + to_insert: &BlockRange, + first_range: &BlockRange, + last_range: &BlockRange, +) -> BlockRange { + let start = first_range.start().max(to_insert.start()); + let end = last_range.end().min(to_insert.end()); + *start..=*end +} + +#[cfg(test)] +mod tests { + use super::*; + + fn new_block_ranges(ranges: [BlockRange; N]) -> BlockRanges { + BlockRanges::from_vec(ranges.into_iter().collect()).expect("invalid BlockRanges") + } + + #[test] + fn range_len() { + assert_eq!((0u64..=0).len(), 1); + assert_eq!((0u64..=5).len(), 6); + assert_eq!((1u64..=2).len(), 2); + assert_eq!(RangeInclusive::new(2u64, 1).len(), 0); + assert_eq!((10001u64..=20000).len(), 10000); + } + + #[test] + fn block_ranges_empty() { + assert!(new_block_ranges([]).is_empty()); + assert!(!new_block_ranges([1..=3]).is_empty()); + } + + #[test] + fn block_ranges_head() { + assert_eq!(new_block_ranges([]).head(), None); + assert_eq!(new_block_ranges([1..=3]).head(), Some(3)); + assert_eq!(new_block_ranges([1..=3, 6..=9]).head(), Some(9)); + assert_eq!(new_block_ranges([1..=3, 5..=5, 8..=9]).head(), Some(9)); + } + + #[test] + fn block_ranges_tail() { + assert_eq!(new_block_ranges([]).tail(), None); + assert_eq!(new_block_ranges([1..=3]).tail(), Some(1)); + assert_eq!(new_block_ranges([1..=3, 6..=9]).tail(), Some(1)); + assert_eq!(new_block_ranges([1..=3, 5..=5, 8..=9]).tail(), Some(1)); + } + + #[test] + fn check_range_insert_append() { + let (prev_exists, next_exists) = new_block_ranges([]) + .check_insertion_constraints(1..=5) + .unwrap(); + assert!(!prev_exists); + assert!(!next_exists); + + let (prev_exists, next_exists) = new_block_ranges([1..=4]) + .check_insertion_constraints(5..=5) + .unwrap(); + assert!(prev_exists); + assert!(!next_exists); + + let (prev_exists, next_exists) = new_block_ranges([1..=5]) + .check_insertion_constraints(6..=9) + .unwrap(); + assert!(prev_exists); + assert!(!next_exists); + + let (prev_exists, next_exists) = 
new_block_ranges([6..=8]) + .check_insertion_constraints(2..=5) + .unwrap(); + assert!(!prev_exists); + assert!(next_exists); + + // Allow inserting a new HEAD range + let (prev_exists, next_exists) = new_block_ranges([1..=5]) + .check_insertion_constraints(7..=9) + .unwrap(); + assert!(!prev_exists); + assert!(!next_exists); + } + + #[test] + fn check_range_insert_with_consolidation() { + let (prev_exists, next_exists) = new_block_ranges([1..=3, 6..=9]) + .check_insertion_constraints(4..=5) + .unwrap(); + assert!(prev_exists); + assert!(next_exists); + + let (prev_exists, next_exists) = new_block_ranges([1..=2, 5..=5, 8..=9]) + .check_insertion_constraints(3..=4) + .unwrap(); + assert!(prev_exists); + assert!(next_exists); + + let (prev_exists, next_exists) = new_block_ranges([1..=2, 4..=4, 8..=9]) + .check_insertion_constraints(5..=7) + .unwrap(); + assert!(prev_exists); + assert!(next_exists); + } + + #[test] + fn check_range_insert_overlapping() { + let result = new_block_ranges([1..=2]) + .check_insertion_constraints(1..=1) + .unwrap_err(); + assert_eq!(result, BlockRangesError::BlockRangeOverlap(1..=1, 1..=1)); + + let result = new_block_ranges([1..=2]) + .check_insertion_constraints(2..=2) + .unwrap_err(); + assert_eq!(result, BlockRangesError::BlockRangeOverlap(2..=2, 2..=2)); + + let result = new_block_ranges([1..=4]) + .check_insertion_constraints(2..=8) + .unwrap_err(); + assert_eq!(result, BlockRangesError::BlockRangeOverlap(2..=8, 2..=4)); + + let result = new_block_ranges([1..=4]) + .check_insertion_constraints(2..=3) + .unwrap_err(); + assert_eq!(result, BlockRangesError::BlockRangeOverlap(2..=3, 2..=3)); + + let result = new_block_ranges([5..=9]) + .check_insertion_constraints(1..=5) + .unwrap_err(); + assert_eq!(result, BlockRangesError::BlockRangeOverlap(1..=5, 5..=5)); + + let result = new_block_ranges([5..=8]) + .check_insertion_constraints(2..=8) + .unwrap_err(); + assert_eq!(result, BlockRangesError::BlockRangeOverlap(2..=8, 5..=8)); + + let 
result = new_block_ranges([1..=3, 6..=9]) + .check_insertion_constraints(3..=6) + .unwrap_err(); + assert_eq!(result, BlockRangesError::BlockRangeOverlap(3..=6, 3..=6)); + + let result = new_block_ranges([1..=3, 5..=6]) + .check_insertion_constraints(3..=9) + .unwrap_err(); + assert_eq!(result, BlockRangesError::BlockRangeOverlap(3..=9, 3..=6)); + + let result = new_block_ranges([2..=3, 5..=6]) + .check_insertion_constraints(1..=5) + .unwrap_err(); + assert_eq!(result, BlockRangesError::BlockRangeOverlap(1..=5, 2..=5)); + } + + #[test] + fn check_range_insert_invalid_placement() { + let result = new_block_ranges([1..=2, 7..=9]) + .check_insertion_constraints(4..=4) + .unwrap_err(); + assert_eq!(result, BlockRangesError::NoAdjacentNeighbors(4..=4)); + + let result = new_block_ranges([1..=2, 8..=9]) + .check_insertion_constraints(4..=6) + .unwrap_err(); + assert_eq!(result, BlockRangesError::NoAdjacentNeighbors(4..=6)); + + let result = new_block_ranges([4..=5, 7..=8]) + .check_insertion_constraints(1..=2) + .unwrap_err(); + assert_eq!(result, BlockRangesError::NoAdjacentNeighbors(1..=2)); + } + + #[test] + fn test_header_range_creation_ok() { + new_block_ranges([1..=3, 5..=8]); + new_block_ranges([]); + new_block_ranges([1..=1, 1000000..=2000000]); + } + + #[test] + #[should_panic] + fn test_header_range_creation_overlap() { + new_block_ranges([1..=3, 2..=5]); + } + + #[test] + #[should_panic] + fn test_header_range_creation_inverse() { + new_block_ranges([1..=3, RangeInclusive::new(9, 5)]); + } + + #[test] + #[should_panic] + fn test_header_range_creation_wrong_order() { + new_block_ranges([10..=15, 1..=5]); + } + + #[test] + fn pop_head() { + let mut ranges = new_block_ranges([]); + assert_eq!(ranges.pop_head(), None); + + let mut ranges = new_block_ranges([1..=4, 6..=8, 10..=10]); + assert_eq!(ranges.pop_head(), Some(10)); + assert_eq!(ranges.pop_head(), Some(8)); + assert_eq!(ranges.pop_head(), Some(7)); + assert_eq!(ranges.pop_head(), Some(6)); + 
assert_eq!(ranges.pop_head(), Some(4)); + assert_eq!(ranges.pop_head(), Some(3)); + assert_eq!(ranges.pop_head(), Some(2)); + assert_eq!(ranges.pop_head(), Some(1)); + assert_eq!(ranges.pop_head(), None); + } + + #[test] + fn validate_check() { + (1..=1).validate().unwrap(); + (1..=2).validate().unwrap(); + (0..=0).validate().unwrap_err(); + (0..=1).validate().unwrap_err(); + #[allow(clippy::reversed_empty_ranges)] + (2..=1).validate().unwrap_err(); + } + + #[test] + fn adjacent_check() { + assert!((3..=5).is_adjacent(&(1..=2))); + assert!((3..=5).is_adjacent(&(6..=8))); + + assert!(!(3..=5).is_adjacent(&(1..=1))); + assert!(!(3..=5).is_adjacent(&(7..=8))); + } + + #[test] + fn overlapping_check() { + // equal + assert!((3..=5).is_overlapping(&(3..=5))); + + // partial set + assert!((3..=5).is_overlapping(&(1..=4))); + assert!((3..=5).is_overlapping(&(1..=3))); + assert!((3..=5).is_overlapping(&(4..=8))); + assert!((3..=5).is_overlapping(&(5..=8))); + + // subset + assert!((3..=5).is_overlapping(&(4..=4))); + assert!((3..=5).is_overlapping(&(3..=4))); + assert!((3..=5).is_overlapping(&(4..=5))); + + // superset + assert!((3..=5).is_overlapping(&(1..=5))); + assert!((3..=5).is_overlapping(&(1..=6))); + assert!((3..=5).is_overlapping(&(3..=6))); + assert!((3..=5).is_overlapping(&(4..=6))); + + // not overlapping + assert!(!(3..=5).is_overlapping(&(1..=1))); + assert!(!(3..=5).is_overlapping(&(7..=8))); + } + + #[test] + fn left_of_check() { + // range is on the left of + assert!((1..=2).left_of(&(3..=4))); + assert!((1..=1).left_of(&(3..=4))); + + // range is on the right of + assert!(!(3..=4).left_of(&(1..=2))); + assert!(!(3..=4).left_of(&(1..=1))); + + // overlapping is not accepted + assert!(!(1..=3).left_of(&(3..=4))); + assert!(!(1..=5).left_of(&(3..=4))); + assert!(!(3..=4).left_of(&(1..=3))); + } + + #[test] + fn find_affected_ranges() { + let ranges = new_block_ranges([30..=50, 80..=100, 130..=150]); + + assert_eq!(ranges.find_affected_ranges(28..=28), 
None); + assert_eq!(ranges.find_affected_ranges(1..=15), None); + assert_eq!(ranges.find_affected_ranges(1..=28), None); + assert_eq!(ranges.find_affected_ranges(3..=28), None); + assert_eq!(ranges.find_affected_ranges(1..=29), Some((0, 0))); + assert_eq!(ranges.find_affected_ranges(1..=30), Some((0, 0))); + assert_eq!(ranges.find_affected_ranges(1..=49), Some((0, 0))); + assert_eq!(ranges.find_affected_ranges(1..=50), Some((0, 0))); + assert_eq!(ranges.find_affected_ranges(1..=51), Some((0, 0))); + + assert_eq!(ranges.find_affected_ranges(40..=51), Some((0, 0))); + assert_eq!(ranges.find_affected_ranges(50..=51), Some((0, 0))); + assert_eq!(ranges.find_affected_ranges(51..=51), Some((0, 0))); + + assert_eq!(ranges.find_affected_ranges(40..=79), Some((0, 1))); + assert_eq!(ranges.find_affected_ranges(50..=79), Some((0, 1))); + assert_eq!(ranges.find_affected_ranges(51..=79), Some((0, 1))); + assert_eq!(ranges.find_affected_ranges(50..=80), Some((0, 1))); + + assert_eq!(ranges.find_affected_ranges(52..=52), None); + assert_eq!(ranges.find_affected_ranges(52..=78), None); + assert_eq!(ranges.find_affected_ranges(52..=79), Some((1, 1))); + assert_eq!(ranges.find_affected_ranges(52..=80), Some((1, 1))); + assert_eq!(ranges.find_affected_ranges(52..=129), Some((1, 2))); + assert_eq!(ranges.find_affected_ranges(99..=129), Some((1, 2))); + assert_eq!(ranges.find_affected_ranges(100..=129), Some((1, 2))); + assert_eq!(ranges.find_affected_ranges(101..=129), Some((1, 2))); + assert_eq!(ranges.find_affected_ranges(101..=128), Some((1, 1))); + assert_eq!(ranges.find_affected_ranges(51..=129), Some((0, 2))); + + assert_eq!(ranges.find_affected_ranges(102..=128), None); + assert_eq!(ranges.find_affected_ranges(102..=120), None); + assert_eq!(ranges.find_affected_ranges(120..=128), None); + + assert_eq!(ranges.find_affected_ranges(40..=129), Some((0, 2))); + assert_eq!(ranges.find_affected_ranges(40..=140), Some((0, 2))); + assert_eq!(ranges.find_affected_ranges(20..=140), 
Some((0, 2))); + assert_eq!(ranges.find_affected_ranges(20..=150), Some((0, 2))); + assert_eq!(ranges.find_affected_ranges(20..=151), Some((0, 2))); + assert_eq!(ranges.find_affected_ranges(20..=160), Some((0, 2))); + + assert_eq!(ranges.find_affected_ranges(120..=129), Some((2, 2))); + assert_eq!(ranges.find_affected_ranges(120..=128), None); + assert_eq!(ranges.find_affected_ranges(120..=130), Some((2, 2))); + assert_eq!(ranges.find_affected_ranges(120..=131), Some((2, 2))); + assert_eq!(ranges.find_affected_ranges(140..=145), Some((2, 2))); + assert_eq!(ranges.find_affected_ranges(140..=150), Some((2, 2))); + assert_eq!(ranges.find_affected_ranges(140..=155), Some((2, 2))); + assert_eq!(ranges.find_affected_ranges(152..=155), None); + assert_eq!(ranges.find_affected_ranges(152..=178), None); + assert_eq!(ranges.find_affected_ranges(152..=152), None); + + assert_eq!(new_block_ranges([]).find_affected_ranges(1..=1), None); + + assert_eq!(new_block_ranges([1..=2]).find_affected_ranges(6..=9), None); + assert_eq!(new_block_ranges([4..=8]).find_affected_ranges(1..=2), None); + assert_eq!( + new_block_ranges([4..=8, 20..=30]).find_affected_ranges(1..=2), + None + ); + assert_eq!( + new_block_ranges([4..=8, 20..=30]).find_affected_ranges(10..=12), + None + ); + assert_eq!( + new_block_ranges([4..=8, 20..=30]).find_affected_ranges(32..=32), + None + ); + } + + #[test] + fn insert_relaxed_disjoined() { + let mut r = BlockRanges::default(); + r.insert_relaxed(10..=10).unwrap(); + assert_eq!(&r.0[..], &[10..=10][..]); + + let ranges = new_block_ranges([30..=50, 80..=100, 130..=150]); + + let mut r = ranges.clone(); + r.insert_relaxed(1..=1).unwrap(); + assert_eq!(&r.0[..], &[1..=1, 30..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(1..=28).unwrap(); + assert_eq!(&r.0[..], &[1..=28, 30..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(10..=28).unwrap(); + assert_eq!(&r.0[..], &[10..=28, 30..=50, 80..=100, 
130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(52..=78).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 52..=78, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(102..=128).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 80..=100, 102..=128, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(152..=152).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 80..=100, 130..=150, 152..=152][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(152..=170).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 80..=100, 130..=150, 152..=170][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(160..=170).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 80..=100, 130..=150, 160..=170][..]); + } + + #[test] + fn insert_relaxed_intersected() { + let ranges = new_block_ranges([30..=50, 80..=100, 130..=150]); + + let mut r = ranges.clone(); + r.insert_relaxed(29..=29).unwrap(); + assert_eq!(&r.0[..], &[29..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(1..=29).unwrap(); + assert_eq!(&r.0[..], &[1..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(29..=35).unwrap(); + assert_eq!(&r.0[..], &[29..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(29..=55).unwrap(); + assert_eq!(&r.0[..], &[29..=55, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(29..=78).unwrap(); + assert_eq!(&r.0[..], &[29..=78, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(29..=79).unwrap(); + assert_eq!(&r.0[..], &[29..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(30..=79).unwrap(); + assert_eq!(&r.0[..], &[30..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(30..=150).unwrap(); + assert_eq!(&r.0[..], &[30..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(10..=170).unwrap(); + assert_eq!(&r.0[..], &[10..=170][..]); + + let mut r = 
ranges.clone(); + r.insert_relaxed(85..=129).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 80..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(85..=129).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 80..=150][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(135..=170).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 80..=100, 130..=170][..]); + + let mut r = ranges.clone(); + r.insert_relaxed(151..=170).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 80..=100, 130..=170][..]); + + let mut r = new_block_ranges([1..=2, 4..=6, 8..=10, 15..=20, 80..=100]); + r.insert_relaxed(3..=79).unwrap(); + assert_eq!(&r.0[..], &[1..=100][..]); + } + + #[test] + fn remove_relaxed() { + let ranges = new_block_ranges([30..=50, 80..=100, 130..=150]); + + let mut r = ranges.clone(); + r.remove_relaxed(29..=29).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(30..=30).unwrap(); + assert_eq!(&r.0[..], &[31..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(20..=40).unwrap(); + assert_eq!(&r.0[..], &[41..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(35..=40).unwrap(); + assert_eq!(&r.0[..], &[30..=34, 41..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(51..=129).unwrap(); + assert_eq!(&r.0[..], &[30..=50, 130..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(50..=130).unwrap(); + assert_eq!(&r.0[..], &[30..=49, 131..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(35..=49).unwrap(); + assert_eq!(&r.0[..], &[30..=34, 50..=50, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(35..=50).unwrap(); + assert_eq!(&r.0[..], &[30..=34, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(35..=55).unwrap(); + assert_eq!(&r.0[..], &[30..=34, 80..=100, 130..=150][..]); + + let mut r = ranges.clone(); + 
r.remove_relaxed(35..=135).unwrap(); + assert_eq!(&r.0[..], &[30..=34, 136..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(35..=170).unwrap(); + assert_eq!(&r.0[..], &[30..=34][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(10..=135).unwrap(); + assert_eq!(&r.0[..], &[136..=150][..]); + + let mut r = ranges.clone(); + r.remove_relaxed(10..=170).unwrap(); + assert!(r.0.is_empty()); + + let mut r = new_block_ranges([1..=10, 12..=12, 14..=14]); + r.remove_relaxed(12..=12).unwrap(); + assert_eq!(&r.0[..], &[1..=10, 14..=14][..]); + + let mut r = new_block_ranges([1..=u64::MAX]); + r.remove_relaxed(12..=12).unwrap(); + assert_eq!(&r.0[..], &[1..=11, 13..=u64::MAX][..]); + + let mut r = new_block_ranges([1..=u64::MAX]); + r.remove_relaxed(1..=1).unwrap(); + assert_eq!(&r.0[..], &[2..=u64::MAX][..]); + + let mut r = new_block_ranges([1..=u64::MAX]); + r.remove_relaxed(u64::MAX..=u64::MAX).unwrap(); + assert_eq!(&r.0[..], &[1..=u64::MAX - 1][..]); + } +} diff --git a/node/src/daser.rs b/node/src/daser.rs index 29ad9711..6dbf4172 100644 --- a/node/src/daser.rs +++ b/node/src/daser.rs @@ -29,11 +29,10 @@ //! 5. Steps 3 and 4 are repeated concurently, unless we detect that all peers have disconnected. //! At that point Daser cleans the queue and moves back to step 1. 
-use std::collections::{HashSet, VecDeque}; +use std::collections::HashSet; use std::sync::Arc; use celestia_tendermint::Time; -use celestia_types::ExtendedHeader; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; @@ -47,7 +46,7 @@ use crate::events::{EventPublisher, NodeEvent}; use crate::executor::spawn; use crate::p2p::shwap::sample_cid; use crate::p2p::{P2p, P2pError}; -use crate::store::{HeaderRanges, SamplingStatus, Store, StoreError}; +use crate::store::{BlockRanges, SamplingStatus, Store, StoreError}; const MAX_SAMPLES_NEEDED: usize = 16; @@ -134,18 +133,12 @@ where store: Arc, max_samples_needed: usize, sampling_futs: FuturesUnordered>>, - queue: VecDeque, - prev_stored_blocks: HeaderRanges, + queue: BlockRanges, + done: BlockRanges, + ongoing: BlockRanges, prev_head: Option, } -#[derive(Debug)] -struct SamplingArgs { - height: u64, - square_width: u16, - time: Time, -} - impl Worker where S: Store, @@ -158,8 +151,9 @@ where store: args.store, max_samples_needed: MAX_SAMPLES_NEEDED, sampling_futs: FuturesUnordered::new(), - queue: VecDeque::new(), - prev_stored_blocks: HeaderRanges::default(), + queue: BlockRanges::default(), + done: BlockRanges::default(), + ongoing: BlockRanges::default(), prev_head: None, }) } @@ -229,10 +223,10 @@ where loop { // If we have a new HEAD queued, schedule it now! 
- if let Some(queue_front) = self.queue.front().map(|args| args.height) { - if queue_front > self.prev_head.unwrap_or(0) { + if let Some(queue_head) = self.queue.head() { + if queue_head > self.prev_head.unwrap_or(0) { self.schedule_next_sample_block().await?; - self.prev_head = Some(queue_front); + self.prev_head = Some(queue_head); } } @@ -265,6 +259,9 @@ where self.store .update_sampling_metadata(height, status, Vec::new()) .await?; + + self.ongoing.remove_relaxed(height..=height).expect("invalid height"); + self.done.insert_relaxed(height..=height).expect("invalid height"); }, _ = &mut wait_new_head => { wait_new_head = store.wait_new_head(); @@ -274,38 +271,63 @@ where } self.sampling_futs.clear(); - self.queue.clear(); - self.prev_stored_blocks = HeaderRanges::default(); + self.queue = BlockRanges::default(); + self.ongoing = BlockRanges::default(); + self.done = BlockRanges::default(); self.prev_head = None; Ok(()) } async fn schedule_next_sample_block(&mut self) -> Result<()> { - let Some(args) = self.queue.pop_front() else { - return Ok(()); + // Schedule the most recent un-sampled block. + let header = loop { + let Some(height) = self.queue.pop_head() else { + return Ok(()); + }; + + match self.store.get_by_height(height).await { + Ok(header) => break header, + Err(StoreError::NotFound) => { + // Height was pruned and our queue is inconsistent. + // Repopulate queue and try again. + self.populate_queue().await?; + } + Err(e) => return Err(e.into()), + } }; + let height = header.height().value(); + let square_width = header.dah.square_width(); + // Make sure that the block is still in the sampling window. - if !in_sampling_window(args.time) { - // Queue is sorted by block height in descending order, - // so as soon as we reach a block that is not in the sampling + if !in_sampling_window(header.time()) { + // As soon as we reach a block that is not in the sampling // window, it means the rest wouldn't be either. 
- self.queue.clear(); + self.queue + .remove_relaxed(1..=height) + .expect("invalid height"); + self.done + .insert_relaxed(1..=height) + .expect("invalid height"); return Ok(()); } // Select random shares to be sampled - let share_indexes = random_indexes(args.square_width, self.max_samples_needed); + let share_indexes = random_indexes(square_width, self.max_samples_needed); // Update the CID list before we start sampling, otherwise it's possible for us // to leak CIDs causing associated blocks to never get cleaned from blockstore. let cids = share_indexes .iter() - .map(|(row, col)| sample_cid(*row, *col, args.height)) + .map(|(row, col)| sample_cid(*row, *col, height)) .collect::, _>>()?; + + // NOTE: Pruning window is always 1 hour bigger than sampling + // window, so after `in_sampling_window` if statement we shouldn't + // care about `StoreError::NotFound` anymore. self.store - .update_sampling_metadata(args.height, SamplingStatus::Unknown, cids) + .update_sampling_metadata(height, SamplingStatus::Unknown, cids) .await?; let p2p = self.p2p.clone(); @@ -316,8 +338,8 @@ where let now = Instant::now(); event_pub.send(NodeEvent::SamplingStarted { - height: args.height, - square_width: args.square_width, + height, + square_width, shares: share_indexes.iter().copied().collect(), }); @@ -328,7 +350,7 @@ where let p2p = p2p.clone(); async move { - let res = p2p.get_sample(row, col, args.height).await; + let res = p2p.get_sample(row, col, height).await; (row, col, res) } }) @@ -351,8 +373,8 @@ where block_accepted &= share_accepted; event_pub.send(NodeEvent::ShareSamplingResult { - height: args.height, - square_width: args.square_width, + height, + square_width, row, column, accepted: share_accepted, @@ -360,16 +382,19 @@ where } event_pub.send(NodeEvent::SamplingFinished { - height: args.height, + height, accepted: block_accepted, took: now.elapsed(), }); - Ok((args.height, block_accepted)) + Ok((height, block_accepted)) } .boxed(); self.sampling_futs.push(fut); + 
self.ongoing + .insert_relaxed(height..=height) + .expect("invalid height"); Ok(()) } @@ -381,70 +406,15 @@ where /// limitation that's coming from bitswap: only way for us to know if sampling /// failed is via timeout. async fn populate_queue(&mut self) -> Result<()> { - let stored_blocks = self.store.get_stored_header_ranges().await?; - let first_check = self.prev_stored_blocks.is_empty(); - - 'outer: for block_range in stored_blocks.clone().into_inner().into_iter().rev() { - for height in block_range.rev() { - if self.prev_stored_blocks.contains(height) { - // Skip blocks that were checked before - continue; - } - - // Optimization: We check if the block was accepted only if this is - // the first time we check the store (i.e. prev_stored_blocks is empty), - // otherwise we can safely assume that block needs sampling. - if first_check && is_block_accepted(&*self.store, height).await { - // Skip already sampled blocks - continue; - } - - let Ok(header) = self.store.get_by_height(height).await else { - // We reached the tail of the known heights - break 'outer; - }; + let stored = self.store.get_stored_header_ranges().await?; + let accepted = self.store.get_accepted_sampling_ranges().await?; - if !in_sampling_window(header.time()) { - // We've reached the tail of the sampling window - break 'outer; - } - - queue_sampling(&mut self.queue, &header); - } - } - - self.prev_stored_blocks = stored_blocks; + self.queue = stored - accepted - &self.done - &self.ongoing; Ok(()) } } -/// Queue sampling in descending order -fn queue_sampling(queue: &mut VecDeque, header: &ExtendedHeader) { - let args = SamplingArgs { - height: header.height().value(), - time: header.time(), - square_width: header.dah.square_width(), - }; - - if queue.is_empty() || args.height >= queue.front().unwrap().height { - queue.push_front(args); - return; - } - - if args.height <= queue.back().unwrap().height { - queue.push_back(args); - return; - } - - let pos = match queue.binary_search_by(|x| 
args.height.cmp(&x.height)) { - Ok(pos) => pos, - Err(pos) => pos, - }; - - queue.insert(pos, args); -} - /// Returns true if `time` is within the sampling window. fn in_sampling_window(time: Time) -> bool { let now = Time::now(); @@ -461,14 +431,6 @@ fn in_sampling_window(time: Time) -> bool { age <= SAMPLING_WINDOW } -/// Returns true if block has been sampled and accepted. -async fn is_block_accepted(store: &impl Store, height: u64) -> bool { - match store.get_sampling_metadata(height).await { - Ok(Some(metadata)) => metadata.status == SamplingStatus::Accepted, - _ => false, - } -} - /// Returns unique and random indexes that will be used for sampling. fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> { let samples_in_block = usize::from(square_width).pow(2); diff --git a/node/src/lib.rs b/node/src/lib.rs index 38244949..bda6f92b 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -1,6 +1,7 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![doc = include_str!("../README.md")] +pub mod block_ranges; pub mod blockstore; pub mod daser; pub mod events; diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 50053631..d19ede26 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -54,6 +54,7 @@ mod header_session; pub(crate) mod shwap; mod swarm; +use crate::block_ranges::BlockRange; use crate::events::EventPublisher; use crate::executor::{self, spawn, Interval}; use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig}; @@ -62,7 +63,6 @@ use crate::p2p::shwap::{namespaced_data_cid, row_cid, sample_cid, ShwapMultihash use crate::p2p::swarm::new_swarm; use crate::peer_tracker::PeerTracker; use crate::peer_tracker::PeerTrackerInfo; -use crate::store::header_ranges::HeaderRange; use crate::store::Store; use crate::utils::{ celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt, @@ -416,7 +416,7 @@ impl P2p { /// responsibility to verify range edges against headers existing in the store. 
pub(crate) async fn get_unverified_header_range( &self, - range: HeaderRange, + range: BlockRange, ) -> Result> { if range.is_empty() { return Err(HeaderExError::InvalidRequest.into()); diff --git a/node/src/p2p/header_session.rs b/node/src/p2p/header_session.rs index 9e1ccb6d..ad3016ac 100644 --- a/node/src/p2p/header_session.rs +++ b/node/src/p2p/header_session.rs @@ -3,10 +3,10 @@ use celestia_types::ExtendedHeader; use tokio::sync::{mpsc, oneshot}; use tracing::debug; +use crate::block_ranges::{BlockRange, BlockRangeExt}; use crate::executor::spawn; use crate::p2p::header_ex::utils::HeaderRequestExt; use crate::p2p::{P2pCmd, P2pError}; -use crate::store::header_ranges::{HeaderRange, RangeLengthExt}; const MAX_AMOUNT_PER_REQ: u64 = 64; const MAX_CONCURRENT_REQS: usize = 8; @@ -14,7 +14,7 @@ const MAX_CONCURRENT_REQS: usize = 8; type Result = std::result::Result; pub(crate) struct HeaderSession { - to_fetch: Option, + to_fetch: Option, cmd_tx: mpsc::Sender, response_tx: mpsc::Sender<(u64, u64, Result>)>, response_rx: mpsc::Receiver<(u64, u64, Result>)>, @@ -23,14 +23,14 @@ pub(crate) struct HeaderSession { impl HeaderSession { /// Create a new HeaderSession responsible for fetching provided range of headers. - /// `HeaderRange` can be created manually, or more probably using + /// `BlockRange` can be created manually, or more probably using /// [`Store::get_stored_header_ranges`] to fetch existing header ranges and then using /// [`calculate_fetch_range`] to return a first range that should be fetched. /// Received headers range is sent over `cmd_tx` as a vector of unverified headers. 
/// /// [`calculate_fetch_range`] crate::store::utils::calculate_fetch_range /// [`Store::get_stored_header_ranges`]: crate::store::Store::get_stored_header_ranges - pub(crate) fn new(range: HeaderRange, cmd_tx: mpsc::Sender) -> Self { + pub(crate) fn new(range: BlockRange, cmd_tx: mpsc::Sender) -> Self { let (response_tx, response_rx) = mpsc::channel(MAX_CONCURRENT_REQS); HeaderSession { @@ -140,7 +140,7 @@ impl HeaderSession { } /// take a next batch of up to `limit` headers from the front of the `range_to_fetch` -fn take_next_batch(range_to_fetch: &mut Option, limit: u64) -> Option { +fn take_next_batch(range_to_fetch: &mut Option, limit: u64) -> Option { // calculate potential end offset before we modify range_to_fetch let end_offset = limit.checked_sub(1)?; diff --git a/node/src/store.rs b/node/src/store.rs index d3aa3fea..4921961c 100644 --- a/node/src/store.rs +++ b/node/src/store.rs @@ -14,7 +14,8 @@ use prost::Message; use serde::{Deserialize, Serialize}; use thiserror::Error; -pub use crate::store::header_ranges::{HeaderRanges, VerifiedExtendedHeaders}; +pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError}; +pub use crate::store::utils::{ExtendedHeaderGeneratorExt, VerifiedExtendedHeaders}; pub use in_memory_store::InMemoryStore; #[cfg(target_arch = "wasm32")] @@ -28,9 +29,6 @@ mod indexed_db_store; #[cfg(not(target_arch = "wasm32"))] mod redb_store; -pub use header_ranges::ExtendedHeaderGeneratorExt; - -pub(crate) mod header_ranges; pub(crate) mod utils; /// Sampling metadata for a block. @@ -148,16 +146,18 @@ pub trait Store: Send + Sync + Debug { /// Insert a range of headers into the store. /// - /// Inserts are allowed at the front of the store or at the ends of any existing ranges. Edges - /// of inserted header ranges are verified against headers present in the store, if they - /// exist. 
+ /// New insertion should pass all the constraints in [`BlockRanges::check_insertion_constraints`], + /// additionally it should be [`ExtendedHeader::verify`]ed against neighbor headers. async fn insert(&self, headers: R) -> Result<()> where R: TryInto + Send, StoreError: From<>::Error>; - /// Return a list of header ranges currenty held in store - async fn get_stored_header_ranges(&self) -> Result; + /// Returns a list of header ranges currently held in store. + async fn get_stored_header_ranges(&self) -> Result; + + /// Returns a list of accepted sampling ranges currently held in store. + async fn get_accepted_sampling_ranges(&self) -> Result; } /// Representation of all the errors that can occur when interacting with the [`Store`]. @@ -167,27 +167,6 @@ pub enum StoreError { #[error("Hash {0} already exists in store")] HashExists(Hash), - /// Height already exists in the store. - #[error("Height {0} already exists in store")] - HeightExists(u64), - - /// Inserted height is not following store's current head. - #[error("Failed to append header at height {1}")] - NonContinuousAppend(u64, u64), - - /// Store already contains some of the headers from the range that's being inserted - #[error("Failed to insert header range, it overlaps with one already existing in the store: {0}..={1}")] - HeaderRangeOverlap(u64, u64), - - /// Store only allows inserts that grow existing header ranges, or starting a new network head, - /// ahead of all the existing ranges - #[error("Trying to insert new header range at disallowed position: {0}..={1}")] - InsertPlacementDisallowed(u64, u64), - - /// Range of headers provided to insert is not contiguous - #[error("Provided header range has a gap between heights {0} and {1}")] - InsertRangeWithGap(u64, u64), - /// Header validation has failed. #[error("Failed to validate header at height {0}")] HeaderChecksError(u64), @@ -227,6 +206,10 @@ pub enum StoreError { /// Invalid range of headers provided. 
#[error("Invalid headers range")] InvalidHeadersRange, + + /// An error propagated from [`BlockRanges`] methods. + #[error(transparent)] + BlockRangesError(#[from] BlockRangesError), } #[cfg(not(target_arch = "wasm32"))] @@ -331,8 +314,6 @@ fn to_headers_range(bounds: impl RangeBounds, last_index: u64) -> Result e, + res => panic!("Invalid result: {res:?}"), + }; + + assert_eq!( + error, + BlockRangesError::BlockRangeOverlap(101..=101, 101..=101) + ); } #[rstest] @@ -570,11 +556,11 @@ mod tests { let header29 = s.get_by_height(29).await.unwrap(); let header30 = gen.next_of(&header29); - let insert_existing_result = s.insert(header30).await; - assert!(matches!( - insert_existing_result, - Err(StoreError::HeaderRangeOverlap(30, 30)) - )); + let error = match s.insert(header30).await { + Err(StoreError::BlockRangesError(e)) => e, + res => panic!("Invalid result: {res:?}"), + }; + assert_eq!(error, BlockRangesError::BlockRangeOverlap(30..=30, 30..=30)); } #[rstest] diff --git a/node/src/store/header_ranges.rs b/node/src/store/header_ranges.rs deleted file mode 100644 index 982979c5..00000000 --- a/node/src/store/header_ranges.rs +++ /dev/null @@ -1,524 +0,0 @@ -use std::fmt::Display; -use std::ops::RangeInclusive; -use std::vec; - -#[cfg(any(test, feature = "test-utils"))] -use celestia_types::test_utils::ExtendedHeaderGenerator; -use celestia_types::ExtendedHeader; -use serde::{Deserialize, Serialize}; -use smallvec::SmallVec; - -use crate::store::utils::{ranges_intersection, try_consolidate_ranges, RangeScanResult}; -use crate::store::StoreError; - -pub type HeaderRange = RangeInclusive; - -/// Span of header that's been verified internally -#[derive(Clone)] -pub struct VerifiedExtendedHeaders(Vec); - -impl IntoIterator for VerifiedExtendedHeaders { - type Item = ExtendedHeader; - type IntoIter = vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -impl<'a> TryFrom<&'a [ExtendedHeader]> for VerifiedExtendedHeaders { - type Error = 
celestia_types::Error; - - fn try_from(value: &'a [ExtendedHeader]) -> Result { - value.to_vec().try_into() - } -} - -impl From for Vec { - fn from(value: VerifiedExtendedHeaders) -> Self { - value.0 - } -} - -impl AsRef<[ExtendedHeader]> for VerifiedExtendedHeaders { - fn as_ref(&self) -> &[ExtendedHeader] { - &self.0 - } -} - -/// 1-length hedaer span is internally verified, this is valid -impl From<[ExtendedHeader; 1]> for VerifiedExtendedHeaders { - fn from(value: [ExtendedHeader; 1]) -> Self { - Self(value.into()) - } -} - -impl From for VerifiedExtendedHeaders { - fn from(value: ExtendedHeader) -> Self { - Self(vec![value]) - } -} - -impl<'a> From<&'a ExtendedHeader> for VerifiedExtendedHeaders { - fn from(value: &ExtendedHeader) -> Self { - Self(vec![value.to_owned()]) - } -} - -impl TryFrom> for VerifiedExtendedHeaders { - type Error = celestia_types::Error; - - fn try_from(headers: Vec) -> Result { - let Some(head) = headers.first() else { - return Ok(VerifiedExtendedHeaders(Vec::default())); - }; - - head.verify_adjacent_range(&headers[1..])?; - - Ok(Self(headers)) - } -} - -impl VerifiedExtendedHeaders { - /// Create a new instance out of pre-checked vec of headers - /// - /// # Safety - /// - /// This function may produce invalid `VerifiedExtendedHeaders`, if passed range is not - /// validated manually - pub unsafe fn new_unchecked(headers: Vec) -> Self { - Self(headers) - } -} - -pub(crate) trait RangeLengthExt { - fn len(&self) -> u64; -} - -impl RangeLengthExt for RangeInclusive { - fn len(&self) -> u64 { - match self.end().checked_sub(*self.start()) { - Some(difference) => difference + 1, - None => 0, - } - } -} - -/// Represents possibly multiple non-overlapping, sorted ranges of header heights -#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] -pub struct HeaderRanges(SmallVec<[HeaderRange; 2]>); - -pub(crate) trait HeaderRangesExt { - /// Check whether provided `to_insert` range can be inserted into the header ranges 
represented - /// by self. New range can be inserted ahead of all existing ranges to allow syncing from the - /// head but otherwise, only growing the existing ranges is allowed. - /// Returned [`RangeScanResult`] contains information necessary to persist the range - /// modification in the database manually, or one can call [`update_range`] to modify ranges in - /// memory. - fn check_range_insert(&self, to_insert: &HeaderRange) -> Result; - /// Modify the header ranges, committing insert previously checked with [`check_range_insert`] - fn update_range(&mut self, scan_information: RangeScanResult); -} - -impl HeaderRangesExt for HeaderRanges { - fn update_range(&mut self, scan_information: RangeScanResult) { - let RangeScanResult { - range_index, - range, - range_to_remove, - } = scan_information; - - if self.0.len() == range_index { - self.0.push(range); - } else { - self.0[range_index] = range; - } - - if let Some(to_remove) = range_to_remove { - self.0.remove(to_remove); - } - } - - fn check_range_insert(&self, to_insert: &HeaderRange) -> Result { - let Some(head_range) = self.0.last() else { - // Empty store case - return Ok(RangeScanResult { - range_index: 0, - range: to_insert.clone(), - range_to_remove: None, - }); - }; - - // allow inserting a new header range in front of the current head range - // +1 in here to let ranges merge below in case they're contiguous - if *to_insert.start() > head_range.end() + 1 { - return Ok(RangeScanResult { - range_index: self.0.len(), - range: to_insert.clone(), - range_to_remove: None, - }); - } - - let mut stored_ranges_iter = self.0.iter().enumerate(); - let mut found_range = loop { - let Some((idx, stored_range)) = stored_ranges_iter.next() else { - return Err(StoreError::InsertPlacementDisallowed( - *to_insert.start(), - *to_insert.end(), - )); - }; - - if let Some(intersection) = ranges_intersection(stored_range, to_insert) { - return Err(StoreError::HeaderRangeOverlap( - *intersection.start(), - *intersection.end(), 
- )); - } - - if let Some(consolidated) = try_consolidate_ranges(stored_range, to_insert) { - break RangeScanResult { - range_index: idx, - range: consolidated, - range_to_remove: None, - }; - } - }; - - // we have a hit, check whether we can merge with the next range too - if let Some((idx, range_after)) = stored_ranges_iter.next() { - if let Some(intersection) = ranges_intersection(range_after, to_insert) { - return Err(StoreError::HeaderRangeOverlap( - *intersection.start(), - *intersection.end(), - )); - } - - if let Some(consolidated) = try_consolidate_ranges(range_after, &found_range.range) { - found_range = RangeScanResult { - range_index: found_range.range_index, - range: consolidated, - range_to_remove: Some(idx), - }; - } - } - - Ok(found_range) - } -} - -impl HeaderRanges { - /// Return whether `HeaderRanges` contains provided height - pub fn contains(&self, height: u64) -> bool { - self.0.iter().any(|r| r.contains(&height)) - } - - /// Return whether range is empty - pub fn is_empty(&self) -> bool { - self.0.iter().all(|r| r.is_empty()) - } - - /// Return highest height in the range - pub fn head(&self) -> Option { - self.0.last().map(|r| *r.end()) - } - - /// Return lowest height in the range - pub fn tail(&self) -> Option { - self.0.first().map(|r| *r.start()) - } - - pub(crate) fn into_inner(self) -> SmallVec<[HeaderRange; 2]> { - self.0 - } - - /// Crate HeaderRanges from correctly pre-sorted, non-overlapping SmallVec of ranges - pub(crate) fn from_vec(from: SmallVec<[HeaderRange; 2]>) -> Self { - #[cfg(debug_assertions)] - { - let mut prev: Option<&RangeInclusive> = None; - - for range in &from { - assert!( - range.start() <= range.end(), - "range isn't sorted internally" - ); - - if let Some(prev) = prev { - assert!( - prev.end() < range.start(), - "header ranges aren't sorted correctly" - ); - } - - prev = Some(range); - } - } - - Self(from) - } -} - -impl Display for HeaderRanges { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result { - write!(f, "[")?; - for (idx, range) in self.0.iter().enumerate() { - if idx == 0 { - write!(f, "{}-{}", range.start(), range.end())?; - } else { - write!(f, ", {}-{}", range.start(), range.end())?; - } - } - write!(f, "]") - } -} - -pub(crate) struct PrintableHeaderRange(pub RangeInclusive); - -impl Display for PrintableHeaderRange { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}-{}", self.0.start(), self.0.end()) - } -} - -impl AsRef<[RangeInclusive]> for HeaderRanges { - fn as_ref(&self) -> &[RangeInclusive] { - &self.0 - } -} - -/// Extends test header generator for easier insertion into the store -pub trait ExtendedHeaderGeneratorExt { - /// Generate next amount verified headers - fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders; -} - -#[cfg(any(test, feature = "test-utils"))] -impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator { - fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders { - unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use smallvec::smallvec; - - #[test] - fn range_len() { - assert_eq!((0u64..=0).len(), 1); - assert_eq!((0u64..=5).len(), 6); - assert_eq!((1u64..=2).len(), 2); - assert_eq!(RangeInclusive::new(2u64, 1).len(), 0); - assert_eq!((10001u64..=20000).len(), 10000); - } - - #[test] - fn header_ranges_empty() { - assert!(HeaderRanges::from_vec(smallvec![]).is_empty()); - assert!(!HeaderRanges::from_vec(smallvec![1..=3]).is_empty()); - } - - #[test] - fn header_ranges_head() { - assert_eq!(HeaderRanges::from_vec(smallvec![]).head(), None); - assert_eq!(HeaderRanges::from_vec(smallvec![1..=3]).head(), Some(3)); - assert_eq!( - HeaderRanges::from_vec(smallvec![1..=3, 6..=9]).head(), - Some(9) - ); - assert_eq!( - HeaderRanges::from_vec(smallvec![1..=3, 5..=5, 8..=9]).head(), - Some(9) - ); - } - - #[test] - fn header_ranges_tail() { - 
assert_eq!(HeaderRanges::from_vec(smallvec![]).tail(), None); - assert_eq!(HeaderRanges::from_vec(smallvec![1..=3]).tail(), Some(1)); - assert_eq!( - HeaderRanges::from_vec(smallvec![1..=3, 6..=9]).tail(), - Some(1) - ); - assert_eq!( - HeaderRanges::from_vec(smallvec![1..=3, 5..=5, 8..=9]).tail(), - Some(1) - ); - } - - #[test] - fn check_range_insert_append() { - let result = HeaderRanges::from_vec(smallvec![]) - .check_range_insert(&(1..=5)) - .unwrap(); - assert_eq!( - result, - RangeScanResult { - range_index: 0, - range: 1..=5, - range_to_remove: None, - } - ); - - let result = HeaderRanges::from_vec(smallvec![1..=4]) - .check_range_insert(&(5..=5)) - .unwrap(); - assert_eq!( - result, - RangeScanResult { - range_index: 0, - range: 1..=5, - range_to_remove: None, - } - ); - - let result = HeaderRanges::from_vec(smallvec![1..=5]) - .check_range_insert(&(6..=9)) - .unwrap(); - assert_eq!( - result, - RangeScanResult { - range_index: 0, - range: 1..=9, - range_to_remove: None, - } - ); - - let result = HeaderRanges::from_vec(smallvec![6..=8]) - .check_range_insert(&(2..=5)) - .unwrap(); - assert_eq!( - result, - RangeScanResult { - range_index: 0, - range: 2..=8, - range_to_remove: None, - } - ); - } - - #[test] - fn check_range_insert_with_consolidation() { - let result = HeaderRanges::from_vec(smallvec![1..=3, 6..=9]) - .check_range_insert(&(4..=5)) - .unwrap(); - assert_eq!( - result, - RangeScanResult { - range_index: 0, - range: 1..=9, - range_to_remove: Some(1), - } - ); - - let result = HeaderRanges::from_vec(smallvec![1..=2, 5..=5, 8..=9]) - .check_range_insert(&(3..=4)) - .unwrap(); - assert_eq!( - result, - RangeScanResult { - range_index: 0, - range: 1..=5, - range_to_remove: Some(1), - } - ); - - let result = HeaderRanges::from_vec(smallvec![1..=2, 4..=4, 8..=9]) - .check_range_insert(&(5..=7)) - .unwrap(); - assert_eq!( - result, - RangeScanResult { - range_index: 1, - range: 4..=9, - range_to_remove: Some(2), - } - ); - } - - #[test] - fn 
check_range_insert_overlapping() { - let result = HeaderRanges::from_vec(smallvec![1..=2]) - .check_range_insert(&(1..=1)) - .unwrap_err(); - assert!(matches!(result, StoreError::HeaderRangeOverlap(1, 1))); - - let result = HeaderRanges::from_vec(smallvec![1..=4]) - .check_range_insert(&(2..=8)) - .unwrap_err(); - assert!(matches!(result, StoreError::HeaderRangeOverlap(2, 4))); - - let result = HeaderRanges::from_vec(smallvec![1..=4]) - .check_range_insert(&(2..=3)) - .unwrap_err(); - assert!(matches!(result, StoreError::HeaderRangeOverlap(2, 3))); - - let result = HeaderRanges::from_vec(smallvec![5..=9]) - .check_range_insert(&(1..=5)) - .unwrap_err(); - assert!(matches!(result, StoreError::HeaderRangeOverlap(5, 5))); - - let result = HeaderRanges::from_vec(smallvec![5..=8]) - .check_range_insert(&(2..=8)) - .unwrap_err(); - assert!(matches!(result, StoreError::HeaderRangeOverlap(5, 8))); - - let result = HeaderRanges::from_vec(smallvec![1..=3, 6..=9]) - .check_range_insert(&(3..=6)) - .unwrap_err(); - assert!(matches!(result, StoreError::HeaderRangeOverlap(3, 3))); - } - - #[test] - fn check_range_insert_invalid_placement() { - let result = HeaderRanges::from_vec(smallvec![1..=2, 7..=9]) - .check_range_insert(&(4..=4)) - .unwrap_err(); - assert!(matches!( - result, - StoreError::InsertPlacementDisallowed(4, 4) - )); - - let result = HeaderRanges::from_vec(smallvec![1..=2, 8..=9]) - .check_range_insert(&(4..=6)) - .unwrap_err(); - assert!(matches!( - result, - StoreError::InsertPlacementDisallowed(4, 6) - )); - - let result = HeaderRanges::from_vec(smallvec![4..=5, 7..=8]) - .check_range_insert(&(1..=2)) - .unwrap_err(); - assert!(matches!( - result, - StoreError::InsertPlacementDisallowed(1, 2) - )); - } - - #[test] - fn test_header_range_creation_ok() { - HeaderRanges::from_vec(smallvec![1..=3, 5..=8]); - HeaderRanges::from_vec(smallvec![]); - HeaderRanges::from_vec(smallvec![1..=1, 1000000..=2000000]); - } - - #[test] - #[should_panic] - fn 
test_header_range_creation_overlap() { - HeaderRanges::from_vec(smallvec![1..=3, 2..=5]); - } - - #[test] - #[should_panic] - fn test_header_range_creation_inverse() { - HeaderRanges::from_vec(smallvec![1..=3, RangeInclusive::new(9, 5)]); - } - - #[test] - #[should_panic] - fn test_header_range_creation_wrong_order() { - HeaderRanges::from_vec(smallvec![10..=15, 1..=5]); - } -} diff --git a/node/src/store/in_memory_store.rs b/node/src/store/in_memory_store.rs index 9b32ebfa..8af232d3 100644 --- a/node/src/store/in_memory_store.rs +++ b/node/src/store/in_memory_store.rs @@ -6,12 +6,11 @@ use async_trait::async_trait; use celestia_types::hash::Hash; use celestia_types::ExtendedHeader; use cid::Cid; -use dashmap::mapref::entry::Entry as DashMapEntry; -use dashmap::DashMap; use tokio::sync::{Notify, RwLock}; use tracing::debug; -use crate::store::header_ranges::{HeaderRanges, HeaderRangesExt, VerifiedExtendedHeaders}; +use crate::block_ranges::BlockRanges; +use crate::store::utils::VerifiedExtendedHeaders; use crate::store::{Result, SamplingMetadata, SamplingStatus, Store, StoreError}; /// A non-persistent in memory [`Store`] implementation. @@ -19,8 +18,6 @@ use crate::store::{Result, SamplingMetadata, SamplingStatus, Store, StoreError}; pub struct InMemoryStore { /// Mutable part inner: RwLock, - /// Maps header height to the header sampling metadata, used by DAS - sampling_data: DashMap, /// Notify when a new header is added header_added_notifier: Notify, } @@ -32,7 +29,11 @@ struct InMemoryStoreInner { /// Maps header height to its hash, in case we need to do lookup by height height_to_hash: HashMap, /// Source of truth about headers present in the db, used to synchronise inserts - stored_ranges: HeaderRanges, + header_ranges: BlockRanges, + /// Maps header height to the header sampling metadata + sampling_data: HashMap, + /// Source of truth about accepted sampling ranges present in the db. 
+ accepted_sampling_ranges: BlockRanges, } impl InMemoryStoreInner { @@ -40,7 +41,9 @@ impl InMemoryStoreInner { Self { headers: HashMap::new(), height_to_hash: HashMap::new(), - stored_ranges: HeaderRanges::default(), + header_ranges: BlockRanges::default(), + sampling_data: HashMap::new(), + accepted_sampling_ranges: BlockRanges::default(), } } } @@ -50,7 +53,6 @@ impl InMemoryStore { pub fn new() -> Self { InMemoryStore { inner: RwLock::new(InMemoryStoreInner::new()), - sampling_data: DashMap::new(), header_added_notifier: Notify::new(), } } @@ -98,63 +100,46 @@ impl InMemoryStore { status: SamplingStatus, cids: Vec, ) -> Result<()> { - if !self.contains_height(height).await { - return Err(StoreError::NotFound); - } - - match self.sampling_data.entry(height) { - DashMapEntry::Vacant(entry) => { - entry.insert(SamplingMetadata { status, cids }); - } - DashMapEntry::Occupied(mut entry) => { - let metadata = entry.get_mut(); - metadata.status = status; - - for cid in cids { - if !metadata.cids.contains(&cid) { - metadata.cids.push(cid); - } - } - } - } - - Ok(()) + self.inner + .write() + .await + .update_sampling_metadata(height, status, cids) + .await } async fn get_sampling_metadata(&self, height: u64) -> Result> { - if !self.contains_height(height).await { - return Err(StoreError::NotFound); - } - - let Some(metadata) = self.sampling_data.get(&height) else { - return Ok(None); - }; - - Ok(Some(metadata.clone())) + self.inner.read().await.get_sampling_metadata(height).await } - async fn get_stored_ranges(&self) -> HeaderRanges { + async fn get_stored_ranges(&self) -> BlockRanges { self.inner.read().await.get_stored_ranges() } + async fn get_accepted_sampling_ranges(&self) -> BlockRanges { + self.inner.read().await.get_accepted_sampling_ranges() + } + /// Clone the store and all its contents. Async fn due to internal use of async mutex. 
pub async fn async_clone(&self) -> Self { InMemoryStore { inner: RwLock::new(self.inner.read().await.clone()), - sampling_data: self.sampling_data.clone(), header_added_notifier: Notify::new(), } } } impl InMemoryStoreInner { - fn get_stored_ranges(&self) -> HeaderRanges { - self.stored_ranges.clone() + fn get_stored_ranges(&self) -> BlockRanges { + self.header_ranges.clone() + } + + fn get_accepted_sampling_ranges(&self) -> BlockRanges { + self.accepted_sampling_ranges.clone() } #[inline] fn get_head_height(&self) -> Result { - self.stored_ranges.head().ok_or(StoreError::NotFound) + self.header_ranges.head().ok_or(StoreError::NotFound) } fn contains_hash(&self, hash: &Hash) -> bool { @@ -166,7 +151,7 @@ impl InMemoryStoreInner { } fn contains_height(&self, height: u64) -> bool { - self.stored_ranges.contains(height) + self.header_ranges.contains(height) } fn get_by_height(&self, height: u64) -> Result { @@ -186,10 +171,10 @@ impl InMemoryStoreInner { }; let headers_range = head.height().value()..=tail.height().value(); - let range_scan_result = self.stored_ranges.check_range_insert(&headers_range)?; + let (prev_exists, next_exists) = self + .header_ranges + .check_insertion_constraints(&headers_range)?; - let prev_exists = headers_range.start() != range_scan_result.range.start(); - let next_exists = headers_range.end() != range_scan_result.range.end(); // header range is already internally verified against itself in `P2p::get_unverified_header_ranges` self.verify_against_neighbours(prev_exists.then_some(head), next_exists.then_some(tail))?; @@ -222,7 +207,7 @@ impl InMemoryStoreInner { height_entry.insert(hash); } - self.stored_ranges.update_range(range_scan_result); + self.header_ranges.insert_relaxed(headers_range)?; Ok(()) } @@ -263,6 +248,58 @@ impl InMemoryStoreInner { } Ok(()) } + + async fn update_sampling_metadata( + &mut self, + height: u64, + status: SamplingStatus, + cids: Vec, + ) -> Result<()> { + if !self.contains_height(height) { + return 
Err(StoreError::NotFound); + } + + match self.sampling_data.entry(height) { + HashMapEntry::Vacant(entry) => { + entry.insert(SamplingMetadata { status, cids }); + } + HashMapEntry::Occupied(mut entry) => { + let metadata = entry.get_mut(); + metadata.status = status; + + for cid in cids { + if !metadata.cids.contains(&cid) { + metadata.cids.push(cid); + } + } + } + } + + match status { + SamplingStatus::Accepted => self + .accepted_sampling_ranges + .insert_relaxed(height..=height) + .expect("invalid height"), + _ => self + .accepted_sampling_ranges + .remove_relaxed(height..=height) + .expect("invalid height"), + } + + Ok(()) + } + + async fn get_sampling_metadata(&self, height: u64) -> Result> { + if !self.contains_height(height) { + return Err(StoreError::NotFound); + } + + let Some(metadata) = self.sampling_data.get(&height) else { + return Ok(None); + }; + + Ok(Some(metadata.clone())) + } } #[async_trait] @@ -347,9 +384,13 @@ impl Store for InMemoryStore { self.get_sampling_metadata(height).await } - async fn get_stored_header_ranges(&self) -> Result { + async fn get_stored_header_ranges(&self) -> Result { Ok(self.get_stored_ranges().await) } + + async fn get_accepted_sampling_ranges(&self) -> Result { + Ok(self.get_accepted_sampling_ranges().await) + } } impl Default for InMemoryStore { diff --git a/node/src/store/indexed_db_store.rs b/node/src/store/indexed_db_store.rs index 7e7e69c0..debff02b 100644 --- a/node/src/store/indexed_db_store.rs +++ b/node/src/store/indexed_db_store.rs @@ -11,26 +11,32 @@ use rexie::{Direction, Index, KeyRange, ObjectStore, Rexie, TransactionMode}; use send_wrapper::SendWrapper; use serde::{Deserialize, Serialize}; use serde_wasm_bindgen::{from_value, to_value}; +use smallvec::smallvec; use tokio::sync::Notify; +use tracing::warn; +use wasm_bindgen::JsValue; -use crate::store::header_ranges::{ - HeaderRange, HeaderRanges, HeaderRangesExt, VerifiedExtendedHeaders, -}; -use crate::store::utils::RangeScanResult; +use 
crate::block_ranges::BlockRanges; +use crate::store::utils::VerifiedExtendedHeaders; use crate::store::{Result, SamplingMetadata, SamplingStatus, Store, StoreError}; /// indexeddb version, needs to be incremented on every schema schange -const DB_VERSION: u32 = 3; +const DB_VERSION: u32 = 4; // Data stores (SQL table analogue) used in IndexedDb const HEADER_STORE_NAME: &str = "headers"; const SAMPLING_STORE_NAME: &str = "sampling"; const RANGES_STORE_NAME: &str = "ranges"; +const SCHEMA_STORE_NAME: &str = "schema"; // Additional indexes set on HEADER_STORE, for querying by height and hash const HASH_INDEX_NAME: &str = "hash"; const HEIGHT_INDEX_NAME: &str = "height"; +const ACCEPTED_SAMPLING_RANGES_KEY: &str = "accepted_sampling_ranges"; +const HEADER_RANGES_KEY: &str = "header_ranges"; +const VERSION_KEY: &str = "version"; + #[derive(Debug, Serialize, Deserialize)] struct ExtendedHeaderEntry { // We use those fields as indexes, names need to match ones in `add_index` @@ -61,40 +67,56 @@ impl IndexedDbStore { .add_index(Index::new(HASH_INDEX_NAME, "hash").unique(true)) .add_index(Index::new(HEIGHT_INDEX_NAME, "height").unique(true)), ) - .add_object_store(ObjectStore::new(SAMPLING_STORE_NAME)) .add_object_store(ObjectStore::new(RANGES_STORE_NAME)) + .add_object_store(ObjectStore::new(SAMPLING_STORE_NAME)) + .add_object_store(ObjectStore::new(SCHEMA_STORE_NAME)) .build() .await .map_err(|e| StoreError::OpenFailed(e.to_string()))?; + // NOTE: Rexie does not expose any migration functionality, so we + // write our version in the store in order to handle it properly. + match detect_schema_version(&rexie).await? 
{ + Some(schema_version) => { + if schema_version > DB_VERSION { + let e = format!( + "Incompatible database schema; found {}, expected {}.", + schema_version, DB_VERSION + ); + return Err(StoreError::OpenFailed(e)); + } + + migrate_older_to_v4(&rexie).await?; + } + None => { + // New database + let tx = rexie.transaction(&[SCHEMA_STORE_NAME], TransactionMode::ReadWrite)?; + + let schema_store = tx.store(SCHEMA_STORE_NAME)?; + set_schema_version(&schema_store, DB_VERSION).await?; + + tx.commit().await?; + } + } + + // Force us to write migrations! + debug_assert_eq!( + detect_schema_version(&rexie).await?, + Some(DB_VERSION), + "Some migrations are missing" + ); + let db_head = match get_head_from_database(&rexie).await { Ok(v) => Some(v), Err(StoreError::NotFound) => None, Err(e) => return Err(e), }; - let store = Self { - head: SendWrapper::new(RefCell::new(db_head.clone())), + Ok(IndexedDbStore { + head: SendWrapper::new(RefCell::new(db_head)), db: SendWrapper::new(rexie), header_added_notifier: Notify::new(), - }; - - if let Some(head) = &db_head { - if store.get_stored_header_ranges().await?.is_empty() { - let tx = store - .db - .transaction(&[RANGES_STORE_NAME], TransactionMode::ReadWrite)?; - let ranges_store = tx.store(RANGES_STORE_NAME)?; - let jsvalue_range = to_value(&(1, head.height().value()))?; - let jsvalue_key = to_value(&0)?; - - ranges_store.put(&jsvalue_range, Some(&jsvalue_key)).await?; - - tx.commit().await?; - } - } - - Ok(store) + }) } /// Delete the persistent store. @@ -145,22 +167,13 @@ impl IndexedDbStore { .map_err(|e| StoreError::CelestiaTypes(e.into())) } - async fn get_stored_header_ranges(&self) -> Result { + async fn get_stored_header_ranges(&self) -> Result { let tx = self .db .transaction(&[RANGES_STORE_NAME], TransactionMode::ReadOnly)?; let store = tx.store(RANGES_STORE_NAME)?; - let ranges = HeaderRanges::from_vec( - store - .get_all(None, None, None, Some(Direction::Next)) - .await? 
- .into_iter() - .map(|(_k, v)| from_value::<(u64, u64)>(v).map(|(begin, end)| begin..=end)) - .collect::>()?, - ); - - Ok(ranges) + get_ranges(&store, HEADER_RANGES_KEY).await } async fn insert(&self, headers: R) -> Result<()> @@ -180,8 +193,11 @@ impl IndexedDbStore { let header_store = tx.store(HEADER_STORE_NAME)?; let ranges_store = tx.store(RANGES_STORE_NAME)?; + let mut header_ranges = get_ranges(&ranges_store, HEADER_RANGES_KEY).await?; + let headers_range = head.height().value()..=tail.height().value(); - let (prev_exists, next_exists) = try_insert_to_range(&ranges_store, headers_range).await?; + let (prev_exists, next_exists) = + header_ranges.check_insertion_constraints(&headers_range)?; // header range is already internally verified against itself in `P2p::get_unverified_header_ranges` verify_against_neighbours( @@ -214,6 +230,11 @@ impl IndexedDbStore { header_store.add(&jsvalue_header, None).await?; } + header_ranges.insert_relaxed(headers_range)?; + set_ranges(&ranges_store, HEADER_RANGES_KEY, &header_ranges).await?; + + tx.commit().await?; + if tail.height().value() > self .head @@ -224,7 +245,7 @@ impl IndexedDbStore { { self.head.replace(Some(tail.clone())); } - tx.commit().await?; + self.header_added_notifier.notify_waiters(); Ok(()) @@ -258,17 +279,21 @@ impl IndexedDbStore { status: SamplingStatus, cids: Vec, ) -> Result<()> { - if !self.contains_height(height).await { + let tx = self.db.transaction( + &[SAMPLING_STORE_NAME, RANGES_STORE_NAME], + TransactionMode::ReadWrite, + )?; + let sampling_store = tx.store(SAMPLING_STORE_NAME)?; + let ranges_store = tx.store(RANGES_STORE_NAME)?; + + let header_ranges = get_ranges(&ranges_store, HEADER_RANGES_KEY).await?; + let mut accepted_ranges = get_ranges(&ranges_store, ACCEPTED_SAMPLING_RANGES_KEY).await?; + + if !header_ranges.contains(height) { return Err(StoreError::NotFound); } let height_key = to_value(&height)?; - - let tx = self - .db - .transaction(&[SAMPLING_STORE_NAME], 
TransactionMode::ReadWrite)?; - let sampling_store = tx.store(SAMPLING_STORE_NAME)?; - let previous_entry = sampling_store.get(&height_key).await?; let new_entry = if previous_entry.is_falsy() { @@ -288,11 +313,26 @@ impl IndexedDbStore { }; let metadata_jsvalue = to_value(&new_entry)?; - sampling_store .put(&metadata_jsvalue, Some(&height_key)) .await?; + match status { + SamplingStatus::Accepted => accepted_ranges + .insert_relaxed(height..=height) + .expect("invalid height"), + _ => accepted_ranges + .remove_relaxed(height..=height) + .expect("invalid height"), + } + + set_ranges( + &ranges_store, + ACCEPTED_SAMPLING_RANGES_KEY, + &accepted_ranges, + ) + .await?; + tx.commit().await?; Ok(()) @@ -317,6 +357,15 @@ impl IndexedDbStore { Ok(Some(from_value(sampling_entry)?)) } + + async fn get_sampling_ranges(&self) -> Result { + let tx = self + .db + .transaction(&[RANGES_STORE_NAME], TransactionMode::ReadWrite)?; + let store = tx.store(RANGES_STORE_NAME)?; + + get_ranges(&store, ACCEPTED_SAMPLING_RANGES_KEY).await + } } #[async_trait] @@ -409,10 +458,15 @@ impl Store for IndexedDbStore { fut.await } - async fn get_stored_header_ranges(&self) -> Result { + async fn get_stored_header_ranges(&self) -> Result { let fut = SendWrapper::new(self.get_stored_header_ranges()); fut.await } + + async fn get_accepted_sampling_ranges(&self) -> Result { + let fut = SendWrapper::new(self.get_sampling_ranges()); + fut.await + } } impl From for StoreError { @@ -431,57 +485,39 @@ impl From for StoreError { } } -async fn get_head_from_database(db: &Rexie) -> Result { - let tx = db.transaction(&[HEADER_STORE_NAME], TransactionMode::ReadOnly)?; - let store = tx.store(HEADER_STORE_NAME)?; - - let store_head = store - .get_all(None, Some(1), None, Some(Direction::Prev)) - .await? - .first() - .ok_or(StoreError::NotFound)? 
- .1 - .to_owned(); +async fn get_ranges(store: &rexie::Store, name: &str) -> Result { + let key = JsValue::from_str(name); + let raw_ranges = store.get(&key).await?; - let serialized_header = from_value::(store_head)?.header; + if raw_ranges.is_falsy() { + // Ranges not set yet + return Ok(BlockRanges::default()); + } - ExtendedHeader::decode(serialized_header.as_ref()) - .map_err(|e| StoreError::CelestiaTypes(e.into())) + Ok(from_value(raw_ranges)?) } -async fn try_insert_to_range( - ranges_store: &rexie::Store, - new_range: HeaderRange, -) -> Result<(bool, bool)> { - let stored_ranges = HeaderRanges::from_vec( - ranges_store - .get_all(None, None, None, Some(Direction::Next)) - .await? - .into_iter() - .map(|(_k, v)| from_value::<(u64, u64)>(v).map(|(start, end)| start..=end)) - .collect::>()?, - ); - - let RangeScanResult { - range_index, - range, - range_to_remove, - } = stored_ranges.check_range_insert(&new_range)?; +async fn set_ranges(store: &rexie::Store, name: &str, ranges: &BlockRanges) -> Result<()> { + let key = JsValue::from_str(name); + let val = to_value(ranges)?; - if let Some(to_remove) = range_to_remove { - let jsvalue_key_to_remove = to_value(&to_remove)?; - ranges_store.delete(&jsvalue_key_to_remove).await?; - } + store.put(&val, Some(&key)).await?; - let jsvalue_range = to_value(&(*range.start(), *range.end()))?; - let jsvalue_key = to_value(&range_index)?; + Ok(()) +} - ranges_store.put(&jsvalue_range, Some(&jsvalue_key)).await?; +async fn get_head_from_database(db: &Rexie) -> Result { + let tx = db.transaction( + &[HEADER_STORE_NAME, RANGES_STORE_NAME], + TransactionMode::ReadOnly, + )?; + let header_store = tx.store(HEADER_STORE_NAME)?; + let ranges_store = tx.store(RANGES_STORE_NAME)?; - let prev_exists = new_range.start() != range.start(); - let next_exists = new_range.end() != range.end(); + let ranges = get_ranges(&ranges_store, HEADER_RANGES_KEY).await?; + let head_height = ranges.head().ok_or(StoreError::NotFound)?; - Ok((prev_exists, 
next_exists)) + get_by_height(&header_store, head_height).await } async fn get_by_height(header_store: &rexie::Store, height: u64) -> Result { @@ -537,10 +573,143 @@ async fn verify_against_neighbours( Ok(()) } +/// Get schema version from the db, or perform a heuristic to try to determine +/// version used (for versions <4). +async fn detect_schema_version(db: &Rexie) -> Result> { + let tx = db.transaction( + &[HEADER_STORE_NAME, RANGES_STORE_NAME, SCHEMA_STORE_NAME], + TransactionMode::ReadOnly, + )?; + let schema_store = tx.store(SCHEMA_STORE_NAME)?; + let ranges_store = tx.store(RANGES_STORE_NAME)?; + let header_store = tx.store(HEADER_STORE_NAME)?; + + // Schema version exists from v4 and above. + if let Ok(version) = get_schema_version(&schema_store).await { + return Ok(Some(version)); + } + + // If schema version does not exist in db but ranges store + // has values then assume we are in version 3. + if !store_is_empty(&ranges_store).await? { + return Ok(Some(3)); + } + + // If ranges store does not have any values but header for height 1 + // exists, then we assume we are in version 2. + let height_key = to_value(&1)?; + let height_index = header_store.index(HEIGHT_INDEX_NAME)?; + if height_index.get(&height_key).await?.is_truthy() { + return Ok(Some(2)); + } + + // Otherwise we assume we have a new db. + Ok(None) + } + +async fn store_is_empty(store: &rexie::Store) -> Result { + let vals = store.get_all(None, Some(1), None, None).await?; + Ok(vals.is_empty()) +} + +async fn get_schema_version(store: &rexie::Store) -> Result { + let key = to_value(VERSION_KEY)?; + let val = store.get(&key).await?; + Ok(from_value(val)?) +} + +async fn set_schema_version(store: &rexie::Store, version: u32) -> Result<()> { + let key = to_value(VERSION_KEY)?; + let val = to_value(&version)?; + store.put(&val, Some(&key)).await?; + Ok(()) +} + +async fn migrate_older_to_v4(db: &Rexie) -> Result<()> { + let Some(version) = detect_schema_version(db).await? 
else { + // New database. + return Ok(()); + }; + + if version >= 4 { + // Nothing to migrate. + return Ok(()); + } + + warn!("Migrating DB schema from v{version} to v4"); + + let tx = db.transaction( + &[HEADER_STORE_NAME, RANGES_STORE_NAME, SCHEMA_STORE_NAME], + TransactionMode::ReadWrite, + )?; + let header_store = tx.store(HEADER_STORE_NAME)?; + let ranges_store = tx.store(RANGES_STORE_NAME)?; + let schema_store = tx.store(SCHEMA_STORE_NAME)?; + + let ranges = if version <= 2 { + match v2::get_head_header(&header_store).await { + // On v2 there were no gaps between headers. + Ok(head) => BlockRanges::from_vec(smallvec![1..=head.height().value()])?, + Err(StoreError::NotFound) => BlockRanges::new(), + Err(e) => return Err(e), + } + } else { + // On v3 ranges existed but in different format. + v3::get_header_ranges(&ranges_store).await? + }; + + ranges_store.clear().await?; + set_ranges(&ranges_store, HEADER_RANGES_KEY, &ranges).await?; + + // Migrated to version 4 + set_schema_version(&schema_store, 4).await?; + + tx.commit().await?; + + Ok(()) +} + +mod v2 { + use super::*; + + pub(super) async fn get_head_header(store: &rexie::Store) -> Result { + let store_head = store + .get_all(None, Some(1), None, Some(Direction::Prev)) + .await? + .first() + .ok_or(StoreError::NotFound)? + .1 + .to_owned(); + + let serialized_header = from_value::(store_head)?.header; + + ExtendedHeader::decode(serialized_header.as_ref()) + .map_err(|e| StoreError::CelestiaTypes(e.into())) + } +} + +mod v3 { + use super::*; + + pub(super) async fn get_header_ranges(store: &rexie::Store) -> Result { + let mut ranges = BlockRanges::default(); + + for (_, raw_range) in store + .get_all(None, None, None, Some(Direction::Next)) + .await? 
+ { + let (start, end) = from_value::<(u64, u64)>(raw_range)?; + ranges.insert_relaxed(start..=end)?; + } + + Ok(ranges) + } +} + #[cfg(test)] pub mod tests { use super::*; - use crate::store::header_ranges::ExtendedHeaderGeneratorExt; + use crate::store::utils::ExtendedHeaderGeneratorExt; use celestia_types::test_utils::ExtendedHeaderGenerator; use function_name::named; use wasm_bindgen_test::wasm_bindgen_test; diff --git a/node/src/store/redb_store.rs b/node/src/store/redb_store.rs index 52c7d184..0bf53397 100644 --- a/node/src/store/redb_store.rs +++ b/node/src/store/redb_store.rs @@ -14,18 +14,15 @@ use redb::{ }; use tokio::sync::Notify; use tokio::task::spawn_blocking; +use tracing::warn; use tracing::{debug, trace}; -use crate::store::header_ranges::{ - HeaderRange, HeaderRanges, HeaderRangesExt, VerifiedExtendedHeaders, -}; -use crate::store::utils::RangeScanResult; +use crate::block_ranges::BlockRanges; +use crate::store::utils::VerifiedExtendedHeaders; use crate::store::{Result, SamplingMetadata, SamplingStatus, Store, StoreError}; -const SCHEMA_VERSION: u64 = 1; +const SCHEMA_VERSION: u64 = 2; -const HEADER_HEIGHT_RANGES: TableDefinition<'static, u64, (u64, u64)> = - TableDefinition::new("STORE.HEIGHT_RANGES"); const HEIGHTS_TABLE: TableDefinition<'static, &[u8], u64> = TableDefinition::new("STORE.HEIGHTS"); const HEADERS_TABLE: TableDefinition<'static, u64, &[u8]> = TableDefinition::new("STORE.HEADERS"); const SAMPLING_METADATA_TABLE: TableDefinition<'static, u64, &[u8]> = @@ -33,6 +30,12 @@ const SAMPLING_METADATA_TABLE: TableDefinition<'static, u64, &[u8]> = const SCHEMA_VERSION_TABLE: TableDefinition<'static, (), u64> = TableDefinition::new("STORE.SCHEMA_VERSION"); +const RANGES_TABLE: TableDefinition<'static, &str, Vec<(u64, u64)>> = + TableDefinition::new("STORE.RANGES"); + +const ACCEPTED_SAMPING_RANGES_KEY: &str = "KEY.ACCEPTED_SAMPING_RANGES"; +const HEADER_RANGES_KEY: &str = "KEY.HEADER_RANGES"; + /// A [`Store`] implementation based on a 
[`redb`] database. #[derive(Debug)] pub struct RedbStore { @@ -84,20 +87,35 @@ impl RedbStore { match schema_version { Some(schema_version) => { - // TODO: When we update the schema we need to perform manual migration - if schema_version != SCHEMA_VERSION { - let e = format!("Incompatible database schema; found {schema_version}, expected {SCHEMA_VERSION}."); + if schema_version > SCHEMA_VERSION { + let e = format!( + "Incompatible database schema; found {}, expected {}.", + schema_version, SCHEMA_VERSION + ); return Err(StoreError::OpenFailed(e)); } + + // Do migrations + migrate_v1_to_v2(tx, &mut schema_version_table)?; } None => { + // New database schema_version_table.insert((), SCHEMA_VERSION)?; } } + // Force us to write migrations! + debug_assert_eq!( + schema_version_table.get(())?.map(|guard| guard.value()), + Some(SCHEMA_VERSION), + "Some migrations are missing" + ); + // create tables, so that reads later don't complain let _heights_table = tx.open_table(HEIGHTS_TABLE)?; - let _ranges_table = tx.open_table(HEADER_HEIGHT_RANGES)?; + let _headers_table = tx.open_table(HEADERS_TABLE)?; + let _ranges_table = tx.open_table(RANGES_TABLE)?; + let _sampling_table = tx.open_table(SAMPLING_METADATA_TABLE)?; Ok(()) }) @@ -160,14 +178,10 @@ impl RedbStore { async fn head_height(&self) -> Result { self.read_tx(|tx| { - let table = tx.open_table(HEADER_HEIGHT_RANGES)?; - let highest_range = get_head_range(&table)?; + let table = tx.open_table(RANGES_TABLE)?; + let header_ranges = get_ranges(&table, HEADER_RANGES_KEY)?; - if highest_range.is_empty() { - Err(StoreError::NotFound) - } else { - Ok(*highest_range.end()) - } + header_ranges.head().ok_or(StoreError::NotFound) }) .await } @@ -195,11 +209,13 @@ impl RedbStore { async fn get_head(&self) -> Result { self.read_tx(|tx| { - let height_ranges_table = tx.open_table(HEADER_HEIGHT_RANGES)?; + let ranges_table = tx.open_table(RANGES_TABLE)?; let headers_table = tx.open_table(HEADERS_TABLE)?; - let head_range = 
get_head_range(&height_ranges_table)?; - get_header(&headers_table, *head_range.end()) + let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?; + let head = header_ranges.head().ok_or(StoreError::NotFound)?; + + get_header(&headers_table, head) }) .await } @@ -233,18 +249,23 @@ impl RedbStore { StoreError: From<>::Error>, { let headers = headers.try_into()?; + self.write_tx(move |tx| { let headers = headers.as_ref(); + let (Some(head), Some(tail)) = (headers.first(), headers.last()) else { return Ok(()); }; + let mut heights_table = tx.open_table(HEIGHTS_TABLE)?; let mut headers_table = tx.open_table(HEADERS_TABLE)?; - let mut height_ranges_table = tx.open_table(HEADER_HEIGHT_RANGES)?; + let mut ranges_table = tx.open_table(RANGES_TABLE)?; + let mut header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?; let headers_range = head.height().value()..=tail.height().value(); + let (prev_exists, next_exists) = - try_insert_to_range(&mut height_ranges_table, headers_range)?; + header_ranges.check_insertion_constraints(&headers_range)?; verify_against_neighbours( &headers_table, @@ -274,10 +295,12 @@ impl RedbStore { trace!("Inserted header {hash} with height {height}"); } - debug!( - "Inserted header range {:?}", - head.height().value()..=tail.height().value() - ); + + header_ranges.insert_relaxed(&headers_range)?; + set_ranges(&mut ranges_table, HEADER_RANGES_KEY, &header_ranges)?; + + debug!("Inserted header range {headers_range:?}",); + Ok(()) }) .await?; @@ -295,8 +318,12 @@ impl RedbStore { ) -> Result<()> { self.write_tx(move |tx| { let mut sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?; - let ranges_table = tx.open_table(HEADER_HEIGHT_RANGES)?; - if !get_all_ranges(&ranges_table)?.contains(height) { + let mut ranges_table = tx.open_table(RANGES_TABLE)?; + + let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?; + let mut sampling_ranges = get_ranges(&ranges_table, ACCEPTED_SAMPING_RANGES_KEY)?; + + if 
!header_ranges.contains(height) { return Err(StoreError::NotFound); } @@ -323,6 +350,21 @@ impl RedbStore { sampling_metadata_table.insert(height, &serialized[..])?; + match status { + SamplingStatus::Accepted => sampling_ranges + .insert_relaxed(height..=height) + .expect("invalid height"), + _ => sampling_ranges + .remove_relaxed(height..=height) + .expect("invalid height"), + } + + set_ranges( + &mut ranges_table, + ACCEPTED_SAMPING_RANGES_KEY, + &sampling_ranges, + )?; + Ok(()) }) .await @@ -342,16 +384,24 @@ impl RedbStore { .await } - async fn get_stored_ranges(&self) -> Result { + async fn get_stored_ranges(&self) -> Result { let ranges = self .read_tx(|tx| { - let table = tx.open_table(HEADER_HEIGHT_RANGES)?; - get_all_ranges(&table) + let table = tx.open_table(RANGES_TABLE)?; + get_ranges(&table, HEADER_RANGES_KEY) }) .await?; Ok(ranges) } + + async fn get_sampling_ranges(&self) -> Result { + self.read_tx(|tx| { + let table = tx.open_table(RANGES_TABLE)?; + get_ranges(&table, ACCEPTED_SAMPING_RANGES_KEY) + }) + .await + } } #[async_trait] @@ -436,48 +486,13 @@ impl Store for RedbStore { self.get_sampling_metadata(height).await } - async fn get_stored_header_ranges(&self) -> Result { + async fn get_stored_header_ranges(&self) -> Result { Ok(self.get_stored_ranges().await?) } -} -fn try_insert_to_range( - ranges_table: &mut Table, - new_range: HeaderRange, -) -> Result<(bool, bool)> { - let stored_ranges = HeaderRanges::from_vec( - ranges_table - .iter()? - .map(|range_guard| { - let range = range_guard?.1.value(); - Ok(range.0..=range.1) - }) - .collect::>()?, - ); - - let RangeScanResult { - range_index, - range, - range_to_remove, - } = stored_ranges.check_range_insert(&new_range)?; - - if let Some(to_remove) = range_to_remove { - let (start, end) = ranges_table - .remove(u64::try_from(to_remove).expect("usize->u64"))? 
- .expect("missing range") - .value(); - - debug!("consolidating range, new range: {range:?}, removed {start}..={end}"); - }; - let prev_exists = new_range.start() != range.start(); - let next_exists = new_range.end() != range.end(); - - ranges_table.insert( - u64::try_from(range_index).expect("usize->u64"), - (*range.start(), *range.end()), - )?; - - Ok((prev_exists, next_exists)) + async fn get_accepted_sampling_ranges(&self) -> Result { + self.get_sampling_ranges().await + } } fn verify_against_neighbours( @@ -513,32 +528,38 @@ where Ok(()) } -fn get_head_range(ranges_table: &R) -> Result> +fn get_ranges(ranges_table: &R, name: &str) -> Result where - R: ReadableTable, + R: ReadableTable<&'static str, Vec<(u64, u64)>>, { - ranges_table - .last()? - .map(|(_key_guard, value_guard)| { - let range = value_guard.value(); - range.0..=range.1 + let raw_ranges = ranges_table + .get(name)? + .map(|guard| { + guard + .value() + .iter() + .map(|(start, end)| *start..=*end) + .collect() }) - .ok_or(StoreError::NotFound) + .unwrap_or_default(); + + Ok(BlockRanges::from_vec(raw_ranges)?) } -fn get_all_ranges(ranges_table: &R) -> Result -where - R: ReadableTable, -{ - Ok(HeaderRanges::from_vec( - ranges_table - .iter()? 
- .map(|range_guard| { - let range = range_guard?.1.value(); - Ok(range.0..=range.1) - }) - .collect::>()?, - )) +fn set_ranges( + ranges_table: &mut Table<&str, Vec<(u64, u64)>>, + name: &str, + ranges: &BlockRanges, +) -> Result<()> { + let raw_ranges: &[RangeInclusive] = ranges.as_ref(); + let raw_ranges = raw_ranges + .iter() + .map(|range| (*range.start(), *range.end())) + .collect::>(); + + ranges_table.insert(name, raw_ranges)?; + + Ok(()) } #[inline] @@ -619,6 +640,42 @@ impl From for StoreError { } } +fn migrate_v1_to_v2( + tx: &WriteTransaction, + schema_version_table: &mut Table<(), u64>, +) -> Result<()> { + const HEADER_HEIGHT_RANGES: TableDefinition<'static, u64, (u64, u64)> = + TableDefinition::new("STORE.HEIGHT_RANGES"); + + let schema_version = schema_version_table.get(())?.map(|guard| guard.value()); + + // We only migrate from v1 + if schema_version != Some(1) { + return Ok(()); + } + + warn!("Migrating DB schema from v1 to v2"); + + let header_ranges_table = tx.open_table(HEADER_HEIGHT_RANGES)?; + let mut ranges_table = tx.open_table(RANGES_TABLE)?; + + let raw_ranges = header_ranges_table + .iter()? 
+ .map(|range_guard| { + let range = range_guard?.1.value(); + Ok((range.0, range.1)) + }) + .collect::>>()?; + + tx.delete_table(header_ranges_table)?; + ranges_table.insert(HEADER_RANGES_KEY, raw_ranges)?; + + // Migrated to v2 + schema_version_table.insert((), 2)?; + + Ok(()) +} + #[cfg(test)] pub mod tests { use crate::store::ExtendedHeaderGeneratorExt; diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index 3090c493..d50a670e 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -1,10 +1,12 @@ use std::ops::RangeInclusive; +#[cfg(any(test, feature = "test-utils"))] +use celestia_types::test_utils::ExtendedHeaderGenerator; use celestia_types::ExtendedHeader; +use crate::block_ranges::{BlockRange, BlockRangeExt}; use crate::executor::yield_now; -use crate::store::header_ranges::{HeaderRange, RangeLengthExt}; -use crate::store::{Result, StoreError}; +use crate::store::Result; pub(crate) const VALIDATIONS_PER_YIELD: usize = 4; @@ -15,7 +17,7 @@ pub(crate) fn calculate_range_to_fetch( store_headers: &[RangeInclusive], syncing_window_edge: Option, limit: u64, -) -> HeaderRange { +) -> BlockRange { let mut missing_range = get_most_recent_missing_range(head_height, store_headers); // truncate to syncing window, if height is known @@ -38,7 +40,7 @@ pub(crate) fn calculate_range_to_fetch( fn get_most_recent_missing_range( head_height: u64, store_headers: &[RangeInclusive], -) -> HeaderRange { +) -> BlockRange { let mut store_headers_iter = store_headers.iter().rev(); let Some(store_head_range) = store_headers_iter.next() else { @@ -57,67 +59,95 @@ fn get_most_recent_missing_range( penultimate_range_end + 1..=store_head_range.start().saturating_sub(1) } -pub(crate) fn try_consolidate_ranges( - left: &RangeInclusive, - right: &RangeInclusive, -) -> Option> { - debug_assert!(left.start() <= left.end()); - debug_assert!(right.start() <= right.end()); +/// Span of header that's been verified internally +#[derive(Clone)] +pub struct 
VerifiedExtendedHeaders(Vec); - if left.end() + 1 == *right.start() { - return Some(*left.start()..=*right.end()); +impl IntoIterator for VerifiedExtendedHeaders { + type Item = ExtendedHeader; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() } +} - if right.end() + 1 == *left.start() { - return Some(*right.start()..=*left.end()); +impl<'a> TryFrom<&'a [ExtendedHeader]> for VerifiedExtendedHeaders { + type Error = celestia_types::Error; + + fn try_from(value: &'a [ExtendedHeader]) -> Result { + value.to_vec().try_into() } +} - None +impl From for Vec { + fn from(value: VerifiedExtendedHeaders) -> Self { + value.0 + } } -pub(crate) fn ranges_intersection( - left: &RangeInclusive, - right: &RangeInclusive, -) -> Option> { - debug_assert!(left.start() <= left.end()); - debug_assert!(right.start() <= right.end()); +impl AsRef<[ExtendedHeader]> for VerifiedExtendedHeaders { + fn as_ref(&self) -> &[ExtendedHeader] { + &self.0 + } } - if left.start() > right.end() || left.end() < right.start() { - return None; +/// 1-length header span is internally verified, this is valid +impl From<[ExtendedHeader; 1]> for VerifiedExtendedHeaders { + fn from(value: [ExtendedHeader; 1]) -> Self { + Self(value.into()) } +} - match (left.start() >= right.start(), left.end() >= right.end()) { - (false, false) => Some(*right.start()..=*left.end()), - (false, true) => Some(right.clone()), - (true, false) => Some(left.clone()), - (true, true) => Some(*left.start()..=*right.end()), +impl From for VerifiedExtendedHeaders { + fn from(value: ExtendedHeader) -> Self { + Self(vec![value]) } } -#[derive(Debug, PartialEq)] -pub(crate) struct RangeScanResult { - /// index of the range that header is being inserted into - pub range_index: usize, - /// updated bounds of the range header is being inserted into - pub range: HeaderRange, - /// index of the range that should be removed from the table, if we're consolidating two - /// ranges. 
None otherwise. - pub range_to_remove: Option, +impl<'a> From<&'a ExtendedHeader> for VerifiedExtendedHeaders { + fn from(value: &ExtendedHeader) -> Self { + Self(vec![value.to_owned()]) + } } -#[allow(unused)] -pub(crate) fn verify_range_contiguous(headers: &[ExtendedHeader]) -> Result<()> { - let mut prev = None; - for h in headers { - let current_height = h.height().value(); - if let Some(prev_height) = prev { - if prev_height + 1 != current_height { - return Err(StoreError::InsertRangeWithGap(prev_height, current_height)); - } - } - prev = Some(current_height); +impl TryFrom> for VerifiedExtendedHeaders { + type Error = celestia_types::Error; + + fn try_from(headers: Vec) -> Result { + let Some(head) = headers.first() else { + return Ok(VerifiedExtendedHeaders(Vec::default())); + }; + + head.verify_adjacent_range(&headers[1..])?; + + Ok(Self(headers)) + } +} + +impl VerifiedExtendedHeaders { + /// Create a new instance out of pre-checked vec of headers + /// + /// # Safety + /// + /// This function may produce invalid `VerifiedExtendedHeaders`, if passed range is not + /// validated manually + pub unsafe fn new_unchecked(headers: Vec) -> Self { + Self(headers) + } +} + +/// Extends test header generator for easier insertion into the store +pub trait ExtendedHeaderGeneratorExt { + /// Generate next amount verified headers + fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders; +} + +#[cfg(any(test, feature = "test-utils"))] +impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator { + fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders { + unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) } } - Ok(()) } #[allow(unused)] @@ -221,24 +251,4 @@ mod tests { calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], Some(3900), 500); assert_eq!(fetch_range, 3901..=4000); } - - #[test] - fn intersection_non_overlapping() { - assert_eq!(ranges_intersection(&(1..=2), &(3..=4)), None); - 
assert_eq!(ranges_intersection(&(1..=2), &(6..=9)), None); - assert_eq!(ranges_intersection(&(3..=8), &(1..=2)), None); - assert_eq!(ranges_intersection(&(1..=2), &(4..=6)), None); - } - - #[test] - fn intersection_overlapping() { - assert_eq!(ranges_intersection(&(1..=2), &(2..=4)), Some(2..=2)); - assert_eq!(ranges_intersection(&(1..=2), &(2..=2)), Some(2..=2)); - assert_eq!(ranges_intersection(&(1..=5), &(2..=9)), Some(2..=5)); - assert_eq!(ranges_intersection(&(4..=6), &(1..=9)), Some(4..=6)); - assert_eq!(ranges_intersection(&(3..=7), &(5..=5)), Some(5..=5)); - assert_eq!(ranges_intersection(&(3..=7), &(5..=6)), Some(5..=6)); - assert_eq!(ranges_intersection(&(3..=5), &(3..=3)), Some(3..=3)); - assert_eq!(ranges_intersection(&(3..=5), &(1..=4)), Some(3..=4)); - } } diff --git a/node/src/syncer.rs b/node/src/syncer.rs index aa74a419..5f9b6879 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -25,10 +25,10 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, info_span, instrument, warn, Instrument}; use web_time::Instant; +use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges}; use crate::events::{EventPublisher, NodeEvent}; use crate::executor::{sleep, spawn, spawn_cancellable, Interval}; use crate::p2p::{P2p, P2pError}; -use crate::store::header_ranges::{HeaderRanges, PrintableHeaderRange}; use crate::store::utils::calculate_range_to_fetch; use crate::store::{Store, StoreError}; use crate::utils::OneshotSenderExt; @@ -104,7 +104,7 @@ enum SyncerCmd { #[derive(Debug, Serialize, Deserialize)] pub struct SyncingInfo { /// Ranges of headers that are already synchronised - pub stored_headers: HeaderRanges, + pub stored_headers: BlockRanges, /// Syncing target. The latest height seen in the network that was successfully verified. 
pub subjective_head: u64, } @@ -186,7 +186,7 @@ where } struct Ongoing { - batch: PrintableHeaderRange, + batch: BlockRange, cancellation_token: CancellationToken, } @@ -315,7 +315,7 @@ where } if let Some(ongoing) = self.ongoing_batch.take() { - warn!("Cancelling fetching of {}", ongoing.batch); + warn!("Cancelling fetching of {}", ongoing.batch.display()); ongoing.cancellation_token.cancel(); } } @@ -341,7 +341,7 @@ where let ongoing_batch = self .ongoing_batch .as_ref() - .map(|ongoing| format!("{}", ongoing.batch)) + .map(|ongoing| format!("{}", ongoing.batch.display())) .unwrap_or_else(|| "None".to_string()); info!("syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}"); @@ -495,7 +495,7 @@ where let cancellation_token = self.cancellation_token.child_token(); self.ongoing_batch = Some(Ongoing { - batch: PrintableHeaderRange(next_batch.clone()), + batch: next_batch.clone(), cancellation_token: cancellation_token.clone(), }); @@ -520,8 +520,8 @@ where return; }; - let from_height = *ongoing.batch.0.start(); - let to_height = *ongoing.batch.0.end(); + let from_height = *ongoing.batch.start(); + let to_height = *ongoing.batch.end(); let headers = match res { Ok(headers) => headers,