Skip to content

Commit

Permalink
Merge pull request #57 from rustaceanrob/conn
Browse files Browse the repository at this point in the history
Conn
  • Loading branch information
rustaceanrob authored Aug 5, 2024
2 parents 629238b + 2ba0188 commit 1add428
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 50 deletions.
5 changes: 4 additions & 1 deletion src/db/memory/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use bitcoin::{
};
use rand::{rngs::StdRng, SeedableRng};

use crate::db::{error::DatabaseError, traits::PeerStore, FutureResult, PeerStatus, PersistedPeer};
use crate::{
db::{error::DatabaseError, traits::PeerStore, PeerStatus, PersistedPeer},
prelude::FutureResult,
};

/// A simple peer store that does not save state in between sessions.
/// If DNS is not enabled, a node will require at least one peer to connect to.
Expand Down
5 changes: 0 additions & 5 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use core::future::Future;
use core::pin::Pin;

use bitcoin::p2p::address::AddrV2;
use bitcoin::p2p::ServiceFlags;

Expand All @@ -14,8 +11,6 @@ pub mod sqlite;
/// Traits that define the header and peer databases.
pub mod traits;

pub(crate) type FutureResult<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;

/// A peer that will be saved to the [`traits::PeerStore`].
#[derive(Debug, Clone, PartialEq)]
pub struct PersistedPeer {
Expand Down
2 changes: 1 addition & 1 deletion src/db/sqlite/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::Mutex;

use crate::db::error::DatabaseError;
use crate::db::traits::HeaderStore;
use crate::db::FutureResult;
use crate::prelude::FutureResult;

const SCHEMA: &str = "CREATE TABLE IF NOT EXISTS headers (
height INTEGER PRIMARY KEY,
Expand Down
3 changes: 2 additions & 1 deletion src/db/sqlite/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use tokio::sync::Mutex;

use crate::db::error::DatabaseError;
use crate::db::traits::PeerStore;
use crate::db::{FutureResult, PeerStatus, PersistedPeer};
use crate::db::{PeerStatus, PersistedPeer};
use crate::prelude::FutureResult;

const PEER_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS peers (
ip_addr BLOB PRIMARY KEY,
Expand Down
4 changes: 3 additions & 1 deletion src/db/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::collections::BTreeMap;

use bitcoin::{block::Header, BlockHash};

use super::{error::DatabaseError, FutureResult, PersistedPeer};
use crate::prelude::FutureResult;

use super::{error::DatabaseError, PersistedPeer};

/// Methods required to persist the chain of block headers.
pub trait HeaderStore {
Expand Down
5 changes: 5 additions & 0 deletions src/node/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ pub enum Warning {
NotEnoughConnections,
/// A connection to a peer timed out.
PeerTimedOut,
/// The node was unable to connect to a peer in the database.
CouldNotConnect,
/// A peer sent us a peer-to-peer message the node did not request.
UnsolicitedMessage,
/// The provided anchor is deeper than the database history.
Expand Down Expand Up @@ -126,6 +128,9 @@ impl core::fmt::Display for Warning {
f,
"The provided anchor is deeper than the database history."
),
Warning::CouldNotConnect => {
write!(f, "An attempted connection failed or timed out.")
}
Warning::TransactionRejected => write!(f, "A transaction got rejected."),
Warning::FailedPersistance { warning } => {
write!(f, "A database failed to persist some data: {}", warning)
Expand Down
4 changes: 3 additions & 1 deletion src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ impl Node {
.send_warning(Warning::NotEnoughConnections)
.await;
let ip = peer_map.next_peer().await?;
peer_map.dispatch(ip.0, ip.1).await;
if let Err(_) = peer_map.dispatch(ip.0, ip.1).await {
self.dialog.send_warning(Warning::CouldNotConnect).await;
}
}
Ok(())
}
Expand Down
19 changes: 15 additions & 4 deletions src/node/peer_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ use tokio::{

use crate::{
db::{error::PeerManagerError, traits::PeerStore, PeerStatus, PersistedPeer},
peers::error::PeerError,
peers::peer::Peer,
peers::{
error::PeerError,
peer::Peer,
traits::{ClearNetConnection, NetworkConnector},
},
prelude::{Median, Netgroup},
};

Expand Down Expand Up @@ -56,6 +59,7 @@ pub(crate) struct PeerMap {
mtx: Sender<PeerThreadMessage>,
map: HashMap<u32, ManagedPeer>,
db: Arc<Mutex<dyn PeerStore + Send + Sync>>,
connector: Arc<Mutex<dyn NetworkConnector + Send + Sync>>,
whitelist: Whitelist,
dialog: Dialog,
target_db_size: u32,
Expand All @@ -78,6 +82,7 @@ impl PeerMap {
mtx,
map: HashMap::new(),
db: Arc::new(Mutex::new(db)),
connector: Arc::new(Mutex::new(ClearNetConnection::new())),
whitelist,
dialog,
target_db_size,
Expand Down Expand Up @@ -135,7 +140,7 @@ impl PeerMap {
}

// Send out a TCP connection to a new peer and begin tracking the task
pub async fn dispatch(&mut self, ip: AddrV2, port: u16) {
pub async fn dispatch(&mut self, ip: AddrV2, port: u16) -> Result<(), PeerError> {
let (ptx, prx) = mpsc::channel::<MainThreadMessage>(32);
let peer_num = self.num_peers + 1;
self.num_peers = peer_num;
Expand All @@ -148,7 +153,12 @@ impl PeerMap {
prx,
self.dialog.clone(),
);
let handle = tokio::spawn(async move { peer.connect().await });
let mut connector = self.connector.lock().await;
if !connector.can_connect(&ip) {
return Err(PeerError::UnreachableSocketAddr);
}
let (reader, writer) = connector.connect(ip.clone(), port).await?;
let handle = tokio::spawn(async move { peer.run(reader, writer).await });
self.dialog
.send_dialog(format!("Connecting to {:?}:{}", ip, port))
.await;
Expand All @@ -163,6 +173,7 @@ impl PeerMap {
handle,
},
);
Ok(())
}

// Set the services of a peer
Expand Down
4 changes: 2 additions & 2 deletions src/peers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl_sourceless_error!(PeerReadError);

#[derive(Debug)]
pub enum PeerError {
TcpConnectionFailed,
ConnectionFailed,
BufferWrite,
ThreadChannel,
DisconnectCommand,
Expand All @@ -38,7 +38,7 @@ pub enum PeerError {
impl core::fmt::Display for PeerError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
PeerError::TcpConnectionFailed => {
PeerError::ConnectionFailed => {
write!(f, "the peer's TCP port was closed or we could not connect.")
}
PeerError::BufferWrite => write!(f, "a message could not be written to the peer."),
Expand Down
40 changes: 8 additions & 32 deletions src/peers/peer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
extern crate tokio;
use std::{net::IpAddr, time::Duration};
use std::time::Duration;

use bitcoin::{p2p::address::AddrV2, Network};
use tokio::{
io::{AsyncWrite, AsyncWriteExt},
net::TcpStream,
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
select,
sync::mpsc::{self, Receiver, Sender},
};
Expand Down Expand Up @@ -64,38 +63,15 @@ impl Peer {
}
}

pub async fn connect(&mut self) -> Result<(), PeerError> {
let socket_addr = match self.ip_addr {
AddrV2::Ipv4(ip) => IpAddr::V4(ip),
AddrV2::Ipv6(ip) => IpAddr::V6(ip),
_ => return Err(PeerError::UnreachableSocketAddr),
};
let timeout = tokio::time::timeout(
Duration::from_secs(CONNECTION_TIMEOUT),
TcpStream::connect((socket_addr, self.port)),
)
.await
.map_err(|_| PeerError::TcpConnectionFailed)?;
// Replace with generalization
let mut stream: TcpStream;
if let Ok(tcp) = timeout {
stream = tcp;
} else {
self.dialog.send_warning(Warning::PeerTimedOut).await;
let _ = self
.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
message: PeerMessage::Disconnect,
})
.await;
return Err(PeerError::TcpConnectionFailed);
}
pub async fn run<R, W>(&mut self, reader: R, mut writer: W) -> Result<(), PeerError>
where
R: AsyncRead + Send + Sync + Unpin + 'static,
W: AsyncWrite + Send + Sync + Unpin,
{
let mut outbound_messages = V1OutboundMessage::new(self.network);
let message = outbound_messages.version_message(None);
self.write_bytes(&mut stream, message).await?;
self.write_bytes(&mut writer, message).await?;
self.message_timer.track();
let (reader, mut writer) = stream.into_split();
let (tx, mut rx) = mpsc::channel(32);
let mut peer_reader = Reader::new(reader, tx, self.network);
let read_handle = tokio::spawn(async move {
Expand Down
78 changes: 76 additions & 2 deletions src/peers/traits.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
use core::fmt::Debug;
use std::{net::IpAddr, time::Duration};

use bitcoin::{
p2p::message_filter::{GetCFHeaders, GetCFilters},
p2p::{
address::AddrV2,
message_filter::{GetCFHeaders, GetCFilters},
},
BlockHash, Transaction,
};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
};

use crate::{node::channel_messages::GetBlockConfig, prelude::FutureResult};

use super::error::PeerError;

use crate::node::channel_messages::GetBlockConfig;
const CONNECTION_TIMEOUT: u64 = 2;

type Reader = Box<dyn AsyncRead + Send + Sync + Unpin>;
type Writer = Box<dyn AsyncWrite + Send + Sync + Unpin>;

pub(crate) trait MessageGenerator {
fn version_message(&mut self, port: Option<u16>) -> Vec<u8>;
Expand All @@ -26,3 +43,60 @@ pub(crate) trait MessageGenerator {

fn transaction(&mut self, transaction: Transaction) -> Vec<u8>;
}

pub(crate) trait NetworkConnector {
fn can_connect(&self, addr: &AddrV2) -> bool;

fn connect(&mut self, addr: AddrV2, port: u16) -> FutureResult<(Reader, Writer), PeerError>;
}

pub(crate) struct ClearNetConnection {}

impl ClearNetConnection {
pub(crate) fn new() -> Self {
Self {}
}
}

impl NetworkConnector for ClearNetConnection {
fn can_connect(&self, addr: &AddrV2) -> bool {
match addr {
AddrV2::Ipv4(_) => true,
AddrV2::Ipv6(_) => true,
_ => false,
}
}

fn connect(&mut self, addr: AddrV2, port: u16) -> FutureResult<(Reader, Writer), PeerError> {
async fn do_impl(addr: AddrV2, port: u16) -> Result<(Reader, Writer), PeerError> {
let socket_addr = match addr {
AddrV2::Ipv4(ip) => IpAddr::V4(ip),
AddrV2::Ipv6(ip) => IpAddr::V6(ip),
_ => return Err(PeerError::UnreachableSocketAddr),
};
let timeout = tokio::time::timeout(
Duration::from_secs(CONNECTION_TIMEOUT),
TcpStream::connect((socket_addr, port)),
)
.await
.map_err(|_| PeerError::ConnectionFailed)?;
match timeout {
Ok(stream) => {
let (reader, writer) = stream.into_split();
Ok((Box::new(reader), Box::new(writer)))
}
Err(_) => Err(PeerError::ConnectionFailed),
}
}
Box::pin(do_impl(addr, port))
}
}

impl Debug for dyn NetworkConnector + Send + Sync {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Generic connection. Either TCP, Tor, or something else concrete"
)
}
}
4 changes: 4 additions & 0 deletions src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use core::{future::Future, pin::Pin};

use bitcoin::{hex::DisplayHex, p2p::address::AddrV2, params::Params, Network};

pub const MAX_FUTURE_BLOCK_TIME: i64 = 60 * 60 * 2;
pub const MEDIAN_TIME_PAST: usize = 11;

pub(crate) type FutureResult<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;

#[macro_export]
macro_rules! impl_sourceless_error {
($e:ident) => {
Expand Down

0 comments on commit 1add428

Please sign in to comment.