diff --git a/crates/net/network/src/protocol.rs b/crates/net/network/src/protocol.rs index c5347417c44e..24dd68690422 100644 --- a/crates/net/network/src/protocol.rs +++ b/crates/net/network/src/protocol.rs @@ -9,12 +9,11 @@ use reth_primitives::BytesMut; use reth_rpc_types::PeerId; use std::{ fmt, - future::Future, net::SocketAddr, pin::Pin, task::{Context, Poll}, }; -use tokio::sync::mpsc::UnboundedSender; + use tokio_stream::wrappers::UnboundedReceiverStream; /// A trait that allows to offer additional RLPx-based application-level protocols when establishing @@ -42,10 +41,12 @@ pub trait ProtocolHandler: fmt::Debug + Send + Sync + 'static { /// A trait that allows to authenticate a protocol after the RLPx connection was established. pub trait ConnectionHandler: Send + Sync + 'static { - /// The future that handles the connection - type Connection: Future + Send + 'static; + /// The connection that yields messages to send to the remote. + /// + /// The connection will be closed when this stream resolves. + type Connection: Stream + Send + 'static; - /// Returns the protocols to announce when the RLPx connection will be established. + /// Returns the protocol to announce when the RLPx connection will be established. /// /// This will be negotiated with the remote peer. fn protocol(&self) -> Protocol; @@ -80,27 +81,12 @@ pub enum OnNotSupported { Disconnect, } -/// A connection channel to send and receive messages for a specific protocols. +/// A connection channel to receive messages for the negotiated protocol. /// /// This is a [Stream] that returns raw bytes of the received messages for this protocol. #[derive(Debug)] pub struct ProtocolConnection { from_wire: UnboundedReceiverStream, - to_wire: UnboundedSender, -} - -impl ProtocolConnection { - /// Sends a message to the remote. - /// - /// Returns an error if the connection has been disconnected. - pub fn send(&self, msg: BytesMut) { - self.to_wire.send(ProtocolMessage::Message(msg)).ok(); - } - - /// Disconnects the connection. - pub fn disconnect(&self) { - let _ = self.to_wire.send(ProtocolMessage::Disconnect); - } } impl Stream for ProtocolConnection { @@ -111,38 +97,6 @@ impl Stream for ProtocolConnection { } } -/// Messages that can be sent from a protocol connection -#[derive(Debug)] -pub(crate) enum ProtocolMessage { - /// New message to send to the remote. - Message(BytesMut), - /// Disconnect the connection. - Disconnect, -} - -/// Errors that can occur when handling a protocol. -#[derive(Debug, thiserror::Error)] -pub enum ProtocolError { - /// custom error message - #[error("{0}")] - Message(String), - /// Ayn other error - #[error(transparent)] - Other(Box), -} - -impl ProtocolError { - /// Creates a new error with the given message. - pub fn msg(msg: impl fmt::Display) -> Self { - ProtocolError::Message(msg.to_string()) - } - - /// Wraps the given error in a `ProtocolError`. - pub fn new(err: impl std::error::Error + Send + Sync + 'static) -> Self { - ProtocolError::Other(Box::new(err)) - } -} - /// A wrapper type for a RLPx sub-protocol. #[derive(Debug)] pub struct RlpxSubProtocol(Box); @@ -221,7 +175,7 @@ pub(crate) trait DynConnectionHandler: Send + Sync + 'static { direction: Direction, peer_id: PeerId, conn: ProtocolConnection, - ) -> Pin + Send + 'static>>; + ) -> Pin + Send + 'static>>; } impl DynConnectionHandler for T @@ -246,7 +200,7 @@ where direction: Direction, peer_id: PeerId, conn: ProtocolConnection, - ) -> Pin + Send + 'static>> { + ) -> Pin + Send + 'static>> { Box::pin(T::into_connection(self, direction, peer_id, conn)) } }