Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement rollback buffer #49

Merged
merged 6 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions pallas-miniprotocols/src/chainsync/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use std::collections::{vec_deque::Iter, VecDeque};

use crate::Point;

/// A memory buffer to handle chain rollbacks
///
/// This structure is intended to facilitate the process of managing rollbacks
/// in a chain sync process. The goal is to keep points in memory until they
/// reach a certain depth (# of confirmations). If a rollback happens, the
/// buffer will try to find the intersection, clear the orphaned points and keep
/// the remaining still in memory. Further forward rolls will accumulate from
/// the intersection.
///
/// It works by keeping a `VecDeque` data structure of points, where
/// roll-forward operations accumulate at the end of the deque and retrieving
/// confirmed points means to pop from the front of the deque.
///
/// Notice that it works by keeping track of points, not blocks. It is meant to
/// be used as a lightweight index where blocks can then be retrieved from a
/// more suitable memory structure / persistent storage.
#[derive(Debug)]
pub struct RollbackBuffer {
points: VecDeque<Point>,
}

impl Default for RollbackBuffer {
fn default() -> Self {
Self::new()
}
}

pub enum RollbackEffect {
Handled,
OutOfScope,
}

impl RollbackBuffer {
pub fn new() -> Self {
Self {
points: VecDeque::new(),
}
}

/// Adds a new point to the back of the buffer
pub fn roll_forward(&mut self, point: Point) {
self.points.push_back(point);
}

/// Retrieves all points above or equal a certain depth
pub fn pop_with_depth(&mut self, min_depth: usize) -> Vec<Point> {
match self.points.len().checked_sub(min_depth) {
Some(ready) => self.points.drain(0..ready).collect(),
None => vec![],
}
}

/// Find the position of a point within the buffer
pub fn position(&self, point: &Point) -> Option<usize> {
self.points.iter().position(|p| p.eq(point))
}

/// Iterates over the contents of the buffer
pub fn peek(&self) -> Iter<Point> {
self.points.iter()
}

/// Returns the size of the buffer (number of points)
pub fn size(&self) -> usize {
self.points.len()
}

/// Returns the newest point in the buffer
pub fn latest(&self) -> Option<&Point> {
self.points.back()
}

/// Returns the oldest point in the buffer
pub fn oldest(&self) -> Option<&Point> {
self.points.front()
}

/// Unwind the buffer up to a certain point, clearing orphaned items
///
/// If the buffer contains the rollback point, we can safely discard from
/// the back and return Ok. If the rollback point is outside the scope of
/// the buffer, we clear the whole buffer and notify a failure
/// in the rollback process.
pub fn roll_back(&mut self, point: &Point) -> RollbackEffect {
if let Some(x) = self.position(point) {
self.points.truncate(x + 1);
RollbackEffect::Handled
} else {
self.points.clear();
RollbackEffect::OutOfScope
}
}
}

#[cfg(test)]
mod tests {
use crate::{chainsync::RollbackEffect, Point};

use super::RollbackBuffer;

fn dummy_point(i: u64) -> Point {
Point::new(i, i.to_le_bytes().to_vec())
}

fn build_filled_buffer(n: usize) -> RollbackBuffer {
let mut buffer = RollbackBuffer::new();

for i in 0..n {
let point = dummy_point(i as u64);
buffer.roll_forward(point);
}

buffer
}

#[test]
fn roll_forward_accumulates_points() {
let buffer = build_filled_buffer(3);

assert!(matches!(buffer.position(&dummy_point(0)), Some(0)));
assert!(matches!(buffer.position(&dummy_point(1)), Some(1)));
assert!(matches!(buffer.position(&dummy_point(2)), Some(2)));

assert_eq!(buffer.oldest().unwrap(), &dummy_point(0));
assert_eq!(buffer.latest().unwrap(), &dummy_point(2));
}

#[test]
fn pop_from_valid_depth_works() {
let mut buffer = build_filled_buffer(5);

let ready = buffer.pop_with_depth(2);

assert_eq!(dummy_point(0), ready[0]);
assert_eq!(dummy_point(1), ready[1]);
assert_eq!(dummy_point(2), ready[2]);

assert_eq!(ready.len(), 3);

assert_eq!(buffer.oldest().unwrap(), &dummy_point(3));
assert_eq!(buffer.latest().unwrap(), &dummy_point(4));
}

#[test]
fn pop_from_excessive_depth_returns_empty() {
let mut buffer = build_filled_buffer(6);

let ready = buffer.pop_with_depth(10);

assert_eq!(ready.len(), 0);

assert_eq!(buffer.oldest().unwrap(), &dummy_point(0));
assert_eq!(buffer.latest().unwrap(), &dummy_point(5));
}

#[test]
fn roll_back_within_scope_works() {
let mut buffer = build_filled_buffer(6);

let result = buffer.roll_back(&dummy_point(2));

assert!(matches!(result, RollbackEffect::Handled));

assert_eq!(buffer.size(), 3);
assert_eq!(buffer.oldest().unwrap(), &dummy_point(0));
assert_eq!(buffer.latest().unwrap(), &dummy_point(2));

let remaining = buffer.pop_with_depth(0);

assert_eq!(dummy_point(0), remaining[0]);
assert_eq!(dummy_point(1), remaining[1]);
assert_eq!(dummy_point(2), remaining[2]);

assert_eq!(remaining.len(), 3);
}

#[test]
fn roll_back_outside_scope_works() {
let mut buffer = build_filled_buffer(6);

let result = buffer.roll_back(&dummy_point(100));

assert!(matches!(result, RollbackEffect::OutOfScope));

assert_eq!(buffer.size(), 0);
assert_eq!(buffer.oldest(), None);
assert_eq!(buffer.latest(), None);
}
}
2 changes: 2 additions & 0 deletions pallas-miniprotocols/src/chainsync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod buffer;
mod clients;
mod codec;
mod protocol;

pub use buffer::*;
pub use clients::*;
pub use codec::*;
pub use protocol::*;
2 changes: 1 addition & 1 deletion pallas-miniprotocols/src/chainsync/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fmt::Debug, ops::Deref};

use crate::common::Point;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Tip(pub Point, pub u64);

#[derive(Debug, PartialEq, Clone)]
Expand Down
8 changes: 7 additions & 1 deletion pallas-miniprotocols/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub const TESTNET_MAGIC: u64 = 1097911063;
pub const MAINNET_MAGIC: u64 = 764824073;

/// A point within a chain
#[derive(Clone)]
#[derive(Clone, Eq, PartialEq)]
pub struct Point(pub u64, pub Vec<u8>);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the Vec<u8> here ? is it a block hash ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is the hash. This struct was there before the introduction of pallas-crypto. We should turn it into a Hash<>.


impl Debug for Point {
Expand All @@ -18,3 +18,9 @@ impl Debug for Point {
.finish()
}
}

impl Point {
pub fn new(slot: u64, hash: Vec<u8>) -> Self {
Point(slot, hash)
}
}