Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Send Status message on all newly-opened legacy substreams (#6593)
Browse files Browse the repository at this point in the history
* Send Status message on all newly-opened legacy substreams

* Fix tests
  • Loading branch information
tomaka authored Jul 8, 2020
1 parent 802a0d0 commit 83b06a2
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 116 deletions.
40 changes: 23 additions & 17 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,22 @@ impl<B: BlockT> BlockAnnouncesHandshake<B> {
}
}

/// Builds a SCALE-encoded "Status" message to send as handshake for the legacy protocol.
fn build_status_message<B: BlockT>(protocol_config: &ProtocolConfig, chain: &Arc<dyn Client<B>>) -> Vec<u8> {
let info = chain.info();
let status = message::generic::Status {
version: CURRENT_VERSION,
min_supported_version: MIN_VERSION,
genesis_hash: info.genesis_hash,
roles: protocol_config.roles.into(),
best_number: info.best_number,
best_hash: info.best_hash,
chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
};

Message::<B>::Status(status).encode()
}

/// Fallback mechanism to use to send a notification if no substream is open.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Fallback {
Expand Down Expand Up @@ -403,6 +419,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
queue_size_report
);
Expand Down Expand Up @@ -547,6 +564,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub fn update_chain(&mut self) {
let info = self.context_data.chain.info();
self.sync.update_chain_info(&info.best_hash, info.best_number);
self.behaviour.set_legacy_handshake_message(build_status_message(&self.config, &self.context_data.chain));
self.behaviour.set_notif_protocol_handshake(
&self.block_announces_protocol,
BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode()
);
}

/// Inform sync about an own imported block.
Expand Down Expand Up @@ -683,7 +705,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub fn on_peer_connected(&mut self, who: PeerId) {
trace!(target: "sync", "Connecting {}", who);
self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: Instant::now() });
self.send_status(who);
}

/// Called by peer when it is disconnecting
Expand Down Expand Up @@ -1329,22 +1350,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

/// Send Status message
fn send_status(&mut self, who: PeerId) {
let info = self.context_data.chain.info();
let status = message::generic::Status {
version: CURRENT_VERSION,
min_supported_version: MIN_VERSION,
genesis_hash: info.genesis_hash,
roles: self.config.roles,
best_number: info.best_number,
best_hash: info.best_hash,
chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
};

self.send_message(&who, None, GenericMessage::Status(status))
}

fn on_block_announce(
&mut self,
who: PeerId,
Expand Down Expand Up @@ -1498,6 +1503,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
});
if let Some((best_num, best_hash)) = new_best {
self.sync.update_chain_info(&best_hash, best_num);
self.behaviour.set_legacy_handshake_message(build_status_message(&self.config, &self.context_data.chain));
self.behaviour.set_notif_protocol_handshake(
&self.block_announces_protocol,
BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode()
Expand Down
48 changes: 15 additions & 33 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ use libp2p::swarm::{
PollParameters
};
use log::{debug, error, trace, warn};
use parking_lot::RwLock;
use prometheus_endpoint::HistogramVec;
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use std::task::{Context, Poll};
use std::{borrow::Cow, cmp, collections::{hash_map::Entry, VecDeque}};
use std::{error, mem, pin::Pin, str, time::Duration};
use std::{error, mem, pin::Pin, str, sync::Arc, time::Duration};
use wasm_timer::Instant;

/// Network behaviour that handles opening substreams for custom protocols with other peers.
Expand Down Expand Up @@ -118,7 +119,7 @@ pub struct GenericProto {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols: Vec<(Cow<'static, [u8]>, Vec<u8>)>,
notif_protocols: Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>,

/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,
Expand Down Expand Up @@ -220,20 +221,6 @@ enum PeerState {
}

impl PeerState {
/// True if there exists any established connection to the peer.
fn is_connected(&self) -> bool {
match self {
PeerState::Disabled { .. } |
PeerState::DisabledPendingEnable { .. } |
PeerState::Enabled { .. } |
PeerState::PendingRequest { .. } |
PeerState::Requested |
PeerState::Incoming { .. } => true,
PeerState::Poisoned |
PeerState::Banned { .. } => false,
}
}

/// True if there exists an established connection to the peer
/// that is open for custom protocol traffic.
fn is_open(&self) -> bool {
Expand Down Expand Up @@ -343,10 +330,12 @@ impl GenericProto {
local_peer_id: PeerId,
protocol: impl Into<ProtocolId>,
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
queue_size_report: Option<HistogramVec>,
) -> Self {
let legacy_protocol = RegisteredProtocol::new(protocol, versions);
let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);

GenericProto {
local_peer_id,
Expand All @@ -372,7 +361,7 @@ impl GenericProto {
protocol_name: impl Into<Cow<'static, [u8]>>,
handshake_msg: impl Into<Vec<u8>>
) {
self.notif_protocols.push((protocol_name.into(), handshake_msg.into()));
self.notif_protocols.push((protocol_name.into(), Arc::new(RwLock::new(handshake_msg.into()))));
}

/// Modifies the handshake of the given notifications protocol.
Expand All @@ -383,24 +372,17 @@ impl GenericProto {
protocol_name: &[u8],
handshake_message: impl Into<Vec<u8>>
) {
let handshake_message = handshake_message.into();
if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) {
protocol.1 = handshake_message.clone();
} else {
return;
*protocol.1.write() = handshake_message.into();
}
}

// Send an event to all the peers we're connected to, updating the handshake message.
for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::UpdateHandshake {
protocol_name: Cow::Owned(protocol_name.to_owned()),
handshake_message: handshake_message.clone(),
},
});
}
/// Modifies the handshake of the legacy protocol.
pub fn set_legacy_handshake_message(
&mut self,
handshake_message: impl Into<Vec<u8>>
) {
*self.legacy_protocol.handshake_message().write() = handshake_message.into();
}

/// Returns the number of discovered nodes that we keep in memory.
Expand Down
54 changes: 20 additions & 34 deletions client/network/src/protocol/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ use libp2p::swarm::{
NegotiatedSubstream,
};
use log::{debug, error};
use parking_lot::RwLock;
use prometheus_endpoint::HistogramVec;
use std::{borrow::Cow, error, io, str, task::{Context, Poll}};
use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}};

/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
Expand All @@ -77,10 +78,10 @@ use std::{borrow::Cow, error, io, str, task::{Context, Poll}};
pub struct NotifsHandlerProto {
/// Prototypes for handlers for inbound substreams, and the message we respond with in the
/// handshake.
in_handlers: Vec<(NotifsInHandlerProto, Vec<u8>)>,
in_handlers: Vec<(NotifsInHandlerProto, Arc<RwLock<Vec<u8>>>)>,

/// Prototypes for handlers for outbound substreams, and the initial handshake message we send.
out_handlers: Vec<(NotifsOutHandlerProto, Vec<u8>)>,
out_handlers: Vec<(NotifsOutHandlerProto, Arc<RwLock<Vec<u8>>>)>,

/// Prototype for handler for backwards-compatibility.
legacy: LegacyProtoHandlerProto,
Expand All @@ -91,10 +92,10 @@ pub struct NotifsHandlerProto {
/// See the documentation at the module level for more information.
pub struct NotifsHandler {
/// Handlers for inbound substreams, and the message we respond with in the handshake.
in_handlers: Vec<(NotifsInHandler, Vec<u8>)>,
in_handlers: Vec<(NotifsInHandler, Arc<RwLock<Vec<u8>>>)>,

/// Handlers for outbound substreams, and the initial handshake message we send.
out_handlers: Vec<(NotifsOutHandler, Vec<u8>)>,
out_handlers: Vec<(NotifsOutHandler, Arc<RwLock<Vec<u8>>>)>,

/// Handler for backwards-compatibility.
legacy: LegacyProtoHandler,
Expand Down Expand Up @@ -161,18 +162,6 @@ pub enum NotifsHandlerIn {
message: Vec<u8>,
},

/// Modifies the handshake message of a notifications protocol.
UpdateHandshake {
/// Name of the protocol for the message.
///
/// Must match one of the registered protocols.
protocol_name: Cow<'static, [u8]>,

/// The new handshake message to send if we open a substream or if the remote opens a
/// substream towards us.
handshake_message: Vec<u8>,
},

/// Sends a notifications message.
SendNotification {
/// Name of the protocol for the message.
Expand Down Expand Up @@ -253,7 +242,7 @@ impl NotifsHandlerProto {
/// messages queue. If passed, it must have one label for the protocol name.
pub fn new(
legacy: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, [u8]>, Vec<u8>)>>,
list: impl Into<Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>>,
queue_size_report: Option<HistogramVec>
) -> Self {
let list = list.into();
Expand Down Expand Up @@ -346,12 +335,17 @@ impl ProtocolsHandler for NotifsHandler {
self.enabled = EnabledState::Enabled;
self.legacy.inject_event(LegacyProtoHandlerIn::Enable);
for (handler, initial_message) in &mut self.out_handlers {
// We create `initial_message` on a separate line to be sure that the lock
// is released as soon as possible.
let initial_message = initial_message.read().clone();
handler.inject_event(NotifsOutHandlerIn::Enable {
initial_message: initial_message.clone(),
initial_message,
});
}
for num in self.pending_in.drain(..) {
let handshake_message = self.in_handlers[num].1.clone();
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = self.in_handlers[num].1.read().clone();
self.in_handlers[num].0
.inject_event(NotifsInHandlerIn::Accept(handshake_message));
}
Expand All @@ -375,18 +369,6 @@ impl ProtocolsHandler for NotifsHandler {
},
NotifsHandlerIn::SendLegacy { message } =>
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }),
NotifsHandlerIn::UpdateHandshake { protocol_name, handshake_message } => {
for (handler, current_handshake) in &mut self.in_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
for (handler, current_handshake) in &mut self.out_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
}
NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => {
for (handler, _) in &mut self.out_handlers {
if handler.protocol_name() != &protocol_name[..] {
Expand Down Expand Up @@ -524,8 +506,12 @@ impl ProtocolsHandler for NotifsHandler {
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
match self.enabled {
EnabledState::Initial => self.pending_in.push(handler_num),
EnabledState::Enabled =>
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message.clone())),
EnabledState::Enabled => {
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = handshake_message.read().clone();
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
},
EnabledState::Disabled =>
handler.inject_event(NotifsInHandlerIn::Refuse),
},
Expand Down
10 changes: 9 additions & 1 deletion client/network/src/protocol/generic_proto/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
});

let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], peerset, None),
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset, None),
addrs: addrs
.iter()
.enumerate()
Expand Down Expand Up @@ -241,6 +241,8 @@ fn two_nodes_transfer_lots_of_packets() {
);
}
},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
_ => panic!(),
}
}
Expand All @@ -251,6 +253,8 @@ fn two_nodes_transfer_lots_of_packets() {
loop {
match ready!(service2.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
Some(GenericProtoOut::LegacyMessage { message, .. }) => {
match Message::<Block>::decode(&mut &message[..]).unwrap() {
Message::<Block>::BlockResponse(BlockResponse { id: _, blocks }) => {
Expand Down Expand Up @@ -312,6 +316,8 @@ fn basic_two_nodes_requests_in_parallel() {
service1.send_packet(&peer_id, msg.encode());
}
},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
_ => panic!(),
}
}
Expand All @@ -321,6 +327,8 @@ fn basic_two_nodes_requests_in_parallel() {
loop {
match ready!(service2.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
Some(GenericProtoOut::LegacyMessage { message, .. }) => {
let pos = to_receive.iter().position(|m| m.encode() == message).unwrap();
to_receive.remove(pos);
Expand Down
Loading

0 comments on commit 83b06a2

Please sign in to comment.