Skip to content

Commit

Permalink
split outbound and inbound codecs encoded types (sigp#2410)
Browse files Browse the repository at this point in the history
Splits the inbound and outbound requests, for maintainability.
  • Loading branch information
divagant-martian committed Jun 17, 2021
1 parent a526145 commit 3261eff
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 154 deletions.
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
pub enum BehaviourHandlerIn<TSpec: EthSpec> {
Delegate(DelegateIn<TSpec>),
/// Start the shutdown process.
Shutdown(Option<(RequestId, RPCRequest<TSpec>)>),
Shutdown(Option<(RequestId, OutboundRequest<TSpec>)>),
}

impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
Expand Down
30 changes: 15 additions & 15 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => %peer_id);

self.eth2_rpc
.send_request(peer_id, id, RPCRequest::Ping(ping));
.send_request(peer_id, id, OutboundRequest::Ping(ping));
}

/// Sends a Pong response to the peer.
Expand All @@ -610,7 +610,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {

/// Sends a METADATA request to a peer.
fn send_meta_data_request(&mut self, peer_id: PeerId) {
let event = RPCRequest::MetaData(PhantomData);
let event = OutboundRequest::MetaData(PhantomData);
self.eth2_rpc
.send_request(peer_id, RequestId::Behaviour, event);
}
Expand Down Expand Up @@ -749,17 +749,17 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
let peer_request_id = (handler_id, id);
match request {
/* Behaviour managed protocols: Ping and Metadata */
RPCRequest::Ping(ping) => {
InboundRequest::Ping(ping) => {
// inform the peer manager and send the response
self.peer_manager.ping_request(&peer_id, ping.data);
// send a ping response
self.pong(peer_request_id, peer_id);
}
RPCRequest::MetaData(_) => {
InboundRequest::MetaData(_) => {
// send the requested meta-data
self.send_meta_data_response((handler_id, id), peer_id);
}
RPCRequest::Goodbye(reason) => {
InboundRequest::Goodbye(reason) => {
// queue for disconnection without a goodbye message
debug!(
self.log, "Peer sent Goodbye";
Expand All @@ -775,18 +775,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
// inform the application layer early.
}
/* Protocols propagated to the Network */
RPCRequest::Status(msg) => {
InboundRequest::Status(msg) => {
// inform the peer manager that we have received a status from a peer
self.peer_manager.peer_statusd(&peer_id);
// propagate the STATUS message upwards
self.propagate_request(peer_request_id, peer_id, Request::Status(msg))
}
RPCRequest::BlocksByRange(req) => self.propagate_request(
InboundRequest::BlocksByRange(req) => self.propagate_request(
peer_request_id,
peer_id,
Request::BlocksByRange(req),
),
RPCRequest::BlocksByRoot(req) => {
InboundRequest::BlocksByRoot(req) => {
self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req))
}
}
Expand Down Expand Up @@ -834,7 +834,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
peer_id,
handler: NotifyHandler::Any,
event: BehaviourHandlerIn::Shutdown(
reason.map(|reason| (RequestId::Behaviour, RPCRequest::Goodbye(reason))),
reason.map(|reason| (RequestId::Behaviour, OutboundRequest::Goodbye(reason))),
),
});
}
Expand Down Expand Up @@ -878,7 +878,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
handler: NotifyHandler::Any,
event: BehaviourHandlerIn::Shutdown(Some((
RequestId::Behaviour,
RPCRequest::Goodbye(reason),
OutboundRequest::Goodbye(reason),
))),
});
}
Expand Down Expand Up @@ -1293,12 +1293,12 @@ pub enum Request {
BlocksByRoot(BlocksByRootRequest),
}

impl<TSpec: EthSpec> std::convert::From<Request> for RPCRequest<TSpec> {
fn from(req: Request) -> RPCRequest<TSpec> {
impl<TSpec: EthSpec> std::convert::From<Request> for OutboundRequest<TSpec> {
fn from(req: Request) -> OutboundRequest<TSpec> {
match req {
Request::BlocksByRoot(r) => RPCRequest::BlocksByRoot(r),
Request::BlocksByRange(r) => RPCRequest::BlocksByRange(r),
Request::Status(s) => RPCRequest::Status(s),
Request::BlocksByRoot(r) => OutboundRequest::BlocksByRoot(r),
Request::BlocksByRange(r) => OutboundRequest::BlocksByRange(r),
Request::Status(s) => OutboundRequest::Status(s),
}
}
}
Expand Down
24 changes: 14 additions & 10 deletions beacon_node/eth2_libp2p/src/rpc/codec/base.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This handles the various supported encoding mechanism for the Eth 2.0 RPC.

use crate::rpc::methods::ErrorType;
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
use libp2p::bytes::BufMut;
use libp2p::bytes::BytesMut;
use std::marker::PhantomData;
Expand Down Expand Up @@ -47,7 +47,7 @@ where
// This deals with Decoding RPC Responses from other peers and encoding our requests
pub struct BaseOutboundCodec<TOutboundCodec, TSpec>
where
TOutboundCodec: OutboundCodec<RPCRequest<TSpec>>,
TOutboundCodec: OutboundCodec<OutboundRequest<TSpec>>,
TSpec: EthSpec,
{
/// Inner codec for handling various encodings.
Expand All @@ -60,7 +60,7 @@ where
impl<TOutboundCodec, TSpec> BaseOutboundCodec<TOutboundCodec, TSpec>
where
TSpec: EthSpec,
TOutboundCodec: OutboundCodec<RPCRequest<TSpec>>,
TOutboundCodec: OutboundCodec<OutboundRequest<TSpec>>,
{
pub fn new(codec: TOutboundCodec) -> Self {
BaseOutboundCodec {
Expand Down Expand Up @@ -102,9 +102,9 @@ where
impl<TCodec, TSpec> Decoder for BaseInboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec: Encoder<RPCCodedResponse<TSpec>> + Decoder<Item = RPCRequest<TSpec>>,
TCodec: Encoder<RPCCodedResponse<TSpec>> + Decoder<Item = InboundRequest<TSpec>>,
{
type Item = RPCRequest<TSpec>;
type Item = InboundRequest<TSpec>;
type Error = <TCodec as Decoder>::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand All @@ -115,14 +115,18 @@ where
/* Base Outbound Codec */

// This Encodes RPC Requests sent to external peers
impl<TCodec, TSpec> Encoder<RPCRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec>
impl<TCodec, TSpec> Encoder<OutboundRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec: OutboundCodec<RPCRequest<TSpec>> + Encoder<RPCRequest<TSpec>>,
TCodec: OutboundCodec<OutboundRequest<TSpec>> + Encoder<OutboundRequest<TSpec>>,
{
type Error = <TCodec as Encoder<RPCRequest<TSpec>>>::Error;
type Error = <TCodec as Encoder<OutboundRequest<TSpec>>>::Error;

fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(
&mut self,
item: OutboundRequest<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
self.inner.encode(item, dst)
}
}
Expand All @@ -131,7 +135,7 @@ where
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec: OutboundCodec<RPCRequest<TSpec>, CodecErrorType = ErrorType>
TCodec: OutboundCodec<OutboundRequest<TSpec>, CodecErrorType = ErrorType>
+ Decoder<Item = RPCResponse<TSpec>>,
{
type Item = RPCCodedResponse<TSpec>;
Expand Down
12 changes: 8 additions & 4 deletions beacon_node/eth2_libp2p/src/rpc/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub(crate) mod ssz_snappy;
use self::base::{BaseInboundCodec, BaseOutboundCodec};
use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec};
use crate::rpc::protocol::RPCError;
use crate::rpc::{RPCCodedResponse, RPCRequest};
use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse};
use libp2p::bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder};
use types::EthSpec;
Expand All @@ -29,7 +29,7 @@ impl<T: EthSpec> Encoder<RPCCodedResponse<T>> for InboundCodec<T> {
}

impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Item = InboundRequest<TSpec>;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand All @@ -39,10 +39,14 @@ impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
}
}

impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for OutboundCodec<TSpec> {
impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for OutboundCodec<TSpec> {
type Error = RPCError;

fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(
&mut self,
item: OutboundRequest<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match self {
OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
}
Expand Down
49 changes: 27 additions & 22 deletions beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
};
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
use crate::rpc::{methods::*, InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder;
use snap::write::FrameEncoder;
Expand Down Expand Up @@ -90,7 +89,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<

// Decoder for inbound streams: Decodes RPC requests from peers
impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Item = InboundRequest<TSpec>;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand Down Expand Up @@ -133,27 +132,29 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
// since we have already checked `length` above.
match self.protocol.message_name {
Protocol::Status => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&decoded_buffer,
)?))),
Version::V1 => Ok(Some(InboundRequest::Status(
StatusMessage::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::Goodbye => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Goodbye(
Version::V1 => Ok(Some(InboundRequest::Goodbye(
GoodbyeReason::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRange(
Version::V1 => Ok(Some(InboundRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
}))),
Version::V1 => {
Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
})))
}
},
Protocol::Ping => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Ping(Ping {
Version::V1 => Ok(Some(InboundRequest::Ping(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?,
}))),
},
Expand All @@ -163,7 +164,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
if !decoded_buffer.is_empty() {
Err(RPCError::InvalidData)
} else {
Ok(Some(RPCRequest::MetaData(PhantomData)))
Ok(Some(InboundRequest::MetaData(PhantomData)))
}
}
},
Expand Down Expand Up @@ -201,17 +202,21 @@ impl<TSpec: EthSpec> SSZSnappyOutboundCodec<TSpec> {
}

// Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type Error = RPCError;

fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(
&mut self,
item: OutboundRequest<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
RPCRequest::Ping(req) => req.as_ssz_bytes(),
RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode
OutboundRequest::Status(req) => req.as_ssz_bytes(),
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
Expand Down Expand Up @@ -318,7 +323,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
}
}

impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
impl<TSpec: EthSpec> OutboundCodec<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type CodecErrorType = ErrorType;

fn decode_error(
Expand Down
17 changes: 9 additions & 8 deletions beacon_node/eth2_libp2p/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
#![allow(clippy::cognitive_complexity)]

use super::methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination};
use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
use super::protocol::{Protocol, RPCError, RPCProtocol};
use super::{RPCReceived, RPCSend};
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap;
use futures::prelude::*;
use futures::{Sink, SinkExt};
Expand Down Expand Up @@ -90,7 +91,7 @@ where
events_out: SmallVec<[HandlerEvent<TSpec>; 4]>,

/// Queue of outbound substreams to open.
dial_queue: SmallVec<[(RequestId, RPCRequest<TSpec>); 4]>,
dial_queue: SmallVec<[(RequestId, OutboundRequest<TSpec>); 4]>,

/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,
Expand Down Expand Up @@ -186,7 +187,7 @@ pub enum OutboundSubstreamState<TSpec: EthSpec> {
/// The framed negotiated substream.
substream: Box<OutboundFramed<NegotiatedSubstream, TSpec>>,
/// Keeps track of the actual request sent.
request: RPCRequest<TSpec>,
request: OutboundRequest<TSpec>,
},
/// Closing an outbound substream>
Closing(Box<OutboundFramed<NegotiatedSubstream, TSpec>>),
Expand Down Expand Up @@ -221,7 +222,7 @@ where
}

/// Initiates the handler's shutdown process, sending an optional last message to the peer.
pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest<TSpec>)>) {
pub fn shutdown(&mut self, final_msg: Option<(RequestId, OutboundRequest<TSpec>)>) {
if matches!(self.state, HandlerState::Active) {
if !self.dial_queue.is_empty() {
debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len());
Expand All @@ -247,7 +248,7 @@ where
}

/// Opens an outbound substream with a request.
fn send_request(&mut self, id: RequestId, req: RPCRequest<TSpec>) {
fn send_request(&mut self, id: RequestId, req: OutboundRequest<TSpec>) {
match self.state {
HandlerState::Active => {
self.dial_queue.push((id, req));
Expand Down Expand Up @@ -303,8 +304,8 @@ where
type OutEvent = HandlerEvent<TSpec>;
type Error = RPCError;
type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>;
type OutboundOpenInfo = (RequestId, RPCRequest<TSpec>); // Keep track of the id and the request
type OutboundProtocol = OutboundRequest<TSpec>;
type OutboundOpenInfo = (RequestId, OutboundRequest<TSpec>); // Keep track of the id and the request
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
Expand Down
Loading

0 comments on commit 3261eff

Please sign in to comment.