Skip to content

Commit

Permalink
ChainSync refactor (#693)
Browse files Browse the repository at this point in the history
* Switch chain store to threadsafe

* Update genesis to arc reference

* wip refactoring chain sync to workers in async tasks

* Update network event handling and remove NetworkHandler

* Update tipset scheduling logic

* Update peer retrieval to take a random sample of available peers

* Cleanup and enabling all existing tests

* fix worker task spawn

* Add TODO for emit event ignoring and change to error log

* oops

* Update comment

* Fix typo
  • Loading branch information
austinabell authored Sep 14, 2020
1 parent 66ca99e commit 3411459
Show file tree
Hide file tree
Showing 25 changed files with 1,319 additions and 1,069 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions blockchain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ beacon = { path = "../beacon" }
flo_stream = "0.4.0"
address = { package = "forest_address", path = "../../vm/address" }
lazy_static = "1.4"
async-std = "1.6.3"

[dev-dependencies]
multihash = "0.10.0"
Expand Down
57 changes: 34 additions & 23 deletions blockchain/chain/src/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use super::{Error, TipIndex, TipsetMetadata};
use actor::{power::State as PowerState, STORAGE_POWER_ACTOR_ADDR};
use address::Address;
use async_std::sync::RwLock;
use beacon::{BeaconEntry, IGNORE_DRAND_VAR};
use blake2b_simd::Params;
use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta};
Expand Down Expand Up @@ -46,15 +47,17 @@ pub enum HeadChange {
Revert(Arc<Tipset>),
}

/// Generic implementation of the datastore trait and structures
/// Stores chain data such as heaviest tipset and cached tipset info at each epoch.
/// This structure is threadsafe, and all caches are wrapped in a `RwLock` to allow a consistent
/// `ChainStore` to be shared across tasks.
pub struct ChainStore<DB> {
publisher: Publisher<HeadChange>,
publisher: RwLock<Publisher<HeadChange>>,

// key-value datastore
pub db: Arc<DB>,

// Tipset at the head of the best-known chain.
heaviest: Option<Arc<Tipset>>,
heaviest: RwLock<Option<Arc<Tipset>>>,

// tip_index tracks tipsets by epoch/parentset for use by expected consensus.
tip_index: TipIndex,
Expand All @@ -71,44 +74,50 @@ where
.map(Arc::new);
Self {
db,
publisher: Publisher::new(SINK_CAP),
publisher: Publisher::new(SINK_CAP).into(),
tip_index: TipIndex::new(),
heaviest,
heaviest: heaviest.into(),
}
}

/// Sets the heaviest tipset within the ChainStore and stores its tipset CIDs under HEAD_KEY
pub async fn set_heaviest_tipset(&mut self, ts: Arc<Tipset>) -> Result<(), Error> {
pub async fn set_heaviest_tipset(&self, ts: Arc<Tipset>) -> Result<(), Error> {
self.db.write(HEAD_KEY, ts.key().marshal_cbor()?)?;
self.heaviest = Some(ts.clone());
self.publisher.publish(HeadChange::Current(ts)).await;
*self.heaviest.write().await = Some(ts.clone());
self.publisher
.write()
.await
.publish(HeadChange::Current(ts))
.await;
Ok(())
}

// Subscribing returns a `Subscriber` stream of head changes that can be iterated over asynchronously
pub fn subscribe(&mut self) -> Subscriber<HeadChange> {
self.publisher.subscribe()
pub async fn subscribe(&self) -> Subscriber<HeadChange> {
self.publisher.write().await.subscribe()
}

/// Sets tip_index tracker
pub fn set_tipset_tracker(&mut self, header: &BlockHeader) -> Result<(), Error> {
let ts: Tipset = Tipset::new(vec![header.clone()])?;
// TODO this is really broken, should not be setting the tipset metadata to a tipset with just
// the one header.
pub async fn set_tipset_tracker(&self, header: &BlockHeader) -> Result<(), Error> {
let ts = Arc::new(Tipset::new(vec![header.clone()])?);
let meta = TipsetMetadata {
tipset_state_root: header.state_root().clone(),
tipset_receipts_root: header.message_receipts().clone(),
tipset: ts,
};
self.tip_index.put(&meta);
self.tip_index.put(&meta).await;
Ok(())
}

/// Writes genesis to blockstore
pub fn set_genesis(&self, header: BlockHeader) -> Result<Cid, Error> {
pub fn set_genesis(&self, header: &BlockHeader) -> Result<Cid, Error> {
set_genesis(self.blockstore(), header)
}

/// Writes tipset block headers to data store and updates heaviest tipset
pub async fn put_tipset(&mut self, ts: &Tipset) -> Result<(), Error> {
pub async fn put_tipset(&self, ts: &Tipset) -> Result<(), Error> {
persist_objects(self.blockstore(), ts.blocks())?;
// TODO determine if expanded tipset is required; see https://github.com/filecoin-project/lotus/blob/testnet/3/chain/store/store.go#L236
self.update_heaviest(ts).await?;
Expand All @@ -121,16 +130,18 @@ where
}

/// Loads the heaviest tipset from the datastore and sets it as the heaviest tipset in the ChainStore
pub async fn load_heaviest_tipset(&mut self) -> Result<(), Error> {
pub async fn load_heaviest_tipset(&self) -> Result<(), Error> {
let heaviest_ts = get_heaviest_tipset(self.blockstore())?.ok_or_else(|| {
warn!("No previous chain state found");
Error::Other("No chain state found".to_owned())
})?;

// set as heaviest tipset
let heaviest_ts = Arc::new(heaviest_ts);
self.heaviest = Some(heaviest_ts.clone());
*self.heaviest.write().await = Some(heaviest_ts.clone());
self.publisher
.write()
.await
.publish(HeadChange::Current(heaviest_ts))
.await;
Ok(())
Expand All @@ -142,8 +153,8 @@ where
}

/// Returns heaviest tipset from blockstore
pub fn heaviest_tipset(&self) -> Option<Arc<Tipset>> {
self.heaviest.clone()
pub async fn heaviest_tipset(&self) -> Option<Arc<Tipset>> {
self.heaviest.read().await.clone()
}

/// Returns key-value store instance
Expand Down Expand Up @@ -183,8 +194,8 @@ where
}

/// Determines if provided tipset is heavier than existing known heaviest tipset
async fn update_heaviest(&mut self, ts: &Tipset) -> Result<(), Error> {
match &self.heaviest {
async fn update_heaviest(&self, ts: &Tipset) -> Result<(), Error> {
match self.heaviest.read().await.as_ref() {
Some(heaviest) => {
let new_weight = weight(self.blockstore(), ts)?;
let curr_weight = weight(self.blockstore(), &heaviest)?;
Expand Down Expand Up @@ -266,7 +277,7 @@ where
}
}

fn set_genesis<DB>(db: &DB, header: BlockHeader) -> Result<Cid, Error>
fn set_genesis<DB>(db: &DB, header: &BlockHeader) -> Result<Cid, Error>
where
DB: BlockStore,
{
Expand Down Expand Up @@ -657,7 +668,7 @@ mod tests {
.unwrap();

assert_eq!(cs.genesis().unwrap(), None);
cs.set_genesis(gen_block.clone()).unwrap();
cs.set_genesis(&gen_block).unwrap();
assert_eq!(cs.genesis().unwrap(), Some(gen_block));
}
}
45 changes: 32 additions & 13 deletions blockchain/chain/src/store/tip_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use super::errors::Error;
use async_std::sync::RwLock;
use blocks::{Tipset, TipsetKeys};
use cid::Cid;
use clock::ChainEpoch;
use std::collections::hash_map::{DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// TipsetMetadata is the type stored as the value in the TipIndex hashmap. It contains
/// a tipset pointing to blocks, the root cid of the chain's state after
applying the messages in this tipset to its parent state, and the cid of the receipts
Expand All @@ -20,7 +23,8 @@ pub struct TipsetMetadata {
pub tipset_receipts_root: Cid,

/// The actual Tipset
pub tipset: Tipset,
// TODO This should not be keeping a tipset with the metadata
pub tipset: Arc<Tipset>,
}

/// Trait to allow metadata to be indexed by multiple types of structs
Expand All @@ -39,47 +43,62 @@ impl Index for TipsetKeys {}
pub struct TipIndex {
// metadata allows lookup of recorded Tipsets and their state roots
// by TipsetKey and Epoch
metadata: HashMap<u64, TipsetMetadata>,
// TODO this should be mapping epoch to a vector of Cids of block headers
metadata: RwLock<HashMap<u64, TipsetMetadata>>,
}

impl TipIndex {
/// Creates new TipIndex with empty metadata
pub fn new() -> Self {
Self {
metadata: HashMap::new(),
metadata: Default::default(),
}
}
/// Adds an entry to TipIndex's metadata
/// After this call the input TipsetMetadata can be looked up by the TipsetKey of
/// the tipset, or the tipset's epoch
pub fn put(&mut self, meta: &TipsetMetadata) {
pub async fn put(&self, meta: &TipsetMetadata) {
// retrieve parent cids to be used as hash map key
let parent_key = meta.tipset.parents();
// retrieve epoch to be used as hash map key
let epoch_key = meta.tipset.epoch();
// insert value by parent_key into hash map
self.metadata.insert(parent_key.hash_key(), meta.clone());
self.metadata
.write()
.await
.insert(parent_key.hash_key(), meta.clone());
// insert value by epoch_key into hash map
self.metadata.insert(epoch_key.hash_key(), meta.clone());
self.metadata
.write()
.await
.insert(epoch_key.hash_key(), meta.clone());
}
/// Returns the tipset given by hashed key
fn get(&self, key: u64) -> Result<TipsetMetadata, Error> {
async fn get(&self, key: u64) -> Result<TipsetMetadata, Error> {
self.metadata
.read()
.await
.get(&key)
.cloned()
.ok_or_else(|| Error::UndefinedKey("invalid metadata key".to_string()))
}

/// Returns the tipset corresponding to the hashed index
pub fn get_tipset<I: Index>(&self, idx: &I) -> Result<Tipset, Error> {
Ok(self.get(idx.hash_key()).map(|r| r.tipset)?)
pub async fn get_tipset<I: Index>(&self, idx: &I) -> Result<Arc<Tipset>, Error> {
Ok(self.get(idx.hash_key()).await.map(|r| r.tipset)?)
}
/// Returns the state root for the tipset corresponding to the index
pub fn get_tipset_state_root<I: Index>(&self, idx: &I) -> Result<Cid, Error> {
Ok(self.get(idx.hash_key()).map(|r| r.tipset_state_root)?)
pub async fn get_tipset_state_root<I: Index>(&self, idx: &I) -> Result<Cid, Error> {
Ok(self
.get(idx.hash_key())
.await
.map(|r| r.tipset_state_root)?)
}
/// Returns the receipt root for the tipset corresponding to the index
pub fn get_tipset_receipts_root<I: Index>(&self, idx: &I) -> Result<Cid, Error> {
Ok(self.get(idx.hash_key()).map(|r| r.tipset_receipts_root)?)
pub async fn get_tipset_receipts_root<I: Index>(&self, idx: &I) -> Result<Cid, Error> {
Ok(self
.get(idx.hash_key())
.await
.map(|r| r.tipset_receipts_root)?)
}
}
2 changes: 2 additions & 0 deletions blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ log = "0.4.8"
async-std = { version = "1.6.0", features = ["unstable"] }
forest_libp2p = { path = "../../node/forest_libp2p" }
futures = "0.3.5"
futures-util = "0.3.5"
lru = "0.6"
thiserror = "1.0"
num-traits = "0.2"
Expand All @@ -34,6 +35,7 @@ commcid = { path = "../../utils/commcid" }
clock = { path = "../../node/clock" }
serde = { version = "1.0", features = ["derive", "rc"] }
flo_stream = "0.4.0"
rand = "0.7.3"

[dev-dependencies]
test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] }
Expand Down
23 changes: 11 additions & 12 deletions blockchain/chain_sync/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ impl SyncBucket {
}
/// Returns true if tipset is from same chain
pub fn is_same_chain_as(&self, ts: &Tipset) -> bool {
// TODO Confirm that comparing keys will be sufficient on full tipset impl
self.tips
.iter()
.any(|t| ts.key() == t.key() || ts.key() == t.parents() || ts.parents() == t.key())
Expand All @@ -37,10 +36,6 @@ impl SyncBucket {
self.tips.push(ts);
}
}
/// Returns true if SyncBucket is empty
pub fn is_empty(&self) -> bool {
self.tips.is_empty()
}
}

/// Set of tipset buckets
Expand Down Expand Up @@ -75,13 +70,17 @@ impl SyncBucketSet {

Some(self.buckets.swap_remove(i))
}
/// Returns heaviest tipset from bucket set
pub(crate) fn heaviest(&self) -> Option<Arc<Tipset>> {
self.buckets
.iter()
.filter_map(SyncBucket::heaviest_tipset)
.max_by(|ts1, ts2| ts1.weight().cmp(ts2.weight()))

/// Returns true if tipset is related to any tipset in the bucket set.
pub(crate) fn related_to_any(&self, ts: &Tipset) -> bool {
for b in self.buckets.iter() {
if b.is_same_chain_as(ts) {
return true;
}
}
false
}

/// Returns a vector of SyncBuckets
pub(crate) fn buckets(&self) -> &[SyncBucket] {
&self.buckets
Expand Down Expand Up @@ -147,7 +146,7 @@ mod tests {
assert_eq!(
set.buckets.len(),
2,
"Inserting seperate tipset should create new bucket"
"Inserting separate tipset should create new bucket"
);
assert_eq!(set.buckets[1].tips.len(), 1);

Expand Down
4 changes: 3 additions & 1 deletion blockchain/chain_sync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

#![recursion_limit = "1024"]

mod bad_block_cache;
mod bucket;
mod errors;
mod network_context;
mod network_handler;
mod peer_manager;
mod sync;
mod sync_state;
mod sync_worker;

// workaround for a compiler bug, see https://github.com/rust-lang/rust/issues/55779
extern crate serde;
Expand Down
Loading

0 comments on commit 3411459

Please sign in to comment.