From 2566837869b4d5216654a658181f9197d4afd0db Mon Sep 17 00:00:00 2001 From: austinabell Date: Mon, 25 May 2020 21:11:20 -0400 Subject: [PATCH 1/6] wip kademlia impl --- node/forest_libp2p/src/behaviour.rs | 48 +++++++++++++++++++++++++---- node/forest_libp2p/src/service.rs | 2 +- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/node/forest_libp2p/src/behaviour.rs b/node/forest_libp2p/src/behaviour.rs index 1d49d7b8e508..7955a38fdeb3 100644 --- a/node/forest_libp2p/src/behaviour.rs +++ b/node/forest_libp2p/src/behaviour.rs @@ -2,10 +2,13 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::rpc::{RPCEvent, RPCMessage, RPC}; +use crate::config::Libp2pConfig; use libp2p::core::identity::Keypair; use libp2p::core::PeerId; use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash}; use libp2p::identify::{Identify, IdentifyEvent}; +use libp2p::kad::record::store::MemoryStore; +use libp2p::kad::{Kademlia, KademliaEvent}; use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::ping::{ handler::{PingFailure, PingSuccess}, @@ -19,11 +22,12 @@ use std::{task::Context, task::Poll}; #[derive(NetworkBehaviour)] #[behaviour(out_event = "ForestBehaviourEvent", poll_method = "poll")] pub struct ForestBehaviour { - pub gossipsub: Gossipsub, - pub mdns: Mdns, - pub ping: Ping, - pub identify: Identify, - pub rpc: RPC, + gossipsub: Gossipsub, + mdns: Mdns, + ping: Ping, + identify: Identify, + rpc: RPC, + kademlia: Kademlia, #[behaviour(ignore)] events: Vec, } @@ -61,6 +65,21 @@ impl NetworkBehaviourEventProcess for ForestBehaviour { } } +impl NetworkBehaviourEventProcess for ForestBehaviour { + fn inject_event(&mut self, event: KademliaEvent) { + match event { + KademliaEvent::Discovered { peer_id, ty, .. } => { + log::info!("kad: Discovered peer {} {:?}", peer_id.to_base58(), ty); + self.events + .push(ForestBehaviourEvent::DiscoveredPeer(peer_id)) + } + event => { + log::trace!("kad: {:?}", event); + } + } + } +} + impl NetworkBehaviourEventProcess for ForestBehaviour { fn inject_event(&mut self, message: GossipsubEvent) { if let GossipsubEvent::Message(_, _, message) = message { @@ -158,9 +177,25 @@ impl ForestBehaviour { Poll::Pending } - pub fn new(local_key: &Keypair) -> Self { + pub fn new(local_key: &Keypair, config: &Libp2pConfig) -> Self { let local_peer_id = local_key.public().into_peer_id(); let gossipsub_config = GossipsubConfig::default(); + + // Kademlia config + let store = MemoryStore::new(local_peer_id.to_owned()); + let mut kademlia = Kademlia::new(local_peer_id.to_owned(), store); + // TODO fix this (need peerIDs) + // for (peer_id, addr) in &config.bootstrap_peers { + // kademlia.add_address(&addr.parse().unwrap(), addr.parse().unwrap()); + // } + + // kademlia.add_address( + // &"12D3KooWKNF7vNFEhnvB45E9mw2B5z6t419W3ziZPLdUDVnLLKGs" + // .parse() + // .unwrap(), + // "/ip4/86.109.15.57/tcp/1347".parse().unwrap(), + // ); + ForestBehaviour { gossipsub: Gossipsub::new(local_peer_id, gossipsub_config), mdns: Mdns::new().expect("Could not start mDNS"), @@ -168,6 +203,7 @@ impl ForestBehaviour { identify: Identify::new("forest/libp2p".into(), "0.0.1".into(), local_key.public()), rpc: RPC::default(), events: vec![], + kademlia, } } diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index 3e869f241160..d7b4931c0770 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -70,7 +70,7 @@ impl Libp2pService { let transport = build_transport(net_keypair.clone()); let mut swarm = { - let be = ForestBehaviour::new(&net_keypair); + let be = ForestBehaviour::new(&net_keypair, config); Swarm::new(transport, be, peer_id) }; From 9e297709a995f4d7511fbe90613d4a4fed887df3 Mon Sep 17 00:00:00 2001 From: austinabell Date: Wed, 10 Jun 2020 21:26:17 -0400 Subject: [PATCH 2/6] Refactor RPC --- node/forest_libp2p/src/behaviour.rs | 4 +- node/forest_libp2p/src/rpc/handler.rs | 117 +++++++++++++++++++------ node/forest_libp2p/src/rpc/protocol.rs | 22 ++--- 3 files changed, 99 insertions(+), 44 deletions(-) diff --git a/node/forest_libp2p/src/behaviour.rs b/node/forest_libp2p/src/behaviour.rs index 7955a38fdeb3..f576d832936c 100644 --- a/node/forest_libp2p/src/behaviour.rs +++ b/node/forest_libp2p/src/behaviour.rs @@ -177,13 +177,13 @@ impl ForestBehaviour { Poll::Pending } - pub fn new(local_key: &Keypair, config: &Libp2pConfig) -> Self { + pub fn new(local_key: &Keypair, _config: &Libp2pConfig) -> Self { let local_peer_id = local_key.public().into_peer_id(); let gossipsub_config = GossipsubConfig::default(); // Kademlia config let store = MemoryStore::new(local_peer_id.to_owned()); - let mut kademlia = Kademlia::new(local_peer_id.to_owned(), store); + let kademlia = Kademlia::new(local_peer_id.to_owned(), store); // TODO fix this (need peerIDs) // for (peer_id, addr) in &config.bootstrap_peers { // kademlia.add_address(&addr.parse().unwrap(), addr.parse().unwrap()); diff --git a/node/forest_libp2p/src/rpc/handler.rs b/node/forest_libp2p/src/rpc/handler.rs index 300da96e8593..9a864167ff90 100644 --- a/node/forest_libp2p/src/rpc/handler.rs +++ b/node/forest_libp2p/src/rpc/handler.rs @@ -1,8 +1,8 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use super::protocol::{OutboundFramed, RPCInbound}; -use super::{InboundCodec, RPCError, RPCEvent, RPCRequest, RPCResponse, RequestId}; +use super::protocol::RPCInbound; +use super::{InboundCodec, OutboundCodec, RPCError, RPCEvent, RPCRequest, RPCResponse, RequestId}; use fnv::FnvHashMap; use futures::prelude::*; use futures_codec::Framed; @@ -11,7 +11,7 @@ use libp2p::swarm::{ ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use libp2p::{InboundUpgrade, OutboundUpgrade}; -use log::error; +use log::{debug, error}; use smallvec::SmallVec; use std::{ pin::Pin, @@ -39,7 +39,7 @@ pub struct RPCHandler { dial_negotiated: u32, /// Map of current substreams awaiting a response to an RPC request. - inbound_substreams: FnvHashMap, + inbound_substreams: FnvHashMap, /// The vector of outbound substream states to progress. outbound_substreams: Vec, @@ -94,12 +94,21 @@ impl Default for RPCHandler { } } -/// An outbound substream is waiting a response from the user. -struct WaitingResponse { - /// The framed negotiated substream. - substream: Framed, - /// The time when the substream is closed. - timeout: Instant, +/// State of inbound substreams. +enum InboundSubstreamState { + /// Waiting for message from the remote. + WaitingInput(Framed), + /// An outbound substream is waiting a response from the user. + WaitingResponse { + /// The framed negotiated substream. + substream: Framed, + /// The time when the substream is closed. + timeout: Instant, + }, + /// Substream is being closed. + Closing(Framed), + /// Inserted to ensure no state remains unhandled. + Poisoned, } /// State of the outbound substream, opened either by us or by the remote. @@ -111,7 +120,7 @@ enum SubstreamState { }, /// Request has been sent, awaiting response PendingResponse { - substream: OutboundFramed, + substream: Framed, event: RPCEvent, timeout: Instant, }, @@ -131,20 +140,13 @@ impl ProtocolsHandler for RPCHandler { fn inject_fully_negotiated_inbound( &mut self, - out: >::Output, + substream: >::Output, ) { - let (req, substream) = out; - // New inbound request. Store the stream and tag the output. - let awaiting_stream = WaitingResponse { - substream, - timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), - }; + let awaiting_stream = InboundSubstreamState::WaitingInput(substream); self.inbound_substreams .insert(self.current_substream_id, awaiting_stream); - self.events_out - .push(RPCEvent::Request(self.current_substream_id, req)); self.current_substream_id += 1; } @@ -182,13 +184,15 @@ impl ProtocolsHandler for RPCHandler { fn inject_event(&mut self, event: Self::InEvent) { match event { RPCEvent::Request(_, _) => self.send_request(event), - RPCEvent::Response(rpc_id, res) => { + RPCEvent::Response(rpc_id, response) => { // check if the stream matching the response still exists - if let Some(waiting_stream) = self.inbound_substreams.remove(&rpc_id) { + if let Some(InboundSubstreamState::WaitingResponse { substream, .. }) = + self.inbound_substreams.remove(&rpc_id) + { // only send one response per stream. This must be in the waiting state self.outbound_substreams.push(SubstreamState::PendingSend { - substream: waiting_stream.substream, - response: res, + substream, + response, }); } } @@ -236,9 +240,72 @@ impl ProtocolsHandler for RPCHandler { self.events_out.shrink_to_fit(); } + let mut remove_list: Vec = Vec::new(); + for (req_id, state) in self.inbound_substreams.iter_mut() { + loop { + match std::mem::replace(state, InboundSubstreamState::Poisoned) { + InboundSubstreamState::WaitingInput(mut substream) => { + match substream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(message))) => { + *state = InboundSubstreamState::WaitingResponse { + substream, + timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), + }; + return Poll::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Request(*req_id, message), + )); + } + Poll::Ready(Some(Err(e))) => { + debug!("Inbound substream error while awaiting input: {:?}", e); + *state = InboundSubstreamState::Closing(substream); + } + // peer closed the stream + Poll::Ready(None) => { + *state = InboundSubstreamState::Closing(substream); + } + Poll::Pending => { + *state = InboundSubstreamState::WaitingInput(substream); + break; + } + } + } + InboundSubstreamState::Closing(mut substream) => { + match Sink::poll_close(Pin::new(&mut substream), cx) { + Poll::Ready(res) => { + if let Err(e) = res { + // Don't close the connection but just drop the inbound substream. + // In case the remote has more to send, they will open up a new + // substream. + debug!("Inbound substream error while closing: {:?}", e); + } + remove_list.push(*req_id); + break; + } + Poll::Pending => { + *state = InboundSubstreamState::Closing(substream); + break; + } + } + } + InboundSubstreamState::Poisoned => { + panic!("Tried to process a poisoned substream state") + } + st @ InboundSubstreamState::WaitingResponse { .. } => { + *state = st; + break; + } + } + } + } + // remove expired inbound substreams self.inbound_substreams - .retain(|_, waiting_stream| Instant::now() <= waiting_stream.timeout); + .retain(|req_id, waiting_stream| match waiting_stream { + InboundSubstreamState::WaitingResponse { timeout, .. } => { + Instant::now() <= *timeout + } + _ => !remove_list.contains(&req_id), + }); // drive streams that need to be processed for n in (0..self.outbound_substreams.len()).rev() { diff --git a/node/forest_libp2p/src/rpc/protocol.rs b/node/forest_libp2p/src/rpc/protocol.rs index 2c76c942830c..4a94e4a623c6 100644 --- a/node/forest_libp2p/src/rpc/protocol.rs +++ b/node/forest_libp2p/src/rpc/protocol.rs @@ -6,8 +6,8 @@ use crate::blocksync::{BlockSyncRequest, BlockSyncResponse, BLOCKSYNC_PROTOCOL_I use crate::hello::{HelloMessage, HelloResponse, HELLO_PROTOCOL_ID}; use bytes::BytesMut; use futures::prelude::*; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite}; -use futures_codec::{Decoder, Encoder, Framed}; +use futures::{AsyncRead, AsyncWrite}; +use futures_codec::{Encoder, Framed}; use libp2p::core::UpgradeInfo; use libp2p::swarm::NegotiatedSubstream; use libp2p::{InboundUpgrade, OutboundUpgrade}; @@ -33,27 +33,17 @@ impl UpgradeInfo for RPCInbound { } } -pub(crate) type InboundFramed = Framed; -pub(crate) type InboundOutput = (RPCRequest, InboundFramed); - impl InboundUpgrade for RPCInbound where TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static, { - type Output = InboundOutput; + type Output = Framed; type Error = RPCError; #[allow(clippy::type_complexity)] type Future = Pin> + Send>>; - fn upgrade_inbound(self, mut socket: TSocket, protocol: Self::Info) -> Self::Future { - Box::pin(async move { - let mut buf = Vec::default(); - socket.read_to_end(&mut buf).await?; - let mut bm = BytesMut::from(&buf[..]); - let mut codec = InboundCodec::new(protocol); - let req = codec.decode(&mut bm)?.unwrap(); - Ok((req, Framed::new(socket, codec))) - }) + fn upgrade_inbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { + Box::pin(future::ok(Framed::new(socket, InboundCodec::new(protocol)))) } } @@ -88,8 +78,6 @@ impl RPCRequest { } } -pub(crate) type OutboundFramed = Framed; - impl OutboundUpgrade for RPCRequest where TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static, From 509da5fa5a1d9e6b4151702138c2f02e4bad9eab Mon Sep 17 00:00:00 2001 From: austinabell Date: Mon, 15 Jun 2020 19:16:10 -0400 Subject: [PATCH 3/6] Setup kademlia and refactor libp2p crate --- blockchain/chain_sync/src/sync.rs | 2 +- forest/src/logger/mod.rs | 1 + forest/src/main.rs | 5 +- node/forest_libp2p/Cargo.toml | 1 + node/forest_libp2p/src/behaviour.rs | 95 +++++++++++++++++--------- node/forest_libp2p/src/config.rs | 47 +++++++------ node/forest_libp2p/src/rpc/handler.rs | 67 +++++++++--------- node/forest_libp2p/src/rpc/protocol.rs | 1 - node/forest_libp2p/src/service.rs | 40 ++++------- 9 files changed, 135 insertions(+), 124 deletions(-) diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index e356d263c5c4..b46d80ce3709 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -857,7 +857,7 @@ where // TODO change from using random peerID to managed let peer_id = PeerId::random(); // pulled from Lotus: https://github.com/filecoin-project/lotus/blob/master/chain/sync.go#L996 - const FORK_LENGTH_THRESHOLD: u64 = 500; + const FORK_LENGTH_THRESHOLD: u64 = 1; // Load blocks from network using blocksync let tips: Vec = self diff --git a/forest/src/logger/mod.rs b/forest/src/logger/mod.rs index 7c4670a219e6..f571d81576a3 100644 --- a/forest/src/logger/mod.rs +++ b/forest/src/logger/mod.rs @@ -10,6 +10,7 @@ pub(crate) fn setup_logger() { logger_builder.parse_filters(&s); } else { // If no ENV variable specified, default to info + logger_builder.filter(Some("libp2p_gossipsub"), LevelFilter::Warn); logger_builder.filter(None, LevelFilter::Info); } let logger = logger_builder.build(); diff --git a/forest/src/main.rs b/forest/src/main.rs index cdf60c97ba2b..8b0c3df037de 100644 --- a/forest/src/main.rs +++ b/forest/src/main.rs @@ -24,7 +24,7 @@ fn main() { // Capture CLI inputs let cli = cli::CLI::from_args(); - let mut config = cli.get_config().expect("CLI error"); + let config = cli.get_config().expect("CLI error"); let net_keypair = match get_keypair(&format!("{}{}", &config.data_dir, "/libp2p/keypair")) { Some(kp) => kp, @@ -56,8 +56,7 @@ fn main() { initialize_genesis(&config.genesis_file, &mut chain_store).unwrap(); // Libp2p service setup - config.network.set_network_name(&network_name); - let p2p_service = Libp2pService::new(&config.network, net_keypair); + let p2p_service = Libp2pService::new(config.network, net_keypair, &network_name); let network_rx = p2p_service.network_receiver(); let network_send = p2p_service.network_sender(); diff --git a/node/forest_libp2p/Cargo.toml b/node/forest_libp2p/Cargo.toml index bcc42de3fbcb..5f28055e5deb 100644 --- a/node/forest_libp2p/Cargo.toml +++ b/node/forest_libp2p/Cargo.toml @@ -22,6 +22,7 @@ fnv = "1.0.6" smallvec = "1.1.0" clock = { path = "../clock" } num-bigint = { path = "../../utils/bigint", package = "forest_bigint" } +hex = "0.4.2" [dev-dependencies] forest_address = { path = "../../vm/address" } diff --git a/node/forest_libp2p/src/behaviour.rs b/node/forest_libp2p/src/behaviour.rs index f576d832936c..1b5e9f00afca 100644 --- a/node/forest_libp2p/src/behaviour.rs +++ b/node/forest_libp2p/src/behaviour.rs @@ -8,21 +8,24 @@ use libp2p::core::PeerId; use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash}; use libp2p::identify::{Identify, IdentifyEvent}; use libp2p::kad::record::store::MemoryStore; -use libp2p::kad::{Kademlia, KademliaEvent}; +use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryId}; use libp2p::mdns::{Mdns, MdnsEvent}; +use libp2p::multiaddr::Protocol; use libp2p::ping::{ handler::{PingFailure, PingSuccess}, Ping, PingEvent, }; use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; use libp2p::NetworkBehaviour; -use log::{debug, warn}; +use log::{debug, trace, warn}; +use std::collections::HashSet; use std::{task::Context, task::Poll}; #[derive(NetworkBehaviour)] #[behaviour(out_event = "ForestBehaviourEvent", poll_method = "poll")] pub struct ForestBehaviour { gossipsub: Gossipsub, + // TODO configure to allow turning mdns off mdns: Mdns, ping: Ping, identify: Identify, @@ -30,14 +33,14 @@ pub struct ForestBehaviour { kademlia: Kademlia, #[behaviour(ignore)] events: Vec, + #[behaviour(ignore)] + peers: HashSet, } #[derive(Debug)] pub enum ForestBehaviourEvent { PeerDialed(PeerId), PeerDisconnected(PeerId), - DiscoveredPeer(PeerId), - ExpiredPeer(PeerId), GossipMessage { source: PeerId, topics: Vec, @@ -51,13 +54,14 @@ impl NetworkBehaviourEventProcess for ForestBehaviour { match event { MdnsEvent::Discovered(list) => { for (peer, _) in list { - self.events.push(ForestBehaviourEvent::DiscoveredPeer(peer)) + trace!("mdns: Discovered peer {}", peer.to_base58()); + self.add_peer(peer); } } MdnsEvent::Expired(list) => { for (peer, _) in list { if !self.mdns.has_node(&peer) { - self.events.push(ForestBehaviourEvent::ExpiredPeer(peer)) + self.remove_peer(&peer); } } } @@ -68,13 +72,11 @@ impl NetworkBehaviourEventProcess for ForestBehaviour { impl NetworkBehaviourEventProcess for ForestBehaviour { fn inject_event(&mut self, event: KademliaEvent) { match event { - KademliaEvent::Discovered { peer_id, ty, .. } => { - log::info!("kad: Discovered peer {} {:?}", peer_id.to_base58(), ty); - self.events - .push(ForestBehaviourEvent::DiscoveredPeer(peer_id)) + KademliaEvent::Discovered { peer_id, .. } => { + self.add_peer(peer_id); } event => { - log::trace!("kad: {:?}", event); + trace!("kad: {:?}", event); } } } @@ -96,14 +98,14 @@ impl NetworkBehaviourEventProcess for ForestBehaviour { fn inject_event(&mut self, event: PingEvent) { match event.result { Result::Ok(PingSuccess::Ping { rtt }) => { - debug!( + trace!( "PingSuccess::Ping rtt to {} is {} ms", event.peer.to_base58(), rtt.as_millis() ); } Result::Ok(PingSuccess::Pong) => { - debug!("PingSuccess::Pong from {}", event.peer.to_base58()); + trace!("PingSuccess::Pong from {}", event.peer.to_base58()); } Result::Err(PingFailure::Timeout) => { debug!("PingFailure::Timeout {}", event.peer.to_base58()); @@ -123,12 +125,12 @@ impl NetworkBehaviourEventProcess for ForestBehaviour { info, observed_addr, } => { - debug!("Identified Peer {:?}", peer_id); - debug!("protocol_version {:}?", info.protocol_version); - debug!("agent_version {:?}", info.agent_version); - debug!("listening_ addresses {:?}", info.listen_addrs); - debug!("observed_address {:?}", observed_addr); - debug!("protocols {:?}", info.protocols); + trace!("Identified Peer {}", peer_id); + trace!("protocol_version {}", info.protocol_version); + trace!("agent_version {}", info.agent_version); + trace!("listening_ addresses {:?}", info.listen_addrs); + trace!("observed_address {}", observed_addr); + trace!("protocols {:?}", info.protocols); } IdentifyEvent::Sent { .. } => (), IdentifyEvent::Error { .. } => (), @@ -177,36 +179,51 @@ impl ForestBehaviour { Poll::Pending } - pub fn new(local_key: &Keypair, _config: &Libp2pConfig) -> Self { + pub fn new(local_key: &Keypair, config: &Libp2pConfig, network_name: &str) -> Self { let local_peer_id = local_key.public().into_peer_id(); let gossipsub_config = GossipsubConfig::default(); // Kademlia config let store = MemoryStore::new(local_peer_id.to_owned()); - let kademlia = Kademlia::new(local_peer_id.to_owned(), store); - // TODO fix this (need peerIDs) - // for (peer_id, addr) in &config.bootstrap_peers { - // kademlia.add_address(&addr.parse().unwrap(), addr.parse().unwrap()); - // } - - // kademlia.add_address( - // &"12D3KooWKNF7vNFEhnvB45E9mw2B5z6t419W3ziZPLdUDVnLLKGs" - // .parse() - // .unwrap(), - // "/ip4/86.109.15.57/tcp/1347".parse().unwrap(), - // ); + let mut kad_config = KademliaConfig::default(); + let network = format!("/fil/kad/{}/kad/1.0.0", network_name); + kad_config.set_protocol_name(network.as_bytes().to_vec()); + let mut kademlia = Kademlia::with_config(local_peer_id.to_owned(), store, kad_config); + for multiaddr in config.bootstrap_peers.iter() { + let mut addr = multiaddr.to_owned(); + if let Some(Protocol::P2p(mh)) = addr.pop() { + let peer_id = PeerId::from_multihash(mh).unwrap(); + kademlia.add_address(&peer_id, addr); + } else { + warn!("Could not add addr {} to Kademlia DHT", multiaddr) + } + } + if let Err(e) = kademlia.bootstrap() { + warn!("Kademlia bootstrap failed: {}", e); + } ForestBehaviour { gossipsub: Gossipsub::new(local_peer_id, gossipsub_config), mdns: Mdns::new().expect("Could not start mDNS"), ping: Ping::default(), - identify: Identify::new("forest/libp2p".into(), "0.0.1".into(), local_key.public()), + identify: Identify::new( + "ipfs/0.1.0".into(), + // TODO update to include actual version + format!("forest-{}", "0.1.0"), + local_key.public(), + ), + kademlia, rpc: RPC::default(), events: vec![], - kademlia, + peers: Default::default(), } } + /// Bootstrap Kademlia network + pub fn bootstrap(&mut self) -> Result { + self.kademlia.bootstrap().map_err(|e| e.to_string()) + } + /// Publish data over the gossip network. pub fn publish(&mut self, topic: &Topic, data: impl Into>) { self.gossipsub.publish(topic, data); @@ -221,4 +238,14 @@ impl ForestBehaviour { pub fn send_rpc(&mut self, peer_id: PeerId, req: RPCEvent) { self.rpc.send_rpc(peer_id, req); } + + /// Adds peer to the peer set. + pub fn add_peer(&mut self, peer_id: PeerId) { + self.peers.insert(peer_id); + } + + /// Adds peer to the peer set. + pub fn remove_peer(&mut self, peer_id: &PeerId) { + self.peers.remove(peer_id); + } } diff --git a/node/forest_libp2p/src/config.rs b/node/forest_libp2p/src/config.rs index 373fa319573a..6be6b12f53ba 100644 --- a/node/forest_libp2p/src/config.rs +++ b/node/forest_libp2p/src/config.rs @@ -1,37 +1,40 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use libp2p::gossipsub::Topic; +use libp2p::Multiaddr; use serde::Deserialize; +const DEFAULT_BOOTSTRAP: &[&str] = &[ + "/dns4/bootstrap-0-sin.fil-test.net/tcp/1347/p2p/12D3KooWKNF7vNFEhnvB45E9mw2B5z6t419W3ziZPLdUDVnLLKGs", + "/ip4/86.109.15.57/tcp/1347/p2p/12D3KooWKNF7vNFEhnvB45E9mw2B5z6t419W3ziZPLdUDVnLLKGs", + "/dns4/bootstrap-0-dfw.fil-test.net/tcp/1347/p2p/12D3KooWECJTm7RUPyGfNbRwm6y2fK4wA7EB8rDJtWsq5AKi7iDr", + "/ip4/139.178.84.45/tcp/1347/p2p/12D3KooWECJTm7RUPyGfNbRwm6y2fK4wA7EB8rDJtWsq5AKi7iDr", + "/dns4/bootstrap-0-fra.fil-test.net/tcp/1347/p2p/12D3KooWC7MD6m7iNCuDsYtNr7xVtazihyVUizBbhmhEiyMAm9ym", + "/ip4/136.144.49.17/tcp/1347/p2p/12D3KooWC7MD6m7iNCuDsYtNr7xVtazihyVUizBbhmhEiyMAm9ym", + "/dns4/bootstrap-1-sin.fil-test.net/tcp/1347/p2p/12D3KooWD8eYqsKcEMFax6EbWN3rjA7qFsxCez2rmN8dWqkzgNaN", + "/ip4/86.109.15.55/tcp/1347/p2p/12D3KooWD8eYqsKcEMFax6EbWN3rjA7qFsxCez2rmN8dWqkzgNaN", + "/dns4/bootstrap-1-dfw.fil-test.net/tcp/1347/p2p/12D3KooWLB3RR8frLAmaK4ntHC2dwrAjyGzQgyUzWxAum1FxyyqD", + "/ip4/139.178.84.41/tcp/1347/p2p/12D3KooWLB3RR8frLAmaK4ntHC2dwrAjyGzQgyUzWxAum1FxyyqD", + "/dns4/bootstrap-1-fra.fil-test.net/tcp/1347/p2p/12D3KooWGPDJAw3HW4uVU3JEQBfFaZ1kdpg4HvvwRMVpUYbzhsLQ", + "/ip4/136.144.49.131/tcp/1347/p2p/12D3KooWGPDJAw3HW4uVU3JEQBfFaZ1kdpg4HvvwRMVpUYbzhsLQ", +]; + #[derive(Debug, Deserialize)] #[serde(default)] pub struct Libp2pConfig { - pub listening_multiaddr: String, - pub bootstrap_peers: Vec, - #[serde(skip_deserializing)] // Always use default - pub pubsub_topics: Vec, -} - -impl Libp2pConfig { - /// Sets the pubsub topics to the network name provided - pub fn set_network_name(&mut self, s: &str) { - self.pubsub_topics = vec![ - Topic::new(format!("/fil/blocks/{}", s)), - Topic::new(format!("/fil/msgs/{}", s)), - ] - } + pub listening_multiaddr: Multiaddr, + pub bootstrap_peers: Vec, } impl Default for Libp2pConfig { fn default() -> Self { - Libp2pConfig { - listening_multiaddr: "/ip4/0.0.0.0/tcp/0".to_owned(), - pubsub_topics: vec![ - Topic::new("/fil/blocks/interop".to_owned()), - Topic::new("/fil/msgs/interop".to_owned()), - ], - bootstrap_peers: vec![], + let bootstrap_peers = DEFAULT_BOOTSTRAP + .iter() + .map(|node| node.parse().unwrap()) + .collect(); + Self { + listening_multiaddr: "/ip4/0.0.0.0/tcp/0".parse().unwrap(), + bootstrap_peers, } } } diff --git a/node/forest_libp2p/src/rpc/handler.rs b/node/forest_libp2p/src/rpc/handler.rs index 9a864167ff90..07e36f922aa6 100644 --- a/node/forest_libp2p/src/rpc/handler.rs +++ b/node/forest_libp2p/src/rpc/handler.rs @@ -20,7 +20,7 @@ use std::{ }; /// The time (in seconds) before a substream that is awaiting a response from the user times out. -pub const RESPONSE_TIMEOUT: u64 = 10; +pub const RESPONSE_TIMEOUT: u64 = 20; pub struct RPCHandler { /// Upgrade configuration for RPC protocol. @@ -350,43 +350,40 @@ impl ProtocolsHandler for RPCHandler { mut substream, event, timeout, - } => match substream.poll_next_unpin(cx) { - Poll::Ready(response) => { - match response { - Some(Ok(response)) => { - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Response(event.id(), response), - )); - } - Some(Err(err)) => { - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(event.id(), RPCError::Custom(err.to_string())), - )); - } - None => { - // stream closed early or nothing was sent - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error( - event.id(), - RPCError::Custom( - "Stream closed early. Empty response".to_owned(), - ), - ), - )); - } + } => { + // std::thread::sleep(std::time::Duration::from_secs(5)); + match substream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(response))) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Response( + event.id(), + response, + ))); } - } - Poll::Pending => { - if Instant::now() < timeout { - self.outbound_substreams - .push(SubstreamState::PendingResponse { - substream, - event, - timeout, - }); + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( + event.id(), + RPCError::Custom(err.to_string()), + ))); + } + Poll::Ready(None) => { + // stream closed early or nothing was sent + return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( + event.id(), + RPCError::Custom("Stream closed early. Empty response".to_owned()), + ))); + } + Poll::Pending => { + if Instant::now() < timeout { + self.outbound_substreams + .push(SubstreamState::PendingResponse { + substream, + event, + timeout, + }); + } } } - }, + } } } diff --git a/node/forest_libp2p/src/rpc/protocol.rs b/node/forest_libp2p/src/rpc/protocol.rs index 4a94e4a623c6..2cb5696db1a6 100644 --- a/node/forest_libp2p/src/rpc/protocol.rs +++ b/node/forest_libp2p/src/rpc/protocol.rs @@ -9,7 +9,6 @@ use futures::prelude::*; use futures::{AsyncRead, AsyncWrite}; use futures_codec::{Encoder, Framed}; use libp2p::core::UpgradeInfo; -use libp2p::swarm::NegotiatedSubstream; use libp2p::{InboundUpgrade, OutboundUpgrade}; use std::pin::Pin; diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index d7b4931c0770..82944efc5ca7 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -21,6 +21,8 @@ use std::io::{Error, ErrorKind}; use std::time::Duration; use utils::read_file_to_vec; +const PUBSUB_TOPICS: [&str; 2] = ["/fil/blocks", "/fil/msgs"]; + /// Events emitted by this Service #[derive(Clone, Debug)] pub enum NetworkEvent { @@ -64,37 +66,26 @@ pub struct Libp2pService { impl Libp2pService { /// Constructs a Libp2pService - pub fn new(config: &Libp2pConfig, net_keypair: Keypair) -> Self { + pub fn new(config: Libp2pConfig, net_keypair: Keypair, network_name: &str) -> Self { let peer_id = PeerId::from(net_keypair.public()); let transport = build_transport(net_keypair.clone()); let mut swarm = { - let be = ForestBehaviour::new(&net_keypair, config); + let be = ForestBehaviour::new(&net_keypair, &config, network_name); Swarm::new(transport, be, peer_id) }; - for node in &config.bootstrap_peers { - match node.parse() { - Ok(to_dial) => match Swarm::dial_addr(&mut swarm, to_dial) { - Ok(_) => debug!("Dialed {:?}", node), - Err(e) => warn!("Dial {:?} failed: {:?}", node, e), - }, - Err(err) => warn!("Failed to parse address to dial: {:?}", err), - } + Swarm::listen_on(&mut swarm, config.listening_multiaddr).unwrap(); + + // Subscribe to gossipsub topics with the network name suffix + for topic in PUBSUB_TOPICS.iter() { + swarm.subscribe(Topic::new(format!("{}/{}", topic, network_name))); } - Swarm::listen_on( - &mut swarm, - config - .listening_multiaddr - .parse() - .expect("Incorrect MultiAddr Format"), - ) - .unwrap(); - - for topic in config.pubsub_topics.clone() { - swarm.subscribe(topic); + // Bootstrap with Kademlia + if let Err(e) = swarm.bootstrap() { + warn!("Failed to bootstrap with Kademlia: {}", e); } let (network_sender_in, network_receiver_in) = channel(20); @@ -126,13 +117,6 @@ impl Libp2pService { ForestBehaviourEvent::PeerDisconnected(peer_id) => { debug!("Peer disconnected, {:?}", peer_id); } - ForestBehaviourEvent::DiscoveredPeer(peer) => { - debug!("Discovered: {:?}", peer); - if let Err(e) = libp2p::Swarm::dial(&mut swarm_stream.get_mut(), &peer) { - warn!("failed to dial peer: {:?}", peer); - } - } - ForestBehaviourEvent::ExpiredPeer(_) => {} ForestBehaviourEvent::GossipMessage { source, topics, From 57ddc29bd7e803eb49f487eb3e24ce743a9a3705 Mon Sep 17 00:00:00 2001 From: austinabell Date: Mon, 15 Jun 2020 20:19:16 -0400 Subject: [PATCH 4/6] cleanup --- node/forest_libp2p/Cargo.toml | 1 - node/forest_libp2p/src/rpc/handler.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/node/forest_libp2p/Cargo.toml b/node/forest_libp2p/Cargo.toml index 5f28055e5deb..bcc42de3fbcb 100644 --- a/node/forest_libp2p/Cargo.toml +++ b/node/forest_libp2p/Cargo.toml @@ -22,7 +22,6 @@ fnv = "1.0.6" smallvec = "1.1.0" clock = { path = "../clock" } num-bigint = { path = "../../utils/bigint", package = "forest_bigint" } -hex = "0.4.2" [dev-dependencies] forest_address = { path = "../../vm/address" } diff --git a/node/forest_libp2p/src/rpc/handler.rs b/node/forest_libp2p/src/rpc/handler.rs index 07e36f922aa6..cc651b1eed9d 100644 --- a/node/forest_libp2p/src/rpc/handler.rs +++ b/node/forest_libp2p/src/rpc/handler.rs @@ -351,7 +351,7 @@ impl ProtocolsHandler for RPCHandler { event, timeout, } => { - // std::thread::sleep(std::time::Duration::from_secs(5)); + // TODO fix polling for response (polls partial written bytes in delayed cases) match substream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(response))) => { return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Response( From 44b46410d96dd46110c73a30093f08d9bf3f83ff Mon Sep 17 00:00:00 2001 From: austinabell Date: Mon, 15 Jun 2020 21:08:16 -0400 Subject: [PATCH 5/6] Cleanup auxiliary things --- README.md | 2 +- blockchain/chain_sync/src/sync.rs | 33 ++++++++++++++------------- node/forest_libp2p/src/rpc/handler.rs | 4 ++-- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 67636106f46c..fadb6e63cc96 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ listening_multiaddr = "" bootstrap_peers = [""] ``` -Example of a [multiaddress](https://github.com/multiformats/multiaddr): `"/ip4/54.186.82.90/tcp/1347"` +Example of a [multiaddress](https://github.com/multiformats/multiaddr): `"/ip4/54.186.82.90/tcp/1347/p2p/12D3K1oWKNF7vNFEhnvB45E9mw2B5z6t419W3ziZPLdUDVnLLKGs"` ### Logging diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 588766a5018b..d739dc7e9ba8 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -848,17 +848,7 @@ where // update sync state to Bootstrap indicating we are acquiring a 'secure enough' set of peers self.set_state(SyncState::Bootstrap); - // TODO change from using random peerID to managed - while self.peer_manager.is_empty().await { - warn!("No valid peers to sync, waiting for other nodes"); - task::sleep(Duration::from_secs(5)).await; - } - - let peer_id = self - .peer_manager - .get_peer() - .await - .expect("Peer set is not empty here"); + let peer_id = self.get_peer().await; // checkpoint established self.set_state(SyncState::Checkpoint); @@ -934,15 +924,14 @@ where } /// fork detected, collect tipsets to be included in return_set sync_headers_reverse async fn sync_fork(&mut self, head: &Tipset, to: &Tipset) -> Result, Error> { - // TODO change from using random peerID to managed - let peer_id = PeerId::random(); - // pulled from Lotus: https://github.com/filecoin-project/lotus/blob/master/chain/sync.go#L996 - const FORK_LENGTH_THRESHOLD: u64 = 1; + let peer_id = self.get_peer().await; + // TODO move to shared parameter (from actors crate most likely) + const FORK_LENGTH_THRESHOLD: u64 = 500; // Load blocks from network using blocksync let tips: Vec = self .network - .blocksync_headers(peer_id.clone(), head.parents(), FORK_LENGTH_THRESHOLD) + .blocksync_headers(peer_id, head.parents(), FORK_LENGTH_THRESHOLD) .await .map_err(|_| Error::Other("Could not retrieve tipset".to_string()))?; @@ -982,6 +971,18 @@ where pub fn set_state(&mut self, new_state: SyncState) { self.state = new_state } + + async fn get_peer(&self) -> PeerId { + while self.peer_manager.is_empty().await { + warn!("No valid peers to sync, waiting for other nodes"); + task::sleep(Duration::from_secs(5)).await; + } + + self.peer_manager + .get_peer() + .await + .expect("Peer set is not empty here") + } } /// Returns message root CID from bls and secp message contained in the param Block diff --git a/node/forest_libp2p/src/rpc/handler.rs b/node/forest_libp2p/src/rpc/handler.rs index cc651b1eed9d..af4e9dc75cba 100644 --- a/node/forest_libp2p/src/rpc/handler.rs +++ b/node/forest_libp2p/src/rpc/handler.rs @@ -11,7 +11,7 @@ use libp2p::swarm::{ ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use libp2p::{InboundUpgrade, OutboundUpgrade}; -use log::{debug, error}; +use log::debug; use smallvec::SmallVec; use std::{ pin::Pin, @@ -230,7 +230,7 @@ impl ProtocolsHandler for RPCHandler { > { if let Some(err) = self.pending_error.take() { // Log error, shouldn't necessarily return error and drop peer here - error!("{}", err); + debug!("{}", err); } // return any events that need to be reported From 76ea8470d69c48d094c53032fb1a7bf7c83eb1fc Mon Sep 17 00:00:00 2001 From: austinabell Date: Tue, 16 Jun 2020 08:44:48 -0400 Subject: [PATCH 6/6] Add interval timer for logging peer count --- node/forest_libp2p/src/behaviour.rs | 5 +++++ node/forest_libp2p/src/service.rs | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/node/forest_libp2p/src/behaviour.rs b/node/forest_libp2p/src/behaviour.rs index 1b5e9f00afca..acf9d6a91ed3 100644 --- a/node/forest_libp2p/src/behaviour.rs +++ b/node/forest_libp2p/src/behaviour.rs @@ -248,4 +248,9 @@ impl ForestBehaviour { pub fn remove_peer(&mut self, peer_id: &PeerId) { self.peers.remove(peer_id); } + + /// Adds peer to the peer set. + pub fn peers(&self) -> &HashSet { + &self.peers + } } diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index 82944efc5ca7..f4200cdd1f92 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -5,6 +5,7 @@ use super::blocksync::BlockSyncResponse; use super::hello::HelloMessage; use super::rpc::{RPCEvent, RPCRequest, RPCResponse}; use super::{ForestBehaviour, ForestBehaviourEvent, Libp2pConfig}; +use async_std::stream; use async_std::sync::{channel, Receiver, Sender}; use futures::select; use futures_util::stream::StreamExt; @@ -103,6 +104,7 @@ impl Libp2pService { pub async fn run(self) { let mut swarm_stream = self.swarm.fuse(); let mut network_stream = self.network_receiver_in.fuse(); + let mut interval = stream::interval(Duration::from_secs(10)).fuse(); loop { select! { @@ -166,6 +168,9 @@ impl Libp2pService { } } None => {break;} + }, + interval_event = interval.next() => if interval_event.is_some() { + info!("Peers connected: {}", swarm_stream.get_ref().peers().len()); } }; }