From 9f8084b62ef9d85ff2d1d1e01e21588857e1ecf2 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 24 Nov 2023 16:07:19 +0100 Subject: [PATCH] feat: add multiplexing test --- Cargo.lock | 24 ++ crates/net/eth-wire/Cargo.toml | 1 + crates/net/eth-wire/src/multiplex.rs | 374 ++++++++++++++++++++------ crates/net/eth-wire/src/test_utils.rs | 103 +++++++ 4 files changed, 423 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2fc9c302f0cf9..b85bbdc109f6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -539,6 +539,28 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -5923,6 +5945,7 @@ version = "0.1.0-alpha.11" dependencies = [ "alloy-rlp", "arbitrary", + "async-stream", "async-trait", "bytes", "ethers-core", @@ -6378,6 +6401,7 @@ dependencies = [ "reth-rpc-api", "reth-rpc-types", "serde_json", + "similar-asserts", "tokio", ] diff --git a/crates/net/eth-wire/Cargo.toml b/crates/net/eth-wire/Cargo.toml index a5f691ef383fe..479584877d908 100644 --- a/crates/net/eth-wire/Cargo.toml +++ b/crates/net/eth-wire/Cargo.toml @@ -50,6 +50,7 @@ secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recov arbitrary = { workspace = true, features = ["derive"] } proptest.workspace = true proptest-derive.workspace = true +async-stream = "0.3" [features] default = ["serde"] diff --git a/crates/net/eth-wire/src/multiplex.rs b/crates/net/eth-wire/src/multiplex.rs index e3bb92ecad3a2..689f17f0d6917 100644 --- a/crates/net/eth-wire/src/multiplex.rs +++ b/crates/net/eth-wire/src/multiplex.rs @@ -18,43 +18,88 @@ use std::{ use bytes::{Bytes, BytesMut}; use futures::{pin_mut, Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt}; +use reth_primitives::ForkFilter; use tokio::sync::{mpsc, mpsc::UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::{ capability::{Capability, SharedCapabilities, SharedCapability, UnsupportedCapabilityError}, - errors::P2PStreamError, - CanDisconnect, DisconnectReason, P2PStream, + errors::{EthStreamError, P2PStreamError}, + CanDisconnect, DisconnectReason, EthStream, EthVersion, P2PStream, Status, UnauthedEthStream, }; /// A Stream and Sink type that wraps a raw rlpx stream [P2PStream] and handles message ID /// multiplexing. #[derive(Debug)] pub struct RlpxProtocolMultiplexer { - /// The raw p2p stream - conn: P2PStream, - /// All the subprotocols that are multiplexed on top of the raw p2p stream - protocols: Vec, + inner: MultiplexInner, } impl RlpxProtocolMultiplexer { /// Wraps the raw p2p stream pub fn new(conn: P2PStream) -> Self { - Self { conn, protocols: Default::default() } + Self { + inner: MultiplexInner { + conn, + protocols: Default::default(), + out_buffer: Default::default(), + }, + } } - /// Installs a new protocol on top of the raw p2p stream - pub fn install_protocol( + /// Installs a new protocol on top of the raw p2p stream. + /// + /// This accepts a closure that receives a [ProtocolConnection] that will yield messages for the + /// given capability. + pub fn install_protocol( &mut self, - _cap: Capability, - _st: S, - ) -> Result<(), UnsupportedCapabilityError> { - todo!() + cap: &Capability, + f: F, + ) -> Result<(), UnsupportedCapabilityError> + where + F: FnOnce(ProtocolConnection) -> Proto, + Proto: Stream + Send + 'static, + { + self.inner.install_protocol(cap, f) } /// Returns the [SharedCapabilities] of the underlying raw p2p stream pub fn shared_capabilities(&self) -> &SharedCapabilities { - self.conn.shared_capabilities() + self.inner.shared_capabilities() + } + + /// Converts this multiplexer into a [RlpxSatelliteStream] with the given primary protocol. + pub fn into_satellite_stream( + self, + cap: &Capability, + primary: F, + ) -> Result, P2PStreamError> + where + F: FnOnce(ProtocolProxy) -> Primary, + { + let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned() + else { + return Err(P2PStreamError::CapabilityNotShared) + }; + + let (to_primary, from_wire) = mpsc::unbounded_channel(); + let (to_wire, from_primary) = mpsc::unbounded_channel(); + let proxy = ProtocolProxy { + shared_cap: shared_cap.clone(), + from_wire: UnboundedReceiverStream::new(from_wire), + to_wire, + }; + + let st = primary(proxy); + Ok(RlpxSatelliteStream { + inner: self.inner, + primary: PrimaryProtocol { + to_primary, + from_primary: UnboundedReceiverStream::new(from_primary), + st, + shared_cap, + }, + }) } /// Converts this multiplexer into a [RlpxSatelliteStream] with the given primary protocol. @@ -80,7 +125,7 @@ impl RlpxProtocolMultiplexer { let (to_primary, from_wire) = mpsc::unbounded_channel(); let (to_wire, mut from_primary) = mpsc::unbounded_channel(); let proxy = ProtocolProxy { - cap: shared_cap.clone(), + shared_cap: shared_cap.clone(), from_wire: UnboundedReceiverStream::new(from_wire), to_wire, }; @@ -92,45 +137,114 @@ impl RlpxProtocolMultiplexer { // complete loop { tokio::select! { - Some(Ok(msg)) = self.conn.next() => { + Some(Ok(msg)) = self.inner.conn.next() => { // Ensure the message belongs to the primary protocol let offset = msg[0]; - if let Some(cap) = self.conn.shared_capabilities().find_by_relative_offset(offset) { - if cap == &shared_cap { + if let Some(cap) = self.shared_capabilities().find_by_relative_offset(offset).cloned() { + if cap == shared_cap { // delegate to primary let _ = to_primary.send(msg); } else { // delegate to satellite - for proto in &self.protocols { - if proto.cap == *cap { - // TODO: need some form of backpressure here so buffering can't be abused - proto.send_raw(msg); - break - } - } + self.inner.delegate_message(&cap, msg); } } else { return Err(P2PStreamError::UnknownReservedMessageId(offset).into()) } } Some(msg) = from_primary.recv() => { - self.conn.send(msg).await.map_err(Into::into)?; + self.inner.conn.send(msg).await.map_err(Into::into)?; } res = &mut f => { - let primary = res?; + let st = res?; return Ok(RlpxSatelliteStream { - conn: self.conn, - to_primary, - from_primary: UnboundedReceiverStream::new(from_primary), - primary, - primary_capability: shared_cap, - satellites: self.protocols, - out_buffer: Default::default(), + inner: self.inner, + primary: PrimaryProtocol { + to_primary, + from_primary: UnboundedReceiverStream::new(from_primary), + st, + shared_cap, + } }) } } } } + + /// Converts this multiplexer into a [RlpxSatelliteStream] with eth protocol as the given + /// primary protocol. + pub async fn into_eth_satellite_stream( + self, + eth_version: EthVersion, + status: Status, + fork_filter: ForkFilter, + ) -> Result>, EthStreamError> + where + St: Stream> + Sink + Unpin, + { + self.into_satellite_stream_with_handshake( + &Capability::eth(eth_version), + move |proxy| async move { + Ok(UnauthedEthStream::new(proxy).handshake(status, fork_filter).await?.0) + }, + ) + .await + } +} + +#[derive(Debug)] +struct MultiplexInner { + /// The raw p2p stream + conn: P2PStream, + /// All the subprotocols that are multiplexed on top of the raw p2p stream + protocols: Vec, + /// Buffer for outgoing messages on the wire. + out_buffer: VecDeque, +} + +impl MultiplexInner { + fn shared_capabilities(&self) -> &SharedCapabilities { + self.conn.shared_capabilities() + } + + /// Delegates a message to the matching protocol. + fn delegate_message(&mut self, cap: &SharedCapability, msg: BytesMut) -> bool { + for proto in &self.protocols { + if proto.shared_cap == *cap { + proto.send_raw(msg); + return true + } + } + false + } + + fn install_protocol( + &mut self, + cap: &Capability, + f: F, + ) -> Result<(), UnsupportedCapabilityError> + where + F: FnOnce(ProtocolConnection) -> Proto, + Proto: Stream + Send + 'static, + { + let shared_cap = + self.conn.shared_capabilities().ensure_matching_capability(cap).cloned()?; + let (to_satellite, rx) = mpsc::unbounded_channel(); + let proto_conn = ProtocolConnection { from_wire: UnboundedReceiverStream::new(rx) }; + let st = f(proto_conn); + let st = ProtocolStream { shared_cap, to_satellite, satellite_st: Box::pin(st) }; + self.protocols.push(st); + Ok(()) + } +} + +/// Represents a protocol in the multiplexer that is used as the primary protocol. +#[derive(Debug)] +struct PrimaryProtocol { + to_primary: UnboundedSender, + from_primary: UnboundedReceiverStream, + shared_cap: SharedCapability, + st: Primary, } /// A Stream and Sink type that acts as a wrapper around a primary RLPx subprotocol (e.g. "eth") @@ -138,7 +252,7 @@ impl RlpxProtocolMultiplexer { /// Only emits and sends _non-empty_ messages #[derive(Debug)] pub struct ProtocolProxy { - cap: SharedCapability, + shared_cap: SharedCapability, /// Receives _non-empty_ messages from the wire from_wire: UnboundedReceiverStream, /// Sends _non-empty_ messages from the wire @@ -163,7 +277,7 @@ impl ProtocolProxy { #[inline] fn mask_msg_id(&self, msg: Bytes) -> Bytes { let mut masked_bytes = BytesMut::zeroed(msg.len()); - masked_bytes[0] = msg[0] + self.cap.relative_message_id_offset(); + masked_bytes[0] = msg[0] + self.shared_cap.relative_message_id_offset(); masked_bytes[1..].copy_from_slice(&msg[1..]); masked_bytes.freeze() } @@ -175,7 +289,7 @@ impl ProtocolProxy { /// If the message is empty. #[inline] fn unmask_id(&self, mut msg: BytesMut) -> BytesMut { - msg[0] -= self.cap.relative_message_id_offset(); + msg[0] -= self.shared_cap.relative_message_id_offset(); msg } } @@ -240,17 +354,27 @@ impl Stream for ProtocolConnection { /// [EthStream](crate::EthStream) and can also handle additional subprotocols. #[derive(Debug)] pub struct RlpxSatelliteStream { - /// The raw p2p stream - conn: P2PStream, - to_primary: UnboundedSender, - from_primary: UnboundedReceiverStream, - primary: Primary, - primary_capability: SharedCapability, - satellites: Vec, - out_buffer: VecDeque, + inner: MultiplexInner, + primary: PrimaryProtocol, } -impl RlpxSatelliteStream {} +impl RlpxSatelliteStream { + /// Installs a new protocol on top of the raw p2p stream. + /// + /// This accepts a closure that receives a [ProtocolConnection] that will yield messages for the + /// given capability. + pub fn install_protocol( + &mut self, + cap: &Capability, + f: F, + ) -> Result<(), UnsupportedCapabilityError> + where + F: FnOnce(ProtocolConnection) -> Proto, + Proto: Stream + Send + 'static, + { + self.inner.install_protocol(cap, f) + } +} impl Stream for RlpxSatelliteStream where @@ -265,16 +389,16 @@ where loop { // first drain the primary stream - if let Poll::Ready(Some(msg)) = this.primary.try_poll_next_unpin(cx) { + if let Poll::Ready(Some(msg)) = this.primary.st.try_poll_next_unpin(cx) { return Poll::Ready(Some(msg)) } - let mut out_ready = true; + let mut conn_ready = true; loop { - match this.conn.poll_ready_unpin(cx) { + match this.inner.conn.poll_ready_unpin(cx) { Poll::Ready(_) => { - if let Some(msg) = this.out_buffer.pop_front() { - if let Err(err) = this.conn.start_send_unpin(msg) { + if let Some(msg) = this.inner.out_buffer.pop_front() { + if let Err(err) = this.inner.conn.start_send_unpin(msg) { return Poll::Ready(Some(Err(err.into()))) } } else { @@ -282,7 +406,7 @@ where } } Poll::Pending => { - out_ready = false; + conn_ready = false; break } } @@ -290,9 +414,9 @@ where // advance primary out loop { - match this.from_primary.poll_next_unpin(cx) { + match this.primary.from_primary.poll_next_unpin(cx) { Poll::Ready(Some(msg)) => { - this.out_buffer.push_back(msg); + this.inner.out_buffer.push_back(msg); } Poll::Ready(None) => { // primary closed @@ -303,16 +427,16 @@ where } // advance all satellites - for idx in (0..this.satellites.len()).rev() { - let mut proto = this.satellites.swap_remove(idx); + for idx in (0..this.inner.protocols.len()).rev() { + let mut proto = this.inner.protocols.swap_remove(idx); loop { match proto.poll_next_unpin(cx) { Poll::Ready(Some(msg)) => { - this.out_buffer.push_back(msg); + this.inner.out_buffer.push_back(msg); } Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => { - this.satellites.push(proto); + this.inner.protocols.push(proto); break } } @@ -322,21 +446,21 @@ where let mut delegated = false; loop { // pull messages from connection - match this.conn.poll_next_unpin(cx) { + match this.inner.conn.poll_next_unpin(cx) { Poll::Ready(Some(Ok(msg))) => { delegated = true; let offset = msg[0]; // delegate the multiplexed message to the correct protocol if let Some(cap) = - this.conn.shared_capabilities().find_by_relative_offset(offset) + this.inner.conn.shared_capabilities().find_by_relative_offset(offset) { - if cap == &this.primary_capability { + if cap == &this.primary.shared_cap { // delegate to primary - let _ = this.to_primary.send(msg); + let _ = this.primary.to_primary.send(msg); } else { - // delegate to satellite - for proto in &this.satellites { - if proto.cap == *cap { + // delegate to installed satellite if any + for proto in &this.inner.protocols { + if proto.shared_cap == *cap { proto.send_raw(msg); break } @@ -358,7 +482,7 @@ where } } - if !delegated || !out_ready || this.out_buffer.is_empty() { + if !conn_ready || (!delegated && this.inner.out_buffer.is_empty()) { return Poll::Pending } } @@ -375,34 +499,34 @@ where fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - if let Err(err) = ready!(this.conn.poll_ready_unpin(cx)) { + if let Err(err) = ready!(this.inner.conn.poll_ready_unpin(cx)) { return Poll::Ready(Err(err.into())) } - if let Err(err) = ready!(this.primary.poll_ready_unpin(cx)) { + if let Err(err) = ready!(this.primary.st.poll_ready_unpin(cx)) { return Poll::Ready(Err(err)) } Poll::Ready(Ok(())) } fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.get_mut().primary.start_send_unpin(item) + self.get_mut().primary.st.start_send_unpin(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().conn.poll_flush_unpin(cx).map_err(Into::into) + self.get_mut().inner.conn.poll_flush_unpin(cx).map_err(Into::into) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().conn.poll_close_unpin(cx).map_err(Into::into) + self.get_mut().inner.conn.poll_close_unpin(cx).map_err(Into::into) } } /// Wraps a RLPx subprotocol and handles message ID multiplexing. struct ProtocolStream { - cap: SharedCapability, + shared_cap: SharedCapability, /// the channel shared with the satellite stream to_satellite: UnboundedSender, - satellite_st: Pin>>, + satellite_st: Pin + Send>>, } impl ProtocolStream { @@ -413,7 +537,7 @@ impl ProtocolStream { /// If the message is empty. #[inline] fn mask_msg_id(&self, mut msg: BytesMut) -> Bytes { - msg[0] += self.cap.relative_message_id_offset(); + msg[0] += self.shared_cap.relative_message_id_offset(); msg.freeze() } @@ -424,7 +548,7 @@ impl ProtocolStream { /// If the message is empty. #[inline] fn unmask_id(&self, mut msg: BytesMut) -> BytesMut { - msg[0] -= self.cap.relative_message_id_offset(); + msg[0] -= self.shared_cap.relative_message_id_offset(); msg } @@ -446,7 +570,7 @@ impl Stream for ProtocolStream { impl fmt::Debug for ProtocolStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ProtocolStream").field("cap", &self.cap).finish_non_exhaustive() + f.debug_struct("ProtocolStream").field("cap", &self.shared_cap).finish_non_exhaustive() } } @@ -454,10 +578,13 @@ impl fmt::Debug for ProtocolStream { mod tests { use super::*; use crate::{ - test_utils::{connect_passthrough, eth_handshake, eth_hello}, + test_utils::{ + connect_passthrough, eth_handshake, eth_hello, + proto::{test_hello, TestProtoMessage}, + }, UnauthedEthStream, UnauthedP2PStream, }; - use tokio::net::TcpListener; + use tokio::{net::TcpListener, sync::oneshot}; use tokio_util::codec::Decoder; #[tokio::test] @@ -487,7 +614,6 @@ mod tests { let eth = conn.shared_capabilities().eth().unwrap().clone(); let multiplexer = RlpxProtocolMultiplexer::new(conn); - let _satellite = multiplexer .into_satellite_stream_with_handshake( eth.capability().as_ref(), @@ -498,4 +624,94 @@ mod tests { .await .unwrap(); } + + /// A test that install a satellite stream eth+test protocol and sends messages between them. + #[tokio::test(flavor = "multi_thread")] + async fn eth_test_protocol_satellite() { + reth_tracing::init_test_tracing(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + let (status, fork_filter) = eth_handshake(); + let other_status = status; + let other_fork_filter = fork_filter.clone(); + let _handle = tokio::spawn(async move { + let (incoming, _) = listener.accept().await.unwrap(); + let stream = crate::PassthroughCodec::default().framed(incoming); + let (server_hello, _) = test_hello(); + let (conn, _) = UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap(); + + let mut st = RlpxProtocolMultiplexer::new(conn) + .into_eth_satellite_stream(EthVersion::Eth67, other_status, other_fork_filter) + .await + .unwrap(); + + st.install_protocol(&TestProtoMessage::capability(), |mut conn| { + async_stream::stream! { + yield TestProtoMessage::ping().encoded(); + let msg = conn.next().await.unwrap(); + let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); + assert_eq!(msg, TestProtoMessage::pong()); + + yield TestProtoMessage::message("hello").encoded(); + let msg = conn.next().await.unwrap(); + let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); + assert_eq!(msg, TestProtoMessage::message("good bye!")); + + yield TestProtoMessage::message("good bye!").encoded(); + + futures::future::pending::<()>().await; + unreachable!() + } + }) + .unwrap(); + + loop { + let _ = st.next().await; + } + }); + + let conn = connect_passthrough(local_addr, test_hello().0).await; + let mut st = RlpxProtocolMultiplexer::new(conn) + .into_eth_satellite_stream(EthVersion::Eth67, status, fork_filter) + .await + .unwrap(); + + let (tx, mut rx) = oneshot::channel(); + + st.install_protocol(&TestProtoMessage::capability(), |mut conn| { + async_stream::stream! { + let msg = conn.next().await.unwrap(); + let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); + assert_eq!(msg, TestProtoMessage::ping()); + + yield TestProtoMessage::pong().encoded(); + + let msg = conn.next().await.unwrap(); + let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); + assert_eq!(msg, TestProtoMessage::message("hello")); + + yield TestProtoMessage::message("good bye!").encoded(); + + let msg = conn.next().await.unwrap(); + let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap(); + assert_eq!(msg, TestProtoMessage::message("good bye!")); + + tx.send(()).unwrap(); + + futures::future::pending::<()>().await; + unreachable!() + } + }) + .unwrap(); + + loop { + tokio::select! { + _ = &mut rx => { + break + } + _ = st.next() => { + } + } + } + } } diff --git a/crates/net/eth-wire/src/test_utils.rs b/crates/net/eth-wire/src/test_utils.rs index 01bd9a048dc33..53521c72b500d 100644 --- a/crates/net/eth-wire/src/test_utils.rs +++ b/crates/net/eth-wire/src/test_utils.rs @@ -55,3 +55,106 @@ pub async fn connect_passthrough( p2p_stream } + +/// A Rplx subprotocol for testing +pub mod proto { + use super::*; + use crate::{capability::Capability, protocol::Protocol}; + use bytes::{Buf, BufMut, BytesMut}; + + /// Returns a new testing `HelloMessage` with eth and the test protocol + pub fn test_hello() -> (HelloMessageWithProtocols, SecretKey) { + let mut handshake = eth_hello(); + handshake.0.protocols.push(TestProtoMessage::protocol()); + handshake + } + + #[repr(u8)] + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + pub enum TestProtoMessageId { + Ping = 0x00, + Pong = 0x01, + Message = 0x02, + } + + #[derive(Clone, Debug, PartialEq, Eq)] + pub enum TesProtoMessage { + Message(String), + Ping, + Pong, + } + + /// An `test` protocol message, containing a message ID and payload. + #[derive(Clone, Debug, PartialEq, Eq)] + pub struct TestProtoMessage { + pub message_type: TestProtoMessageId, + pub message: TesProtoMessage, + } + + impl TestProtoMessage { + /// Returns the capability for the `test` protocol. + pub fn capability() -> Capability { + Capability::new_static("test", 1) + } + + /// Returns the protocol for the `test` protocol. + pub fn protocol() -> Protocol { + Protocol::new(Self::capability(), 3) + } + + /// Creates a ping message + pub fn ping() -> Self { + Self { message_type: TestProtoMessageId::Ping, message: TesProtoMessage::Ping } + } + + /// Creates a pong message + pub fn pong() -> Self { + Self { message_type: TestProtoMessageId::Pong, message: TesProtoMessage::Pong } + } + + /// Creates a message + pub fn message(msg: impl Into) -> Self { + Self { + message_type: TestProtoMessageId::Message, + message: TesProtoMessage::Message(msg.into()), + } + } + + /// Creates a new `TestProtoMessage` with the given message ID and payload. + pub fn encoded(&self) -> BytesMut { + let mut buf = BytesMut::new(); + buf.put_u8(self.message_type as u8); + match &self.message { + TesProtoMessage::Ping => {} + TesProtoMessage::Pong => {} + TesProtoMessage::Message(msg) => { + buf.put(msg.as_bytes()); + } + } + buf + } + + /// Decodes a `TestProtoMessage` from the given message buffer. + pub fn decode_message(buf: &mut &[u8]) -> Option { + if buf.is_empty() { + return None; + } + let id = buf[0]; + buf.advance(1); + let message_type = match id { + 0x00 => TestProtoMessageId::Ping, + 0x01 => TestProtoMessageId::Pong, + 0x02 => TestProtoMessageId::Message, + _ => return None, + }; + let message = match message_type { + TestProtoMessageId::Ping => TesProtoMessage::Ping, + TestProtoMessageId::Pong => TesProtoMessage::Pong, + TestProtoMessageId::Message => { + TesProtoMessage::Message(String::from_utf8_lossy(&buf[..]).into_owned()) + } + }; + Some(Self { message_type, message }) + } + } +}