Skip to content

Commit

Permalink
refactor: Remove DiscoveryService struct
Browse files Browse the repository at this point in the history
  • Loading branch information
TheWaWaR committed Apr 7, 2020
1 parent 113f98f commit 1cfaff6
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 572 deletions.
23 changes: 7 additions & 16 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::peer_store::{
};
use crate::protocols::{
disconnect_message::DisconnectMessageProtocol,
discovery::{DiscoveryProtocol, DiscoveryService},
discovery::DiscoveryProtocol,
feeler::Feeler,
identify::{IdentifyCallback, IdentifyProtocol},
ping::{PingHandler, PingService},
Expand All @@ -25,10 +25,7 @@ use ckb_logger::{debug, error, info, trace, warn};
use ckb_stop_handler::{SignalSender, StopHandler};
use ckb_util::{Condvar, Mutex, RwLock};
use futures::{
channel::{
mpsc::{self, channel},
oneshot,
},
channel::{mpsc::channel, oneshot},
Future, StreamExt,
};
use ipnetwork::IpNetwork;
Expand Down Expand Up @@ -879,7 +876,7 @@ impl NetworkService {
.build();

// Discovery protocol
let (disc_sender, disc_receiver) = mpsc::unbounded();
let disc_network_state = Arc::clone(&network_state);
let disc_meta = MetaBuilder::default()
.id(DISCOVERY_PROTOCOL_ID.into())
.name(move |_| "/ckb/discovery".to_string())
Expand All @@ -891,10 +888,10 @@ impl NetworkService {
)
})
.service_handle(move || {
ProtocolHandle::Both(Box::new(
DiscoveryProtocol::new(disc_sender.clone())
.global_ip_only(!config.discovery_local_address),
))
ProtocolHandle::Both(Box::new(DiscoveryProtocol::new(
disc_network_state,
config.discovery_local_address,
)))
})
.build();

Expand Down Expand Up @@ -975,11 +972,6 @@ impl NetworkService {
.build(event_handler);

// == Build background service tasks
let disc_service = DiscoveryService::new(
Arc::clone(&network_state),
disc_receiver,
config.discovery_local_address,
);
let mut ping_service = PingService::new(
Arc::clone(&network_state),
p2p_service.control().to_owned(),
Expand All @@ -999,7 +991,6 @@ impl NetworkService {
}
}
}) as Pin<Box<_>>,
Box::pin(disc_service) as Pin<Box<_>>,
Box::pin(dump_peer_store_service) as Pin<Box<_>>,
Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
];
Expand Down
9 changes: 0 additions & 9 deletions network/src/protocols/discovery/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,11 @@ pub enum Misbehavior {

/// Misbehavior report result
pub enum MisbehaveResult {
// /// Continue to run
// Continue,
/// Disconnect this peer
Disconnect,
}

impl MisbehaveResult {
// pub fn is_continue(&self) -> bool {
// match self {
// MisbehaveResult::Continue => true,
// _ => false,
// }
// }

pub fn is_disconnect(&self) -> bool {
match self {
MisbehaveResult::Disconnect => true,
Expand Down
234 changes: 59 additions & 175 deletions network/src/protocols/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crossbeam_channel::{self, bounded};
use futures::{
channel::mpsc::{self, channel, Receiver, Sender},
prelude::*,
Expand All @@ -15,6 +14,7 @@ use std::{
};

use ckb_logger::{debug, error, trace, warn};
use ckb_util::RwLock;
use p2p::{
bytes,
context::{ProtocolContext, ProtocolContextMutRef},
Expand Down Expand Up @@ -311,30 +311,30 @@ pub struct DiscoveryProtocol {
discovery: Option<Discovery<DiscoveryAddressManager>>,
discovery_handle: DiscoveryHandle,
discovery_senders: HashMap<SessionId, mpsc::Sender<Vec<u8>>>,
event_sender: mpsc::UnboundedSender<DiscoveryEvent>,
sessions: Arc<RwLock<HashMap<SessionId, PeerId>>>,
}

impl DiscoveryProtocol {
pub fn new(event_sender: mpsc::UnboundedSender<DiscoveryEvent>) -> DiscoveryProtocol {
pub fn new(
network_state: Arc<NetworkState>,
discovery_local_address: bool,
) -> DiscoveryProtocol {
let sessions = Arc::new(RwLock::new(HashMap::default()));
let addr_mgr = DiscoveryAddressManager {
event_sender: event_sender.clone(),
sessions: Arc::clone(&sessions),
network_state,
discovery_local_address,
};
let discovery = Discovery::new(addr_mgr, Some(Duration::from_secs(7)));
let discovery = Discovery::new(addr_mgr, Some(Duration::from_secs(7)))
.global_ip_only(!discovery_local_address);
let discovery_handle = discovery.handle();
DiscoveryProtocol {
discovery: Some(discovery),
discovery_handle,
discovery_senders: HashMap::default(),
event_sender,
sessions,
}
}

pub fn global_ip_only(mut self, global_ip_only: bool) -> Self {
self.discovery = self
.discovery
.map(move |protocol| protocol.global_ip_only(global_ip_only));
self
}
}

impl ServiceProtocol for DiscoveryProtocol {
Expand Down Expand Up @@ -368,13 +368,9 @@ impl ServiceProtocol for DiscoveryProtocol {
"protocol [discovery] open on session [{}], address: [{}], type: [{:?}]",
session.id, session.address, session.ty
);
let event = DiscoveryEvent::Connected {
session_id: session.id,
peer_id: session.remote_pubkey.clone().map(|pubkey| pubkey.peer_id()),
};
if self.event_sender.unbounded_send(event).is_err() {
debug!("receiver maybe dropped! (ServiceProtocol::connected)");
return;

if let Some(pubkey) = session.remote_pubkey.as_ref() {
self.sessions.write().insert(session.id, pubkey.peer_id());
}

let (sender, receiver) = mpsc::channel(8);
Expand All @@ -393,11 +389,7 @@ impl ServiceProtocol for DiscoveryProtocol {

fn disconnected(&mut self, context: ProtocolContextMutRef) {
let session = context.session;
let event = DiscoveryEvent::Disconnected(session.id);
if self.event_sender.unbounded_send(event).is_err() {
debug!("receiver maybe dropped! (ServiceProtocol::disconnected)");
return;
}
self.sessions.write().remove(&session.id);
self.discovery_senders.remove(&session.id);
debug!("protocol [discovery] close on session [{}]", session.id);
}
Expand All @@ -422,48 +414,13 @@ impl ServiceProtocol for DiscoveryProtocol {
}
}

pub enum DiscoveryEvent {
Connected {
session_id: SessionId,
peer_id: Option<PeerId>,
},
Disconnected(SessionId),
AddNewAddrs {
session_id: SessionId,
addrs: Vec<Multiaddr>,
},
Misbehave {
session_id: SessionId,
kind: Misbehavior,
result: crossbeam_channel::Sender<MisbehaveResult>,
},
GetRandom {
n: usize,
result: crossbeam_channel::Sender<Vec<Multiaddr>>,
},
}

pub struct DiscoveryService {
event_receiver: mpsc::UnboundedReceiver<DiscoveryEvent>,
pub struct DiscoveryAddressManager {
network_state: Arc<NetworkState>,
sessions: HashMap<SessionId, PeerId>,
sessions: Arc<RwLock<HashMap<SessionId, PeerId>>>,
discovery_local_address: bool,
}

impl DiscoveryService {
pub fn new(
network_state: Arc<NetworkState>,
event_receiver: mpsc::UnboundedReceiver<DiscoveryEvent>,
discovery_local_address: bool,
) -> DiscoveryService {
DiscoveryService {
event_receiver,
network_state,
sessions: HashMap::default(),
discovery_local_address,
}
}

impl DiscoveryAddressManager {
fn is_valid_addr(&self, addr: &Multiaddr) -> bool {
if !self.discovery_local_address {
let local_or_invalid = multiaddr_to_socketaddr(&addr)
Expand All @@ -474,94 +431,6 @@ impl DiscoveryService {
true
}
}

fn handle_event(&mut self, event: DiscoveryEvent) {
match event {
DiscoveryEvent::Connected {
session_id,
peer_id,
} => {
if let Some(peer_id) = peer_id {
self.sessions.insert(session_id, peer_id);
}
}
DiscoveryEvent::Disconnected(session_id) => {
self.sessions.remove(&session_id);
}
DiscoveryEvent::AddNewAddrs { session_id, addrs } => {
if let Some(_peer_id) = self.sessions.get(&session_id) {
// TODO: wait for peer store update
for addr in addrs.into_iter().filter(|addr| self.is_valid_addr(addr)) {
trace!("Add discovered address:{:?}", addr);
if let Some(peer_id) = extract_peer_id(&addr) {
self.network_state.with_peer_store_mut(|peer_store| {
if let Err(err) = peer_store.add_addr(peer_id.clone(), addr) {
debug!(
"Failed to add discoved address to peer_store {:?} {:?}",
err, peer_id
);
}
});
}
}
}
}
DiscoveryEvent::Misbehave {
session_id: _session_id,
kind: _kind,
result: _result,
} => {
// FIXME:
}
DiscoveryEvent::GetRandom { n, result } => {
let fetch_random_addrs = self
.network_state
.with_peer_store_mut(|peer_store| peer_store.fetch_random_addrs(n));
let addrs = fetch_random_addrs
.into_iter()
.filter_map(|paddr| {
if !self.is_valid_addr(&paddr.addr) {
return None;
}
match paddr.multiaddr() {
Ok(addr) => Some(addr),
Err(err) => {
error!("return discovery addresses error: {:?}", err);
None
}
}
})
.collect();
trace!("discovery send random addrs: {:?}", addrs);
result
.send(addrs)
.expect("Send failed (should not happened)");
}
}
}
}

impl Future for DiscoveryService {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.event_receiver.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
self.handle_event(event);
}
Poll::Ready(None) => {
debug!("discovery service shutdown");
return Poll::Ready(());
}
Poll::Pending => break,
}
}
Poll::Pending
}
}

pub struct DiscoveryAddressManager {
pub event_sender: mpsc::UnboundedSender<DiscoveryEvent>,
}

impl AddressManager for DiscoveryAddressManager {
Expand All @@ -573,35 +442,50 @@ impl AddressManager for DiscoveryAddressManager {
if addrs.is_empty() {
return;
}
let event = DiscoveryEvent::AddNewAddrs { session_id, addrs };
if self.event_sender.unbounded_send(event).is_err() {
debug!("receiver maybe dropped! (DiscoveryAddressManager::add_new_addrs)");

if let Some(_peer_id) = self.sessions.read().get(&session_id) {
// TODO: wait for peer store update
for addr in addrs.into_iter().filter(|addr| self.is_valid_addr(addr)) {
trace!("Add discovered address:{:?}", addr);
if let Some(peer_id) = extract_peer_id(&addr) {
self.network_state.with_peer_store_mut(|peer_store| {
if let Err(err) = peer_store.add_addr(peer_id.clone(), addr) {
debug!(
"Failed to add discoved address to peer_store {:?} {:?}",
err, peer_id
);
}
});
}
}
}
}

fn misbehave(&mut self, session_id: SessionId, kind: Misbehavior) -> MisbehaveResult {
let (sender, receiver) = bounded(1);
let event = DiscoveryEvent::Misbehave {
session_id,
kind,
result: sender,
};
if self.event_sender.unbounded_send(event).is_err() {
debug!("receiver maybe dropped! (DiscoveryAddressManager::misbehave)");
MisbehaveResult::Disconnect
} else {
tokio::task::block_in_place(|| receiver.recv().unwrap_or(MisbehaveResult::Disconnect))
}
fn misbehave(&mut self, _session_id: SessionId, _kind: Misbehavior) -> MisbehaveResult {
// FIXME:
MisbehaveResult::Disconnect
}

fn get_random(&mut self, n: usize) -> Vec<Multiaddr> {
let (sender, receiver) = bounded(1);
let event = DiscoveryEvent::GetRandom { n, result: sender };
if self.event_sender.unbounded_send(event).is_err() {
debug!("receiver maybe dropped! (DiscoveryAddressManager::get_random)");
Vec::new()
} else {
tokio::task::block_in_place(|| receiver.recv().ok().unwrap_or_else(Vec::new))
}
let fetch_random_addrs = self
.network_state
.with_peer_store_mut(|peer_store| peer_store.fetch_random_addrs(n));
let addrs = fetch_random_addrs
.into_iter()
.filter_map(|paddr| {
if !self.is_valid_addr(&paddr.addr) {
return None;
}
match paddr.multiaddr() {
Ok(addr) => Some(addr),
Err(err) => {
error!("return discovery addresses error: {:?}", err);
None
}
}
})
.collect();
trace!("discovery send random addrs: {:?}", addrs);
addrs
}
}
Loading

0 comments on commit 1cfaff6

Please sign in to comment.