diff --git a/Cargo.lock b/Cargo.lock index c40fcca58bed..ff17d1181ac0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6587,8 +6587,10 @@ dependencies = [ name = "polkadot-network-bridge" version = "0.9.19" dependencies = [ + "always-assert", "assert_matches", "async-trait", + "bytes 1.1.0", "futures 0.3.21", "futures-timer", "parity-scale-codec", @@ -6984,6 +6986,7 @@ name = "polkadot-node-network-protocol" version = "0.9.19" dependencies = [ "async-trait", + "derive_more", "fatality", "futures 0.3.21", "parity-scale-codec", diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index 1b54ce21bd56..c8e8741cba83 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -22,7 +22,8 @@ use futures::{channel::oneshot, FutureExt as _}; use polkadot_node_network_protocol::{ - v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, View, + self as net_protocol, v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, Versioned, + View, }; use polkadot_node_primitives::approval::{ AssignmentCert, BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote, @@ -457,11 +458,11 @@ impl State { ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), metrics: &Metrics, - event: NetworkBridgeEvent, + event: NetworkBridgeEvent, rng: &mut (impl CryptoRng + Rng), ) { match event { - NetworkBridgeEvent::PeerConnected(peer_id, role, _) => { + NetworkBridgeEvent::PeerConnected(peer_id, role, _, _) => { // insert a blank view if none already present gum::trace!(target: LOG_TARGET, ?peer_id, ?role, "Peer connected"); self.peer_views.entry(peer_id).or_default(); @@ -501,7 +502,7 @@ impl State { live }); }, - NetworkBridgeEvent::PeerMessage(peer_id, msg) => { + NetworkBridgeEvent::PeerMessage(peer_id, Versioned::V1(msg)) => { self.process_incoming_peer_message(ctx, metrics, peer_id, msg, rng).await; }, } @@ -1068,9 +1069,9 @@ impl State { ctx.send_message(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(assignments), - ), + )), )) .await; } @@ -1330,9 +1331,9 @@ impl State { ctx.send_message(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(approvals), - ), + )), )) .await; } @@ -1458,9 +1459,9 @@ impl State { ctx.send_message(NetworkBridgeMessage::SendValidationMessage( vec![peer_id.clone()], - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(assignments_to_send), - ), + )), )) .await; } @@ -1474,10 +1475,10 @@ impl State { ); ctx.send_message(NetworkBridgeMessage::SendValidationMessage( - vec![peer_id.clone()], - protocol_v1::ValidationProtocol::ApprovalDistribution( + vec![peer_id], + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(approvals_to_send), - ), + )), )) .await; } @@ -1676,9 +1677,9 @@ async fn adjust_required_routing_and_propagate( for (peer, assignments_packet) in peer_assignments { ctx.send_message(NetworkBridgeMessage::SendValidationMessage( vec![peer], - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(assignments_packet), - ), + )), )) .await; } @@ -1686,9 +1687,9 @@ async fn adjust_required_routing_and_propagate( for (peer, approvals_packet) in peer_approvals { ctx.send_message(NetworkBridgeMessage::SendValidationMessage( vec![peer], - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(approvals_packet), - ), + )), )) .await; } @@ -1779,7 +1780,7 @@ impl ApprovalDistribution { Context: overseer::SubsystemContext, { match msg { - ApprovalDistributionMessage::NetworkBridgeUpdateV1(event) => { + ApprovalDistributionMessage::NetworkBridgeUpdate(event) => { state.handle_network_msg(ctx, metrics, event, rng).await; }, ApprovalDistributionMessage::NewBlocks(metas) => { diff --git a/node/network/approval-distribution/src/tests.rs b/node/network/approval-distribution/src/tests.rs index 39d4b61a6a03..8d1ed874c05d 100644 --- a/node/network/approval-distribution/src/tests.rs +++ b/node/network/approval-distribution/src/tests.rs @@ -157,7 +157,7 @@ async fn setup_gossip_topology( ) { overseer_send( virtual_overseer, - ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::NewGossipTopology( + ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::NewGossipTopology( gossip_topology, )), ) @@ -171,16 +171,17 @@ async fn setup_peer_with_view( ) { overseer_send( virtual_overseer, - ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerConnected( + ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer_id.clone(), ObservedRole::Full, + 1, None, )), ) .await; overseer_send( virtual_overseer, - ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange( + ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange( peer_id.clone(), view, )), @@ -195,9 +196,9 @@ async fn send_message_from_peer( ) { overseer_send( virtual_overseer, - ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerMessage( + ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( peer_id.clone(), - msg, + Versioned::V1(msg), )), ) .await; @@ -300,9 +301,9 @@ fn try_import_the_same_assignment() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(assignments) - ) + )) )) => { assert_eq!(peers.len(), 2); assert_eq!(assignments.len(), 1); @@ -390,7 +391,7 @@ fn spam_attack_results_in_negative_reputation_change() { // send a view update that removes block B from peer's view by bumping the finalized_number overseer_send( overseer, - ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange( + ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange( peer.clone(), View::with_finalized(2), )), @@ -450,7 +451,7 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() { // update peer view to include the hash overseer_send( overseer, - ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange( + ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange( peer.clone(), view![hash], )), @@ -462,9 +463,9 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(assignments) - ) + )) )) => { assert_eq!(peers.len(), 1); assert_eq!(assignments.len(), 1); @@ -529,9 +530,9 @@ fn import_approval_happy_path() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(assignments) - ) + )) )) => { assert_eq!(peers.len(), 2); assert_eq!(assignments.len(), 1); @@ -565,9 +566,9 @@ fn import_approval_happy_path() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(approvals) - ) + )) )) => { assert_eq!(peers.len(), 1); assert_eq!(approvals.len(), 1); @@ -788,9 +789,9 @@ fn update_peer_view() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(assignments) - ) + )) )) => { assert_eq!(peers.len(), 1); assert_eq!(assignments.len(), 1); @@ -819,7 +820,7 @@ fn update_peer_view() { // update peer's view overseer_send( overseer, - ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange( + ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange( peer.clone(), View::new(vec![hash_b, hash_c, hash_d], 2), )), @@ -839,9 +840,9 @@ fn update_peer_view() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(assignments) - ) + )) )) => { assert_eq!(peers.len(), 1); assert_eq!(assignments.len(), 1); @@ -872,7 +873,7 @@ fn update_peer_view() { // update peer's view overseer_send( overseer, - ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange( + ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange( peer.clone(), View::with_finalized(finalized_number), )), @@ -1026,9 +1027,9 @@ fn sends_assignments_even_when_state_is_approved() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { assert_eq!(peers, vec![peer.clone()]); assert_eq!(sent_assignments, assignments); @@ -1039,9 +1040,9 @@ fn sends_assignments_even_when_state_is_approved() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) - ) + )) )) => { assert_eq!(peers, vec![peer.clone()]); assert_eq!(sent_approvals, approvals); @@ -1086,7 +1087,7 @@ fn race_condition_in_local_vs_remote_view_update() { // Send our view update to include a new head overseer_send( overseer, - ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![hash_b], )), ) @@ -1201,9 +1202,9 @@ fn propagates_locally_generated_assignment_to_both_dimensions() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { assert_eq!(sent_peers.len(), expected_indices.len() + 4); for &i in &expected_indices { @@ -1222,9 +1223,9 @@ fn propagates_locally_generated_assignment_to_both_dimensions() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) - ) + )) )) => { // Random sampling is reused from the assignment. assert_eq!(sent_peers, assignment_sent_peers); @@ -1305,9 +1306,9 @@ fn propagates_assignments_along_unshared_dimension() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { assert_eq!(sent_peers.len(), expected_y.len() + 4); for &i in &expected_y { @@ -1354,9 +1355,9 @@ fn propagates_assignments_along_unshared_dimension() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { assert_eq!(sent_peers.len(), expected_x.len() + 4); for &i in &expected_x { @@ -1449,9 +1450,9 @@ fn propagates_to_required_after_connect() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { assert_eq!(sent_peers.len(), expected_indices.len() + 4); for &i in &expected_indices { @@ -1470,9 +1471,9 @@ fn propagates_to_required_after_connect() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) - ) + )) )) => { // Random sampling is reused from the assignment. assert_eq!(sent_peers, assignment_sent_peers); @@ -1487,9 +1488,9 @@ fn propagates_to_required_after_connect() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { assert_eq!(sent_peers.len(), 1); assert_eq!(&sent_peers[0], &peers[i].0); @@ -1501,9 +1502,9 @@ fn propagates_to_required_after_connect() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) - ) + )) )) => { assert_eq!(sent_peers.len(), 1); assert_eq!(&sent_peers[0], &peers[i].0); @@ -1575,9 +1576,9 @@ fn sends_to_more_peers_after_getting_topology() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { // Only sends to random peers. assert_eq!(sent_peers.len(), 4); @@ -1597,9 +1598,9 @@ fn sends_to_more_peers_after_getting_topology() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) - ) + )) )) => { // Random sampling is reused from the assignment. assert_eq!(sent_peers, assignment_sent_peers); @@ -1622,9 +1623,9 @@ fn sends_to_more_peers_after_getting_topology() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { // Sends to all expected peers. assert_eq!(sent_peers.len(), 1); @@ -1643,9 +1644,9 @@ fn sends_to_more_peers_after_getting_topology() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) - ) + )) )) => { // Sends to all expected peers. assert_eq!(sent_peers.len(), 1); @@ -1733,9 +1734,9 @@ fn originator_aggression_l1() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(_) - ) + )) )) => { sent_peers.into_iter() .filter_map(|sp| peers.iter().position(|p| &p.0 == &sp)) @@ -1747,9 +1748,9 @@ fn originator_aggression_l1() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( _, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(_) - ) + )) )) => { } ); @@ -1783,9 +1784,9 @@ fn originator_aggression_l1() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { // Sends to all expected peers. assert_eq!(sent_peers.len(), 1); @@ -1803,9 +1804,9 @@ fn originator_aggression_l1() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) - ) + )) )) => { // Sends to all expected peers. assert_eq!(sent_peers.len(), 1); @@ -1892,9 +1893,9 @@ fn non_originator_aggression_l1() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( _, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(_) - ) + )) )) => { } ); @@ -1997,9 +1998,9 @@ fn non_originator_aggression_l2() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(_) - ) + )) )) => { sent_peers.into_iter() .filter_map(|sp| peers.iter().position(|p| &p.0 == &sp)) @@ -2067,9 +2068,9 @@ fn non_originator_aggression_l2() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { // Sends to all expected peers. assert_eq!(sent_peers.len(), 1); @@ -2158,9 +2159,9 @@ fn resends_messages_periodically() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { assert_eq!(sent_peers.len(), expected_y.len() + 4); for &i in &expected_y { @@ -2207,9 +2208,9 @@ fn resends_messages_periodically() { overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( sent_peers, - protocol_v1::ValidationProtocol::ApprovalDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) - ) + )) )) => { assert_eq!(sent_peers.len(), 1); let expected_pos = expected_y.iter() diff --git a/node/network/availability-distribution/src/pov_requester/mod.rs b/node/network/availability-distribution/src/pov_requester/mod.rs index bc9f60a4fd62..b4ff76aa82c8 100644 --- a/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/node/network/availability-distribution/src/pov_requester/mod.rs @@ -62,7 +62,7 @@ where Recipient::Authority(authority_id.clone()), PoVFetchingRequest { candidate_hash }, ); - let full_req = Requests::PoVFetching(req); + let full_req = Requests::PoVFetchingV1(req); ctx.send_message(NetworkBridgeMessage::SendRequests( vec![full_req], @@ -200,7 +200,7 @@ mod tests { AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(mut reqs, _)) => { let req = assert_matches!( reqs.pop(), - Some(Requests::PoVFetching(outgoing)) => {outgoing} + Some(Requests::PoVFetchingV1(outgoing)) => {outgoing} ); req.pending_response .send(Ok(PoVFetchingResponse::PoV(pov.clone()).encode())) diff --git a/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 548a6d5e8a3c..0e751435196f 100644 --- a/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -330,7 +330,7 @@ impl RunningTask { ) -> std::result::Result { let (full_request, response_recv) = OutgoingRequest::new(Recipient::Authority(validator.clone()), self.request); - let requests = Requests::ChunkFetching(full_request); + let requests = Requests::ChunkFetchingV1(full_request); self.sender .send(FromFetchTask::Message(AllMessages::NetworkBridge( diff --git a/node/network/availability-distribution/src/requester/fetch_task/tests.rs b/node/network/availability-distribution/src/requester/fetch_task/tests.rs index db436105437c..432ef17e9995 100644 --- a/node/network/availability-distribution/src/requester/fetch_task/tests.rs +++ b/node/network/availability-distribution/src/requester/fetch_task/tests.rs @@ -235,7 +235,7 @@ impl TestRun { let mut valid_responses = 0; for req in reqs { let req = match req { - Requests::ChunkFetching(req) => req, + Requests::ChunkFetchingV1(req) => req, _ => panic!("Unexpected request"), }; let response = diff --git a/node/network/availability-distribution/src/tests/state.rs b/node/network/availability-distribution/src/tests/state.rs index ced1fe37a49c..b4bda8375949 100644 --- a/node/network/availability-distribution/src/tests/state.rs +++ b/node/network/availability-distribution/src/tests/state.rs @@ -313,7 +313,7 @@ fn to_incoming_req( outgoing: Requests, ) -> IncomingRequest { match outgoing { - Requests::ChunkFetching(OutgoingRequest { payload, pending_response, .. }) => { + Requests::ChunkFetchingV1(OutgoingRequest { payload, pending_response, .. }) => { let (tx, rx): (oneshot::Sender, oneshot::Receiver<_>) = oneshot::channel(); executor.spawn( diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 5f7d8de06966..312178251775 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -201,7 +201,7 @@ impl RequestFromBackers { sender .send_message( NetworkBridgeMessage::SendRequests( - vec![Requests::AvailableDataFetching(req)], + vec![Requests::AvailableDataFetchingV1(req)], IfDisconnected::ImmediateError, ) .into(), @@ -325,7 +325,7 @@ impl RequestChunksFromValidators { let (req, res) = OutgoingRequest::new(Recipient::Authority(validator), raw_request.clone()); - requests.push(Requests::ChunkFetching(req)); + requests.push(Requests::ChunkFetchingV1(req)); params.metrics.on_chunk_request_issued(); let timer = params.metrics.time_chunk_request(); diff --git a/node/network/availability-recovery/src/tests.rs b/node/network/availability-recovery/src/tests.rs index 193ec27ebb60..38d3a8b76062 100644 --- a/node/network/availability-recovery/src/tests.rs +++ b/node/network/availability-recovery/src/tests.rs @@ -292,7 +292,7 @@ impl TestState { i += 1; assert_matches!( req, - Requests::ChunkFetching(req) => { + Requests::ChunkFetchingV1(req) => { assert_eq!(req.payload.candidate_hash, candidate_hash); let validator_index = req.payload.index.0 as usize; @@ -341,7 +341,7 @@ impl TestState { assert_matches!( requests.pop().unwrap(), - Requests::AvailableDataFetching(req) => { + Requests::AvailableDataFetchingV1(req) => { assert_eq!(req.payload.candidate_hash, candidate_hash); let validator_index = self.validator_authority_id .iter() diff --git a/node/network/bitfield-distribution/src/lib.rs b/node/network/bitfield-distribution/src/lib.rs index 0bedc677f53d..4dea02d3d252 100644 --- a/node/network/bitfield-distribution/src/lib.rs +++ b/node/network/bitfield-distribution/src/lib.rs @@ -25,7 +25,8 @@ use futures::{channel::oneshot, FutureExt}; use polkadot_node_network_protocol::{ - v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, View, + self as net_protocol, v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, + Versioned, View, }; use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS}; use polkadot_primitives::v2::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; @@ -63,15 +64,15 @@ struct BitfieldGossipMessage { } impl BitfieldGossipMessage { - fn into_validation_protocol(self) -> protocol_v1::ValidationProtocol { - protocol_v1::ValidationProtocol::BitfieldDistribution(self.into_network_message()) + fn into_validation_protocol(self) -> net_protocol::VersionedValidationProtocol { + self.into_network_message().into() } - fn into_network_message(self) -> protocol_v1::BitfieldDistributionMessage { - protocol_v1::BitfieldDistributionMessage::Bitfield( + fn into_network_message(self) -> net_protocol::BitfieldDistributionMessage { + Versioned::V1(protocol_v1::BitfieldDistributionMessage::Bitfield( self.relay_parent, self.signed_availability.into(), - ) + )) } } @@ -207,7 +208,7 @@ impl BitfieldDistribution { .await; }, FromOverseer::Communication { - msg: BitfieldDistributionMessage::NetworkBridgeUpdateV1(event), + msg: BitfieldDistributionMessage::NetworkBridgeUpdate(event), } => { gum::trace!(target: LOG_TARGET, "Processing NetworkMessage"); // a network message was received @@ -506,14 +507,14 @@ async fn handle_network_msg( ctx: &mut Context, state: &mut ProtocolState, metrics: &Metrics, - bridge_message: NetworkBridgeEvent, + bridge_message: NetworkBridgeEvent, ) where Context: SubsystemContext, { let _timer = metrics.time_handle_network_msg(); match bridge_message { - NetworkBridgeEvent::PeerConnected(peer, role, _) => { + NetworkBridgeEvent::PeerConnected(peer, role, _, _) => { gum::trace!(target: LOG_TARGET, ?peer, ?role, "Peer connected"); // insert if none already present state.peer_views.entry(peer).or_default(); @@ -550,7 +551,7 @@ async fn handle_network_msg( gum::trace!(target: LOG_TARGET, ?new_view, "Our view change"); handle_our_view_change(state, new_view); }, - NetworkBridgeEvent::PeerMessage(remote, message) => + NetworkBridgeEvent::PeerMessage(remote, Versioned::V1(message)) => process_incoming_peer_message(ctx, state, metrics, remote, message).await, } } diff --git a/node/network/bitfield-distribution/src/tests.rs b/node/network/bitfield-distribution/src/tests.rs index b16d9b7789a6..ac6c21184680 100644 --- a/node/network/bitfield-distribution/src/tests.rs +++ b/node/network/bitfield-distribution/src/tests.rs @@ -538,7 +538,7 @@ fn changing_view() { &mut ctx, &mut state, &Default::default(), - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, 1, None), )); // make peer b interested diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index 919f5271cd12..b057d079ce4d 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Parity Technologies "] edition = "2021" [dependencies] +always-assert = "0.1" async-trait = "0.1.53" futures = "0.3.21" gum = { package = "tracing-gum", path = "../../gum" } @@ -17,6 +18,7 @@ polkadot-overseer = { path = "../../overseer" } polkadot-node-network-protocol = { path = "../protocol" } polkadot-node-subsystem-util = { path = "../../subsystem-util"} parking_lot = "0.12.0" +bytes = "1" [dev-dependencies] assert_matches = "1.4.0" diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 0ca57f044c8b..054d1135ee26 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -19,15 +19,19 @@ #![deny(unused_crate_dependencies)] #![warn(missing_docs)] +use always_assert::never; +use bytes::Bytes; use futures::{prelude::*, stream::BoxStream}; -use parity_scale_codec::{Decode, Encode}; +use parity_scale_codec::{Decode, DecodeAll, Encode}; use parking_lot::Mutex; use sc_network::Event as NetworkEvent; use sp_consensus::SyncOracle; use polkadot_node_network_protocol::{ - peer_set::PeerSet, v1 as protocol_v1, ObservedRole, OurView, PeerId, - UnifiedReputationChange as Rep, View, + self as net_protocol, + peer_set::{PeerSet, PerPeerSet}, + v1 as protocol_v1, ObservedRole, OurView, PeerId, ProtocolVersion, + UnifiedReputationChange as Rep, Versioned, View, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_overseer::gen::{OverseerError, Subsystem}; @@ -83,58 +87,69 @@ const LOG_TARGET: &'static str = "parachain::network-bridge"; #[derive(Clone, Default)] pub struct Metrics(Option); +fn peer_set_label(peer_set: PeerSet, version: ProtocolVersion) -> &'static str { + // Higher level code is meant to protect against this ever happening. + peer_set.get_protocol_name_static(version).unwrap_or("") +} + impl Metrics { - fn on_peer_connected(&self, peer_set: PeerSet) { + fn on_peer_connected(&self, peer_set: PeerSet, version: ProtocolVersion) { self.0.as_ref().map(|metrics| { metrics .connected_events - .with_label_values(&[peer_set.get_protocol_name_static()]) + .with_label_values(&[peer_set_label(peer_set, version)]) .inc() }); } - fn on_peer_disconnected(&self, peer_set: PeerSet) { + fn on_peer_disconnected(&self, peer_set: PeerSet, version: ProtocolVersion) { self.0.as_ref().map(|metrics| { metrics .disconnected_events - .with_label_values(&[peer_set.get_protocol_name_static()]) + .with_label_values(&[peer_set_label(peer_set, version)]) .inc() }); } - fn note_peer_count(&self, peer_set: PeerSet, count: usize) { + fn note_peer_count(&self, peer_set: PeerSet, version: ProtocolVersion, count: usize) { self.0.as_ref().map(|metrics| { metrics .peer_count - .with_label_values(&[peer_set.get_protocol_name_static()]) + .with_label_values(&[peer_set_label(peer_set, version)]) .set(count as u64) }); } - fn on_notification_received(&self, peer_set: PeerSet, size: usize) { + fn on_notification_received(&self, peer_set: PeerSet, version: ProtocolVersion, size: usize) { if let Some(metrics) = self.0.as_ref() { metrics .notifications_received - .with_label_values(&[peer_set.get_protocol_name_static()]) + .with_label_values(&[peer_set_label(peer_set, version)]) .inc(); metrics .bytes_received - .with_label_values(&[peer_set.get_protocol_name_static()]) + .with_label_values(&[peer_set_label(peer_set, version)]) .inc_by(size as u64); } } - fn on_notification_sent(&self, peer_set: PeerSet, size: usize, to_peers: usize) { + fn on_notification_sent( + &self, + peer_set: PeerSet, + version: ProtocolVersion, + size: usize, + to_peers: usize, + ) { if let Some(metrics) = self.0.as_ref() { metrics .notifications_sent - .with_label_values(&[peer_set.get_protocol_name_static()]) + .with_label_values(&[peer_set_label(peer_set, version)]) .inc_by(to_peers as u64); metrics .bytes_sent - .with_label_values(&[peer_set.get_protocol_name_static()]) + .with_label_values(&[peer_set_label(peer_set, version)]) .inc_by((size * to_peers) as u64); } } @@ -143,7 +158,7 @@ impl Metrics { self.0.as_ref().map(|metrics| { metrics .desired_peer_count - .with_label_values(&[peer_set.get_protocol_name_static()]) + .with_label_values(&[peer_set.get_default_protocol_name()]) .set(size as u64) }); } @@ -329,6 +344,7 @@ where struct PeerData { /// The Latest view sent by the peer. view: View, + version: ProtocolVersion, } #[derive(Debug)] @@ -467,22 +483,24 @@ where ?peer, peer_set = ?peer_set, ); + network_service.disconnect_peer(peer, peer_set); } NetworkBridgeMessage::SendValidationMessage(peers, msg) => { gum::trace!( target: LOG_TARGET, action = "SendValidationMessages", - num_messages = 1, + num_messages = 1usize, ); - send_message( - &mut network_service, - peers, - PeerSet::Validation, - WireMessage::ProtocolMessage(msg), - &metrics, - ); + match msg { + Versioned::V1(msg) => send_validation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } } NetworkBridgeMessage::SendValidationMessages(msgs) => { gum::trace!( @@ -492,29 +510,31 @@ where ); for (peers, msg) in msgs { - send_message( - &mut network_service, - peers, - PeerSet::Validation, - WireMessage::ProtocolMessage(msg), - &metrics, - ); + match msg { + Versioned::V1(msg) => send_validation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } } } NetworkBridgeMessage::SendCollationMessage(peers, msg) => { gum::trace!( target: LOG_TARGET, action = "SendCollationMessages", - num_messages = 1, + num_messages = 1usize, ); - send_message( - &mut network_service, - peers, - PeerSet::Collation, - WireMessage::ProtocolMessage(msg), - &metrics, - ); + match msg { + Versioned::V1(msg) => send_collation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } } NetworkBridgeMessage::SendCollationMessages(msgs) => { gum::trace!( @@ -524,13 +544,14 @@ where ); for (peers, msg) in msgs { - send_message( - &mut network_service, - peers, - PeerSet::Collation, - WireMessage::ProtocolMessage(msg), - &metrics, - ); + match msg { + Versioned::V1(msg) => send_collation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } } } NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => { @@ -672,18 +693,58 @@ async fn handle_network_messages( Some(NetworkEvent::SyncConnected { .. }) | Some(NetworkEvent::SyncDisconnected { .. }) => {}, Some(NetworkEvent::NotificationStreamOpened { - remote: peer, protocol, role, .. + remote: peer, + protocol, + role, + negotiated_fallback, }) => { let role = ObservedRole::from(role); - let peer_set = match PeerSet::try_from_protocol_name(&protocol) { - None => continue, - Some(peer_set) => peer_set, + let (peer_set, version) = { + let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) { + None => continue, + Some(p) => p, + }; + + if let Some(fallback) = negotiated_fallback { + match PeerSet::try_from_protocol_name(&fallback) { + None => { + gum::debug!( + target: LOG_TARGET, + fallback = &*fallback, + ?peer, + ?peer_set, + "Unknown fallback", + ); + + continue + }, + Some((p2, v2)) => { + if p2 != peer_set { + gum::debug!( + target: LOG_TARGET, + fallback = &*fallback, + fallback_peerset = ?p2, + protocol = &*protocol, + peerset = ?peer_set, + "Fallback mismatched peer-set", + ); + + continue + } + + (p2, v2) + }, + } + } else { + (peer_set, version) + } }; gum::debug!( target: LOG_TARGET, action = "PeerConnected", peer_set = ?peer_set, + version, peer = ?peer, role = ?role ); @@ -698,12 +759,12 @@ async fn handle_network_messages( match peer_map.entry(peer.clone()) { hash_map::Entry::Occupied(_) => continue, hash_map::Entry::Vacant(vacant) => { - vacant.insert(PeerData { view: View::default() }); + vacant.insert(PeerData { view: View::default(), version }); }, } - metrics.on_peer_connected(peer_set); - metrics.note_peer_count(peer_set, peer_map.len()); + metrics.on_peer_connected(peer_set, version); + metrics.note_peer_count(peer_set, version, peer_map.len()); shared.local_view.clone().unwrap_or(View::default()) }; @@ -718,6 +779,7 @@ async fn handle_network_messages( NetworkBridgeEvent::PeerConnected( peer.clone(), role, + 1, maybe_authority, ), NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()), @@ -730,6 +792,7 @@ async fn handle_network_messages( &mut network_service, vec![peer], PeerSet::Validation, + version, WireMessage::::ViewUpdate(local_view), &metrics, ); @@ -740,6 +803,7 @@ async fn handle_network_messages( NetworkBridgeEvent::PeerConnected( peer.clone(), role, + 1, maybe_authority, ), NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()), @@ -752,6 +816,7 @@ async fn handle_network_messages( &mut network_service, vec![peer], PeerSet::Collation, + version, WireMessage::::ViewUpdate(local_view), &metrics, ); @@ -759,7 +824,7 @@ async fn handle_network_messages( } }, Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => { - let peer_set = match PeerSet::try_from_protocol_name(&protocol) { + let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) { None => continue, Some(peer_set) => peer_set, }; @@ -780,13 +845,13 @@ async fn handle_network_messages( let w = peer_map.remove(&peer).is_some(); - metrics.on_peer_disconnected(peer_set); - metrics.note_peer_count(peer_set, peer_map.len()); + metrics.on_peer_disconnected(peer_set, version); + metrics.note_peer_count(peer_set, version, peer_map.len()); w }; - if was_connected { + if was_connected && version == peer_set.get_default_version() { match peer_set { PeerSet::Validation => dispatch_validation_event_to_all( @@ -804,83 +869,151 @@ async fn handle_network_messages( } }, Some(NetworkEvent::NotificationsReceived { remote, messages }) => { + let expected_versions = { + let mut versions = PerPeerSet::>::default(); + let shared = shared.0.lock(); + if let Some(peer_data) = shared.validation_peers.get(&remote) { + versions[PeerSet::Validation] = Some(peer_data.version); + } + + if let Some(peer_data) = shared.collation_peers.get(&remote) { + versions[PeerSet::Collation] = Some(peer_data.version); + } + + versions + }; + + // non-decoded, but version-checked validation messages. let v_messages: Result, _> = messages .iter() - .filter(|(protocol, _)| protocol == &PeerSet::Validation.into_protocol_name()) - .map(|(_, msg_bytes)| { - WireMessage::decode(&mut msg_bytes.as_ref()).map(|m| (m, msg_bytes.len())) + .filter_map(|(protocol, msg_bytes)| { + // version doesn't matter because we always receive on the 'correct' + // protocol name, not the negotiated fallback. + let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?; + if peer_set == PeerSet::Validation { + if expected_versions[PeerSet::Validation].is_none() { + return Some(Err(UNCONNECTED_PEERSET_COST)) + } + + Some(Ok(msg_bytes.clone())) + } else { + None + } }) .collect(); let v_messages = match v_messages { - Err(_) => { + Err(rep) => { gum::debug!(target: LOG_TARGET, action = "ReportPeer"); + network_service.report_peer(remote, rep); - network_service.report_peer(remote, MALFORMED_MESSAGE_COST); continue }, Ok(v) => v, }; + // non-decoded, but version-checked colldation messages. let c_messages: Result, _> = messages .iter() - .filter(|(protocol, _)| protocol == &PeerSet::Collation.into_protocol_name()) - .map(|(_, msg_bytes)| { - WireMessage::decode(&mut msg_bytes.as_ref()).map(|m| (m, msg_bytes.len())) + .filter_map(|(protocol, msg_bytes)| { + // version doesn't matter because we always receive on the 'correct' + // protocol name, not the negotiated fallback. + let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?; + + if peer_set == PeerSet::Collation { + if expected_versions[PeerSet::Collation].is_none() { + return Some(Err(UNCONNECTED_PEERSET_COST)) + } + + Some(Ok(msg_bytes.clone())) + } else { + None + } }) .collect(); - match c_messages { - Err(_) => { + let c_messages = match c_messages { + Err(rep) => { gum::debug!(target: LOG_TARGET, action = "ReportPeer"); + network_service.report_peer(remote, rep); - network_service.report_peer(remote, MALFORMED_MESSAGE_COST); continue }, - Ok(c_messages) => - if v_messages.is_empty() && c_messages.is_empty() { - continue + Ok(v) => v, + }; + + if v_messages.is_empty() && c_messages.is_empty() { + continue + } + + gum::trace!( + target: LOG_TARGET, + action = "PeerMessages", + peer = ?remote, + num_validation_messages = %v_messages.len(), + num_collation_messages = %c_messages.len() + ); + + if !v_messages.is_empty() { + let (events, reports) = + if expected_versions[PeerSet::Validation] == Some(1) { + handle_v1_peer_messages::( + remote.clone(), + PeerSet::Validation, + &mut shared.0.lock().validation_peers, + v_messages, + &metrics, + ) } else { - gum::trace!( + gum::warn!( target: LOG_TARGET, - action = "PeerMessages", - peer = ?remote, - num_validation_messages = %v_messages.len(), - num_collation_messages = %c_messages.len() + version = ?expected_versions[PeerSet::Validation], + "Major logic bug. Peer somehow has unsupported validation protocol version." ); - if !v_messages.is_empty() { - let (events, reports) = handle_peer_messages( - remote.clone(), - PeerSet::Validation, - &mut shared.0.lock().validation_peers, - v_messages, - &metrics, - ); + never!("Only version 1 is supported; peer set connection checked above; qed"); - for report in reports { - network_service.report_peer(remote.clone(), report); - } + // If a peer somehow triggers this, we'll disconnect them + // eventually. + (Vec::new(), vec![UNCONNECTED_PEERSET_COST]) + }; - dispatch_validation_events_to_all(events, &mut sender).await; - } + for report in reports { + network_service.report_peer(remote.clone(), report); + } - if !c_messages.is_empty() { - let (events, reports) = handle_peer_messages( - remote.clone(), - PeerSet::Collation, - &mut shared.0.lock().collation_peers, - c_messages, - &metrics, - ); + dispatch_validation_events_to_all(events, &mut sender).await; + } - for report in reports { - network_service.report_peer(remote.clone(), report); - } + if !c_messages.is_empty() { + let (events, reports) = + if expected_versions[PeerSet::Collation] == Some(1) { + handle_v1_peer_messages::( + remote.clone(), + PeerSet::Collation, + &mut shared.0.lock().collation_peers, + c_messages, + &metrics, + ) + } else { + gum::warn!( + target: LOG_TARGET, + version = ?expected_versions[PeerSet::Collation], + "Major logic bug. Peer somehow has unsupported collation protocol version." + ); - dispatch_collation_events_to_all(events, &mut sender).await; - } - }, + never!("Only version 1 is supported; peer set connection checked above; qed"); + + // If a peer somehow triggers this, we'll disconnect them + // eventually. + (Vec::new(), vec![UNCONNECTED_PEERSET_COST]) + }; + + for report in reports { + network_service.report_peer(remote.clone(), report); + } + + dispatch_collation_events_to_all(events, &mut sender).await; } }, } @@ -1007,14 +1140,14 @@ fn update_our_view( ) }; - send_validation_message( + send_validation_message_v1( net, validation_peers, WireMessage::ViewUpdate(new_view.clone()), metrics, ); - send_collation_message(net, collation_peers, WireMessage::ViewUpdate(new_view), metrics); + send_collation_message_v1(net, collation_peers, WireMessage::ViewUpdate(new_view), metrics); let our_view = OurView::new( live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)), @@ -1032,27 +1165,34 @@ fn update_our_view( ); } -// Handle messages on a specific peer-set. The peer is expected to be connected on that +// Handle messages on a specific v1 peer-set. The peer is expected to be connected on that // peer-set. -fn handle_peer_messages( +fn handle_v1_peer_messages>( peer: PeerId, peer_set: PeerSet, peers: &mut HashMap, - messages: Vec<(WireMessage, usize)>, + messages: Vec, metrics: &Metrics, -) -> (Vec>, Vec) { +) -> (Vec>, Vec) { let peer_data = match peers.get_mut(&peer) { None => return (Vec::new(), vec![UNCONNECTED_PEERSET_COST]), Some(d) => d, }; - let mut outgoing_messages = Vec::with_capacity(messages.len()); + let mut outgoing_events = Vec::with_capacity(messages.len()); let mut reports = Vec::new(); - for (message, size_bytes) in messages { - metrics.on_notification_received(peer_set, size_bytes); + for message in messages { + metrics.on_notification_received(peer_set, peer_data.version, message.len()); + let message = match WireMessage::::decode_all(&mut message.as_ref()) { + Err(_) => { + reports.push(MALFORMED_MESSAGE_COST); + continue + }, + Ok(m) => m, + }; - outgoing_messages.push(match message { + outgoing_events.push(match message { WireMessage::ViewUpdate(new_view) => { if new_view.len() > MAX_VIEW_HEADS || new_view.finalized_number < peer_data.view.finalized_number @@ -1071,47 +1211,47 @@ fn handle_peer_messages( } }, WireMessage::ProtocolMessage(message) => - NetworkBridgeEvent::PeerMessage(peer.clone(), message), + NetworkBridgeEvent::PeerMessage(peer.clone(), message.into()), }) } - (outgoing_messages, reports) + (outgoing_events, reports) } -fn send_validation_message( +fn send_validation_message_v1( net: &mut impl Network, peers: Vec, message: WireMessage, metrics: &Metrics, ) { - send_message(net, peers, PeerSet::Validation, message, metrics); + send_message(net, peers, PeerSet::Validation, 1, message, metrics); } -fn send_collation_message( +fn send_collation_message_v1( net: &mut impl Network, peers: Vec, message: WireMessage, metrics: &Metrics, ) { - send_message(net, peers, PeerSet::Collation, message, metrics) + send_message(net, peers, PeerSet::Collation, 1, message, metrics) } async fn dispatch_validation_event_to_all( - event: NetworkBridgeEvent, + event: NetworkBridgeEvent, ctx: &mut impl SubsystemSender, ) { dispatch_validation_events_to_all(std::iter::once(event), ctx).await } async fn dispatch_collation_event_to_all( - event: NetworkBridgeEvent, + event: NetworkBridgeEvent, ctx: &mut impl SubsystemSender, ) { dispatch_collation_events_to_all(std::iter::once(event), ctx).await } fn dispatch_validation_event_to_all_unbounded( - event: NetworkBridgeEvent, + event: NetworkBridgeEvent, ctx: &mut impl SubsystemSender, ) { for msg in AllMessages::dispatch_iter(event) { @@ -1120,17 +1260,17 @@ fn dispatch_validation_event_to_all_unbounded( } fn dispatch_collation_event_to_all_unbounded( - event: NetworkBridgeEvent, + event: NetworkBridgeEvent, ctx: &mut impl SubsystemSender, ) { - if let Some(msg) = event.focus().ok().map(CollatorProtocolMessage::NetworkBridgeUpdateV1) { + if let Some(msg) = event.focus().ok().map(CollatorProtocolMessage::NetworkBridgeUpdate) { ctx.send_unbounded_message(msg.into()); } } async fn dispatch_validation_events_to_all(events: I, ctx: &mut impl SubsystemSender) where - I: IntoIterator>, + I: IntoIterator>, I::IntoIter: Send, { ctx.send_messages(events.into_iter().flat_map(AllMessages::dispatch_iter)).await @@ -1138,13 +1278,14 @@ where async fn dispatch_collation_events_to_all(events: I, ctx: &mut impl SubsystemSender) where - I: IntoIterator>, + I: IntoIterator>, I::IntoIter: Send, { - let messages_for = |event: NetworkBridgeEvent| { - event.focus().ok().map(|m| { - AllMessages::CollatorProtocol(CollatorProtocolMessage::NetworkBridgeUpdateV1(m)) - }) + let messages_for = |event: NetworkBridgeEvent| { + event + .focus() + .ok() + .map(|m| AllMessages::CollatorProtocol(CollatorProtocolMessage::NetworkBridgeUpdate(m))) }; ctx.send_messages(events.into_iter().flat_map(messages_for)).await diff --git a/node/network/bridge/src/network.rs b/node/network/bridge/src/network.rs index e5ce3effefb1..538958602d25 100644 --- a/node/network/bridge/src/network.rs +++ b/node/network/bridge/src/network.rs @@ -29,7 +29,7 @@ use sc_network::{ use polkadot_node_network_protocol::{ peer_set::PeerSet, request_response::{OutgoingRequest, Recipient, Requests}, - PeerId, UnifiedReputationChange as Rep, + PeerId, ProtocolVersion, UnifiedReputationChange as Rep, }; use polkadot_primitives::v2::{AuthorityDiscoveryId, Block, Hash}; @@ -46,6 +46,7 @@ pub(crate) fn send_message( net: &mut impl Network, mut peers: Vec, peer_set: PeerSet, + version: ProtocolVersion, message: M, metrics: &super::Metrics, ) where @@ -53,7 +54,7 @@ pub(crate) fn send_message( { let message = { let encoded = message.encode(); - metrics.on_notification_sent(peer_set, encoded.len(), peers.len()); + metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len()); encoded }; @@ -131,14 +132,18 @@ impl Network for Arc> { } fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) { - sc_network::NetworkService::disconnect_peer(&**self, who, peer_set.into_protocol_name()); + sc_network::NetworkService::disconnect_peer( + &**self, + who, + peer_set.into_default_protocol_name(), + ); } fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec) { sc_network::NetworkService::write_notification( &**self, who, - peer_set.into_protocol_name(), + peer_set.into_default_protocol_name(), message, ); } diff --git a/node/network/bridge/src/tests.rs b/node/network/bridge/src/tests.rs index 75681ee7b5d6..140280e7af3c 100644 --- a/node/network/bridge/src/tests.rs +++ b/node/network/bridge/src/tests.rs @@ -28,7 +28,9 @@ use std::{ use sc_network::{Event as NetworkEvent, IfDisconnected}; -use polkadot_node_network_protocol::{request_response::outgoing::Requests, view, ObservedRole}; +use polkadot_node_network_protocol::{ + request_response::outgoing::Requests, view, ObservedRole, Versioned, +}; use polkadot_node_subsystem_test_helpers::{ SingleItemSink, SingleItemStream, TestSubsystemContextHandle, }; @@ -175,7 +177,7 @@ impl TestNetworkHandle { async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) { self.send_network_event(NetworkEvent::NotificationStreamOpened { remote: peer, - protocol: peer_set.into_protocol_name(), + protocol: peer_set.into_default_protocol_name(), negotiated_fallback: None, role: role.into(), }) @@ -185,7 +187,7 @@ impl TestNetworkHandle { async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) { self.send_network_event(NetworkEvent::NotificationStreamClosed { remote: peer, - protocol: peer_set.into_protocol_name(), + protocol: peer_set.into_default_protocol_name(), }) .await; } @@ -193,7 +195,7 @@ impl TestNetworkHandle { async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec) { self.send_network_event(NetworkEvent::NotificationsReceived { remote: peer, - messages: vec![(peer_set.into_protocol_name(), message.into())], + messages: vec![(peer_set.into_default_protocol_name(), message.into())], }) .await; } @@ -308,7 +310,7 @@ fn test_harness>( } async fn assert_sends_validation_event_to_all( - event: NetworkBridgeEvent, + event: NetworkBridgeEvent, virtual_overseer: &mut TestSubsystemContextHandle, ) { // Ordering must match the enum variant order @@ -316,40 +318,40 @@ async fn assert_sends_validation_event_to_all( assert_matches!( virtual_overseer.recv().await, AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdateV1(e) + StatementDistributionMessage::NetworkBridgeUpdate(e) ) if e == event.focus().expect("could not focus message") ); assert_matches!( virtual_overseer.recv().await, AllMessages::BitfieldDistribution( - BitfieldDistributionMessage::NetworkBridgeUpdateV1(e) + BitfieldDistributionMessage::NetworkBridgeUpdate(e) ) if e == event.focus().expect("could not focus message") ); assert_matches!( virtual_overseer.recv().await, AllMessages::ApprovalDistribution( - ApprovalDistributionMessage::NetworkBridgeUpdateV1(e) + ApprovalDistributionMessage::NetworkBridgeUpdate(e) ) if e == event.focus().expect("could not focus message") ); assert_matches!( virtual_overseer.recv().await, AllMessages::GossipSupport( - GossipSupportMessage::NetworkBridgeUpdateV1(e) + GossipSupportMessage::NetworkBridgeUpdate(e) ) if e == event.focus().expect("could not focus message") ); } async fn assert_sends_collation_event_to_all( - event: NetworkBridgeEvent, + event: NetworkBridgeEvent, virtual_overseer: &mut TestSubsystemContextHandle, ) { assert_matches!( virtual_overseer.recv().await, AllMessages::CollatorProtocol( - CollatorProtocolMessage::NetworkBridgeUpdateV1(e) + CollatorProtocolMessage::NetworkBridgeUpdate(e) ) if e == event.focus().expect("could not focus message") ) } @@ -642,7 +644,7 @@ fn peer_view_updates_sent_via_overseer() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -685,7 +687,7 @@ fn peer_messages_sent_via_overseer() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -700,7 +702,7 @@ fn peer_messages_sent_via_overseer() { let approval_distribution_message = protocol_v1::ApprovalDistributionMessage::Approvals(Vec::new()); - let message = protocol_v1::ValidationProtocol::ApprovalDistribution( + let message_v1 = protocol_v1::ValidationProtocol::ApprovalDistribution( approval_distribution_message.clone(), ); @@ -708,7 +710,7 @@ fn peer_messages_sent_via_overseer() { .peer_message( peer.clone(), PeerSet::Validation, - WireMessage::ProtocolMessage(message.clone()).encode(), + WireMessage::ProtocolMessage(message_v1.clone()).encode(), ) .await; @@ -720,8 +722,8 @@ fn peer_messages_sent_via_overseer() { assert_matches!( virtual_overseer.recv().await, AllMessages::ApprovalDistribution( - ApprovalDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage(p, m) + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(p, Versioned::V1(m)) ) ) => { assert_eq!(p, peer); @@ -755,7 +757,7 @@ fn peer_disconnect_from_just_one_peerset() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -769,7 +771,7 @@ fn peer_disconnect_from_just_one_peerset() { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -838,7 +840,7 @@ fn relays_collation_protocol_messages() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -852,7 +854,7 @@ fn relays_collation_protocol_messages() { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -872,14 +874,14 @@ fn relays_collation_protocol_messages() { sp_core::crypto::UncheckedFrom::unchecked_from([1u8; 64]), ); - let message = + let message_v1 = protocol_v1::CollationProtocol::CollatorProtocol(collator_protocol_message.clone()); network_handle .peer_message( peer_a.clone(), PeerSet::Collation, - WireMessage::ProtocolMessage(message.clone()).encode(), + WireMessage::ProtocolMessage(message_v1.clone()).encode(), ) .await; @@ -895,15 +897,15 @@ fn relays_collation_protocol_messages() { .peer_message( peer_b.clone(), PeerSet::Collation, - WireMessage::ProtocolMessage(message.clone()).encode(), + WireMessage::ProtocolMessage(message_v1.clone()).encode(), ) .await; assert_matches!( virtual_overseer.recv().await, AllMessages::CollatorProtocol( - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage(p, m) + CollatorProtocolMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(p, Versioned::V1(m)) ) ) => { assert_eq!(p, peer_b); @@ -931,7 +933,7 @@ fn different_views_on_different_peer_sets() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -945,7 +947,7 @@ fn different_views_on_different_peer_sets() { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -1094,7 +1096,7 @@ fn send_messages_to_peers() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -1108,7 +1110,7 @@ fn send_messages_to_peers() { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), &mut virtual_overseer, ) .await; @@ -1131,7 +1133,7 @@ fn send_messages_to_peers() { let approval_distribution_message = protocol_v1::ApprovalDistributionMessage::Approvals(Vec::new()); - let message = protocol_v1::ValidationProtocol::ApprovalDistribution( + let message_v1 = protocol_v1::ValidationProtocol::ApprovalDistribution( approval_distribution_message.clone(), ); @@ -1139,7 +1141,7 @@ fn send_messages_to_peers() { .send(FromOverseer::Communication { msg: NetworkBridgeMessage::SendValidationMessage( vec![peer.clone()], - message.clone(), + Versioned::V1(message_v1.clone()), ), }) .await; @@ -1149,7 +1151,7 @@ fn send_messages_to_peers() { NetworkAction::WriteNotification( peer.clone(), PeerSet::Validation, - WireMessage::ProtocolMessage(message).encode(), + WireMessage::ProtocolMessage(message_v1).encode(), ) ); } @@ -1163,14 +1165,14 @@ fn send_messages_to_peers() { dummy_collator_signature(), ); - let message = + let message_v1 = protocol_v1::CollationProtocol::CollatorProtocol(collator_protocol_message.clone()); virtual_overseer .send(FromOverseer::Communication { msg: NetworkBridgeMessage::SendCollationMessage( vec![peer.clone()], - message.clone(), + Versioned::V1(message_v1.clone()), ), }) .await; @@ -1180,7 +1182,7 @@ fn send_messages_to_peers() { NetworkAction::WriteNotification( peer.clone(), PeerSet::Collation, - WireMessage::ProtocolMessage(message).encode(), + WireMessage::ProtocolMessage(message_v1).encode(), ) ); } diff --git a/node/network/bridge/src/validator_discovery.rs b/node/network/bridge/src/validator_discovery.rs index 69f4fabb4283..eb9bb954e7a1 100644 --- a/node/network/bridge/src/validator_discovery.rs +++ b/node/network/bridge/src/validator_discovery.rs @@ -75,15 +75,21 @@ impl Service { ); // ask the network to connect to these nodes and not disconnect // from them until removed from the set + // + // for peer-set management, the default should be used regardless of + // the negotiated version. if let Err(e) = network_service - .set_reserved_peers(peer_set.into_protocol_name(), newly_requested) + .set_reserved_peers(peer_set.into_default_protocol_name(), newly_requested) .await { gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); } // the addresses are known to be valid + // + // for peer-set management, the default should be used regardless of + // the negotiated version. let _ = network_service - .remove_from_peers_set(peer_set.into_protocol_name(), peers_to_remove) + .remove_from_peers_set(peer_set.into_default_protocol_name(), peers_to_remove) .await; network_service diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 5a286bdf89a8..713303d3d869 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -26,13 +26,14 @@ use futures::{ use sp_core::Pair; use polkadot_node_network_protocol::{ + self as net_protocol, peer_set::PeerSet, request_response::{ incoming::{self, OutgoingResponse}, v1::{self as request_v1, CollationFetchingRequest, CollationFetchingResponse}, IncomingRequest, IncomingRequestReceiver, }, - v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, View, + v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View, }; use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement}; use polkadot_node_subsystem_util::{ @@ -545,7 +546,7 @@ where ctx.send_message(NetworkBridgeMessage::SendCollationMessage( vec![peer], - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), )) .await; } @@ -622,7 +623,7 @@ async fn advertise_collation( ctx.send_message(NetworkBridgeMessage::SendCollationMessage( vec![peer.clone()], - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), )) .await; @@ -687,9 +688,10 @@ where "ReportCollator message is not expected on the collator side of the protocol", ); }, - NetworkBridgeUpdateV1(event) => { + NetworkBridgeUpdate(event) => { // We should count only this shoulder in the histogram, as other shoulders are just introducing noise let _ = state.metrics.time_process_msg(); + if let Err(e) = handle_network_msg(ctx, runtime, state, event).await { gum::warn!( target: LOG_TARGET, @@ -930,7 +932,7 @@ async fn handle_network_msg( ctx: &mut Context, runtime: &mut RuntimeInfo, state: &mut State, - bridge_message: NetworkBridgeEvent, + bridge_message: NetworkBridgeEvent, ) -> Result<()> where Context: SubsystemContext, @@ -939,7 +941,7 @@ where use NetworkBridgeEvent::*; match bridge_message { - PeerConnected(peer_id, observed_role, maybe_authority) => { + PeerConnected(peer_id, observed_role, _, maybe_authority) => { // If it is possible that a disconnected validator would attempt a reconnect // it should be handled here. gum::trace!(target: LOG_TARGET, ?peer_id, ?observed_role, "Peer connected"); @@ -968,7 +970,7 @@ where gum::trace!(target: LOG_TARGET, ?view, "Own view change"); handle_our_view_change(state, view).await?; }, - PeerMessage(remote, msg) => { + PeerMessage(remote, Versioned::V1(msg)) => { handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?; }, NewGossipTopology { .. } => { diff --git a/node/network/collator-protocol/src/collator_side/tests.rs b/node/network/collator-protocol/src/collator_side/tests.rs index f8f985cd4c66..0ddb79fd53f8 100644 --- a/node/network/collator-protocol/src/collator_side/tests.rs +++ b/node/network/collator-protocol/src/collator_side/tests.rs @@ -171,7 +171,7 @@ impl TestState { overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view, )), ) @@ -277,9 +277,9 @@ async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestS overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( - our_view![test_state.relay_parent], - )), + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange(our_view![ + test_state.relay_parent + ])), ) .await; } @@ -388,9 +388,10 @@ async fn connect_peer( ) { overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerConnected( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer.clone(), polkadot_node_network_protocol::ObservedRole::Authority, + 1, authority_id.map(|v| HashSet::from([v])), )), ) @@ -398,7 +399,7 @@ async fn connect_peer( overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange( peer, view![], )), @@ -410,7 +411,7 @@ async fn connect_peer( async fn disconnect_peer(virtual_overseer: &mut VirtualOverseer, peer: PeerId) { overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer)), + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerDisconnected(peer)), ) .await; } @@ -426,7 +427,7 @@ async fn expect_declare_msg( AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), ) ) => { assert_eq!(to[0], *peer); @@ -460,7 +461,7 @@ async fn expect_advertise_collation_msg( AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), ) ) => { assert_eq!(to[0], *peer); @@ -484,7 +485,7 @@ async fn send_peer_view_change( ) { overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange( peer.clone(), View::new(hashes, 0), )), @@ -627,7 +628,7 @@ fn advertise_and_send_collation() { // Send info about peer's view. overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange( peer.clone(), view![test_state.relay_parent], )), @@ -824,13 +825,13 @@ fn collators_reject_declare_messages() { overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerMessage( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( peer.clone(), - protocol_v1::CollatorProtocolMessage::Declare( + Versioned::V1(protocol_v1::CollatorProtocolMessage::Declare( collator_pair2.public(), ParaId::from(5), collator_pair2.sign(b"garbage"), - ), + )), )), ) .await; diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index f3e50a630097..9462a698a511 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -33,6 +33,7 @@ use std::{ use sp_keystore::SyncCryptoStorePtr; use polkadot_node_network_protocol::{ + self as net_protocol, peer_set::PeerSet, request_response as req_res, request_response::{ @@ -40,7 +41,7 @@ use polkadot_node_network_protocol::{ v1::{CollationFetchingRequest, CollationFetchingResponse}, OutgoingRequest, Requests, }, - v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, View, + v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View, }; use polkadot_node_primitives::{PoV, SignedFullStatement}; use polkadot_node_subsystem_util::metrics::{self, prometheus}; @@ -726,7 +727,7 @@ async fn notify_collation_seconded( protocol_v1::CollatorProtocolMessage::CollationSeconded(relay_parent, statement.into()); ctx.send_message(NetworkBridgeMessage::SendCollationMessage( vec![peer_id], - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), )) .await; @@ -790,7 +791,7 @@ async fn request_collation( Recipient::Peer(peer_id), CollationFetchingRequest { relay_parent, para_id }, ); - let requests = Requests::CollationFetching(full_request); + let requests = Requests::CollationFetchingV1(full_request); let per_request = PerRequest { from_collator: response_recv.boxed().fuse(), @@ -1073,7 +1074,7 @@ async fn handle_network_msg( ctx: &mut Context, state: &mut State, keystore: &SyncCryptoStorePtr, - bridge_message: NetworkBridgeEvent, + bridge_message: NetworkBridgeEvent, ) -> Result<()> where Context: overseer::SubsystemContext, @@ -1082,7 +1083,7 @@ where use NetworkBridgeEvent::*; match bridge_message { - PeerConnected(peer_id, _role, _) => { + PeerConnected(peer_id, _role, _version, _) => { state.peer_data.entry(peer_id).or_default(); state.metrics.note_collator_peer_count(state.peer_data.len()); }, @@ -1099,7 +1100,7 @@ where OurViewChange(view) => { handle_our_view_change(ctx, state, keystore, view).await?; }, - PeerMessage(remote, msg) => { + PeerMessage(remote, Versioned::V1(msg)) => { process_incoming_peer_message(ctx, state, remote, msg).await; }, } @@ -1138,7 +1139,7 @@ async fn process_msg( ReportCollator(id) => { report_collator(ctx, &state.peer_data, id).await; }, - NetworkBridgeUpdateV1(event) => { + NetworkBridgeUpdate(event) => { if let Err(e) = handle_network_msg(ctx, state, keystore, event).await { gum::warn!( target: LOG_TARGET, diff --git a/node/network/collator-protocol/src/validator_side/tests.rs b/node/network/collator-protocol/src/validator_side/tests.rs index 913575469579..e0406b433fe5 100644 --- a/node/network/collator-protocol/src/validator_side/tests.rs +++ b/node/network/collator-protocol/src/validator_side/tests.rs @@ -283,7 +283,7 @@ async fn assert_fetch_collation_request( let req = reqs.into_iter().next() .expect("There should be exactly one request"); match req { - Requests::CollationFetching(req) => { + Requests::CollationFetchingV1(req) => { let payload = req.payload; assert_eq!(payload.relay_parent, relay_parent); assert_eq!(payload.para_id, para_id); @@ -303,9 +303,10 @@ async fn connect_and_declare_collator( ) { overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerConnected( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer.clone(), ObservedRole::Full, + 1, None, )), ) @@ -313,13 +314,13 @@ async fn connect_and_declare_collator( overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerMessage( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( peer.clone(), - protocol_v1::CollatorProtocolMessage::Declare( + Versioned::V1(protocol_v1::CollatorProtocolMessage::Declare( collator.public(), para_id, collator.sign(&protocol_v1::declare_signature_payload(&peer)), - ), + )), )), ) .await; @@ -333,9 +334,9 @@ async fn advertise_collation( ) { overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerMessage( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( peer, - protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent), + Versioned::V1(protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent)), )), ) .await; @@ -354,7 +355,7 @@ fn act_on_advertisement() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![test_state.relay_parent], )), ) @@ -395,7 +396,7 @@ fn collator_reporting_works() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![test_state.relay_parent], )), ) @@ -454,9 +455,10 @@ fn collator_authentication_verification_works() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerConnected( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, + 1, None, )), ) @@ -465,13 +467,13 @@ fn collator_authentication_verification_works() { // the peer sends a declare message but sign the wrong payload overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerMessage( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( + Versioned::V1(protocol_v1::CollatorProtocolMessage::Declare( test_state.collators[0].public(), test_state.chain_ids[0], test_state.collators[0].sign(&[42]), - ), + )), )), ) .await; @@ -510,7 +512,7 @@ fn fetch_collations_works() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![test_state.relay_parent, second], )), ) @@ -574,7 +576,7 @@ fn fetch_collations_works() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerDisconnected( peer_b.clone(), )), ) @@ -582,7 +584,7 @@ fn fetch_collations_works() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerDisconnected( peer_c.clone(), )), ) @@ -675,7 +677,7 @@ fn reject_connection_to_next_group() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![test_state.relay_parent], )), ) @@ -722,7 +724,7 @@ fn fetch_next_collation_on_invalid_collation() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![test_state.relay_parent, second], )), ) @@ -822,7 +824,7 @@ fn inactive_disconnected() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![hash_a], )), ) @@ -870,7 +872,7 @@ fn activity_extends_life() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![hash_a, hash_b, hash_c], )), ) @@ -929,7 +931,7 @@ fn disconnect_if_no_declare() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![test_state.relay_parent], )), ) @@ -941,9 +943,10 @@ fn disconnect_if_no_declare() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerConnected( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + 1, None, )), ) @@ -966,7 +969,7 @@ fn disconnect_if_wrong_declare() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![test_state.relay_parent], )), ) @@ -978,9 +981,10 @@ fn disconnect_if_wrong_declare() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerConnected( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + 1, None, )), ) @@ -988,13 +992,13 @@ fn disconnect_if_wrong_declare() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerMessage( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( + Versioned::V1(protocol_v1::CollatorProtocolMessage::Declare( pair.public(), ParaId::from(69), pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), - ), + )), )), ) .await; @@ -1027,7 +1031,7 @@ fn view_change_clears_old_collators() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![test_state.relay_parent], )), ) @@ -1049,7 +1053,7 @@ fn view_change_clears_old_collators() { overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange( our_view![hash_b], )), ) diff --git a/node/network/dispute-distribution/src/sender/send_task.rs b/node/network/dispute-distribution/src/sender/send_task.rs index 4c032990aa09..9cc202a69e9f 100644 --- a/node/network/dispute-distribution/src/sender/send_task.rs +++ b/node/network/dispute-distribution/src/sender/send_task.rs @@ -255,7 +255,7 @@ async fn send_requests( let (outgoing, pending_response) = OutgoingRequest::new(Recipient::Authority(receiver.clone()), req.clone()); - reqs.push(Requests::DisputeSending(outgoing)); + reqs.push(Requests::DisputeSendingV1(outgoing)); let fut = wait_response_task( pending_response, diff --git a/node/network/dispute-distribution/src/tests/mod.rs b/node/network/dispute-distribution/src/tests/mod.rs index e3df6c20a6af..c9d15528a6ef 100644 --- a/node/network/dispute-distribution/src/tests/mod.rs +++ b/node/network/dispute-distribution/src/tests/mod.rs @@ -668,7 +668,7 @@ async fn check_sent_requests( let reqs: Vec<_> = reqs.into_iter().map(|r| assert_matches!( r, - Requests::DisputeSending(req) => {req} + Requests::DisputeSendingV1(req) => {req} ) ) .collect(); diff --git a/node/network/gossip-support/src/lib.rs b/node/network/gossip-support/src/lib.rs index d8ba6ce7c89d..f808cc7689dd 100644 --- a/node/network/gossip-support/src/lib.rs +++ b/node/network/gossip-support/src/lib.rs @@ -40,8 +40,8 @@ use sp_application_crypto::{AppKey, ByteArray}; use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; use polkadot_node_network_protocol::{ - authority_discovery::AuthorityDiscovery, peer_set::PeerSet, v1::GossipSuppportNetworkMessage, - PeerId, + authority_discovery::AuthorityDiscovery, peer_set::PeerSet, GossipSupportNetworkMessage, + PeerId, Versioned, }; use polkadot_node_subsystem::{ messages::{ @@ -169,7 +169,7 @@ where ); match message { FromOverseer::Communication { - msg: GossipSupportMessage::NetworkBridgeUpdateV1(ev), + msg: GossipSupportMessage::NetworkBridgeUpdate(ev), } => self.handle_connect_disconnect(ev), FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, @@ -383,9 +383,9 @@ where }; } - fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent) { + fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent) { match ev { - NetworkBridgeEvent::PeerConnected(peer_id, _, o_authority) => { + NetworkBridgeEvent::PeerConnected(peer_id, _, _, o_authority) => { if let Some(authority_ids) = o_authority { authority_ids.iter().for_each(|a| { self.connected_authorities.insert(a.clone(), peer_id); @@ -404,7 +404,7 @@ where NetworkBridgeEvent::OurViewChange(_) => {}, NetworkBridgeEvent::PeerViewChange(_, _) => {}, NetworkBridgeEvent::NewGossipTopology { .. } => {}, - NetworkBridgeEvent::PeerMessage(_, v) => { + NetworkBridgeEvent::PeerMessage(_, Versioned::V1(v)) => { match v {}; }, } diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml index 8c9090399302..ee8c02372409 100644 --- a/node/network/protocol/Cargo.toml +++ b/node/network/protocol/Cargo.toml @@ -17,3 +17,4 @@ strum = { version = "0.24", features = ["derive"] } futures = "0.3.21" thiserror = "1.0.30" fatality = "0.0.6" +derive_more = "0.99" diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index b039527440f2..cd659ad090bc 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -89,36 +89,6 @@ impl Into for ObservedRole { } } -/// Implement `TryFrom` for one enum variant into the inner type. -/// `$m_ty::$variant(inner) -> Ok(inner)` -macro_rules! impl_try_from { - ($m_ty:ident, $variant:ident, $out:ty) => { - impl TryFrom<$m_ty> for $out { - type Error = crate::WrongVariant; - - fn try_from(x: $m_ty) -> Result<$out, Self::Error> { - #[allow(unreachable_patterns)] // when there is only one variant - match x { - $m_ty::$variant(y) => Ok(y), - _ => Err(crate::WrongVariant), - } - } - } - - impl<'a> TryFrom<&'a $m_ty> for &'a $out { - type Error = crate::WrongVariant; - - fn try_from(x: &'a $m_ty) -> Result<&'a $out, Self::Error> { - #[allow(unreachable_patterns)] // when there is only one variant - match *x { - $m_ty::$variant(ref y) => Ok(y), - _ => Err(crate::WrongVariant), - } - } - } - }; -} - /// Specialized wrapper around [`View`]. /// /// Besides the access to the view itself, it also gives access to the [`jaeger::Span`] per leave/head. @@ -279,7 +249,152 @@ impl View { } } -/// v1 protocol types. +/// A protocol-versioned type. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Versioned { + /// V1 type. + V1(V1), +} + +impl Versioned<&'_ V1> { + /// Convert to a fully-owned version of the message. + pub fn clone_inner(&self) -> Versioned { + match *self { + Versioned::V1(inner) => Versioned::V1(inner.clone()), + } + } +} + +/// All supported versions of the validation protocol message. +pub type VersionedValidationProtocol = Versioned; + +impl From for VersionedValidationProtocol { + fn from(v1: v1::ValidationProtocol) -> Self { + VersionedValidationProtocol::V1(v1) + } +} + +/// All supported versions of the collation protocol message. +pub type VersionedCollationProtocol = Versioned; + +impl From for VersionedCollationProtocol { + fn from(v1: v1::CollationProtocol) -> Self { + VersionedCollationProtocol::V1(v1) + } +} + +macro_rules! impl_versioned_full_protocol_from { + ($from:ty, $out:ty, $variant:ident) => { + impl From<$from> for $out { + fn from(versioned_from: $from) -> $out { + match versioned_from { + Versioned::V1(x) => Versioned::V1(x.into()), + } + } + } + }; +} + +/// Implement `TryFrom` for one versioned enum variant into the inner type. +/// `$m_ty::$variant(inner) -> Ok(inner)` +macro_rules! impl_versioned_try_from { + ($from:ty, $out:ty, $v1_pat:pat => $v1_out:expr) => { + impl TryFrom<$from> for $out { + type Error = crate::WrongVariant; + + fn try_from(x: $from) -> Result<$out, Self::Error> { + #[allow(unreachable_patterns)] // when there is only one variant + match x { + Versioned::V1($v1_pat) => Ok(Versioned::V1($v1_out)), + _ => Err(crate::WrongVariant), + } + } + } + + impl<'a> TryFrom<&'a $from> for $out { + type Error = crate::WrongVariant; + + fn try_from(x: &'a $from) -> Result<$out, Self::Error> { + #[allow(unreachable_patterns)] // when there is only one variant + match x { + Versioned::V1($v1_pat) => Ok(Versioned::V1($v1_out.clone())), + _ => Err(crate::WrongVariant), + } + } + } + }; +} + +/// Version-annotated messages used by the bitfield distribution subsystem. +pub type BitfieldDistributionMessage = Versioned; +impl_versioned_full_protocol_from!( + BitfieldDistributionMessage, + VersionedValidationProtocol, + BitfieldDistribution +); +impl_versioned_try_from!( + VersionedValidationProtocol, + BitfieldDistributionMessage, + v1::ValidationProtocol::BitfieldDistribution(x) => x +); + +/// Version-annotated messages used by the statement distribution subsystem. +pub type StatementDistributionMessage = Versioned; +impl_versioned_full_protocol_from!( + StatementDistributionMessage, + VersionedValidationProtocol, + StatementDistribution +); +impl_versioned_try_from!( + VersionedValidationProtocol, + StatementDistributionMessage, + v1::ValidationProtocol::StatementDistribution(x) => x +); + +/// Version-annotated messages used by the approval distribution subsystem. +pub type ApprovalDistributionMessage = Versioned; +impl_versioned_full_protocol_from!( + ApprovalDistributionMessage, + VersionedValidationProtocol, + ApprovalDistribution +); +impl_versioned_try_from!( + VersionedValidationProtocol, + ApprovalDistributionMessage, + v1::ValidationProtocol::ApprovalDistribution(x) => x +); + +/// Version-annotated messages used by the gossip-support subsystem (this is void). +pub type GossipSupportNetworkMessage = Versioned; +// This is a void enum placeholder, so never gets sent over the wire. +impl TryFrom for GossipSupportNetworkMessage { + type Error = WrongVariant; + fn try_from(_: VersionedValidationProtocol) -> Result { + Err(WrongVariant) + } +} + +impl<'a> TryFrom<&'a VersionedValidationProtocol> for GossipSupportNetworkMessage { + type Error = WrongVariant; + fn try_from(_: &'a VersionedValidationProtocol) -> Result { + Err(WrongVariant) + } +} + +/// Version-annotated messages used by the bitfield distribution subsystem. +pub type CollatorProtocolMessage = Versioned; +impl_versioned_full_protocol_from!( + CollatorProtocolMessage, + VersionedCollationProtocol, + CollatorProtocol +); +impl_versioned_try_from!( + VersionedCollationProtocol, + CollatorProtocolMessage, + v1::CollationProtocol::CollatorProtocol(x) => x +); + +/// v1 notification protocol types. pub mod v1 { use parity_scale_codec::{Decode, Encode}; @@ -293,8 +408,6 @@ pub mod v1 { UncheckedSignedFullStatement, }; - use crate::WrongVariant; - /// Network messages used by the bitfield distribution subsystem. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum BitfieldDistributionMessage { @@ -384,7 +497,7 @@ pub mod v1 { /// Dummy network message type, so we will receive connect/disconnect events. #[derive(Debug, Clone, PartialEq, Eq)] - pub enum GossipSuppportNetworkMessage {} + pub enum GossipSupportNetworkMessage {} /// Network messages used by the collator protocol subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] @@ -403,47 +516,31 @@ pub mod v1 { } /// All network messages on the validation peer-set. - #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, derive_more::From)] pub enum ValidationProtocol { /// Bitfield distribution messages #[codec(index = 1)] + #[from] BitfieldDistribution(BitfieldDistributionMessage), /// Statement distribution messages #[codec(index = 3)] + #[from] StatementDistribution(StatementDistributionMessage), /// Approval distribution messages #[codec(index = 4)] + #[from] ApprovalDistribution(ApprovalDistributionMessage), } - impl_try_from!(ValidationProtocol, BitfieldDistribution, BitfieldDistributionMessage); - impl_try_from!(ValidationProtocol, StatementDistribution, StatementDistributionMessage); - impl_try_from!(ValidationProtocol, ApprovalDistribution, ApprovalDistributionMessage); - - impl TryFrom for GossipSuppportNetworkMessage { - type Error = WrongVariant; - fn try_from(_: ValidationProtocol) -> Result { - Err(WrongVariant) - } - } - - impl<'a> TryFrom<&'a ValidationProtocol> for &'a GossipSuppportNetworkMessage { - type Error = WrongVariant; - fn try_from(_: &'a ValidationProtocol) -> Result { - Err(WrongVariant) - } - } - /// All network messages on the collation peer-set. - #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, derive_more::From)] pub enum CollationProtocol { /// Collator protocol messages #[codec(index = 0)] + #[from] CollatorProtocol(CollatorProtocolMessage), } - impl_try_from!(CollationProtocol, CollatorProtocol, CollatorProtocolMessage); - /// Get the payload that should be signed and included in a `Declare` message. /// /// The payload is the local peer id of the node, which serves to prove that it diff --git a/node/network/protocol/src/peer_set.rs b/node/network/protocol/src/peer_set.rs index 7856e3fd9f96..400b36e3d4c5 100644 --- a/node/network/protocol/src/peer_set.rs +++ b/node/network/protocol/src/peer_set.rs @@ -16,6 +16,7 @@ //! All peersets and protocols used for parachains. +use super::ProtocolVersion; use sc_network::config::{NonDefaultSetConfig, SetConfig}; use std::{ borrow::Cow, @@ -23,6 +24,16 @@ use std::{ }; use strum::{EnumIter, IntoEnumIterator}; +// Only supported protocol versions should be defined here. +const VALIDATION_PROTOCOL_V1: &str = "/polkadot/validation/1"; +const COLLATION_PROTOCOL_V1: &str = "/polkadot/collation/1"; + +/// The default validation protocol version. +pub const DEFAULT_VALIDATION_PROTOCOL_VERSION: ProtocolVersion = 1; + +/// The default collation protocol version. +pub const DEFAULT_COLLATION_PROTOCOL_VERSION: ProtocolVersion = 1; + /// The peer-sets and thus the protocols which are used for the network. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)] pub enum PeerSet { @@ -45,12 +56,15 @@ pub enum IsAuthority { } impl PeerSet { - /// Get `sc_network` peer set configurations for each peerset. + /// Get `sc_network` peer set configurations for each peerset on the default version. /// /// Those should be used in the network configuration to register the protocols with the /// network service. pub fn get_info(self, is_authority: IsAuthority) -> NonDefaultSetConfig { - let protocol = self.into_protocol_name(); + let version = self.get_default_version(); + let protocol = self + .into_protocol_name(version) + .expect("default version always has protocol name; qed"); let max_notification_size = 100 * 1024; match self { @@ -88,24 +102,50 @@ impl PeerSet { } } - /// Get the protocol name associated with each peer set as static str. - pub const fn get_protocol_name_static(self) -> &'static str { + /// Get the default protocol version for this peer set. + pub const fn get_default_version(self) -> ProtocolVersion { + match self { + PeerSet::Validation => DEFAULT_VALIDATION_PROTOCOL_VERSION, + PeerSet::Collation => DEFAULT_COLLATION_PROTOCOL_VERSION, + } + } + + /// Get the default protocol name as a static str. + pub const fn get_default_protocol_name(self) -> &'static str { match self { - PeerSet::Validation => "/polkadot/validation/1", - PeerSet::Collation => "/polkadot/collation/1", + PeerSet::Validation => VALIDATION_PROTOCOL_V1, + PeerSet::Collation => COLLATION_PROTOCOL_V1, } } - /// Convert a peer set into a protocol name as understood by Substrate. - pub fn into_protocol_name(self) -> Cow<'static, str> { - self.get_protocol_name_static().into() + /// Get the protocol name associated with each peer set + /// and the given version, if any, as static str. + pub const fn get_protocol_name_static(self, version: ProtocolVersion) -> Option<&'static str> { + match (self, version) { + (PeerSet::Validation, 1) => Some(VALIDATION_PROTOCOL_V1), + (PeerSet::Collation, 1) => Some(COLLATION_PROTOCOL_V1), + _ => None, + } + } + + /// Get the protocol name associated with each peer set as understood by Substrate. + pub fn into_default_protocol_name(self) -> Cow<'static, str> { + self.get_default_protocol_name().into() } - /// Try parsing a protocol name into a peer set. - pub fn try_from_protocol_name(name: &Cow<'static, str>) -> Option { + /// Convert a peer set and the given version into a protocol name, if any, + /// as understood by Substrate. + pub fn into_protocol_name(self, version: ProtocolVersion) -> Option> { + self.get_protocol_name_static(version).map(|n| n.into()) + } + + /// Try parsing a protocol name into a peer set and protocol version. + /// + /// This only succeeds on supported versions. + pub fn try_from_protocol_name(name: &Cow<'static, str>) -> Option<(PeerSet, ProtocolVersion)> { match name { - n if n == &PeerSet::Validation.into_protocol_name() => Some(PeerSet::Validation), - n if n == &PeerSet::Collation.into_protocol_name() => Some(PeerSet::Collation), + n if n == VALIDATION_PROTOCOL_V1 => Some((PeerSet::Validation, 1)), + n if n == COLLATION_PROTOCOL_V1 => Some((PeerSet::Collation, 1)), _ => None, } } @@ -137,7 +177,7 @@ impl IndexMut for PerPeerSet { } } -/// Get `NonDefaultSetConfig`s for all available peer sets. +/// Get `NonDefaultSetConfig`s for all available peer sets, at their default versions. /// /// Should be used during network configuration (added to [`NetworkConfiguration::extra_sets`]) /// or shortly after startup to register the protocols with the network service. diff --git a/node/network/protocol/src/request_response/mod.rs b/node/network/protocol/src/request_response/mod.rs index 8b55cdbc0f15..7a0f85eaeced 100644 --- a/node/network/protocol/src/request_response/mod.rs +++ b/node/network/protocol/src/request_response/mod.rs @@ -60,17 +60,17 @@ pub mod v1; #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, EnumIter)] pub enum Protocol { /// Protocol for chunk fetching, used by availability distribution and availability recovery. - ChunkFetching, + ChunkFetchingV1, /// Protocol for fetching collations from collators. - CollationFetching, + CollationFetchingV1, /// Protocol for fetching seconded PoVs from validators of the same group. - PoVFetching, + PoVFetchingV1, /// Protocol for fetching available data. - AvailableDataFetching, + AvailableDataFetchingV1, /// Fetching of statements that are too large for gossip. - StatementFetching, + StatementFetchingV1, /// Sending of dispute statements with application level confirmations. - DisputeSending, + DisputeSendingV1, } /// Minimum bandwidth we expect for validators - 500Mbit/s is the recommendation, so approximately @@ -111,12 +111,12 @@ pub const MAX_PARALLEL_STATEMENT_REQUESTS: u32 = 3; /// Response size limit for responses of POV like data. /// /// This is larger than `MAX_POV_SIZE` to account for protocol overhead and for additional data in -/// `CollationFetching` or `AvailableDataFetching` for example. We try to err on larger limits here +/// `CollationFetchingV1` or `AvailableDataFetchingV1` for example. We try to err on larger limits here /// as a too large limit only allows an attacker to waste our bandwidth some more, a too low limit /// might have more severe effects. const POV_RESPONSE_SIZE: u64 = MAX_POV_SIZE as u64 + 10_000; -/// Maximum response sizes for `StatementFetching`. +/// Maximum response sizes for `StatementFetchingV1`. /// /// This is `MAX_CODE_SIZE` plus some additional space for protocol overhead. const STATEMENT_RESPONSE_SIZE: u64 = MAX_CODE_SIZE as u64 + 10_000; @@ -130,7 +130,7 @@ impl Protocol { let p_name = self.into_protocol_name(); let (tx, rx) = mpsc::channel(self.get_channel_size()); let cfg = match self { - Protocol::ChunkFetching => RequestResponseConfig { + Protocol::ChunkFetchingV1 => RequestResponseConfig { name: p_name, max_request_size: 1_000, max_response_size: POV_RESPONSE_SIZE as u64 * 3, @@ -138,7 +138,7 @@ impl Protocol { request_timeout: CHUNK_REQUEST_TIMEOUT, inbound_queue: Some(tx), }, - Protocol::CollationFetching => RequestResponseConfig { + Protocol::CollationFetchingV1 => RequestResponseConfig { name: p_name, max_request_size: 1_000, max_response_size: POV_RESPONSE_SIZE, @@ -146,14 +146,14 @@ impl Protocol { request_timeout: POV_REQUEST_TIMEOUT_CONNECTED, inbound_queue: Some(tx), }, - Protocol::PoVFetching => RequestResponseConfig { + Protocol::PoVFetchingV1 => RequestResponseConfig { name: p_name, max_request_size: 1_000, max_response_size: POV_RESPONSE_SIZE, request_timeout: POV_REQUEST_TIMEOUT_CONNECTED, inbound_queue: Some(tx), }, - Protocol::AvailableDataFetching => RequestResponseConfig { + Protocol::AvailableDataFetchingV1 => RequestResponseConfig { name: p_name, max_request_size: 1_000, // Available data size is dominated by the PoV size. @@ -161,7 +161,7 @@ impl Protocol { request_timeout: POV_REQUEST_TIMEOUT_CONNECTED, inbound_queue: Some(tx), }, - Protocol::StatementFetching => RequestResponseConfig { + Protocol::StatementFetchingV1 => RequestResponseConfig { name: p_name, max_request_size: 1_000, // Available data size is dominated code size. @@ -178,7 +178,7 @@ impl Protocol { request_timeout: Duration::from_secs(1), inbound_queue: Some(tx), }, - Protocol::DisputeSending => RequestResponseConfig { + Protocol::DisputeSendingV1 => RequestResponseConfig { name: p_name, max_request_size: 1_000, /// Responses are just confirmation, in essence not even a bit. So 100 seems @@ -201,18 +201,18 @@ impl Protocol { // times (due to network delays), 100 seems big enough to accomodate for "bursts", // assuming we can service requests relatively quickly, which would need to be measured // as well. - Protocol::ChunkFetching => 100, + Protocol::ChunkFetchingV1 => 100, // 10 seems reasonable, considering group sizes of max 10 validators. - Protocol::CollationFetching => 10, + Protocol::CollationFetchingV1 => 10, // 10 seems reasonable, considering group sizes of max 10 validators. - Protocol::PoVFetching => 10, + Protocol::PoVFetchingV1 => 10, // Validators are constantly self-selecting to request available data which may lead // to constant load and occasional burstiness. - Protocol::AvailableDataFetching => 100, + Protocol::AvailableDataFetchingV1 => 100, // Our queue size approximation is how many blocks of the size of // a runtime we can transfer within a statements timeout, minus the requests we handle // in parallel. - Protocol::StatementFetching => { + Protocol::StatementFetchingV1 => { // We assume we can utilize up to 70% of the available bandwidth for statements. // This is just a guess/estimate, with the following considerations: If we are // faster than that, queue size will stay low anyway, even if not - requesters will @@ -233,7 +233,7 @@ impl Protocol { // Incoming requests can get bursty, we should also be able to handle them fast on // average, so something in the ballpark of 100 should be fine. Nodes will retry on // failure, so having a good value here is mostly about performance tuning. - Protocol::DisputeSending => 100, + Protocol::DisputeSendingV1 => 100, } } @@ -245,12 +245,12 @@ impl Protocol { /// Get the protocol name associated with each peer set as static str. pub const fn get_protocol_name_static(self) -> &'static str { match self { - Protocol::ChunkFetching => "/polkadot/req_chunk/1", - Protocol::CollationFetching => "/polkadot/req_collation/1", - Protocol::PoVFetching => "/polkadot/req_pov/1", - Protocol::AvailableDataFetching => "/polkadot/req_available_data/1", - Protocol::StatementFetching => "/polkadot/req_statement/1", - Protocol::DisputeSending => "/polkadot/send_dispute/1", + Protocol::ChunkFetchingV1 => "/polkadot/req_chunk/1", + Protocol::CollationFetchingV1 => "/polkadot/req_collation/1", + Protocol::PoVFetchingV1 => "/polkadot/req_pov/1", + Protocol::AvailableDataFetchingV1 => "/polkadot/req_available_data/1", + Protocol::StatementFetchingV1 => "/polkadot/req_statement/1", + Protocol::DisputeSendingV1 => "/polkadot/send_dispute/1", } } } diff --git a/node/network/protocol/src/request_response/outgoing.rs b/node/network/protocol/src/request_response/outgoing.rs index 1f71b4510ea8..a9353965a48f 100644 --- a/node/network/protocol/src/request_response/outgoing.rs +++ b/node/network/protocol/src/request_response/outgoing.rs @@ -29,29 +29,29 @@ use super::{v1, IsRequest, Protocol}; #[derive(Debug)] pub enum Requests { /// Request an availability chunk from a node. - ChunkFetching(OutgoingRequest), + ChunkFetchingV1(OutgoingRequest), /// Fetch a collation from a collator which previously announced it. - CollationFetching(OutgoingRequest), + CollationFetchingV1(OutgoingRequest), /// Fetch a PoV from a validator which previously sent out a seconded statement. - PoVFetching(OutgoingRequest), + PoVFetchingV1(OutgoingRequest), /// Request full available data from a node. - AvailableDataFetching(OutgoingRequest), + AvailableDataFetchingV1(OutgoingRequest), /// Requests for fetching large statements as part of statement distribution. - StatementFetching(OutgoingRequest), + StatementFetchingV1(OutgoingRequest), /// Requests for notifying about an ongoing dispute. - DisputeSending(OutgoingRequest), + DisputeSendingV1(OutgoingRequest), } impl Requests { /// Get the protocol this request conforms to. pub fn get_protocol(&self) -> Protocol { match self { - Self::ChunkFetching(_) => Protocol::ChunkFetching, - Self::CollationFetching(_) => Protocol::CollationFetching, - Self::PoVFetching(_) => Protocol::PoVFetching, - Self::AvailableDataFetching(_) => Protocol::AvailableDataFetching, - Self::StatementFetching(_) => Protocol::StatementFetching, - Self::DisputeSending(_) => Protocol::DisputeSending, + Self::ChunkFetchingV1(_) => Protocol::ChunkFetchingV1, + Self::CollationFetchingV1(_) => Protocol::CollationFetchingV1, + Self::PoVFetchingV1(_) => Protocol::PoVFetchingV1, + Self::AvailableDataFetchingV1(_) => Protocol::AvailableDataFetchingV1, + Self::StatementFetchingV1(_) => Protocol::StatementFetchingV1, + Self::DisputeSendingV1(_) => Protocol::DisputeSendingV1, } } @@ -64,12 +64,12 @@ impl Requests { /// contained in the `enum`. pub fn encode_request(self) -> (Protocol, OutgoingRequest>) { match self { - Self::ChunkFetching(r) => r.encode_request(), - Self::CollationFetching(r) => r.encode_request(), - Self::PoVFetching(r) => r.encode_request(), - Self::AvailableDataFetching(r) => r.encode_request(), - Self::StatementFetching(r) => r.encode_request(), - Self::DisputeSending(r) => r.encode_request(), + Self::ChunkFetchingV1(r) => r.encode_request(), + Self::CollationFetchingV1(r) => r.encode_request(), + Self::PoVFetchingV1(r) => r.encode_request(), + Self::AvailableDataFetchingV1(r) => r.encode_request(), + Self::StatementFetchingV1(r) => r.encode_request(), + Self::DisputeSendingV1(r) => r.encode_request(), } } } diff --git a/node/network/protocol/src/request_response/v1.rs b/node/network/protocol/src/request_response/v1.rs index 5474070c94ae..52458d6822d9 100644 --- a/node/network/protocol/src/request_response/v1.rs +++ b/node/network/protocol/src/request_response/v1.rs @@ -85,7 +85,7 @@ impl ChunkResponse { impl IsRequest for ChunkFetchingRequest { type Response = ChunkFetchingResponse; - const PROTOCOL: Protocol = Protocol::ChunkFetching; + const PROTOCOL: Protocol = Protocol::ChunkFetchingV1; } /// Request the advertised collation at that relay-parent. @@ -107,7 +107,7 @@ pub enum CollationFetchingResponse { impl IsRequest for CollationFetchingRequest { type Response = CollationFetchingResponse; - const PROTOCOL: Protocol = Protocol::CollationFetching; + const PROTOCOL: Protocol = Protocol::CollationFetchingV1; } /// Request the advertised collation at that relay-parent. @@ -130,7 +130,7 @@ pub enum PoVFetchingResponse { impl IsRequest for PoVFetchingRequest { type Response = PoVFetchingResponse; - const PROTOCOL: Protocol = Protocol::PoVFetching; + const PROTOCOL: Protocol = Protocol::PoVFetchingV1; } /// Request the entire available data for a candidate. @@ -162,7 +162,7 @@ impl From> for AvailableDataFetchingResponse { impl IsRequest for AvailableDataFetchingRequest { type Response = AvailableDataFetchingResponse; - const PROTOCOL: Protocol = Protocol::AvailableDataFetching; + const PROTOCOL: Protocol = Protocol::AvailableDataFetchingV1; } /// Request for fetching a large statement via request/response. @@ -188,7 +188,7 @@ pub enum StatementFetchingResponse { impl IsRequest for StatementFetchingRequest { type Response = StatementFetchingResponse; - const PROTOCOL: Protocol = Protocol::StatementFetching; + const PROTOCOL: Protocol = Protocol::StatementFetchingV1; } /// A dispute request. @@ -213,5 +213,5 @@ pub enum DisputeResponse { impl IsRequest for DisputeRequest { type Response = DisputeResponse; - const PROTOCOL: Protocol = Protocol::DisputeSending; + const PROTOCOL: Protocol = Protocol::DisputeSendingV1; } diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 04a163f3883a..036fac399761 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -26,10 +26,11 @@ use error::{log_error, FatalResult, JfyiErrorResult}; use parity_scale_codec::Encode; use polkadot_node_network_protocol::{ + self as net_protocol, peer_set::{IsAuthority, PeerSet}, request_response::{v1 as request_v1, IncomingRequestReceiver}, v1::{self as protocol_v1, StatementMetadata}, - IfDisconnected, PeerId, UnifiedReputationChange as Rep, View, + IfDisconnected, PeerId, UnifiedReputationChange as Rep, Versioned, View, }; use polkadot_node_primitives::{SignedFullStatement, Statement, UncheckedSignedFullStatement}; use polkadot_node_subsystem_util::{self as util, rand, MIN_GOSSIP_PEERS}; @@ -961,7 +962,7 @@ fn statement_message( relay_parent: Hash, statement: SignedFullStatement, metrics: &Metrics, -) -> protocol_v1::ValidationProtocol { +) -> net_protocol::VersionedValidationProtocol { let (is_large, size) = is_statement_large(&statement); if let Some(size) = size { metrics.on_created_message(size); @@ -978,7 +979,7 @@ fn statement_message( protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement.into()) }; - protocol_v1::ValidationProtocol::StatementDistribution(msg) + protocol_v1::ValidationProtocol::StatementDistribution(msg).into() } /// Check whether a statement should be treated as large statement. @@ -1603,12 +1604,12 @@ async fn handle_network_update( recent_outdated_heads: &RecentOutdatedHeads, ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), req_sender: &mpsc::Sender, - update: NetworkBridgeEvent, + update: NetworkBridgeEvent, metrics: &Metrics, rng: &mut impl rand::Rng, ) { match update { - NetworkBridgeEvent::PeerConnected(peer, role, maybe_authority) => { + NetworkBridgeEvent::PeerConnected(peer, role, _, maybe_authority) => { gum::trace!(target: LOG_TARGET, ?peer, ?role, "Peer connected"); peers.insert( peer, @@ -1660,7 +1661,7 @@ async fn handle_network_update( } } }, - NetworkBridgeEvent::PeerMessage(peer, message) => { + NetworkBridgeEvent::PeerMessage(peer, Versioned::V1(message)) => { handle_incoming_message_and_circulate( peer, gossip_peers, @@ -2056,7 +2057,7 @@ impl StatementDistributionSubsystem { ) .await; }, - StatementDistributionMessage::NetworkBridgeUpdateV1(event) => { + StatementDistributionMessage::NetworkBridgeUpdate(event) => { handle_network_update( peers, gossip_peers, diff --git a/node/network/statement-distribution/src/requester.rs b/node/network/statement-distribution/src/requester.rs index 5cff21117a8d..9feaeeb5136d 100644 --- a/node/network/statement-distribution/src/requester.rs +++ b/node/network/statement-distribution/src/requester.rs @@ -107,7 +107,7 @@ pub async fn fetch( let (outgoing, pending_response) = OutgoingRequest::new(Recipient::Peer(peer), req.clone()); if let Err(err) = sender - .feed(RequesterMessage::SendRequest(Requests::StatementFetching(outgoing))) + .feed(RequesterMessage::SendRequest(Requests::StatementFetchingV1(outgoing))) .await { gum::info!( diff --git a/node/network/statement-distribution/src/tests.rs b/node/network/statement-distribution/src/tests.rs index c20aa3dccece..28fade444096 100644 --- a/node/network/statement-distribution/src/tests.rs +++ b/node/network/statement-distribution/src/tests.rs @@ -771,23 +771,23 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() { // notify of peers and view handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None), + msg: StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, 1, None), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None), + msg: StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, 1, None), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]), ), }) @@ -795,7 +795,7 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]), ), }) @@ -830,13 +830,13 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerMessage( peer_a.clone(), - protocol_v1::StatementDistributionMessage::Statement( + Versioned::V1(protocol_v1::StatementDistributionMessage::Statement( hash_a, statement.clone().into(), - ), + )), ), ), }) @@ -861,9 +861,9 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() { AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage( recipients, - protocol_v1::ValidationProtocol::StatementDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::Statement(r, s) - ), + )), ) ) => { assert_eq!(recipients, vec![peer_b.clone()]); @@ -964,10 +964,11 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( // notify of peers and view handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerConnected( peer_a.clone(), ObservedRole::Full, + 1, Some(HashSet::from([Sr25519Keyring::Alice.public().into()])), ), ), @@ -976,10 +977,11 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + 1, Some(HashSet::from([Sr25519Keyring::Bob.public().into()])), ), ), @@ -987,10 +989,11 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerConnected( peer_c.clone(), ObservedRole::Full, + 1, Some(HashSet::from([Sr25519Keyring::Charlie.public().into()])), ), ), @@ -998,15 +1001,20 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None), + msg: StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected( + peer_bad.clone(), + ObservedRole::Full, + 1, + None, + ), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]), ), }) @@ -1014,21 +1022,21 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a]), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a]), ), }) @@ -1066,10 +1074,12 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerMessage( peer_a.clone(), - protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()), + Versioned::V1(protocol_v1::StatementDistributionMessage::LargeStatement( + metadata.clone(), + )), ), ), }) @@ -1084,7 +1094,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( ) => { let reqs = reqs.pop().unwrap(); let outgoing = match reqs { - Requests::StatementFetching(outgoing) => outgoing, + Requests::StatementFetchingV1(outgoing) => outgoing, _ => panic!("Unexpected request"), }; let req = outgoing.payload; @@ -1102,10 +1112,12 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerMessage( peer_c.clone(), - protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()), + Versioned::V1(protocol_v1::StatementDistributionMessage::LargeStatement( + metadata.clone(), + )), ), ), }) @@ -1114,10 +1126,12 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( // Malicious peer: handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerMessage( peer_bad.clone(), - protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()), + Versioned::V1(protocol_v1::StatementDistributionMessage::LargeStatement( + metadata.clone(), + )), ), ), }) @@ -1133,7 +1147,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( ) => { let reqs = reqs.pop().unwrap(); let outgoing = match reqs { - Requests::StatementFetching(outgoing) => outgoing, + Requests::StatementFetchingV1(outgoing) => outgoing, _ => panic!("Unexpected request"), }; let req = outgoing.payload; @@ -1153,7 +1167,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( ) => { let reqs = reqs.pop().unwrap(); let outgoing = match reqs { - Requests::StatementFetching(outgoing) => outgoing, + Requests::StatementFetchingV1(outgoing) => outgoing, _ => panic!("Unexpected request"), }; let req = outgoing.payload; @@ -1174,7 +1188,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( ) => { let reqs = reqs.pop().unwrap(); let outgoing = match reqs { - Requests::StatementFetching(outgoing) => outgoing, + Requests::StatementFetchingV1(outgoing) => outgoing, _ => panic!("Unexpected request"), }; let req = outgoing.payload; @@ -1209,7 +1223,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( ) => { let reqs = reqs.pop().unwrap(); let outgoing = match reqs { - Requests::StatementFetching(outgoing) => outgoing, + Requests::StatementFetchingV1(outgoing) => outgoing, _ => panic!("Unexpected request"), }; let req = outgoing.payload; @@ -1230,7 +1244,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( ) => { let reqs = reqs.pop().unwrap(); let outgoing = match reqs { - Requests::StatementFetching(outgoing) => outgoing, + Requests::StatementFetchingV1(outgoing) => outgoing, _ => panic!("Unexpected request"), }; let req = outgoing.payload; @@ -1277,9 +1291,9 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage( mut recipients, - protocol_v1::ValidationProtocol::StatementDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::LargeStatement(meta) - ), + )), ) ) => { gum::debug!( @@ -1463,10 +1477,11 @@ fn share_prioritizes_backing_group() { for (peer, pair) in dummy_peers.clone().into_iter().zip(dummy_pairs) { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerConnected( peer, ObservedRole::Full, + 1, Some(HashSet::from([pair.public().into()])), ), ), @@ -1475,7 +1490,7 @@ fn share_prioritizes_backing_group() { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer, view![hash_a]), ), }) @@ -1485,10 +1500,11 @@ fn share_prioritizes_backing_group() { // notify of peers and view handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerConnected( peer_a.clone(), ObservedRole::Full, + 1, Some(HashSet::from([Sr25519Keyring::Alice.public().into()])), ), ), @@ -1496,10 +1512,11 @@ fn share_prioritizes_backing_group() { .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + 1, Some(HashSet::from([Sr25519Keyring::Bob.public().into()])), ), ), @@ -1507,10 +1524,11 @@ fn share_prioritizes_backing_group() { .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerConnected( peer_c.clone(), ObservedRole::Full, + 1, Some(HashSet::from([Sr25519Keyring::Charlie.public().into()])), ), ), @@ -1518,17 +1536,23 @@ fn share_prioritizes_backing_group() { .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None), + msg: StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected( + peer_bad.clone(), + ObservedRole::Full, + 1, + None, + ), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerConnected( peer_other_group.clone(), ObservedRole::Full, + 1, Some(HashSet::from([Sr25519Keyring::Dave.public().into()])), ), ), @@ -1537,7 +1561,7 @@ fn share_prioritizes_backing_group() { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]), ), }) @@ -1545,28 +1569,28 @@ fn share_prioritizes_backing_group() { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a]), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a]), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_other_group.clone(), view![hash_a]), ), }) @@ -1614,9 +1638,9 @@ fn share_prioritizes_backing_group() { AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage( mut recipients, - protocol_v1::ValidationProtocol::StatementDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::LargeStatement(meta) - ), + )), ) ) => { gum::debug!( @@ -1746,10 +1770,11 @@ fn peer_cant_flood_with_large_statements() { // notify of peers and view handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerConnected( peer_a.clone(), ObservedRole::Full, + 1, Some(HashSet::from([Sr25519Keyring::Alice.public().into()])), ), ), @@ -1758,7 +1783,7 @@ fn peer_cant_flood_with_large_statements() { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]), ), }) @@ -1795,11 +1820,13 @@ fn peer_cant_flood_with_large_statements() { for _ in 0..MAX_LARGE_STATEMENTS_PER_SENDER + 1 { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerMessage( peer_a.clone(), - protocol_v1::StatementDistributionMessage::LargeStatement( - metadata.clone(), + Versioned::V1( + protocol_v1::StatementDistributionMessage::LargeStatement( + metadata.clone(), + ), ), ), ), @@ -1819,7 +1846,7 @@ fn peer_cant_flood_with_large_statements() { )) => { let reqs = reqs.pop().unwrap(); let outgoing = match reqs { - Requests::StatementFetching(outgoing) => outgoing, + Requests::StatementFetchingV1(outgoing) => outgoing, _ => panic!("Unexpected request"), }; let req = outgoing.payload; @@ -1949,14 +1976,19 @@ fn handle_multiple_seconded_statements() { for peer in all_peers.iter() { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), + msg: StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected( + peer.clone(), + ObservedRole::Full, + 1, + None, + ), ), }) .await; handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerViewChange(peer.clone(), view![relay_parent_hash]), ), }) @@ -1991,7 +2023,7 @@ fn handle_multiple_seconded_statements() { handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::NewGossipTopology(gossip_topology), ), }) @@ -2027,13 +2059,13 @@ fn handle_multiple_seconded_statements() { // `PeerA` sends a `Seconded` message handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerMessage( peer_a.clone(), - protocol_v1::StatementDistributionMessage::Statement( + Versioned::V1(protocol_v1::StatementDistributionMessage::Statement( relay_parent_hash, statement.clone().into(), - ), + )), ), ), }) @@ -2065,9 +2097,9 @@ fn handle_multiple_seconded_statements() { AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage( recipients, - protocol_v1::ValidationProtocol::StatementDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::Statement(r, s) - ), + )), ) ) => { assert!(!recipients.contains(&peer_b)); @@ -2079,13 +2111,13 @@ fn handle_multiple_seconded_statements() { // `PeerB` sends a `Seconded` message: valid but known handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerMessage( peer_b.clone(), - protocol_v1::StatementDistributionMessage::Statement( + Versioned::V1(protocol_v1::StatementDistributionMessage::Statement( relay_parent_hash, statement.clone().into(), - ), + )), ), ), }) @@ -2130,13 +2162,13 @@ fn handle_multiple_seconded_statements() { // `PeerA` sends a `Valid` message handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerMessage( peer_a.clone(), - protocol_v1::StatementDistributionMessage::Statement( + Versioned::V1(protocol_v1::StatementDistributionMessage::Statement( relay_parent_hash, statement.clone().into(), - ), + )), ), ), }) @@ -2167,9 +2199,9 @@ fn handle_multiple_seconded_statements() { AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage( recipients, - protocol_v1::ValidationProtocol::StatementDistribution( + Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::Statement(r, s) - ), + )), ) ) => { assert!(!recipients.contains(&peer_b)); @@ -2181,13 +2213,13 @@ fn handle_multiple_seconded_statements() { // `PeerB` sends a `Valid` message handle .send(FromOverseer::Communication { - msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + msg: StatementDistributionMessage::NetworkBridgeUpdate( NetworkBridgeEvent::PeerMessage( peer_b.clone(), - protocol_v1::StatementDistributionMessage::Statement( + Versioned::V1(protocol_v1::StatementDistributionMessage::Statement( relay_parent_hash, statement.clone().into(), - ), + )), ), ), }) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index b5fd1fba9b78..08fdda22bc72 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -77,7 +77,7 @@ use polkadot_primitives::{ }; use sp_api::{ApiExt, ProvideRuntimeApi}; -use polkadot_node_network_protocol::v1 as protocol_v1; +use polkadot_node_network_protocol::VersionedValidationProtocol; use polkadot_node_subsystem_types::messages::{ ApprovalDistributionMessage, ApprovalVotingMessage, AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage, BitfieldDistributionMessage, @@ -414,7 +414,7 @@ pub async fn forward_events>(client: Arc

, mut hand event=Event, signal=OverseerSignal, error=SubsystemError, - network=NetworkBridgeEvent, + network=NetworkBridgeEvent, )] pub struct Overseer { #[subsystem(no_dispatch, CandidateValidationMessage)] diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 2e3dca1cf110..2d486b75d883 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -848,7 +848,7 @@ fn test_network_bridge_event() -> NetworkBridgeEvent { } fn test_statement_distribution_msg() -> StatementDistributionMessage { - StatementDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event()) + StatementDistributionMessage::NetworkBridgeUpdate(test_network_bridge_event()) } fn test_availability_recovery_msg() -> AvailabilityRecoveryMessage { @@ -862,7 +862,7 @@ fn test_availability_recovery_msg() -> AvailabilityRecoveryMessage { } fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage { - BitfieldDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event()) + BitfieldDistributionMessage::NetworkBridgeUpdate(test_network_bridge_event()) } fn test_provisioner_msg() -> ProvisionerMessage { diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 756c46935be8..b4eef7c8c199 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -29,7 +29,7 @@ use thiserror::Error; pub use sc_network::IfDisconnected; use polkadot_node_network_protocol::{ - peer_set::PeerSet, request_response::Requests, v1 as protocol_v1, PeerId, + self as net_protocol, peer_set::PeerSet, request_response::Requests, PeerId, UnifiedReputationChange, }; use polkadot_node_primitives::{ @@ -201,7 +201,7 @@ pub enum CollatorProtocolMessage { ReportCollator(CollatorId), /// Get a network bridge update. #[from] - NetworkBridgeUpdateV1(NetworkBridgeEvent), + NetworkBridgeUpdate(NetworkBridgeEvent), /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator. /// /// The hash is the relay parent. @@ -328,20 +328,20 @@ pub enum NetworkBridgeMessage { DisconnectPeer(PeerId, PeerSet), /// Send a message to one or more peers on the validation peer-set. - SendValidationMessage(Vec, protocol_v1::ValidationProtocol), + SendValidationMessage(Vec, net_protocol::VersionedValidationProtocol), /// Send a message to one or more peers on the collation peer-set. - SendCollationMessage(Vec, protocol_v1::CollationProtocol), + SendCollationMessage(Vec, net_protocol::VersionedCollationProtocol), /// Send a batch of validation messages. /// /// NOTE: Messages will be processed in order (at least statement distribution relies on this). - SendValidationMessages(Vec<(Vec, protocol_v1::ValidationProtocol)>), + SendValidationMessages(Vec<(Vec, net_protocol::VersionedValidationProtocol)>), /// Send a batch of collation messages. /// /// NOTE: Messages will be processed in order. - SendCollationMessages(Vec<(Vec, protocol_v1::CollationProtocol)>), + SendCollationMessages(Vec<(Vec, net_protocol::VersionedCollationProtocol)>), /// Send requests via substrate request/response. /// Second parameter, tells what to do if we are not yet connected to the peer. @@ -455,7 +455,7 @@ pub enum BitfieldDistributionMessage { /// Event from the network bridge. #[from] - NetworkBridgeUpdateV1(NetworkBridgeEvent), + NetworkBridgeUpdate(NetworkBridgeEvent), } impl BitfieldDistributionMessage { @@ -463,7 +463,7 @@ impl BitfieldDistributionMessage { pub fn relay_parent(&self) -> Option { match self { Self::DistributeBitfield(hash, _) => Some(*hash), - Self::NetworkBridgeUpdateV1(_) => None, + Self::NetworkBridgeUpdate(_) => None, } } } @@ -719,7 +719,7 @@ pub enum StatementDistributionMessage { Share(Hash, SignedFullStatement), /// Event from the network bridge. #[from] - NetworkBridgeUpdateV1(NetworkBridgeEvent), + NetworkBridgeUpdate(NetworkBridgeEvent), } /// This data becomes intrinsics or extrinsics which should be included in a future relay chain block. @@ -913,7 +913,7 @@ pub enum ApprovalDistributionMessage { DistributeApproval(IndirectSignedApprovalVote), /// An update from the network bridge. #[from] - NetworkBridgeUpdateV1(NetworkBridgeEvent), + NetworkBridgeUpdate(NetworkBridgeEvent), } /// Message to the Gossip Support subsystem. @@ -921,7 +921,7 @@ pub enum ApprovalDistributionMessage { pub enum GossipSupportMessage { /// Dummy constructor, so we can receive networking events. #[from] - NetworkBridgeUpdateV1(NetworkBridgeEvent), + NetworkBridgeUpdate(NetworkBridgeEvent), } /// PVF checker message. diff --git a/node/subsystem-types/src/messages/network_bridge_event.rs b/node/subsystem-types/src/messages/network_bridge_event.rs index 91facbc6fe1f..97c71b7c483a 100644 --- a/node/subsystem-types/src/messages/network_bridge_event.rs +++ b/node/subsystem-types/src/messages/network_bridge_event.rs @@ -21,7 +21,7 @@ use std::{ pub use sc_network::{PeerId, ReputationChange}; -use polkadot_node_network_protocol::{ObservedRole, OurView, View, WrongVariant}; +use polkadot_node_network_protocol::{ObservedRole, OurView, ProtocolVersion, View, WrongVariant}; use polkadot_primitives::v2::{AuthorityDiscoveryId, SessionIndex, ValidatorIndex}; /// Information about a peer in the gossip topology for a session. @@ -49,7 +49,7 @@ pub struct NewGossipTopology { #[derive(Debug, Clone, PartialEq)] pub enum NetworkBridgeEvent { /// A peer has connected. - PeerConnected(PeerId, ObservedRole, Option>), + PeerConnected(PeerId, ObservedRole, ProtocolVersion, Option>), /// A peer has disconnected. PeerDisconnected(PeerId), @@ -92,13 +92,22 @@ impl NetworkBridgeEvent { pub fn focus<'a, T>(&'a self) -> Result, WrongVariant> where T: 'a + Clone, - &'a T: TryFrom<&'a M, Error = WrongVariant>, + T: TryFrom<&'a M, Error = WrongVariant>, { Ok(match *self { NetworkBridgeEvent::PeerMessage(ref peer, ref msg) => - NetworkBridgeEvent::PeerMessage(peer.clone(), <&'a T>::try_from(msg)?.clone()), - NetworkBridgeEvent::PeerConnected(ref peer, ref role, ref authority_id) => - NetworkBridgeEvent::PeerConnected(peer.clone(), role.clone(), authority_id.clone()), + NetworkBridgeEvent::PeerMessage(peer.clone(), T::try_from(msg)?), + NetworkBridgeEvent::PeerConnected( + ref peer, + ref role, + ref version, + ref authority_id, + ) => NetworkBridgeEvent::PeerConnected( + peer.clone(), + role.clone(), + *version, + authority_id.clone(), + ), NetworkBridgeEvent::PeerDisconnected(ref peer) => NetworkBridgeEvent::PeerDisconnected(peer.clone()), NetworkBridgeEvent::NewGossipTopology(ref topology) => diff --git a/roadmap/implementers-guide/src/node/approval/approval-distribution.md b/roadmap/implementers-guide/src/node/approval/approval-distribution.md index 9afc53c7e777..81c98afa16bf 100644 --- a/roadmap/implementers-guide/src/node/approval/approval-distribution.md +++ b/roadmap/implementers-guide/src/node/approval/approval-distribution.md @@ -39,7 +39,7 @@ Input: - `ApprovalDistributionMessage::NewBlocks` - `ApprovalDistributionMessage::DistributeAssignment` - `ApprovalDistributionMessage::DistributeApproval` - - `ApprovalDistributionMessage::NetworkBridgeUpdateV1` + - `ApprovalDistributionMessage::NetworkBridgeUpdate` - `OverseerSignal::BlockFinalized` Output: diff --git a/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/roadmap/implementers-guide/src/node/availability/availability-recovery.md index d7d822188ccb..48fb0fb1ca19 100644 --- a/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -10,7 +10,7 @@ This version of the availability recovery subsystem is based off of direct conne Input: -- `NetworkBridgeUpdateV1(update)` +- `NetworkBridgeUpdate(update)` - `AvailabilityRecoveryMessage::RecoverAvailableData(candidate, session, backing_group, response)` Output: diff --git a/roadmap/implementers-guide/src/node/backing/statement-distribution.md b/roadmap/implementers-guide/src/node/backing/statement-distribution.md index fae99aa1d266..39ea1c630d31 100644 --- a/roadmap/implementers-guide/src/node/backing/statement-distribution.md +++ b/roadmap/implementers-guide/src/node/backing/statement-distribution.md @@ -14,7 +14,7 @@ Input: Output: - `NetworkBridge::SendMessage(PeerId, message)` -- `NetworkBridge::SendRequests(StatementFetching)` +- `NetworkBridge::SendRequests(StatementFetchingV1)` - `NetworkBridge::ReportPeer(PeerId, cost_or_benefit)` ## Functionality @@ -86,7 +86,7 @@ example. For this reason, there exists a `LargeStatement` constructor for the of a statement. The actual candidate data is not included. This message type is used whenever a message is deemed large. The receiver of such a message needs to request the actual payload via request/response by means of a -`StatementFetching` request. +`StatementFetchingV1` request. This is necessary as distribution of a large payload (mega bytes) via gossip would make the network collapse and timely distribution of statements would no diff --git a/roadmap/implementers-guide/src/node/collators/collator-protocol.md b/roadmap/implementers-guide/src/node/collators/collator-protocol.md index ae4858aa243e..09265a534847 100644 --- a/roadmap/implementers-guide/src/node/collators/collator-protocol.md +++ b/roadmap/implementers-guide/src/node/collators/collator-protocol.md @@ -104,7 +104,7 @@ The protocol tracks advertisements received and the source of the advertisement. As a validator, we will handle requests from other subsystems to fetch a collation on a specific `ParaId` and relay-parent. These requests are made with the request response protocol `CollationFetchingRequest` request. To do so, we need to first check if we have already gathered a collation on that `ParaId` and relay-parent. If not, we need to select one of the advertisements and issue a request for it. If we've already issued a request, we shouldn't issue another one until the first has returned. -When acting on an advertisement, we issue a `Requests::CollationFetching`. However, we only request one collation at a time per relay parent. This reduces the bandwidth requirements and as we can second only one candidate per relay parent, the others are probably not required anyway. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators. +When acting on an advertisement, we issue a `Requests::CollationFetchingV1`. However, we only request one collation at a time per relay parent. This reduces the bandwidth requirements and as we can second only one candidate per relay parent, the others are probably not required anyway. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators. As a validator, once the collation has been fetched some other subsystem will inspect and do deeper validation of the collation. The subsystem will report to this subsystem with a [`CollatorProtocolMessage`][CPM]`::ReportCollator`. In that case, if we are connected directly to the collator, we apply a cost to the `PeerId` associated with the collator and potentially disconnect or blacklist it. If the collation is seconded, we notify the collator and apply a benefit to the `PeerId` associated with the collator. diff --git a/roadmap/implementers-guide/src/node/subsystems-and-jobs.md b/roadmap/implementers-guide/src/node/subsystems-and-jobs.md index bffee1ffd2b1..6e3b4cd2d166 100644 --- a/roadmap/implementers-guide/src/node/subsystems-and-jobs.md +++ b/roadmap/implementers-guide/src/node/subsystems-and-jobs.md @@ -103,11 +103,11 @@ digraph { coll_prot -> net_brdg [arrowhead = "onormal", label = "RequestCollation"] coll_prot -> cand_sel [arrowhead = "onormal", label = "Collation"] - net_brdg -> avail_dist [arrowhead = "onormal", label = "NetworkBridgeUpdateV1"] - net_brdg -> bitf_dist [arrowhead = "onormal", label = "NetworkBridgeUpdateV1"] - net_brdg -> pov_dist [arrowhead = "onormal", label = "NetworkBridgeUpdateV1"] - net_brdg -> stmt_dist [arrowhead = "onormal", label = "NetworkBridgeUpdateV1"] - net_brdg -> coll_prot [arrowhead = "onormal", label = "NetworkBridgeUpdateV1"] + net_brdg -> avail_dist [arrowhead = "onormal", label = "NetworkBridgeUpdate"] + net_brdg -> bitf_dist [arrowhead = "onormal", label = "NetworkBridgeUpdate"] + net_brdg -> pov_dist [arrowhead = "onormal", label = "NetworkBridgeUpdate"] + net_brdg -> stmt_dist [arrowhead = "onormal", label = "NetworkBridgeUpdate"] + net_brdg -> coll_prot [arrowhead = "onormal", label = "NetworkBridgeUpdate"] pov_dist -> net_brdg [arrowhead = "onormal", label = "SendValidationMessage"] pov_dist -> net_brdg [arrowhead = "onormal", label = "ReportPeer"] @@ -264,7 +264,7 @@ sequenceDiagram Note right of NB: Bridge sends validation message to all appropriate peers else On receipt of peer validation message - NB ->> SD: NetworkBridgeUpdateV1 + NB ->> SD: NetworkBridgeUpdate % fn handle_incoming_message alt if we aren't already aware of the relay parent for this statement diff --git a/roadmap/implementers-guide/src/node/utility/network-bridge.md b/roadmap/implementers-guide/src/node/utility/network-bridge.md index f50525793fe1..3245772d9d8d 100644 --- a/roadmap/implementers-guide/src/node/utility/network-bridge.md +++ b/roadmap/implementers-guide/src/node/utility/network-bridge.md @@ -21,10 +21,10 @@ Input: [`NetworkBridgeMessage`][NBM] Output: - - [`ApprovalDistributionMessage`][AppD]`::NetworkBridgeUpdateV1` - - [`BitfieldDistributionMessage`][BitD]`::NetworkBridgeUpdateV1` - - [`CollatorProtocolMessage`][CollP]`::NetworkBridgeUpdateV1` - - [`StatementDistributionMessage`][StmtD]`::NetworkBridgeUpdateV1` + - [`ApprovalDistributionMessage`][AppD]`::NetworkBridgeUpdate` + - [`BitfieldDistributionMessage`][BitD]`::NetworkBridgeUpdate` + - [`CollatorProtocolMessage`][CollP]`::NetworkBridgeUpdate` + - [`StatementDistributionMessage`][StmtD]`::NetworkBridgeUpdate` ## Functionality @@ -108,7 +108,7 @@ Map the message onto the corresponding [Event Handler](#event-handlers) based on ### `NewGossipTopology` -- Map all `AuthorityDiscoveryId`s to `PeerId`s and issue a corresponding `NetworkBridgeUpdateV1` +- Map all `AuthorityDiscoveryId`s to `PeerId`s and issue a corresponding `NetworkBridgeUpdate` to all validation subsystems. ## Event Handlers @@ -117,13 +117,13 @@ Network bridge event handlers are the intended recipients of particular network ### Validation V1 -* `ApprovalDistributionV1Message -> ApprovalDistributionMessage::NetworkBridgeUpdateV1` -* `BitfieldDistributionV1Message -> BitfieldDistributionMessage::NetworkBridgeUpdateV1` -* `StatementDistributionV1Message -> StatementDistributionMessage::NetworkBridgeUpdateV1` +* `ApprovalDistributionV1Message -> ApprovalDistributionMessage::NetworkBridgeUpdate` +* `BitfieldDistributionV1Message -> BitfieldDistributionMessage::NetworkBridgeUpdate` +* `StatementDistributionV1Message -> StatementDistributionMessage::NetworkBridgeUpdate` ### Collation V1 -* `CollatorProtocolV1Message -> CollatorProtocolMessage::NetworkBridgeUpdateV1` +* `CollatorProtocolV1Message -> CollatorProtocolMessage::NetworkBridgeUpdate` [NBM]: ../../types/overseer-protocol.md#network-bridge-message [AppD]: ../../types/overseer-protocol.md#approval-distribution-message diff --git a/roadmap/implementers-guide/src/types/network.md b/roadmap/implementers-guide/src/types/network.md index 34875816ca34..0d09a682cff2 100644 --- a/roadmap/implementers-guide/src/types/network.md +++ b/roadmap/implementers-guide/src/types/network.md @@ -161,7 +161,7 @@ struct TopologyPeerInfo { enum NetworkBridgeEvent { /// A peer with given ID is now connected. - PeerConnected(PeerId, ObservedRole, Option>), + PeerConnected(PeerId, ObservedRole, ProtocolVersion, Option>), /// A peer with given ID is now disconnected. PeerDisconnected(PeerId), /// Our neighbors in the new gossip topology. diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index 4180f0e20d01..f10537fcf666 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -193,7 +193,7 @@ enum ApprovalDistributionMessage { /// the message. DistributeApproval(IndirectSignedApprovalVote), /// An update from the network bridge. - NetworkBridgeUpdateV1(NetworkBridgeEvent), + NetworkBridgeUpdate(NetworkBridgeEvent), } ``` @@ -284,7 +284,7 @@ enum BitfieldDistributionMessage { /// The bitfield distribution subsystem will assume this is indeed correctly signed. DistributeBitfield(relay_parent, SignedAvailabilityBitfield), /// Receive a network bridge update. - NetworkBridgeUpdateV1(NetworkBridgeEvent), + NetworkBridgeUpdate(NetworkBridgeEvent), } ``` @@ -643,7 +643,7 @@ enum PoVDistributionMessage { /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor DistributePoV(Hash, CandidateDescriptor, PoV), /// An update from the network bridge. - NetworkBridgeUpdateV1(NetworkBridgeEvent), + NetworkBridgeUpdate(NetworkBridgeEvent), } ``` @@ -747,7 +747,7 @@ This is a network protocol that receives messages of type [`StatementDistributio ```rust enum StatementDistributionMessage { /// An update from the network bridge. - NetworkBridgeUpdateV1(NetworkBridgeEvent), + NetworkBridgeUpdate(NetworkBridgeEvent), /// We have validated a candidate and want to share our judgment with our peers. /// The hash is the relay parent. /// diff --git a/runtime/parachains/src/inclusion/tests.rs b/runtime/parachains/src/inclusion/tests.rs index 8de44a93efd6..afa067053d6c 100644 --- a/runtime/parachains/src/inclusion/tests.rs +++ b/runtime/parachains/src/inclusion/tests.rs @@ -1942,5 +1942,3 @@ fn session_change_wipes() { assert!(>::iter().collect::>().is_empty()); }); } - -// TODO [now]: test `collect_disputed`