Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Change validation & collation protocol names to include genesis hash & fork id #5876

Merged
merged 13 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions node/network/bridge/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct Metrics(pub(crate) Option<MetricsInner>);

fn peer_set_label(peer_set: PeerSet, version: ProtocolVersion) -> &'static str {
// Higher level code is meant to protect against this ever happening.
peer_set.get_protocol_name_static(version).unwrap_or("<internal error>")
peer_set.get_protocol_label(version).unwrap_or("<internal error>")
}

#[allow(missing_docs)]
Expand Down Expand Up @@ -98,7 +98,7 @@ impl Metrics {
self.0.as_ref().map(|metrics| {
metrics
.desired_peer_count
.with_label_values(&[peer_set.get_default_protocol_name()])
.with_label_values(&[peer_set.get_label()])
.set(size as u64)
});
}
Expand Down
30 changes: 14 additions & 16 deletions node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use sc_network_common::service::{
};

use polkadot_node_network_protocol::{
peer_set::PeerSet,
peer_set::{PeerSet, PeerSetProtocolNames},
request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests},
PeerId, ProtocolVersion, UnifiedReputationChange as Rep,
};
Expand All @@ -51,6 +51,7 @@ pub(crate) fn send_message<M>(
mut peers: Vec<PeerId>,
peer_set: PeerSet,
version: ProtocolVersion,
protocol_names: &PeerSetProtocolNames,
message: M,
metrics: &super::Metrics,
) where
Expand All @@ -66,11 +67,13 @@ pub(crate) fn send_message<M>(
// list. The message payload can be quite large. If the underlying
// network used `Bytes` this would not be necessary.
let last_peer = peers.pop();
// optimization: generate the protocol name once.
let protocol_name = protocol_names.get_name(peer_set, version);
bkchr marked this conversation as resolved.
Show resolved Hide resolved
peers.into_iter().for_each(|peer| {
net.write_notification(peer, peer_set, message.clone());
net.write_notification(peer, protocol_name.clone(), message.clone());
});
if let Some(peer) = last_peer {
net.write_notification(peer, peer_set, message);
net.write_notification(peer, protocol_name, message);
}
}

Expand Down Expand Up @@ -107,11 +110,11 @@ pub trait Network: Clone + Send + 'static {
/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
fn report_peer(&self, who: PeerId, cost_benefit: Rep);

/// Disconnect a given peer from the peer set specified without harming reputation.
fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet);
/// Disconnect a given peer from the protocol specified without harming reputation.
fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>);

/// Write a notification to a peer on the given peer-set's protocol.
fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec<u8>);
/// Write a notification to a peer on the given protocol.
fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec<u8>);
}

#[async_trait]
Expand All @@ -136,17 +139,12 @@ impl Network for Arc<NetworkService<Block, Hash>> {
NetworkService::report_peer(&**self, who, cost_benefit.into_base_rep());
}

fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) {
NetworkService::disconnect_peer(&**self, who, peer_set.into_default_protocol_name());
fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>) {
NetworkService::disconnect_peer(&**self, who, protocol);
}

fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec<u8>) {
NetworkService::write_notification(
&**self,
who,
peer_set.into_default_protocol_name(),
message,
);
fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec<u8>) {
NetworkService::write_notification(&**self, who, protocol, message);
}

async fn start_request<AD: AuthorityDiscovery>(
Expand Down
74 changes: 60 additions & 14 deletions node/network/bridge/src/rx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use sp_consensus::SyncOracle;

use polkadot_node_network_protocol::{
self as net_protocol,
peer_set::{PeerSet, PerPeerSet},
peer_set::{CollationVersion, PeerSet, PeerSetProtocolNames, PerPeerSet, ValidationVersion},
v1 as protocol_v1, ObservedRole, OurView, PeerId, ProtocolVersion,
UnifiedReputationChange as Rep, View,
};
Expand Down Expand Up @@ -80,6 +80,7 @@ pub struct NetworkBridgeRx<N, AD> {
sync_oracle: Box<dyn SyncOracle + Send>,
shared: Shared,
metrics: Metrics,
peerset_protocol_names: PeerSetProtocolNames,
}

impl<N, AD> NetworkBridgeRx<N, AD> {
Expand All @@ -92,9 +93,17 @@ impl<N, AD> NetworkBridgeRx<N, AD> {
authority_discovery_service: AD,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
peerset_protocol_names: PeerSetProtocolNames,
) -> Self {
let shared = Shared::default();
Self { network_service, authority_discovery_service, sync_oracle, shared, metrics }
Self {
network_service,
authority_discovery_service,
sync_oracle,
shared,
metrics,
peerset_protocol_names,
}
}
}

Expand Down Expand Up @@ -147,6 +156,7 @@ async fn handle_network_messages<AD>(
mut authority_discovery_service: AD,
metrics: Metrics,
shared: Shared,
peerset_protocol_names: PeerSetProtocolNames,
) -> Result<(), Error>
where
AD: validator_discovery::AuthorityDiscovery + Send,
Expand All @@ -166,13 +176,14 @@ where
}) => {
let role = ObservedRole::from(role);
let (peer_set, version) = {
let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) {
None => continue,
Some(p) => p,
};
let (peer_set, version) =
match peerset_protocol_names.try_get_protocol(&protocol) {
None => continue,
Some(p) => p,
};

if let Some(fallback) = negotiated_fallback {
match PeerSet::try_from_protocol_name(&fallback) {
match peerset_protocol_names.try_get_protocol(&fallback) {
None => {
gum::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -259,6 +270,7 @@ where
vec![peer],
PeerSet::Validation,
version,
&peerset_protocol_names,
WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(local_view),
&metrics,
);
Expand All @@ -283,14 +295,15 @@ where
vec![peer],
PeerSet::Collation,
version,
&peerset_protocol_names,
WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(local_view),
&metrics,
);
},
}
},
Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => {
let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) {
let (peer_set, version) = match peerset_protocol_names.try_get_protocol(&protocol) {
None => continue,
Some(peer_set) => peer_set,
};
Expand All @@ -317,7 +330,7 @@ where
w
};

if was_connected && version == peer_set.get_default_version() {
if was_connected && version == peer_set.get_main_version() {
match peer_set {
PeerSet::Validation =>
dispatch_validation_event_to_all(
Expand Down Expand Up @@ -355,7 +368,8 @@ where
.filter_map(|(protocol, msg_bytes)| {
// version doesn't matter because we always receive on the 'correct'
// protocol name, not the negotiated fallback.
let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?;
let (peer_set, _version) =
peerset_protocol_names.try_get_protocol(protocol)?;
if peer_set == PeerSet::Validation {
if expected_versions[PeerSet::Validation].is_none() {
return Some(Err(UNCONNECTED_PEERSET_COST))
Expand Down Expand Up @@ -384,7 +398,8 @@ where
.filter_map(|(protocol, msg_bytes)| {
// version doesn't matter because we always receive on the 'correct'
// protocol name, not the negotiated fallback.
let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?;
let (peer_set, _version) =
peerset_protocol_names.try_get_protocol(protocol)?;

if peer_set == PeerSet::Collation {
if expected_versions[PeerSet::Collation].is_none() {
Expand Down Expand Up @@ -494,6 +509,7 @@ async fn run_incoming_orchestra_signals<Context, N, AD>(
shared: Shared,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
peerset_protocol_names: PeerSetProtocolNames,
) -> Result<(), Error>
where
N: Network,
Expand Down Expand Up @@ -574,6 +590,7 @@ where
&shared,
finalized_number,
&metrics,
&peerset_protocol_names,
);
}
}
Expand Down Expand Up @@ -619,6 +636,7 @@ where
metrics,
sync_oracle,
shared,
peerset_protocol_names,
} = bridge;

let (task, network_event_handler) = handle_network_messages(
Expand All @@ -628,6 +646,7 @@ where
authority_discovery_service.clone(),
metrics.clone(),
shared.clone(),
peerset_protocol_names.clone(),
)
.remote_handle();

Expand All @@ -641,6 +660,7 @@ where
shared,
sync_oracle,
metrics,
peerset_protocol_names,
);

futures::pin_mut!(orchestra_signal_handler);
Expand All @@ -667,6 +687,7 @@ fn update_our_view<Net, Context>(
shared: &Shared,
finalized_number: BlockNumber,
metrics: &Metrics,
peerset_protocol_names: &PeerSetProtocolNames,
) where
Net: Network,
{
Expand Down Expand Up @@ -700,11 +721,18 @@ fn update_our_view<Net, Context>(
send_validation_message_v1(
net,
validation_peers,
peerset_protocol_names,
WireMessage::ViewUpdate(new_view.clone()),
metrics,
);

send_collation_message_v1(net, collation_peers, WireMessage::ViewUpdate(new_view), metrics);
send_collation_message_v1(
net,
collation_peers,
peerset_protocol_names,
WireMessage::ViewUpdate(new_view),
metrics,
);

let our_view = OurView::new(
live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)),
Expand Down Expand Up @@ -778,19 +806,37 @@ fn handle_v1_peer_messages<RawMessage: Decode, OutMessage: From<RawMessage>>(
fn send_validation_message_v1(
net: &mut impl Network,
peers: Vec<PeerId>,
peerset_protocol_names: &PeerSetProtocolNames,
message: WireMessage<protocol_v1::ValidationProtocol>,
metrics: &Metrics,
) {
send_message(net, peers, PeerSet::Validation, 1, message, metrics);
send_message(
net,
peers,
PeerSet::Validation,
ValidationVersion::V1.into(),
peerset_protocol_names,
message,
metrics,
);
}

fn send_collation_message_v1(
net: &mut impl Network,
peers: Vec<PeerId>,
peerset_protocol_names: &PeerSetProtocolNames,
message: WireMessage<protocol_v1::CollationProtocol>,
metrics: &Metrics,
) {
send_message(net, peers, PeerSet::Collation, 1, message, metrics)
send_message(
net,
peers,
PeerSet::Collation,
CollationVersion::V1.into(),
peerset_protocol_names,
message,
metrics,
);
}

async fn dispatch_validation_event_to_all(
Expand Down
Loading