Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fetch tipsets in a single request #5071

Merged
merged 7 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

### Fixed

- [#5071](https://github.com/ChainSafe/forest/pull/5071) Fix issue that caused
Forest to temporarily drift out of sync.

## Forest v0.23.0 "Saenchai"

This is a mandatory release for the calibration network. It includes the NV25
Expand Down
89 changes: 21 additions & 68 deletions src/chain_sync/chain_muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@ use crate::libp2p::{
};
use crate::message::SignedMessage;
use crate::message_pool::{MessagePool, Provider};
use crate::shim::{clock::SECONDS_IN_DAY, message::Message};
use crate::shim::clock::SECONDS_IN_DAY;
use crate::state_manager::StateManager;
use crate::{
blocks::{Block, CreateTipsetError, FullTipset, GossipBlock, Tipset, TipsetKey},
blocks::{Block, CreateTipsetError, FullTipset, Tipset, TipsetKey},
networks::calculate_expected_epoch,
};
use cid::Cid;
use futures::{
future::{try_join_all, Future},
stream::FuturesUnordered,
try_join, StreamExt,
};
use futures::{future::Future, stream::FuturesUnordered, StreamExt};
use fvm_ipld_blockstore::Blockstore;
use itertools::{Either, Itertools};
use parking_lot::RwLock;
Expand Down Expand Up @@ -69,8 +65,6 @@ pub enum ChainMuxerError {
ChainStore(#[from] ChainStoreError),
#[error("Chain exchange: {0}")]
ChainExchange(String),
#[error("Bitswap: {0}")]
Bitswap(String),
#[error("Block error: {0}")]
Block(#[from] CreateTipsetError),
#[error("Following network unexpectedly failed: {0}")]
Expand Down Expand Up @@ -220,7 +214,7 @@ where
async fn get_full_tipset(
network: SyncNetworkContext<DB>,
chain_store: Arc<ChainStore<DB>>,
peer_id: PeerId,
peer_id: Option<PeerId>,
tipset_keys: TipsetKey,
) -> Result<FullTipset, ChainMuxerError> {
// Attempt to load from the store
Expand All @@ -229,7 +223,7 @@ where
}
// Load from the network
network
.chain_exchange_fts(Some(peer_id), &tipset_keys.clone())
.chain_exchange_fts(peer_id, &tipset_keys.clone())
.await
.map_err(ChainMuxerError::ChainExchange)
}
Expand Down Expand Up @@ -306,52 +300,6 @@ where
network.peer_manager().unmark_peer_bad(&peer_id);
}

async fn gossipsub_block_to_full_tipset(
block: GossipBlock,
source: PeerId,
network: SyncNetworkContext<DB>,
) -> Result<FullTipset, ChainMuxerError> {
debug!(
"Received block over GossipSub: {} height {} from {}",
block.header.cid(),
block.header.epoch,
source,
);

let epoch = block.header.epoch;

debug!(
"Getting messages of gossipblock, epoch: {epoch}, block: {}",
block.header.cid()
);
// Get bls_message in the store or over Bitswap
let bls_messages: Vec<_> = block
.bls_messages
.into_iter()
.map(|m| network.bitswap_get::<Message>(m, Some(epoch)))
.collect();

// Get secp_messages in the store or over Bitswap
let secp_messages: Vec<_> = block
.secpk_messages
.into_iter()
.map(|m| network.bitswap_get::<SignedMessage>(m, Some(epoch)))
.collect();

let (bls_messages, secp_messages) =
match try_join!(try_join_all(bls_messages), try_join_all(secp_messages)) {
Ok(msgs) => msgs,
Err(e) => return Err(ChainMuxerError::Bitswap(e)),
};

let block = Block {
header: block.header,
bls_messages,
secp_messages,
};
Ok(FullTipset::from(block))
}

fn handle_pubsub_message(mem_pool: Arc<MessagePool<M>>, message: SignedMessage) {
if let Err(why) = mem_pool.add(message) {
debug!(
Expand All @@ -372,7 +320,7 @@ where
message_processing_strategy: PubsubMessageProcessingStrategy,
block_delay: u32,
stateless_mode: bool,
) -> Result<Option<(FullTipset, PeerId)>, ChainMuxerError> {
) -> Result<Option<FullTipset>, ChainMuxerError> {
let (tipset, source) = match event {
NetworkEvent::HelloRequestInbound => {
metrics::LIBP2P_MESSAGE_TOTAL
Expand All @@ -396,7 +344,7 @@ where
let tipset = match Self::get_full_tipset(
network.clone(),
chain_store.clone(),
source,
Some(source),
tipset_keys,
)
.await
Expand Down Expand Up @@ -454,8 +402,13 @@ where
return Ok(None);
}
// Assemble full tipset from block only in stateful mode
let tipset =
Self::gossipsub_block_to_full_tipset(b, source, network.clone()).await?;
let tipset = Self::get_full_tipset(
network.clone(),
chain_store.clone(),
None,
TipsetKey::from(nunny::vec![*b.header.cid()]),
)
.await?;
(tipset, source)
}
PubsubMessage::Message(m) => {
Expand Down Expand Up @@ -533,7 +486,7 @@ where
// This is needed for the Ethereum mapping
chain_store.put_tipset_key(tipset.key())?;

Ok(Some((tipset, source)))
Ok(Some(tipset))
}

fn stateless_node(&self) -> ChainMuxerFuture<(), ChainMuxerError> {
Expand Down Expand Up @@ -622,7 +575,7 @@ where
}
};

let (tipset, _) = match Self::process_gossipsub_event(
let tipset = match Self::process_gossipsub_event(
event,
network.clone(),
chain_store.clone(),
Expand All @@ -635,7 +588,7 @@ where
)
.await
{
Ok(Some((tipset, source))) => (tipset, source),
Ok(Some(tipset)) => tipset,
Ok(None) => continue,
Err(why) => {
debug!("Processing GossipSub event failed: {:?}", why);
Expand Down Expand Up @@ -761,7 +714,7 @@ where
}
};

let (_tipset, _) = match Self::process_gossipsub_event(
let _tipset = match Self::process_gossipsub_event(
event,
network.clone(),
chain_store.clone(),
Expand All @@ -774,7 +727,7 @@ where
)
.await
{
Ok(Some((tipset, source))) => (tipset, source),
Ok(Some(tipset)) => tipset,
Ok(None) => continue,
Err(why) => {
debug!("Processing GossipSub event failed: {:?}", why);
Expand Down Expand Up @@ -865,7 +818,7 @@ where
}
};

let (tipset, _) = match Self::process_gossipsub_event(
let tipset = match Self::process_gossipsub_event(
event,
network.clone(),
chain_store.clone(),
Expand All @@ -878,7 +831,7 @@ where
)
.await
{
Ok(Some((tipset, source))) => (tipset, source),
Ok(Some(tipset)) => tipset,
Ok(None) => continue,
Err(why) => {
debug!("Processing GossipSub event failed: {:?}", why);
Expand Down
46 changes: 1 addition & 45 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,17 @@ use crate::{
},
hello::{HelloRequest, HelloResponse},
rpc::RequestResponseError,
NetworkMessage, PeerId, PeerManager, BITSWAP_TIMEOUT,
NetworkMessage, PeerId, PeerManager,
},
utils::{
misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
stats::Stats,
},
};
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use std::future::Future;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -232,47 +229,6 @@ where
Ok(fts.remove(0))
}

/// Requests that some content with a particular `Cid` get fetched over
/// `Bitswap` if it doesn't exist in the `BlockStore`.
pub async fn bitswap_get<TMessage: DeserializeOwned>(
&self,
content: Cid,
epoch: Option<i64>,
) -> Result<TMessage, String> {
// Check if what we are fetching over Bitswap already exists in the
// database. If it does, return it, else fetch over the network.
if let Some(b) = self.db.get_cbor(&content).map_err(|e| e.to_string())? {
return Ok(b);
}

let (tx, rx) = flume::bounded(1);

self.network_send
.send_async(NetworkMessage::BitswapRequest {
cid: content,
response_channel: tx,
epoch,
})
.await
.map_err(|_| "failed to send bitswap request, network receiver dropped")?;

let success = tokio::task::spawn_blocking(move || {
rx.recv_timeout(BITSWAP_TIMEOUT).unwrap_or_default()
})
.await
.is_ok();

match self.db.get_cbor(&content) {
Ok(Some(b)) => Ok(b),
Ok(None) => Err(format!(
"Not found in db, bitswap. success: {success} cid, {content:?}"
)),
Err(e) => Err(format!(
"Error retrieving from db. success: {success} cid, {content:?}, {e}"
)),
}
}

/// Helper function to handle the peer retrieval if no peer supplied as well
/// as the logging and updating of the peer info in the `PeerManager`.
async fn handle_chain_exchange_request<T, F>(
Expand Down
Loading