Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: convert connection type to stream #5384

Merged
merged 1 commit into from
Nov 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 9 additions & 55 deletions crates/net/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use reth_primitives::BytesMut;
use reth_rpc_types::PeerId;
use std::{
fmt,
future::Future,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc::UnboundedSender;

use tokio_stream::wrappers::UnboundedReceiverStream;

/// A trait that allows to offer additional RLPx-based application-level protocols when establishing
Expand Down Expand Up @@ -42,10 +41,12 @@ pub trait ProtocolHandler: fmt::Debug + Send + Sync + 'static {

/// A trait that allows to authenticate a protocol after the RLPx connection was established.
pub trait ConnectionHandler: Send + Sync + 'static {
/// The future that handles the connection
type Connection: Future<Output = ()> + Send + 'static;
/// The connection that yields messages to send to the remote.
///
/// The connection will be closed when this stream resolves.
type Connection: Stream<Item = BytesMut> + Send + 'static;

/// Returns the protocols to announce when the RLPx connection will be established.
/// Returns the protocol to announce when the RLPx connection will be established.
///
/// This will be negotiated with the remote peer.
fn protocol(&self) -> Protocol;
Expand Down Expand Up @@ -80,27 +81,12 @@ pub enum OnNotSupported {
Disconnect,
}

/// A connection channel to send and receive messages for a specific protocols.
/// A connection channel to receive messages for the negotiated protocol.
///
/// This is a [Stream] that returns raw bytes of the received messages for this protocol.
#[derive(Debug)]
pub struct ProtocolConnection {
from_wire: UnboundedReceiverStream<BytesMut>,
to_wire: UnboundedSender<ProtocolMessage>,
}

impl ProtocolConnection {
/// Sends a message to the remote.
///
/// Returns an error if the connection has been disconnected.
pub fn send(&self, msg: BytesMut) {
self.to_wire.send(ProtocolMessage::Message(msg)).ok();
}

/// Disconnects the connection.
pub fn disconnect(&self) {
let _ = self.to_wire.send(ProtocolMessage::Disconnect);
}
}

impl Stream for ProtocolConnection {
Expand All @@ -111,38 +97,6 @@ impl Stream for ProtocolConnection {
}
}

/// Messages that can be sent from a protocol connection
#[derive(Debug)]
pub(crate) enum ProtocolMessage {
/// New message to send to the remote.
Message(BytesMut),
/// Disconnect the connection.
Disconnect,
}

/// Errors that can occur when handling a protocol.
#[derive(Debug, thiserror::Error)]
pub enum ProtocolError {
/// custom error message
#[error("{0}")]
Message(String),
/// Ayn other error
#[error(transparent)]
Other(Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl ProtocolError {
/// Creates a new error with the given message.
pub fn msg(msg: impl fmt::Display) -> Self {
ProtocolError::Message(msg.to_string())
}

/// Wraps the given error in a `ProtocolError`.
pub fn new(err: impl std::error::Error + Send + Sync + 'static) -> Self {
ProtocolError::Other(Box::new(err))
}
}

/// A wrapper type for a RLPx sub-protocol.
#[derive(Debug)]
pub struct RlpxSubProtocol(Box<dyn DynProtocolHandler>);
Expand Down Expand Up @@ -221,7 +175,7 @@ pub(crate) trait DynConnectionHandler: Send + Sync + 'static {
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>>;
}

impl<T> DynConnectionHandler for T
Expand All @@ -246,7 +200,7 @@ where
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>> {
Box::pin(T::into_connection(self, direction, peer_id, conn))
}
}
Loading