Skip to content

Commit

Permalink
[network] Spawn network worker future when service run, and use memor…
Browse files Browse the repository at this point in the history
…y transport in test.
  • Loading branch information
jolestar committed Dec 1, 2020
1 parent 1f46650 commit baf836e
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 234 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion commons/service-registry/src/handler_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ where

fn stop(&mut self, ctx: &mut ServiceContext<S>) -> Result<()> {
if self.status().is_stopped() {
warn!("Service {} has bean stopped", S::service_name());
info!("Service {} has bean stopped", S::service_name());
return Ok(());
}
let service = self.service.take();
Expand Down
2 changes: 1 addition & 1 deletion commons/service-registry/src/service_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ where
fn handle(&mut self, msg: EventMessage<M>, ctx: &mut Self::Context) -> Self::Result {
debug!("{} handle event: {:?}", S::service_name(), &msg.msg);
if self.proxy.status().is_stopped() {
error!("Service {} is stopped", S::service_name());
info!("Service {} is already stopped", S::service_name());
return;
}
let mut service_ctx = ServiceContext::new(&mut self.cache, ctx);
Expand Down
26 changes: 18 additions & 8 deletions config/src/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
};
use anyhow::{bail, format_err, Result};
use network_p2p_types::{
is_memory_addr, memory_addr,
multiaddr::{Multiaddr, Protocol},
MultiaddrWithPeerId,
};
Expand Down Expand Up @@ -58,11 +59,14 @@ impl NetworkConfig {

fn prepare_peer_id(&mut self) {
let peer_id = PeerId::from_ed25519_public_key(self.network_keypair().public_key.clone());
let host = self
.listen
.clone()
.replace(0, |_p| Some(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))))
.expect("Replace multi address fail.");
let host = if is_memory_addr(&self.listen) {
self.listen.clone()
} else {
self.listen
.clone()
.replace(0, |_p| Some(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))))
.expect("Replace multi address fail.")
};
self.self_address = Some(MultiaddrWithPeerId::new(host, peer_id.clone().into()));
self.self_peer_id = Some(peer_id);
}
Expand Down Expand Up @@ -103,10 +107,16 @@ impl ConfigModule for NetworkConfig {
} else {
DEFAULT_NETWORK_PORT
};
Ok(Self {
listen: format!("/ip4/0.0.0.0/tcp/{}", port)
//test env use in memory transport.
let listen = if base.net.is_test() {
memory_addr(port as u64)
} else {
format!("/ip4/0.0.0.0/tcp/{}", port)
.parse()
.expect("Parse multi address fail."),
.expect("Parse multi address fail.")
};
Ok(Self {
listen,
seeds,
network_keypair: Some(Arc::new(Self::load_or_generate_keypair(opt, base)?)),
self_peer_id: None,
Expand Down
1 change: 1 addition & 0 deletions network-p2p/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ anyhow = "1.0.34"
bitflags = "1.2.0"
bytes = "0.5.0"
derive_more = "0.99.11"
rand = "0.7.3"
scs = { package="starcoin-canonical-serialization", path = "../../commons/scs"}
serde = { version = "1.0", features = ["derive"] }
libp2p = { version = "0.30.1", default-features = false, features = ["request-response"] }
Expand Down
29 changes: 28 additions & 1 deletion network-p2p/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use std::fmt;
use std::str::FromStr;

pub use libp2p::core::{identity, multiaddr, Multiaddr, PeerId, PublicKey};
pub use libp2p::multihash;
pub use libp2p::request_response::{InboundFailure, OutboundFailure};
pub use libp2p::{build_multiaddr, multihash};

/// Parses a string address and splits it into Multiaddress and PeerId, if
/// valid.
Expand Down Expand Up @@ -42,6 +42,22 @@ pub fn parse_addr(mut addr: Multiaddr) -> Result<(PeerId, Multiaddr), ParseErr>
Ok((who, addr))
}

/// Build memory protocol Multiaddr by port
pub fn memory_addr(port: u64) -> Multiaddr {
build_multiaddr!(Memory(port))
}

/// Generate a random memory protocol Multiaddr
pub fn random_memory_addr() -> Multiaddr {
memory_addr(rand::random::<u64>())
}

/// Check the address is a memory protocol Multiaddr.
pub fn is_memory_addr(addr: &Multiaddr) -> bool {
addr.iter()
.any(|protocol| matches!(protocol, libp2p::core::multiaddr::Protocol::Memory(_)))
}

/// Address of a node, including its identity.
///
/// This struct represents a decoded version of a multiaddress that ends with `/p2p/<peerid>`.
Expand Down Expand Up @@ -187,3 +203,14 @@ pub struct ProtocolRequest {
pub protocol: Cow<'static, str>,
pub request: IncomingRequest,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_memory_address() {
let addr = random_memory_addr();
assert!(is_memory_addr(&addr));
}
}
87 changes: 52 additions & 35 deletions network/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use network_p2p::{
identity, Event, Multiaddr, NetworkConfiguration, NetworkService, NetworkWorker, NodeKeyConfig,
Params, ProtocolId, Secret,
};
use network_p2p_types::{PeerId, ProtocolRequest, RequestFailure};
use network_p2p_types::{is_memory_addr, PeerId, ProtocolRequest, RequestFailure};
use prometheus::{default_registry, Registry};
use starcoin_network_rpc::NetworkRpcService;
use starcoin_service_registry::ServiceRef;
Expand All @@ -25,8 +25,11 @@ use types::startup_info::{ChainInfo, ChainStatus};

#[derive(Clone)]
pub struct SNetworkService {
inner: NetworkInner,
service: Arc<NetworkService>,
protocol: ProtocolId,
chain_info: ChainInfo,
cfg: NetworkConfiguration,
metrics_registry: Option<Registry>,
service: Option<Arc<NetworkService>>,
net_tx: Option<mpsc::UnboundedSender<NetworkMessage>>,
}

Expand All @@ -42,18 +45,12 @@ impl SNetworkService {
cfg: NetworkConfiguration,
metrics_registry: Option<Registry>,
) -> Self {
let worker =
NetworkWorker::new(Params::new(cfg, protocol, chain_info, metrics_registry)).unwrap();
let service = worker.service().clone();
let worker = worker;

async_std::task::spawn(worker);

let inner = NetworkInner::new(service.clone());

Self {
inner,
service,
protocol,
chain_info,
cfg,
metrics_registry,
service: None,
net_tx: None,
}
}
Expand All @@ -70,21 +67,28 @@ impl SNetworkService {
let (tx, net_rx) = mpsc::unbounded();
let (net_tx, rx) = mpsc::unbounded::<NetworkMessage>();
let (event_tx, event_rx) = mpsc::unbounded::<PeerEvent>();
let inner = self.inner.clone();

let worker = NetworkWorker::new(Params::new(
self.cfg.clone(),
self.protocol.clone(),
self.chain_info.clone(),
self.metrics_registry.clone(),
))
.unwrap();
self.net_tx = Some(net_tx.clone());

async_std::task::spawn(Self::start_network(inner, tx, rx, event_tx, close_rx));
self.service = Some(worker.service().clone());
async_std::task::spawn(Self::start_network(worker, tx, rx, event_tx, close_rx));
(net_tx, net_rx, event_rx, close_tx)
}

async fn start_network(
inner: NetworkInner,
mut worker: NetworkWorker,
net_tx: mpsc::UnboundedSender<NetworkMessage>,
net_rx: mpsc::UnboundedReceiver<NetworkMessage>,
event_tx: mpsc::UnboundedSender<PeerEvent>,
close_rx: mpsc::UnboundedReceiver<()>,
) {
let inner = NetworkInner::new(worker.service().clone());
let mut event_stream = inner.service.event_stream("network").fuse();
let mut net_rx = net_rx.fuse();
let mut close_rx = close_rx.fuse();
Expand All @@ -98,24 +102,29 @@ impl SNetworkService {
inner.handle_network_receive(event,net_tx.clone(),event_tx.clone()).await;
},
_ = close_rx.select_next_some() => {
//TODO
debug!("To shutdown command ");
info!("Network shutdown");
break;
}
_ = (&mut worker).fuse() => {},
complete => {
debug!("all stream are complete");
break;
}
}
}
}
fn service(&self) -> &Arc<NetworkService> {
self.service
.as_ref()
.expect("Should call network function after network running.")
}

pub async fn is_connected(&self, peer_id: PeerId) -> Result<bool> {
Ok(self.service.is_connected(peer_id).await)
Ok(self.service().is_connected(peer_id).await)
}

pub fn identify(&self) -> &PeerId {
self.service.peer_id()
self.service().peer_id()
}

pub async fn send_message(
Expand All @@ -125,7 +134,7 @@ impl SNetworkService {
message: Vec<u8>,
) -> Result<()> {
debug!("Send message to {}", &peer_id);
self.service
self.service()
.write_notification(peer_id, protocol_name, message);

Ok(())
Expand All @@ -139,34 +148,38 @@ impl SNetworkService {
) -> Result<Vec<u8>, RequestFailure> {
let protocol = protocol.into();
debug!("Send request to peer {} and rpc: {:?}", target, protocol);
self.service.request(target.into(), protocol, request).await
self.service()
.request(target.into(), protocol, request)
.await
}

pub async fn broadcast_message(&mut self, protocol_name: Cow<'static, str>, message: Vec<u8>) {
debug!("broadcast message, protocol: {:?}", protocol_name);
self.service.broadcast_message(protocol_name, message).await;
self.service()
.broadcast_message(protocol_name, message)
.await;
}

pub fn add_peer(&self, peer: String) -> Result<()> {
self.service
self.service()
.add_reserved_peer(peer)
.map_err(|e| format_err!("{:?}", e))
}

pub async fn connected_peers(&self) -> HashSet<PeerId> {
self.service.connected_peers().await
self.service().connected_peers().await
}

pub fn update_chain_status(&self, chain_status: ChainStatus) {
self.service.update_chain_status(chain_status);
self.service().update_chain_status(chain_status);
}

pub async fn get_address(&self, peer_id: PeerId) -> Vec<Multiaddr> {
self.service.get_address(peer_id).await
self.service().get_address(peer_id).await
}

pub async fn exist_notif_proto(&self, protocol_name: Cow<'static, str>) -> bool {
self.service.exist_notif_proto(protocol_name).await
self.service().exist_notif_proto(protocol_name).await
}
}

Expand Down Expand Up @@ -267,11 +280,15 @@ pub fn build_network_service(
mpsc::UnboundedReceiver<PeerEvent>,
mpsc::UnboundedSender<()>,
) {
let transport_config = TransportConfig::Normal {
//TODO support enable mdns by config.
enable_mdns: false,
allow_private_ipv4: false,
wasm_external_transport: None,
let transport_config = if is_memory_addr(&cfg.listen) {
TransportConfig::MemoryOnly
} else {
TransportConfig::Normal {
//TODO support enable mdns by config.
enable_mdns: false,
allow_private_ipv4: false,
wasm_external_transport: None,
}
};
//let rpc_info: Vec<String> = starcoin_network_rpc_api::gen_client::get_rpc_info();
//TODO define RequestResponseConfig by rpc api
Expand Down
Loading

0 comments on commit baf836e

Please sign in to comment.