Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(peers): add V2 transport traits #69

Merged
merged 1 commit into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ impl Node {
self.dialog
.send_warning(Warning::NotEnoughConnections)
.await;
let ip = peer_map.next_peer().await?;
if peer_map.dispatch(ip.0, ip.1).await.is_err() {
let address = peer_map.next_peer().await?;
if peer_map.dispatch(address).await.is_err() {
self.dialog.send_warning(Warning::CouldNotConnect).await;
}
}
Expand Down
29 changes: 17 additions & 12 deletions src/node/peer_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl PeerMap {
}

// Send out a TCP connection to a new peer and begin tracking the task
pub async fn dispatch(&mut self, address: AddrV2, port: u16) -> Result<(), PeerError> {
pub async fn dispatch(&mut self, loaded_peer: PersistedPeer) -> Result<(), PeerError> {
let (ptx, prx) = mpsc::channel::<MainThreadMessage>(32);
let peer_num = self.num_peers + 1;
self.num_peers = peer_num;
Expand All @@ -159,23 +159,29 @@ impl PeerMap {
self.network,
self.mtx.clone(),
prx,
loaded_peer.services,
self.dialog.clone(),
);
let mut connector = self.connector.lock().await;
if !connector.can_connect(&address) {
if !connector.can_connect(&loaded_peer.addr) {
return Err(PeerError::UnreachableSocketAddr);
}
self.dialog
.send_dialog(format!("Connecting to {:?}:{}", address, port))
.send_dialog(format!(
"Connecting to {:?}:{}",
loaded_peer.addr, loaded_peer.port
))
.await;
let (reader, writer) = connector.connect(address.clone(), port).await?;
let (reader, writer) = connector
.connect(loaded_peer.addr.clone(), loaded_peer.port)
.await?;
let handle = tokio::spawn(async move { peer.run(reader, writer).await });
self.map.insert(
peer_num,
ManagedPeer {
service_flags: None,
address,
port,
address: loaded_peer.addr,
port: loaded_peer.port,
net_time: 0,
ptx,
handle,
Expand Down Expand Up @@ -233,13 +239,14 @@ impl PeerMap {

// Pull a peer from the configuration if we have one. If not, select a random peer from the database,
// as long as it is not from the same netgroup. If there are no peers in the database, try DNS.
pub async fn next_peer(&mut self) -> Result<(AddrV2, u16), NodeError> {
pub async fn next_peer(&mut self) -> Result<PersistedPeer, NodeError> {
if let Some(whitelist) = &mut self.whitelist {
if let Some((address, port)) = whitelist.pop() {
self.dialog
.send_dialog("Using a configured peer.".into())
.await;
return Ok((address, port));
let peer = PersistedPeer::new(address, port, ServiceFlags::NONE, PeerStatus::Tried);
return Ok(peer);
}
}
let current_count = {
Expand All @@ -257,12 +264,11 @@ impl PeerMap {
let mut peer_manager = self.db.lock().await;
let mut tries = 0;
while tries < 10 {
let peer: (AddrV2, u16) = peer_manager
let peer = peer_manager
.random()
.await
.map(|r| r.into())
.map_err(|e| NodeError::PeerDatabase(PeerManagerError::Database(e)))?;
if self.net_groups.contains(&peer.0.netgroup()) {
if self.net_groups.contains(&peer.addr.netgroup()) {
tries += 1;
continue;
} else {
Expand All @@ -272,7 +278,6 @@ impl PeerMap {
peer_manager
.random()
.await
.map(|r| r.into())
.map_err(|e| NodeError::PeerDatabase(PeerManagerError::Database(e)))
}

Expand Down
8 changes: 8 additions & 0 deletions src/peers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ impl_sourceless_error!(PeerReadError);
#[derive(Debug)]
pub enum PeerError {
ConnectionFailed,
MessageSerialization,
HandshakeFailed,
BufferWrite,
ThreadChannel,
DisconnectCommand,
Expand All @@ -54,6 +56,12 @@ impl core::fmt::Display for PeerError {
PeerError::UnreachableSocketAddr => {
write!(f, "cannot make use of provided p2p address.")
}
PeerError::MessageSerialization => {
write!(f, "serializing a message into bytes failed.")
}
PeerError::HandshakeFailed => {
write!(f, "an attempted V2 transport handshake failed.")
}
}
}
}
Expand Down
175 changes: 133 additions & 42 deletions src/peers/outbound_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};

use bip324::PacketWriter;
use bitcoin::{
consensus::serialize,
hashes::Hash,
Expand All @@ -18,7 +19,7 @@ use bitcoin::{

use crate::{node::channel_messages::GetBlockConfig, prelude::default_port_from_network};

use super::traits::MessageGenerator;
use super::{error::PeerError, traits::MessageGenerator};

pub const PROTOCOL_VERSION: u32 = 70016;

Expand All @@ -32,86 +33,176 @@ impl V1OutboundMessage {
}
}

fn make_version(port: Option<u16>, network: &Network) -> VersionMessage {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
let default_port = default_port_from_network(network);
let ip = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port.unwrap_or(default_port),
);
let from_and_recv = Address::new(&ip, ServiceFlags::NONE);
let msg = VersionMessage {
version: PROTOCOL_VERSION,
services: ServiceFlags::NONE,
timestamp: now as i64,
receiver: from_and_recv.clone(),
sender: from_and_recv,
nonce: 1,
user_agent: "Kyoto Light Client / 0.1.0 / rust-bitcoin 0.32".to_string(),
start_height: 0,
relay: false,
};
msg
}

impl MessageGenerator for V1OutboundMessage {
fn version_message(&mut self, port: Option<u16>) -> Vec<u8> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
let default_port = default_port_from_network(&self.network);
let ip = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port.unwrap_or(default_port),
);
let from_and_recv = Address::new(&ip, ServiceFlags::NONE);
let msg = VersionMessage {
version: PROTOCOL_VERSION,
services: ServiceFlags::NONE,
timestamp: now as i64,
receiver: from_and_recv.clone(),
sender: from_and_recv,
nonce: 1,
user_agent: "Kyoto Light Client / 0.1.0 / rust-bitcoin 0.32".to_string(),
start_height: 0,
relay: false,
};
fn version_message(&mut self, port: Option<u16>) -> Result<Vec<u8>, PeerError> {
let msg = make_version(port, &self.network);
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::Version(msg));
serialize(&data)
Ok(serialize(&data))
}

fn verack(&mut self) -> Vec<u8> {
fn verack(&mut self) -> Result<Vec<u8>, PeerError> {
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::Verack);
serialize(&data)
Ok(serialize(&data))
}

fn addr(&mut self) -> Vec<u8> {
fn addr(&mut self) -> Result<Vec<u8>, PeerError> {
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetAddr);
serialize(&data)
Ok(serialize(&data))
}

fn addrv2(&mut self) -> Vec<u8> {
fn addrv2(&mut self) -> Result<Vec<u8>, PeerError> {
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::SendAddrV2);
serialize(&data)
Ok(serialize(&data))
}

fn headers(&mut self, locator_hashes: Vec<BlockHash>, stop_hash: Option<BlockHash>) -> Vec<u8> {
fn headers(
&mut self,
locator_hashes: Vec<BlockHash>,
stop_hash: Option<BlockHash>,
) -> Result<Vec<u8>, PeerError> {
let msg =
GetHeadersMessage::new(locator_hashes, stop_hash.unwrap_or(BlockHash::all_zeros()));
let data =
&mut RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetHeaders(msg));
serialize(&data)
Ok(serialize(&data))
}

fn cf_headers(&mut self, message: GetCFHeaders) -> Vec<u8> {
fn cf_headers(&mut self, message: GetCFHeaders) -> Result<Vec<u8>, PeerError> {
let data = &mut RawNetworkMessage::new(
self.network.magic(),
NetworkMessage::GetCFHeaders(message),
);
serialize(&data)
Ok(serialize(&data))
}

fn filters(&mut self, message: GetCFilters) -> Vec<u8> {
fn filters(&mut self, message: GetCFilters) -> Result<Vec<u8>, PeerError> {
let data =
&mut RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetCFilters(message));
serialize(&data)
Ok(serialize(&data))
}

fn block(&mut self, config: GetBlockConfig) -> Vec<u8> {
fn block(&mut self, config: GetBlockConfig) -> Result<Vec<u8>, PeerError> {
let inv = Inventory::Block(config.locator);
let data =
&mut RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetData(vec![inv]));
serialize(&data)
Ok(serialize(&data))
}

fn pong(&mut self, nonce: u64) -> Vec<u8> {
fn pong(&mut self, nonce: u64) -> Result<Vec<u8>, PeerError> {
let msg = NetworkMessage::Pong(nonce);
let data = &mut RawNetworkMessage::new(self.network.magic(), msg);
serialize(&data)
Ok(serialize(&data))
}

fn transaction(&mut self, transaction: Transaction) -> Vec<u8> {
fn transaction(&mut self, transaction: Transaction) -> Result<Vec<u8>, PeerError> {
let msg = NetworkMessage::Tx(transaction);
let data = &mut RawNetworkMessage::new(self.network.magic(), msg);
serialize(&data)
Ok(serialize(&data))
}
}

pub(crate) struct V2OutboundMessage {
network: Network,
encryptor: PacketWriter,
}

impl V2OutboundMessage {
pub(crate) fn new(network: Network, encryptor: PacketWriter) -> Self {
Self { network, encryptor }
}

fn serialize_network_message(&self, message: NetworkMessage) -> Result<Vec<u8>, PeerError> {
bip324::serde::serialize(message).map_err(|_| PeerError::MessageSerialization)
}

fn serialize_plaintext(&mut self, plaintext: Vec<u8>) -> Result<Vec<u8>, PeerError> {
self.encryptor
.prepare_packet_with_alloc(&plaintext, None, false)
.map_err(|_| PeerError::MessageSerialization)
}
}

impl MessageGenerator for V2OutboundMessage {
fn version_message(&mut self, port: Option<u16>) -> Result<Vec<u8>, PeerError> {
let msg = make_version(port, &self.network);
let plaintext = self.serialize_network_message(NetworkMessage::Version(msg))?;
self.serialize_plaintext(plaintext)
}

fn verack(&mut self) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::Verack)?;
self.serialize_plaintext(plaintext)
}

fn addr(&mut self) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::GetAddr)?;
self.serialize_plaintext(plaintext)
}

fn addrv2(&mut self) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::SendAddrV2)?;
self.serialize_plaintext(plaintext)
}

fn headers(
&mut self,
locator_hashes: Vec<BlockHash>,
stop_hash: Option<BlockHash>,
) -> Result<Vec<u8>, PeerError> {
let msg =
GetHeadersMessage::new(locator_hashes, stop_hash.unwrap_or(BlockHash::all_zeros()));
let plaintext = self.serialize_network_message(NetworkMessage::GetHeaders(msg))?;
self.serialize_plaintext(plaintext)
}

fn cf_headers(&mut self, message: GetCFHeaders) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::GetCFHeaders(message))?;
self.serialize_plaintext(plaintext)
}

fn filters(&mut self, message: GetCFilters) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::GetCFilters(message))?;
self.serialize_plaintext(plaintext)
}

fn block(&mut self, config: GetBlockConfig) -> Result<Vec<u8>, PeerError> {
let inv = Inventory::Block(config.locator);
let plaintext = self.serialize_network_message(NetworkMessage::GetData(vec![inv]))?;
self.serialize_plaintext(plaintext)
}

fn pong(&mut self, nonce: u64) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::Pong(nonce))?;
self.serialize_plaintext(plaintext)
}

fn transaction(&mut self, transaction: Transaction) -> Result<Vec<u8>, PeerError> {
let plaintext = self.serialize_network_message(NetworkMessage::Tx(transaction))?;
self.serialize_plaintext(plaintext)
}
}
Loading
Loading