diff --git a/src/node/node.rs b/src/node/node.rs index 03edcbc..5024126 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -322,8 +322,8 @@ impl Node { self.dialog .send_warning(Warning::NotEnoughConnections) .await; - let ip = peer_map.next_peer().await?; - if peer_map.dispatch(ip.0, ip.1).await.is_err() { + let address = peer_map.next_peer().await?; + if peer_map.dispatch(address).await.is_err() { self.dialog.send_warning(Warning::CouldNotConnect).await; } } diff --git a/src/node/peer_map.rs b/src/node/peer_map.rs index 9b9b8a9..a3f1838 100644 --- a/src/node/peer_map.rs +++ b/src/node/peer_map.rs @@ -150,7 +150,7 @@ impl PeerMap { } // Send out a TCP connection to a new peer and begin tracking the task - pub async fn dispatch(&mut self, address: AddrV2, port: u16) -> Result<(), PeerError> { + pub async fn dispatch(&mut self, loaded_peer: PersistedPeer) -> Result<(), PeerError> { let (ptx, prx) = mpsc::channel::(32); let peer_num = self.num_peers + 1; self.num_peers = peer_num; @@ -159,23 +159,29 @@ impl PeerMap { self.network, self.mtx.clone(), prx, + loaded_peer.services, self.dialog.clone(), ); let mut connector = self.connector.lock().await; - if !connector.can_connect(&address) { + if !connector.can_connect(&loaded_peer.addr) { return Err(PeerError::UnreachableSocketAddr); } self.dialog - .send_dialog(format!("Connecting to {:?}:{}", address, port)) + .send_dialog(format!( + "Connecting to {:?}:{}", + loaded_peer.addr, loaded_peer.port + )) .await; - let (reader, writer) = connector.connect(address.clone(), port).await?; + let (reader, writer) = connector + .connect(loaded_peer.addr.clone(), loaded_peer.port) + .await?; let handle = tokio::spawn(async move { peer.run(reader, writer).await }); self.map.insert( peer_num, ManagedPeer { service_flags: None, - address, - port, + address: loaded_peer.addr, + port: loaded_peer.port, net_time: 0, ptx, handle, @@ -233,13 +239,14 @@ impl PeerMap { // Pull a peer from the configuration if we have one. If not, select a random peer from the database, // as long as it is not from the same netgroup. If there are no peers in the database, try DNS. - pub async fn next_peer(&mut self) -> Result<(AddrV2, u16), NodeError> { + pub async fn next_peer(&mut self) -> Result { if let Some(whitelist) = &mut self.whitelist { if let Some((address, port)) = whitelist.pop() { self.dialog .send_dialog("Using a configured peer.".into()) .await; - return Ok((address, port)); + let peer = PersistedPeer::new(address, port, ServiceFlags::NONE, PeerStatus::Tried); + return Ok(peer); } } let current_count = { @@ -257,12 +264,11 @@ impl PeerMap { let mut peer_manager = self.db.lock().await; let mut tries = 0; while tries < 10 { - let peer: (AddrV2, u16) = peer_manager + let peer = peer_manager .random() .await - .map(|r| r.into()) .map_err(|e| NodeError::PeerDatabase(PeerManagerError::Database(e)))?; - if self.net_groups.contains(&peer.0.netgroup()) { + if self.net_groups.contains(&peer.addr.netgroup()) { tries += 1; continue; } else { @@ -272,7 +278,6 @@ impl PeerMap { peer_manager .random() .await - .map(|r| r.into()) .map_err(|e| NodeError::PeerDatabase(PeerManagerError::Database(e))) } diff --git a/src/peers/error.rs b/src/peers/error.rs index c70faa3..66dbc2a 100644 --- a/src/peers/error.rs +++ b/src/peers/error.rs @@ -29,6 +29,8 @@ impl_sourceless_error!(PeerReadError); #[derive(Debug)] pub enum PeerError { ConnectionFailed, + MessageSerialization, + HandshakeFailed, BufferWrite, ThreadChannel, DisconnectCommand, @@ -54,6 +56,12 @@ impl core::fmt::Display for PeerError { PeerError::UnreachableSocketAddr => { write!(f, "cannot make use of provided p2p address.") } + PeerError::MessageSerialization => { + write!(f, "serializing a message into bytes failed.") + } + PeerError::HandshakeFailed => { + write!(f, "an attempted V2 transport handshake failed.") + } } } } diff --git a/src/peers/outbound_messages.rs b/src/peers/outbound_messages.rs index ced17bc..26e7424 100644 --- a/src/peers/outbound_messages.rs +++ b/src/peers/outbound_messages.rs @@ -3,6 +3,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; +use bip324::PacketWriter; use bitcoin::{ consensus::serialize, hashes::Hash, @@ -18,7 +19,7 @@ use bitcoin::{ use crate::{node::channel_messages::GetBlockConfig, prelude::default_port_from_network}; -use super::traits::MessageGenerator; +use super::{error::PeerError, traits::MessageGenerator}; pub const PROTOCOL_VERSION: u32 = 70016; @@ -32,86 +33,176 @@ impl V1OutboundMessage { } } +fn make_version(port: Option, network: &Network) -> VersionMessage { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_secs(); + let default_port = default_port_from_network(network); + let ip = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + port.unwrap_or(default_port), + ); + let from_and_recv = Address::new(&ip, ServiceFlags::NONE); + let msg = VersionMessage { + version: PROTOCOL_VERSION, + services: ServiceFlags::NONE, + timestamp: now as i64, + receiver: from_and_recv.clone(), + sender: from_and_recv, + nonce: 1, + user_agent: "Kyoto Light Client / 0.1.0 / rust-bitcoin 0.32".to_string(), + start_height: 0, + relay: false, + }; + msg +} + impl MessageGenerator for V1OutboundMessage { - fn version_message(&mut self, port: Option) -> Vec { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("time went backwards") - .as_secs(); - let default_port = default_port_from_network(&self.network); - let ip = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - port.unwrap_or(default_port), - ); - let from_and_recv = Address::new(&ip, ServiceFlags::NONE); - let msg = VersionMessage { - version: PROTOCOL_VERSION, - services: ServiceFlags::NONE, - timestamp: now as i64, - receiver: from_and_recv.clone(), - sender: from_and_recv, - nonce: 1, - user_agent: "Kyoto Light Client / 0.1.0 / rust-bitcoin 0.32".to_string(), - start_height: 0, - relay: false, - }; + fn version_message(&mut self, port: Option) -> Result, PeerError> { + let msg = make_version(port, &self.network); let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::Version(msg)); - serialize(&data) + Ok(serialize(&data)) } - fn verack(&mut self) -> Vec { + fn verack(&mut self) -> Result, PeerError> { let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::Verack); - serialize(&data) + Ok(serialize(&data)) } - fn addr(&mut self) -> Vec { + fn addr(&mut self) -> Result, PeerError> { let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetAddr); - serialize(&data) + Ok(serialize(&data)) } - fn addrv2(&mut self) -> Vec { + fn addrv2(&mut self) -> Result, PeerError> { let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::SendAddrV2); - serialize(&data) + Ok(serialize(&data)) } - fn headers(&mut self, locator_hashes: Vec, stop_hash: Option) -> Vec { + fn headers( + &mut self, + locator_hashes: Vec, + stop_hash: Option, + ) -> Result, PeerError> { let msg = GetHeadersMessage::new(locator_hashes, stop_hash.unwrap_or(BlockHash::all_zeros())); let data = &mut RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetHeaders(msg)); - serialize(&data) + Ok(serialize(&data)) } - fn cf_headers(&mut self, message: GetCFHeaders) -> Vec { + fn cf_headers(&mut self, message: GetCFHeaders) -> Result, PeerError> { let data = &mut RawNetworkMessage::new( self.network.magic(), NetworkMessage::GetCFHeaders(message), ); - serialize(&data) + Ok(serialize(&data)) } - fn filters(&mut self, message: GetCFilters) -> Vec { + fn filters(&mut self, message: GetCFilters) -> Result, PeerError> { let data = &mut RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetCFilters(message)); - serialize(&data) + Ok(serialize(&data)) } - fn block(&mut self, config: GetBlockConfig) -> Vec { + fn block(&mut self, config: GetBlockConfig) -> Result, PeerError> { let inv = Inventory::Block(config.locator); let data = &mut RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetData(vec![inv])); - serialize(&data) + Ok(serialize(&data)) } - fn pong(&mut self, nonce: u64) -> Vec { + fn pong(&mut self, nonce: u64) -> Result, PeerError> { let msg = NetworkMessage::Pong(nonce); let data = &mut RawNetworkMessage::new(self.network.magic(), msg); - serialize(&data) + Ok(serialize(&data)) } - fn transaction(&mut self, transaction: Transaction) -> Vec { + fn transaction(&mut self, transaction: Transaction) -> Result, PeerError> { let msg = NetworkMessage::Tx(transaction); let data = &mut RawNetworkMessage::new(self.network.magic(), msg); - serialize(&data) + Ok(serialize(&data)) + } +} + +pub(crate) struct V2OutboundMessage { + network: Network, + encryptor: PacketWriter, +} + +impl V2OutboundMessage { + pub(crate) fn new(network: Network, encryptor: PacketWriter) -> Self { + Self { network, encryptor } + } + + fn serialize_network_message(&self, message: NetworkMessage) -> Result, PeerError> { + bip324::serde::serialize(message).map_err(|_| PeerError::MessageSerialization) + } + + fn serialize_plaintext(&mut self, plaintext: Vec) -> Result, PeerError> { + self.encryptor + .prepare_packet_with_alloc(&plaintext, None, false) + .map_err(|_| PeerError::MessageSerialization) + } +} + +impl MessageGenerator for V2OutboundMessage { + fn version_message(&mut self, port: Option) -> Result, PeerError> { + let msg = make_version(port, &self.network); + let plaintext = self.serialize_network_message(NetworkMessage::Version(msg))?; + self.serialize_plaintext(plaintext) + } + + fn verack(&mut self) -> Result, PeerError> { + let plaintext = self.serialize_network_message(NetworkMessage::Verack)?; + self.serialize_plaintext(plaintext) + } + + fn addr(&mut self) -> Result, PeerError> { + let plaintext = self.serialize_network_message(NetworkMessage::GetAddr)?; + self.serialize_plaintext(plaintext) + } + + fn addrv2(&mut self) -> Result, PeerError> { + let plaintext = self.serialize_network_message(NetworkMessage::SendAddrV2)?; + self.serialize_plaintext(plaintext) + } + + fn headers( + &mut self, + locator_hashes: Vec, + stop_hash: Option, + ) -> Result, PeerError> { + let msg = + GetHeadersMessage::new(locator_hashes, stop_hash.unwrap_or(BlockHash::all_zeros())); + let plaintext = self.serialize_network_message(NetworkMessage::GetHeaders(msg))?; + self.serialize_plaintext(plaintext) + } + + fn cf_headers(&mut self, message: GetCFHeaders) -> Result, PeerError> { + let plaintext = self.serialize_network_message(NetworkMessage::GetCFHeaders(message))?; + self.serialize_plaintext(plaintext) + } + + fn filters(&mut self, message: GetCFilters) -> Result, PeerError> { + let plaintext = self.serialize_network_message(NetworkMessage::GetCFilters(message))?; + self.serialize_plaintext(plaintext) + } + + fn block(&mut self, config: GetBlockConfig) -> Result, PeerError> { + let inv = Inventory::Block(config.locator); + let plaintext = self.serialize_network_message(NetworkMessage::GetData(vec![inv]))?; + self.serialize_plaintext(plaintext) + } + + fn pong(&mut self, nonce: u64) -> Result, PeerError> { + let plaintext = self.serialize_network_message(NetworkMessage::Pong(nonce))?; + self.serialize_plaintext(plaintext) + } + + fn transaction(&mut self, transaction: Transaction) -> Result, PeerError> { + let plaintext = self.serialize_network_message(NetworkMessage::Tx(transaction))?; + self.serialize_plaintext(plaintext) } } diff --git a/src/peers/parsers.rs b/src/peers/parsers.rs index 23f3da1..f066cf6 100644 --- a/src/peers/parsers.rs +++ b/src/peers/parsers.rs @@ -1,4 +1,5 @@ use bip324::serde::NetworkMessage; +use bip324::{PacketReader, ReceivedMessage}; use bitcoin::consensus::{deserialize, deserialize_partial}; use bitcoin::p2p::message::RawNetworkMessage; use bitcoin::Network; @@ -57,3 +58,48 @@ impl MessageParser for V1MessageParser { Box::pin(self.do_read_message()) } } + +pub(crate) struct V2MessageParser { + stream: StreamReader, + decryptor: PacketReader, +} + +impl V2MessageParser { + pub(crate) fn new(stream: StreamReader, decryptor: PacketReader) -> Self { + Self { stream, decryptor } + } + + async fn do_read_message(&mut self) -> Result, PeerReadError> { + let mut stream = self.stream.lock().await; + let mut len_buf = [0; 3]; + let _ = stream + .read_exact(&mut len_buf) + .await + .map_err(|_| PeerReadError::ReadBuffer)?; + let message_len = self.decryptor.decypt_len(len_buf); + let mut response_message = vec![0; message_len]; + let _ = stream + .read_exact(&mut response_message) + .await + .map_err(|_| PeerReadError::ReadBuffer)?; + let msg = self + .decryptor + .decrypt_contents_with_alloc(&response_message, None) + .unwrap(); + let message = ReceivedMessage::new(&msg.clone()).unwrap(); + match message.message { + Some(message) => { + let parsed = bip324::serde::deserialize(&message) + .map_err(|_| PeerReadError::Deserialization)?; + Ok(Some(parsed)) + } + None => Ok(None), + } + } +} + +impl MessageParser for V2MessageParser { + fn read_message(&mut self) -> FutureResult, PeerReadError> { + Box::pin(self.do_read_message()) + } +} diff --git a/src/peers/peer.rs b/src/peers/peer.rs index 200e740..bc73f8b 100644 --- a/src/peers/peer.rs +++ b/src/peers/peer.rs @@ -1,7 +1,7 @@ extern crate tokio; use std::{ops::DerefMut, time::Duration}; -use bitcoin::Network; +use bitcoin::{p2p::ServiceFlags, Network}; use tokio::{ io::{AsyncWrite, AsyncWriteExt}, select, @@ -34,6 +34,7 @@ pub(crate) struct Peer { network: Network, message_counter: MessageCounter, message_timer: MessageTimer, + services: ServiceFlags, dialog: Dialog, } @@ -43,6 +44,7 @@ impl Peer { network: Network, main_thread_sender: Sender, main_thread_recv: Receiver, + services: ServiceFlags, dialog: Dialog, ) -> Self { let message_counter = MessageCounter::new(); @@ -54,6 +56,7 @@ impl Peer { network, message_counter, message_timer, + services, dialog, } } @@ -66,7 +69,7 @@ impl Peer { let mut lock = writer.lock().await; let writer = lock.deref_mut(); let mut outbound_messages = V1OutboundMessage::new(self.network); - let message = outbound_messages.version_message(None); + let message = outbound_messages.version_message(None)?; self.write_bytes(writer, message).await?; self.message_timer.track(); let (tx, mut rx) = mpsc::channel(32); @@ -227,7 +230,7 @@ impl Peer { Ok(()) } PeerMessage::Ping(nonce) => { - let message = message_generator.pong(nonce); + let message = message_generator.pong(nonce)?; self.write_bytes(writer, message).await?; Ok(()) } @@ -269,43 +272,43 @@ impl Peer { match request { MainThreadMessage::GetAddr => { self.message_counter.sent_addrs(); - let message = message_generator.addr(); + let message = message_generator.addr()?; self.write_bytes(writer, message).await?; } MainThreadMessage::GetAddrV2 => { self.message_counter.sent_addrs(); - let message = message_generator.addrv2(); + let message = message_generator.addrv2()?; self.write_bytes(writer, message).await?; } MainThreadMessage::GetHeaders(config) => { self.message_timer.track(); - let message = message_generator.headers(config.locators, config.stop_hash); + let message = message_generator.headers(config.locators, config.stop_hash)?; self.write_bytes(writer, message).await?; } MainThreadMessage::GetFilterHeaders(config) => { self.message_counter.sent_filter_header(); self.message_timer.track(); - let message = message_generator.cf_headers(config); + let message = message_generator.cf_headers(config)?; self.write_bytes(writer, message).await?; } MainThreadMessage::GetFilters(config) => { self.message_counter.sent_filters(); - let message = message_generator.filters(config); + let message = message_generator.filters(config)?; self.write_bytes(writer, message).await?; } MainThreadMessage::GetBlock(message) => { self.message_counter.sent_block(); self.message_timer.track(); - let message = message_generator.block(message); + let message = message_generator.block(message)?; self.write_bytes(writer, message).await?; } MainThreadMessage::BroadcastTx(transaction) => { self.message_counter.sent_tx(); - let message = message_generator.transaction(transaction); + let message = message_generator.transaction(transaction)?; self.write_bytes(writer, message).await?; } MainThreadMessage::Verack => { - let message = message_generator.verack(); + let message = message_generator.verack()?; self.write_bytes(writer, message).await?; } MainThreadMessage::Disconnect => return Err(PeerError::DisconnectCommand), diff --git a/src/peers/traits.rs b/src/peers/traits.rs index e184b8f..64e16a1 100644 --- a/src/peers/traits.rs +++ b/src/peers/traits.rs @@ -26,25 +26,29 @@ pub(crate) type StreamWriter = Mutex>; // Responsible for serializing messages to write over the wire, either encrypted or plaintext. pub(crate) trait MessageGenerator { - fn version_message(&mut self, port: Option) -> Vec; + fn version_message(&mut self, port: Option) -> Result, PeerError>; - fn verack(&mut self) -> Vec; + fn verack(&mut self) -> Result, PeerError>; - fn addr(&mut self) -> Vec; + fn addr(&mut self) -> Result, PeerError>; - fn addrv2(&mut self) -> Vec; + fn addrv2(&mut self) -> Result, PeerError>; - fn headers(&mut self, locator_hashes: Vec, stop_hash: Option) -> Vec; + fn headers( + &mut self, + locator_hashes: Vec, + stop_hash: Option, + ) -> Result, PeerError>; - fn cf_headers(&mut self, message: GetCFHeaders) -> Vec; + fn cf_headers(&mut self, message: GetCFHeaders) -> Result, PeerError>; - fn filters(&mut self, message: GetCFilters) -> Vec; + fn filters(&mut self, message: GetCFilters) -> Result, PeerError>; - fn block(&mut self, config: GetBlockConfig) -> Vec; + fn block(&mut self, config: GetBlockConfig) -> Result, PeerError>; - fn pong(&mut self, nonce: u64) -> Vec; + fn pong(&mut self, nonce: u64) -> Result, PeerError>; - fn transaction(&mut self, transaction: Transaction) -> Vec; + fn transaction(&mut self, transaction: Transaction) -> Result, PeerError>; } // Responsible for parsing plaintext or encrypted messages off of the wire.