Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement multimap #290

Merged
merged 6 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions blockchain/chain_sync/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum Error {
/// Error originating from key-value store
KeyValueStore(String),
/// Error originating from the AMT
AMT(String),
Amt(String),
/// Error originating from state
State(String),
/// Error in validating arbitrary data
Expand All @@ -45,7 +45,7 @@ impl fmt::Display for Error {
Error::Encoding(msg) => write!(f, "{}", msg),
Error::InvalidCid(msg) => write!(f, "{}", msg),
Error::Store(msg) => write!(f, "{}", msg),
Error::AMT(msg) => write!(f, "{}", msg),
Error::Amt(msg) => write!(f, "{}", msg),
Error::State(msg) => write!(f, "{}", msg),
Error::Validation(msg) => write!(f, "{}", msg),
Error::Other(msg) => write!(f, "chain_sync error: {}", msg),
Expand Down Expand Up @@ -91,7 +91,7 @@ impl From<StoreErr> for Error {

impl From<AmtErr> for Error {
fn from(e: AmtErr) -> Error {
Error::AMT(e.to_string())
Error::Amt(e.to_string())
}
}

Expand Down
8 changes: 4 additions & 4 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::network_handler::NetworkHandler;
use super::peer_manager::PeerManager;
use super::{Error, SyncManager, SyncNetworkContext};
use address::Address;
use amt::AMT;
use amt::Amt;
use async_std::sync::{channel, Receiver, Sender};
use async_std::task;
use blocks::{Block, BlockHeader, FullTipset, TipSetKeys, Tipset, TxMeta};
Expand Down Expand Up @@ -215,9 +215,9 @@ where
// collect bls and secp cids
let bls_cids = cids_from_messages(block.bls_msgs())?;
let secp_cids = cids_from_messages(block.secp_msgs())?;
// generate AMT and batch set message values
let bls_root = AMT::new_from_slice(self.chain_store.blockstore(), &bls_cids)?;
let secp_root = AMT::new_from_slice(self.chain_store.blockstore(), &secp_cids)?;
// generate Amt and batch set message values
let bls_root = Amt::new_from_slice(self.chain_store.blockstore(), &bls_cids)?;
let secp_root = Amt::new_from_slice(self.chain_store.blockstore(), &secp_cids)?;

let meta = TxMeta {
bls_message_root: bls_root,
Expand Down
62 changes: 49 additions & 13 deletions ipld/amt/src/amt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use ipld_blockstore::BlockStore;
///
/// Usage:
/// ```
/// use ipld_amt::AMT;
/// use ipld_amt::Amt;
///
/// let db = db::MemoryDB::default();
/// let mut amt = AMT::new(&db);
/// let mut amt = Amt::new(&db);
///
/// // Insert or remove any serializable values
/// amt.set(2, "foo".to_owned()).unwrap();
Expand All @@ -26,30 +26,32 @@ use ipld_blockstore::BlockStore;
/// let cid = amt.flush().unwrap();
/// ```
#[derive(Debug)]
pub struct AMT<'db, DB, V>
where
DB: BlockStore,
V: Clone,
{
pub struct Amt<'db, V, BS> {
root: Root<V>,
block_store: &'db DB,
block_store: &'db BS,
}

impl<'a, V: PartialEq, BS: BlockStore> PartialEq for Amt<'a, V, BS> {
fn eq(&self, other: &Self) -> bool {
self.root == other.root
}
}

impl<'db, DB, V> AMT<'db, DB, V>
impl<'db, V, BS> Amt<'db, V, BS>
where
DB: BlockStore,
V: Clone + DeserializeOwned + Serialize,
BS: BlockStore,
{
/// Constructor for Root AMT node
pub fn new(block_store: &'db DB) -> Self {
pub fn new(block_store: &'db BS) -> Self {
Self {
root: Root::default(),
block_store,
}
}

/// Constructs an AMT with a blockstore and a Cid of the root of the AMT
pub fn load(block_store: &'db DB, cid: &Cid) -> Result<Self, Error> {
pub fn load(cid: &Cid, block_store: &'db BS) -> Result<Self, Error> {
// Load root bytes from database
let root: Root<V> = block_store
.get(cid)?
Expand All @@ -69,7 +71,7 @@ where
}

/// Generates an AMT with block store and array of cbor marshallable objects and returns Cid
pub fn new_from_slice(block_store: &'db DB, vals: &[V]) -> Result<Cid, Error> {
pub fn new_from_slice(block_store: &'db BS, vals: &[V]) -> Result<Cid, Error> {
let mut t = Self::new(block_store);

t.batch_set(vals)?;
Expand Down Expand Up @@ -188,4 +190,38 @@ where
self.root.node.flush(self.block_store)?;
Ok(self.block_store.put(&self.root, Blake2b256)?)
}

/// Iterates over each value in the Amt and runs a function on the values.
///
/// The index in the amt is a `u64` and the value is the generic parameter `V` as defined
/// in the Amt.
///
/// # Examples
///
/// ```
/// use ipld_amt::Amt;
///
/// let store = db::MemoryDB::default();
///
/// let mut map: Amt<String, _> = Amt::new(&store);
/// map.set(1, "One".to_owned()).unwrap();
/// map.set(4, "Four".to_owned()).unwrap();
///
/// let mut values: Vec<(u64, String)> = Vec::new();
/// map.for_each(&mut |i, v| {
/// values.push((i, v));
/// Ok(())
/// }).unwrap();
/// assert_eq!(&values, &[(1, "One".to_owned()), (4, "Four".to_owned())]);
/// ```
#[inline]
pub fn for_each<F>(&self, f: &mut F) -> Result<(), String>
where
V: DeserializeOwned,
F: FnMut(u64, V) -> Result<(), String>,
{
self.root
.node
.for_each(self.block_store, self.height(), 0, f)
}
}
12 changes: 9 additions & 3 deletions ipld/amt/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum Error {
/// Error when trying to serialize an AMT without a flushed cache
Cached,
/// Custom AMT error
Custom(String),
Custom(&'static str),
}

impl fmt::Display for Error {
Expand All @@ -32,13 +32,19 @@ impl fmt::Display for Error {
Error::Db(msg) => write!(f, "{}", msg),
Error::Cached => write!(
f,
"Tried to serialize without saving cache, run flush() on AMT before serializing"
"Tried to serialize without saving cache, run flush() on Amt before serializing"
),
Error::Custom(msg) => write!(f, "Custom AMT error: {}", msg),
Error::Custom(msg) => write!(f, "Amt error: {}", msg),
}
}
}

impl From<Error> for String {
fn from(e: Error) -> Self {
e.to_string()
}
}

impl From<DBError> for Error {
fn from(e: DBError) -> Error {
Error::Db(e.to_string())
Expand Down
2 changes: 1 addition & 1 deletion ipld/amt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod error;
mod node;
mod root;

pub use self::amt::AMT;
pub use self::amt::Amt;
pub use self::bitmap::BitMap;
pub use self::error::Error;
pub(crate) use self::node::Node;
Expand Down
53 changes: 48 additions & 5 deletions ipld/amt/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ where
if bmap.get_bit(i as u64) {
let value = v_iter
.next()
.ok_or_else(|| Error::Custom("Vector length does not match bitmap".to_owned()))?;
.ok_or_else(|| Error::Custom("Vector length does not match bitmap"))?;
*e = Some(<T>::from(value.clone()));
}
}
Expand Down Expand Up @@ -173,7 +173,7 @@ where
self.bitmap().is_empty()
}

/// Gets value at given index of AMT given height
/// Gets value at given index of Amt given height
pub(super) fn get<DB: BlockStore>(
&self,
bs: &DB,
Expand Down Expand Up @@ -253,7 +253,7 @@ where
unreachable!("Value is set as cached")
}
} else {
unreachable!("Non zero height in AMT is always Links type")
unreachable!("Non zero height in Amt is always Links type")
}
}

Expand All @@ -270,7 +270,7 @@ where
}
}

/// Delete value in AMT by index
/// Delete value in Amt by index
pub(super) fn delete<DB: BlockStore>(
&mut self,
bs: &DB,
Expand All @@ -280,7 +280,7 @@ where
let sub_i = i / nodes_for_height(height);

if !self.bitmap().get_bit(sub_i) {
// Value does not exist in AMT
// Value does not exist in Amt
return Ok(false);
}

Expand Down Expand Up @@ -322,6 +322,49 @@ where
}
}
}

pub(super) fn for_each<S, F>(
&self,
store: &S,
height: u32,
offset: u64,
f: &mut F,
) -> Result<(), String>
where
F: FnMut(u64, V) -> Result<(), String>,
S: BlockStore,
{
match self {
Node::Leaf { bmap, vals } => {
for (i, v) in vals.iter().enumerate() {
if bmap.get_bit(i as u64) {
f(
offset + i as u64,
v.clone().expect("set bit should contain value"),
)?;
}
}
}
Node::Link { bmap, links } => {
for (i, l) in links.iter().enumerate() {
if bmap.get_bit(i as u64) {
let offs = offset + (i as u64 * nodes_for_height(height));
match l.as_ref().expect("bit set at index") {
Link::Cached(sub) => sub.for_each(store, height - 1, offs, f)?,
Link::Cid(cid) => {
let node = store.get::<Node<V>>(cid)?.ok_or_else(|| {
Error::Cid("Cid did not match any in database".to_owned())
})?;

node.for_each(store, height - 1, offs, f)?;
}
}
}
}
}
}
Ok(())
}
}

#[cfg(test)]
Expand Down
Loading