Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Batch gossip messages #4055

Merged
merged 4 commits into from
Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions core/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);

/// Current protocol version.
pub(crate) const CURRENT_VERSION: u32 = 4;
pub(crate) const CURRENT_VERSION: u32 = 5;
/// Lowest version we support
pub(crate) const MIN_VERSION: u32 = 3;

// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
// Maximum allowed entries in `ConsensusBatch`
const MAX_CONSENSUS_MESSAGES: usize = 256;
/// When light node connects to the full node and the full node is behind light node
/// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it unuseful
/// and disconnect to free connection slot.
Expand Down Expand Up @@ -298,7 +300,7 @@ pub trait Context<B: BlockT> {
fn disconnect_peer(&mut self, who: PeerId);

/// Send a consensus message to a peer.
fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage);
fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>);

/// Send a chain-specific message to a peer.
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>);
Expand Down Expand Up @@ -330,13 +332,33 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
self.behaviour.disconnect_peer(&who)
}

fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) {
send_message::<B> (
self.behaviour,
&mut self.context_data.stats,
&who,
GenericMessage::Consensus(consensus)
)
fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>) {
if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 4) {
let mut batch = Vec::new();
let len = messages.len();
for (index, message) in messages.into_iter().enumerate() {
batch.reserve(MAX_CONSENSUS_MESSAGES);
batch.push(message);
if batch.len() == MAX_CONSENSUS_MESSAGES || index == len - 1 {
send_message::<B> (
self.behaviour,
&mut self.context_data.stats,
&who,
GenericMessage::ConsensusBatch(std::mem::replace(&mut batch, Vec::new())),
)
}
}
} else {
// Backwards compatibility
for message in messages {
send_message::<B> (
self.behaviour,
&mut self.context_data.stats,
&who,
GenericMessage::Consensus(message)
)
}
}
}

fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>) {
Expand Down Expand Up @@ -598,13 +620,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::RemoteReadChildRequest(request) =>
self.on_remote_read_child_request(who, request),
GenericMessage::Consensus(msg) => {
if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 2) {
self.consensus_gossip.on_incoming(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
msg,
);
}
self.consensus_gossip.on_incoming(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
vec![msg],
);
}
GenericMessage::ConsensusBatch(messages) => {
self.consensus_gossip.on_incoming(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who,
messages,
);
}
GenericMessage::ChainSpecific(msg) => self.specialization.on_message(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
Expand Down
122 changes: 64 additions & 58 deletions core/network/src/protocol/consensus_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {

/// Send addressed message to a peer.
fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
self.protocol.send_consensus(who.clone(), ConsensusMessage {
self.protocol.send_consensus(who.clone(), vec![ConsensusMessage {
engine_id: self.engine_id,
data: message,
});
}]);
}

/// Send all messages with given topic to a peer.
Expand All @@ -190,7 +190,7 @@ fn propagate<'a, B: BlockT, I>(
peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
validators: &HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
)
where I: IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message)
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message)
{
let mut check_fns = HashMap::new();
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| {
Expand All @@ -206,8 +206,9 @@ fn propagate<'a, B: BlockT, I>(
(check_fn)(who, intent, topic, &message.data)
};

for (message_hash, topic, message) in messages {
for (id, ref mut peer) in peers.iter_mut() {
for (id, ref mut peer) in peers.iter_mut() {
let mut batch = Vec::new();
for (message_hash, topic, message) in messages.clone() {
let previous_attempts = peer.filtered_messages
.get(&message_hash)
.cloned()
Expand Down Expand Up @@ -245,8 +246,9 @@ fn propagate<'a, B: BlockT, I>(
peer.known_messages.insert(message_hash.clone());

trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
protocol.send_consensus(id.clone(), message.clone());
batch.push(message.clone())
}
protocol.send_consensus(id.clone(), batch);
}
}

Expand Down Expand Up @@ -477,65 +479,68 @@ impl<B: BlockT> ConsensusGossip<B> {
&mut self,
protocol: &mut dyn Context<B>,
who: PeerId,
message: ConsensusMessage,
messages: Vec<ConsensusMessage>,
) {
let message_hash = HashFor::<B>::hash(&message.data[..]);
trace!(target:"gossip", "Received {} messages from peer {}", messages.len(), who);
for message in messages {
let message_hash = HashFor::<B>::hash(&message.data[..]);

if self.known_messages.contains_key(&message_hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE);
return;
}
if self.known_messages.contains_key(&message_hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE);
continue;
}

let engine_id = message.engine_id;
// validate the message
let validation = self.validators.get(&engine_id)
.cloned()
.map(|v| {
let mut context = NetworkContext { gossip: self, protocol, engine_id };
v.validate(&mut context, &who, &message.data)
});
let engine_id = message.engine_id;
// validate the message
let validation = self.validators.get(&engine_id)
.cloned()
.map(|v| {
let mut context = NetworkContext { gossip: self, protocol, engine_id };
v.validate(&mut context, &who, &message.data)
});

let validation_result = match validation {
Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)),
Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)),
Some(ValidationResult::Discard) => None,
None => {
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE);
protocol.disconnect_peer(who);
return;
}
};
let validation_result = match validation {
Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)),
Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)),
Some(ValidationResult::Discard) => None,
None => {
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE);
protocol.disconnect_peer(who.clone());
continue;
}
};

if let Some((topic, keep)) = validation_result {
protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE);
if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(message_hash);
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
entry.get_mut().retain(|sink| {
if let Err(e) = sink.unbounded_send(TopicNotification {
message: message.data.clone(),
sender: Some(who.clone())
}) {
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
if let Some((topic, keep)) = validation_result {
protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE);
if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(message_hash);
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
entry.get_mut().retain(|sink| {
if let Err(e) = sink.unbounded_send(TopicNotification {
message: message.data.clone(),
sender: Some(who.clone())
}) {
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
}
!sink.is_closed()
});
if entry.get().is_empty() {
entry.remove_entry();
}
!sink.is_closed()
});
if entry.get().is_empty() {
entry.remove_entry();
}
}
if keep {
self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
if keep {
self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
}
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE);
}
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE);
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
}
} else {
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
}
}

Expand All @@ -555,6 +560,7 @@ impl<B: BlockT> ConsensusGossip<B> {
};

if let Some(ref mut peer) = self.peers.get_mut(who) {
let mut batch = Vec::new();
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
let intent = if force {
MessageIntent::ForcedBroadcast
Expand Down Expand Up @@ -585,11 +591,12 @@ impl<B: BlockT> ConsensusGossip<B> {
peer.known_messages.insert(entry.message_hash.clone());

trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
protocol.send_consensus(who.clone(), ConsensusMessage {
batch.push(ConsensusMessage {
engine_id: engine_id.clone(),
data: entry.message.data.clone(),
});
}
protocol.send_consensus(who.clone(), batch);
}
}

Expand Down Expand Up @@ -626,8 +633,7 @@ impl<B: BlockT> ConsensusGossip<B> {

peer.filtered_messages.remove(&message_hash);
peer.known_messages.insert(message_hash);

protocol.send_consensus(who.clone(), message.clone());
protocol.send_consensus(who.clone(), vec![message.clone()]);
}
}

Expand Down
3 changes: 3 additions & 0 deletions core/network/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ pub mod generic {
FinalityProofRequest(FinalityProofRequest<Hash>),
/// Finality proof reponse.
FinalityProofResponse(FinalityProofResponse<Hash>),
/// Batch of consensus protocol messages.
ConsensusBatch(Vec<ConsensusMessage>),
/// Chain-specific message.
#[codec(index = "255")]
ChainSpecific(Vec<u8>),
Expand All @@ -243,6 +245,7 @@ pub mod generic {
Message::RemoteReadChildRequest(_) => "RemoteReadChildRequest",
Message::FinalityProofRequest(_) => "FinalityProofRequest",
Message::FinalityProofResponse(_) => "FinalityProofResponse",
Message::ConsensusBatch(_) => "ConsensusBatch",
Message::ChainSpecific(_) => "ChainSpecific",
}
}
Expand Down