Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Showing 6 changed files with 47 additions and 617 deletions.
100 changes: 1 addition & 99 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ use crate::{
utils::{interval, LruHashSet},
};

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use codec::{Decode, DecodeAll, Encode};
use futures::{channel::oneshot, prelude::*};
use generic_proto::{GenericProto, GenericProtoOut};
@@ -75,11 +75,6 @@ const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
// Must be equal to `max(MAX_BLOCK_ANNOUNCE_SIZE, MAX_TRANSACTIONS_SIZE)`.
pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 * 1024;

/// Current protocol version.
pub(crate) const CURRENT_VERSION: u32 = 6;
/// Lowest version we support
pub(crate) const MIN_VERSION: u32 = 3;

/// Identifier of the peerset for the block announces protocol.
const HARDCODED_PEERSETS_SYNC: sc_peerset::SetId = sc_peerset::SetId::from(0);
/// Number of hardcoded peersets (the constants right above). Any set whose identifier is equal or
@@ -254,26 +249,6 @@ impl<B: BlockT> BlockAnnouncesHandshake<B> {
}
}

/// Builds a SCALE-encoded "Status" message to send as handshake for the legacy protocol.
fn build_status_message<B: BlockT>(
protocol_config: &ProtocolConfig,
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
) -> Vec<u8> {
let status = message::generic::Status {
version: CURRENT_VERSION,
min_supported_version: MIN_VERSION,
genesis_hash,
roles: protocol_config.roles.into(),
best_number,
best_hash,
chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
};

Message::<B>::Status(status).encode()
}

impl<B: BlockT> Protocol<B> {
/// Create a new instance.
pub fn new(
@@ -375,8 +350,6 @@ impl<B: BlockT> Protocol<B> {
});

let behaviour = {
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();

let best_number = info.best_number;
let best_hash = info.best_hash;
let genesis_hash = info.genesis_hash;
@@ -389,9 +362,6 @@ impl<B: BlockT> Protocol<B> {
).encode();

GenericProto::new(
protocol_id.clone(),
versions,
build_status_message::<B>(&config, best_number, best_hash, genesis_hash),
peerset,
iter::once((block_announces_protocol, block_announces_handshake, MAX_BLOCK_ANNOUNCE_SIZE))
.chain(network_config.extra_sets.iter()
@@ -511,9 +481,6 @@ impl<B: BlockT> Protocol<B> {

self.sync.update_chain_info(&hash, number);

self.behaviour.set_legacy_handshake_message(
build_status_message::<B>(&self.config, number, hash, self.genesis_hash),
);
self.behaviour.set_notif_protocol_handshake(
HARDCODED_PEERSETS_SYNC,
BlockAnnouncesHandshake::<B>::build(
@@ -539,64 +506,6 @@ impl<B: BlockT> Protocol<B> {
self.peers.iter().map(|(id, peer)| (id, &peer.info))
}

fn on_custom_message(
&mut self,
who: PeerId,
data: BytesMut,
) -> CustomMessageOutcome<B> {
let message = match <Message<B> as Decode>::decode(&mut &data[..]) {
Ok(message) => message,
Err(err) => {
debug!(
target: "sync",
"Couldn't decode packet sent by {}: {:?}: {}",
who,
data,
err,
);
self.peerset_handle.report_peer(who, rep::BAD_MESSAGE);
return CustomMessageOutcome::None;
}
};

match message {
GenericMessage::Status(_) =>
debug!(target: "sub-libp2p", "Received unexpected Status"),
GenericMessage::BlockAnnounce(announce) =>
self.push_block_announce_validation(who.clone(), announce),
GenericMessage::Transactions(_) =>
warn!(target: "sub-libp2p", "Received unexpected Transactions"),
GenericMessage::BlockResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected BlockResponse"),
GenericMessage::RemoteCallResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"),
GenericMessage::RemoteReadResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteReadResponse"),
GenericMessage::RemoteHeaderResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteHeaderResponse"),
GenericMessage::RemoteChangesResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"),
GenericMessage::BlockRequest(_) |
GenericMessage::RemoteReadChildRequest(_) |
GenericMessage::RemoteCallRequest(_) |
GenericMessage::RemoteReadRequest(_) |
GenericMessage::RemoteHeaderRequest(_) |
GenericMessage::RemoteChangesRequest(_) |
GenericMessage::Consensus(_) |
GenericMessage::ConsensusBatch(_) => {
debug!(
target: "sub-libp2p",
"Received no longer supported legacy request from {:?}",
who
);
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(who, rep::BAD_PROTOCOL);
},
}

CustomMessageOutcome::None
}

fn prepare_block_request(
&mut self,
who: PeerId,
@@ -1547,13 +1456,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
}
}
},
GenericProtoOut::LegacyMessage { peer_id, message } => {
if self.peers.contains_key(&peer_id) {
self.on_custom_message(peer_id, message)
} else {
CustomMessageOutcome::None
}
},
GenericProtoOut::Notification { peer_id, set_id, message } =>
match set_id {
HARDCODED_PEERSETS_SYNC if self.peers.contains_key(&peer_id) => {
54 changes: 2 additions & 52 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
@@ -16,10 +16,8 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::config::ProtocolId;
use crate::protocol::generic_proto::{
handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn},
upgrade::RegisteredProtocol
handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}
};

use bytes::BytesMut;
@@ -97,9 +95,6 @@ use wasm_timer::Instant;
/// accommodates for any number of connections.
///
pub struct GenericProto {
/// Legacy protocol to open with peers. Never modified.
legacy_protocol: RegisteredProtocol,

/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
@@ -346,14 +341,6 @@ pub enum GenericProtoOut {
set_id: sc_peerset::SetId,
},

/// Receives a message on the legacy substream.
LegacyMessage {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Message that has been received.
message: BytesMut,
},

/// Receives a message on a custom protocol substream.
///
/// Also concerns received notifications for the notifications API.
@@ -370,9 +357,6 @@ pub enum GenericProtoOut {
impl GenericProto {
/// Creates a `CustomProtos`.
pub fn new(
protocol: impl Into<ProtocolId>,
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>,
) -> Self {
@@ -382,11 +366,7 @@ impl GenericProto {

assert!(!notif_protocols.is_empty());

let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);

GenericProto {
legacy_protocol,
notif_protocols,
peerset,
peers: FnvHashMap::default(),
@@ -412,14 +392,6 @@ impl GenericProto {
}
}

/// Modifies the handshake of the legacy protocol.
pub fn set_legacy_handshake_message(
&mut self,
handshake_message: impl Into<Vec<u8>>
) {
*self.legacy_protocol.handshake_message().write() = handshake_message.into();
}

/// Returns the number of discovered nodes that we keep in memory.
pub fn num_discovered_peers(&self) -> usize {
self.peerset.num_discovered_peers()
@@ -1046,10 +1018,7 @@ impl NetworkBehaviour for GenericProto {
type OutEvent = GenericProtoOut;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
NotifsHandlerProto::new(
self.legacy_protocol.clone(),
self.notif_protocols.clone(),
)
NotifsHandlerProto::new(self.notif_protocols.clone())
}

fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
@@ -1900,25 +1869,6 @@ impl NetworkBehaviour for GenericProto {
};
}

NotifsHandlerOut::CustomMessage { message } => {
if self.is_open(&source, sc_peerset::SetId::from(0)) { // TODO: using set 0 here is hacky
trace!(target: "sub-libp2p", "Handler({:?}) => Message", source);
trace!(target: "sub-libp2p", "External API <= Message({:?})", source);
let event = GenericProtoOut::LegacyMessage {
peer_id: source,
message,
};

self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Post-close message. Dropping message.",
source,
);
}
}

NotifsHandlerOut::Notification { protocol_index, message } => {
let set_id = sc_peerset::SetId::from(protocol_index);
if self.is_open(&source, set_id) {
Loading

0 comments on commit 0a61b0a

Please sign in to comment.