From 2aef67b9c6531b3356056f9ffa0a3c0b5322be29 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 8 Nov 2019 18:53:44 +0100 Subject: [PATCH] Batch gossip messages --- core/network/src/protocol.rs | 59 ++++++--- core/network/src/protocol/consensus_gossip.rs | 124 ++++++++++-------- core/network/src/protocol/message.rs | 3 + 3 files changed, 112 insertions(+), 74 deletions(-) diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 12a759c437ec3..f06f0cd937b5f 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -69,12 +69,14 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900); /// Current protocol version. -pub(crate) const CURRENT_VERSION: u32 = 4; +pub(crate) const CURRENT_VERSION: u32 = 5; /// Lowest version we support pub(crate) const MIN_VERSION: u32 = 3; // Maximum allowed entries in `BlockResponse` const MAX_BLOCK_DATA_RESPONSE: u32 = 128; +// Maximum allowed entries in `ConsensusBatch` +const MAX_CONSENSUS_MESSAGES: usize = 256; /// When light node connects to the full node and the full node is behind light node /// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it unuseful /// and disconnect to free connection slot. @@ -298,7 +300,7 @@ pub trait Context { fn disconnect_peer(&mut self, who: PeerId); /// Send a consensus message to a peer. - fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage); + fn send_consensus(&mut self, who: PeerId, messages: Vec); /// Send a chain-specific message to a peer. fn send_chain_specific(&mut self, who: PeerId, message: Vec); @@ -330,13 +332,33 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, self.behaviour.disconnect_peer(&who) } - fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) { - send_message:: ( - self.behaviour, - &mut self.context_data.stats, - &who, - GenericMessage::Consensus(consensus) - ) + fn send_consensus(&mut self, who: PeerId, messages: Vec) { + if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 4) { + let mut batch = Vec::new(); + let len = messages.len(); + for (index, message) in messages.into_iter().enumerate() { + batch.reserve(MAX_CONSENSUS_MESSAGES); + batch.push(message); + if batch.len() == MAX_CONSENSUS_MESSAGES || index == len - 1 { + send_message:: ( + self.behaviour, + &mut self.context_data.stats, + &who, + GenericMessage::ConsensusBatch(std::mem::replace(&mut batch, Vec::new())), + ) + } + } + } else { + // Backwards compatibility + for message in messages { + send_message:: ( + self.behaviour, + &mut self.context_data.stats, + &who, + GenericMessage::Consensus(message) + ) + } + } } fn send_chain_specific(&mut self, who: PeerId, message: Vec) { @@ -598,13 +620,18 @@ impl, H: ExHashT> Protocol { GenericMessage::RemoteReadChildRequest(request) => self.on_remote_read_child_request(who, request), GenericMessage::Consensus(msg) => { - if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 2) { - self.consensus_gossip.on_incoming( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - who, - msg, - ); - } + self.consensus_gossip.on_incoming( + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), + who, + vec![msg], + ); + } + GenericMessage::ConsensusBatch(messages) => { + self.consensus_gossip.on_incoming( + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), + who, + messages, + ); } GenericMessage::ChainSpecific(msg) => self.specialization.on_message( &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), diff --git a/core/network/src/protocol/consensus_gossip.rs b/core/network/src/protocol/consensus_gossip.rs index f3d4e536a788d..1b69797490125 100644 --- a/core/network/src/protocol/consensus_gossip.rs +++ b/core/network/src/protocol/consensus_gossip.rs @@ -159,10 +159,10 @@ impl<'g, 'p, B: BlockT> ValidatorContext for NetworkContext<'g, 'p, B> { /// Send addressed message to a peer. fn send_message(&mut self, who: &PeerId, message: Vec) { - self.protocol.send_consensus(who.clone(), ConsensusMessage { + self.protocol.send_consensus(who.clone(), vec![ConsensusMessage { engine_id: self.engine_id, data: message, - }); + }]); } /// Send all messages with given topic to a peer. @@ -178,7 +178,7 @@ fn propagate<'a, B: BlockT, I>( peers: &mut HashMap>, validators: &HashMap>>, ) - where I: IntoIterator, // (msg_hash, topic, message) + where I: Clone + IntoIterator, // (msg_hash, topic, message) { let mut check_fns = HashMap::new(); let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| { @@ -194,8 +194,10 @@ fn propagate<'a, B: BlockT, I>( (check_fn)(who, intent, topic, &message.data) }; - for (message_hash, topic, message) in messages { - for (id, ref mut peer) in peers.iter_mut() { + + for (id, ref mut peer) in peers.iter_mut() { + let mut batch = Vec::new(); + for (message_hash, topic, message) in messages.clone() { let intent = match intent { MessageIntent::Broadcast => if peer.known_messages.contains(&message_hash) { @@ -219,8 +221,9 @@ fn propagate<'a, B: BlockT, I>( } peer.known_messages.insert(message_hash.clone()); trace!(target: "gossip", "Propagating to {}: {:?}", id, message); - protocol.send_consensus(id.clone(), message.clone()); + batch.push(message.clone()) } + protocol.send_consensus(id.clone(), batch); } } @@ -450,65 +453,68 @@ impl ConsensusGossip { &mut self, protocol: &mut dyn Context, who: PeerId, - message: ConsensusMessage, + messages: Vec, ) { - let message_hash = HashFor::::hash(&message.data[..]); - - if self.known_messages.contains_key(&message_hash) { - trace!(target:"gossip", "Ignored already known message from {}", who); - protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE); - return; - } + trace!(target:"gossip", "Received {} messages from peer {}", messages.len(), who); + for message in messages { + let message_hash = HashFor::::hash(&message.data[..]); + + if self.known_messages.contains_key(&message_hash) { + trace!(target:"gossip", "Ignored already known message from {}", who); + protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE); + continue; + } - let engine_id = message.engine_id; - // validate the message - let validation = self.validators.get(&engine_id) - .cloned() - .map(|v| { - let mut context = NetworkContext { gossip: self, protocol, engine_id }; - v.validate(&mut context, &who, &message.data) - }); + let engine_id = message.engine_id; + // validate the message + let validation = self.validators.get(&engine_id) + .cloned() + .map(|v| { + let mut context = NetworkContext { gossip: self, protocol, engine_id }; + v.validate(&mut context, &who, &message.data) + }); - let validation_result = match validation { - Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)), - Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)), - Some(ValidationResult::Discard) => None, - None => { - trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who); - protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE); - protocol.disconnect_peer(who); - return; - } - }; + let validation_result = match validation { + Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)), + Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)), + Some(ValidationResult::Discard) => None, + None => { + trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who); + protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE); + protocol.disconnect_peer(who.clone()); + continue; + } + }; - if let Some((topic, keep)) = validation_result { - protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE); - if let Some(ref mut peer) = self.peers.get_mut(&who) { - peer.known_messages.insert(message_hash); - if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { - debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); - entry.get_mut().retain(|sink| { - if let Err(e) = sink.unbounded_send(TopicNotification { - message: message.data.clone(), - sender: Some(who.clone()) - }) { - trace!(target: "gossip", "Error broadcasting message notification: {:?}", e); + if let Some((topic, keep)) = validation_result { + protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE); + if let Some(ref mut peer) = self.peers.get_mut(&who) { + peer.known_messages.insert(message_hash); + if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { + debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); + entry.get_mut().retain(|sink| { + if let Err(e) = sink.unbounded_send(TopicNotification { + message: message.data.clone(), + sender: Some(who.clone()) + }) { + trace!(target: "gossip", "Error broadcasting message notification: {:?}", e); + } + !sink.is_closed() + }); + if entry.get().is_empty() { + entry.remove_entry(); } - !sink.is_closed() - }); - if entry.get().is_empty() { - entry.remove_entry(); } - } - if keep { - self.register_message_hashed(message_hash, topic, message, Some(who.clone())); + if keep { + self.register_message_hashed(message_hash, topic, message, Some(who.clone())); + } + } else { + trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); + protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE); } } else { - trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); - protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE); + trace!(target:"gossip", "Handled valid one hop message from peer {}", who); } - } else { - trace!(target:"gossip", "Handled valid one hop message from peer {}", who); } } @@ -530,6 +536,7 @@ impl ConsensusGossip { let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; if let Some(ref mut peer) = self.peers.get_mut(who) { + let mut batch = Vec::new(); for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) { if !force && peer.known_messages.contains(&entry.message_hash) { continue @@ -539,11 +546,12 @@ impl ConsensusGossip { } peer.known_messages.insert(entry.message_hash.clone()); trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message); - protocol.send_consensus(who.clone(), ConsensusMessage { + batch.push(ConsensusMessage { engine_id: engine_id.clone(), data: entry.message.data.clone(), }); } + protocol.send_consensus(who.clone(), batch); } } @@ -579,7 +587,7 @@ impl ConsensusGossip { trace!(target: "gossip", "Sending direct to {}: {:?}", who, message); peer.known_messages.insert(message_hash); - protocol.send_consensus(who.clone(), message.clone()); + protocol.send_consensus(who.clone(), vec![message.clone()]); } } diff --git a/core/network/src/protocol/message.rs b/core/network/src/protocol/message.rs index c180bc3669423..82ff791800ee0 100644 --- a/core/network/src/protocol/message.rs +++ b/core/network/src/protocol/message.rs @@ -217,6 +217,8 @@ pub mod generic { FinalityProofRequest(FinalityProofRequest), /// Finality proof reponse. FinalityProofResponse(FinalityProofResponse), + /// Batch of consensus protocol messages. + ConsensusBatch(Vec), /// Chain-specific message. #[codec(index = "255")] ChainSpecific(Vec), @@ -243,6 +245,7 @@ pub mod generic { Message::RemoteReadChildRequest(_) => "RemoteReadChildRequest", Message::FinalityProofRequest(_) => "FinalityProofRequest", Message::FinalityProofResponse(_) => "FinalityProofResponse", + Message::ConsensusBatch(_) => "ConsensusBatch", Message::ChainSpecific(_) => "ChainSpecific", } }