Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sync messages logic #778

Merged
merged 3 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions blockchain/chain/src/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ where
}

/// Constructs and returns a full tipset if messages from storage exists
pub fn fill_tipsets(&self, ts: Tipset) -> Result<FullTipset, Error> {
fill_tipsets(self.blockstore(), ts)
pub fn fill_tipset(&self, ts: Tipset) -> Result<FullTipset, Tipset> {
fill_tipset(self.blockstore(), ts)
}

/// Determines if provided tipset is heavier than existing known heaviest tipset
Expand Down Expand Up @@ -323,20 +323,35 @@ where
}

/// Constructs and returns a full tipset if messages from storage exists - non self version
pub fn fill_tipsets<DB>(db: &DB, ts: Tipset) -> Result<FullTipset, Error>
pub fn fill_tipset<DB>(db: &DB, ts: Tipset) -> Result<FullTipset, Tipset>
where
DB: BlockStore,
{
let mut blocks: Vec<Block> = Vec::with_capacity(ts.blocks().len());
// Collect all messages before moving tipset.
let messages: Vec<(Vec<_>, Vec<_>)> = match ts
.blocks()
.iter()
.map(|h| block_messages(db, h))
.collect::<Result<_, Error>>()
{
Ok(m) => m,
Err(e) => {
log::trace!("failed to fill tipset: {}", e);
return Err(ts);
}
};

for header in ts.into_blocks() {
let (bls_messages, secp_messages) = block_messages(db, &header)?;
blocks.push(Block {
// Zip messages with blocks
let blocks = ts
.into_blocks()
.into_iter()
.zip(messages)
.map(|(header, (bls_messages, secp_messages))| Block {
header,
bls_messages,
secp_messages,
});
}
})
.collect();

// the given tipset has already been verified, so this cannot fail
Ok(FullTipset::new(blocks).unwrap())
Expand Down
48 changes: 20 additions & 28 deletions blockchain/chain_sync/src/sync_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use state_manager::StateManager;
use state_tree::StateTree;
use std::cmp::min;
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::convert::TryInto;
use std::error::Error as StdError;
use std::marker::PhantomData;
use std::sync::Arc;
Expand Down Expand Up @@ -129,7 +129,7 @@ where
}
// Sync and validate messages from fetched tipsets
self.set_stage(SyncStage::Messages).await;
if let Err(e) = self.sync_messages_check_state(&tipsets).await {
if let Err(e) = self.sync_messages_check_state(tipsets).await {
self.state.write().await.error(e.to_string());
return Err(e);
}
Expand Down Expand Up @@ -299,45 +299,39 @@ where
}

/// Syncs messages by first checking state for message existence otherwise fetches messages from blocksync
async fn sync_messages_check_state(&self, ts: &[Tipset]) -> Result<(), Error> {
// see https://github.com/filecoin-project/lotus/blob/master/build/params_shared.go#L109 for request window size
const REQUEST_WINDOW: i64 = 1;
// TODO refactor type handling
// set i to the length of provided tipsets
let mut i: i64 = i64::try_from(ts.len())? - 1;

while i >= 0 {
async fn sync_messages_check_state(&self, tipsets: Vec<Tipset>) -> Result<(), Error> {
let mut ts_iter = tipsets.into_iter().rev();
// Currently syncing 1 height at a time, no reason for us to sync more
const REQUEST_WINDOW: usize = 1;

while let Some(ts) = ts_iter.next() {
// check storage first to see if we have full tipset
let fts = match self.chain_store.fill_tipsets(ts[i as usize].clone()) {
let fts = match self.chain_store.fill_tipset(ts) {
Ok(fts) => fts,
Err(_) => {
Err(ts) => {
// no full tipset in storage; request messages via blocksync

let mut batch_size = REQUEST_WINDOW;
if i < batch_size {
batch_size = i;
}

// set params for blocksync request
let idx = i - batch_size;
let next = &ts[idx as usize];
let req_len = batch_size + 1;
let batch_size = REQUEST_WINDOW;
debug!(
"BlockSync message sync tipsets: epoch: {}, len: {}",
next.epoch(),
req_len
ts.epoch(),
batch_size
);

// receive tipset bundle from block sync
let compacted_messages = self
.network
.blocksync_messages(None, next.key(), req_len as u64)
.blocksync_messages(None, ts.key(), batch_size as u64)
.await?;

let mut ts_r = ts[(idx) as usize..(idx + 1 + req_len) as usize].to_vec();
// Chain current tipset with iterator
let mut bs_iter = std::iter::once(ts).chain(&mut ts_iter);

// since the bundle only has messages, we have to put the headers in them
for messages in compacted_messages.into_iter() {
let t = ts_r.pop().unwrap();
let t = bs_iter.next().ok_or_else(|| {
Error::Other("Messages returned exceeded tipsets in chain".to_string())
})?;

let bundle = TipsetBundle {
blocks: t.into_blocks(),
Expand All @@ -360,15 +354,13 @@ where
}
}

i -= REQUEST_WINDOW;
continue;
}
};
// full tipset found in storage; validate and continue
let curr_epoch = fts.epoch();
self.validate_tipset(fts).await?;
self.state.write().await.set_epoch(curr_epoch);
i -= 1;
continue;
}

Expand Down