Skip to content

Commit

Permalink
Merge pull request #69 from rustaceanrob/v2-08-10
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob authored Aug 10, 2024
2 parents c10f183 + f6eeee2 commit 616e759
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 77 deletions.
4 changes: 2 additions & 2 deletions src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ impl Node {
self.dialog
.send_warning(Warning::NotEnoughConnections)
.await;
let ip = peer_map.next_peer().await?;
if peer_map.dispatch(ip.0, ip.1).await.is_err() {
let address = peer_map.next_peer().await?;
if peer_map.dispatch(address).await.is_err() {
self.dialog.send_warning(Warning::CouldNotConnect).await;
}
}
Expand Down
29 changes: 17 additions & 12 deletions src/node/peer_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl PeerMap {
}

// Send out a TCP connection to a new peer and begin tracking the task
pub async fn dispatch(&mut self, address: AddrV2, port: u16) -> Result<(), PeerError> {
pub async fn dispatch(&mut self, loaded_peer: PersistedPeer) -> Result<(), PeerError> {
let (ptx, prx) = mpsc::channel::<MainThreadMessage>(32);
let peer_num = self.num_peers + 1;
self.num_peers = peer_num;
Expand All @@ -159,23 +159,29 @@ impl PeerMap {
self.network,
self.mtx.clone(),
prx,
loaded_peer.services,
self.dialog.clone(),
);
let mut connector = self.connector.lock().await;
if !connector.can_connect(&address) {
if !connector.can_connect(&loaded_peer.addr) {
return Err(PeerError::UnreachableSocketAddr);
}
self.dialog
.send_dialog(format!("Connecting to {:?}:{}", address, port))
.send_dialog(format!(
"Connecting to {:?}:{}",
loaded_peer.addr, loaded_peer.port
))
.await;
let (reader, writer) = connector.connect(address.clone(), port).await?;
let (reader, writer) = connector
.connect(loaded_peer.addr.clone(), loaded_peer.port)
.await?;
let handle = tokio::spawn(async move { peer.run(reader, writer).await });
self.map.insert(
peer_num,
ManagedPeer {
service_flags: None,
address,
port,
address: loaded_peer.addr,
port: loaded_peer.port,
net_time: 0,
ptx,
handle,
Expand Down Expand Up @@ -233,13 +239,14 @@ impl PeerMap {

// Pull a peer from the configuration if we have one. If not, select a random peer from the database,
// as long as it is not from the same netgroup. If there are no peers in the database, try DNS.
pub async fn next_peer(&mut self) -> Result<(AddrV2, u16), NodeError> {
pub async fn next_peer(&mut self) -> Result<PersistedPeer, NodeError> {
if let Some(whitelist) = &mut self.whitelist {
if let Some((address, port)) = whitelist.pop() {
self.dialog
.send_dialog("Using a configured peer.".into())
.await;
return Ok((address, port));
let peer = PersistedPeer::new(address, port, ServiceFlags::NONE, PeerStatus::Tried);
return Ok(peer);
}
}
let current_count = {
Expand All @@ -257,12 +264,11 @@ impl PeerMap {
let mut peer_manager = self.db.lock().await;
let mut tries = 0;
while tries < 10 {
let peer: (AddrV2, u16) = peer_manager
let peer = peer_manager
.random()
.await
.map(|r| r.into())
.map_err(|e| NodeError::PeerDatabase(PeerManagerError::Database(e)))?;
if self.net_groups.contains(&peer.0.netgroup()) {
if self.net_groups.contains(&peer.addr.netgroup()) {
tries += 1;
continue;
} else {
Expand All @@ -272,7 +278,6 @@ impl PeerMap {
peer_manager
.random()
.await
.map(|r| r.into())
.map_err(|e| NodeError::PeerDatabase(PeerManagerError::Database(e)))
}

Expand Down
8 changes: 8 additions & 0 deletions src/peers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ impl_sourceless_error!(PeerReadError);
#[derive(Debug)]
pub enum PeerError {
ConnectionFailed,
MessageSerialization,
HandshakeFailed,
BufferWrite,
ThreadChannel,
DisconnectCommand,
Expand All @@ -54,6 +56,12 @@ impl core::fmt::Display for PeerError {
PeerError::UnreachableSocketAddr => {
write!(f, "cannot make use of provided p2p address.")
}
PeerError::MessageSerialization => {
write!(f, "serializing a message into bytes failed.")
}
PeerError::HandshakeFailed => {
write!(f, "an attempted V2 transport handshake failed.")
}
}
}
}
Expand Down
175 changes: 133 additions & 42 deletions src/peers/outbound_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};

use bip324::PacketWriter;
use bitcoin::{
consensus::serialize,
hashes::Hash,
Expand All @@ -18,7 +19,7 @@ use bitcoin::{

use crate::{node::channel_messages::GetBlockConfig, prelude::default_port_from_network};

use super::traits::MessageGenerator;
use super::{error::PeerError, traits::MessageGenerator};

pub const PROTOCOL_VERSION: u32 = 70016;

Expand All @@ -32,86 +33,176 @@ impl V1OutboundMessage {
}
}

fn make_version(port: Option<u16>, network: &Network) -> VersionMessage {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
let default_port = default_port_from_network(network);
let ip = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port.unwrap_or(default_port),
);
let from_and_recv = Address::new(&ip, ServiceFlags::NONE);
let msg = VersionMessage {
version: PROTOCOL_VERSION,
services: ServiceFlags::NONE,
timestamp: now as i64,
receiver: from_and_recv.clone(),
sender: from_and_recv,
nonce: 1,
user_agent: "Kyoto Light Client / 0.1.0 / rust-bitcoin 0.32".to_string(),
start_height: 0,
relay: false,
};
msg
}

impl MessageGenerator for V1OutboundMessage {
fn version_message(&mut self, port: Option<u16>) -> Vec<u8> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
let default_port = default_port_from_network(&self.network);
let ip = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port.unwrap_or(default_port),
);
let from_and_recv = Address::new(&ip, ServiceFlags::NONE);
let msg = VersionMessage {
version: PROTOCOL_VERSION,
services: ServiceFlags::NONE,
timestamp: now as i64,
receiver: from_and_recv.clone(),
sender: from_and_recv,
nonce: 1,
user_agent: "Kyoto Light Client / 0.1.0 / rust-bitcoin 0.32".to_string(),
start_height: 0,
relay: false,
};
fn version_message(&mut self, port: Option<u16>) -> Result<Vec<u8>, PeerError> {
let msg = make_version(port, &self.network);
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::Version(msg));
serialize(&data)
Ok(serialize(&data))
}

fn verack(&mut self) -> Vec<u8> {
fn verack(&mut self) -> Result<Vec<u8>, PeerError> {
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::Verack);
serialize(&data)
Ok(serialize(&data))
}

fn addr(&mut self) -> Vec<u8> {
fn addr(&mut self) -> Result<Vec<u8>, PeerError> {
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetAddr);
serialize(&data)
Ok(serialize(&data))
}

fn addrv2(&mut self) -> Vec<u8> {
fn addrv2(&mut self) -> Result<Vec<u8>, PeerError> {
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::SendAddrV2);
serialize(&data)
Ok(serialize(&data))
}

fn headers(&mut self, locator_hashes: Vec<BlockHash>, stop_hash: Option<BlockHash>) -> Vec<u8> {
fn headers(
&mut self,
locator_hashes: Vec<BlockHash>,
stop_hash: Option<BlockHash>,
) -> Result<Vec<u8>, PeerError> {
let msg =
GetHeadersMessage::new(locator_hashes, stop_hash.unwrap_or(BlockHash::all_zeros()));
let data =
&mut RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetHeaders(msg));
serialize(&data)
Ok(serialize(&data))
}

fn cf_headers(&mut self, message: GetCFHeaders) -> Vec<u8> {
fn cf_headers(&mut self, message: GetCFHeaders) -> Result<Vec<u8>, PeerError> {
let data = &mut RawNetworkMessage::new(
self.network.magic(),
NetworkMessage::GetCFHeaders(message),
);
serialize(&data)
Ok(serialize(&data))
}

fn filters(&mut self, message: GetCFilters) -> Vec<u8> {
fn filters(&mut self, message: GetCFilters) -> Result<Vec<u8>, PeerError> {
let data =
&mut RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetCFilters(message));
serialize(&data)
Ok(serialize(&data))
}

fn block(&mut self, config: GetBlockConfig) -> Vec<u8> {
fn block(&mut self, config: GetBlockConfig) -> Result<Vec<u8>, PeerError> {
let inv = Inventory::Block(config.locator);
let data =
&mut RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetData(vec![inv]));
serialize(&data)
Ok(serialize(&data))
}

fn pong(&mut self, nonce: u64) -> Vec<u8> {
fn pong(&mut self, nonce: u64) -> Result<Vec<u8>, PeerError> {
let msg = NetworkMessage::Pong(nonce);
let data = &mut RawNetworkMessage::new(self.network.magic(), msg);
serialize(&data)
Ok(serialize(&data))
}

fn transaction(&mut self, transaction: Transaction) -> Vec<u8> {
fn transaction(&mut self, transaction: Transaction) -> Result<Vec<u8>, PeerError> {
let msg = NetworkMessage::Tx(transaction);
let data = &mut RawNetworkMessage::new(self.network.magic(), msg);
serialize(&data)
Ok(serialize(&data))
}
}

pub(crate) struct V2OutboundMessage {
network: Network,
encryptor: PacketWriter,
}

impl V2OutboundMessage {
pub(crate) fn new(network: Network, encryptor: PacketWriter) -> Self {
Self { network, encryptor }
}

fn serialize_network_message(&self, message: NetworkMessage) -> Result<Vec<u8>, PeerError> {
bip324::serde::serialize(message).map_err(|_| PeerError::MessageSerialization)
}

fn serialize_plaintext(&mut self, plaintext: Vec<u8>) -> Result<Vec<u8>, PeerError> {
self.encryptor
.prepare_packet_with_alloc(&plaintext, None, false)
.map_err(|_| PeerError::MessageSerialization)
}
}

impl MessageGenerator for V2OutboundMessage {
fn version_message(&mut self, port: Option<u16>) -> Result<Vec<u8>, PeerError> {
let msg = make_version(port, &self.network);
let plaintext = self.serialize_network_message(NetworkMessage::Version(msg))?;
self.serialize_plaintext(plaintext)
}

fn verack(&mut self) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::Verack)?;
self.serialize_plaintext(plaintext)
}

fn addr(&mut self) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::GetAddr)?;
self.serialize_plaintext(plaintext)
}

fn addrv2(&mut self) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::SendAddrV2)?;
self.serialize_plaintext(plaintext)
}

fn headers(
&mut self,
locator_hashes: Vec<BlockHash>,
stop_hash: Option<BlockHash>,
) -> Result<Vec<u8>, PeerError> {
let msg =
GetHeadersMessage::new(locator_hashes, stop_hash.unwrap_or(BlockHash::all_zeros()));
let plaintext = self.serialize_network_message(NetworkMessage::GetHeaders(msg))?;
self.serialize_plaintext(plaintext)
}

fn cf_headers(&mut self, message: GetCFHeaders) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::GetCFHeaders(message))?;
self.serialize_plaintext(plaintext)
}

fn filters(&mut self, message: GetCFilters) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::GetCFilters(message))?;
self.serialize_plaintext(plaintext)
}

fn block(&mut self, config: GetBlockConfig) -> Result<Vec<u8>, PeerError> {
let inv = Inventory::Block(config.locator);
let plaintext = self.serialize_network_message(NetworkMessage::GetData(vec![inv]))?;
self.serialize_plaintext(plaintext)
}

fn pong(&mut self, nonce: u64) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::Pong(nonce))?;
self.serialize_plaintext(plaintext)
}

fn transaction(&mut self, transaction: Transaction) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::Tx(transaction))?;
self.serialize_plaintext(plaintext)
}
}
Loading

0 comments on commit 616e759

Please sign in to comment.