From 24bba7e71bee1cb7a2208926aa5da54f6fc606f7 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 22 Oct 2020 14:07:16 -0400 Subject: [PATCH 1/3] Fix sync messages logic --- blockchain/chain/src/store/chain_store.rs | 33 +++++++++++----- blockchain/chain_sync/src/sync_worker.rs | 48 ++++++++++------------- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index 58bef16422fb..444fc19d4376 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -205,8 +205,8 @@ where } /// Constructs and returns a full tipset if messages from storage exists - pub fn fill_tipsets(&self, ts: Tipset) -> Result { - fill_tipsets(self.blockstore(), ts) + pub fn fill_tipset(&self, ts: Tipset) -> Result { + fill_tipset(self.blockstore(), ts) } /// Determines if provided tipset is heavier than existing known heaviest tipset @@ -323,20 +323,35 @@ where } /// Constructs and returns a full tipset if messages from storage exists - non self version -pub fn fill_tipsets(db: &DB, ts: Tipset) -> Result +pub fn fill_tipset(db: &DB, ts: Tipset) -> Result where DB: BlockStore, { - let mut blocks: Vec = Vec::with_capacity(ts.blocks().len()); + // Collect all messages before moving tipset. + let messages: Vec<(Vec<_>, Vec<_>)> = match ts + .blocks() + .iter() + .map(|h| block_messages(db, h)) + .collect::>() + { + Ok(m) => m, + Err(e) => { + log::trace!("failed to fill tipset: {}", e); + return Err(ts); + } + }; - for header in ts.into_blocks() { - let (bls_messages, secp_messages) = block_messages(db, &header)?; - blocks.push(Block { + // Zip messages with blocks + let blocks = ts + .into_blocks() + .into_iter() + .zip(messages) + .map(|(header, (bls_messages, secp_messages))| Block { header, bls_messages, secp_messages, - }); - } + }) + .collect(); // the given tipset has already been verified, so this cannot fail Ok(FullTipset::new(blocks).unwrap()) diff --git a/blockchain/chain_sync/src/sync_worker.rs b/blockchain/chain_sync/src/sync_worker.rs index 7d88266ac934..ea6b54eb07fb 100644 --- a/blockchain/chain_sync/src/sync_worker.rs +++ b/blockchain/chain_sync/src/sync_worker.rs @@ -32,7 +32,7 @@ use state_manager::StateManager; use state_tree::StateTree; use std::cmp::min; use std::collections::HashMap; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryInto; use std::error::Error as StdError; use std::marker::PhantomData; use std::sync::Arc; @@ -129,7 +129,7 @@ where } // Sync and validate messages from fetched tipsets self.set_stage(SyncStage::Messages).await; - if let Err(e) = self.sync_messages_check_state(&tipsets).await { + if let Err(e) = self.sync_messages_check_state(tipsets).await { self.state.write().await.error(e.to_string()); return Err(e); } @@ -299,45 +299,39 @@ where } /// Syncs messages by first checking state for message existence otherwise fetches messages from blocksync - async fn sync_messages_check_state(&self, ts: &[Tipset]) -> Result<(), Error> { - // see https://github.com/filecoin-project/lotus/blob/master/build/params_shared.go#L109 for request window size - const REQUEST_WINDOW: i64 = 1; - // TODO refactor type handling - // set i to the length of provided tipsets - let mut i: i64 = i64::try_from(ts.len())? - 1; - - while i >= 0 { + async fn sync_messages_check_state(&self, tipsets: Vec) -> Result<(), Error> { + let mut ts_iter = tipsets.into_iter().rev(); + // Currently syncing 1 height at a time, no reason for us to sync more + const REQUEST_WINDOW: usize = 1; + + while let Some(ts) = ts_iter.next() { // check storage first to see if we have full tipset - let fts = match self.chain_store.fill_tipsets(ts[i as usize].clone()) { + let fts = match self.chain_store.fill_tipset(ts) { Ok(fts) => fts, - Err(_) => { + Err(ts) => { // no full tipset in storage; request messages via blocksync - let mut batch_size = REQUEST_WINDOW; - if i < batch_size { - batch_size = i; - } - - // set params for blocksync request - let idx = i - batch_size; - let next = &ts[idx as usize]; - let req_len = batch_size + 1; + let batch_size = REQUEST_WINDOW; debug!( "BlockSync message sync tipsets: epoch: {}, len: {}", - next.epoch(), - req_len + ts.epoch(), + batch_size ); // receive tipset bundle from block sync let compacted_messages = self .network - .blocksync_messages(None, next.key(), req_len as u64) + .blocksync_messages(None, ts.key(), batch_size as u64) .await?; - let mut ts_r = ts[(idx) as usize..(idx + 1 + req_len) as usize].to_vec(); + // Chain current tipset with iterator + let mut bs_iter = std::iter::once(ts).chain(&mut ts_iter); + // since the bundle only has messages, we have to put the headers in them for messages in compacted_messages.into_iter() { - let t = ts_r.pop().unwrap(); + let t = bs_iter.next().ok_or_else(|| { + Error::Other("Messages returned exceeded tipsets in chain".to_string()) + })?; let bundle = TipsetBundle { blocks: t.into_blocks(), @@ -360,7 +354,6 @@ where } } - i -= REQUEST_WINDOW; continue; } }; @@ -368,7 +361,6 @@ where let curr_epoch = fts.epoch(); self.validate_tipset(fts).await?; self.state.write().await.set_epoch(curr_epoch); - i -= 1; continue; } From cc38d60b51f80c438ddb2a05f16e35f8de01cec7 Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 22 Oct 2020 16:06:24 -0400 Subject: [PATCH 2/3] Fix response code handling --- node/forest_libp2p/src/blocksync/message.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/node/forest_libp2p/src/blocksync/message.rs b/node/forest_libp2p/src/blocksync/message.rs index 3bad03237308..98490d929c4b 100644 --- a/node/forest_libp2p/src/blocksync/message.rs +++ b/node/forest_libp2p/src/blocksync/message.rs @@ -115,7 +115,9 @@ impl BlockSyncResponse { where T: TryFrom, { - if self.status != BlockSyncResponseStatus::Success { + if self.status != BlockSyncResponseStatus::Success + && self.status != BlockSyncResponseStatus::PartialResponse + { // TODO implement a better error type than string if needed to be handled differently return Err(format!("Status {:?}: {}", self.status, self.message)); } From 084eb2d1f9039247d36453d6c3a34ecfd4e0131b Mon Sep 17 00:00:00 2001 From: austinabell Date: Thu, 22 Oct 2020 16:07:34 -0400 Subject: [PATCH 3/3] Remove unnecessary TODO --- node/forest_libp2p/src/blocksync/message.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/node/forest_libp2p/src/blocksync/message.rs b/node/forest_libp2p/src/blocksync/message.rs index 98490d929c4b..50cb483c367a 100644 --- a/node/forest_libp2p/src/blocksync/message.rs +++ b/node/forest_libp2p/src/blocksync/message.rs @@ -118,7 +118,6 @@ impl BlockSyncResponse { if self.status != BlockSyncResponseStatus::Success && self.status != BlockSyncResponseStatus::PartialResponse { - // TODO implement a better error type than string if needed to be handled differently return Err(format!("Status {:?}: {}", self.status, self.message)); }