From 2b48ae3403626c1042882c9779cb707810c176a2 Mon Sep 17 00:00:00 2001
From: austinabell
Date: Fri, 21 Feb 2020 08:52:30 -0500
Subject: [PATCH 01/21] Remove chain sync (old)

---
 Cargo.toml                                   |  1 -
 Makefile                                     |  1 -
 blockchain/chain_sync/Cargo.toml             |  9 -----
 blockchain/chain_sync/src/block_msg.rs       | 42 ---------------------
 blockchain/chain_sync/src/block_proposer.rs  | 12 ------
 blockchain/chain_sync/src/lib.rs             |  7 ----
 6 files changed, 72 deletions(-)
 delete mode 100644 blockchain/chain_sync/Cargo.toml
 delete mode 100644 blockchain/chain_sync/src/block_msg.rs
 delete mode 100644 blockchain/chain_sync/src/block_proposer.rs
 delete mode 100644 blockchain/chain_sync/src/lib.rs

diff --git a/Cargo.toml b/Cargo.toml
index 21ad46eb96fd..5dd9d9f82720 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,7 +4,6 @@ members = [
     "blockchain",
     "blockchain/blocks",
     "blockchain/chain",
-    "blockchain/chain_sync",
    "blockchain/sync_manager",
     "blockchain/state_manager",
     "vm",
diff --git a/Makefile b/Makefile
index f23c83f501c6..2d55ca96f9a0 100644
--- a/Makefile
+++ b/Makefile
@@ -9,7 +9,6 @@ clean:
 	@cargo clean -p forest_libp2p
 	@cargo clean -p blockchain
 	@cargo clean -p forest_blocks
-	@cargo clean -p chain_sync
 	@cargo clean -p sync_manager
 	@cargo clean -p vm
 	@cargo clean -p forest_address
diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml
deleted file mode 100644
index 00d89f9be41e..000000000000
--- a/blockchain/chain_sync/Cargo.toml
+++ /dev/null
@@ -1,9 +0,0 @@
-[package]
-name = "chain_sync"
-version = "0.1.0"
-authors = ["ChainSafe Systems "]
-edition = "2018"
-
-[dependencies]
-libp2p = "0.15.0"
-blocks = { package = "forest_blocks", path = "../blocks" }
diff --git a/blockchain/chain_sync/src/block_msg.rs b/blockchain/chain_sync/src/block_msg.rs
deleted file mode 100644
index 99b64491f66d..000000000000
--- a/blockchain/chain_sync/src/block_msg.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2020 ChainSafe Systems
-// SPDX-License-Identifier: Apache-2.0, MIT
-
-use blocks::TipSetKeys;
-use libp2p::PeerId;
-use std::fmt;
-
-/// A container used to decode pubsub messages into prior to validation and propagation.
-/// See https://github.com/filecoin-project/go-filecoin/blob/master/internal/pkg/block/chain_info.go for reference
-pub struct BlockMsg {
-    // the originator of the TipSetKey propagation wave
-    _source: PeerId,
-    // the peer that sent us the TipSetKey message
-    _sender: PeerId,
-    // proposed canonical tipset keys
-    _head: TipSetKeys,
-    // proposed chain height
-    _height: u64,
-}
-
-impl BlockMsg {
-    /// Creates a BlockMsg container
-    fn _new(_source: PeerId, _sender: PeerId, _head: TipSetKeys, _height: u64) -> Self {
-        Self {
-            _source,
-            _sender,
-            _head,
-            _height,
-        }
-    }
-}
-
-impl fmt::Display for BlockMsg {
-    /// Human-readable string representation of a block msg
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(
-            f,
-            "{} {} {:?} {}",
-            self._source, self._sender, self._head, self._height
-        )
-    }
-}
diff --git a/blockchain/chain_sync/src/block_proposer.rs b/blockchain/chain_sync/src/block_proposer.rs
deleted file mode 100644
index a292f1d6afa0..000000000000
--- a/blockchain/chain_sync/src/block_proposer.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-// Copyright 2020 ChainSafe Systems
-// SPDX-License-Identifier: Apache-2.0, MIT
-
-use super::BlockMsg;
-use std::io;
-
-/// Used by callers to propose new blocks for inclusion in the chain
-pub trait BlockProposer {
-    fn send_hello(&self, bm: BlockMsg) -> Result<(), io::Error>;
-    fn send_own_block(&self, bm: BlockMsg) -> Result<(), io::Error>;
-    fn send_gossip_block(&self, bm: BlockMsg) -> Result<(), io::Error>;
-}
diff --git a/blockchain/chain_sync/src/lib.rs b/blockchain/chain_sync/src/lib.rs
deleted file mode 100644
index 86de12ce7bd3..000000000000
--- a/blockchain/chain_sync/src/lib.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright 2020 ChainSafe Systems
-// SPDX-License-Identifier: Apache-2.0, MIT
-
-mod block_msg;
-mod block_proposer;
-
-pub use block_msg::*;

From 240ae865bde8900a0bf23a372a91dc94d4f56c1a Mon Sep 17 00:00:00 2001
From: austinabell
Date: Fri, 21 Feb 2020 09:00:17 -0500
Subject: [PATCH 02/21] rename sync_manager -> chain_sync

---
 Cargo.toml                                                    | 2 +-
 Makefile                                                      | 2 +-
 blockchain/{sync_manager => chain_sync}/Cargo.toml            | 2 +-
 blockchain/{sync_manager => chain_sync}/src/bucket.rs         | 0
 blockchain/{sync_manager => chain_sync}/src/errors.rs         | 0
 blockchain/{sync_manager => chain_sync}/src/lib.rs            | 0
 blockchain/{sync_manager => chain_sync}/src/manager.rs        | 0
 blockchain/{sync_manager => chain_sync}/src/sync.rs           | 0
 blockchain/{sync_manager => chain_sync}/tests/manager_test.rs | 2 +-
 9 files changed, 4 insertions(+), 4 deletions(-)
 rename blockchain/{sync_manager => chain_sync}/Cargo.toml (97%)
 rename blockchain/{sync_manager => chain_sync}/src/bucket.rs (100%)
 rename blockchain/{sync_manager => chain_sync}/src/errors.rs (100%)
 rename blockchain/{sync_manager => chain_sync}/src/lib.rs (100%)
 rename blockchain/{sync_manager => chain_sync}/src/manager.rs (100%)
 rename blockchain/{sync_manager => chain_sync}/src/sync.rs (100%)
 rename blockchain/{sync_manager => chain_sync}/tests/manager_test.rs (98%)

diff --git a/Cargo.toml b/Cargo.toml
index 5dd9d9f82720..584f50119a94 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,8 +4,8 @@ members = [
     "blockchain",
     "blockchain/blocks",
     "blockchain/chain",
-    "blockchain/sync_manager",
     "blockchain/state_manager",
+    "blockchain/chain_sync",
     "vm",
     "vm/actor",
     "vm/address",
diff --git a/Makefile b/Makefile
index 2d55ca96f9a0..a4b6290837f8 100644
--- a/Makefile
+++ b/Makefile
@@ -9,7 +9,7 @@ clean:
 	@cargo clean -p forest_libp2p
 	@cargo clean -p blockchain
 	@cargo clean -p forest_blocks
-	@cargo clean -p sync_manager
+	@cargo clean -p chain_sync
 	@cargo clean -p vm
 	@cargo clean -p forest_address
 	@cargo clean -p actor
diff --git a/blockchain/sync_manager/Cargo.toml b/blockchain/chain_sync/Cargo.toml
similarity index 97%
rename from blockchain/sync_manager/Cargo.toml
rename to blockchain/chain_sync/Cargo.toml
index ea69751a33ce..e7f3fe4f8d88 100644
--- a/blockchain/sync_manager/Cargo.toml
+++ b/blockchain/chain_sync/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "sync_manager"
+name = "chain_sync"
 version = "0.1.0"
 authors = ["ChainSafe Systems "]
 edition = "2018"
diff --git a/blockchain/sync_manager/src/bucket.rs b/blockchain/chain_sync/src/bucket.rs
similarity index 100%
rename from blockchain/sync_manager/src/bucket.rs
rename to blockchain/chain_sync/src/bucket.rs
diff --git a/blockchain/sync_manager/src/errors.rs b/blockchain/chain_sync/src/errors.rs
similarity index 100%
rename from blockchain/sync_manager/src/errors.rs
rename to blockchain/chain_sync/src/errors.rs
diff --git a/blockchain/sync_manager/src/lib.rs b/blockchain/chain_sync/src/lib.rs
similarity index 100%
rename from blockchain/sync_manager/src/lib.rs
rename to blockchain/chain_sync/src/lib.rs
diff --git a/blockchain/sync_manager/src/manager.rs b/blockchain/chain_sync/src/manager.rs
similarity index 100%
rename from blockchain/sync_manager/src/manager.rs
rename to blockchain/chain_sync/src/manager.rs
diff --git a/blockchain/sync_manager/src/sync.rs b/blockchain/chain_sync/src/sync.rs
similarity index 100%
rename from blockchain/sync_manager/src/sync.rs
rename to blockchain/chain_sync/src/sync.rs
diff --git a/blockchain/sync_manager/tests/manager_test.rs b/blockchain/chain_sync/tests/manager_test.rs
similarity index 98%
rename from blockchain/sync_manager/tests/manager_test.rs
rename to blockchain/chain_sync/tests/manager_test.rs
index 862426e2e42a..4b7873b37899 100644
--- a/blockchain/sync_manager/tests/manager_test.rs
+++ b/blockchain/chain_sync/tests/manager_test.rs
@@ -2,10 +2,10 @@
 // SPDX-License-Identifier: Apache-2.0, MIT
 
 use blocks::{BlockHeader, Tipset};
+use chain_sync::SyncManager;
 use cid::{multihash::Hash::Blake2b256, Cid};
 use num_bigint::BigUint;
 use std::rc::Rc;
-use sync_manager::SyncManager;
 
 fn create_header(weight: u64, parent_bz: &[u8], cached_bytes: &[u8]) -> BlockHeader {
     let header = BlockHeader::builder()

From 367ea00227dd7c1eb3d65c87f62a326e4db7b005 Mon Sep 17 00:00:00 2001
From: austinabell
Date: Fri, 21 Feb 2020 10:29:42 -0500
Subject: [PATCH 03/21] Refactor types and update state_manager from cherry pick

---
 blockchain/chain/Cargo.toml               |  1 +
 blockchain/chain/src/store/chain_store.rs | 32 ++++++++++++++---------
 blockchain/chain_sync/Cargo.toml          |  2 +-
 blockchain/chain_sync/src/sync.rs         | 28 ++++++++++----------
 blockchain/state_manager/src/lib.rs       | 20 +++++++-------
 ipld/amt/Cargo.toml                       |  2 +-
 ipld/amt/src/amt.rs                       |  2 +-
 ipld/amt/src/node.rs                      |  2 +-
 ipld/amt/tests/amt_tests.rs               |  2 +-
 vm/interpreter/Cargo.toml                 |  1 +
 vm/interpreter/src/lib.rs                 | 11 +++++---
 11 files changed, 58 insertions(+), 45 deletions(-)

diff --git a/blockchain/chain/Cargo.toml b/blockchain/chain/Cargo.toml
index fde7dfe19615..dd3e61d7a51b 100644
--- a/blockchain/chain/Cargo.toml
+++ b/blockchain/chain/Cargo.toml
@@ -14,6 +14,7 @@ encoding = { package = "forest_encoding", path = "../../encoding" }
 serde = { version = "1.0", features = ["derive"] }
 num-bigint = { path = "../../math/bigint", package = "forest_bigint" }
 message = { package = "forest_message", path = "../../vm/message" }
+ipld_blockstore = { path = "../../ipld/blockstore" }
[dev-dependencies] address = { package = "forest_address", path = "../../vm/address" } diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index 2e1711ef8ca4..1f90979e7a62 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -4,38 +4,37 @@ use super::{Error, TipIndex, TipSetMetadata}; use blocks::{BlockHeader, Tipset}; use cid::Cid; -use db::{Error as DbError, Read, RocksDb as Blockstore, Write}; +use db::Error as DbError; use encoding::{de::DeserializeOwned, from_slice, Cbor}; +use ipld_blockstore::BlockStore; use message::{SignedMessage, UnsignedMessage}; use num_bigint::BigUint; -use std::path::Path; /// Generic implementation of the datastore trait and structures -pub struct ChainStore<'a> { +pub struct ChainStore<'db, DB> { // TODO add IPLD Store // TODO add StateTreeLoader // TODO add a pubsub channel that publishes an event every time the head changes. // key-value datastore - db: Blockstore, + db: &'db DB, // CID of the genesis block. genesis: Cid, // Tipset at the head of the best-known chain. - heaviest: &'a Tipset, + heaviest: Tipset, // tip_index tracks tipsets by epoch/parentset for use by expected consensus. tip_index: TipIndex, } -impl<'a> ChainStore<'a> { +impl<'db, DB> ChainStore<'db, DB> +where + DB: BlockStore, +{ /// constructor - pub fn new(path: &Path, gen: Cid, heaviest: &'a Tipset) -> Result { - let mut db = Blockstore::new(path.to_path_buf()); - // initialize key-value store - db.open()?; - + pub fn new(db: &'db DB, gen: Cid, heaviest: Tipset) -> Result { Ok(Self { db, tip_index: TipIndex::new(), @@ -43,6 +42,7 @@ impl<'a> ChainStore<'a> { heaviest, }) } + /// Sets tip_index tracker pub fn set_tipset_tracker(&mut self, header: &BlockHeader) -> Result<(), Error> { let ts: Tipset = Tipset::new(vec![header.clone()])?; @@ -58,11 +58,13 @@ impl<'a> ChainStore<'a> { // TODO Ok(BigUint::from(0 as u32)) } + /// Writes genesis to blockstore pub fn set_genesis(&self, header: BlockHeader) -> Result<(), DbError> { let ts: Tipset = Tipset::new(vec![header])?; Ok(self.persist_headers(&ts)?) } + /// Writes encoded blockheader data to blockstore pub fn persist_headers(&self, tip: &Tipset) -> Result<(), DbError> { let mut raw_header_data = Vec::new(); @@ -76,6 +78,7 @@ impl<'a> ChainStore<'a> { } Ok(self.db.bulk_write(&keys, &raw_header_data)?) 
} + /// Writes encoded message data to blockstore pub fn put_messages(&self, msgs: &[T]) -> Result<(), Error> { for m in msgs { @@ -88,6 +91,7 @@ impl<'a> ChainStore<'a> { } Ok(()) } + /// Returns genesis blockheader from blockstore pub fn genesis(&self) -> Result { let bz = self.db.read(self.genesis.key())?; @@ -98,14 +102,17 @@ impl<'a> ChainStore<'a> { Some(ref x) => from_slice(&x)?, } } + /// Returns heaviest tipset from blockstore pub fn heaviest_tipset(&self) -> &Tipset { &self.heaviest } + /// Returns key-value store instance - pub fn blockstore(&self) -> &Blockstore { + pub fn blockstore(&self) -> &DB { &self.db } + /// Returns Tipset from key-value store from provided cids pub fn tipset(&self, cids: &[Cid]) -> Result { let mut block_headers = Vec::new(); @@ -138,6 +145,7 @@ impl<'a> ChainStore<'a> { let secp_msgs: Vec = self.messages_from_cids(Vec::new())?; Ok((bls_msgs, secp_msgs)) } + /// Returns messages from key-value store pub fn messages_from_cids(&self, keys: Vec<&Cid>) -> Result, Error> where diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml index e7f3fe4f8d88..383615291946 100644 --- a/blockchain/chain_sync/Cargo.toml +++ b/blockchain/chain_sync/Cargo.toml @@ -12,7 +12,7 @@ db = { path = "../../node/db" } encoding = { package = "forest_encoding", path = "../../encoding" } libp2p = "0.15.0" cid = { package = "forest_cid", path = "../../ipld/cid" } -blockstore = { package = "ipld_blockstore", path = "../../ipld/blockstore" } +ipld_blockstore = { path = "../../ipld/blockstore" } chain = { path = "../chain" } message = { package = "forest_message", path = "../../vm/message" } multihash = "0.9.4" diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index c1043475b2fa..efe14ad58dbe 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -8,11 +8,11 @@ use super::manager::SyncManager; use address::Address; use amt::AMT; use blocks::{Block, FullTipset, TipSetKeys, Tipset, TxMeta}; -use blockstore::BlockStore; use chain::ChainStore; use cid::Cid; use crypto::is_valid_signature; use encoding::{Cbor, Error as EncodingError}; +use ipld_blockstore::BlockStore; use libp2p::core::PeerId; use message::Message; use num_bigint::BigUint; @@ -20,18 +20,16 @@ use state_manager::StateManager; use state_tree::{HamtStateTree, StateTree}; use std::collections::HashMap; -/// Syncer updates the key-value store based on series of validation checks adhering to consensus rules, can query -/// the network for blocks and assists in informing the network of incoming blocks -pub struct Syncer<'a, 'b, T: StateTree> { +pub struct ChainSyncer<'db, DB, ST> { // TODO add ability to send msg to all subscribers indicating incoming blocks // TODO add block sync /// manages retrieving and updates state objects - state_manager: StateManager<'a, 'b, T>, - /// manages sync buckets - sync_manager: SyncManager, - /// access and store tipsets / blocks / messages - chain_store: ChainStore<'a>, - /// the known genesis tipset + state_manager: StateManager<'db, ST, DB>, + // manages sync buckets + chain_sync: SyncManager, + // access and store tipsets / blocks / messages + chain_store: ChainStore<'db, DB>, + // the known genesis tipset _genesis: Tipset, /// self peerId _own: PeerId, @@ -43,9 +41,10 @@ struct MsgMetaData { sequence: u64, } -impl<'a, 'b, T> Syncer<'a, 'b, T> +impl<'a, DB, ST> ChainSyncer<'a, DB, ST> where - T: StateTree, + DB: BlockStore, + ST: StateTree, { /// TODO add constructor @@ -65,14 +64,15 @@ where // TODO 
Add peer to blocksync // compare target_weight to heaviest weight stored; ignore otherwise - let best_weight = self.chain_store.heaviest_tipset().blocks()[0].weight(); + let heaviest_tipset = self.chain_store.heaviest_tipset(); + let best_weight = heaviest_tipset.blocks()[0].weight(); let target_weight = fts.blocks()[0].header().weight(); if !target_weight.lt(&best_weight) { // Store incoming block header self.chain_store.persist_headers(&fts.tipset()?)?; // Set peer head - self.sync_manager.set_peer_head(from, fts.tipset()?); + self.chain_sync.set_peer_head(from, fts.tipset()?); } // incoming tipset from miners does not appear to be better than our best chain, ignoring for now Ok(()) diff --git a/blockchain/state_manager/src/lib.rs b/blockchain/state_manager/src/lib.rs index c3f39df86643..650043e15dee 100644 --- a/blockchain/state_manager/src/lib.rs +++ b/blockchain/state_manager/src/lib.rs @@ -1,31 +1,29 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -#![allow(dead_code)] - mod errors; pub use self::errors::*; use actor::{MinerInfo, StorageMinerActorState}; use address::Address; use blockstore::BlockStore; -use chain::ChainStore; use encoding::de::DeserializeOwned; use state_tree::StateTree; /// Intermediary for retrieving state objects and updating actor states -pub struct StateManager<'a, 'b, T: StateTree> { - cs: &'b ChainStore<'a>, - tree: T, +pub struct StateManager<'db, ST, DB> { + bs: &'db DB, + tree: ST, } -impl<'a, 'b, T> StateManager<'a, 'b, T> +impl<'db, T, DB> StateManager<'db, T, DB> where T: StateTree, + DB: BlockStore, { /// constructor - pub fn new(cs: &'b ChainStore<'a>, tree: T) -> Self { - Self { cs, tree } + pub fn new(bs: &'db DB, tree: T) -> Self { + Self { bs, tree } } /// Loads actor state from IPLD Store fn load_actor_state(&self, addr: &Address) -> Result @@ -36,7 +34,7 @@ where .tree .get_actor(addr) .ok_or_else(|| Error::State("Could not retrieve actor from state tree".to_owned()))?; - let act: D = self.cs.blockstore().get(&actor.state)?.ok_or_else(|| { + let act: D = self.bs.get(&actor.state)?.ok_or_else(|| { Error::State("Could not retrieve actor state from IPLD store".to_owned()) })?; Ok(act) @@ -49,7 +47,7 @@ where /// Returns the amount of space in each sector committed to the network by this miner pub fn miner_sector_size(&self, addr: &Address) -> Result { let act: StorageMinerActorState = self.load_actor_state(addr)?; - let info: MinerInfo = self.cs.blockstore().get(&act.info)?.ok_or_else(|| { + let info: MinerInfo = self.bs.get(&act.info)?.ok_or_else(|| { Error::State("Could not retrieve miner info from IPLD store".to_owned()) })?; Ok(*info.sector_size()) diff --git a/ipld/amt/Cargo.toml b/ipld/amt/Cargo.toml index 41f5ad00a74d..31c6f461ca2e 100644 --- a/ipld/amt/Cargo.toml +++ b/ipld/amt/Cargo.toml @@ -9,7 +9,7 @@ cid = { package = "forest_cid", path = "../cid" } db = { path = "../../node/db" } encoding = { package = "forest_encoding", path = "../../encoding" } serde = { version = "1.0", features = ["derive"] } -blockstore = { package = "ipld_blockstore", path = "../blockstore" } +ipld_blockstore = { path = "../blockstore" } [dev-dependencies] hex = "0.4.0" diff --git a/ipld/amt/src/amt.rs b/ipld/amt/src/amt.rs index 7dc04087b84c..f33c9dfd4557 100644 --- a/ipld/amt/src/amt.rs +++ b/ipld/amt/src/amt.rs @@ -2,9 +2,9 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::{node::Link, nodes_for_height, BitMap, Error, Node, Root, MAX_INDEX, WIDTH}; -use blockstore::BlockStore; use cid::Cid; use 
encoding::{de::DeserializeOwned, ser::Serialize}; +use ipld_blockstore::BlockStore; /// Array Mapped Trie allows for the insertion and persistence of data, serializable to a CID /// diff --git a/ipld/amt/src/node.rs b/ipld/amt/src/node.rs index 369d615e8d95..25019f48c530 100644 --- a/ipld/amt/src/node.rs +++ b/ipld/amt/src/node.rs @@ -2,12 +2,12 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::{nodes_for_height, BitMap, Error, WIDTH}; -use blockstore::BlockStore; use cid::Cid; use encoding::{ de::{self, Deserialize, DeserializeOwned}, ser::{self, Serialize}, }; +use ipld_blockstore::BlockStore; /// This represents a link to another Node #[derive(PartialEq, Eq, Clone, Debug)] diff --git a/ipld/amt/tests/amt_tests.rs b/ipld/amt/tests/amt_tests.rs index c675c6d4c7c4..d9208a4740cb 100644 --- a/ipld/amt/tests/amt_tests.rs +++ b/ipld/amt/tests/amt_tests.rs @@ -1,9 +1,9 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use blockstore::BlockStore; use encoding::{de::DeserializeOwned, ser::Serialize}; use ipld_amt::{Error, AMT, MAX_INDEX}; +use ipld_blockstore::BlockStore; use std::fmt::Debug; fn assert_get(a: &mut AMT, i: u64, v: &V) diff --git a/vm/interpreter/Cargo.toml b/vm/interpreter/Cargo.toml index 969f05e64389..4574787006a3 100644 --- a/vm/interpreter/Cargo.toml +++ b/vm/interpreter/Cargo.toml @@ -12,3 +12,4 @@ blocks = { package = "forest_blocks", path = "../../blockchain/blocks" } chain = { path = "../../blockchain/chain" } clock = { path = "../../node/clock" } vm = { path = "../" } +ipld_blockstore = { path = "../../ipld/blockstore" } diff --git a/vm/interpreter/src/lib.rs b/vm/interpreter/src/lib.rs index 38dfa3d1512e..7ddbf04af350 100644 --- a/vm/interpreter/src/lib.rs +++ b/vm/interpreter/src/lib.rs @@ -5,6 +5,7 @@ use address::Address; use blocks::Tipset; use chain::ChainStore; use clock::ChainEpoch; +use ipld_blockstore::BlockStore; use message::{MessageReceipt, SignedMessage, UnsignedMessage}; use state_tree::StateTree; use vm::TokenAmount; @@ -26,12 +27,16 @@ impl VMInterpreter { /// Applies the state transition for a single message /// Returns result StateTree, receipts from the transaction, and the miner penalty token amount - pub fn apply_message( + pub fn apply_message<'db, DB, ST>( _in_tree: &ST, - _chain: &ChainStore, + _chain: &ChainStore<'db, DB>, _msg: &UnsignedMessage, _miner_addr: &Address, - ) -> (ST, MessageReceipt, TokenAmount) { + ) -> (ST, MessageReceipt, TokenAmount) + where + DB: BlockStore, + ST: StateTree, + { // TODO todo!() } From a9079765ca58aac86736677939e189427f229854 Mon Sep 17 00:00:00 2001 From: austinabell Date: Sun, 23 Feb 2020 15:07:44 -0500 Subject: [PATCH 04/21] Wip set up framework for syncing and persisting headers (broken during cherry-pick) --- blockchain/chain/src/store/chain_store.rs | 10 +-- blockchain/chain_sync/Cargo.toml | 1 + blockchain/chain_sync/src/lib.rs | 3 +- blockchain/chain_sync/src/sync.rs | 88 +++++++++++++++++++---- blockchain/state_manager/src/lib.rs | 8 +-- 5 files changed, 88 insertions(+), 22 deletions(-) diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index 1f90979e7a62..73e8e07521f1 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -9,6 +9,7 @@ use encoding::{de::DeserializeOwned, from_slice, Cbor}; use ipld_blockstore::BlockStore; use message::{SignedMessage, UnsignedMessage}; use num_bigint::BigUint; +use std::rc::Rc; /// Generic implementation of the datastore trait and 
structures pub struct ChainStore<'db, DB> { @@ -23,7 +24,8 @@ pub struct ChainStore<'db, DB> { genesis: Cid, // Tipset at the head of the best-known chain. - heaviest: Tipset, + // TODO revisit if this should be pointer to tipset on heap + heaviest: Rc, // tip_index tracks tipsets by epoch/parentset for use by expected consensus. tip_index: TipIndex, @@ -34,7 +36,7 @@ where DB: BlockStore, { /// constructor - pub fn new(db: &'db DB, gen: Cid, heaviest: Tipset) -> Result { + pub fn new(db: &'db DB, gen: Cid, heaviest: Rc) -> Result { Ok(Self { db, tip_index: TipIndex::new(), @@ -104,8 +106,8 @@ where } /// Returns heaviest tipset from blockstore - pub fn heaviest_tipset(&self) -> &Tipset { - &self.heaviest + pub fn heaviest_tipset(&self) -> Rc { + self.heaviest.clone() } /// Returns key-value store instance diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml index 383615291946..f498d6da08f6 100644 --- a/blockchain/chain_sync/Cargo.toml +++ b/blockchain/chain_sync/Cargo.toml @@ -20,6 +20,7 @@ state_tree = { path = "../../vm/state_tree/" } state_manager = { path = "../state_manager/" } num-bigint = { path = "../../math/bigint", package = "forest_bigint" } crypto = { path= "../../crypto" } +log = "0.4.8" [dev-dependencies] cid = { package = "forest_cid", path = "../../ipld/cid" } diff --git a/blockchain/chain_sync/src/lib.rs b/blockchain/chain_sync/src/lib.rs index c505b6fe97ef..37aefd026bcd 100644 --- a/blockchain/chain_sync/src/lib.rs +++ b/blockchain/chain_sync/src/lib.rs @@ -6,5 +6,6 @@ mod errors; mod manager; mod sync; -pub use self::errors::*; +pub use self::errors::Error; pub use self::manager::SyncManager; +pub use sync::*; diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index efe14ad58dbe..4e8bed573bcf 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -1,8 +1,6 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -#![allow(dead_code)] - use super::errors::Error; use super::manager::SyncManager; use address::Address; @@ -11,28 +9,31 @@ use blocks::{Block, FullTipset, TipSetKeys, Tipset, TxMeta}; use chain::ChainStore; use cid::Cid; use crypto::is_valid_signature; +use db::Error as DBError; use encoding::{Cbor, Error as EncodingError}; use ipld_blockstore::BlockStore; use libp2p::core::PeerId; +use log::info; use message::Message; use num_bigint::BigUint; use state_manager::StateManager; use state_tree::{HamtStateTree, StateTree}; use std::collections::HashMap; +use std::rc::Rc; pub struct ChainSyncer<'db, DB, ST> { // TODO add ability to send msg to all subscribers indicating incoming blocks // TODO add block sync /// manages retrieving and updates state objects - state_manager: StateManager<'db, ST, DB>, + state_manager: StateManager<'db, DB, ST>, // manages sync buckets - chain_sync: SyncManager, + sync_manager: SyncManager, + // access and store tipsets / blocks / messages chain_store: ChainStore<'db, DB>, + // the known genesis tipset _genesis: Tipset, - /// self peerId - _own: PeerId, } /// Message data used to ensure valid state transition @@ -41,17 +42,43 @@ struct MsgMetaData { sequence: u64, } -impl<'a, DB, ST> ChainSyncer<'a, DB, ST> +impl<'db, DB> ChainSyncer<'db, DB, HamtStateTree> where DB: BlockStore, - ST: StateTree, { - /// TODO add constructor + pub fn new(chain_store: ChainStore<'db, DB>) -> Self { + // TODO import genesis from storage + let _genesis = Tipset::default(); + + // TODO change from being default when impl + let 
sync_manager = SyncManager::default(); + + Self { + chain_store, + _genesis, + sync_manager, + } + } + /// Starts syncing process + pub fn sync(&mut self, head: Rc) -> Result<(), Error> { + info!("Starting syncing process"); + + // Get heaviest tipset from storage to sync toward + let heaviest = self.chain_store.heaviest_tipset(); + + // Sync headers from network from head to heaviest from storage + let headers = self.sync_headers_reverse(head, heaviest)?; + + // Persist header chain pulled from network + self.persist_headers(&headers)?; + + Ok(()) + } /// informs the syncer about a new potential tipset /// This should be called when connecting to new peers, and additionally /// when receiving new blocks from the network - fn inform_new_head(&self, from: PeerId, fts: FullTipset) -> Result<(), Error> { + pub fn inform_new_head(&self, from: PeerId, fts: FullTipset) -> Result<(), Error> { // check if full block is nil and if so return error if fts.blocks().is_empty() { return Err(Error::NoBlocks); @@ -72,7 +99,7 @@ where // Store incoming block header self.chain_store.persist_headers(&fts.tipset()?)?; // Set peer head - self.chain_sync.set_peer_head(from, fts.tipset()?); + self.sync_manager.set_peer_head(from, fts.tipset()?); } // incoming tipset from miners does not appear to be better than our best chain, ignoring for now Ok(()) @@ -110,7 +137,7 @@ where } /// Returns FullTipset from store if TipSetKeys exist in key-value store otherwise requests FullTipset /// from block sync - fn fetch_tipsets(&self, _peer_id: PeerId, tsk: TipSetKeys) -> Result { + pub fn fetch_tipset(&self, _peer_id: PeerId, tsk: TipSetKeys) -> Result { let fts = match self.load_fts(tsk) { Ok(fts) => fts, // TODO call into block sync to request FullTipset -> self.blocksync.get_full_tipset(_peer_id, tsk) @@ -253,8 +280,43 @@ where Ok(()) } + + /// Syncs chain data and persists it to blockstore + fn sync_headers_reverse( + &mut self, + head: Rc, + _to: Rc, + ) -> Result, Error> { + info!("Syncing headers from: {:?}", head.key()); + + // Loop until most recent tipset height is less than to tipset height + { + // Check parent cids + + // Try to load tipset from local storage + + // Load blocks from network using blocksync + + // Loop through each tipset received from network + { + // Check Cids of blocks against bad block cache + {} + + // Add tipset to vector of tipsets to return + } + } + + todo!() + } + + // Persists headers from tipset slice to chain store + fn persist_headers(&self, tipsets: &[Tipset]) -> Result<(), DBError> { + tipsets + .iter() + .try_for_each(|ts| self.chain_store.persist_headers(ts)) + } } -pub fn cids_from_messages(messages: &[T]) -> Result, EncodingError> { +fn cids_from_messages(messages: &[T]) -> Result, EncodingError> { messages.iter().map(Cbor::cid).collect() } diff --git a/blockchain/state_manager/src/lib.rs b/blockchain/state_manager/src/lib.rs index 650043e15dee..73e373e6df6b 100644 --- a/blockchain/state_manager/src/lib.rs +++ b/blockchain/state_manager/src/lib.rs @@ -11,18 +11,18 @@ use encoding::de::DeserializeOwned; use state_tree::StateTree; /// Intermediary for retrieving state objects and updating actor states -pub struct StateManager<'db, ST, DB> { +pub struct StateManager<'db, DB, ST> { bs: &'db DB, tree: ST, } -impl<'db, T, DB> StateManager<'db, T, DB> +impl<'db, DB, ST> StateManager<'db, DB, ST> where - T: StateTree, + ST: StateTree, DB: BlockStore, { /// constructor - pub fn new(bs: &'db DB, tree: T) -> Self { + pub fn new(bs: &'db DB, tree: ST) -> Self { Self { bs, tree } } /// 
Loads actor state from IPLD Store From 905b412d6c5f666bc9ed0b5c1d39ee852ffb1ee2 Mon Sep 17 00:00:00 2001 From: austinabell Date: Sun, 23 Feb 2020 17:08:53 -0500 Subject: [PATCH 05/21] Build out syncing and update function APIs --- blockchain/blocks/src/tipset.rs | 4 +++ blockchain/chain/src/store/chain_store.rs | 6 ++-- blockchain/chain_sync/src/sync.rs | 38 +++++++++++++++++------ node/clock/src/lib.rs | 2 +- 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/blockchain/blocks/src/tipset.rs b/blockchain/blocks/src/tipset.rs index 17972a1106b5..5f7fc9a86e16 100644 --- a/blockchain/blocks/src/tipset.rs +++ b/blockchain/blocks/src/tipset.rs @@ -145,6 +145,10 @@ impl Tipset { }, }) } + /// Returns epoch of the tipset + pub fn epoch(&self) -> &ChainEpoch { + &self.blocks[0].epoch() + } /// Returns all blocks in tipset pub fn blocks(&self) -> &[BlockHeader] { &self.blocks diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index 73e8e07521f1..bdff45ab5df9 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::{Error, TipIndex, TipSetMetadata}; -use blocks::{BlockHeader, Tipset}; +use blocks::{BlockHeader, Tipset, TipSetKeys}; use cid::Cid; use db::Error as DbError; use encoding::{de::DeserializeOwned, from_slice, Cbor}; @@ -116,9 +116,9 @@ where } /// Returns Tipset from key-value store from provided cids - pub fn tipset(&self, cids: &[Cid]) -> Result { + pub fn tipset_from_keys(&self, cids: &TipSetKeys) -> Result { let mut block_headers = Vec::new(); - for c in cids { + for c in cids.tipset_keys() { let raw_header = self.db.read(c.key())?; if let Some(x) = raw_header { // decode raw header into BlockHeader diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 4e8bed573bcf..ec2d94672406 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -137,7 +137,7 @@ where } /// Returns FullTipset from store if TipSetKeys exist in key-value store otherwise requests FullTipset /// from block sync - pub fn fetch_tipset(&self, _peer_id: PeerId, tsk: TipSetKeys) -> Result { + pub fn fetch_tipset(&self, _peer_id: PeerId, tsk: &TipSetKeys) -> Result { let fts = match self.load_fts(tsk) { Ok(fts) => fts, // TODO call into block sync to request FullTipset -> self.blocksync.get_full_tipset(_peer_id, tsk) @@ -146,10 +146,10 @@ where Ok(fts) } /// Returns a reconstructed FullTipset from store if keys exist - fn load_fts(&self, keys: TipSetKeys) -> Result { + fn load_fts(&self, keys: &TipSetKeys) -> Result { let mut blocks = Vec::new(); // retrieve tipset from store based on passed in TipSetKeys - let ts = self.chain_store.tipset(keys.tipset_keys())?; + let ts = self.chain_store.tipset_from_keys(keys)?; for header in ts.blocks() { // retrieve bls and secp messages from specified BlockHeader let (bls_msgs, secp_msgs) = self.chain_store.messages(&header)?; @@ -255,7 +255,7 @@ where return Err(Error::Validation("Signature is nil in header".to_string())); } - let base_tipset = self.load_fts(TipSetKeys::new(header.parents().cids.clone()))?; + let base_tipset = self.load_fts(&TipSetKeys::new(header.parents().cids.clone()))?; // time stamp checks header.validate_timestamps(&base_tipset)?; @@ -285,17 +285,35 @@ where fn sync_headers_reverse( &mut self, head: Rc, - _to: Rc, - ) -> Result, Error> { + to: Rc, + ) -> Result>, Error> { info!("Syncing headers from: {:?}", head.key()); 
+ let mut return_set = vec![Rc::clone(&head)]; + + let to_epoch = to.blocks()[0].epoch(); + + let mut accepted_blocks: Vec = Vec::new(); + // Loop until most recent tipset height is less than to tipset height - { + while let Some(ts) = return_set.last() { + if ts.epoch() < to_epoch { + // Current tipset is less than epoch of tipset syncing toward + break; + } + // Check parent cids + // TODO check if cids exist in rejected blocks cache when implemented - // Try to load tipset from local storage + // Try to load parent tipset from local storage + if let Ok(ts) = self.chain_store.tipset_from_keys(ts.parents()) { + // Add blocks in tipset to accepted chain and push the tipset to return set + accepted_blocks.extend_from_slice(ts.key().tipset_keys()); + return_set.push(Rc::new(ts)); + } // Load blocks from network using blocksync + // TODO once blocksync interface added // Loop through each tipset received from network { @@ -306,11 +324,11 @@ where } } - todo!() + Ok(return_set) } // Persists headers from tipset slice to chain store - fn persist_headers(&self, tipsets: &[Tipset]) -> Result<(), DBError> { + fn persist_headers(&self, tipsets: &[Rc]) -> Result<(), DBError> { tipsets .iter() .try_for_each(|ts| self.chain_store.persist_headers(ts)) diff --git a/node/clock/src/lib.rs b/node/clock/src/lib.rs index 13d42a6475f4..5f9978857c60 100644 --- a/node/clock/src/lib.rs +++ b/node/clock/src/lib.rs @@ -12,7 +12,7 @@ use std::ops::Sub; const _ISO_FORMAT: &str = "%FT%X.%.9F"; const EPOCH_DURATION: i32 = 15; -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default, PartialOrd)] /// An epoch represents a single valid state in the blockchain pub struct ChainEpoch(u64); From 4310e5408363b0e4403ab42666ab9fa35cb885a5 Mon Sep 17 00:00:00 2001 From: austinabell Date: Mon, 24 Feb 2020 12:34:51 -0500 Subject: [PATCH 06/21] Implement more header syncing logic and updated functions (broken during cherry-pick) --- blockchain/blocks/src/tipset.rs | 7 ++- blockchain/chain/Cargo.toml | 1 + blockchain/chain/src/blocksync/mod.rs | 6 ++ blockchain/chain/src/blocksync/provider.rs | 10 ++++ blockchain/chain/src/lib.rs | 2 + blockchain/chain/src/store/chain_store.rs | 6 +- blockchain/chain_sync/src/errors.rs | 3 + blockchain/chain_sync/src/sync.rs | 70 ++++++++++++++++++---- node/clock/src/lib.rs | 18 +++++- 9 files changed, 103 insertions(+), 20 deletions(-) create mode 100644 blockchain/chain/src/blocksync/mod.rs create mode 100644 blockchain/chain/src/blocksync/provider.rs diff --git a/blockchain/blocks/src/tipset.rs b/blockchain/blocks/src/tipset.rs index 5f7fc9a86e16..57fa7c7e1e1c 100644 --- a/blockchain/blocks/src/tipset.rs +++ b/blockchain/blocks/src/tipset.rs @@ -22,10 +22,10 @@ pub struct TipSetKeys { } impl TipSetKeys { - /// constructor pub fn new(cids: Vec) -> Self { Self { cids } } + /// checks whether the set contains exactly the same CIDs as another. 
fn equals(&self, key: &TipSetKeys) -> bool { if self.cids.len() != key.cids.len() { @@ -38,8 +38,9 @@ impl TipSetKeys { } true } - /// Returns tipset keys - pub fn tipset_keys(&self) -> &[Cid] { + + /// Returns tipset header cids + pub fn cids(&self) -> &[Cid] { &self.cids } } diff --git a/blockchain/chain/Cargo.toml b/blockchain/chain/Cargo.toml index dd3e61d7a51b..e1bddd63c051 100644 --- a/blockchain/chain/Cargo.toml +++ b/blockchain/chain/Cargo.toml @@ -15,6 +15,7 @@ serde = { version = "1.0", features = ["derive"] } num-bigint = { path = "../../math/bigint", package = "forest_bigint" } message = { package = "forest_message", path = "../../vm/message" } ipld_blockstore = { path = "../../ipld/blockstore" } +async-trait = "0.1.24" [dev-dependencies] address = { package = "forest_address", path = "../../vm/address" } diff --git a/blockchain/chain/src/blocksync/mod.rs b/blockchain/chain/src/blocksync/mod.rs new file mode 100644 index 000000000000..def19ef1ee2f --- /dev/null +++ b/blockchain/chain/src/blocksync/mod.rs @@ -0,0 +1,6 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +mod provider; + +pub use self::provider::*; diff --git a/blockchain/chain/src/blocksync/provider.rs b/blockchain/chain/src/blocksync/provider.rs new file mode 100644 index 000000000000..4a553f6c12bb --- /dev/null +++ b/blockchain/chain/src/blocksync/provider.rs @@ -0,0 +1,10 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use async_trait::async_trait; +use blocks::{TipSetKeys, Tipset}; + +#[async_trait] +pub trait BlockSyncProvider { + async fn get_headers(&self, tsk: &TipSetKeys, count: u64) -> Result, String>; +} diff --git a/blockchain/chain/src/lib.rs b/blockchain/chain/src/lib.rs index d1ac7db3289c..084351f3ad81 100644 --- a/blockchain/chain/src/lib.rs +++ b/blockchain/chain/src/lib.rs @@ -1,6 +1,8 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +mod blocksync; mod store; +pub use self::blocksync::*; pub use self::store::*; diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index bdff45ab5df9..9849d9c79b1b 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::{Error, TipIndex, TipSetMetadata}; -use blocks::{BlockHeader, Tipset, TipSetKeys}; +use blocks::{BlockHeader, TipSetKeys, Tipset}; use cid::Cid; use db::Error as DbError; use encoding::{de::DeserializeOwned, from_slice, Cbor}; @@ -116,9 +116,9 @@ where } /// Returns Tipset from key-value store from provided cids - pub fn tipset_from_keys(&self, cids: &TipSetKeys) -> Result { + pub fn tipset_from_keys(&self, tsk: &TipSetKeys) -> Result { let mut block_headers = Vec::new(); - for c in cids.tipset_keys() { + for c in tsk.cids() { let raw_header = self.db.read(c.key())?; if let Some(x) = raw_header { // decode raw header into BlockHeader diff --git a/blockchain/chain_sync/src/errors.rs b/blockchain/chain_sync/src/errors.rs index caf7467dd210..f4a177dcb621 100644 --- a/blockchain/chain_sync/src/errors.rs +++ b/blockchain/chain_sync/src/errors.rs @@ -31,6 +31,8 @@ pub enum Error { State(String), /// Error in validating arbitrary data Validation(String), + /// Any other error that does not need to be specifically handled + Other(String), } impl fmt::Display for Error { @@ -46,6 +48,7 @@ impl fmt::Display for Error { Error::AMT(msg) => write!(f, "{}", msg), Error::State(msg) => write!(f, 
"{}", msg), Error::Validation(msg) => write!(f, "{}", msg), + Error::Other(msg) => write!(f, "chain_sync error: {}", msg), } } } diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index ec2d94672406..8bc1dfffa3e2 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -6,7 +6,7 @@ use super::manager::SyncManager; use address::Address; use amt::AMT; use blocks::{Block, FullTipset, TipSetKeys, Tipset, TxMeta}; -use chain::ChainStore; +use chain::{BlockSyncProvider, ChainStore}; use cid::Cid; use crypto::is_valid_signature; use db::Error as DBError; @@ -18,6 +18,7 @@ use message::Message; use num_bigint::BigUint; use state_manager::StateManager; use state_tree::{HamtStateTree, StateTree}; +use std::cmp::min; use std::collections::HashMap; use std::rc::Rc; @@ -32,6 +33,9 @@ pub struct ChainSyncer<'db, DB, ST> { // access and store tipsets / blocks / messages chain_store: ChainStore<'db, DB>, + /// Provider for BlockSync service + block_sync: BS, + // the known genesis tipset _genesis: Tipset, } @@ -42,11 +46,12 @@ struct MsgMetaData { sequence: u64, } -impl<'db, DB> ChainSyncer<'db, DB, HamtStateTree> +impl<'db, DB, BS> ChainSyncer<'db, DB, BS> where DB: BlockStore, + BS: BlockSyncProvider, { - pub fn new(chain_store: ChainStore<'db, DB>) -> Self { + pub fn new(chain_store: ChainStore<'db, DB>, block_sync: BS) -> Self { // TODO import genesis from storage let _genesis = Tipset::default(); @@ -55,20 +60,21 @@ where Self { chain_store, + block_sync, _genesis, sync_manager, } } /// Starts syncing process - pub fn sync(&mut self, head: Rc) -> Result<(), Error> { + pub async fn sync(&mut self, head: Rc) -> Result<(), Error> { info!("Starting syncing process"); // Get heaviest tipset from storage to sync toward let heaviest = self.chain_store.heaviest_tipset(); // Sync headers from network from head to heaviest from storage - let headers = self.sync_headers_reverse(head, heaviest)?; + let headers = self.sync_headers_reverse(head, heaviest).await?; // Persist header chain pulled from network self.persist_headers(&headers)?; @@ -282,7 +288,7 @@ where } /// Syncs chain data and persists it to blockstore - fn sync_headers_reverse( + async fn sync_headers_reverse( &mut self, head: Rc, to: Rc, @@ -293,10 +299,11 @@ where let to_epoch = to.blocks()[0].epoch(); + // TODO use accepted_blocks as cache to mark bad blocks on failed sync let mut accepted_blocks: Vec = Vec::new(); // Loop until most recent tipset height is less than to tipset height - while let Some(ts) = return_set.last() { + 'sync: while let Some(ts) = return_set.last() { if ts.epoch() < to_epoch { // Current tipset is less than epoch of tipset syncing toward break; @@ -308,25 +315,66 @@ where // Try to load parent tipset from local storage if let Ok(ts) = self.chain_store.tipset_from_keys(ts.parents()) { // Add blocks in tipset to accepted chain and push the tipset to return set - accepted_blocks.extend_from_slice(ts.key().tipset_keys()); + accepted_blocks.extend_from_slice(ts.key().cids()); return_set.push(Rc::new(ts)); + continue; } + const REQUEST_WINDOW: u64 = 500; + let epoch_diff = u64::from(ts.epoch() - to_epoch); + let window = min(epoch_diff, REQUEST_WINDOW); + // Load blocks from network using blocksync - // TODO once blocksync interface added + let tipsets: Vec = self + .block_sync + .get_headers(ts.parents(), window) + .await + .map_err(|e| Error::Other(e))?; // Loop through each tipset received from network - { + for ts in tipsets { + if ts.epoch() < to_epoch { + // 
Break out of sync loop if epoch lower than to tipset + // This should not be hit if response from server is correct + break 'sync; + } // Check Cids of blocks against bad block cache - {} + for _ in ts.key().cids() { + // TODO + } + accepted_blocks.extend_from_slice(ts.key().cids()); // Add tipset to vector of tipsets to return + return_set.push(Rc::new(ts)); } } + let last_ts = return_set.last().ok_or(Error::Other( + "Return set should contain a tipset".to_owned(), + ))?; + + // Check if local chain was fork + if last_ts.key().cids() != to.key().cids() { + if last_ts.parents() == to.parents() { + // block received part of same tipset as best block + // This removes need to sync fork + return Ok(return_set); + } + self.sync_fork(last_ts.clone(), to).await?; + } + Ok(return_set) } + async fn sync_fork( + &mut self, + _head: Rc, + _to: Rc, + ) -> Result>, Error> { + // TODO sync fork until tipsets are equal or reaches genesis + todo!() + } + // Persists headers from tipset slice to chain store fn persist_headers(&self, tipsets: &[Rc]) -> Result<(), DBError> { tipsets diff --git a/node/clock/src/lib.rs b/node/clock/src/lib.rs index 5f9978857c60..ba4c185e2e88 100644 --- a/node/clock/src/lib.rs +++ b/node/clock/src/lib.rs @@ -77,9 +77,21 @@ impl Sub for ChainEpoch { type Output = ChainEpoch; fn sub(self, other: ChainEpoch) -> ChainEpoch { - ChainEpoch { - 0: self.0 - other.0, - } + ChainEpoch(self.0 - other.0) + } +} + +impl Sub for &ChainEpoch { + type Output = ChainEpoch; + + fn sub(self, other: &ChainEpoch) -> ChainEpoch { + ChainEpoch(self.0 - other.0) + } +} + +impl From for u64 { + fn from(ce: ChainEpoch) -> u64 { + ce.0 } } From 8e9000f9e037bcbed4debac066cb417d94159e81 Mon Sep 17 00:00:00 2001 From: austinabell Date: Wed, 26 Feb 2020 14:26:33 -0500 Subject: [PATCH 07/21] Connected p2p and chain sync for requests (wow what a fun cherry-pick!) 
--- .gitignore | 3 +- blockchain/blocks/src/header.rs | 2 +- blockchain/blocks/src/tipset.rs | 2 +- blockchain/chain/src/blocksync/mod.rs | 6 - blockchain/chain/src/blocksync/provider.rs | 10 -- blockchain/chain/src/lib.rs | 2 - blockchain/chain/src/store/chain_store.rs | 21 ++- blockchain/chain_sync/Cargo.toml | 7 +- blockchain/chain_sync/src/bucket.rs | 26 +-- blockchain/chain_sync/src/lib.rs | 2 + blockchain/chain_sync/src/manager.rs | 6 +- blockchain/chain_sync/src/network_context.rs | 89 ++++++++++ blockchain/chain_sync/src/sync.rs | 173 ++++++++++++++----- blockchain/chain_sync/tests/manager_test.rs | 10 +- forest/Cargo.toml | 1 + forest/src/main.rs | 18 ++ node/forest_libp2p/src/rpc/mod.rs | 2 +- node/forest_libp2p/src/service.rs | 8 +- 18 files changed, 286 insertions(+), 102 deletions(-) delete mode 100644 blockchain/chain/src/blocksync/mod.rs delete mode 100644 blockchain/chain/src/blocksync/provider.rs create mode 100644 blockchain/chain_sync/src/network_context.rs diff --git a/.gitignore b/.gitignore index 49d0978d071e..419691178107 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ .idea/ config.toml .DS_STORE -.vscode \ No newline at end of file +.vscode +/chain_db diff --git a/blockchain/blocks/src/header.rs b/blockchain/blocks/src/header.rs index 7a4dd91ffff1..74ea48107399 100644 --- a/blockchain/blocks/src/header.rs +++ b/blockchain/blocks/src/header.rs @@ -42,7 +42,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; /// .build_and_validate() /// .unwrap(); /// ``` -#[derive(Clone, Debug, PartialEq, Builder)] +#[derive(Clone, Debug, PartialEq, Builder, Default)] #[builder(name = "BlockHeaderBuilder")] pub struct BlockHeader { // CHAIN LINKING diff --git a/blockchain/blocks/src/tipset.rs b/blockchain/blocks/src/tipset.rs index 57fa7c7e1e1c..ff5a945690bc 100644 --- a/blockchain/blocks/src/tipset.rs +++ b/blockchain/blocks/src/tipset.rs @@ -66,7 +66,7 @@ impl<'de> de::Deserialize<'de> for TipSetKeys { /// An immutable set of blocks at the same height with the same parent set. /// Blocks in a tipset are canonically ordered by ticket size. 
-#[derive(Clone, PartialEq, Debug, Default)] +#[derive(Clone, PartialEq, Debug)] pub struct Tipset { blocks: Vec, key: TipSetKeys, diff --git a/blockchain/chain/src/blocksync/mod.rs b/blockchain/chain/src/blocksync/mod.rs deleted file mode 100644 index def19ef1ee2f..000000000000 --- a/blockchain/chain/src/blocksync/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -mod provider; - -pub use self::provider::*; diff --git a/blockchain/chain/src/blocksync/provider.rs b/blockchain/chain/src/blocksync/provider.rs deleted file mode 100644 index 4a553f6c12bb..000000000000 --- a/blockchain/chain/src/blocksync/provider.rs +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -use async_trait::async_trait; -use blocks::{TipSetKeys, Tipset}; - -#[async_trait] -pub trait BlockSyncProvider { - async fn get_headers(&self, tsk: &TipSetKeys, count: u64) -> Result, String>; -} diff --git a/blockchain/chain/src/lib.rs b/blockchain/chain/src/lib.rs index 084351f3ad81..d1ac7db3289c 100644 --- a/blockchain/chain/src/lib.rs +++ b/blockchain/chain/src/lib.rs @@ -1,8 +1,6 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -mod blocksync; mod store; -pub use self::blocksync::*; pub use self::store::*; diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index 9849d9c79b1b..bf0563ed84f3 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -9,7 +9,7 @@ use encoding::{de::DeserializeOwned, from_slice, Cbor}; use ipld_blockstore::BlockStore; use message::{SignedMessage, UnsignedMessage}; use num_bigint::BigUint; -use std::rc::Rc; +use std::sync::Arc; /// Generic implementation of the datastore trait and structures pub struct ChainStore<'db, DB> { @@ -20,12 +20,9 @@ pub struct ChainStore<'db, DB> { // key-value datastore db: &'db DB, - // CID of the genesis block. - genesis: Cid, - // Tipset at the head of the best-known chain. // TODO revisit if this should be pointer to tipset on heap - heaviest: Rc, + heaviest: Arc, // tip_index tracks tipsets by epoch/parentset for use by expected consensus. 
tip_index: TipIndex, @@ -36,13 +33,14 @@ where DB: BlockStore, { /// constructor - pub fn new(db: &'db DB, gen: Cid, heaviest: Rc) -> Result { - Ok(Self { + pub fn new(db: &'db DB) -> Self { + // TODO pull heaviest tipset from data storage + let heaviest = Arc::new(Tipset::new(vec![BlockHeader::default()]).unwrap()); + Self { db, tip_index: TipIndex::new(), - genesis: gen, heaviest, - }) + } } /// Sets tip_index tracker @@ -96,7 +94,8 @@ where /// Returns genesis blockheader from blockstore pub fn genesis(&self) -> Result { - let bz = self.db.read(self.genesis.key())?; + // TODO change data store for pulling genesis + let bz = self.db.read("gen_block")?; match bz { None => Err(Error::UndefinedKey( "Genesis key does not exist".to_string(), @@ -106,7 +105,7 @@ where } /// Returns heaviest tipset from blockstore - pub fn heaviest_tipset(&self) -> Rc { + pub fn heaviest_tipset(&self) -> Arc { self.heaviest.clone() } diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml index f498d6da08f6..fa544b8bc6f7 100644 --- a/blockchain/chain_sync/Cargo.toml +++ b/blockchain/chain_sync/Cargo.toml @@ -19,8 +19,13 @@ multihash = "0.9.4" state_tree = { path = "../../vm/state_tree/" } state_manager = { path = "../state_manager/" } num-bigint = { path = "../../math/bigint", package = "forest_bigint" } -crypto = { path= "../../crypto" } +crypto = { path = "../../crypto" } log = "0.4.8" +async-std = { version = "1.4.0", features = ["unstable"] } +forest_libp2p = { path = "../../node/forest_libp2p" } +pin-project = "0.4.8" +fnv = "1.0.6" +futures = "0.3.2" [dev-dependencies] cid = { package = "forest_cid", path = "../../ipld/cid" } diff --git a/blockchain/chain_sync/src/bucket.rs b/blockchain/chain_sync/src/bucket.rs index 543e2ee842ad..60adef048c22 100644 --- a/blockchain/chain_sync/src/bucket.rs +++ b/blockchain/chain_sync/src/bucket.rs @@ -2,21 +2,21 @@ // SPDX-License-Identifier: Apache-2.0, MIT use blocks::Tipset; -use std::rc::Rc; +use std::sync::Arc; /// SyncBucket defines a bucket of tipsets to sync #[derive(Clone, Default)] struct SyncBucket { - tips: Vec>, + tips: Vec>, } impl SyncBucket { /// Constructor for tipset bucket - fn new(tips: Vec>) -> SyncBucket { + fn new(tips: Vec>) -> SyncBucket { Self { tips } } /// heaviest_tipset returns the tipset with the max weight - fn heaviest_tipset(&self) -> Option> { + fn heaviest_tipset(&self) -> Option> { if self.tips.is_empty() { return None; } @@ -34,7 +34,7 @@ impl SyncBucket { false } - fn add(&mut self, ts: Rc) { + fn add(&mut self, ts: Arc) { if !self.tips.iter().any(|t| *t == ts) { self.tips.push(ts); } @@ -48,7 +48,7 @@ pub(crate) struct SyncBucketSet { } impl SyncBucketSet { - pub(crate) fn insert(&mut self, tipset: Rc) { + pub(crate) fn insert(&mut self, tipset: Arc) { for b in self.buckets.iter_mut() { if b.same_chain_as(&tipset) { b.add(tipset); @@ -57,9 +57,9 @@ impl SyncBucketSet { } self.buckets.push(SyncBucket::new(vec![tipset])) } - pub(crate) fn heaviest(&self) -> Option> { + pub(crate) fn heaviest(&self) -> Option> { // Transform max values from each bucket into a Vec - let vals: Vec> = self + let vals: Vec> = self .buckets .iter() .filter_map(|b| b.heaviest_tipset()) @@ -94,8 +94,8 @@ mod tests { #[test] fn heaviest_tipset() { - let l_tip = Rc::new(Tipset::new(vec![create_header(1, b"", b"")]).unwrap()); - let h_tip = Rc::new(Tipset::new(vec![create_header(3, b"", b"")]).unwrap()); + let l_tip = Arc::new(Tipset::new(vec![create_header(1, b"", b"")]).unwrap()); + let h_tip = 
Arc::new(Tipset::new(vec![create_header(3, b"", b"")]).unwrap()); // Test the comparison of tipsets let bucket = SyncBucket::new(vec![l_tip.clone(), h_tip]); @@ -116,13 +116,13 @@ mod tests { #[test] fn sync_bucket_inserts() { let mut set = SyncBucketSet::default(); - let tipset1 = Rc::new(Tipset::new(vec![create_header(1, b"1", b"1")]).unwrap()); + let tipset1 = Arc::new(Tipset::new(vec![create_header(1, b"1", b"1")]).unwrap()); set.insert(tipset1.clone()); assert_eq!(set.buckets.len(), 1); assert_eq!(set.buckets[0].tips.len(), 1); // Assert a tipset on non relating chain is put in another bucket - let tipset2 = Rc::new(Tipset::new(vec![create_header(2, b"2", b"2")]).unwrap()); + let tipset2 = Arc::new(Tipset::new(vec![create_header(2, b"2", b"2")]).unwrap()); set.insert(tipset2); assert_eq!( set.buckets.len(), @@ -132,7 +132,7 @@ mod tests { assert_eq!(set.buckets[1].tips.len(), 1); // Assert a tipset connected to the first - let tipset3 = Rc::new(Tipset::new(vec![create_header(3, b"1", b"1")]).unwrap()); + let tipset3 = Arc::new(Tipset::new(vec![create_header(3, b"1", b"1")]).unwrap()); assert_eq!(tipset1.key(), tipset3.key()); set.insert(tipset3); assert_eq!( diff --git a/blockchain/chain_sync/src/lib.rs b/blockchain/chain_sync/src/lib.rs index 37aefd026bcd..ed4589efef4a 100644 --- a/blockchain/chain_sync/src/lib.rs +++ b/blockchain/chain_sync/src/lib.rs @@ -4,8 +4,10 @@ mod bucket; mod errors; mod manager; +mod network_context; mod sync; pub use self::errors::Error; pub use self::manager::SyncManager; +pub use network_context::*; pub use sync::*; diff --git a/blockchain/chain_sync/src/manager.rs b/blockchain/chain_sync/src/manager.rs index 8364dab9dc85..a5ad21469802 100644 --- a/blockchain/chain_sync/src/manager.rs +++ b/blockchain/chain_sync/src/manager.rs @@ -4,7 +4,7 @@ use super::bucket::SyncBucketSet; use blocks::Tipset; use libp2p::core::PeerId; -use std::rc::Rc; +use std::sync::Arc; #[derive(Default)] pub struct SyncManager { @@ -12,11 +12,11 @@ pub struct SyncManager { } impl SyncManager { - pub fn schedule_tipset(&mut self, tipset: Rc) { + pub fn schedule_tipset(&mut self, tipset: Arc) { // TODO implement interactions for syncing state when SyncManager built out self.sync_queue.insert(tipset); } - pub fn select_sync_target(&self) -> Option> { + pub fn select_sync_target(&self) -> Option> { self.sync_queue.heaviest() } pub fn set_peer_head(&self, _peer: PeerId, _ts: Tipset) { diff --git a/blockchain/chain_sync/src/network_context.rs b/blockchain/chain_sync/src/network_context.rs new file mode 100644 index 000000000000..7d420dddd56d --- /dev/null +++ b/blockchain/chain_sync/src/network_context.rs @@ -0,0 +1,89 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use async_std::sync::Sender; +use blocks::TipSetKeys; +use forest_libp2p::{ + rpc::{BlockSyncRequest, RPCEvent, RPCRequest, RequestId}, + NetworkMessage, +}; +use libp2p::core::PeerId; +use log::trace; + +/// Context used in chain sync to handle sequential +pub struct SyncNetworkContext { + /// Channel to send network messages through p2p service + network_send: Sender, + + /// Handles sequential request ID enumeration for requests + request_id: RequestId, +} + +// #[async_trait] +// impl BlockSyncProvider for SyncNetworkContext { +// async fn get_headers(&mut self, _tsk: &TipSetKeys, _count: u64) -> Result, String> { +// self.blocksync_request(peer_id: PeerId, request: Message) +// Ok(vec![]) +// } +// } + +impl SyncNetworkContext { + pub fn new(network_send: Sender) -> Self { + Self 
{ + network_send, + request_id: 0, + } + } + + pub async fn blocksync_headers( + &mut self, + peer_id: PeerId, + tsk: &TipSetKeys, + count: u64, + ) -> RequestId { + self.blocksync_request( + peer_id, + BlockSyncRequest { + start: tsk.cids().to_vec(), + request_len: count, + options: 1, + }, + ) + .await + } + + /// Send a blocksync request to the network + pub async fn blocksync_request( + &mut self, + peer_id: PeerId, + request: BlockSyncRequest, + ) -> RequestId { + trace!("Sending Blocksync Request {:?}", request); + self.send_rpc_request(peer_id, RPCRequest::Blocksync(request)) + .await + } + + /// Send any RPC request to the network + pub async fn send_rpc_request( + &mut self, + peer_id: PeerId, + rpc_request: RPCRequest, + ) -> RequestId { + let request_id = self.request_id; + self.request_id += 1; + self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request)) + .await; + request_id + } + + /// Handles sending the base event to the network service + async fn send_rpc_event(&mut self, peer_id: PeerId, event: RPCEvent) { + self.network_send + .send(NetworkMessage::RPC { + peer_id, + // TODO probably change event name to event from request + request: event, + }) + .await + } +} diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 8bc1dfffa3e2..e577512affbc 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -1,43 +1,92 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use super::errors::Error; -use super::manager::SyncManager; +use super::{Error, SyncManager, SyncNetworkContext}; use address::Address; use amt::AMT; -use blocks::{Block, FullTipset, TipSetKeys, Tipset, TxMeta}; -use chain::{BlockSyncProvider, ChainStore}; +use async_std::prelude::*; +use async_std::stream::Stream; +use async_std::sync::{Receiver, Sender}; +use blocks::{Block, BlockHeader, FullTipset, TipSetKeys, Tipset, TxMeta}; +use chain::ChainStore; use cid::Cid; use crypto::is_valid_signature; use db::Error as DBError; use encoding::{Cbor, Error as EncodingError}; +use forest_libp2p::{NetworkEvent, NetworkMessage}; +use futures::{select, FutureExt}; use ipld_blockstore::BlockStore; use libp2p::core::PeerId; use log::info; use message::Message; use num_bigint::BigUint; +use pin_project::pin_project; use state_manager::StateManager; use state_tree::{HamtStateTree, StateTree}; use std::cmp::min; use std::collections::HashMap; -use std::rc::Rc; +use std::future::Future; +use std::sync::Arc; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +#[derive(PartialEq, Debug, Clone)] +/// Current state of the ChainSyncer +enum SyncState { + /// No useful peers, bootstrapping network to be able to make BlockSync requests + Stalled, + + /// Syncing to checkpoint (using BlockSync for now) + _SyncCheckpoint, + + /// Receive new blocks from the network and sync toward heaviest tipset + _ChainCatchup, + + /// Once all blocks are validated to the heaviest chain, follow network + /// by receiving blocks over the network and validating them + _Follow, +} +#[pin_project] pub struct ChainSyncer<'db, DB, ST> { - // TODO add ability to send msg to all subscribers indicating incoming blocks - // TODO add block sync + /// Syncing state of chain sync + _state: SyncState, + /// manages retrieving and updates state objects state_manager: StateManager<'db, DB, ST>, - // manages sync buckets + + /// manages sync buckets sync_manager: SyncManager, - // access and store tipsets / blocks / messages + /// access and store tipsets 
/ blocks / messages chain_store: ChainStore<'db, DB>, - /// Provider for BlockSync service - block_sync: BS, + // /// Provider for BlockSync service + // block_sync: BS, + /// Context to be able to send requests to p2p network + network: SyncNetworkContext, - // the known genesis tipset + /// the known genesis tipset _genesis: Tipset, + + /// Channel for incoming network events to be handled by syncer + #[pin] + network_rx: Receiver, +} + +// TODO probably remove this in the future, polling as such probably unnecessary +impl<'db, DB, ST> Future for ChainSyncer<'db, DB, ST> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project().network_rx.poll_next(cx) { + Poll::Ready(Some(event)) => info!("chain syncer received event: {:?}", event), + Poll::Pending | Poll::Ready(None) => (), + }; + Poll::Pending + } } /// Message data used to ensure valid state transition @@ -46,29 +95,58 @@ struct MsgMetaData { sequence: u64, } -impl<'db, DB, BS> ChainSyncer<'db, DB, BS> +impl<'db, DB> ChainSyncer<'db, DB, HamtStateTree> where DB: BlockStore, - BS: BlockSyncProvider, + // BS: BlockSyncProvider, { - pub fn new(chain_store: ChainStore<'db, DB>, block_sync: BS) -> Self { + pub fn new( + db: &'db DB, + network_send: Sender, + network_rx: Receiver, + ) -> Result { // TODO import genesis from storage - let _genesis = Tipset::default(); + let _genesis = Tipset::new(vec![BlockHeader::default()])?; // TODO change from being default when impl let sync_manager = SyncManager::default(); - Self { + let chain_store = ChainStore::new(db); + + let state_manager = StateManager::new(db, HamtStateTree::default()); + + let network = SyncNetworkContext::new(network_send); + + Ok(Self { + _state: SyncState::Stalled, + state_manager, chain_store, - block_sync, + network, _genesis, sync_manager, + network_rx, + }) + } + + pub async fn poll_tmp(&mut self) { + let mut nw = self.network_rx.clone().fuse(); + loop { + select! 
{ + network_msg = nw.next().fuse() => match network_msg { + // TODO remove this comment (testing use) + // Some(NetworkEvent::PubsubMessage{source, ..}) => { + // let req_id = self.network.blocksync_headers("QmUbiXETSptmbSYNtvCWEBYqXDKGwiBCCFWgJn1ebGiFiV".parse().unwrap(), self._genesis.key(), 10).await; + // }, + Some(event) => println!("received some other event: {:?}", event), + None => break, + } + } } } /// Starts syncing process - pub async fn sync(&mut self, head: Rc) -> Result<(), Error> { - info!("Starting syncing process"); + pub async fn sync(mut self, head: Tipset) -> Result<(), Error> { + info!("Starting chain sync"); // Get heaviest tipset from storage to sync toward let heaviest = self.chain_store.heaviest_tipset(); @@ -81,6 +159,7 @@ where Ok(()) } + /// informs the syncer about a new potential tipset /// This should be called when connecting to new peers, and additionally /// when receiving new blocks from the network @@ -224,7 +303,7 @@ where msg_meta_data.insert(msg.from().clone(), updated_state); Ok(()) } - let mut msg_meta_data: HashMap = HashMap::new(); + let mut msg_meta_data: HashMap = HashMap::default(); // TODO retrieve tipset state and load state tree // temporary let tree = HamtStateTree::default(); @@ -288,16 +367,21 @@ where } /// Syncs chain data and persists it to blockstore + // TODO function signature surely won't stay this way async fn sync_headers_reverse( &mut self, - head: Rc, - to: Rc, - ) -> Result>, Error> { + head: Tipset, + to: Arc, + ) -> Result, Error> { info!("Syncing headers from: {:?}", head.key()); - let mut return_set = vec![Rc::clone(&head)]; + let mut return_set = vec![head]; - let to_epoch = to.blocks()[0].epoch(); + let to_epoch = to + .blocks() + .get(0) + .ok_or_else(|| Error::Blockchain("Tipset must not be empty".to_owned()))? 
+ .epoch(); // TODO use accepted_blocks as cache to mark bad blocks on failed sync let mut accepted_blocks: Vec = Vec::new(); @@ -316,20 +400,22 @@ where if let Ok(ts) = self.chain_store.tipset_from_keys(ts.parents()) { // Add blocks in tipset to accepted chain and push the tipset to return set accepted_blocks.extend_from_slice(ts.key().cids()); - return_set.push(Rc::new(ts)); + return_set.push(ts); continue; } const REQUEST_WINDOW: u64 = 500; let epoch_diff = u64::from(ts.epoch() - to_epoch); - let window = min(epoch_diff, REQUEST_WINDOW); + let _window = min(epoch_diff, REQUEST_WINDOW); - // Load blocks from network using blocksync - let tipsets: Vec = self - .block_sync - .get_headers(ts.parents(), window) - .await - .map_err(|e| Error::Other(e))?; + // // Load blocks from network using blocksync + // TODO add sending blocksync req back + // let tipsets: Vec = self + // .network + // .get_headers(ts.parents(), window) + // .await + // .map_err(|e| Error::Other(e))?; + let tipsets: Vec = vec![]; // Loop through each tipset received from network for ts in tipsets { @@ -345,13 +431,13 @@ where accepted_blocks.extend_from_slice(ts.key().cids()); // Add tipset to vector of tipsets to return - return_set.push(Rc::new(ts)); + return_set.push(ts); } } - let last_ts = return_set.last().ok_or(Error::Other( - "Return set should contain a tipset".to_owned(), - ))?; + let last_ts = return_set + .last() + .ok_or_else(|| Error::Other("Return set should contain a tipset".to_owned()))?; // Check if local chain was fork if last_ts.key().cids() != to.key().cids() { @@ -360,23 +446,20 @@ where // This removes need to sync fork return Ok(return_set); } - self.sync_fork(last_ts.clone(), to).await?; + // TODO add fork to return set + let _fork = self.sync_fork(&last_ts, &to).await?; } Ok(return_set) } - async fn sync_fork( - &mut self, - _head: Rc, - _to: Rc, - ) -> Result>, Error> { + async fn sync_fork(&mut self, _head: &Tipset, _to: &Tipset) -> Result>, Error> { // TODO sync fork until tipsets are equal or reaches genesis todo!() } // Persists headers from tipset slice to chain store - fn persist_headers(&self, tipsets: &[Rc]) -> Result<(), DBError> { + fn persist_headers(&self, tipsets: &[Tipset]) -> Result<(), DBError> { tipsets .iter() .try_for_each(|ts| self.chain_store.persist_headers(ts)) diff --git a/blockchain/chain_sync/tests/manager_test.rs b/blockchain/chain_sync/tests/manager_test.rs index 4b7873b37899..4f41a1a833bb 100644 --- a/blockchain/chain_sync/tests/manager_test.rs +++ b/blockchain/chain_sync/tests/manager_test.rs @@ -5,7 +5,7 @@ use blocks::{BlockHeader, Tipset}; use chain_sync::SyncManager; use cid::{multihash::Hash::Blake2b256, Cid}; use num_bigint::BigUint; -use std::rc::Rc; +use std::sync::Arc; fn create_header(weight: u64, parent_bz: &[u8], cached_bytes: &[u8]) -> BlockHeader { let header = BlockHeader::builder() @@ -20,7 +20,7 @@ fn create_header(weight: u64, parent_bz: &[u8], cached_bytes: &[u8]) -> BlockHea #[test] fn schedule_tipset() { let header = create_header(0, b"", b""); - let tipset = Rc::new(Tipset::new(vec![header]).unwrap()); + let tipset = Arc::new(Tipset::new(vec![header]).unwrap()); let mut manager = SyncManager::default(); manager.schedule_tipset(tipset.clone()); { @@ -32,9 +32,9 @@ fn schedule_tipset() { #[test] fn heaviest_different_chain() { - let l_tipset = Rc::new(Tipset::new(vec![create_header(1, b"1", b"1")]).unwrap()); - let m_tipset = Rc::new(Tipset::new(vec![create_header(2, b"2", b"2")]).unwrap()); - let h_tipset = 
Rc::new(Tipset::new(vec![create_header(3, b"1", b"1")]).unwrap()); + let l_tipset = Arc::new(Tipset::new(vec![create_header(1, b"1", b"1")]).unwrap()); + let m_tipset = Arc::new(Tipset::new(vec![create_header(2, b"2", b"2")]).unwrap()); + let h_tipset = Arc::new(Tipset::new(vec![create_header(3, b"1", b"1")]).unwrap()); let mut manager = SyncManager::default(); manager.schedule_tipset(l_tipset.clone()); manager.schedule_tipset(m_tipset.clone()); diff --git a/forest/Cargo.toml b/forest/Cargo.toml index efe91aa0f001..4f23a4f4244a 100644 --- a/forest/Cargo.toml +++ b/forest/Cargo.toml @@ -17,3 +17,4 @@ async-std = { version = "1.4.0", features = ["attributes"] } serde = { version = "1.0", features = ["derive"] } pretty_env_logger = "0.3" ctrlc = "3.1.4" +chain_sync = { path = "../blockchain/chain_sync" } diff --git a/forest/src/main.rs b/forest/src/main.rs index b26ada310511..d62ed4c1f780 100644 --- a/forest/src/main.rs +++ b/forest/src/main.rs @@ -5,6 +5,8 @@ mod cli; mod logger; use self::cli::cli; use async_std::task; +use chain_sync::ChainSyncer; +use db::RocksDb; use forest_libp2p::{get_keypair, Libp2pService}; use libp2p::identity::{ed25519, Keypair}; use log::{info, trace}; @@ -62,15 +64,31 @@ async fn main() { // Start libp2p service let p2p_service = Libp2pService::new(&config.network, net_keypair); + let network_rx = p2p_service.network_receiver(); + let network_send = p2p_service.network_sender(); + + // Start services let p2p_thread = task::spawn(async { p2p_service.run().await; }); + let sync_thread = task::spawn(async { + // Initialize database + let mut db = RocksDb::new("chain_db"); + db.open().unwrap(); + + let mut chain_syncer = ChainSyncer::new(&db, network_send, network_rx).unwrap(); + chain_syncer.poll_tmp().await; + // chain_syncer + // .sync(Tipset::new(vec![BlockHeader::default()]).unwrap()) + // .await + }); // Block until ctrl-c is hit block_until_sigint(); // Drop threads drop(p2p_thread); + drop(sync_thread); info!("Forest finish shutdown"); } diff --git a/node/forest_libp2p/src/rpc/mod.rs b/node/forest_libp2p/src/rpc/mod.rs index aea0e8041bb4..e55918c7fd39 100644 --- a/node/forest_libp2p/src/rpc/mod.rs +++ b/node/forest_libp2p/src/rpc/mod.rs @@ -15,7 +15,7 @@ pub use handler::*; pub use protocol::*; pub use rpc_message::*; -type RequestId = usize; +pub type RequestId = usize; /// The return type used in the behaviour and the resultant event from the protocols handler. 
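 // RequestId is exposed publicly so chain_sync's SyncNetworkContext can return the id of each
 // request it sends; RPCEvent::Response events carry the same id, letting callers correlate
 // replies with the requests that produced them.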
#[derive(Debug, Clone, PartialEq)] diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index e75f57a37233..6a2fa50cfe91 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -140,9 +140,13 @@ impl Libp2pService { info!("RPC event {:?}", event); match event { RPCEvent::Response(req_id, res) => { - info!("response: {:?}", res); + self.network_sender_out.send(NetworkEvent::RPCResponse { + req_id, + response: res, + }).await; } RPCEvent::Request(req_id, req) => { + // TODO implement handling incoming requests // send the response swarm_stream.get_mut().send_rpc(peer_id, RPCEvent::Response(1, RPCResponse::Blocksync(BlockSyncResponse { chain: vec![], @@ -176,7 +180,7 @@ impl Libp2pService { self.network_sender_in.clone() } - /// Returns a `Receiver` to listen to GossipSub messages + /// Returns a `Receiver` to listen to network events pub fn network_receiver(&self) -> Receiver { self.network_receiver_out.clone() } From 7e661fb9f9bd94484c89b790ef238f0e2a4d6d94 Mon Sep 17 00:00:00 2001 From: austinabell Date: Wed, 26 Feb 2020 19:58:03 -0500 Subject: [PATCH 08/21] Clean up changes --- blockchain/chain/Cargo.toml | 2 -- blockchain/chain_sync/Cargo.toml | 5 ----- blockchain/chain_sync/src/network_context.rs | 15 ++------------- blockchain/chain_sync/src/sync.rs | 9 +++++---- blockchain/state_manager/Cargo.toml | 1 - crypto/Cargo.toml | 1 - forest/src/main.rs | 7 ++----- node/forest_libp2p/src/service.rs | 6 +++--- 8 files changed, 12 insertions(+), 34 deletions(-) diff --git a/blockchain/chain/Cargo.toml b/blockchain/chain/Cargo.toml index e1bddd63c051..c3829360f7d1 100644 --- a/blockchain/chain/Cargo.toml +++ b/blockchain/chain/Cargo.toml @@ -5,7 +5,6 @@ authors = ["ChainSafe Systems "] edition = "2018" [dependencies] -forest_libp2p = { path = "../../node/forest_libp2p" } blocks = { package = "forest_blocks", path = "../blocks" } db = { path = "../../node/db" } cid = { package = "forest_cid", path = "../../ipld/cid" } @@ -15,7 +14,6 @@ serde = { version = "1.0", features = ["derive"] } num-bigint = { path = "../../math/bigint", package = "forest_bigint" } message = { package = "forest_message", path = "../../vm/message" } ipld_blockstore = { path = "../../ipld/blockstore" } -async-trait = "0.1.24" [dev-dependencies] address = { package = "forest_address", path = "../../vm/address" } diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml index fa544b8bc6f7..900753918946 100644 --- a/blockchain/chain_sync/Cargo.toml +++ b/blockchain/chain_sync/Cargo.toml @@ -15,7 +15,6 @@ cid = { package = "forest_cid", path = "../../ipld/cid" } ipld_blockstore = { path = "../../ipld/blockstore" } chain = { path = "../chain" } message = { package = "forest_message", path = "../../vm/message" } -multihash = "0.9.4" state_tree = { path = "../../vm/state_tree/" } state_manager = { path = "../state_manager/" } num-bigint = { path = "../../math/bigint", package = "forest_bigint" } @@ -24,8 +23,4 @@ log = "0.4.8" async-std = { version = "1.4.0", features = ["unstable"] } forest_libp2p = { path = "../../node/forest_libp2p" } pin-project = "0.4.8" -fnv = "1.0.6" futures = "0.3.2" - -[dev-dependencies] -cid = { package = "forest_cid", path = "../../ipld/cid" } diff --git a/blockchain/chain_sync/src/network_context.rs b/blockchain/chain_sync/src/network_context.rs index 7d420dddd56d..44710a114266 100644 --- a/blockchain/chain_sync/src/network_context.rs +++ b/blockchain/chain_sync/src/network_context.rs @@ -19,14 +19,6 @@ pub struct 
SyncNetworkContext { request_id: RequestId, } -// #[async_trait] -// impl BlockSyncProvider for SyncNetworkContext { -// async fn get_headers(&mut self, _tsk: &TipSetKeys, _count: u64) -> Result, String> { -// self.blocksync_request(peer_id: PeerId, request: Message) -// Ok(vec![]) -// } -// } - impl SyncNetworkContext { pub fn new(network_send: Sender) -> Self { Self { @@ -35,6 +27,7 @@ impl SyncNetworkContext { } } + /// Send a blocksync request for only block headers (ignore messages) pub async fn blocksync_headers( &mut self, peer_id: PeerId, @@ -79,11 +72,7 @@ impl SyncNetworkContext { /// Handles sending the base event to the network service async fn send_rpc_event(&mut self, peer_id: PeerId, event: RPCEvent) { self.network_send - .send(NetworkMessage::RPC { - peer_id, - // TODO probably change event name to event from request - request: event, - }) + .send(NetworkMessage::RPC { peer_id, event }) .await } } diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index e577512affbc..160d636b10af 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -128,7 +128,8 @@ where }) } - pub async fn poll_tmp(&mut self) { + /// Starts syncing process + pub async fn sync(&mut self) -> Result<(), Error> { let mut nw = self.network_rx.clone().fuse(); loop { select! { @@ -142,15 +143,15 @@ where } } } - } - /// Starts syncing process - pub async fn sync(mut self, head: Tipset) -> Result<(), Error> { info!("Starting chain sync"); // Get heaviest tipset from storage to sync toward let heaviest = self.chain_store.heaviest_tipset(); + // TODO remove this and + let head = Tipset::new(vec![BlockHeader::default()]).unwrap(); + // Sync headers from network from head to heaviest from storage let headers = self.sync_headers_reverse(head, heaviest).await?; diff --git a/blockchain/state_manager/Cargo.toml b/blockchain/state_manager/Cargo.toml index 09a2e87c6559..e686d399e79a 100644 --- a/blockchain/state_manager/Cargo.toml +++ b/blockchain/state_manager/Cargo.toml @@ -7,7 +7,6 @@ edition = "2018" [dependencies] address = { package = "forest_address", path = "../../vm/address/" } actor = { path = "../../vm/actor/" } -chain = { path = "../chain/" } db = { path = "../../node/db/" } encoding = { package = "forest_encoding", path = "../../encoding/" } state_tree = { path = "../../vm/state_tree/" } diff --git a/crypto/Cargo.toml b/crypto/Cargo.toml index 38b2b52d994b..755851fe9230 100644 --- a/crypto/Cargo.toml +++ b/crypto/Cargo.toml @@ -5,7 +5,6 @@ authors = ["ChainSafe Systems "] edition = "2018" [dependencies] -blake2 = "0.8.1" address = { package = "forest_address", path = "../vm/address" } encoding = { package = "forest_encoding", path = "../encoding" } libsecp256k1 = "0.3.4" diff --git a/forest/src/main.rs b/forest/src/main.rs index d62ed4c1f780..13f644857c15 100644 --- a/forest/src/main.rs +++ b/forest/src/main.rs @@ -1,9 +1,9 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use self::cli::cli; mod cli; mod logger; -use self::cli::cli; use async_std::task; use chain_sync::ChainSyncer; use db::RocksDb; @@ -77,10 +77,7 @@ async fn main() { db.open().unwrap(); let mut chain_syncer = ChainSyncer::new(&db, network_send, network_rx).unwrap(); - chain_syncer.poll_tmp().await; - // chain_syncer - // .sync(Tipset::new(vec![BlockHeader::default()]).unwrap()) - // .await + chain_syncer.sync().await.unwrap(); }); // Block until ctrl-c is hit diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs 
index 6a2fa50cfe91..2c5c3e8a30cb 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -45,7 +45,7 @@ pub enum NetworkEvent { #[derive(Clone, Debug)] pub enum NetworkMessage { PubsubMessage { topic: Topic, message: Vec }, - RPC { peer_id: PeerId, request: RPCEvent }, + RPC { peer_id: PeerId, event: RPCEvent }, } /// The Libp2pService listens to events from the Libp2p swarm. pub struct Libp2pService { @@ -165,8 +165,8 @@ impl Libp2pService { NetworkMessage::PubsubMessage{topic, message} => { swarm_stream.get_mut().publish(&topic, message); } - NetworkMessage::RPC{peer_id, request} => { - swarm_stream.get_mut().send_rpc(peer_id, request); + NetworkMessage::RPC{peer_id, event} => { + swarm_stream.get_mut().send_rpc(peer_id, event); } } None => {break;} From 4b00f204567a2b6959d14e6f03ed1de836bd3290 Mon Sep 17 00:00:00 2001 From: austinabell Date: Wed, 26 Feb 2020 21:25:32 -0500 Subject: [PATCH 09/21] Implement conversion of tipset bundle to be cleaned up --- blockchain/blocks/src/tipset.rs | 1 + blockchain/chain_sync/src/sync.rs | 2 +- .../src/rpc/blocksync_message.rs | 63 ++++++++++++++++++- node/forest_libp2p/tests/decode_test.rs | 38 +++++++++++ 4 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 node/forest_libp2p/tests/decode_test.rs diff --git a/blockchain/blocks/src/tipset.rs b/blockchain/blocks/src/tipset.rs index ff5a945690bc..915e733a5588 100644 --- a/blockchain/blocks/src/tipset.rs +++ b/blockchain/blocks/src/tipset.rs @@ -201,6 +201,7 @@ impl Tipset { } /// FullTipSet is an expanded version of the TipSet that contains all the blocks and messages +#[derive(Debug, PartialEq, Clone)] pub struct FullTipset { blocks: Vec, } diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 160d636b10af..f5d2be2f1e5f 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -410,7 +410,7 @@ where let _window = min(epoch_diff, REQUEST_WINDOW); // // Load blocks from network using blocksync - // TODO add sending blocksync req back + // TODO add sending blocksync req back (requires some channel for data back) // let tipsets: Vec = self // .network // .get_headers(ts.parents(), window) diff --git a/node/forest_libp2p/src/rpc/blocksync_message.rs b/node/forest_libp2p/src/rpc/blocksync_message.rs index d0a0fe1ad7f3..f8d3140e2637 100644 --- a/node/forest_libp2p/src/rpc/blocksync_message.rs +++ b/node/forest_libp2p/src/rpc/blocksync_message.rs @@ -1,13 +1,14 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use forest_blocks::BlockHeader; +use forest_blocks::{Block, BlockHeader, FullTipset}; use forest_cid::Cid; use forest_encoding::{ de::{self, Deserialize, Deserializer}, ser::{self, Serialize, Serializer}, }; use forest_message::{SignedMessage, UnsignedMessage}; +use std::convert::TryFrom; /// The payload that gets sent to another node to request for blocks and messages. It get DagCBOR serialized before sending over the wire. #[derive(Clone, Debug, PartialEq)] @@ -54,6 +55,21 @@ pub struct BlockSyncResponse { pub message: String, } +impl BlockSyncResponse { + pub fn into_result(self) -> Result, String> { + if self.status != 0 { + // TODO implement a better error type than string if needed to be handled differently + return Err(format!("Status {}: {}", self.status, self.message)); + } + + Ok(self + .chain + .into_iter() + .map(FullTipset::try_from) + .collect::>()?) 
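+        // Each TipSetBundle in the chain is expanded into a FullTipset via the TryFrom impl below;
+        // the first malformed bundle aborts the whole conversion and its error string is returned.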
+ } +} + impl Serialize for BlockSyncResponse { fn serialize(&self, serializer: S) -> Result where @@ -93,6 +109,51 @@ pub struct TipSetBundle { pub secp_msg_includes: Vec>, } +impl TryFrom for FullTipset { + type Error = &'static str; + + fn try_from(tsb: TipSetBundle) -> Result { + let mut blocks: Vec = Vec::with_capacity(tsb.blocks.len()); + + if tsb.blocks.len() != tsb.bls_msg_includes.len() + || tsb.blocks.len() != tsb.secp_msg_includes.len() + { + return Err("Invalid formed TipSet bundle, lengths of includes does not match blocks"); + } + + for (i, header) in tsb.blocks.into_iter().enumerate() { + let mut bls_messages = Vec::with_capacity(tsb.bls_msg_includes[i].len()); + for (_, idx) in tsb.bls_msg_includes[i].iter().enumerate() { + bls_messages.push( + tsb.bls_msgs + .get(*idx as usize) + .cloned() + .ok_or_else(|| "Invalid bls message index")?, + ); + } + + let mut secp_messages = Vec::with_capacity(tsb.secp_msg_includes[i].len()); + for (_, idx) in tsb.secp_msg_includes[i].iter().enumerate() { + secp_messages.push( + tsb.secp_msgs + .get(*idx as usize) + .cloned() + .ok_or_else(|| "Invalid bls message index")?, + ); + } + + blocks.push(Block { + header, + secp_messages, + bls_messages, + }); + } + + // TODO FullTipset constructor doesn't perform any validation (but probably should?) + Ok(FullTipset::new(blocks)) + } +} + impl ser::Serialize for TipSetBundle { fn serialize(&self, serializer: S) -> Result<::Ok, ::Error> where diff --git a/node/forest_libp2p/tests/decode_test.rs b/node/forest_libp2p/tests/decode_test.rs new file mode 100644 index 000000000000..a10e74cd846e --- /dev/null +++ b/node/forest_libp2p/tests/decode_test.rs @@ -0,0 +1,38 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +#![cfg(test)] + +use forest_blocks::FullTipset; +use forest_libp2p::rpc::{BlockSyncResponse, TipSetBundle}; + +#[test] +fn convert_single_tipset_bundle() { + let bundle = TipSetBundle { + blocks: Vec::new(), + bls_msgs: Vec::new(), + bls_msg_includes: Vec::new(), + secp_msgs: Vec::new(), + secp_msg_includes: Vec::new(), + }; + + let res = BlockSyncResponse { + chain: vec![bundle], + status: 0, + message: "".into(), + } + .into_result() + .unwrap(); + + assert_eq!(res, [FullTipset::new(vec![])]); +} + +#[test] +fn convert_example_tipset_bundle() { + // TODO +} + +#[test] +fn convert_actual_tipset_bundle() { + // TODO +} From b9818fe8db5132fd5ec9846e725e696c6ae2e962 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 09:01:50 -0500 Subject: [PATCH 10/21] Add bad blocks cache --- blockchain/chain_sync/Cargo.toml | 1 + blockchain/chain_sync/src/sync.rs | 61 +++++++++++++++++++++---------- 2 files changed, 42 insertions(+), 20 deletions(-) diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml index 900753918946..7690091427eb 100644 --- a/blockchain/chain_sync/Cargo.toml +++ b/blockchain/chain_sync/Cargo.toml @@ -24,3 +24,4 @@ async-std = { version = "1.4.0", features = ["unstable"] } forest_libp2p = { path = "../../node/forest_libp2p" } pin-project = "0.4.8" futures = "0.3.2" +lru = "0.4.3" diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index f5d2be2f1e5f..7efd419f4746 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -18,6 +18,7 @@ use futures::{select, FutureExt}; use ipld_blockstore::BlockStore; use libp2p::core::PeerId; use log::info; +use lru::LruCache; use message::Message; use num_bigint::BigUint; use pin_project::pin_project; @@ -63,14 
+64,16 @@ pub struct ChainSyncer<'db, DB, ST> { /// access and store tipsets / blocks / messages chain_store: ChainStore<'db, DB>, - // /// Provider for BlockSync service - // block_sync: BS, /// Context to be able to send requests to p2p network network: SyncNetworkContext, /// the known genesis tipset _genesis: Tipset, + /// Bad blocks cache, updates based on invalid state transitions. + /// Will mark any invalid blocks and all childen as bad in this bounded cache + bad_blocks: LruCache, + /// Channel for incoming network events to be handled by syncer #[pin] network_rx: Receiver, @@ -125,6 +128,7 @@ where _genesis, sync_manager, network_rx, + bad_blocks: LruCache::new(1 << 15), }) } @@ -149,11 +153,11 @@ where // Get heaviest tipset from storage to sync toward let heaviest = self.chain_store.heaviest_tipset(); - // TODO remove this and + // TODO remove this and retrieve head from storage let head = Tipset::new(vec![BlockHeader::default()]).unwrap(); // Sync headers from network from head to heaviest from storage - let headers = self.sync_headers_reverse(head, heaviest).await?; + let headers = self.sync_headers_reverse(head, &heaviest).await?; // Persist header chain pulled from network self.persist_headers(&headers)?; @@ -368,14 +372,15 @@ where } /// Syncs chain data and persists it to blockstore - // TODO function signature surely won't stay this way async fn sync_headers_reverse( &mut self, head: Tipset, - to: Arc, + to: &Tipset, ) -> Result, Error> { info!("Syncing headers from: {:?}", head.key()); + let mut accepted_blocks: Vec = Vec::new(); + let mut return_set = vec![head]; let to_epoch = to @@ -384,29 +389,26 @@ where .ok_or_else(|| Error::Blockchain("Tipset must not be empty".to_owned()))? .epoch(); - // TODO use accepted_blocks as cache to mark bad blocks on failed sync - let mut accepted_blocks: Vec = Vec::new(); - // Loop until most recent tipset height is less than to tipset height - 'sync: while let Some(ts) = return_set.last() { - if ts.epoch() < to_epoch { + 'sync: while let Some(cur_ts) = return_set.last() { + // Check if parent cids exist in bad block cache + self.validate_tipset_against_cache(cur_ts.parents(), &accepted_blocks)?; + + if cur_ts.epoch() < to_epoch { // Current tipset is less than epoch of tipset syncing toward break; } - // Check parent cids - // TODO check if cids exist in rejected blocks cache when implemented - // Try to load parent tipset from local storage - if let Ok(ts) = self.chain_store.tipset_from_keys(ts.parents()) { + if let Ok(ts) = self.chain_store.tipset_from_keys(cur_ts.parents()) { // Add blocks in tipset to accepted chain and push the tipset to return set accepted_blocks.extend_from_slice(ts.key().cids()); return_set.push(ts); continue; } - const REQUEST_WINDOW: u64 = 500; - let epoch_diff = u64::from(ts.epoch() - to_epoch); + const REQUEST_WINDOW: u64 = 100; + let epoch_diff = u64::from(cur_ts.epoch() - to_epoch); let _window = min(epoch_diff, REQUEST_WINDOW); // // Load blocks from network using blocksync @@ -426,9 +428,7 @@ where break 'sync; } // Check Cids of blocks against bad block cache - for _ in ts.key().cids() { - // TODO - } + self.validate_tipset_against_cache(&ts.key(), &accepted_blocks)?; accepted_blocks.extend_from_slice(ts.key().cids()); // Add tipset to vector of tipsets to return @@ -454,6 +454,27 @@ where Ok(return_set) } + fn validate_tipset_against_cache( + &mut self, + ts: &TipSetKeys, + accepted_blocks: &[Cid], + ) -> Result<(), Error> { + for cid in ts.cids() { + if let Some(reason) = 
self.bad_blocks.get(cid).cloned() { + for bh in accepted_blocks { + self.bad_blocks + .put(bh.clone(), format!("chain contained {}", cid)); + } + + return Err(Error::Other(format!( + "Chain contained block marked as bad: {}, {}", + cid, reason + ))); + } + } + Ok(()) + } + async fn sync_fork(&mut self, _head: &Tipset, _to: &Tipset) -> Result>, Error> { // TODO sync fork until tipsets are equal or reaches genesis todo!() From e731d52572b319241701ac69e4441cffd9707c54 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 09:05:37 -0500 Subject: [PATCH 11/21] function to allow getting cids from TipSet --- blockchain/blocks/src/tipset.rs | 4 ++++ blockchain/chain_sync/src/sync.rs | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/blockchain/blocks/src/tipset.rs b/blockchain/blocks/src/tipset.rs index 915e733a5588..0523256512cc 100644 --- a/blockchain/blocks/src/tipset.rs +++ b/blockchain/blocks/src/tipset.rs @@ -186,6 +186,10 @@ impl Tipset { pub fn key(&self) -> &TipSetKeys { &self.key } + /// Returns slice of Cids for the current tipset + pub fn cids(&self) -> &[Cid] { + &self.key.cids() + } /// Returns the CIDs of the parents of the blocks in the tipset pub fn parents(&self) -> &TipSetKeys { &self.blocks[0].parents() diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 7efd419f4746..511d65eab0b1 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -402,7 +402,7 @@ where // Try to load parent tipset from local storage if let Ok(ts) = self.chain_store.tipset_from_keys(cur_ts.parents()) { // Add blocks in tipset to accepted chain and push the tipset to return set - accepted_blocks.extend_from_slice(ts.key().cids()); + accepted_blocks.extend_from_slice(ts.cids()); return_set.push(ts); continue; } @@ -430,7 +430,7 @@ where // Check Cids of blocks against bad block cache self.validate_tipset_against_cache(&ts.key(), &accepted_blocks)?; - accepted_blocks.extend_from_slice(ts.key().cids()); + accepted_blocks.extend_from_slice(ts.cids()); // Add tipset to vector of tipsets to return return_set.push(ts); } @@ -441,7 +441,7 @@ where .ok_or_else(|| Error::Other("Return set should contain a tipset".to_owned()))?; // Check if local chain was fork - if last_ts.key().cids() != to.key().cids() { + if last_ts.key() != to.key() { if last_ts.parents() == to.parents() { // block received part of same tipset as best block // This removes need to sync fork From c5beef8cd19dfa85ce021ddcf850695eadf8c10a Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 10:08:24 -0500 Subject: [PATCH 12/21] Add tests --- blockchain/chain_sync/tests/syncer_test.rs | 17 ++++ node/forest_libp2p/Cargo.toml | 5 +- node/forest_libp2p/tests/decode_test.rs | 96 ++++++++++++++++++++-- node/forest_libp2p/tests/rpc_test.rs | 2 - 4 files changed, 108 insertions(+), 12 deletions(-) create mode 100644 blockchain/chain_sync/tests/syncer_test.rs diff --git a/blockchain/chain_sync/tests/syncer_test.rs b/blockchain/chain_sync/tests/syncer_test.rs new file mode 100644 index 000000000000..847a5d9c1886 --- /dev/null +++ b/blockchain/chain_sync/tests/syncer_test.rs @@ -0,0 +1,17 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use async_std::sync::channel; +use chain_sync::ChainSyncer; +use db::MemoryDB; + +#[test] +fn chainsync_constructor() { + let db = MemoryDB::default(); + let (local_sender, _test_receiver) = channel(20); + let (_event_sender, event_receiver) = channel(20); + + // Test just 
makes sure that the chain syncer can be created without using a live database or + // p2p network (local channels to simulate network messages and responses) + let _chain_syncer = ChainSyncer::new(&db, local_sender, event_receiver).unwrap(); +} diff --git a/node/forest_libp2p/Cargo.toml b/node/forest_libp2p/Cargo.toml index f6245a5c564f..ae97075992fe 100644 --- a/node/forest_libp2p/Cargo.toml +++ b/node/forest_libp2p/Cargo.toml @@ -22,5 +22,8 @@ fnv = "1.0.6" smallvec = "1.1.0" [dev-dependencies] +forest_address = { path = "../../vm/address" } slog-async = "2.3.0" -slog-term = "2.4.2" \ No newline at end of file +slog-term = "2.4.2" +num-bigint = { path = "../../math/bigint", package = "forest_bigint" } +crypto = { path = "../../crypto" } diff --git a/node/forest_libp2p/tests/decode_test.rs b/node/forest_libp2p/tests/decode_test.rs index a10e74cd846e..a6bb677cdb01 100644 --- a/node/forest_libp2p/tests/decode_test.rs +++ b/node/forest_libp2p/tests/decode_test.rs @@ -1,10 +1,24 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -#![cfg(test)] - -use forest_blocks::FullTipset; +use crypto::{Signature, Signer}; +use forest_address::Address; +use forest_blocks::{Block, BlockHeader, FullTipset}; use forest_libp2p::rpc::{BlockSyncResponse, TipSetBundle}; +use forest_message::{SignedMessage, UnsignedMessage}; +use num_bigint::BigUint; +use std::convert::TryFrom; +use std::error::Error; + +const DUMMY_SIG: [u8; 1] = [0u8]; + +/// Test struct to generate one byte signature for testing +struct DummySigner; +impl Signer for DummySigner { + fn sign_bytes(&self, _: Vec, _: &Address) -> Result> { + Ok(Signature::new_secp256k1(DUMMY_SIG.to_vec())) + } +} #[test] fn convert_single_tipset_bundle() { @@ -28,11 +42,75 @@ fn convert_single_tipset_bundle() { } #[test] -fn convert_example_tipset_bundle() { - // TODO -} +fn tipset_bundle_to_full_tipset() { + let h0 = BlockHeader::builder() + .weight(BigUint::from(1 as u32)) + .build() + .unwrap(); + let h1 = BlockHeader::builder() + .weight(BigUint::from(2 as u32)) + .build() + .unwrap(); + let ua = UnsignedMessage::builder() + .to(Address::new_id(0).unwrap()) + .from(Address::new_id(0).unwrap()) + .build() + .unwrap(); + let ub = UnsignedMessage::builder() + .to(Address::new_id(1).unwrap()) + .from(Address::new_id(1).unwrap()) + .build() + .unwrap(); + let uc = UnsignedMessage::builder() + .to(Address::new_id(2).unwrap()) + .from(Address::new_id(2).unwrap()) + .build() + .unwrap(); + let ud = UnsignedMessage::builder() + .to(Address::new_id(3).unwrap()) + .from(Address::new_id(3).unwrap()) + .build() + .unwrap(); + let sa = SignedMessage::new(&ua, &DummySigner).unwrap(); + let sb = SignedMessage::new(&ua, &DummySigner).unwrap(); + let sc = SignedMessage::new(&ua, &DummySigner).unwrap(); + let sd = SignedMessage::new(&ua, &DummySigner).unwrap(); -#[test] -fn convert_actual_tipset_bundle() { - // TODO + let b0 = Block { + header: h0.clone(), + secp_messages: vec![sa.clone(), sb.clone(), sd.clone()], + bls_messages: vec![ua.clone(), ub.clone()], + }; + let b1 = Block { + header: h1.clone(), + secp_messages: vec![sb.clone(), sc.clone(), sa.clone()], + bls_messages: vec![uc.clone(), ud.clone()], + }; + + let mut tsb = TipSetBundle { + blocks: vec![h0, h1], + secp_msgs: vec![sa, sb, sc, sd], + secp_msg_includes: vec![vec![0, 1, 3], vec![1, 2, 0]], + bls_msgs: vec![ua, ub, uc, ud], + bls_msg_includes: vec![vec![0, 1], vec![2, 3]], + }; + + assert_eq!( + FullTipset::try_from(tsb.clone()).unwrap(), + FullTipset::new(vec![b0, b1]) + ); + + 
// Invalidate tipset bundle by having invalid index + tsb.secp_msg_includes = vec![vec![0, 4], vec![0]]; + assert!( + FullTipset::try_from(tsb.clone()).is_err(), + "Invalid index should return error" + ); + + // Invalidate tipset bundle by not having includes same length as number of blocks + tsb.secp_msg_includes = vec![vec![0]]; + assert!( + FullTipset::try_from(tsb.clone()).is_err(), + "Invalid includes index vector should return error" + ); } diff --git a/node/forest_libp2p/tests/rpc_test.rs b/node/forest_libp2p/tests/rpc_test.rs index 0877dbfb14ce..a78e3000f5d0 100644 --- a/node/forest_libp2p/tests/rpc_test.rs +++ b/node/forest_libp2p/tests/rpc_test.rs @@ -1,8 +1,6 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -#![cfg(test)] - use async_std::task; use forest_libp2p::rpc::{ BlockSyncRequest, BlockSyncResponse, RPCEvent, RPCMessage, RPCRequest, RPCResponse, RPC, From ae689257695dabd083db0f91d562d5961bd5470d Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 10:32:34 -0500 Subject: [PATCH 13/21] replace duplicate logic with generic function --- .../src/rpc/blocksync_message.rs | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/node/forest_libp2p/src/rpc/blocksync_message.rs b/node/forest_libp2p/src/rpc/blocksync_message.rs index f8d3140e2637..ce62c4409b4f 100644 --- a/node/forest_libp2p/src/rpc/blocksync_message.rs +++ b/node/forest_libp2p/src/rpc/blocksync_message.rs @@ -121,26 +121,25 @@ impl TryFrom for FullTipset { return Err("Invalid formed TipSet bundle, lengths of includes does not match blocks"); } - for (i, header) in tsb.blocks.into_iter().enumerate() { - let mut bls_messages = Vec::with_capacity(tsb.bls_msg_includes[i].len()); - for (_, idx) in tsb.bls_msg_includes[i].iter().enumerate() { - bls_messages.push( - tsb.bls_msgs + fn values_from_indexes( + indexes: &[u64], + values: &[T], + ) -> Result, &'static str> { + let mut msgs = Vec::with_capacity(indexes.len()); + for idx in indexes.iter() { + msgs.push( + values .get(*idx as usize) .cloned() - .ok_or_else(|| "Invalid bls message index")?, + .ok_or_else(|| "Invalid message index")?, ); } + Ok(msgs) + } - let mut secp_messages = Vec::with_capacity(tsb.secp_msg_includes[i].len()); - for (_, idx) in tsb.secp_msg_includes[i].iter().enumerate() { - secp_messages.push( - tsb.secp_msgs - .get(*idx as usize) - .cloned() - .ok_or_else(|| "Invalid bls message index")?, - ); - } + for (i, header) in tsb.blocks.into_iter().enumerate() { + let bls_messages = values_from_indexes(&tsb.bls_msg_includes[i], &tsb.bls_msgs)?; + let secp_messages = values_from_indexes(&tsb.secp_msg_includes[i], &tsb.secp_msgs)?; blocks.push(Block { header, From c52daafba5b113d4a50a13e9331af8b5573144e9 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 11:59:38 -0500 Subject: [PATCH 14/21] remove comments and update a fn signature --- blockchain/chain_sync/src/manager.rs | 2 +- blockchain/chain_sync/src/sync.rs | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/blockchain/chain_sync/src/manager.rs b/blockchain/chain_sync/src/manager.rs index a5ad21469802..335eddaa972a 100644 --- a/blockchain/chain_sync/src/manager.rs +++ b/blockchain/chain_sync/src/manager.rs @@ -19,7 +19,7 @@ impl SyncManager { pub fn select_sync_target(&self) -> Option> { self.sync_queue.heaviest() } - pub fn set_peer_head(&self, _peer: PeerId, _ts: Tipset) { + pub fn set_peer_head(&self, _peer: &PeerId, _ts: Tipset) { // TODO todo!() } diff --git 
a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 511d65eab0b1..21e8e21c4873 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -101,7 +101,6 @@ struct MsgMetaData { impl<'db, DB> ChainSyncer<'db, DB, HamtStateTree> where DB: BlockStore, - // BS: BlockSyncProvider, { pub fn new( db: &'db DB, @@ -138,10 +137,6 @@ where loop { select! { network_msg = nw.next().fuse() => match network_msg { - // TODO remove this comment (testing use) - // Some(NetworkEvent::PubsubMessage{source, ..}) => { - // let req_id = self.network.blocksync_headers("QmUbiXETSptmbSYNtvCWEBYqXDKGwiBCCFWgJn1ebGiFiV".parse().unwrap(), self._genesis.key(), 10).await; - // }, Some(event) => println!("received some other event: {:?}", event), None => break, } @@ -168,7 +163,7 @@ where /// informs the syncer about a new potential tipset /// This should be called when connecting to new peers, and additionally /// when receiving new blocks from the network - pub fn inform_new_head(&self, from: PeerId, fts: FullTipset) -> Result<(), Error> { + pub fn inform_new_head(&self, from: &PeerId, fts: &FullTipset) -> Result<(), Error> { // check if full block is nil and if so return error if fts.blocks().is_empty() { return Err(Error::NoBlocks); From 6279152de9c9eaa98b11716b92fe4f6925e03b29 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 13:31:33 -0500 Subject: [PATCH 15/21] Update genesis storage, fix address deserialization edge case --- blockchain/chain/src/store/chain_store.rs | 30 ++++++++++++++++++++--- blockchain/chain_sync/src/sync.rs | 2 +- node/clock/src/lib.rs | 6 +++++ vm/address/src/lib.rs | 8 ++++-- 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index bf0563ed84f3..8c193d26dc21 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -11,6 +11,8 @@ use message::{SignedMessage, UnsignedMessage}; use num_bigint::BigUint; use std::sync::Arc; +const GENESIS_KEY: &'static str = "gen_block"; + /// Generic implementation of the datastore trait and structures pub struct ChainStore<'db, DB> { // TODO add IPLD Store @@ -61,6 +63,7 @@ where /// Writes genesis to blockstore pub fn set_genesis(&self, header: BlockHeader) -> Result<(), DbError> { + self.db.write(GENESIS_KEY, header.marshal_cbor()?)?; let ts: Tipset = Tipset::new(vec![header])?; Ok(self.persist_headers(&ts)?) 
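         // The genesis header now lives in two places: raw CBOR under GENESIS_KEY for direct
         // lookup by genesis(), and as a single-block tipset persisted through persist_headers.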
} @@ -94,13 +97,12 @@ where /// Returns genesis blockheader from blockstore pub fn genesis(&self) -> Result { - // TODO change data store for pulling genesis - let bz = self.db.read("gen_block")?; + let bz = self.db.read(GENESIS_KEY)?; match bz { None => Err(Error::UndefinedKey( "Genesis key does not exist".to_string(), )), - Some(ref x) => from_slice(&x)?, + Some(x) => BlockHeader::unmarshal_cbor(&x).map_err(Error::from), } } @@ -121,7 +123,7 @@ where let raw_header = self.db.read(c.key())?; if let Some(x) = raw_header { // decode raw header into BlockHeader - let bh = from_slice(&x)?; + let bh = BlockHeader::unmarshal_cbor(&x)?; block_headers.push(bh); } else { return Err(Error::KeyValueStore( @@ -163,3 +165,23 @@ where .collect() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn genesis_test() { + let db = db::MemoryDB::default(); + + let cs = ChainStore::new(&db); + let gen_block = BlockHeader::builder() + .epoch(1.into()) + .weight((2 as u32).into()) + .build_and_validate() + .unwrap(); + + cs.set_genesis(gen_block.clone()).unwrap(); + assert_eq!(cs.genesis().unwrap(), gen_block); + } +} diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 21e8e21c4873..4d4c8ba7e116 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -137,7 +137,7 @@ where loop { select! { network_msg = nw.next().fuse() => match network_msg { - Some(event) => println!("received some other event: {:?}", event), + Some(event) => info!("received some other event: {:?}", event), None => break, } } diff --git a/node/clock/src/lib.rs b/node/clock/src/lib.rs index ba4c185e2e88..1e0a1131d9a4 100644 --- a/node/clock/src/lib.rs +++ b/node/clock/src/lib.rs @@ -16,6 +16,12 @@ const EPOCH_DURATION: i32 = 15; /// An epoch represents a single valid state in the blockchain pub struct ChainEpoch(u64); +impl From for ChainEpoch { + fn from(num: u64) -> ChainEpoch { + ChainEpoch(num) + } +} + impl ser::Serialize for ChainEpoch { fn serialize(&self, serializer: S) -> Result where diff --git a/vm/address/src/lib.rs b/vm/address/src/lib.rs index dd86765f842f..7edba229fb0e 100644 --- a/vm/address/src/lib.rs +++ b/vm/address/src/lib.rs @@ -218,10 +218,14 @@ impl<'de> de::Deserialize<'de> for Address { D: de::Deserializer<'de>, { let mut bz: Vec = serde_bytes::Deserialize::deserialize(deserializer)?; + if bz.is_empty() { + return Err(de::Error::custom("Cannot deserialize empty bytes")); + } // Remove protocol byte - let protocol = Protocol::from_byte(bz.remove(0)) + let protocol_byte = bz.remove(0); + let protocol = Protocol::from_byte(protocol_byte) .ok_or(EncodingError::Unmarshalling { - description: format!("Invalid protocol byte: {}", bz[0]), + description: format!("Invalid protocol byte: {}", protocol_byte), protocol: CodecProtocol::Cbor, }) .map_err(de::Error::custom)?; From 615ebd390bd5d18f30d27bc1231f1a86d09e8cf6 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 13:42:55 -0500 Subject: [PATCH 16/21] Change function signature and fix lint --- blockchain/chain/src/store/chain_store.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index 8c193d26dc21..89808f39c829 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -11,7 +11,7 @@ use message::{SignedMessage, UnsignedMessage}; use num_bigint::BigUint; use std::sync::Arc; -const GENESIS_KEY: &'static str = 
"gen_block"; +const GENESIS_KEY: &str = "gen_block"; /// Generic implementation of the datastore trait and structures pub struct ChainStore<'db, DB> { @@ -96,14 +96,11 @@ where } /// Returns genesis blockheader from blockstore - pub fn genesis(&self) -> Result { - let bz = self.db.read(GENESIS_KEY)?; - match bz { - None => Err(Error::UndefinedKey( - "Genesis key does not exist".to_string(), - )), - Some(x) => BlockHeader::unmarshal_cbor(&x).map_err(Error::from), - } + pub fn genesis(&self) -> Result, Error> { + Ok(match self.db.read(GENESIS_KEY)? { + Some(bz) => Some(BlockHeader::unmarshal_cbor(&bz)?), + None => None, + }) } /// Returns heaviest tipset from blockstore @@ -181,7 +178,8 @@ mod tests { .build_and_validate() .unwrap(); + assert_eq!(cs.genesis().unwrap(), None); cs.set_genesis(gen_block.clone()).unwrap(); - assert_eq!(cs.genesis().unwrap(), gen_block); + assert_eq!(cs.genesis().unwrap(), Some(gen_block)); } } From 9aad4ebc7567722b1d97fb12b26568779cd3dc42 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 13:54:49 -0500 Subject: [PATCH 17/21] Remove resolved TODO --- blockchain/chain_sync/src/sync.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 4d4c8ba7e116..a9d720fb12a5 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -172,8 +172,6 @@ where for block in fts.blocks() { self.validate_msg_data(block)?; } - // TODO send pubsub message indicating incoming blocks - // TODO Add peer to blocksync // compare target_weight to heaviest weight stored; ignore otherwise let heaviest_tipset = self.chain_store.heaviest_tipset(); From a95b64021f5f2f3514088f99f990a8e12d867ea9 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 14:01:31 -0500 Subject: [PATCH 18/21] Query for genesis in syncer constructor --- blockchain/chain_sync/src/sync.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index a9d720fb12a5..2a050485c4f6 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -17,7 +17,7 @@ use forest_libp2p::{NetworkEvent, NetworkMessage}; use futures::{select, FutureExt}; use ipld_blockstore::BlockStore; use libp2p::core::PeerId; -use log::info; +use log::{info, warn}; use lru::LruCache; use message::Message; use num_bigint::BigUint; @@ -107,13 +107,17 @@ where network_send: Sender, network_rx: Receiver, ) -> Result { - // TODO import genesis from storage - let _genesis = Tipset::new(vec![BlockHeader::default()])?; - // TODO change from being default when impl let sync_manager = SyncManager::default(); let chain_store = ChainStore::new(db); + let _genesis = match chain_store.genesis()? { + Some(gen) => Tipset::new(vec![gen])?, + None => { + warn!("no genesis found in data storage, using a default"); + Tipset::new(vec![BlockHeader::default()])? 
+ } + }; let state_manager = StateManager::new(db, HamtStateTree::default()); From d08022a19dfe2e803ced1ecedab06fd79c176996 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 27 Feb 2020 14:08:23 -0500 Subject: [PATCH 19/21] update TODO and future proof generics --- blockchain/chain_sync/src/sync.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 2a050485c4f6..9e835e7beb3c 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -107,13 +107,13 @@ where network_send: Sender, network_rx: Receiver, ) -> Result { - // TODO change from being default when impl let sync_manager = SyncManager::default(); let chain_store = ChainStore::new(db); let _genesis = match chain_store.genesis()? { Some(gen) => Tipset::new(vec![gen])?, None => { + // TODO change default logic for genesis or setup better initialization warn!("no genesis found in data storage, using a default"); Tipset::new(vec![BlockHeader::default()])? } @@ -134,7 +134,13 @@ where bad_blocks: LruCache::new(1 << 15), }) } +} +impl<'db, DB, ST> ChainSyncer<'db, DB, ST> +where + DB: BlockStore, + ST: StateTree, +{ /// Starts syncing process pub async fn sync(&mut self) -> Result<(), Error> { let mut nw = self.network_rx.clone().fuse(); @@ -261,13 +267,14 @@ where // TODO verify_bls_aggregate // check msgs for validity - fn check_msg( + fn check_msg( msg: &M, msg_meta_data: &mut HashMap, - tree: &HamtStateTree, + tree: &ST, ) -> Result<(), Error> where M: Message, + ST: StateTree, { let updated_state: MsgMetaData = match msg_meta_data.get(msg.from()) { // address is present begin validity checks From f2d182ff02d6d4e26b47d0576cbf370050853680 Mon Sep 17 00:00:00 2001 From: austinabell Date: Fri, 28 Feb 2020 07:13:31 -0500 Subject: [PATCH 20/21] remove main async tag --- forest/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/forest/src/main.rs b/forest/src/main.rs index 13f644857c15..f9e002d9208d 100644 --- a/forest/src/main.rs +++ b/forest/src/main.rs @@ -39,8 +39,7 @@ fn block_until_sigint() { task::block_on(ctrlc_oneshot).unwrap(); } -#[async_std::main] -async fn main() { +fn main() { logger::setup_logger(); info!("Starting Forest"); From ae66d68ad7283c5bc72dcca185ed1566e7f8a055 Mon Sep 17 00:00:00 2001 From: austinabell Date: Fri, 28 Feb 2020 12:24:13 -0500 Subject: [PATCH 21/21] doczzz --- blockchain/chain_sync/src/manager.rs | 4 ++++ blockchain/chain_sync/src/network_context.rs | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/blockchain/chain_sync/src/manager.rs b/blockchain/chain_sync/src/manager.rs index 335eddaa972a..55b9430ca7ce 100644 --- a/blockchain/chain_sync/src/manager.rs +++ b/blockchain/chain_sync/src/manager.rs @@ -6,19 +6,23 @@ use blocks::Tipset; use libp2p::core::PeerId; use std::sync::Arc; +/// Manages tipsets pulled from network to be synced #[derive(Default)] pub struct SyncManager { sync_queue: SyncBucketSet, } impl SyncManager { + /// Schedules a new tipset to be handled by the sync manager pub fn schedule_tipset(&mut self, tipset: Arc) { // TODO implement interactions for syncing state when SyncManager built out self.sync_queue.insert(tipset); } + /// Retrieves the heaviest tipset in the sync queue pub fn select_sync_target(&self) -> Option> { self.sync_queue.heaviest() } + /// Sets the PeerId indicating the head tipset pub fn set_peer_head(&self, _peer: &PeerId, _ts: Tipset) { // TODO todo!() diff --git 
a/blockchain/chain_sync/src/network_context.rs b/blockchain/chain_sync/src/network_context.rs index 44710a114266..78166897d539 100644 --- a/blockchain/chain_sync/src/network_context.rs +++ b/blockchain/chain_sync/src/network_context.rs @@ -10,7 +10,7 @@ use forest_libp2p::{ use libp2p::core::PeerId; use log::trace; -/// Context used in chain sync to handle sequential +/// Context used in chain sync to handle network requests pub struct SyncNetworkContext { /// Channel to send network messages through p2p service network_send: Sender,