Skip to content

Commit

Permalink
client: minimal tx broadcast, add new scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Jun 12, 2024
1 parent a158d9e commit 0991f07
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 18 deletions.
3 changes: 3 additions & 0 deletions example/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ async fn main() {
NodeMessage::BlocksDisconnected(r) => {
let _ = r;
}
NodeMessage::TxBroadcastFailure => {
tracing::error!("The transaction could not be broadcast.")
}
NodeMessage::Synced(tip) => {
tracing::info!("Synced chain up to block {}", tip.height,);
tracing::info!("Chain tip: {}", tip.hash.to_string(),);
Expand Down
6 changes: 6 additions & 0 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,12 @@ impl Chain {
.iter()
.any(|out| self.scripts.contains(&out.script_pubkey))
}

pub(crate) fn put_scripts(&mut self, scripts: HashSet<ScriptBuf>) {
for script in scripts {
self.scripts.insert(script);
}
}
}

#[cfg(test)]
Expand Down
15 changes: 6 additions & 9 deletions src/db/sqlite/header_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,12 @@ impl HeaderStore for SqliteHeaderDb {
"db corruption. incorrect header hash."
);

match headers.values().last() {
Some(header) => {
assert_eq!(
header.block_hash(),
next_header.prev_blockhash,
"db corruption. headers do not link."
);
}
None => (),
if let Some(header) = headers.values().last() {
assert_eq!(
header.block_hash(),
next_header.prev_blockhash,
"db corruption. headers do not link."
);
}
headers.insert(height, next_header);
}
Expand Down
27 changes: 27 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,30 @@ impl IndexedBlock {
Self { height, block }
}
}

/// Broadcast a [`Transaction`] to a set of connected peers.
#[derive(Debug, Clone)]
pub struct TxBroadcast {
/// The presumably valid Bitcoin transaction.
pub tx: Transaction,
/// The strategy for how this transaction should be shared with the network
pub broadcast_policy: TxBroadcastPolicy,
}

impl TxBroadcast {
pub fn new(tx: Transaction, broadcast_policy: TxBroadcastPolicy) -> Self {
Self {
tx,
broadcast_policy,
}
}
}

/// The strategy for how this transaction should be shared with the network
#[derive(Debug, Clone)]
pub enum TxBroadcastPolicy {
/// Broadcast the transaction to all peers at the same time.
AllPeers,
/// Broadcast the transaction to a single random peer, optimal for user privacy.
RandomPeer,
}
3 changes: 2 additions & 1 deletion src/node/channel_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bitcoin::{
message_filter::{CFHeaders, CFilter, GetCFHeaders, GetCFilters},
Address, ServiceFlags,
},
Block, BlockHash,
Block, BlockHash, Transaction,
};

#[derive(Debug, Clone)]
Expand All @@ -15,6 +15,7 @@ pub(crate) enum MainThreadMessage {
GetFilters(GetCFilters),
GetBlock(GetBlockConfig),
Disconnect,
BroadcastTx(Transaction),
// more messages
}

Expand Down
16 changes: 13 additions & 3 deletions src/node/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
pub use bitcoin::{Block, Transaction};
use std::collections::HashSet;

pub use bitcoin::{Block, ScriptBuf, Transaction};
use tokio::sync::broadcast;
pub use tokio::sync::broadcast::Receiver;
pub use tokio::sync::mpsc::Sender;

use crate::{IndexedBlock, IndexedTransaction};
use crate::{IndexedBlock, IndexedTransaction, TxBroadcast};

use super::{
error::ClientError,
Expand Down Expand Up @@ -150,10 +152,18 @@ impl ClientSender {
}

/// Broadcast a new transaction to the network.
pub async fn broadcast_tx(&mut self, tx: Transaction) -> Result<(), ClientError> {
pub async fn broadcast_tx(&mut self, tx: TxBroadcast) -> Result<(), ClientError> {
self.ntx
.send(ClientMessage::Broadcast(tx))
.await
.map_err(|_| ClientError::SendError)
}

/// Add more Bitcoin [`ScriptBuf`] to watch for. Does not rescan the filters.
pub async fn add_scripts(&mut self, scripts: HashSet<ScriptBuf>) -> Result<(), ClientError> {
self.ntx
.send(ClientMessage::AddScripts(scripts))
.await
.map_err(|_| ClientError::SendError)
}
}
11 changes: 9 additions & 2 deletions src/node/messages.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::collections::HashSet;

use bitcoin::ScriptBuf;
pub use bitcoin::{Block, Transaction};

use crate::{
chain::checkpoints::HeaderCheckpoint, DisconnectedHeader, IndexedBlock, IndexedTransaction,
TxBroadcast,
};

/// Messages receivable by a running node.
Expand All @@ -19,13 +23,16 @@ pub enum NodeMessage {
Synced(HeaderCheckpoint),
/// Blocks were reorganized out of the chain
BlocksDisconnected(Vec<DisconnectedHeader>),
/// A problem occured sending a transaction.
TxBroadcastFailure,
}

/// Commands to issue a node.
#[derive(Debug, Clone)]
pub enum ClientMessage {
/// Stop the node
Shutdown,
/// Broadcast a [`Transaction`]
Broadcast(Transaction),
/// Broadcast a [`Transaction`] with a [`crate::TxBroadcastPolicy`]
Broadcast(TxBroadcast),
AddScripts(HashSet<ScriptBuf>),
}
28 changes: 26 additions & 2 deletions src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bitcoin::{
message_filter::{CFHeaders, CFilter},
Address, ServiceFlags,
},
Block, Network,
Block, Network, ScriptBuf,
};
use rand::{prelude::SliceRandom, rngs::StdRng, SeedableRng};
use tokio::sync::{broadcast, mpsc::Receiver, Mutex, RwLock};
Expand All @@ -31,6 +31,7 @@ use crate::{
filters::cfheader_chain::CFHeaderSyncResult,
node::{error::PersistenceError, peer_map::PeerMap},
peers::dns::Dns,
TxBroadcastPolicy,
};

use super::{
Expand Down Expand Up @@ -275,7 +276,25 @@ impl Node {
if let Some(message) = message {
match message {
ClientMessage::Shutdown => return Ok(()),
ClientMessage::Broadcast(tx) => drop(tx),
ClientMessage::Broadcast(transaction) => {
let connected_peers = node_map.live();
if connected_peers < 1 {
self.dialog.send_data(NodeMessage::TxBroadcastFailure).await;
continue;
}
match transaction.broadcast_policy {
TxBroadcastPolicy::AllPeers => {
self.dialog.send_dialog(format!("Sending transaction to {} connected peers.", connected_peers))
.await;
node_map.broadcast(MainThreadMessage::BroadcastTx(transaction.tx)).await
},
TxBroadcastPolicy::RandomPeer => {
self.dialog.send_dialog("Sending transaction to a random peer.".into())
.await;
node_map.send_random(MainThreadMessage::BroadcastTx(transaction.tx)).await },
}
},
ClientMessage::AddScripts(scripts) => self.add_scripts(scripts).await,
}
}
}
Expand Down Expand Up @@ -633,4 +652,9 @@ impl Node {
}
}
}

async fn add_scripts(&mut self, scripts: HashSet<ScriptBuf>) {
let mut chain = self.chain.lock().await;
chain.put_scripts(scripts);
}
}
8 changes: 7 additions & 1 deletion src/peers/outbound_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use bitcoin::{
message_network::VersionMessage,
Address, ServiceFlags,
},
BlockHash, Network,
BlockHash, Network, Transaction,
};

use crate::{node::channel_messages::GetBlockConfig, prelude::default_port_from_network};
Expand Down Expand Up @@ -103,4 +103,10 @@ impl V1OutboundMessage {
let data = &mut RawNetworkMessage::new(self.network.magic(), msg);
serialize(&data)
}

pub(crate) fn new_transaction(&self, transaction: Transaction) -> Vec<u8> {
let msg = NetworkMessage::Tx(transaction);
let data = &mut RawNetworkMessage::new(self.network.magic(), msg);
serialize(&data)
}
}
7 changes: 7 additions & 0 deletions src/peers/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ impl Peer {
.await
.map_err(|_| PeerError::BufferWrite)?;
}
MainThreadMessage::BroadcastTx(transaction) => {
let message = message_generator.new_transaction(transaction);
writer
.write_all(&message)
.await
.map_err(|_| PeerError::BufferWrite)?;
}
MainThreadMessage::Disconnect => return Err(PeerError::DisconnectCommand),
}
Ok(())
Expand Down

0 comments on commit 0991f07

Please sign in to comment.