Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Poll the substream validation before polling Notifications #13934

Merged
merged 3 commits into from
Apr 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 16 additions & 28 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;

use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet},
future::Future,
iter,
pin::Pin,
Expand Down Expand Up @@ -77,8 +77,6 @@ type PendingSyncSubstreamValidation =

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> {
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome>,
/// Used to report reputation changes.
peerset_handle: sc_peerset::PeersetHandle,
/// Handles opening the unique substream and sending and receiving raw messages.
Expand Down Expand Up @@ -181,7 +179,6 @@ impl<B: BlockT> Protocol<B> {
};

let protocol = Self {
pending_messages: VecDeque::new(),
peerset_handle: peerset_handle.clone(),
behaviour,
notification_protocols: iter::once(block_announces_protocol.notifications_protocol)
Expand Down Expand Up @@ -409,8 +406,21 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
cx: &mut std::task::Context,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}

let event = match self.behaviour.poll(cx, params) {
Expand All @@ -430,23 +440,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }),
};

while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}

let outcome = match event {
altonen marked this conversation as resolved.
Show resolved Hide resolved
NotificationsOut::CustomProtocolOpen {
peer_id,
Expand Down Expand Up @@ -509,7 +502,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
) {
Ok(handshake) => {
let roles = handshake.roles;
self.peers.insert(peer_id, roles);
altonen marked this conversation as resolved.
Show resolved Hide resolved

let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
Expand Down Expand Up @@ -644,10 +636,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome))
}

if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
}

// This block can only be reached if an event was pulled from the behaviour and that
// resulted in `CustomMessageOutcome::None`. Since there might be another pending
// message from the behaviour, the task is scheduled again.
Expand Down