Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Allow fallback names for protocols #8682

Merged
7 commits merged into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ fn good_commit_leads_to_relay() {
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id.clone(),
protocol: GRANDPA_PROTOCOL_NAME.into(),
negotiated_fallback: None,
role: ObservedRole::Full,
});

Expand All @@ -308,6 +309,7 @@ fn good_commit_leads_to_relay() {
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: receiver_id.clone(),
protocol: GRANDPA_PROTOCOL_NAME.into(),
negotiated_fallback: None,
role: ObservedRole::Full,
});

Expand Down Expand Up @@ -442,6 +444,7 @@ fn bad_commit_leads_to_report() {
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id.clone(),
protocol: GRANDPA_PROTOCOL_NAME.into(),
negotiated_fallback: None,
role: ObservedRole::Full,
});
let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived {
Expand Down
5 changes: 3 additions & 2 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> {
pub fn grandpa_peers_set_config() -> sc_network::config::NonDefaultSetConfig {
sc_network::config::NonDefaultSetConfig {
notifications_protocol: communication::GRANDPA_PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
// Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot.
max_notification_size: 1024 * 1024,
set_config: sc_network::config::SetConfig {
Expand Down Expand Up @@ -1134,12 +1135,12 @@ fn local_authority_id(
voters: &VoterSet<AuthorityId>,
keystore: Option<&SyncCryptoStorePtr>,
) -> Option<AuthorityId> {
keystore.and_then(|keystore| {
keystore.and_then(|keystore| {
voters
.iter()
.find(|(p, _)| {
SyncCryptoStore::has_keys(&**keystore, &[(p.to_raw_vec(), AuthorityId::ID)])
})
.map(|(p, _)| p.clone())
.map(|(p, _)| p.clone())
})
}
4 changes: 3 additions & 1 deletion client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl<B: BlockT> Future for GossipEngine<B> {
Event::SyncDisconnected { remote } => {
this.network.remove_set_reserved(remote, this.protocol.clone());
}
Event::NotificationStreamOpened { remote, protocol, role } => {
Event::NotificationStreamOpened { remote, protocol, role, .. } => {
if protocol != this.protocol {
continue;
}
Expand Down Expand Up @@ -416,6 +416,7 @@ mod tests {
Event::NotificationStreamOpened {
remote: remote_peer.clone(),
protocol: protocol.clone(),
negotiated_fallback: None,
role: ObservedRole::Authority,
}
).expect("Event stream is unbounded; qed.");
Expand Down Expand Up @@ -575,6 +576,7 @@ mod tests {
Event::NotificationStreamOpened {
remote: remote_peer.clone(),
protocol: protocol.clone(),
negotiated_fallback: None,
role: ObservedRole::Authority,
}
).expect("Event stream is unbounded; qed.");
Expand Down
10 changes: 9 additions & 1 deletion client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ pub enum BehaviourOut<B: BlockT> {
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
protocol: Cow<'static, str>,
/// If the negotiation didn't use the main name of the protocol (the one in
/// `notifications_protocol`), then this field contains which name has actually been
/// used.
/// See also [`crate::Event::NotificationStreamOpened`].
negotiated_fallback: Option<Cow<'static, str>>,
/// Object that permits sending notifications to the peer.
notifications_sink: NotificationsSink,
/// Role of the remote.
Expand Down Expand Up @@ -324,10 +329,13 @@ Behaviour<B> {
&target, &self.block_request_protocol_name, buf, pending_response, IfDisconnected::ImmediateError,
);
},
CustomMessageOutcome::NotificationStreamOpened { remote, protocol, roles, notifications_sink } => {
CustomMessageOutcome::NotificationStreamOpened {
remote, protocol, negotiated_fallback, roles, notifications_sink
} => {
self.events.push_back(BehaviourOut::NotificationStreamOpened {
remote,
protocol,
negotiated_fallback,
role: reported_roles_to_observed_role(roles),
notifications_sink: notifications_sink.clone(),
});
Expand Down
8 changes: 8 additions & 0 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,13 @@ pub struct NonDefaultSetConfig {
/// > **Note**: This field isn't present for the default set, as this is handled internally
/// > by the networking code.
pub notifications_protocol: Cow<'static, str>,
/// If the remote reports that it doesn't support the protocol indicated in the
/// `notifications_protocol` field, then each of these fallback names will be tried one by
/// one.
///
/// If a fallback is used, it will be reported in
/// [`crate::Event::NotificationStreamOpened::negotiated_fallback`].
pub fallback_names: Vec<Cow<'static, str>>,
/// Maximum allowed size of single notifications.
pub max_notification_size: u64,
/// Base configuration.
Expand All @@ -553,6 +560,7 @@ impl NonDefaultSetConfig {
NonDefaultSetConfig {
notifications_protocol,
max_notification_size,
fallback_names: Vec::new(),
set_config: SetConfig {
in_peers: 0,
out_peers: 0,
Expand Down
2 changes: 2 additions & 0 deletions client/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ fn build_nodes_one_proto()
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: Default::default()
}
Expand All @@ -173,6 +174,7 @@ fn build_nodes_one_proto()
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
reserved_nodes: vec![config::MultiaddrWithPeerId {
Expand Down
26 changes: 23 additions & 3 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,24 @@ impl<B: BlockT> Protocol<B> {
genesis_hash,
).encode();

let sync_protocol_config = notifications::ProtocolConfig {
name: block_announces_protocol,
fallback_names: Vec::new(),
handshake: block_announces_handshake,
max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE,
};

Notifications::new(
peerset,
iter::once((block_announces_protocol, block_announces_handshake, MAX_BLOCK_ANNOUNCE_SIZE))
iter::once(sync_protocol_config)
.chain(network_config.extra_sets.iter()
.zip(notifications_protocols_handshakes)
.map(|(s, hs)| (s.notifications_protocol.clone(), hs, s.max_notification_size))
.map(|(s, hs)| notifications::ProtocolConfig {
name: s.notifications_protocol.clone(),
kpp marked this conversation as resolved.
Show resolved Hide resolved
fallback_names: s.fallback_names.clone(),
handshake: hs,
max_notification_size: s.max_notification_size,
})
),
)
};
Expand Down Expand Up @@ -1154,6 +1166,8 @@ pub enum CustomMessageOutcome<B: BlockT> {
NotificationStreamOpened {
remote: PeerId,
protocol: Cow<'static, str>,
/// See [`crate::Event::NotificationStreamOpened::negotiated_fallback`].
negotiated_fallback: Option<Cow<'static, str>>,
roles: Roles,
notifications_sink: NotificationsSink
},
Expand Down Expand Up @@ -1346,9 +1360,13 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
};

let outcome = match event {
NotificationsOut::CustomProtocolOpen { peer_id, set_id, received_handshake, notifications_sink, .. } => {
NotificationsOut::CustomProtocolOpen {
peer_id, set_id, received_handshake, notifications_sink, negotiated_fallback
} => {
// Set number 0 is hardcoded the default set of peers we sync from.
if set_id == HARDCODED_PEERSETS_SYNC {
debug_assert!(negotiated_fallback.is_none());

// `received_handshake` can be either a `Status` message if received from the
// legacy substream ,or a `BlockAnnouncesHandshake` if received from the block
// announces substream.
Expand Down Expand Up @@ -1408,6 +1426,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id) - NUM_HARDCODED_PEERSETS].clone(),
negotiated_fallback,
roles,
notifications_sink,
},
Expand All @@ -1419,6 +1438,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id) - NUM_HARDCODED_PEERSETS].clone(),
negotiated_fallback,
roles: peer.info.roles,
notifications_sink,
}
Expand Down
9 changes: 9 additions & 0 deletions client/network/src/protocol/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,16 @@ pub enum Event {
/// Node we opened the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
/// This is always equal to the value of
/// [`crate::config::NonDefaultSetConfig::notifications_protocol`] of one of the
/// configured sets.
protocol: Cow<'static, str>,
/// If the negotiation didn't use the main name of the protocol (the one in
/// `notifications_protocol`), then this field contains which name has actually been
/// used.
/// Always contains a value equal to the value in
/// [`crate::config::NonDefaultSetConfig::fallback_names`].
negotiated_fallback: Option<Cow<'static, str>>,
kpp marked this conversation as resolved.
Show resolved Hide resolved
/// Role of the remote.
role: ObservedRole,
},
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! Implementation of libp2p's `NetworkBehaviour` trait that establishes communications and opens
//! notifications substreams.

pub use self::behaviour::{Notifications, NotificationsOut};
pub use self::behaviour::{Notifications, NotificationsOut, ProtocolConfig};
pub use self::handler::{NotifsHandlerError, NotificationsSink, Ready};

mod behaviour;
Expand Down
40 changes: 31 additions & 9 deletions client/network/src/protocol/notifications/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::protocol::notifications::{
handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}
handler::{self, NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}
};

use bytes::BytesMut;
Expand Down Expand Up @@ -95,10 +95,8 @@ use wasm_timer::Instant;
/// accommodates for any number of connections.
///
pub struct Notifications {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>,
/// Notification protocols. Entries never change after initialization.
notif_protocols: Vec<handler::ProtocolConfig>,

/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,
Expand Down Expand Up @@ -130,6 +128,19 @@ pub struct Notifications {
events: VecDeque<NetworkBehaviourAction<NotifsHandlerIn, NotificationsOut>>,
}

/// Configuration for a notifications protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
/// Name of the protocol.
pub name: Cow<'static, str>,
/// Names of the protocol to use if the main one isn't available.
pub fallback_names: Vec<Cow<'static, str>>,
/// Handshake of the protocol.
pub handshake: Vec<u8>,
/// Maximum allowed size for a notification.
pub max_notification_size: u64,
}

/// Identifier for a delay firing.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DelayId(u64);
Expand Down Expand Up @@ -311,6 +322,9 @@ pub enum NotificationsOut {
peer_id: PeerId,
/// Peerset set ID the substream is tied to.
set_id: sc_peerset::SetId,
/// If `Some`, a fallback protocol name has been used rather the main protocol name.
/// Always matches one of the fallback names passed at initialization.
negotiated_fallback: Option<Cow<'static, str>>,
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this is out of the concern of this code.
received_handshake: Vec<u8>,
Expand Down Expand Up @@ -358,10 +372,15 @@ impl Notifications {
/// Creates a `CustomProtos`.
pub fn new(
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>,
notif_protocols: impl Iterator<Item = ProtocolConfig>,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs, sz)| (n, Arc::new(RwLock::new(hs)), sz))
.map(|cfg| handler::ProtocolConfig {
name: cfg.name,
fallback_names: cfg.fallback_names,
handshake: Arc::new(RwLock::new(cfg.handshake)),
max_notification_size: cfg.max_notification_size,
})
.collect::<Vec<_>>();

assert!(!notif_protocols.is_empty());
Expand All @@ -385,7 +404,7 @@ impl Notifications {
handshake_message: impl Into<Vec<u8>>
) {
if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) {
*p.1.write() = handshake_message.into();
*p.handshake.write() = handshake_message.into();
} else {
log::error!(target: "sub-libp2p", "Unknown handshake change set: {:?}", set_id);
debug_assert!(false);
Expand Down Expand Up @@ -1728,7 +1747,9 @@ impl NetworkBehaviour for Notifications {
}
}

NotifsHandlerOut::OpenResultOk { protocol_index, received_handshake, notifications_sink, .. } => {
NotifsHandlerOut::OpenResultOk {
protocol_index, negotiated_fallback, received_handshake, notifications_sink, ..
} => {
let set_id = sc_peerset::SetId::from(protocol_index);
trace!(target: "sub-libp2p",
"Handler({}, {:?}) => OpenResultOk({:?})",
Expand All @@ -1748,6 +1769,7 @@ impl NetworkBehaviour for Notifications {
let event = NotificationsOut::CustomProtocolOpen {
peer_id: source,
set_id,
negotiated_fallback,
received_handshake,
notifications_sink: notifications_sink.clone(),
};
Expand Down
Loading