Skip to content

Commit

Permalink
test: add multiplex test
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Dec 2, 2023
1 parent 9b26746 commit a9e36cc
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 1 deletion.
6 changes: 6 additions & 0 deletions crates/net/eth-wire/src/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,12 @@ impl SharedCapabilities {
pub fn len(&self) -> usize {
self.0.len()
}

/// Returns true if there are no shared capabilities.
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}

/// Determines the offsets for each shared capability between the input list of peer
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/src/session/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub type EthSatelliteConnection =

/// Connection types that support the ETH protocol.
///
/// Either a [`PeerConnection`] or an [`EthSatelliteConnection`].
/// Either a [`EthPeerConnection`] or an [`EthSatelliteConnection`].
// This type is boxed because the underlying stream is ~6KB,
// mostly coming from `P2PStream`'s `snap::Encoder` (2072), and `ECIESStream` (3600).
#[derive(Debug)]
Expand Down
6 changes: 6 additions & 0 deletions crates/net/network/src/test_utils/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
builder::ETH_REQUEST_CHANNEL_CAPACITY,
error::NetworkError,
eth_requests::EthRequestHandler,
protocol::IntoRlpxSubProtocol,
transactions::{TransactionsHandle, TransactionsManager},
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle,
NetworkManager,
Expand Down Expand Up @@ -332,6 +333,11 @@ where
self.network.num_connected_peers()
}

/// Adds an additional protocol handler to the peer.
pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
self.network.add_rlpx_sub_protocol(protocol);
}

/// Returns a handle to the peer's network.
pub fn peer_handle(&self) -> PeerHandle<Pool> {
PeerHandle {
Expand Down
1 change: 1 addition & 0 deletions crates/net/network/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod big_pooled_txs_req;
mod clique;
mod connect;
mod geth;
mod multiplex;
mod requests;
mod session;
mod startup;
Expand Down
330 changes: 330 additions & 0 deletions crates/net/network/tests/it/multiplex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
//! Testing gossiping of transactions.
use crate::multiplex::proto::{PingPongProtoMessage, PingPongProtoMessageKind};
use futures::{Stream, StreamExt};
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
};
use reth_network::{
protocol::{ConnectionHandler, OnNotSupported, ProtocolHandler},
test_utils::Testnet,
};
use reth_network_api::Direction;
use reth_primitives::BytesMut;
use reth_provider::test_utils::MockEthProvider;
use reth_rpc_types::PeerId;
use std::{
net::SocketAddr,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;

/// A simple Rplx subprotocol for
mod proto {
use super::*;
use reth_eth_wire::capability::Capability;
use reth_primitives::{Buf, BufMut};

#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PingPongProtoMessageId {
Ping = 0x00,
Pong = 0x01,
PingMessage = 0x02,
PongMessage = 0x03,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PingPongProtoMessageKind {
Ping,
Pong,
PingMessage(String),
PongMessage(String),
}

/// An protocol message, containing a message ID and payload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PingPongProtoMessage {
pub message_type: PingPongProtoMessageId,
pub message: PingPongProtoMessageKind,
}

impl PingPongProtoMessage {
/// Returns the capability for the `ping` protocol.
pub fn capability() -> Capability {
Capability::new_static("ping", 1)
}

/// Returns the protocol for the `test` protocol.
pub fn protocol() -> Protocol {
Protocol::new(Self::capability(), 4)
}

/// Creates a ping message
pub fn ping() -> Self {
Self {
message_type: PingPongProtoMessageId::Ping,
message: PingPongProtoMessageKind::Ping,
}
}

/// Creates a pong message
pub fn pong() -> Self {
Self {
message_type: PingPongProtoMessageId::Pong,
message: PingPongProtoMessageKind::Pong,
}
}

/// Creates a ping message
pub fn ping_message(msg: impl Into<String>) -> Self {
Self {
message_type: PingPongProtoMessageId::PingMessage,
message: PingPongProtoMessageKind::PingMessage(msg.into()),
}
}
/// Creates a ping message
pub fn pong_message(msg: impl Into<String>) -> Self {
Self {
message_type: PingPongProtoMessageId::PongMessage,
message: PingPongProtoMessageKind::PongMessage(msg.into()),
}
}

/// Creates a new `TestProtoMessage` with the given message ID and payload.
pub fn encoded(&self) -> BytesMut {
let mut buf = BytesMut::new();
buf.put_u8(self.message_type as u8);
match &self.message {
PingPongProtoMessageKind::Ping => {}
PingPongProtoMessageKind::Pong => {}
PingPongProtoMessageKind::PingMessage(msg) => {
buf.put(msg.as_bytes());
}
PingPongProtoMessageKind::PongMessage(msg) => {
buf.put(msg.as_bytes());
}
}
buf
}

/// Decodes a `TestProtoMessage` from the given message buffer.
pub fn decode_message(buf: &mut &[u8]) -> Option<Self> {
if buf.is_empty() {
return None;
}
let id = buf[0];
buf.advance(1);
let message_type = match id {
0x00 => PingPongProtoMessageId::Ping,
0x01 => PingPongProtoMessageId::Pong,
0x02 => PingPongProtoMessageId::PingMessage,
0x03 => PingPongProtoMessageId::PongMessage,
_ => return None,
};
let message = match message_type {
PingPongProtoMessageId::Ping => PingPongProtoMessageKind::Ping,
PingPongProtoMessageId::Pong => PingPongProtoMessageKind::Pong,
PingPongProtoMessageId::PingMessage => PingPongProtoMessageKind::PingMessage(
String::from_utf8_lossy(&buf[..]).into_owned(),
),
PingPongProtoMessageId::PongMessage => PingPongProtoMessageKind::PongMessage(
String::from_utf8_lossy(&buf[..]).into_owned(),
),
};
Some(Self { message_type, message })
}
}
}

#[derive(Debug)]
struct PingPongProtoHandler {
state: ProtocolState,
}

impl ProtocolHandler for PingPongProtoHandler {
type ConnectionHandler = PingPongConnectionHandler;

fn on_incoming(&self, _socket_addr: SocketAddr) -> Option<Self::ConnectionHandler> {
Some(PingPongConnectionHandler { state: self.state.clone() })
}

fn on_outgoing(
&self,
_socket_addr: SocketAddr,
_peer_id: PeerId,
) -> Option<Self::ConnectionHandler> {
Some(PingPongConnectionHandler { state: self.state.clone() })
}
}

#[derive(Clone, Debug)]
struct ProtocolState {
events: mpsc::UnboundedSender<ProtocolEvent>,
}

#[derive(Debug)]
#[allow(dead_code)]
enum ProtocolEvent {
Established {
direction: Direction,
peer_id: PeerId,
to_connection: mpsc::UnboundedSender<Command>,
},
}

enum Command {
/// Send a ping message to the peer.
PingMessage {
msg: String,
/// The response will be sent to this channel.
response: oneshot::Sender<String>,
},
}

struct PingPongConnectionHandler {
state: ProtocolState,
}

impl ConnectionHandler for PingPongConnectionHandler {
type Connection = PingPongProtoConnection;

fn protocol(&self) -> Protocol {
PingPongProtoMessage::protocol()
}

fn on_unsupported_by_peer(
self,
_supported: &SharedCapabilities,
_direction: Direction,
_peer_id: PeerId,
) -> OnNotSupported {
OnNotSupported::KeepAlive
}

fn into_connection(
self,
direction: Direction,
_peer_id: PeerId,
conn: ProtocolConnection,
) -> Self::Connection {
let (tx, rx) = mpsc::unbounded_channel();
self.state
.events
.send(ProtocolEvent::Established { direction, peer_id: _peer_id, to_connection: tx })
.ok();
PingPongProtoConnection {
conn,
initial_ping: direction.is_outgoing().then(PingPongProtoMessage::ping),
commands: UnboundedReceiverStream::new(rx),
pending_pong: None,
}
}
}

struct PingPongProtoConnection {
conn: ProtocolConnection,
initial_ping: Option<PingPongProtoMessage>,
commands: UnboundedReceiverStream<Command>,
pending_pong: Option<oneshot::Sender<String>>,
}

impl Stream for PingPongProtoConnection {
type Item = BytesMut;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(initial_ping) = this.initial_ping.take() {
return Poll::Ready(Some(initial_ping.encoded()));
}

loop {
if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) {
return match cmd {
Command::PingMessage { msg, response } => {
this.pending_pong = Some(response);
Poll::Ready(Some(PingPongProtoMessage::ping_message(msg).encoded()))
}
}
}
let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else {
return Poll::Ready(None);
};

let Some(msg) = PingPongProtoMessage::decode_message(&mut &msg[..]) else {
return Poll::Ready(None);
};

match msg.message {
PingPongProtoMessageKind::Ping => {
return Poll::Ready(Some(PingPongProtoMessage::pong().encoded()));
}
PingPongProtoMessageKind::Pong => {}
PingPongProtoMessageKind::PingMessage(msg) => {
return Poll::Ready(Some(PingPongProtoMessage::pong_message(msg).encoded()));
}
PingPongProtoMessageKind::PongMessage(msg) => {
if let Some(sender) = this.pending_pong.take() {
sender.send(msg).ok();
}
continue
}
}

return Poll::Pending;
}
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_proto_multiplex() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default();
let mut net = Testnet::create_with(2, provider.clone()).await;

let (tx, mut from_peer0) = mpsc::unbounded_channel();
net.peers_mut()[0]
.add_rlpx_sub_protocol(PingPongProtoHandler { state: ProtocolState { events: tx } });

let (tx, mut from_peer1) = mpsc::unbounded_channel();
net.peers_mut()[1]
.add_rlpx_sub_protocol(PingPongProtoHandler { state: ProtocolState { events: tx } });

let handle = net.spawn();
// connect all the peers
handle.connect_peers().await;

let peer0_to_peer1 = from_peer0.recv().await.unwrap();
let peer0_conn = match peer0_to_peer1 {
ProtocolEvent::Established { direction: _, peer_id, to_connection } => {
assert_eq!(peer_id, *handle.peers()[1].peer_id());
to_connection
}
};

let peer1_to_peer0 = from_peer1.recv().await.unwrap();
let peer1_conn = match peer1_to_peer0 {
ProtocolEvent::Established { direction: _, peer_id, to_connection } => {
assert_eq!(peer_id, *handle.peers()[0].peer_id());
to_connection
}
};

let (tx, rx) = oneshot::channel();
// send a ping message from peer0 to peer1
peer0_conn.send(Command::PingMessage { msg: "hello!".to_string(), response: tx }).unwrap();

let response = rx.await.unwrap();
assert_eq!(response, "hello!");

let (tx, rx) = oneshot::channel();
// send a ping message from peer1 to peer0
peer1_conn
.send(Command::PingMessage { msg: "hello from peer1!".to_string(), response: tx })
.unwrap();

let response = rx.await.unwrap();
assert_eq!(response, "hello from peer1!");
}

0 comments on commit a9e36cc

Please sign in to comment.