From 1cfaff6c8bb2108599c2e44448b099e9ecc9f6f2 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Tue, 7 Apr 2020 23:49:29 +0800 Subject: [PATCH] refactor: Remove DiscoveryService struct --- network/src/network.rs | 23 +- network/src/protocols/discovery/addr.rs | 9 - network/src/protocols/discovery/mod.rs | 234 +++++------------- network/src/protocols/identify/mod.rs | 6 - network/src/protocols/test.rs | 26 +- protocols/discovery/Cargo.toml | 17 -- protocols/identify/Cargo.toml | 13 - protocols/ping/Cargo.toml | 14 -- protocols/ping/src/lib.rs | 302 ------------------------ 9 files changed, 72 insertions(+), 572 deletions(-) delete mode 100644 protocols/discovery/Cargo.toml delete mode 100644 protocols/identify/Cargo.toml delete mode 100644 protocols/ping/Cargo.toml delete mode 100644 protocols/ping/src/lib.rs diff --git a/network/src/network.rs b/network/src/network.rs index 2bfd4f4bb61..9722ba5f6c6 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -6,7 +6,7 @@ use crate::peer_store::{ }; use crate::protocols::{ disconnect_message::DisconnectMessageProtocol, - discovery::{DiscoveryProtocol, DiscoveryService}, + discovery::DiscoveryProtocol, feeler::Feeler, identify::{IdentifyCallback, IdentifyProtocol}, ping::{PingHandler, PingService}, @@ -25,10 +25,7 @@ use ckb_logger::{debug, error, info, trace, warn}; use ckb_stop_handler::{SignalSender, StopHandler}; use ckb_util::{Condvar, Mutex, RwLock}; use futures::{ - channel::{ - mpsc::{self, channel}, - oneshot, - }, + channel::{mpsc::channel, oneshot}, Future, StreamExt, }; use ipnetwork::IpNetwork; @@ -879,7 +876,7 @@ impl NetworkService { .build(); // Discovery protocol - let (disc_sender, disc_receiver) = mpsc::unbounded(); + let disc_network_state = Arc::clone(&network_state); let disc_meta = MetaBuilder::default() .id(DISCOVERY_PROTOCOL_ID.into()) .name(move |_| "/ckb/discovery".to_string()) @@ -891,10 +888,10 @@ impl NetworkService { ) }) .service_handle(move || { - ProtocolHandle::Both(Box::new( - DiscoveryProtocol::new(disc_sender.clone()) - .global_ip_only(!config.discovery_local_address), - )) + ProtocolHandle::Both(Box::new(DiscoveryProtocol::new( + disc_network_state, + config.discovery_local_address, + ))) }) .build(); @@ -975,11 +972,6 @@ impl NetworkService { .build(event_handler); // == Build background service tasks - let disc_service = DiscoveryService::new( - Arc::clone(&network_state), - disc_receiver, - config.discovery_local_address, - ); let mut ping_service = PingService::new( Arc::clone(&network_state), p2p_service.control().to_owned(), @@ -999,7 +991,6 @@ impl NetworkService { } } }) as Pin>, - Box::pin(disc_service) as Pin>, Box::pin(dump_peer_store_service) as Pin>, Box::pin(protocol_type_checker_service) as Pin>, ]; diff --git a/network/src/protocols/discovery/addr.rs b/network/src/protocols/discovery/addr.rs index 5232e1ba0a0..d4975bad1e7 100644 --- a/network/src/protocols/discovery/addr.rs +++ b/network/src/protocols/discovery/addr.rs @@ -29,20 +29,11 @@ pub enum Misbehavior { /// Misbehavior report result pub enum MisbehaveResult { - // /// Continue to run - // Continue, /// Disconnect this peer Disconnect, } impl MisbehaveResult { - // pub fn is_continue(&self) -> bool { - // match self { - // MisbehaveResult::Continue => true, - // _ => false, - // } - // } - pub fn is_disconnect(&self) -> bool { match self { MisbehaveResult::Disconnect => true, diff --git a/network/src/protocols/discovery/mod.rs b/network/src/protocols/discovery/mod.rs index 6b2916d5654..0a0e62e0e38 100644 --- a/network/src/protocols/discovery/mod.rs +++ b/network/src/protocols/discovery/mod.rs @@ -1,4 +1,3 @@ -use crossbeam_channel::{self, bounded}; use futures::{ channel::mpsc::{self, channel, Receiver, Sender}, prelude::*, @@ -15,6 +14,7 @@ use std::{ }; use ckb_logger::{debug, error, trace, warn}; +use ckb_util::RwLock; use p2p::{ bytes, context::{ProtocolContext, ProtocolContextMutRef}, @@ -311,30 +311,30 @@ pub struct DiscoveryProtocol { discovery: Option>, discovery_handle: DiscoveryHandle, discovery_senders: HashMap>>, - event_sender: mpsc::UnboundedSender, + sessions: Arc>>, } impl DiscoveryProtocol { - pub fn new(event_sender: mpsc::UnboundedSender) -> DiscoveryProtocol { + pub fn new( + network_state: Arc, + discovery_local_address: bool, + ) -> DiscoveryProtocol { + let sessions = Arc::new(RwLock::new(HashMap::default())); let addr_mgr = DiscoveryAddressManager { - event_sender: event_sender.clone(), + sessions: Arc::clone(&sessions), + network_state, + discovery_local_address, }; - let discovery = Discovery::new(addr_mgr, Some(Duration::from_secs(7))); + let discovery = Discovery::new(addr_mgr, Some(Duration::from_secs(7))) + .global_ip_only(!discovery_local_address); let discovery_handle = discovery.handle(); DiscoveryProtocol { discovery: Some(discovery), discovery_handle, discovery_senders: HashMap::default(), - event_sender, + sessions, } } - - pub fn global_ip_only(mut self, global_ip_only: bool) -> Self { - self.discovery = self - .discovery - .map(move |protocol| protocol.global_ip_only(global_ip_only)); - self - } } impl ServiceProtocol for DiscoveryProtocol { @@ -368,13 +368,9 @@ impl ServiceProtocol for DiscoveryProtocol { "protocol [discovery] open on session [{}], address: [{}], type: [{:?}]", session.id, session.address, session.ty ); - let event = DiscoveryEvent::Connected { - session_id: session.id, - peer_id: session.remote_pubkey.clone().map(|pubkey| pubkey.peer_id()), - }; - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (ServiceProtocol::connected)"); - return; + + if let Some(pubkey) = session.remote_pubkey.as_ref() { + self.sessions.write().insert(session.id, pubkey.peer_id()); } let (sender, receiver) = mpsc::channel(8); @@ -393,11 +389,7 @@ impl ServiceProtocol for DiscoveryProtocol { fn disconnected(&mut self, context: ProtocolContextMutRef) { let session = context.session; - let event = DiscoveryEvent::Disconnected(session.id); - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (ServiceProtocol::disconnected)"); - return; - } + self.sessions.write().remove(&session.id); self.discovery_senders.remove(&session.id); debug!("protocol [discovery] close on session [{}]", session.id); } @@ -422,48 +414,13 @@ impl ServiceProtocol for DiscoveryProtocol { } } -pub enum DiscoveryEvent { - Connected { - session_id: SessionId, - peer_id: Option, - }, - Disconnected(SessionId), - AddNewAddrs { - session_id: SessionId, - addrs: Vec, - }, - Misbehave { - session_id: SessionId, - kind: Misbehavior, - result: crossbeam_channel::Sender, - }, - GetRandom { - n: usize, - result: crossbeam_channel::Sender>, - }, -} - -pub struct DiscoveryService { - event_receiver: mpsc::UnboundedReceiver, +pub struct DiscoveryAddressManager { network_state: Arc, - sessions: HashMap, + sessions: Arc>>, discovery_local_address: bool, } -impl DiscoveryService { - pub fn new( - network_state: Arc, - event_receiver: mpsc::UnboundedReceiver, - discovery_local_address: bool, - ) -> DiscoveryService { - DiscoveryService { - event_receiver, - network_state, - sessions: HashMap::default(), - discovery_local_address, - } - } - +impl DiscoveryAddressManager { fn is_valid_addr(&self, addr: &Multiaddr) -> bool { if !self.discovery_local_address { let local_or_invalid = multiaddr_to_socketaddr(&addr) @@ -474,94 +431,6 @@ impl DiscoveryService { true } } - - fn handle_event(&mut self, event: DiscoveryEvent) { - match event { - DiscoveryEvent::Connected { - session_id, - peer_id, - } => { - if let Some(peer_id) = peer_id { - self.sessions.insert(session_id, peer_id); - } - } - DiscoveryEvent::Disconnected(session_id) => { - self.sessions.remove(&session_id); - } - DiscoveryEvent::AddNewAddrs { session_id, addrs } => { - if let Some(_peer_id) = self.sessions.get(&session_id) { - // TODO: wait for peer store update - for addr in addrs.into_iter().filter(|addr| self.is_valid_addr(addr)) { - trace!("Add discovered address:{:?}", addr); - if let Some(peer_id) = extract_peer_id(&addr) { - self.network_state.with_peer_store_mut(|peer_store| { - if let Err(err) = peer_store.add_addr(peer_id.clone(), addr) { - debug!( - "Failed to add discoved address to peer_store {:?} {:?}", - err, peer_id - ); - } - }); - } - } - } - } - DiscoveryEvent::Misbehave { - session_id: _session_id, - kind: _kind, - result: _result, - } => { - // FIXME: - } - DiscoveryEvent::GetRandom { n, result } => { - let fetch_random_addrs = self - .network_state - .with_peer_store_mut(|peer_store| peer_store.fetch_random_addrs(n)); - let addrs = fetch_random_addrs - .into_iter() - .filter_map(|paddr| { - if !self.is_valid_addr(&paddr.addr) { - return None; - } - match paddr.multiaddr() { - Ok(addr) => Some(addr), - Err(err) => { - error!("return discovery addresses error: {:?}", err); - None - } - } - }) - .collect(); - trace!("discovery send random addrs: {:?}", addrs); - result - .send(addrs) - .expect("Send failed (should not happened)"); - } - } - } -} - -impl Future for DiscoveryService { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match self.event_receiver.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => { - self.handle_event(event); - } - Poll::Ready(None) => { - debug!("discovery service shutdown"); - return Poll::Ready(()); - } - Poll::Pending => break, - } - } - Poll::Pending - } -} - -pub struct DiscoveryAddressManager { - pub event_sender: mpsc::UnboundedSender, } impl AddressManager for DiscoveryAddressManager { @@ -573,35 +442,50 @@ impl AddressManager for DiscoveryAddressManager { if addrs.is_empty() { return; } - let event = DiscoveryEvent::AddNewAddrs { session_id, addrs }; - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (DiscoveryAddressManager::add_new_addrs)"); + + if let Some(_peer_id) = self.sessions.read().get(&session_id) { + // TODO: wait for peer store update + for addr in addrs.into_iter().filter(|addr| self.is_valid_addr(addr)) { + trace!("Add discovered address:{:?}", addr); + if let Some(peer_id) = extract_peer_id(&addr) { + self.network_state.with_peer_store_mut(|peer_store| { + if let Err(err) = peer_store.add_addr(peer_id.clone(), addr) { + debug!( + "Failed to add discoved address to peer_store {:?} {:?}", + err, peer_id + ); + } + }); + } + } } } - fn misbehave(&mut self, session_id: SessionId, kind: Misbehavior) -> MisbehaveResult { - let (sender, receiver) = bounded(1); - let event = DiscoveryEvent::Misbehave { - session_id, - kind, - result: sender, - }; - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (DiscoveryAddressManager::misbehave)"); - MisbehaveResult::Disconnect - } else { - tokio::task::block_in_place(|| receiver.recv().unwrap_or(MisbehaveResult::Disconnect)) - } + fn misbehave(&mut self, _session_id: SessionId, _kind: Misbehavior) -> MisbehaveResult { + // FIXME: + MisbehaveResult::Disconnect } fn get_random(&mut self, n: usize) -> Vec { - let (sender, receiver) = bounded(1); - let event = DiscoveryEvent::GetRandom { n, result: sender }; - if self.event_sender.unbounded_send(event).is_err() { - debug!("receiver maybe dropped! (DiscoveryAddressManager::get_random)"); - Vec::new() - } else { - tokio::task::block_in_place(|| receiver.recv().ok().unwrap_or_else(Vec::new)) - } + let fetch_random_addrs = self + .network_state + .with_peer_store_mut(|peer_store| peer_store.fetch_random_addrs(n)); + let addrs = fetch_random_addrs + .into_iter() + .filter_map(|paddr| { + if !self.is_valid_addr(&paddr.addr) { + return None; + } + match paddr.multiaddr() { + Ok(addr) => Some(addr), + Err(err) => { + error!("return discovery addresses error: {:?}", err); + None + } + } + }) + .collect(); + trace!("discovery send random addrs: {:?}", addrs); + addrs } } diff --git a/network/src/protocols/identify/mod.rs b/network/src/protocols/identify/mod.rs index fd120de8259..0ae86c7d3b0 100644 --- a/network/src/protocols/identify/mod.rs +++ b/network/src/protocols/identify/mod.rs @@ -52,12 +52,6 @@ pub enum MisbehaveResult { } impl MisbehaveResult { - // pub fn is_continue(&self) -> bool { - // match self { - // MisbehaveResult::Continue => true, - // _ => false, - // } - // } pub fn is_disconnect(&self) -> bool { match self { MisbehaveResult::Disconnect => true, diff --git a/network/src/protocols/test.rs b/network/src/protocols/test.rs index 3c7c1e122ce..89d44b22c36 100644 --- a/network/src/protocols/test.rs +++ b/network/src/protocols/test.rs @@ -1,8 +1,8 @@ use super::{ - discovery::{DiscoveryProtocol, DiscoveryService}, + discovery::DiscoveryProtocol, feeler::Feeler, - identify::IdentifyCallback, - ping::PingService, + identify::{IdentifyCallback, IdentifyProtocol}, + ping::{PingHandler, PingService}, }; use crate::{ @@ -19,10 +19,7 @@ use std::{ }; use ckb_util::{Condvar, Mutex}; -use futures::{ - channel::mpsc::{self, channel}, - StreamExt, -}; +use futures::{channel::mpsc::channel, StreamExt}; use p2p::{ builder::{MetaBuilder, ServiceBuilder}, multiaddr::{Multiaddr, Protocol}, @@ -30,8 +27,6 @@ use p2p::{ utils::multiaddr_to_socketaddr, ProtocolId, SessionId, }; -use p2p_identify::IdentifyProtocol; -use p2p_ping::PingHandler; use tempfile::tempdir; struct Node { @@ -161,13 +156,11 @@ fn net_service_start(name: String) -> Node { .build(); // Discovery protocol - let (disc_sender, disc_receiver) = mpsc::unbounded(); + let disc_network_state = Arc::clone(&network_state); let disc_meta = MetaBuilder::default() .id(DISCOVERY_PROTOCOL_ID.into()) .service_handle(move || { - ProtocolHandle::Both(Box::new( - DiscoveryProtocol::new(disc_sender).global_ip_only(false), - )) + ProtocolHandle::Both(Box::new(DiscoveryProtocol::new(disc_network_state, true))) }) .build(); @@ -206,12 +199,6 @@ fn net_service_start(name: String) -> Node { exit_condvar: Arc::new((Mutex::new(()), Condvar::new())), }); - let disc_service = DiscoveryService::new( - Arc::clone(&network_state), - disc_receiver, - config.discovery_local_address, - ); - let mut ping_service = PingService::new( Arc::clone(&network_state), p2p_service.control().to_owned(), @@ -231,7 +218,6 @@ fn net_service_start(name: String) -> Node { .threaded_scheduler() .build() .unwrap(); - rt.spawn(disc_service); rt.spawn(async move { loop { if ping_service.next().await.is_none() { diff --git a/protocols/discovery/Cargo.toml b/protocols/discovery/Cargo.toml deleted file mode 100644 index 984df77b3bc..00000000000 --- a/protocols/discovery/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "ckb-discovery" -version = "0.31.0-pre" -license = "MIT" -authors = ["Nervos Core Dev "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -p2p = { version = "0.3.0-alpha.1", package = "tentacle", features = ["molc"] } -ckb-logger = { path = "../../util/logger" } -futures = { version = "0.3.0" } -tokio = { version = "0.2.0", features = ["time", "io-util", "tcp", "dns", "stream"] } -tokio-util = { version = "0.2.0", features = ["codec"] } -rand = "0.6.1" -ckb-types = { path = "../../util/types" } diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml deleted file mode 100644 index b4de0723413..00000000000 --- a/protocols/identify/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "ckb-identify" -version = "0.31.0-pre" -license = "MIT" -authors = ["Nervos Core Dev "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -p2p = { version = "0.3.0-alpha.1", package = "tentacle", features = ["molc"] } -ckb-logger = { path = "../../util/logger" } -ckb-types = { path = "../../util/types" } diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml deleted file mode 100644 index 9da18702e6c..00000000000 --- a/protocols/ping/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "ckb-ping" -version = "0.31.0-pre" -authors = ["Nervos Core Dev "] -license = "MIT" -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -p2p = { version = "0.3.0-alpha.1", package = "tentacle", features = ["molc"] } -ckb-logger = { path = "../../util/logger" } -futures = "0.3" -ckb-types = { path = "../../util/types" } diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs deleted file mode 100644 index 02c12b7a89e..00000000000 --- a/protocols/ping/src/lib.rs +++ /dev/null @@ -1,302 +0,0 @@ -use ckb_logger::{debug, error, warn}; -use futures::channel::mpsc::Sender; -use p2p::{ - bytes::Bytes, - context::{ProtocolContext, ProtocolContextMutRef}, - secio::PeerId, - service::TargetSession, - traits::ServiceProtocol, - SessionId, -}; - -use std::{ - collections::HashMap, - str, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; - -use ckb_types::{packed, prelude::*}; - -const SEND_PING_TOKEN: u64 = 0; -const CHECK_TIMEOUT_TOKEN: u64 = 1; - -/// Ping protocol events -#[derive(Debug)] -pub enum Event { - /// Peer send ping to us. - Ping(PeerId), - /// Peer send pong to us. - Pong(PeerId, Duration), - /// Peer is timeout. - Timeout(PeerId), - /// Peer cause a unexpected error. - UnexpectedError(PeerId), -} - -/// Ping protocol handler. -/// -/// The interval means that we send ping to peers. -/// The timeout means that consider peer is timeout if during a timeout we still have not received pong from a peer -pub struct PingHandler { - interval: Duration, - timeout: Duration, - connected_session_ids: HashMap, - event_sender: Sender, -} - -impl PingHandler { - pub fn new(interval: Duration, timeout: Duration, event_sender: Sender) -> PingHandler { - PingHandler { - interval, - timeout, - connected_session_ids: Default::default(), - event_sender, - } - } - - pub fn send_event(&mut self, event: Event) { - if let Err(err) = self.event_sender.try_send(event) { - error!("send ping event error: {}", err); - } - } -} - -/// PingStatus of a peer -#[derive(Clone, Debug)] -struct PingStatus { - /// Are we currently pinging this peer? - processing: bool, - /// The time we last send ping to this peer. - last_ping: SystemTime, - peer_id: PeerId, - version: String, -} - -impl PingStatus { - /// A meaningless value, peer must send a pong has same nonce to respond a ping. - fn nonce(&self) -> u32 { - self.last_ping - .duration_since(UNIX_EPOCH) - .map(|dur| dur.as_secs()) - .unwrap_or(0) as u32 - } - - /// Time duration since we last send ping. - fn elapsed(&self) -> Duration { - self.last_ping.elapsed().unwrap_or(Duration::from_secs(0)) - } -} - -impl ServiceProtocol for PingHandler { - fn init(&mut self, context: &mut ProtocolContext) { - // periodicly send ping to peers - let proto_id = context.proto_id; - if context - .set_service_notify(proto_id, self.interval, SEND_PING_TOKEN) - .is_err() - { - warn!("start ping fail"); - } - if context - .set_service_notify(proto_id, self.timeout, CHECK_TIMEOUT_TOKEN) - .is_err() - { - warn!("start ping fail"); - } - } - - fn connected(&mut self, context: ProtocolContextMutRef, version: &str) { - let session = context.session; - match session.remote_pubkey { - Some(ref pubkey) => { - let peer_id = pubkey.peer_id(); - self.connected_session_ids - .entry(session.id) - .or_insert_with(|| PingStatus { - last_ping: SystemTime::now(), - processing: false, - peer_id, - version: version.to_owned(), - }); - debug!( - "proto id [{}] open on session [{}], address: [{}], type: [{:?}], version: {}", - context.proto_id, session.id, session.address, session.ty, version - ); - debug!("connected sessions are: {:?}", self.connected_session_ids); - } - None => { - if context.disconnect(session.id).is_err() { - debug!("disconnect fail"); - } - } - } - } - - fn disconnected(&mut self, context: ProtocolContextMutRef) { - let session = context.session; - self.connected_session_ids.remove(&session.id); - debug!( - "proto id [{}] close on session [{}]", - context.proto_id, session.id - ); - } - - fn received(&mut self, context: ProtocolContextMutRef, data: Bytes) { - let session = context.session; - if let Some(peer_id) = self - .connected_session_ids - .get(&session.id) - .map(|ps| ps.peer_id.clone()) - { - match PingMessage::decode(data.as_ref()) { - None => { - error!("decode message error"); - self.send_event(Event::UnexpectedError(peer_id)); - } - Some(msg) => { - match msg { - PingPayload::Ping(nonce) => { - if context - .send_message(PingMessage::build_pong(nonce)) - .is_err() - { - debug!("send message fail"); - } - self.send_event(Event::Ping(peer_id)); - } - PingPayload::Pong(nonce) => { - // check pong - if self - .connected_session_ids - .get(&session.id) - .map(|ps| (ps.processing, ps.nonce())) - == Some((true, nonce)) - { - let ping_time = - match self.connected_session_ids.get_mut(&session.id) { - Some(ps) => { - ps.processing = false; - ps.elapsed() - } - None => return, - }; - self.send_event(Event::Pong(peer_id, ping_time)); - } else { - // ignore if nonce is incorrect - self.send_event(Event::UnexpectedError(peer_id)); - } - } - } - } - } - } - } - - fn notify(&mut self, context: &mut ProtocolContext, token: u64) { - match token { - SEND_PING_TOKEN => { - debug!("proto [{}] start ping peers", context.proto_id); - let now = SystemTime::now(); - let peers: Vec<(SessionId, u32)> = self - .connected_session_ids - .iter_mut() - .filter_map(|(session_id, ps)| { - if ps.processing { - None - } else { - ps.processing = true; - ps.last_ping = now; - Some((*session_id, ps.nonce())) - } - }) - .collect(); - if !peers.is_empty() { - let ping_msg = PingMessage::build_ping(peers[0].1); - let peer_ids: Vec = peers - .into_iter() - .map(|(session_id, _)| session_id) - .collect(); - let proto_id = context.proto_id; - if context - .filter_broadcast(TargetSession::Multi(peer_ids), proto_id, ping_msg) - .is_err() - { - debug!("send message fail"); - } - } - } - CHECK_TIMEOUT_TOKEN => { - debug!("proto [{}] check ping timeout", context.proto_id); - let timeout = self.timeout; - for peer_id in self - .connected_session_ids - .values() - .filter(|ps| ps.processing && ps.elapsed() >= timeout) - .map(|ps| ps.peer_id.clone()) - .collect::>() - { - self.send_event(Event::Timeout(peer_id)); - } - } - _ => panic!("unknown token {}", token), - } - } -} - -enum PingPayload { - Ping(u32), - Pong(u32), -} - -struct PingMessage; - -impl PingMessage { - fn build_ping(nonce: u32) -> Bytes { - let nonce_le = nonce.to_le_bytes(); - let nonce = packed::Uint32::new_builder() - .nth0(nonce_le[0].into()) - .nth1(nonce_le[1].into()) - .nth2(nonce_le[2].into()) - .nth3(nonce_le[3].into()) - .build(); - let ping = packed::Ping::new_builder().nonce(nonce).build(); - let payload = packed::PingPayload::new_builder().set(ping).build(); - - packed::PingMessage::new_builder() - .payload(payload) - .build() - .as_bytes() - } - - fn build_pong(nonce: u32) -> Bytes { - let nonce_le = nonce.to_le_bytes(); - let nonce = packed::Uint32::new_builder() - .nth0(nonce_le[0].into()) - .nth1(nonce_le[1].into()) - .nth2(nonce_le[2].into()) - .nth3(nonce_le[3].into()) - .build(); - let pong = packed::Pong::new_builder().nonce(nonce).build(); - let payload = packed::PingPayload::new_builder().set(pong).build(); - - packed::PingMessage::new_builder() - .payload(payload) - .build() - .as_bytes() - } - - #[allow(clippy::cast_ptr_alignment)] - fn decode(data: &[u8]) -> Option { - let reader = packed::PingMessageReader::from_compatible_slice(data).ok()?; - match reader.payload().to_enum() { - packed::PingPayloadUnionReader::Ping(reader) => { - let le = reader.nonce().raw_data().as_ptr() as *const u32; - Some(PingPayload::Ping(u32::from_le(unsafe { *le }))) - } - packed::PingPayloadUnionReader::Pong(reader) => { - let le = reader.nonce().raw_data().as_ptr() as *const u32; - Some(PingPayload::Pong(u32::from_le(unsafe { *le }))) - } - } - } -}