From 339831ca2c3457f3073109c732c81cd3bfb2aa6a Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 30 Oct 2024 11:46:56 -0300 Subject: [PATCH 01/12] Added some error handling in connection module --- crates/networking/p2p/rlpx/connection.rs | 233 ++++++++++++----------- crates/networking/p2p/rlpx/error.rs | 11 +- crates/networking/p2p/rlpx/handshake.rs | 8 +- crates/networking/p2p/rlpx/utils.rs | 4 +- 4 files changed, 134 insertions(+), 122 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 642535904..396cef1c7 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -12,7 +12,6 @@ use super::{ utils::{ecdh_xchng, pubkey2id}, }; use aes::cipher::KeyIvInit; -use bytes::BufMut as _; use ethereum_rust_core::{H256, H512}; use ethereum_rust_rlp::decode::RLPDecode; use ethereum_rust_storage::Store; @@ -22,7 +21,7 @@ use k256::{ }; use sha3::{Digest, Keccak256}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tracing::{error, info}; +use tracing::info; const CAP_P2P: (Capability, u8) = (Capability::P2p, 5); const CAP_ETH: (Capability, u8) = (Capability::Eth, 68); //const CAP_SNAP: (Capability, u8) = (Capability::Snap, 1); @@ -81,12 +80,12 @@ impl RLPxConnection { pub async fn handshake(&mut self) -> Result<(), RLPxError> { match &self.state { RLPxConnectionState::Initiator(_) => { - self.send_auth().await; - self.receive_ack().await; + self.send_auth().await?; + self.receive_ack().await?; } RLPxConnectionState::Receiver(_) => { - self.receive_auth().await; - self.send_ack().await; + self.receive_auth().await?; + self.send_ack().await?; } _ => { return Err(RLPxError::HandshakeError( @@ -106,10 +105,10 @@ impl RLPxConnection { PublicKey::from(self.signer.verifying_key()), )); - self.send(hello_msg).await; + self.send(hello_msg).await?; // Receive Hello message - match self.receive().await { + match self.receive().await? { Message::Hello(hello_message) => { self.capabilities = hello_message.capabilities; @@ -139,7 +138,7 @@ impl RLPxConnection { RLPxConnectionState::Established(_) => { info!("Started peer main loop"); loop { - match self.receive().await { + match self.receive().await? { // TODO: implement handlers for each message type Message::Disconnect(_) => info!("Received Disconnect"), Message::Ping(_) => info!("Received Ping"), @@ -150,9 +149,7 @@ impl RLPxConnection { }; } } - _ => Err(RLPxError::InvalidState( - "Invalid connection state".to_string(), - )), + _ => Err(RLPxError::InvalidState()), } } @@ -168,160 +165,164 @@ impl RLPxConnection { // Sending eth Status if peer supports it if self.capabilities.contains(&CAP_ETH) { let status = backend::get_status(&self.storage).unwrap(); - self.send(Message::Status(status)).await; + self.send(Message::Status(status)).await?; } // TODO: add new capabilities startup when required (eg. snap) Ok(()) } - async fn send_auth(&mut self) { + async fn send_auth(&mut self) -> Result<(), RLPxError> { match &self.state { RLPxConnectionState::Initiator(initiator_state) => { let secret_key: SecretKey = self.signer.clone().into(); - let peer_pk = id2pubkey(initiator_state.remote_node_id).unwrap(); + let peer_pk = + id2pubkey(initiator_state.remote_node_id).ok_or(RLPxError::InvalidPeerId())?; + + // Clonning previous state to avoid ownership issues + let previous_state = initiator_state.clone(); - let mut auth_message = vec![]; let msg = encode_auth_message( &secret_key, - initiator_state.nonce, + previous_state.nonce, &peer_pk, - &initiator_state.ephemeral_key, + &previous_state.ephemeral_key, ); - auth_message.put_slice(&msg); - self.stream.write_all(&auth_message).await.unwrap(); + self.send_handshake_msg(&msg).await?; - self.state = RLPxConnectionState::InitiatedAuth(InitiatedAuth::new( - initiator_state, - auth_message, - )) + self.state = + RLPxConnectionState::InitiatedAuth(InitiatedAuth::new(previous_state, msg)); + Ok(()) } - // TODO proper error - _ => panic!("Invalid state to send auth message"), - }; + _ => Err(RLPxError::InvalidState()), + } } - async fn send_ack(&mut self) { + async fn send_ack(&mut self) -> Result<(), RLPxError> { match &self.state { RLPxConnectionState::ReceivedAuth(received_auth_state) => { - let peer_pk = id2pubkey(received_auth_state.remote_node_id).unwrap(); + let peer_pk = id2pubkey(received_auth_state.remote_node_id) + .ok_or(RLPxError::InvalidPeerId())?; + + // Clonning previous state to avoid ownership issues + let previous_state = received_auth_state.clone(); - let mut ack_message = vec![]; let msg = encode_ack_message( - &received_auth_state.local_ephemeral_key, - received_auth_state.local_nonce, + &previous_state.local_ephemeral_key, + previous_state.local_nonce, &peer_pk, ); - ack_message.put_slice(&msg); - self.stream.write_all(&ack_message).await.unwrap(); + self.send_handshake_msg(&msg).await?; self.state = RLPxConnectionState::Established(Box::new(Established::for_receiver( - received_auth_state, - ack_message, - ))) + previous_state, + msg, + ))); + Ok(()) } - // TODO proper error - _ => panic!("Invalid state to send ack message"), - }; + _ => Err(RLPxError::InvalidState()), + } } - async fn receive_auth(&mut self) { + async fn receive_auth(&mut self) -> Result<(), RLPxError> { match &self.state { RLPxConnectionState::Receiver(receiver_state) => { let secret_key: SecretKey = self.signer.clone().into(); - let mut buf = vec![0; MAX_DISC_PACKET_SIZE]; - - // Read the auth message's size - self.stream.read_exact(&mut buf[..2]).await.unwrap(); - let auth_data = buf[..2].try_into().unwrap(); - let msg_size = u16::from_be_bytes(auth_data) as usize; - - // Read the rest of the auth message - self.stream - .read_exact(&mut buf[2..msg_size + 2]) - .await - .unwrap(); - let auth_bytes = &buf[..msg_size + 2]; - let msg = &buf[2..msg_size + 2]; - let (auth, remote_ephemeral_key) = decode_auth_message(&secret_key, msg, auth_data); + // Clonning previous state to avoid ownership issues + let previous_state = receiver_state.clone(); + let msg_bytes = self.receive_handshake_msg().await?; + let size_data = &msg_bytes[..2]; + let msg = &msg_bytes[2..]; + let (auth, remote_ephemeral_key) = decode_auth_message(&secret_key, msg, size_data); // Build next state self.state = RLPxConnectionState::ReceivedAuth(ReceivedAuth::new( - receiver_state, + previous_state, auth.node_id, - auth_bytes.to_owned(), + msg_bytes.to_owned(), auth.nonce, remote_ephemeral_key, - )) + )); + Ok(()) } - // TODO proper error - _ => panic!("Received an unexpected auth message"), - }; + _ => Err(RLPxError::InvalidState()), + } } - async fn receive_ack(&mut self) { + async fn receive_ack(&mut self) -> Result<(), RLPxError> { match &self.state { RLPxConnectionState::InitiatedAuth(initiated_auth_state) => { let secret_key: SecretKey = self.signer.clone().into(); - let mut buf = vec![0; MAX_DISC_PACKET_SIZE]; - - // Read the ack message's size - self.stream.read_exact(&mut buf[..2]).await.unwrap(); - let ack_data = buf[..2].try_into().unwrap(); - let msg_size = u16::from_be_bytes(ack_data) as usize; - - // Read the rest of the ack message - self.stream - .read_exact(&mut buf[2..msg_size + 2]) - .await - .unwrap(); - let ack_bytes = &buf[..msg_size + 2]; - let msg = &buf[2..msg_size + 2]; - let ack = decode_ack_message(&secret_key, msg, ack_data); + // Clonning previous state to avoid ownership issues + let previous_state = initiated_auth_state.clone(); + let msg_bytes = self.receive_handshake_msg().await?; + let size_data = &msg_bytes[..2]; + let msg = &msg_bytes[2..]; + let ack = decode_ack_message(&secret_key, msg, size_data); let remote_ephemeral_key = ack.get_ephemeral_pubkey().unwrap(); - // Build next state - self.state = RLPxConnectionState::Established(Box::new(Established::for_initiator( - initiated_auth_state, - ack_bytes.to_owned(), - ack.nonce, - remote_ephemeral_key, - ))) + self.state = + RLPxConnectionState::Established(Box::new(Established::for_initiator( + previous_state, + msg_bytes.to_owned(), + ack.nonce, + remote_ephemeral_key, + ))); + Ok(()) } - // TODO proper error - _ => panic!("Received an unexpected ack message"), - }; + _ => Err(RLPxError::InvalidState()), + } } - async fn send(&mut self, message: rlpx::Message) { + async fn send_handshake_msg(&mut self, msg: &[u8]) -> Result<(), RLPxError> { + self.stream + .write_all(msg) + .await + .map_err(|_| RLPxError::HandshakeError("Could not send message".to_string()))?; + Ok(()) + } + + async fn receive_handshake_msg(&mut self) -> Result, RLPxError> { + let mut buf = vec![0; MAX_DISC_PACKET_SIZE]; + + // Read the message's size + self.stream + .read_exact(&mut buf[..2]) + .await + .map_err(|_| RLPxError::HandshakeError("Connection dropped".to_string()))?; + let ack_data = [buf[0], buf[1]]; + let msg_size = u16::from_be_bytes(ack_data) as usize; + + // Read the rest of the message + self.stream + .read_exact(&mut buf[2..msg_size + 2]) + .await + .map_err(|_| RLPxError::HandshakeError("Connection dropped".to_string()))?; + let ack_bytes = &buf[..msg_size + 2]; + Ok(ack_bytes.to_vec()) + } + + async fn send(&mut self, message: rlpx::Message) -> Result<(), RLPxError> { match &mut self.state { RLPxConnectionState::Established(state) => { let mut frame_buffer = vec![]; - match message.encode(&mut frame_buffer) { - Ok(_) => {} - Err(e) => { - // TODO: better error handling - error!("Failed to encode message: {:?}", e); - } - }; + message.encode(&mut frame_buffer)?; frame::write(frame_buffer, state, &mut self.stream).await; + Ok(()) } - // TODO proper error - _ => panic!("Invalid state to send message"), + _ => Err(RLPxError::InvalidState()), } } - async fn receive(&mut self) -> rlpx::Message { + async fn receive(&mut self) -> Result { match &mut self.state { RLPxConnectionState::Established(state) => { let frame_data = frame::read(state, &mut self.stream).await; - let (msg_id, msg_data): (u8, _) = - RLPDecode::decode_unfinished(&frame_data).unwrap(); - rlpx::Message::decode(msg_id, msg_data).unwrap() + let (msg_id, msg_data): (u8, _) = RLPDecode::decode_unfinished(&frame_data)?; + Ok(rlpx::Message::decode(msg_id, msg_data)?) } - // TODO proper error - _ => panic!("Received an unexpected message"), + _ => Err(RLPxError::InvalidState()), } } } @@ -334,6 +335,7 @@ enum RLPxConnectionState { Established(Box), } +#[derive(Clone)] struct Receiver { pub(crate) nonce: H256, pub(crate) ephemeral_key: SecretKey, @@ -348,6 +350,7 @@ impl Receiver { } } +#[derive(Clone)] struct Initiator { pub(crate) nonce: H256, pub(crate) ephemeral_key: SecretKey, @@ -364,6 +367,7 @@ impl Initiator { } } +#[derive(Clone)] struct ReceivedAuth { pub(crate) local_nonce: H256, pub(crate) local_ephemeral_key: SecretKey, @@ -375,7 +379,7 @@ struct ReceivedAuth { impl ReceivedAuth { pub fn new( - previous_state: &Receiver, + previous_state: Receiver, remote_node_id: H512, remote_init_message: Vec, remote_nonce: H256, @@ -383,7 +387,7 @@ impl ReceivedAuth { ) -> Self { Self { local_nonce: previous_state.nonce, - local_ephemeral_key: previous_state.ephemeral_key.clone(), + local_ephemeral_key: previous_state.ephemeral_key, remote_node_id, remote_nonce, remote_ephemeral_key, @@ -392,6 +396,7 @@ impl ReceivedAuth { } } +#[derive(Clone)] struct InitiatedAuth { pub(crate) remote_node_id: H512, pub(crate) local_nonce: H256, @@ -400,11 +405,11 @@ struct InitiatedAuth { } impl InitiatedAuth { - pub fn new(previous_state: &Initiator, local_init_message: Vec) -> Self { + pub fn new(previous_state: Initiator, local_init_message: Vec) -> Self { Self { remote_node_id: previous_state.remote_node_id, local_nonce: previous_state.nonce, - local_ephemeral_key: previous_state.ephemeral_key.clone(), + local_ephemeral_key: previous_state.ephemeral_key, local_init_message, } } @@ -420,7 +425,7 @@ pub struct Established { } impl Established { - fn for_receiver(previous_state: &ReceivedAuth, init_message: Vec) -> Self { + fn for_receiver(previous_state: ReceivedAuth, init_message: Vec) -> Self { // keccak256(nonce || initiator-nonce) // Remote node is initator let hashed_nonces = Keccak256::digest( @@ -432,16 +437,16 @@ impl Established { previous_state.remote_node_id, init_message, previous_state.local_nonce, - previous_state.local_ephemeral_key.clone(), + previous_state.local_ephemeral_key, hashed_nonces, - previous_state.remote_init_message.clone(), + previous_state.remote_init_message, previous_state.remote_nonce, previous_state.remote_ephemeral_key, ) } fn for_initiator( - previous_state: &InitiatedAuth, + previous_state: InitiatedAuth, remote_init_message: Vec, remote_nonce: H256, remote_ephemeral_key: PublicKey, @@ -453,9 +458,9 @@ impl Established { Self::new( previous_state.remote_node_id, - previous_state.local_init_message.clone(), + previous_state.local_init_message, previous_state.local_nonce, - previous_state.local_ephemeral_key.clone(), + previous_state.local_ephemeral_key, hashed_nonces, remote_init_message, remote_nonce, diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 4177ea10f..553b3ea98 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -1,4 +1,5 @@ use crate::rlpx::message::Message; +use ethereum_rust_rlp::error::{RLPDecodeError, RLPEncodeError}; use thiserror::Error; // TODO improve errors @@ -6,8 +7,14 @@ use thiserror::Error; pub(crate) enum RLPxError { #[error("{0}")] HandshakeError(String), - #[error("{0}")] - InvalidState(String), + #[error("Invalid connection state")] + InvalidState(), + #[error("Decode Error: {0}")] + DecodeError(#[from] RLPDecodeError), + #[error("Encode Error: {0}")] + EncodeError(#[from] RLPEncodeError), + #[error("Invalid peer id")] + InvalidPeerId(), #[error("Unexpected message: {0}")] UnexpectedMessage(Message), } diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index c88941ca5..740e1f399 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -48,7 +48,7 @@ pub fn encode_auth_message( pub(crate) fn decode_auth_message( static_key: &SecretKey, msg: &[u8], - auth_data: [u8; 2], + auth_data: &[u8], ) -> (AuthMessage, PublicKey) { let payload = decrypt_message(static_key, msg, auth_data); @@ -82,7 +82,7 @@ pub fn encode_ack_message( pub(crate) fn decode_ack_message( static_key: &SecretKey, msg: &[u8], - auth_data: [u8; 2], + auth_data: &[u8], ) -> AckMessage { let payload = decrypt_message(static_key, msg, auth_data); @@ -92,7 +92,7 @@ pub(crate) fn decode_ack_message( ack } -fn decrypt_message(static_key: &SecretKey, msg: &[u8], auth_data: [u8; 2]) -> Vec { +fn decrypt_message(static_key: &SecretKey, msg: &[u8], size_data: &[u8]) -> Vec { // Split the message into its components. General layout is: // public-key (65) || iv (16) || ciphertext || mac (32) let (pk, rest) = msg.split_at(65); @@ -109,7 +109,7 @@ fn decrypt_message(static_key: &SecretKey, msg: &[u8], auth_data: [u8; 2]) -> Ve let mac_key = sha256(&buf[16..]); // Verify the MAC. - let expected_d = sha256_hmac(&mac_key, &[iv, c], &auth_data); + let expected_d = sha256_hmac(&mac_key, &[iv, c], size_data); assert_eq!(d, expected_d); // Decrypt the message with the AES key. diff --git a/crates/networking/p2p/rlpx/utils.rs b/crates/networking/p2p/rlpx/utils.rs index 062c8ee1d..a2118a9d7 100644 --- a/crates/networking/p2p/rlpx/utils.rs +++ b/crates/networking/p2p/rlpx/utils.rs @@ -11,7 +11,7 @@ pub fn sha256(data: &[u8]) -> [u8; 32] { k256::sha2::Sha256::digest(data).into() } -pub fn sha256_hmac(key: &[u8], inputs: &[&[u8]], auth_data: &[u8]) -> [u8; 32] { +pub fn sha256_hmac(key: &[u8], inputs: &[&[u8]], size_data: &[u8]) -> [u8; 32] { use hmac::Mac; use k256::sha2::Sha256; @@ -19,7 +19,7 @@ pub fn sha256_hmac(key: &[u8], inputs: &[&[u8]], auth_data: &[u8]) -> [u8; 32] { for input in inputs { hasher.update(input); } - hasher.update(auth_data); + hasher.update(size_data); hasher.finalize().into_bytes().into() } From 4f95e552a352ea107c445020c169a28f4ba69826 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 31 Oct 2024 09:41:03 -0300 Subject: [PATCH 02/12] Refactored connection.rs to remove unhandled unwraps() --- crates/networking/p2p/net.rs | 20 ++++++++++----- crates/networking/p2p/rlpx/connection.rs | 31 ++++++++++++++--------- crates/networking/p2p/rlpx/error.rs | 15 ++++++++--- crates/networking/p2p/rlpx/eth/backend.rs | 18 +++++++++---- 4 files changed, 58 insertions(+), 26 deletions(-) diff --git a/crates/networking/p2p/net.rs b/crates/networking/p2p/net.rs index fd512c138..3425fe423 100644 --- a/crates/networking/p2p/net.rs +++ b/crates/networking/p2p/net.rs @@ -24,7 +24,7 @@ use tokio::{ sync::Mutex, try_join, }; -use tracing::{debug, info}; +use tracing::{debug, error, info}; use types::{Endpoint, Node}; pub mod bootnode; @@ -773,8 +773,12 @@ async fn handle_peer_as_initiator( .connect(SocketAddr::new(node.ip, node.tcp_port)) .await .unwrap(); - let conn = RLPxConnection::initiator(signer, msg, stream, storage).await; - handle_peer(conn, table).await; + match RLPxConnection::initiator(signer, msg, stream, storage).await { + Ok(conn) => handle_peer(conn, table).await, + Err(e) => { + error!("Error: {e}, Could not start connection with {node:?}"); + } + } } async fn handle_peer(mut conn: RLPxConnection, table: Arc>) { @@ -784,9 +788,13 @@ async fn handle_peer(mut conn: RLPxConnection, table: Arc info!("Error during RLPx connection: ({e})"), }, Err(e) => { - // Discard peer from kademlia table - info!("Handshake failed, discarding peer: ({e})"); - table.lock().await.replace_peer(conn.get_remote_node_id()); + if let Ok(node_id) = conn.get_remote_node_id() { + // Discard peer from kademlia table + info!("Handshake failed: ({e}), discarding peer {node_id}"); + table.lock().await.replace_peer(node_id); + } else { + info!("Handshake failed: ({e}), unknown peer"); + } } } } diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 396cef1c7..04087bfae 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -63,18 +63,23 @@ impl RLPxConnection { ) } - pub async fn initiator(signer: SigningKey, msg: &[u8], stream: S, storage: Store) -> Self { + pub async fn initiator( + signer: SigningKey, + msg: &[u8], + stream: S, + storage: Store, + ) -> Result { let mut rng = rand::thread_rng(); let digest = Keccak256::digest(&msg[65..]); - let signature = &Signature::from_bytes(msg[..64].into()).unwrap(); - let rid = RecoveryId::from_byte(msg[64]).unwrap(); - let peer_pk = VerifyingKey::recover_from_prehash(&digest, signature, rid).unwrap(); + let signature = &Signature::from_bytes(msg[..64].into())?; + let rid = RecoveryId::from_byte(msg[64]).ok_or(RLPxError::InvalidRecoveryId())?; + let peer_pk = VerifyingKey::recover_from_prehash(&digest, signature, rid)?; let state = RLPxConnectionState::Initiator(Initiator::new( H256::random_using(&mut rng), SecretKey::random(&mut rng), pubkey2id(&peer_pk.into()), )); - RLPxConnection::new(signer, stream, state, storage) + Ok(RLPxConnection::new(signer, stream, state, storage)) } pub async fn handshake(&mut self) -> Result<(), RLPxError> { @@ -140,12 +145,13 @@ impl RLPxConnection { loop { match self.receive().await? { // TODO: implement handlers for each message type + // https://github.com/lambdaclass/lambda_ethereum_rust/issues/1030 Message::Disconnect(_) => info!("Received Disconnect"), Message::Ping(_) => info!("Received Ping"), Message::Pong(_) => info!("Received Pong"), Message::Status(_) => info!("Received Status"), // TODO: Add new message types and handlers as they are implemented - message => return Err(RLPxError::UnexpectedMessage(message)), + _ => return Err(RLPxError::MessageNotHandled()), }; } } @@ -153,18 +159,17 @@ impl RLPxConnection { } } - pub fn get_remote_node_id(&self) -> H512 { + pub fn get_remote_node_id(&self) -> Result { match &self.state { - RLPxConnectionState::Established(state) => state.remote_node_id, - // TODO proper error - _ => panic!("Invalid state"), + RLPxConnectionState::Established(state) => Ok(state.remote_node_id), + _ => Err(RLPxError::InvalidState()), } } async fn start_capabilities(&mut self) -> Result<(), RLPxError> { // Sending eth Status if peer supports it if self.capabilities.contains(&CAP_ETH) { - let status = backend::get_status(&self.storage).unwrap(); + let status = backend::get_status(&self.storage)?; self.send(Message::Status(status)).await?; } // TODO: add new capabilities startup when required (eg. snap) @@ -260,7 +265,9 @@ impl RLPxConnection { let size_data = &msg_bytes[..2]; let msg = &msg_bytes[2..]; let ack = decode_ack_message(&secret_key, msg, size_data); - let remote_ephemeral_key = ack.get_ephemeral_pubkey().unwrap(); + let remote_ephemeral_key = ack + .get_ephemeral_pubkey() + .ok_or(RLPxError::NotFound("Remote ephemeral key".to_string()))?; // Build next state self.state = RLPxConnectionState::Established(Box::new(Established::for_initiator( diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 553b3ea98..565af0fc1 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -1,5 +1,6 @@ -use crate::rlpx::message::Message; use ethereum_rust_rlp::error::{RLPDecodeError, RLPEncodeError}; +use ethereum_rust_storage::error::StoreError; +use k256::ecdsa::Error as EcdsaError; use thiserror::Error; // TODO improve errors @@ -13,8 +14,16 @@ pub(crate) enum RLPxError { DecodeError(#[from] RLPDecodeError), #[error("Encode Error: {0}")] EncodeError(#[from] RLPEncodeError), + #[error("Store Error: {0}")] + StoreError(#[from] StoreError), + #[error("Not Found: {0}")] + NotFound(String), #[error("Invalid peer id")] InvalidPeerId(), - #[error("Unexpected message: {0}")] - UnexpectedMessage(Message), + #[error("Cryptography Error: {0}")] + CryptographyError(#[from] EcdsaError), + #[error("Invalid recovery id")] + InvalidRecoveryId(), + #[error("Cannot handle message")] + MessageNotHandled(), } diff --git a/crates/networking/p2p/rlpx/eth/backend.rs b/crates/networking/p2p/rlpx/eth/backend.rs index 1af62214a..df1cb7c28 100644 --- a/crates/networking/p2p/rlpx/eth/backend.rs +++ b/crates/networking/p2p/rlpx/eth/backend.rs @@ -1,19 +1,27 @@ use ethereum_rust_core::{types::ForkId, U256}; -use ethereum_rust_storage::{error::StoreError, Store}; +use ethereum_rust_storage::Store; + +use crate::rlpx::error::RLPxError; use super::status::StatusMessage; pub const ETH_VERSION: u32 = 68; -pub fn get_status(storage: &Store) -> Result { +pub fn get_status(storage: &Store) -> Result { let chain_config = storage.get_chain_config()?; let total_difficulty = U256::from(chain_config.terminal_total_difficulty.unwrap_or_default()); let network_id = chain_config.chain_id; // These blocks must always be available - let genesis_header = storage.get_block_header(0)?.unwrap(); - let block_number = storage.get_latest_block_number()?.unwrap(); - let block_header = storage.get_block_header(block_number)?.unwrap(); + let genesis_header = storage + .get_block_header(0)? + .ok_or(RLPxError::NotFound("Genesis Block".to_string()))?; + let block_number = storage + .get_latest_block_number()? + .ok_or(RLPxError::NotFound("Latest Block Number".to_string()))?; + let block_header = storage + .get_block_header(block_number)? + .ok_or(RLPxError::NotFound(format!("Block {block_number}")))?; let genesis = genesis_header.compute_block_hash(); let block_hash = block_header.compute_block_hash(); From 17903aad18d5d3b872c60597e4425cc105709319 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 31 Oct 2024 10:52:32 -0300 Subject: [PATCH 03/12] Fixed ack decoding test and moved to proper module --- crates/networking/p2p/rlpx/connection.rs | 46 ------------------------ crates/networking/p2p/rlpx/handshake.rs | 42 ++++++++++++++++++++++ 2 files changed, 42 insertions(+), 46 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 04087bfae..1c10d2d45 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -519,49 +519,3 @@ impl Established { } } } - -// TODO fix this test now that RLPxClient does no longer exist -// https://github.com/lambdaclass/lambda_ethereum_rust/issues/843 -#[cfg(test)] -mod tests { - // use hex_literal::hex; - // use k256::SecretKey; - - #[test] - fn test_ack_decoding() { - // // This is the Ack₂ message from EIP-8. - // let msg = hex!("01ea0451958701280a56482929d3b0757da8f7fbe5286784beead59d95089c217c9b917788989470b0e330cc6e4fb383c0340ed85fab836ec9fb8a49672712aeabbdfd1e837c1ff4cace34311cd7f4de05d59279e3524ab26ef753a0095637ac88f2b499b9914b5f64e143eae548a1066e14cd2f4bd7f814c4652f11b254f8a2d0191e2f5546fae6055694aed14d906df79ad3b407d94692694e259191cde171ad542fc588fa2b7333313d82a9f887332f1dfc36cea03f831cb9a23fea05b33deb999e85489e645f6aab1872475d488d7bd6c7c120caf28dbfc5d6833888155ed69d34dbdc39c1f299be1057810f34fbe754d021bfca14dc989753d61c413d261934e1a9c67ee060a25eefb54e81a4d14baff922180c395d3f998d70f46f6b58306f969627ae364497e73fc27f6d17ae45a413d322cb8814276be6ddd13b885b201b943213656cde498fa0e9ddc8e0b8f8a53824fbd82254f3e2c17e8eaea009c38b4aa0a3f306e8797db43c25d68e86f262e564086f59a2fc60511c42abfb3057c247a8a8fe4fb3ccbadde17514b7ac8000cdb6a912778426260c47f38919a91f25f4b5ffb455d6aaaf150f7e5529c100ce62d6d92826a71778d809bdf60232ae21ce8a437eca8223f45ac37f6487452ce626f549b3b5fdee26afd2072e4bc75833c2464c805246155289f4"); - - // let static_key = hex!("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee"); - // let nonce = hex!("7e968bba13b6c50e2c4cd7f241cc0d64d1ac25c7f5952df231ac6a2bda8ee5d6"); - // let ephemeral_key = - // hex!("869d6ecf5211f1cc60418a13b9d870b22959d0c16f02bec714c960dd2298a32d"); - - // let mut client = RLPxClient::new( - // true, - // nonce.into(), - // SecretKey::from_slice(&ephemeral_key).unwrap(), - // ); - - // assert_eq!( - // &client.local_ephemeral_key.to_bytes()[..], - // &ephemeral_key[..] - // ); - // assert_eq!(client.local_nonce.0, nonce); - - // let auth_data = msg[..2].try_into().unwrap(); - - // client.local_init_message = Some(vec![]); - - // let state = client.decode_ack_message( - // &SecretKey::from_slice(&static_key).unwrap(), - // &msg[2..], - // auth_data, - // ); - - // let expected_mac_secret = - // hex!("2ea74ec5dae199227dff1af715362700e989d889d7a493cb0639691efb8e5f98"); - - // assert_eq!(state.mac_key.0, expected_mac_secret); - } -} diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index 740e1f399..11f42993e 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -305,3 +305,45 @@ impl RLPDecode for AckMessage { Ok((this, rest)) } } + +// TODO fix this test now that RLPxClient does no longer exist +// https://github.com/lambdaclass/lambda_ethereum_rust/issues/843 +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use ethereum_rust_core::H256; + use hex_literal::hex; + use k256::SecretKey; + + use crate::rlpx::{handshake::decode_ack_message, utils::pubkey2id}; + + #[test] + fn test_ack_decoding() { + // This is the Ack₂ message from EIP-8. + // https://github.com/ethereum/EIPs/blob/master/EIPS/eip-8.md + let msg = hex!("01ea0451958701280a56482929d3b0757da8f7fbe5286784beead59d95089c217c9b917788989470b0e330cc6e4fb383c0340ed85fab836ec9fb8a49672712aeabbdfd1e837c1ff4cace34311cd7f4de05d59279e3524ab26ef753a0095637ac88f2b499b9914b5f64e143eae548a1066e14cd2f4bd7f814c4652f11b254f8a2d0191e2f5546fae6055694aed14d906df79ad3b407d94692694e259191cde171ad542fc588fa2b7333313d82a9f887332f1dfc36cea03f831cb9a23fea05b33deb999e85489e645f6aab1872475d488d7bd6c7c120caf28dbfc5d6833888155ed69d34dbdc39c1f299be1057810f34fbe754d021bfca14dc989753d61c413d261934e1a9c67ee060a25eefb54e81a4d14baff922180c395d3f998d70f46f6b58306f969627ae364497e73fc27f6d17ae45a413d322cb8814276be6ddd13b885b201b943213656cde498fa0e9ddc8e0b8f8a53824fbd82254f3e2c17e8eaea009c38b4aa0a3f306e8797db43c25d68e86f262e564086f59a2fc60511c42abfb3057c247a8a8fe4fb3ccbadde17514b7ac8000cdb6a912778426260c47f38919a91f25f4b5ffb455d6aaaf150f7e5529c100ce62d6d92826a71778d809bdf60232ae21ce8a437eca8223f45ac37f6487452ce626f549b3b5fdee26afd2072e4bc75833c2464c805246155289f4"); + let static_key_a = hex!("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee"); + let ephemeral_key_b = + hex!("e238eb8e04fee6511ab04c6dd3c89ce097b11f25d584863ac2b6d5b35b1847e4"); + let expected_ephemeral_key_b = pubkey2id( + &SecretKey::from_slice(&ephemeral_key_b) + .unwrap() + .public_key(), + ); + + let ack = decode_ack_message( + &SecretKey::from_slice(&static_key_a).unwrap(), + &msg[2..], + &msg[..2], + ); + + assert_eq!(ack.ephemeral_pubkey, expected_ephemeral_key_b); + assert_eq!( + ack.nonce, + H256::from_str("559aead08264d5795d3909718cdd05abd49572e84fe55590eef31a88a08fdffd") + .unwrap() + ); + assert_eq!(ack.version, 4u8); + } +} From 132b34207d3b344457430f93196c532cfd6f757c Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 31 Oct 2024 10:56:42 -0300 Subject: [PATCH 04/12] Better format for test --- crates/networking/p2p/rlpx/handshake.rs | 31 ++++++++++++------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index 11f42993e..de49736db 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -323,27 +323,26 @@ mod tests { // This is the Ack₂ message from EIP-8. // https://github.com/ethereum/EIPs/blob/master/EIPS/eip-8.md let msg = hex!("01ea0451958701280a56482929d3b0757da8f7fbe5286784beead59d95089c217c9b917788989470b0e330cc6e4fb383c0340ed85fab836ec9fb8a49672712aeabbdfd1e837c1ff4cace34311cd7f4de05d59279e3524ab26ef753a0095637ac88f2b499b9914b5f64e143eae548a1066e14cd2f4bd7f814c4652f11b254f8a2d0191e2f5546fae6055694aed14d906df79ad3b407d94692694e259191cde171ad542fc588fa2b7333313d82a9f887332f1dfc36cea03f831cb9a23fea05b33deb999e85489e645f6aab1872475d488d7bd6c7c120caf28dbfc5d6833888155ed69d34dbdc39c1f299be1057810f34fbe754d021bfca14dc989753d61c413d261934e1a9c67ee060a25eefb54e81a4d14baff922180c395d3f998d70f46f6b58306f969627ae364497e73fc27f6d17ae45a413d322cb8814276be6ddd13b885b201b943213656cde498fa0e9ddc8e0b8f8a53824fbd82254f3e2c17e8eaea009c38b4aa0a3f306e8797db43c25d68e86f262e564086f59a2fc60511c42abfb3057c247a8a8fe4fb3ccbadde17514b7ac8000cdb6a912778426260c47f38919a91f25f4b5ffb455d6aaaf150f7e5529c100ce62d6d92826a71778d809bdf60232ae21ce8a437eca8223f45ac37f6487452ce626f549b3b5fdee26afd2072e4bc75833c2464c805246155289f4"); - let static_key_a = hex!("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee"); - let ephemeral_key_b = - hex!("e238eb8e04fee6511ab04c6dd3c89ce097b11f25d584863ac2b6d5b35b1847e4"); + let static_key_a = SecretKey::from_slice(&hex!( + "49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee" + )) + .unwrap(); + + let expected_nonce_b = + H256::from_str("559aead08264d5795d3909718cdd05abd49572e84fe55590eef31a88a08fdffd") + .unwrap(); let expected_ephemeral_key_b = pubkey2id( - &SecretKey::from_slice(&ephemeral_key_b) - .unwrap() - .public_key(), + &SecretKey::from_slice(&hex!( + "e238eb8e04fee6511ab04c6dd3c89ce097b11f25d584863ac2b6d5b35b1847e4" + )) + .unwrap() + .public_key(), ); - let ack = decode_ack_message( - &SecretKey::from_slice(&static_key_a).unwrap(), - &msg[2..], - &msg[..2], - ); + let ack = decode_ack_message(&static_key_a, &msg[2..], &msg[..2]); assert_eq!(ack.ephemeral_pubkey, expected_ephemeral_key_b); - assert_eq!( - ack.nonce, - H256::from_str("559aead08264d5795d3909718cdd05abd49572e84fe55590eef31a88a08fdffd") - .unwrap() - ); + assert_eq!(ack.nonce, expected_nonce_b); assert_eq!(ack.version, 4u8); } } From 788af33fb84be1c64cab19064a7a249f67fd65b0 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 31 Oct 2024 11:05:05 -0300 Subject: [PATCH 05/12] Removed outdated comment --- crates/networking/p2p/rlpx/handshake.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index de49736db..87895602f 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -306,8 +306,6 @@ impl RLPDecode for AckMessage { } } -// TODO fix this test now that RLPxClient does no longer exist -// https://github.com/lambdaclass/lambda_ethereum_rust/issues/843 #[cfg(test)] mod tests { use std::str::FromStr; From 7609609e69f0b3e27b3f32d217890e7d96a19227 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 31 Oct 2024 14:20:48 -0300 Subject: [PATCH 06/12] Updated rlpx/errors.rs --- crates/networking/p2p/rlpx/error.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 565af0fc1..245c99595 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -10,20 +10,20 @@ pub(crate) enum RLPxError { HandshakeError(String), #[error("Invalid connection state")] InvalidState(), - #[error("Decode Error: {0}")] - DecodeError(#[from] RLPDecodeError), - #[error("Encode Error: {0}")] - EncodeError(#[from] RLPEncodeError), - #[error("Store Error: {0}")] - StoreError(#[from] StoreError), #[error("Not Found: {0}")] NotFound(String), #[error("Invalid peer id")] InvalidPeerId(), - #[error("Cryptography Error: {0}")] - CryptographyError(#[from] EcdsaError), #[error("Invalid recovery id")] InvalidRecoveryId(), #[error("Cannot handle message")] MessageNotHandled(), + #[error(transparent)] + RLPDecodeError(#[from] RLPDecodeError), + #[error(transparent)] + RLPEncodeError(#[from] RLPEncodeError), + #[error(transparent)] + StoreError(#[from] StoreError), + #[error(transparent)] + EcdsaError(#[from] EcdsaError), } From 2225526bee0785408149f53358115ce6f5d1d6be Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 31 Oct 2024 15:30:32 -0300 Subject: [PATCH 07/12] Updated rlpx/handshake.rs to handle internal errors --- crates/networking/p2p/rlpx/connection.rs | 9 +- crates/networking/p2p/rlpx/error.rs | 31 ++++++- crates/networking/p2p/rlpx/handshake.rs | 102 +++++++++++++---------- 3 files changed, 89 insertions(+), 53 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 1c10d2d45..a60bf538a 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -191,7 +191,7 @@ impl RLPxConnection { previous_state.nonce, &peer_pk, &previous_state.ephemeral_key, - ); + )?; self.send_handshake_msg(&msg).await?; @@ -216,7 +216,7 @@ impl RLPxConnection { &previous_state.local_ephemeral_key, previous_state.local_nonce, &peer_pk, - ); + )?; self.send_handshake_msg(&msg).await?; @@ -239,7 +239,8 @@ impl RLPxConnection { let msg_bytes = self.receive_handshake_msg().await?; let size_data = &msg_bytes[..2]; let msg = &msg_bytes[2..]; - let (auth, remote_ephemeral_key) = decode_auth_message(&secret_key, msg, size_data); + let (auth, remote_ephemeral_key) = + decode_auth_message(&secret_key, msg, size_data)?; // Build next state self.state = RLPxConnectionState::ReceivedAuth(ReceivedAuth::new( @@ -264,7 +265,7 @@ impl RLPxConnection { let msg_bytes = self.receive_handshake_msg().await?; let size_data = &msg_bytes[..2]; let msg = &msg_bytes[2..]; - let ack = decode_ack_message(&secret_key, msg, size_data); + let ack = decode_ack_message(&secret_key, msg, size_data)?; let remote_ephemeral_key = ack .get_ephemeral_pubkey() .ok_or(RLPxError::NotFound("Remote ephemeral key".to_string()))?; diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 245c99595..0367b598d 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -1,6 +1,5 @@ use ethereum_rust_rlp::error::{RLPDecodeError, RLPEncodeError}; use ethereum_rust_storage::error::StoreError; -use k256::ecdsa::Error as EcdsaError; use thiserror::Error; // TODO improve errors @@ -24,6 +23,32 @@ pub(crate) enum RLPxError { RLPEncodeError(#[from] RLPEncodeError), #[error(transparent)] StoreError(#[from] StoreError), - #[error(transparent)] - EcdsaError(#[from] EcdsaError), + #[error("Error in cryptographic library: {0}")] + CryptographyError(String), +} + +// Grouping all cryptographic related errors in a single CryptographicError variant +// We can improve this to individual errors if required +impl From for RLPxError { + fn from(e: k256::ecdsa::Error) -> Self { + RLPxError::CryptographyError(e.to_string()) + } +} + +impl From for RLPxError { + fn from(e: k256::elliptic_curve::Error) -> Self { + RLPxError::CryptographyError(e.to_string()) + } +} + +impl From for RLPxError { + fn from(e: sha3::digest::InvalidLength) -> Self { + RLPxError::CryptographyError(e.to_string()) + } +} + +impl From for RLPxError { + fn from(e: aes::cipher::StreamCipherError) -> Self { + RLPxError::CryptographyError(e.to_string()) + } } diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index 87895602f..bc8e9e985 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -14,15 +14,17 @@ use k256::{ }; use rand::Rng; +use super::error::RLPxError; + type Aes128Ctr64BE = ctr::Ctr64BE; /// Encodes an Auth message, to start a handshake. -pub fn encode_auth_message( +pub(crate) fn encode_auth_message( static_key: &SecretKey, local_nonce: H256, remote_static_pubkey: &PublicKey, local_ephemeral_key: &SecretKey, -) -> Vec { +) -> Result, RLPxError> { let node_id = pubkey2id(&static_key.public_key()); // Derive a shared secret from the static keys. @@ -33,7 +35,7 @@ pub fn encode_auth_message( static_shared_secret.into(), local_nonce, local_ephemeral_key, - ); + )?; // Compose the auth message. let auth = AuthMessage::new(signature, node_id, local_nonce); @@ -49,26 +51,26 @@ pub(crate) fn decode_auth_message( static_key: &SecretKey, msg: &[u8], auth_data: &[u8], -) -> (AuthMessage, PublicKey) { - let payload = decrypt_message(static_key, msg, auth_data); +) -> Result<(AuthMessage, PublicKey), RLPxError> { + let payload = decrypt_message(static_key, msg, auth_data)?; // RLP-decode the message. - let (auth, _padding) = AuthMessage::decode_unfinished(&payload).unwrap(); + let (auth, _padding) = AuthMessage::decode_unfinished(&payload)?; // Derive a shared secret from the static keys. - let peer_pk = id2pubkey(auth.node_id).unwrap(); + let peer_pk = id2pubkey(auth.node_id).ok_or(RLPxError::NotFound("Node id".to_string()))?; let static_shared_secret = ecdh_xchng(static_key, &peer_pk); let remote_ephemeral_key = - retrieve_remote_ephemeral_key(static_shared_secret.into(), auth.nonce, auth.signature); - (auth, remote_ephemeral_key) + retrieve_remote_ephemeral_key(static_shared_secret.into(), auth.nonce, auth.signature)?; + Ok((auth, remote_ephemeral_key)) } /// Encodes an Ack message, to complete a handshake -pub fn encode_ack_message( +pub(crate) fn encode_ack_message( local_ephemeral_key: &SecretKey, local_nonce: H256, remote_static_pubkey: &PublicKey, -) -> Vec { +) -> Result, RLPxError> { // Compose the ack message. let ack_msg = AckMessage::new(pubkey2id(&local_ephemeral_key.public_key()), local_nonce); @@ -83,16 +85,20 @@ pub(crate) fn decode_ack_message( static_key: &SecretKey, msg: &[u8], auth_data: &[u8], -) -> AckMessage { - let payload = decrypt_message(static_key, msg, auth_data); +) -> Result { + let payload = decrypt_message(static_key, msg, auth_data)?; // RLP-decode the message. - let (ack, _padding) = AckMessage::decode_unfinished(&payload).unwrap(); + let (ack, _padding) = AckMessage::decode_unfinished(&payload)?; - ack + Ok(ack) } -fn decrypt_message(static_key: &SecretKey, msg: &[u8], size_data: &[u8]) -> Vec { +fn decrypt_message( + static_key: &SecretKey, + msg: &[u8], + size_data: &[u8], +) -> Result, RLPxError> { // Split the message into its components. General layout is: // public-key (65) || iv (16) || ciphertext || mac (32) let (pk, rest) = msg.split_at(65); @@ -100,7 +106,7 @@ fn decrypt_message(static_key: &SecretKey, msg: &[u8], size_data: &[u8]) -> Vec< let (c, d) = rest.split_at(rest.len() - 32); // Derive the message shared secret. - let shared_secret = ecdh_xchng(static_key, &PublicKey::from_sec1_bytes(pk).unwrap()); + let shared_secret = ecdh_xchng(static_key, &PublicKey::from_sec1_bytes(pk)?); // Derive the AES and MAC keys from the message shared secret. let mut buf = [0; 32]; @@ -113,13 +119,16 @@ fn decrypt_message(static_key: &SecretKey, msg: &[u8], size_data: &[u8]) -> Vec< assert_eq!(d, expected_d); // Decrypt the message with the AES key. - let mut stream_cipher = Aes128Ctr64BE::new_from_slices(aes_key, iv).unwrap(); + let mut stream_cipher = Aes128Ctr64BE::new_from_slices(aes_key, iv)?; let mut decoded = c.to_vec(); - stream_cipher.try_apply_keystream(&mut decoded).unwrap(); - decoded + stream_cipher.try_apply_keystream(&mut decoded)?; + Ok(decoded) } -pub fn encrypt_message(remote_static_pubkey: &PublicKey, mut encoded_msg: Vec) -> Vec { +pub(crate) fn encrypt_message( + remote_static_pubkey: &PublicKey, + mut encoded_msg: Vec, +) -> Result, RLPxError> { const SIGNATURE_SIZE: usize = 65; const IV_SIZE: usize = 16; const MAC_FOOTER_SIZE: usize = 32; @@ -133,7 +142,9 @@ pub fn encrypt_message(remote_static_pubkey: &PublicKey, mut encoded_msg: Vec PublicKey { +) -> Result { let signature_prehash = shared_secret ^ remote_nonce; - let sign = ecdsa::Signature::from_slice(&signature.to_fixed_bytes()[..64]).unwrap(); - let rid = RecoveryId::from_byte(signature[64]).unwrap(); + let sign = ecdsa::Signature::from_slice(&signature.to_fixed_bytes()[..64])?; + let rid = RecoveryId::from_byte(signature[64]).ok_or(RLPxError::InvalidRecoveryId())?; let ephemeral_key = - VerifyingKey::recover_from_prehash(signature_prehash.as_bytes(), &sign, rid).unwrap(); - ephemeral_key.into() + VerifyingKey::recover_from_prehash(signature_prehash.as_bytes(), &sign, rid)?; + Ok(ephemeral_key.into()) } fn sign_shared_secret( shared_secret: H256, local_nonce: H256, local_ephemeral_key: &SecretKey, -) -> Signature { +) -> Result { let signature_prehash = shared_secret ^ local_nonce; - let (signature, rid) = SigningKey::from(local_ephemeral_key) - .sign_prehash_recoverable(&signature_prehash.0) - .unwrap(); + let (signature, rid) = + SigningKey::from(local_ephemeral_key).sign_prehash_recoverable(&signature_prehash.0)?; let mut signature_bytes = [0; 65]; signature_bytes[..64].copy_from_slice(signature.to_bytes().as_slice()); signature_bytes[64] = rid.to_byte(); - signature_bytes.into() + Ok(signature_bytes.into()) } #[derive(Debug)] @@ -236,11 +246,11 @@ impl RLPEncode for AuthMessage { impl RLPDecode for AuthMessage { // NOTE: discards any extra data in the list after the known fields. fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { - let decoder = Decoder::new(rlp).unwrap(); - let (signature, decoder) = decoder.decode_field("signature").unwrap(); - let (node_id, decoder) = decoder.decode_field("node_id").unwrap(); - let (nonce, decoder) = decoder.decode_field("nonce").unwrap(); - let (version, decoder) = decoder.decode_field("version").unwrap(); + let decoder = Decoder::new(rlp)?; + let (signature, decoder) = decoder.decode_field("signature")?; + let (node_id, decoder) = decoder.decode_field("node_id")?; + let (nonce, decoder) = decoder.decode_field("nonce")?; + let (version, decoder) = decoder.decode_field("version")?; let rest = decoder.finish_unchecked(); let this = Self { @@ -291,10 +301,10 @@ impl RLPEncode for AckMessage { impl RLPDecode for AckMessage { // NOTE: discards any extra data in the list after the known fields. fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { - let decoder = Decoder::new(rlp).unwrap(); - let (ephemeral_pubkey, decoder) = decoder.decode_field("ephemeral_pubkey").unwrap(); - let (nonce, decoder) = decoder.decode_field("nonce").unwrap(); - let (version, decoder) = decoder.decode_field("version").unwrap(); + let decoder = Decoder::new(rlp)?; + let (ephemeral_pubkey, decoder) = decoder.decode_field("ephemeral_pubkey")?; + let (nonce, decoder) = decoder.decode_field("nonce")?; + let (version, decoder) = decoder.decode_field("version")?; let rest = decoder.finish_unchecked(); let this = Self { @@ -337,7 +347,7 @@ mod tests { .public_key(), ); - let ack = decode_ack_message(&static_key_a, &msg[2..], &msg[..2]); + let ack = decode_ack_message(&static_key_a, &msg[2..], &msg[..2]).unwrap(); assert_eq!(ack.ephemeral_pubkey, expected_ephemeral_key_b); assert_eq!(ack.nonce, expected_nonce_b); From 09c149a37526898d73102b6aad648f3424b610b5 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 5 Nov 2024 10:14:44 -0300 Subject: [PATCH 08/12] Changed function visibility --- crates/networking/p2p/rlpx/handshake.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index bc8e9e985..c976738c8 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -125,7 +125,7 @@ fn decrypt_message( Ok(decoded) } -pub(crate) fn encrypt_message( +fn encrypt_message( remote_static_pubkey: &PublicKey, mut encoded_msg: Vec, ) -> Result, RLPxError> { From 0f81e982b7690de09226a09759a860585c004d20 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 5 Nov 2024 16:46:31 -0300 Subject: [PATCH 09/12] Making p2p messages code consistent with the rest of the capabilities --- crates/networking/p2p/rlpx/connection.rs | 8 ++--- crates/networking/p2p/rlpx/eth/backend.rs | 6 ++-- crates/networking/p2p/rlpx/eth/status.rs | 44 +++++------------------ crates/networking/p2p/rlpx/message.rs | 25 ++++++++++--- crates/networking/p2p/rlpx/p2p.rs | 7 ---- 5 files changed, 35 insertions(+), 55 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index b89938f39..8b5941363 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -171,23 +171,23 @@ impl RLPxConnection { id: msg_data.id, block_headers: msg_data.fetch_headers(&self.storage), }; - self.send(Message::BlockHeaders(response)).await; + self.send(Message::BlockHeaders(response)).await? } Message::GetBlockBodies(msg_data) => { let response = BlockBodies { id: msg_data.id, block_bodies: msg_data.fetch_blocks(&self.storage), }; - self.send(Message::BlockBodies(response)).await; + self.send(Message::BlockBodies(response)).await? } Message::GetStorageRanges(req) => { let response = process_storage_ranges_request(req, self.storage.clone())?; - self.send(Message::StorageRanges(response)).await + self.send(Message::StorageRanges(response)).await? } Message::GetByteCodes(req) => { let response = process_byte_codes_request(req, self.storage.clone())?; - self.send(Message::ByteCodes(response)).await + self.send(Message::ByteCodes(response)).await? } // TODO: Add new message types and handlers as they are implemented _ => return Err(RLPxError::MessageNotHandled()), diff --git a/crates/networking/p2p/rlpx/eth/backend.rs b/crates/networking/p2p/rlpx/eth/backend.rs index df1cb7c28..cc51bd8f2 100644 --- a/crates/networking/p2p/rlpx/eth/backend.rs +++ b/crates/networking/p2p/rlpx/eth/backend.rs @@ -26,12 +26,12 @@ pub fn get_status(storage: &Store) -> Result { let genesis = genesis_header.compute_block_hash(); let block_hash = block_header.compute_block_hash(); let fork_id = ForkId::new(chain_config, genesis, block_header.timestamp, block_number); - Ok(StatusMessage::new( - ETH_VERSION, + Ok(StatusMessage { + eth_version: ETH_VERSION, network_id, total_difficulty, block_hash, genesis, fork_id, - )) + }) } diff --git a/crates/networking/p2p/rlpx/eth/status.rs b/crates/networking/p2p/rlpx/eth/status.rs index 9050ac433..aae3c025d 100644 --- a/crates/networking/p2p/rlpx/eth/status.rs +++ b/crates/networking/p2p/rlpx/eth/status.rs @@ -4,7 +4,6 @@ use ethereum_rust_core::{ U256, }; use ethereum_rust_rlp::{ - encode::RLPEncode, error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; @@ -14,38 +13,16 @@ use crate::rlpx::{message::RLPxMessage, utils::snappy_encode}; #[derive(Debug)] pub(crate) struct StatusMessage { - eth_version: u32, - network_id: u64, - total_difficulty: U256, - block_hash: BlockHash, - genesis: BlockHash, - fork_id: ForkId, -} - -impl StatusMessage { - pub fn new( - eth_version: u32, - network_id: u64, - total_difficulty: U256, - block_hash: BlockHash, - genesis: BlockHash, - fork_id: ForkId, - ) -> Self { - Self { - eth_version, - network_id, - total_difficulty, - block_hash, - genesis, - fork_id, - } - } + pub(crate) eth_version: u32, + pub(crate) network_id: u64, + pub(crate) total_difficulty: U256, + pub(crate) block_hash: BlockHash, + pub(crate) genesis: BlockHash, + pub(crate) fork_id: ForkId, } impl RLPxMessage for StatusMessage { fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - 16_u8.encode(buf); // msg_id - let mut encoded_data = vec![]; Encoder::new(&mut encoded_data) .encode_field(&self.eth_version) @@ -72,25 +49,20 @@ impl RLPxMessage for StatusMessage { assert_eq!(eth_version, 68, "only eth version 68 is supported"); let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; - let (total_difficulty, decoder): (U256, _) = decoder.decode_field("totalDifficulty")?; - let (block_hash, decoder): (BlockHash, _) = decoder.decode_field("blockHash")?; - let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; - let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; - // Implementations must ignore any additional list elements let _padding = decoder.finish_unchecked(); - Ok(Self::new( + Ok(Self { eth_version, network_id, total_difficulty, block_hash, genesis, fork_id, - )) + }) } } diff --git a/crates/networking/p2p/rlpx/message.rs b/crates/networking/p2p/rlpx/message.rs index e06f27802..314bcc763 100644 --- a/crates/networking/p2p/rlpx/message.rs +++ b/crates/networking/p2p/rlpx/message.rs @@ -72,11 +72,26 @@ impl Message { pub fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { match self { - Message::Hello(msg) => msg.encode(buf), - Message::Disconnect(msg) => msg.encode(buf), - Message::Ping(msg) => msg.encode(buf), - Message::Pong(msg) => msg.encode(buf), - Message::Status(msg) => msg.encode(buf), + Message::Hello(msg) => { + 0x00_u8.encode(buf); + msg.encode(buf) + } + Message::Disconnect(msg) => { + 0x01_u8.encode(buf); + msg.encode(buf) + } + Message::Ping(msg) => { + 0x02_u8.encode(buf); + msg.encode(buf) + } + Message::Pong(msg) => { + 0x03_u8.encode(buf); + msg.encode(buf) + } + Message::Status(msg) => { + 0x10_u8.encode(buf); + msg.encode(buf) + } Message::GetBlockHeaders(msg) => { 0x13_u8.encode(buf); msg.encode(buf) diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index 4521c5bef..a40158c9a 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -62,7 +62,6 @@ impl HelloMessage { impl RLPxMessage for HelloMessage { fn encode(&self, mut buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - 0_u8.encode(buf); //msg_id Encoder::new(&mut buf) .encode_field(&5_u8) // protocolVersion .encode_field(&"Ethereum(++)/1.0.0") // clientId @@ -113,8 +112,6 @@ impl DisconnectMessage { impl RLPxMessage for DisconnectMessage { fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - 1_u8.encode(buf); //msg_id - let mut encoded_data = vec![]; // Disconnect msg_data is reason or none match self.reason { @@ -161,8 +158,6 @@ impl PingMessage { impl RLPxMessage for PingMessage { fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - 2_u8.encode(buf); // msg_id - let mut encoded_data = vec![]; // Ping msg_data is only [] Vec::::new().encode(&mut encoded_data); @@ -194,8 +189,6 @@ impl PongMessage { impl RLPxMessage for PongMessage { fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - 2_u8.encode(buf); // msg_id - let mut encoded_data = vec![]; // Pong msg_data is only [] Vec::::new().encode(&mut encoded_data); From fc6409280072bb9c8d2ad9356317c6f07e153736 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 6 Nov 2024 10:21:33 -0300 Subject: [PATCH 10/12] Refactored snappy decompress duplicated code --- Cargo.toml | 1 + crates/common/rlp/Cargo.toml | 1 + crates/common/rlp/error.rs | 4 +- crates/networking/p2p/rlpx/eth/blocks.rs | 34 +++++-------- crates/networking/p2p/rlpx/eth/receipts.rs | 21 +++----- crates/networking/p2p/rlpx/eth/status.rs | 14 +++--- .../networking/p2p/rlpx/eth/transactions.rs | 34 +++++-------- crates/networking/p2p/rlpx/p2p.rs | 38 +++++++------- crates/networking/p2p/rlpx/snap.rs | 49 ++++++------------- crates/networking/p2p/rlpx/utils.rs | 15 +++--- 10 files changed, 86 insertions(+), 125 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index aeb8b3b97..cacad50aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,3 +62,4 @@ jsonwebtoken = "9.3.0" rand = "0.8.5" cfg-if = "1.0.0" reqwest = { version = "0.12.7", features = ["json"] } +snap = "1.1.1" diff --git a/crates/common/rlp/Cargo.toml b/crates/common/rlp/Cargo.toml index 1c708d3aa..b3737055b 100644 --- a/crates/common/rlp/Cargo.toml +++ b/crates/common/rlp/Cargo.toml @@ -10,6 +10,7 @@ bytes.workspace = true hex.workspace = true lazy_static.workspace = true ethereum-types.workspace = true +snap.workspace = true [dev-dependencies] hex-literal.workspace = true diff --git a/crates/common/rlp/error.rs b/crates/common/rlp/error.rs index 03987c0d4..81e4f321a 100644 --- a/crates/common/rlp/error.rs +++ b/crates/common/rlp/error.rs @@ -13,6 +13,8 @@ pub enum RLPDecodeError { UnexpectedList, #[error("UnexpectedString")] UnexpectedString, + #[error("InvalidCompression")] + InvalidCompression(#[from] snap::Error), #[error("{0}")] Custom(String), } @@ -21,7 +23,7 @@ pub enum RLPDecodeError { #[derive(Debug, Error)] pub enum RLPEncodeError { #[error("InvalidCompression")] - InvalidCompression, + InvalidCompression(#[from] snap::Error), #[error("{0}")] Custom(String), } diff --git a/crates/networking/p2p/rlpx/eth/blocks.rs b/crates/networking/p2p/rlpx/eth/blocks.rs index 999550bf0..1ec932c0c 100644 --- a/crates/networking/p2p/rlpx/eth/blocks.rs +++ b/crates/networking/p2p/rlpx/eth/blocks.rs @@ -1,4 +1,7 @@ -use crate::rlpx::{message::RLPxMessage, utils::snappy_encode}; +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; use bytes::BufMut; use ethereum_rust_core::types::{BlockBody, BlockHash, BlockHeader, BlockNumber}; use ethereum_rust_rlp::{ @@ -8,7 +11,6 @@ use ethereum_rust_rlp::{ structs::{Decoder, Encoder}, }; use ethereum_rust_storage::Store; -use snap::raw::Decoder as SnappyDecoder; use tracing::error; pub const HASH_FIRST_BYTE_DECODER: u8 = 160; @@ -141,16 +143,13 @@ impl RLPxMessage for GetBlockHeaders { .encode_field(&self.id) .encode_field(&(self.startblock.clone(), limit, skip, reverse)) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; let ((start_block, limit, skip, reverse), _): ((HashOrNumber, u64, u64, bool), _) = @@ -184,16 +183,13 @@ impl RLPxMessage for BlockHeaders { .encode_field(&self.id) .encode_field(&self.block_headers) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; let (block_headers, _): (Vec, _) = decoder.decode_field("headers")?; @@ -252,16 +248,13 @@ impl RLPxMessage for GetBlockBodies { .encode_field(&self.block_hashes) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|err| RLPDecodeError::Custom(err.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; let (block_hashes, _): (Vec, _) = decoder.decode_field("blockHashes")?; @@ -293,16 +286,13 @@ impl RLPxMessage for BlockBodies { .encode_field(&self.block_bodies) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|err| RLPDecodeError::Custom(err.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; let (block_bodies, _): (Vec, _) = decoder.decode_field("blockBodies")?; diff --git a/crates/networking/p2p/rlpx/eth/receipts.rs b/crates/networking/p2p/rlpx/eth/receipts.rs index 5d76a2f27..0eae0af0a 100644 --- a/crates/networking/p2p/rlpx/eth/receipts.rs +++ b/crates/networking/p2p/rlpx/eth/receipts.rs @@ -1,12 +1,13 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; use bytes::BufMut; use ethereum_rust_core::types::{BlockHash, Receipt}; use ethereum_rust_rlp::{ error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; -use snap::raw::Decoder as SnappyDecoder; - -use crate::rlpx::{message::RLPxMessage, utils::snappy_encode}; // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getreceipts-0x0f #[derive(Debug)] @@ -31,16 +32,13 @@ impl RLPxMessage for GetReceipts { .encode_field(&self.block_hashes) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|err| RLPDecodeError::Custom(err.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; let (block_hashes, _): (Vec, _) = decoder.decode_field("blockHashes")?; @@ -71,16 +69,13 @@ impl RLPxMessage for Receipts { .encode_field(&self.receipts) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|err| RLPDecodeError::Custom(err.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; let (receipts, _): (Vec>, _) = decoder.decode_field("receipts")?; diff --git a/crates/networking/p2p/rlpx/eth/status.rs b/crates/networking/p2p/rlpx/eth/status.rs index aae3c025d..b0e21bbf8 100644 --- a/crates/networking/p2p/rlpx/eth/status.rs +++ b/crates/networking/p2p/rlpx/eth/status.rs @@ -1,3 +1,7 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; use bytes::BufMut; use ethereum_rust_core::{ types::{BlockHash, ForkId}, @@ -7,9 +11,6 @@ use ethereum_rust_rlp::{ error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; -use snap::raw::Decoder as SnappyDecoder; - -use crate::rlpx::{message::RLPxMessage, utils::snappy_encode}; #[derive(Debug)] pub(crate) struct StatusMessage { @@ -33,16 +34,13 @@ impl RLPxMessage for StatusMessage { .encode_field(&self.fork_id) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index d84e9c228..fad509ec1 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -4,9 +4,11 @@ use ethereum_rust_rlp::{ error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; -use snap::raw::Decoder as SnappyDecoder; -use crate::rlpx::{message::RLPxMessage, utils::snappy_encode}; +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#transactions-0x02 // Broadcast message @@ -28,16 +30,13 @@ impl RLPxMessage for Transactions { .encode_field(&self.transactions) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (transactions, _): (Vec, _) = decoder.decode_field("transactions")?; @@ -89,16 +88,13 @@ impl RLPxMessage for NewPooledTransactionHashes { .encode_field(&self.transaction_hashes) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (transaction_types, decoder): (Vec, _) = decoder.decode_field("transactionTypes")?; @@ -149,16 +145,13 @@ impl RLPxMessage for GetPooledTransactions { .encode_field(&self.transaction_hashes) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; let (transaction_hashes, _): (Vec, _) = decoder.decode_field("transactionHashes")?; @@ -191,16 +184,13 @@ impl RLPxMessage for PooledTransactions { .encode_field(&self.id) .encode_field(&self.pooled_transactions) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; let (pooled_transactions, _): (Vec, _) = diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index a40158c9a..41f7f6432 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -7,13 +7,12 @@ use ethereum_rust_rlp::{ structs::{Decoder, Encoder}, }; use k256::PublicKey; -use snap::raw::Decoder as SnappyDecoder; -use crate::rlpx::utils::id2pubkey; +use crate::rlpx::utils::{id2pubkey, snappy_decompress}; use super::{ message::RLPxMessage, - utils::{pubkey2id, snappy_encode}, + utils::{pubkey2id, snappy_compress}, }; #[derive(Debug, Clone, PartialEq)] @@ -74,28 +73,30 @@ impl RLPxMessage for HelloMessage { fn decode(msg_data: &[u8]) -> Result { // decode hello message: [protocolVersion: P, clientId: B, capabilities, listenPort: P, nodeId: B_64, ...] - let decoder = Decoder::new(msg_data).unwrap(); - let (protocol_version, decoder): (u64, _) = - decoder.decode_field("protocolVersion").unwrap(); + let decoder = Decoder::new(msg_data)?; + let (protocol_version, decoder): (u64, _) = decoder.decode_field("protocolVersion")?; assert_eq!(protocol_version, 5, "only protocol version 5 is supported"); - let (_client_id, decoder): (String, _) = decoder.decode_field("clientId").unwrap(); + let (_client_id, decoder): (String, _) = decoder.decode_field("clientId")?; // TODO: store client id for debugging purposes // [[cap1, capVersion1], [cap2, capVersion2], ...] let (capabilities, decoder): (Vec<(Capability, u8)>, _) = - decoder.decode_field("capabilities").unwrap(); + decoder.decode_field("capabilities")?; // This field should be ignored - let (_listen_port, decoder): (u16, _) = decoder.decode_field("listenPort").unwrap(); + let (_listen_port, decoder): (u16, _) = decoder.decode_field("listenPort")?; - let (node_id, decoder): (H512, _) = decoder.decode_field("nodeId").unwrap(); + let (node_id, decoder): (H512, _) = decoder.decode_field("nodeId")?; // Implementations must ignore any additional list elements let _padding = decoder.finish_unchecked(); - Ok(Self::new(capabilities, id2pubkey(node_id).unwrap())) + Ok(Self::new( + capabilities, + id2pubkey(node_id).ok_or(RLPDecodeError::MalformedData)?, + )) } } @@ -120,15 +121,14 @@ impl RLPxMessage for DisconnectMessage { .finish(), None => Vec::::new().encode(&mut encoded_data), } - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { // decode disconnect message: [reason (optional)] - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder.decompress_vec(msg_data).unwrap(); + let decompressed_data = snappy_decompress(msg_data)?; // It seems that disconnect reason can be encoded in different ways: // TODO: it may be not compressed at all. We should check that case let reason = match decompressed_data.len() { @@ -161,15 +161,14 @@ impl RLPxMessage for PingMessage { let mut encoded_data = vec![]; // Ping msg_data is only [] Vec::::new().encode(&mut encoded_data); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { // decode ping message: data is empty list [] but it is snappy compressed - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder.decompress_vec(msg_data).unwrap(); + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let result = decoder.finish_unchecked(); let empty: &[u8] = &[]; @@ -192,15 +191,14 @@ impl RLPxMessage for PongMessage { let mut encoded_data = vec![]; // Pong msg_data is only [] Vec::::new().encode(&mut encoded_data); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { // decode pong message: data is empty list [] but it is snappy compressed - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder.decompress_vec(msg_data).unwrap(); + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let result = decoder.finish_unchecked(); let empty: &[u8] = &[]; diff --git a/crates/networking/p2p/rlpx/snap.rs b/crates/networking/p2p/rlpx/snap.rs index 6126db6dc..772710466 100644 --- a/crates/networking/p2p/rlpx/snap.rs +++ b/crates/networking/p2p/rlpx/snap.rs @@ -1,3 +1,7 @@ +use super::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; use bytes::{BufMut, Bytes}; use ethereum_rust_core::{ types::{AccountState, EMPTY_KECCACK_HASH, EMPTY_TRIE_HASH}, @@ -9,9 +13,6 @@ use ethereum_rust_rlp::{ error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; -use snap::raw::Decoder as SnappyDecoder; - -use super::{message::RLPxMessage, utils::snappy_encode}; // Snap Capability Messages @@ -74,16 +75,13 @@ impl RLPxMessage for GetAccountRange { .encode_field(&self.response_bytes) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder) = decoder.decode_field("request-id")?; let (root_hash, decoder) = decoder.decode_field("rootHash")?; @@ -111,16 +109,13 @@ impl RLPxMessage for AccountRange { .encode_field(&self.proof) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder) = decoder.decode_field("request-id")?; let (accounts, decoder) = decoder.decode_field("accounts")?; @@ -147,16 +142,13 @@ impl RLPxMessage for GetStorageRanges { .encode_field(&self.response_bytes) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder) = decoder.decode_field("request-id")?; let (root_hash, decoder) = decoder.decode_field("rootHash")?; @@ -186,16 +178,13 @@ impl RLPxMessage for StorageRanges { .encode_field(&self.proof) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder) = decoder.decode_field("request-id")?; let (slots, decoder) = decoder.decode_field("slots")?; @@ -215,16 +204,13 @@ impl RLPxMessage for GetByteCodes { .encode_field(&self.bytes) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder) = decoder.decode_field("request-id")?; let (hashes, decoder) = decoder.decode_field("hashes")?; @@ -243,16 +229,13 @@ impl RLPxMessage for ByteCodes { .encode_field(&self.codes) .finish(); - let msg_data = snappy_encode(encoded_data)?; + let msg_data = snappy_compress(encoded_data)?; buf.put_slice(&msg_data); Ok(()) } fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder) = decoder.decode_field("request-id")?; let (codes, decoder) = decoder.decode_field("codes")?; diff --git a/crates/networking/p2p/rlpx/utils.rs b/crates/networking/p2p/rlpx/utils.rs index a2118a9d7..348932401 100644 --- a/crates/networking/p2p/rlpx/utils.rs +++ b/crates/networking/p2p/rlpx/utils.rs @@ -1,10 +1,10 @@ use ethereum_rust_core::H512; -use ethereum_rust_rlp::error::RLPEncodeError; +use ethereum_rust_rlp::error::{RLPDecodeError, RLPEncodeError}; use k256::{ elliptic_curve::sec1::{FromEncodedPoint, ToEncodedPoint}, EncodedPoint, PublicKey, SecretKey, }; -use snap::raw::{max_compress_len, Encoder as SnappyEncoder}; +use snap::raw::{max_compress_len, Decoder as SnappyDecoder, Encoder as SnappyEncoder}; pub fn sha256(data: &[u8]) -> [u8; 32] { use k256::sha2::Digest; @@ -50,17 +50,20 @@ pub fn id2pubkey(id: H512) -> Option { PublicKey::from_encoded_point(&point).into_option() } -pub fn snappy_encode(encoded_data: Vec) -> Result, RLPEncodeError> { +pub fn snappy_compress(encoded_data: Vec) -> Result, RLPEncodeError> { let mut snappy_encoder = SnappyEncoder::new(); let mut msg_data = vec![0; max_compress_len(encoded_data.len()) + 1]; - let compressed_size = snappy_encoder - .compress(&encoded_data, &mut msg_data) - .map_err(|_| RLPEncodeError::InvalidCompression)?; + let compressed_size = snappy_encoder.compress(&encoded_data, &mut msg_data)?; msg_data.truncate(compressed_size); Ok(msg_data) } +pub fn snappy_decompress(msg_data: &[u8]) -> Result, RLPDecodeError> { + let mut snappy_decoder = SnappyDecoder::new(); + Ok(snappy_decoder.decompress_vec(msg_data)?) +} + #[cfg(test)] mod tests { use super::*; From 232ecd73a648f6ecbedd4e94d7000846ea08e390 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 6 Nov 2024 12:01:28 -0300 Subject: [PATCH 11/12] Improved error handling in frame.rs module and some refactors --- crates/networking/p2p/rlpx/connection.rs | 10 ++-- crates/networking/p2p/rlpx/error.rs | 2 + crates/networking/p2p/rlpx/frame.rs | 76 ++++++++++++++++-------- crates/networking/p2p/rlpx/handshake.rs | 2 +- 4 files changed, 58 insertions(+), 32 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 8b5941363..da93a9bfe 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -326,7 +326,7 @@ impl RLPxConnection { self.stream .write_all(msg) .await - .map_err(|_| RLPxError::HandshakeError("Could not send message".to_string()))?; + .map_err(|_| RLPxError::ConnectionError("Could not send message".to_string()))?; Ok(()) } @@ -337,7 +337,7 @@ impl RLPxConnection { self.stream .read_exact(&mut buf[..2]) .await - .map_err(|_| RLPxError::HandshakeError("Connection dropped".to_string()))?; + .map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?; let ack_data = [buf[0], buf[1]]; let msg_size = u16::from_be_bytes(ack_data) as usize; @@ -345,7 +345,7 @@ impl RLPxConnection { self.stream .read_exact(&mut buf[2..msg_size + 2]) .await - .map_err(|_| RLPxError::HandshakeError("Connection dropped".to_string()))?; + .map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?; let ack_bytes = &buf[..msg_size + 2]; Ok(ack_bytes.to_vec()) } @@ -355,7 +355,7 @@ impl RLPxConnection { RLPxConnectionState::Established(state) => { let mut frame_buffer = vec![]; message.encode(&mut frame_buffer)?; - frame::write(frame_buffer, state, &mut self.stream).await; + frame::write(frame_buffer, state, &mut self.stream).await?; Ok(()) } _ => Err(RLPxError::InvalidState()), @@ -365,7 +365,7 @@ impl RLPxConnection { async fn receive(&mut self) -> Result { match &mut self.state { RLPxConnectionState::Established(state) => { - let frame_data = frame::read(state, &mut self.stream).await; + let frame_data = frame::read(state, &mut self.stream).await?; let (msg_id, msg_data): (u8, _) = RLPDecode::decode_unfinished(&frame_data)?; Ok(rlpx::Message::decode(msg_id, msg_data)?) } diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 0367b598d..1261c7f55 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -7,6 +7,8 @@ use thiserror::Error; pub(crate) enum RLPxError { #[error("{0}")] HandshakeError(String), + #[error("{0}")] + ConnectionError(String), #[error("Invalid connection state")] InvalidState(), #[error("Not Found: {0}")] diff --git a/crates/networking/p2p/rlpx/frame.rs b/crates/networking/p2p/rlpx/frame.rs index 9c5c8d266..0480c5e8d 100644 --- a/crates/networking/p2p/rlpx/frame.rs +++ b/crates/networking/p2p/rlpx/frame.rs @@ -7,14 +7,14 @@ use ethereum_rust_rlp::encode::RLPEncode as _; use sha3::Digest as _; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use super::connection::Established; +use super::{connection::Established, error::RLPxError}; pub(crate) async fn write( mut frame_data: Vec, state: &mut Established, stream: &mut S, -) { - let mac_aes_cipher = Aes256Enc::new_from_slice(&state.mac_key.0).unwrap(); +) -> Result<(), RLPxError> { + let mac_aes_cipher = Aes256Enc::new_from_slice(&state.mac_key.0)?; // header = frame-size || header-data || header-padding let mut header = Vec::with_capacity(32); @@ -28,20 +28,27 @@ pub(crate) async fn write( header.resize(16, 0); state.egress_aes.apply_keystream(&mut header[..16]); - let header_mac_seed = { - let mac_digest: [u8; 16] = state.egress_mac.clone().finalize()[..16] - .try_into() - .unwrap(); - let mut seed = mac_digest.into(); - mac_aes_cipher.encrypt_block(&mut seed); - H128(seed.into()) ^ H128(header[..16].try_into().unwrap()) - }; + let header_mac_seed = + { + let mac_digest: [u8; 16] = state.egress_mac.clone().finalize()[..16] + .try_into() + .map_err(|_| RLPxError::CryptographyError("Invalid mac digest".to_owned()))?; + let mut seed = mac_digest.into(); + mac_aes_cipher.encrypt_block(&mut seed); + H128(seed.into()) + ^ H128(header[..16].try_into().map_err(|_| { + RLPxError::CryptographyError("Invalid header length".to_owned()) + })?) + }; state.egress_mac.update(header_mac_seed); let header_mac = state.egress_mac.clone().finalize(); header.extend_from_slice(&header_mac[..16]); // Write header - stream.write_all(&header).await.unwrap(); + stream + .write_all(&header) + .await + .map_err(|_| RLPxError::ConnectionError("Could not send message".to_string()))?; // Pad to next multiple of 16 frame_data.resize(frame_data.len().next_multiple_of(16), 0); @@ -49,7 +56,10 @@ pub(crate) async fn write( let frame_ciphertext = frame_data; // Send frame - stream.write_all(&frame_ciphertext).await.unwrap(); + stream + .write_all(&frame_ciphertext) + .await + .map_err(|_| RLPxError::ConnectionError("Could not send message".to_string()))?; // Compute frame-mac state.egress_mac.update(&frame_ciphertext); @@ -58,7 +68,7 @@ pub(crate) async fn write( let frame_mac_seed = { let mac_digest: [u8; 16] = state.egress_mac.clone().finalize()[..16] .try_into() - .unwrap(); + .map_err(|_| RLPxError::CryptographyError("Invalid mac digest".to_owned()))?; let mut seed = mac_digest.into(); mac_aes_cipher.encrypt_block(&mut seed); (H128(seed.into()) ^ H128(mac_digest)).0 @@ -66,18 +76,25 @@ pub(crate) async fn write( state.egress_mac.update(frame_mac_seed); let frame_mac = state.egress_mac.clone().finalize(); // Send frame-mac - stream.write_all(&frame_mac[..16]).await.unwrap(); + stream + .write_all(&frame_mac[..16]) + .await + .map_err(|_| RLPxError::ConnectionError("Could not send message".to_string()))?; + Ok(()) } pub(crate) async fn read( state: &mut Established, stream: &mut S, -) -> Vec { - let mac_aes_cipher = Aes256Enc::new_from_slice(&state.mac_key.0).unwrap(); +) -> Result, RLPxError> { + let mac_aes_cipher = Aes256Enc::new_from_slice(&state.mac_key.0)?; // Receive the message's frame header let mut frame_header = [0; 32]; - stream.read_exact(&mut frame_header).await.unwrap(); + stream + .read_exact(&mut frame_header) + .await + .map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?; // Both are padded to the block's size (16 bytes) let (header_ciphertext, header_mac) = frame_header.split_at_mut(16); @@ -86,10 +103,14 @@ pub(crate) async fn read( let header_mac_seed = { let mac_digest: [u8; 16] = state.ingress_mac.clone().finalize()[..16] .try_into() - .unwrap(); + .map_err(|_| RLPxError::CryptographyError("Invalid mac digest".to_owned()))?; let mut seed = mac_digest.into(); mac_aes_cipher.encrypt_block(&mut seed); - (H128(seed.into()) ^ H128(header_ciphertext.try_into().unwrap())).0 + (H128(seed.into()) + ^ H128(header_ciphertext.try_into().map_err(|_| { + RLPxError::CryptographyError("Invalid header ciphertext length".to_owned()) + })?)) + .0 }; // ingress-mac = keccak256.update(ingress-mac, header-mac-seed) @@ -99,7 +120,7 @@ pub(crate) async fn read( let expected_header_mac = H128( state.ingress_mac.clone().finalize()[..16] .try_into() - .unwrap(), + .map_err(|_| RLPxError::CryptographyError("Invalid header mac".to_owned()))?, ); assert_eq!(header_mac, expected_header_mac.0); @@ -113,11 +134,14 @@ pub(crate) async fn read( let frame_size: usize = u32::from_be_bytes([0, header_text[0], header_text[1], header_text[2]]) .try_into() - .unwrap(); + .map_err(|_| RLPxError::CryptographyError("Invalid frame size".to_owned()))?; // Receive the hello message let padded_size = frame_size.next_multiple_of(16); let mut frame_data = vec![0; padded_size + 16]; - stream.read_exact(&mut frame_data).await.unwrap(); + stream + .read_exact(&mut frame_data) + .await + .map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?; let (frame_ciphertext, frame_mac) = frame_data.split_at_mut(padded_size); // check MAC @@ -126,7 +150,7 @@ pub(crate) async fn read( let frame_mac_seed = { let mac_digest: [u8; 16] = state.ingress_mac.clone().finalize()[..16] .try_into() - .unwrap(); + .map_err(|_| RLPxError::CryptographyError("Invalid mac digest".to_owned()))?; let mut seed = mac_digest.into(); mac_aes_cipher.encrypt_block(&mut seed); (H128(seed.into()) ^ H128(mac_digest)).0 @@ -134,7 +158,7 @@ pub(crate) async fn read( state.ingress_mac.update(frame_mac_seed); let expected_frame_mac: [u8; 16] = state.ingress_mac.clone().finalize()[..16] .try_into() - .unwrap(); + .map_err(|_| RLPxError::CryptographyError("Invalid frame mac".to_owned()))?; assert_eq!(frame_mac, expected_frame_mac); @@ -143,5 +167,5 @@ pub(crate) async fn read( let (frame_data, _padding) = frame_ciphertext.split_at(frame_size); - frame_data.to_vec() + Ok(frame_data.to_vec()) } diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index c976738c8..87caa0038 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -58,7 +58,7 @@ pub(crate) fn decode_auth_message( let (auth, _padding) = AuthMessage::decode_unfinished(&payload)?; // Derive a shared secret from the static keys. - let peer_pk = id2pubkey(auth.node_id).ok_or(RLPxError::NotFound("Node id".to_string()))?; + let peer_pk = id2pubkey(auth.node_id).ok_or(RLPxError::InvalidPeerId())?; let static_shared_secret = ecdh_xchng(static_key, &peer_pk); let remote_ephemeral_key = retrieve_remote_ephemeral_key(static_shared_secret.into(), auth.nonce, auth.signature)?; From 50103ced56e16b14b5fb5eedc5df609dc6fa5ee4 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 7 Nov 2024 11:30:35 -0300 Subject: [PATCH 12/12] Added some slice bounds checks --- crates/networking/p2p/rlpx/connection.rs | 27 ++++++++++++++++++------ crates/networking/p2p/rlpx/error.rs | 2 ++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index da93a9bfe..ea9e053e0 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -81,9 +81,14 @@ impl RLPxConnection { storage: Store, ) -> Result { let mut rng = rand::thread_rng(); - let digest = Keccak256::digest(&msg[65..]); - let signature = &Signature::from_bytes(msg[..64].into())?; - let rid = RecoveryId::from_byte(msg[64]).ok_or(RLPxError::InvalidRecoveryId())?; + let digest = Keccak256::digest(msg.get(65..).ok_or(RLPxError::InvalidMessageLength())?); + let signature = &Signature::from_bytes( + msg.get(..64) + .ok_or(RLPxError::InvalidMessageLength())? + .into(), + )?; + let rid = RecoveryId::from_byte(*msg.get(64).ok_or(RLPxError::InvalidMessageLength())?) + .ok_or(RLPxError::InvalidRecoveryId())?; let peer_pk = VerifyingKey::recover_from_prehash(&digest, signature, rid)?; let state = RLPxConnectionState::Initiator(Initiator::new( H256::random_using(&mut rng), @@ -276,8 +281,12 @@ impl RLPxConnection { // Clonning previous state to avoid ownership issues let previous_state = receiver_state.clone(); let msg_bytes = self.receive_handshake_msg().await?; - let size_data = &msg_bytes[..2]; - let msg = &msg_bytes[2..]; + let size_data = &msg_bytes + .get(..2) + .ok_or(RLPxError::InvalidMessageLength())?; + let msg = &msg_bytes + .get(2..) + .ok_or(RLPxError::InvalidMessageLength())?; let (auth, remote_ephemeral_key) = decode_auth_message(&secret_key, msg, size_data)?; @@ -302,8 +311,12 @@ impl RLPxConnection { // Clonning previous state to avoid ownership issues let previous_state = initiated_auth_state.clone(); let msg_bytes = self.receive_handshake_msg().await?; - let size_data = &msg_bytes[..2]; - let msg = &msg_bytes[2..]; + let size_data = &msg_bytes + .get(..2) + .ok_or(RLPxError::InvalidMessageLength())?; + let msg = &msg_bytes + .get(2..) + .ok_or(RLPxError::InvalidMessageLength())?; let ack = decode_ack_message(&secret_key, msg, size_data)?; let remote_ephemeral_key = ack .get_ephemeral_pubkey() diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 1261c7f55..a33e8b161 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -17,6 +17,8 @@ pub(crate) enum RLPxError { InvalidPeerId(), #[error("Invalid recovery id")] InvalidRecoveryId(), + #[error("Invalid message length")] + InvalidMessageLength(), #[error("Cannot handle message")] MessageNotHandled(), #[error(transparent)]