Skip to content

Commit

Permalink
Send PeerViewChange with high priority (paritytech#4755)
Browse files Browse the repository at this point in the history
Closes paritytech#577

### Changed
- `orchestra` updated to 0.4.0
- `PeerViewChange` sent with high priority and should be processed first
in a queue.
- To count them in tests added tracker to TestSender and TestOverseer.
It acts more like a smoke test though.

### Testing on Versi

The changes were tested on Versi with two objectives:
1. Make sure the node functionality does not change.
2. See how the changes affect performance.

Test setup:
- 2.5 hours for each case
- 100 validators
- 50 parachains
- validatorsPerCore = 2
- neededApprovals = 100
- nDelayTranches = 89
- relayVrfModuloSamples = 50

During the test period, all nodes ran without any crashes, which
satisfies the first objective.

To estimate the change in performance we used ToF charts. The graphs
show that there are no spikes in the top as before. This proves that our
hypothesis is correct.

### Normalized charts with ToF

![image](https://github.com/user-attachments/assets/0d49d0db-8302-4a8c-a557-501856805ff5)
[Before](https://grafana.teleport.parity.io/goto/ZoR53ClSg?orgId=1)


![image](https://github.com/user-attachments/assets/9cc73784-7e45-49d9-8212-152373c05880)
[After](https://grafana.teleport.parity.io/goto/6ux5qC_IR?orgId=1)

### Conclusion

The prioritization of subsystem messages reduces the ToF of the
networking subsystem, which helps faster propagation of gossip messages.
  • Loading branch information
AndreiEres authored and TarekkMA committed Aug 2, 2024
1 parent f6f8888 commit a5f2986
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 27 deletions.
24 changes: 18 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ num-rational = { version = "0.4.1" }
num-traits = { version = "0.2.17", default-features = false }
num_cpus = { version = "1.13.1" }
once_cell = { version = "1.19.0" }
orchestra = { version = "0.3.5", default-features = false }
orchestra = { version = "0.4.0", default-features = false }
pallet-alliance = { path = "substrate/frame/alliance", default-features = false }
pallet-asset-conversion = { path = "substrate/frame/asset-conversion", default-features = false }
pallet-asset-conversion-ops = { path = "substrate/frame/asset-conversion/ops", default-features = false }
Expand Down
13 changes: 12 additions & 1 deletion polkadot/node/malus/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ where
<OutgoingMessage as TryFrom<overseer::AllMessages>>::Error: std::fmt::Debug,
{
async fn send_message(&mut self, msg: OutgoingMessage) {
self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
}

async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
Expand All @@ -103,7 +107,14 @@ where
}
}

fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
fn try_send_message(
&mut self,
msg: OutgoingMessage,
) -> Result<(), polkadot_node_subsystem_util::metered::TrySendError<OutgoingMessage>> {
self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
}

fn try_send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl TestState {
// Test will fail if this does not happen until timeout.
let mut remaining_stores = self.valid_chunks.len();

let TestSubsystemContextHandle { tx, mut rx } = harness.virtual_overseer;
let TestSubsystemContextHandle { tx, mut rx, .. } = harness.virtual_overseer;

// Spawning necessary as incoming queue can only hold a single item, we don't want to dead
// lock ;-)
Expand Down
32 changes: 26 additions & 6 deletions polkadot/node/network/bridge/src/rx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1135,13 +1135,33 @@ async fn dispatch_validation_events_to_all<I>(
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
macro_rules! send_message {
($event:expr, $message:ident) => {
if let Ok(event) = $event.focus() {
let has_high_priority = matches!(
event,
// NetworkBridgeEvent::OurViewChange(..) must also be here,
// but it is sent via an unbounded channel.
// See https://github.com/paritytech/polkadot-sdk/issues/824
NetworkBridgeEvent::PeerConnected(..) |
NetworkBridgeEvent::PeerDisconnected(..) |
NetworkBridgeEvent::PeerViewChange(..)
);
let message = $message::from(event);
if has_high_priority {
sender.send_message_with_priority::<overseer::HighPriority>(message).await;
} else {
sender.send_message(message).await;
}
}
};
}

for event in events {
sender
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
send_message!(event, StatementDistributionMessage);
send_message!(event, BitfieldDistributionMessage);
send_message!(event, ApprovalDistributionMessage);
send_message!(event, GossipSupportMessage);
}
}

Expand Down
17 changes: 17 additions & 0 deletions polkadot/node/network/bridge/src/rx/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,8 @@ fn peer_view_updates_sent_via_overseer() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

network_handle
Expand All @@ -895,6 +897,7 @@ fn peer_view_updates_sent_via_overseer() {
&mut virtual_overseer,
)
.await;
assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);
virtual_overseer
});
}
Expand Down Expand Up @@ -930,6 +933,8 @@ fn peer_messages_sent_via_overseer() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

let approval_distribution_message =
Expand Down Expand Up @@ -970,6 +975,7 @@ fn peer_messages_sent_via_overseer() {
&mut virtual_overseer,
)
.await;
assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);
virtual_overseer
});
}
Expand Down Expand Up @@ -1008,6 +1014,8 @@ fn peer_disconnect_from_just_one_peerset() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

{
Expand Down Expand Up @@ -1036,6 +1044,7 @@ fn peer_disconnect_from_just_one_peerset() {
&mut virtual_overseer,
)
.await;
assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);

// to show that we're still connected on the collation protocol, send a view update.

Expand Down Expand Up @@ -1094,6 +1103,8 @@ fn relays_collation_protocol_messages() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

{
Expand Down Expand Up @@ -1201,6 +1212,8 @@ fn different_views_on_different_peer_sets() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

{
Expand Down Expand Up @@ -1247,6 +1260,8 @@ fn different_views_on_different_peer_sets() {
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);

assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerViewChange(peer, view_b.clone()),
&mut virtual_overseer,
Expand Down Expand Up @@ -1481,6 +1496,8 @@ fn network_protocol_versioning_subsystem_msg() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

let approval_distribution_message =
Expand Down
17 changes: 9 additions & 8 deletions polkadot/node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ pub use polkadot_node_metrics::{

pub use orchestra as gen;
pub use orchestra::{
contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket,
OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext,
SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters,
SubsystemSender, TimeoutExt, ToOrchestra, TrySendError,
contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
TrySendError,
};

#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
Expand Down Expand Up @@ -495,7 +496,7 @@ pub struct Overseer<SupportsParachains> {
RuntimeApiMessage,
ProspectiveParachainsMessage,
ChainApiMessage,
])]
], can_receive_priority_messages)]
statement_distribution: StatementDistribution,

#[subsystem(AvailabilityDistributionMessage, sends: [
Expand Down Expand Up @@ -524,7 +525,7 @@ pub struct Overseer<SupportsParachains> {
RuntimeApiMessage,
NetworkBridgeTxMessage,
ProvisionerMessage,
])]
], can_receive_priority_messages)]
bitfield_distribution: BitfieldDistribution,

#[subsystem(ProvisionerMessage, sends: [
Expand Down Expand Up @@ -580,7 +581,7 @@ pub struct Overseer<SupportsParachains> {
#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
NetworkBridgeTxMessage,
ApprovalVotingMessage,
])]
], can_receive_priority_messages)]
approval_distribution: ApprovalDistribution,

#[subsystem(blocking, ApprovalVotingMessage, sends: [
Expand All @@ -599,7 +600,7 @@ pub struct Overseer<SupportsParachains> {
NetworkBridgeRxMessage, // TODO <https://github.com/paritytech/polkadot/issues/5626>
RuntimeApiMessage,
ChainSelectionMessage,
])]
], can_receive_priority_messages)]
gossip_support: GossipSupport,

#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
Expand Down
Loading

0 comments on commit a5f2986

Please sign in to comment.