Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce RollDB for chain persistence #8

Merged
merged 1 commit into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ serde_json = "1.0.79"
minicbor = "0.14.1"
prometheus_exporter = { version = "0.8.4", default-features = false }
gasket = { path = "../../construkts/gasket-rs" }
# gasket = { git = "https://github.com/construkts/gasket-rs.git" }
thiserror = "1.0.30"
lazy_static = "1.4.0"
rayon = "1.5.3"
rocksdb = "0.19.0"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
bincode = "1.3.3"

[dev-dependencies]
tempfile = "3.3.0"
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod crosscut;
pub mod downstream;
pub mod model;
pub mod prelude;
pub mod rolldb;
pub mod storage;
pub mod upstream;

Expand Down
60 changes: 60 additions & 0 deletions src/rolldb/blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use pallas::crypto::hash::Hash;
use rocksdb::{ColumnFamilyRef, WriteBatch, DB};

#[derive(Debug)]
struct Key(Hash<32>);

impl TryFrom<Box<[u8]>> for Key {
type Error = super::Error;

fn try_from(value: Box<[u8]>) -> Result<Self, Self::Error> {
let inner: [u8; 32] = value[0..32].try_into().map_err(|_| super::Error::Serde)?;
let inner = Hash::<32>::from(inner);
Ok(Self(inner))
}
}

impl From<Key> for Box<[u8]> {
fn from(v: Key) -> Self {
v.0.as_slice().into()
}
}

type Value = Box<[u8]>;

#[derive(Debug)]
pub struct Entry(Key, Value);

impl TryFrom<super::RawKV> for Entry {
type Error = super::Error;

fn try_from((k, v): super::RawKV) -> Result<Self, super::Error> {
let k = k.try_into()?;

Ok(Entry(k, v))
}
}
pub fn stage_upsert(
db: &DB,
hash: super::BlockHash,
body: super::BlockBody,
batch: &mut WriteBatch,
) -> Result<(), super::Error> {
let cf = blocks_cf(db);

batch.put_cf(cf, hash, body);

Ok(())
}

pub const CF_NAME: &str = "blocks";

pub fn blocks_cf(db: &DB) -> ColumnFamilyRef {
db.cf_handle(CF_NAME).unwrap()
}

pub fn get_body(db: &DB, hash: Hash<32>) -> Result<Option<super::BlockBody>, super::Error> {
let cf = blocks_cf(db);
let key = Box::<[u8]>::from(Key(hash));
db.get_cf(cf, key).map_err(|_| super::Error::IO)
}
144 changes: 144 additions & 0 deletions src/rolldb/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
mod blocks;
mod wal;
use pallas::crypto::hash::Hash;
use std::path::Path;
use thiserror::Error;

use rocksdb::{Options, WriteBatch, DB};

#[derive(Error, Debug)]
pub enum Error {
#[error("IO error")]
IO,

#[error("serde error")]
Serde,
}

const CHAIN_CF: &str = "chain";

type BlockSlot = u64;
type BlockHash = Hash<32>;
type BlockBody = Vec<u8>;

type RawKV = (Box<[u8]>, Box<[u8]>);

pub struct RollDB {
db: DB,
wal_seq: u64,
}

impl RollDB {
pub fn open(path: impl AsRef<Path>) -> Result<Self, Error> {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);

let db = DB::open_cf(&opts, path, [blocks::CF_NAME, CHAIN_CF, wal::CF_NAME])
.map_err(|_| Error::IO)?;

let wal_seq = wal::find_lastest_seq(&db)?.into();

Ok(Self { db, wal_seq })
}

pub fn get_block(&mut self, hash: Hash<32>) -> Result<Option<BlockBody>, Error> {
blocks::get_body(&self.db, hash)
}

pub fn roll_forward(
&mut self,
slot: BlockSlot,
hash: BlockHash,
body: BlockBody,
) -> Result<(), Error> {
let mut batch = WriteBatch::default();

// keep track of the new block body
blocks::stage_upsert(&self.db, hash, body, &mut batch)?;

// advance the WAL to the new point
let new_seq = wal::stage_roll_forward(&self.db, self.wal_seq, slot, hash, &mut batch)?;

self.db.write(batch).map_err(|_| Error::IO)?;
self.wal_seq = new_seq;

Ok(())
}

pub fn roll_back(&mut self, until: BlockSlot) -> Result<(), Error> {
let mut batch = WriteBatch::default();

let new_seq = wal::stage_roll_back(&self.db, self.wal_seq, until, &mut batch)?;

self.db.write(batch).map_err(|_| Error::IO)?;
self.wal_seq = new_seq;

Ok(())
}

pub fn find_tip(&self) -> Result<Option<(BlockSlot, BlockHash)>, Error> {
// TODO: tip might be either on chain or WAL, we need to query both
wal::find_tip(&self.db)
}

pub fn crawl_wal(&self) -> wal::CrawlIterator {
wal::crawl_forward(&self.db)
}

pub fn destroy(path: impl AsRef<Path>) -> Result<(), Error> {
DB::destroy(&Options::default(), path).map_err(|_| Error::IO)
}
}

#[cfg(test)]
mod tests {
use super::{BlockBody, BlockHash, BlockSlot, RollDB};

fn with_tmp_db(op: fn(db: RollDB) -> ()) {
let path = tempfile::tempdir().unwrap().into_path();
let db = RollDB::open(path.clone()).unwrap();

op(db);

RollDB::destroy(path).unwrap();
}

fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) {
let hash = pallas::crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice());
(slot, hash, slot.to_be_bytes().to_vec())
}

#[test]
fn test_roll_forward_blackbox() {
with_tmp_db(|mut db| {
let (slot, hash, body) = dummy_block(11);
db.roll_forward(slot, hash, body.clone()).unwrap();

let persisted = db.get_block(hash).unwrap().unwrap();
assert_eq!(persisted, body);

let (tip_slot, tip_hash) = db.find_tip().unwrap().unwrap();
assert_eq!(tip_slot, slot);
assert_eq!(tip_hash, hash);
});
}

#[test]
fn test_roll_back_blackbox() {
with_tmp_db(|mut db| {
for i in 0..5 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}

db.roll_back(20).unwrap();

let (tip_slot, _) = db.find_tip().unwrap().unwrap();
assert_eq!(tip_slot, 20);
});
}

//TODO: test rollback beyond K
//TODO: test rollback with unknown slot
}
Loading