Skip to content

Commit

Permalink
fix: fetch tipsets in a single request (#5071)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemmih authored Dec 11, 2024
1 parent 466f3a4 commit 713ce91
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 113 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

### Fixed

- [#5071](https://github.com/ChainSafe/forest/pull/5071) Fix issue that caused
Forest to temporarily drift out of sync.

## Forest v0.23.0 "Saenchai"

This is a mandatory release for the calibration network. It includes the NV25
Expand Down
89 changes: 21 additions & 68 deletions src/chain_sync/chain_muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@ use crate::libp2p::{
};
use crate::message::SignedMessage;
use crate::message_pool::{MessagePool, Provider};
use crate::shim::{clock::SECONDS_IN_DAY, message::Message};
use crate::shim::clock::SECONDS_IN_DAY;
use crate::state_manager::StateManager;
use crate::{
blocks::{Block, CreateTipsetError, FullTipset, GossipBlock, Tipset, TipsetKey},
blocks::{Block, CreateTipsetError, FullTipset, Tipset, TipsetKey},
networks::calculate_expected_epoch,
};
use cid::Cid;
use futures::{
future::{try_join_all, Future},
stream::FuturesUnordered,
try_join, StreamExt,
};
use futures::{future::Future, stream::FuturesUnordered, StreamExt};
use fvm_ipld_blockstore::Blockstore;
use itertools::{Either, Itertools};
use parking_lot::RwLock;
Expand Down Expand Up @@ -69,8 +65,6 @@ pub enum ChainMuxerError {
ChainStore(#[from] ChainStoreError),
#[error("Chain exchange: {0}")]
ChainExchange(String),
#[error("Bitswap: {0}")]
Bitswap(String),
#[error("Block error: {0}")]
Block(#[from] CreateTipsetError),
#[error("Following network unexpectedly failed: {0}")]
Expand Down Expand Up @@ -220,7 +214,7 @@ where
async fn get_full_tipset(
network: SyncNetworkContext<DB>,
chain_store: Arc<ChainStore<DB>>,
peer_id: PeerId,
peer_id: Option<PeerId>,
tipset_keys: TipsetKey,
) -> Result<FullTipset, ChainMuxerError> {
// Attempt to load from the store
Expand All @@ -229,7 +223,7 @@ where
}
// Load from the network
network
.chain_exchange_fts(Some(peer_id), &tipset_keys.clone())
.chain_exchange_fts(peer_id, &tipset_keys.clone())
.await
.map_err(ChainMuxerError::ChainExchange)
}
Expand Down Expand Up @@ -306,52 +300,6 @@ where
network.peer_manager().unmark_peer_bad(&peer_id);
}

async fn gossipsub_block_to_full_tipset(
block: GossipBlock,
source: PeerId,
network: SyncNetworkContext<DB>,
) -> Result<FullTipset, ChainMuxerError> {
debug!(
"Received block over GossipSub: {} height {} from {}",
block.header.cid(),
block.header.epoch,
source,
);

let epoch = block.header.epoch;

debug!(
"Getting messages of gossipblock, epoch: {epoch}, block: {}",
block.header.cid()
);
// Get bls_message in the store or over Bitswap
let bls_messages: Vec<_> = block
.bls_messages
.into_iter()
.map(|m| network.bitswap_get::<Message>(m, Some(epoch)))
.collect();

// Get secp_messages in the store or over Bitswap
let secp_messages: Vec<_> = block
.secpk_messages
.into_iter()
.map(|m| network.bitswap_get::<SignedMessage>(m, Some(epoch)))
.collect();

let (bls_messages, secp_messages) =
match try_join!(try_join_all(bls_messages), try_join_all(secp_messages)) {
Ok(msgs) => msgs,
Err(e) => return Err(ChainMuxerError::Bitswap(e)),
};

let block = Block {
header: block.header,
bls_messages,
secp_messages,
};
Ok(FullTipset::from(block))
}

fn handle_pubsub_message(mem_pool: Arc<MessagePool<M>>, message: SignedMessage) {
if let Err(why) = mem_pool.add(message) {
debug!(
Expand All @@ -372,7 +320,7 @@ where
message_processing_strategy: PubsubMessageProcessingStrategy,
block_delay: u32,
stateless_mode: bool,
) -> Result<Option<(FullTipset, PeerId)>, ChainMuxerError> {
) -> Result<Option<FullTipset>, ChainMuxerError> {
let (tipset, source) = match event {
NetworkEvent::HelloRequestInbound => {
metrics::LIBP2P_MESSAGE_TOTAL
Expand All @@ -396,7 +344,7 @@ where
let tipset = match Self::get_full_tipset(
network.clone(),
chain_store.clone(),
source,
Some(source),
tipset_keys,
)
.await
Expand Down Expand Up @@ -454,8 +402,13 @@ where
return Ok(None);
}
// Assemble full tipset from block only in stateful mode
let tipset =
Self::gossipsub_block_to_full_tipset(b, source, network.clone()).await?;
let tipset = Self::get_full_tipset(
network.clone(),
chain_store.clone(),
None,
TipsetKey::from(nunny::vec![*b.header.cid()]),
)
.await?;
(tipset, source)
}
PubsubMessage::Message(m) => {
Expand Down Expand Up @@ -533,7 +486,7 @@ where
// This is needed for the Ethereum mapping
chain_store.put_tipset_key(tipset.key())?;

Ok(Some((tipset, source)))
Ok(Some(tipset))
}

fn stateless_node(&self) -> ChainMuxerFuture<(), ChainMuxerError> {
Expand Down Expand Up @@ -622,7 +575,7 @@ where
}
};

let (tipset, _) = match Self::process_gossipsub_event(
let tipset = match Self::process_gossipsub_event(
event,
network.clone(),
chain_store.clone(),
Expand All @@ -635,7 +588,7 @@ where
)
.await
{
Ok(Some((tipset, source))) => (tipset, source),
Ok(Some(tipset)) => tipset,
Ok(None) => continue,
Err(why) => {
debug!("Processing GossipSub event failed: {:?}", why);
Expand Down Expand Up @@ -761,7 +714,7 @@ where
}
};

let (_tipset, _) = match Self::process_gossipsub_event(
let _tipset = match Self::process_gossipsub_event(
event,
network.clone(),
chain_store.clone(),
Expand All @@ -774,7 +727,7 @@ where
)
.await
{
Ok(Some((tipset, source))) => (tipset, source),
Ok(Some(tipset)) => tipset,
Ok(None) => continue,
Err(why) => {
debug!("Processing GossipSub event failed: {:?}", why);
Expand Down Expand Up @@ -865,7 +818,7 @@ where
}
};

let (tipset, _) = match Self::process_gossipsub_event(
let tipset = match Self::process_gossipsub_event(
event,
network.clone(),
chain_store.clone(),
Expand All @@ -878,7 +831,7 @@ where
)
.await
{
Ok(Some((tipset, source))) => (tipset, source),
Ok(Some(tipset)) => tipset,
Ok(None) => continue,
Err(why) => {
debug!("Processing GossipSub event failed: {:?}", why);
Expand Down
46 changes: 1 addition & 45 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,17 @@ use crate::{
},
hello::{HelloRequest, HelloResponse},
rpc::RequestResponseError,
NetworkMessage, PeerId, PeerManager, BITSWAP_TIMEOUT,
NetworkMessage, PeerId, PeerManager,
},
utils::{
misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
stats::Stats,
},
};
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use std::future::Future;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -232,47 +229,6 @@ where
Ok(fts.remove(0))
}

/// Requests that some content with a particular `Cid` get fetched over
/// `Bitswap` if it doesn't exist in the `BlockStore`.
pub async fn bitswap_get<TMessage: DeserializeOwned>(
&self,
content: Cid,
epoch: Option<i64>,
) -> Result<TMessage, String> {
// Check if what we are fetching over Bitswap already exists in the
// database. If it does, return it, else fetch over the network.
if let Some(b) = self.db.get_cbor(&content).map_err(|e| e.to_string())? {
return Ok(b);
}

let (tx, rx) = flume::bounded(1);

self.network_send
.send_async(NetworkMessage::BitswapRequest {
cid: content,
response_channel: tx,
epoch,
})
.await
.map_err(|_| "failed to send bitswap request, network receiver dropped")?;

let success = tokio::task::spawn_blocking(move || {
rx.recv_timeout(BITSWAP_TIMEOUT).unwrap_or_default()
})
.await
.is_ok();

match self.db.get_cbor(&content) {
Ok(Some(b)) => Ok(b),
Ok(None) => Err(format!(
"Not found in db, bitswap. success: {success} cid, {content:?}"
)),
Err(e) => Err(format!(
"Error retrieving from db. success: {success} cid, {content:?}, {e}"
)),
}
}

/// Helper function to handle the peer retrieval if no peer supplied as well
/// as the logging and updating of the peer info in the `PeerManager`.
async fn handle_chain_exchange_request<T, F>(
Expand Down

0 comments on commit 713ce91

Please sign in to comment.