Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update libp2p, async-std, and other deps #922

Merged
merged 7 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,116 changes: 692 additions & 424 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion blockchain/beacon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2018"
features = ["json"]

[dependencies]
ahash = "0.5"
ahash = "0.6"
async-std = { version = "1.6.3", features = ["unstable"] }
clock = { package = "fil_clock", path = "../../node/clock" }
bls-signatures = "0.6.1"
Expand Down
8 changes: 5 additions & 3 deletions blockchain/chain/src/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
use super::{tipset_tracker::TipsetTracker, ChainIndex, Error};
use actor::{miner, power};
use address::Address;
use async_std::sync::{channel, RwLock};
use async_std::channel::bounded;
use async_std::sync::RwLock;
use async_std::task;
use beacon::{BeaconEntry, IGNORE_DRAND_VAR};
use blake2b_simd::Params;
Expand Down Expand Up @@ -526,7 +527,7 @@ where
{
// Channel cap is equal to buffered write size
const CHANNEL_CAP: usize = 1000;
let (tx, mut rx) = channel(CHANNEL_CAP);
let (tx, mut rx) = bounded(CHANNEL_CAP);
let header = CarHeader::from(tipset.key().cids().to_vec());
let write_task =
task::spawn(async move { header.write_stream_async(&mut writer, &mut rx).await });
Expand All @@ -540,7 +541,8 @@ where
.ok_or_else(|| format!("Cid {} not found in blockstore", cid))?;

// * If cb can return a generic type, deserializing would remove need to clone.
task::block_on(tx.send((cid, block.clone())));
// Ignore error intentionally, if receiver dropped, error will be handled below
let _ = task::block_on(tx.send((cid, block.clone())));
Ok(block)
})
.await?;
Expand Down
2 changes: 1 addition & 1 deletion blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ blocks = { package = "forest_blocks", path = "../blocks", features = ["json"] }
beacon = { path = "../beacon" }
db = { package = "forest_db", version = "0.1" }
encoding = { package = "forest_encoding", version = "0.2.1" }
libp2p = { version = "0.24", default-features = false }
libp2p = { version = "0.28", default-features = false }
cid = { package = "forest_cid", version = "0.3" }
ipld_blockstore = "0.1"
chain = { path = "../chain" }
Expand Down
23 changes: 17 additions & 6 deletions blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use super::peer_manager::PeerManager;
use async_std::channel::Sender;
use async_std::future;
use async_std::sync::Sender;
use blocks::{FullTipset, Tipset, TipsetKeys};
use cid::Cid;
use encoding::de::DeserializeOwned;
Expand Down Expand Up @@ -132,7 +132,8 @@ where
cid: content,
response_channel: tx,
})
.await;
.await
.map_err(|_| "failed to send bitswap request, network receiver dropped")?;
let res = future::timeout(Duration::from_secs(RPC_TIMEOUT), rx).await;
match res {
Ok(Ok(())) => {
Expand Down Expand Up @@ -223,13 +224,18 @@ where
let req_pre_time = SystemTime::now();

let (tx, rx) = oneshot_channel();
self.network_send
if self
.network_send
.send(NetworkMessage::ChainExchangeRequest {
peer_id: peer_id.clone(),
request,
response_channel: tx,
})
.await;
.await
.is_err()
{
return Err("Failed to send chain exchange request to network".to_string());
};

let res = future::timeout(Duration::from_secs(RPC_TIMEOUT), rx).await;
let res_duration = SystemTime::now()
Expand All @@ -252,11 +258,16 @@ where
}

/// Send a hello request to the network (does not await response)
pub async fn hello_request(&self, peer_id: PeerId, request: HelloRequest) {
pub async fn hello_request(
&self,
peer_id: PeerId,
request: HelloRequest,
) -> Result<(), &'static str> {
trace!("Sending Hello Message {:?}", request);
// TODO update to await response when we want to handle the latency
self.network_send
.send(NetworkMessage::HelloRequest { peer_id, request })
.await;
.await
.map_err(|_| "Failed to send hello request: receiver dropped")
}
}
39 changes: 25 additions & 14 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use super::sync_state::SyncState;
use super::sync_worker::SyncWorker;
use super::{Error, SyncNetworkContext};
use amt::Amt;
use async_std::sync::{channel, Mutex, Receiver, RwLock, Sender};
use async_std::channel::{bounded, Receiver, Sender};
use async_std::sync::{Mutex, RwLock};
use async_std::task::{self, JoinHandle};
use beacon::{Beacon, BeaconSchedule};
use blocks::{Block, FullTipset, GossipBlock, Tipset, TipsetKeys, TxMeta};
Expand All @@ -24,7 +25,7 @@ use futures::stream::StreamExt;
use futures::{future::try_join_all, try_join};
use ipld_blockstore::BlockStore;
use libp2p::core::PeerId;
use log::{debug, info, trace, warn};
use log::{debug, error, info, trace, warn};
use message::{SignedMessage, UnsignedMessage};
use message_pool::{MessagePool, Provider};
use serde::Deserialize;
Expand Down Expand Up @@ -175,8 +176,7 @@ where
new_ts_tx: &Sender<(PeerId, FullTipset)>,
) {
match network_event {
NetworkEvent::HelloRequest { request, channel } => {
let source = channel.peer.clone();
NetworkEvent::HelloRequest { request, source } => {
self.network
.peer_manager()
.update_peer_head(source.clone(), None)
Expand Down Expand Up @@ -208,7 +208,8 @@ where
.heaviest_tipset()
.await
.unwrap();
self.network
if let Err(e) = self
.network
.hello_request(
peer_id,
HelloRequest {
Expand All @@ -219,6 +220,9 @@ where
},
)
.await
{
error!("{}", e)
};
}
NetworkEvent::PubsubMessage { source, message } => {
if *self.state.lock().await != ChainSyncState::Follow {
Expand Down Expand Up @@ -297,7 +301,9 @@ where
secp_messages,
};
let ts = FullTipset::new(vec![block]).unwrap();
channel.send((source, ts)).await;
if channel.send((source, ts)).await.is_err() {
error!("Failed to update peer list, receiver dropped");
}
}

/// Spawns a network handler and begins the syncing process.
Expand All @@ -308,7 +314,7 @@ where
}

// Channels to handle fetching hello tipsets in separate task and return tipset.
let (new_ts_tx, new_ts_rx) = channel(10);
let (new_ts_tx, new_ts_rx) = bounded(10);

let mut fused_handler = self.net_handler.clone().fuse();
let mut fused_inform_channel = new_ts_rx.fuse();
Expand All @@ -319,7 +325,10 @@ where
if let Some(tar) = self.next_sync_target.take() {
if let Some(ts) = tar.heaviest_tipset() {
self.active_sync_tipsets.insert(ts.clone());
worker_tx.send(ts).await;
worker_tx
.send(ts)
.await
.expect("Worker receivers should not be dropped");
}
}
}
Expand Down Expand Up @@ -351,7 +360,10 @@ where
) {
match Self::fetch_full_tipset(cs.as_ref(), &network, peer_id.clone(), &tsk).await {
Ok(fts) => {
channel.send((peer_id, fts)).await;
channel
.send((peer_id, fts))
.await
.expect("Inform tipset receiver dropped");
}
Err(e) => {
debug!("Failed to fetch full tipset from peer ({}): {}", peer_id, e);
Expand Down Expand Up @@ -576,8 +588,7 @@ fn cids_from_messages<T: Cbor>(messages: &[T]) -> Result<Vec<Cid>, EncodingError
#[cfg(test)]
mod tests {
use super::*;
use async_std::sync::channel;
use async_std::sync::Sender;
use async_std::channel::{bounded, Sender};
use async_std::task;
use beacon::{BeaconPoint, MockBeacon};
use db::MemoryDB;
Expand All @@ -599,7 +610,7 @@ mod tests {
) {
let chain_store = Arc::new(ChainStore::new(db.clone()));
let test_provider = TestApi::default();
let (tx, _rx) = channel(10);
let (tx, _rx) = bounded(10);
let mpool = task::block_on(MessagePool::new(
test_provider,
"test".to_string(),
Expand All @@ -608,8 +619,8 @@ mod tests {
))
.unwrap();
let mpool = Arc::new(mpool);
let (local_sender, test_receiver) = channel(20);
let (event_sender, event_receiver) = channel(20);
let (local_sender, test_receiver) = bounded(20);
let (event_sender, event_receiver) = bounded(20);

let gen = construct_dummy_header();
chain_store.set_genesis(&gen).unwrap();
Expand Down
21 changes: 9 additions & 12 deletions blockchain/chain_sync/src/sync/peer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

use super::*;
use address::Address;
use async_std::sync::channel;
use async_std::channel::bounded;
use async_std::task;
use beacon::{BeaconPoint, MockBeacon};
use blocks::BlockHeader;
use db::MemoryDB;
use fil_types::verifier::MockVerifier;
use forest_libp2p::{hello::HelloRequest, rpc::ResponseChannel};
use forest_libp2p::hello::HelloRequest;
use libp2p::core::PeerId;
use message_pool::{test_provider::TestApi, MessagePool};
use state_manager::StateManager;
Expand All @@ -20,7 +20,7 @@ fn peer_manager_update() {
let db = Arc::new(MemoryDB::default());

let chain_store = Arc::new(ChainStore::new(db.clone()));
let (tx, _rx) = channel(10);
let (tx, _rx) = bounded(10);
let mpool = task::block_on(MessagePool::new(
TestApi::default(),
"test".to_string(),
Expand All @@ -30,8 +30,8 @@ fn peer_manager_update() {
.unwrap();
let mpool = Arc::new(mpool);

let (local_sender, _test_receiver) = channel(20);
let (event_sender, event_receiver) = channel(20);
let (local_sender, _test_receiver) = bounded(20);
let (event_sender, event_receiver) = bounded(20);

let msg_root = compute_msg_meta(chain_store.blockstore(), &[], &[]).unwrap();

Expand Down Expand Up @@ -63,14 +63,13 @@ fn peer_manager_update() {

let peer_manager = Arc::clone(&cs.network.peer_manager_cloned());

let (worker_tx, worker_rx) = channel(10);
let (worker_tx, worker_rx) = bounded(10);
task::spawn(async {
cs.start(worker_tx, worker_rx).await;
});

let source = PeerId::random();
let source_clone = source.clone();
let (sender, _) = channel(1);

let gen_cloned = genesis_ts.clone();
task::block_on(async {
Expand All @@ -82,12 +81,10 @@ fn peer_manager_update() {
heaviest_tipset_weight: gen_cloned.weight().clone(),
genesis_hash: gen_hash,
},
channel: ResponseChannel {
peer: source,
sender,
},
source,
})
.await;
.await
.unwrap();

// Would be ideal to not have to sleep here and have it deterministic
task::sleep(Duration::from_millis(1000)).await;
Expand Down
7 changes: 4 additions & 3 deletions blockchain/chain_sync/src/sync_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use super::{Error, SyncNetworkContext};
use actor::{is_account_actor, power};
use address::Address;
use amt::Amt;
use async_std::sync::{Mutex, Receiver, RwLock};
use async_std::channel::Receiver;
use async_std::sync::{Mutex, RwLock};
use async_std::task::{self, JoinHandle};
use beacon::{Beacon, BeaconEntry, BeaconSchedule, IGNORE_DRAND_VAR};
use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta};
Expand Down Expand Up @@ -1020,7 +1021,7 @@ fn cids_from_messages<T: Cbor>(messages: &[T]) -> Result<Vec<Cid>, EncodingError
#[cfg(test)]
mod tests {
use super::*;
use async_std::sync::channel;
use async_std::channel::bounded;
use beacon::{BeaconPoint, MockBeacon};
use db::MemoryDB;
use fil_types::verifier::MockVerifier;
Expand All @@ -1038,7 +1039,7 @@ mod tests {
) {
let chain_store = Arc::new(ChainStore::new(db.clone()));

let (local_sender, test_receiver) = channel(20);
let (local_sender, test_receiver) = bounded(20);

let gen = construct_dummy_header();
chain_store.set_genesis(&gen).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions blockchain/chain_sync/src/sync_worker/full_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use super::*;
use crate::peer_manager::PeerManager;
use async_std::sync::channel;
use async_std::channel::bounded;
use async_std::task;
use db::MemoryDB;
use fil_types::verifier::FullVerifier;
Expand Down Expand Up @@ -43,7 +43,7 @@ async fn space_race_full_sync() {
let chain_store = Arc::new(ChainStore::new(db.clone()));
let state_manager = Arc::new(StateManager::new(chain_store));

let (network_send, network_recv) = channel(20);
let (network_send, network_recv) = bounded(20);

// Initialize genesis using default (currently space-race) genesis
let (genesis, _) = initialize_genesis(None, &state_manager).await.unwrap();
Expand Down
Loading