Skip to content

Commit

Permalink
Merge pull request #125 from rustaceanrob/mutex-09-03
Browse files Browse the repository at this point in the history
feat(node): make `Node::run` immutable
  • Loading branch information
rustaceanrob authored Sep 3, 2024
2 parents 4c625f0 + 73fef60 commit e83aedc
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 27 deletions.
2 changes: 1 addition & 1 deletion example/rescan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() {
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
let (mut node, client) = builder
let (node, client) = builder
// The Bitcoin scripts to monitor
.add_scripts(addresses)
// Only scan blocks strictly after an anchor checkpoint
Expand Down
2 changes: 1 addition & 1 deletion example/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn main() {
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
let (mut node, client) = builder
let (node, client) = builder
// Add the peers
.add_peers(vec![(peer_1, None).into(), peer_2])
// The Bitcoin scripts to monitor
Expand Down
2 changes: 1 addition & 1 deletion example/tor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() {
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
let (mut node, client) = builder
let (node, client) = builder
// Add the peer
.add_peers(vec![peer])
// The Bitcoin scripts to monitor
Expand Down
16 changes: 9 additions & 7 deletions src/core/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ pub struct Node {
tx_broadcaster: Arc<Mutex<Broadcaster>>,
required_peers: usize,
dialog: Dialog,
client_recv: Receiver<ClientMessage>,
peer_recv: Receiver<PeerThreadMessage>,
client_recv: Arc<Mutex<Receiver<ClientMessage>>>,
peer_recv: Arc<Mutex<Receiver<PeerThreadMessage>>>,
is_running: AtomicBool,
filter_sync_policy: Arc<RwLock<FilterSyncPolicy>>,
}
Expand Down Expand Up @@ -141,8 +141,8 @@ impl Node {
tx_broadcaster,
required_peers,
dialog,
client_recv: crx,
peer_recv: mrx,
client_recv: Arc::new(Mutex::new(crx)),
peer_recv: Arc::new(Mutex::new(mrx)),
is_running: AtomicBool::new(false),
filter_sync_policy: Arc::new(RwLock::new(filter_sync_policy)),
},
Expand Down Expand Up @@ -181,12 +181,14 @@ impl Node {
/// # Errors
///
/// A node will cease running if a fatal error is encountered with either the [`PeerStore`] or [`HeaderStore`].
pub async fn run(&mut self) -> Result<(), NodeError> {
pub async fn run(&self) -> Result<(), NodeError> {
self.dialog.send_dialog("Starting node".into()).await;
self.is_running
.store(true, std::sync::atomic::Ordering::Relaxed);
self.fetch_headers().await?;
let mut last_block = LastBlockMonitor::new();
let mut peer_recv = self.peer_recv.lock().await;
let mut client_recv = self.client_recv.lock().await;
loop {
// Try to advance the state of the node
self.advance_state(&last_block).await;
Expand All @@ -198,7 +200,7 @@ impl Node {
self.broadcast_transactions().await;
// Either handle a message from a remote peer or from our client
select! {
peer = tokio::time::timeout(Duration::from_secs(LOOP_TIMEOUT), self.peer_recv.recv()) => {
peer = tokio::time::timeout(Duration::from_secs(LOOP_TIMEOUT), peer_recv.recv()) => {
match peer {
Ok(Some(peer_thread)) => {
match peer_thread.message {
Expand Down Expand Up @@ -283,7 +285,7 @@ impl Node {
_ => continue,
}
},
message = self.client_recv.recv() => {
message = client_recv.recv() => {
if let Some(message) = message {
match message {
ClientMessage::Shutdown => return Ok(()),
Expand Down
34 changes: 17 additions & 17 deletions tests/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async fn test_reorg() {
let mut scripts = HashSet::new();
let other = rpc.get_new_address(None, None).unwrap().assume_checked();
scripts.insert(other.into());
let (mut node, client) = new_node(scripts.clone()).await;
let (node, client) = new_node(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (sender, mut recv) = client.split();
sync_assert(&best, &mut recv).await;
Expand Down Expand Up @@ -186,7 +186,7 @@ async fn test_mine_after_reorg() {
let mut scripts = HashSet::new();
let other = rpc.get_new_address(None, None).unwrap().assume_checked();
scripts.insert(other.into());
let (mut node, client) = new_node(scripts.clone()).await;
let (node, client) = new_node(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
sync_assert(&best, &mut recv).await;
Expand Down Expand Up @@ -278,7 +278,7 @@ async fn test_broadcast_success() {
// Make sure we sync up to the tip under usual conditions.
let mut scripts = HashSet::new();
scripts.insert(burn.into());
let (mut node, client) = new_node(scripts.clone()).await;
let (node, client) = new_node(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (sender, mut recv) = client.split();
// Broadcast the transaction to the network
Expand Down Expand Up @@ -362,7 +362,7 @@ async fn test_broadcast_fail() {
println!("Built unsigned transaction with TXID {}", tx.compute_txid());
let mut scripts = HashSet::new();
scripts.insert(burn.into());
let (mut node, client) = new_node(scripts.clone()).await;
let (node, client) = new_node(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (sender, mut recv) = client.split();
// Broadcast the transaction to the network
Expand Down Expand Up @@ -409,7 +409,7 @@ async fn test_long_chain() {
let mut scripts = HashSet::new();
let other = rpc.get_new_address(None, None).unwrap().assume_checked();
scripts.insert(other.into());
let (mut node, client) = new_node(scripts.clone()).await;
let (node, client) = new_node(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (sender, mut recv) = client.split();
sync_assert(&best, &mut recv).await;
Expand Down Expand Up @@ -440,7 +440,7 @@ async fn test_sql_reorg() {
let mut scripts = HashSet::new();
let other = rpc.get_new_address(None, None).unwrap().assume_checked();
scripts.insert(other.into());
let (mut node, client) = new_node_sql(scripts.clone()).await;
let (node, client) = new_node_sql(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
sync_assert(&best, &mut recv).await;
Expand All @@ -452,7 +452,7 @@ async fn test_sql_reorg() {
mine_blocks(&rpc, &miner, 2, 1).await;
let best = rpc.get_best_block_hash().unwrap();
// Spin up the node on a cold start
let (mut node, client) = new_node_sql(scripts.clone()).await;
let (node, client) = new_node_sql(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
// Make sure the reorganization is caught after a cold start
Expand All @@ -478,7 +478,7 @@ async fn test_sql_reorg() {
mine_blocks(&rpc, &miner, 2, 1).await;
let best = rpc.get_best_block_hash().unwrap();
// Make sure the node does not have any corrupted headers
let (mut node, client) = new_node_sql(scripts.clone()).await;
let (node, client) = new_node_sql(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
// The node properly syncs after persisting a reorg
Expand Down Expand Up @@ -510,7 +510,7 @@ async fn test_two_deep_reorg() {
let mut scripts = HashSet::new();
let other = rpc.get_new_address(None, None).unwrap().assume_checked();
scripts.insert(other.into());
let (mut node, client) = new_node_sql(scripts.clone()).await;
let (node, client) = new_node_sql(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
sync_assert(&best, &mut recv).await;
Expand All @@ -524,7 +524,7 @@ async fn test_two_deep_reorg() {
mine_blocks(&rpc, &miner, 3, 1).await;
let best = rpc.get_best_block_hash().unwrap();
// Make sure the reorganization is caught after a cold start
let (mut node, client) = new_node_sql(scripts.clone()).await;
let (node, client) = new_node_sql(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
while let Ok(message) = recv.recv().await {
Expand All @@ -549,7 +549,7 @@ async fn test_two_deep_reorg() {
mine_blocks(&rpc, &miner, 2, 1).await;
let best = rpc.get_best_block_hash().unwrap();
// Make sure the node does not have any corrupted headers
let (mut node, client) = new_node_sql(scripts.clone()).await;
let (node, client) = new_node_sql(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
// The node properly syncs after persisting a reorg
Expand Down Expand Up @@ -584,7 +584,7 @@ async fn test_sql_stale_anchor() {
let mut scripts = HashSet::new();
let other = rpc.get_new_address(None, None).unwrap().assume_checked();
scripts.insert(other.into());
let (mut node, client) = new_node_sql(scripts.clone()).await;
let (node, client) = new_node_sql(scripts.clone()).await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
sync_assert(&best, &mut recv).await;
Expand All @@ -596,7 +596,7 @@ async fn test_sql_stale_anchor() {
mine_blocks(&rpc, &miner, 2, 1).await;
let best = rpc.get_best_block_hash().unwrap();
// Spin up the node on a cold start with a stale tip
let (mut node, client) = new_node_anchor_sql(
let (node, client) = new_node_anchor_sql(
scripts.clone(),
HeaderCheckpoint::new(old_height as u32, old_best),
)
Expand Down Expand Up @@ -627,7 +627,7 @@ async fn test_sql_stale_anchor() {
let old_height = rpc.get_block_count().unwrap();
let best = rpc.get_best_block_hash().unwrap();
// Make sure the node does not have any corrupted headers
let (mut node, client) = new_node_anchor_sql(
let (node, client) = new_node_anchor_sql(
scripts.clone(),
HeaderCheckpoint::new(old_height as u32, cp),
)
Expand All @@ -643,7 +643,7 @@ async fn test_sql_stale_anchor() {
mine_blocks(&rpc, &miner, 2, 1).await;
let best = rpc.get_best_block_hash().unwrap();
// Make sure the node does not have any corrupted headers
let (mut node, client) = new_node_anchor_sql(
let (node, client) = new_node_anchor_sql(
scripts.clone(),
HeaderCheckpoint::new(old_height as u32, cp),
)
Expand Down Expand Up @@ -676,7 +676,7 @@ async fn test_halting_works() {

let host = (IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), Some(PORT));
let builder = kyoto::core::builder::NodeBuilder::new(bitcoin::Network::Regtest);
let (mut node, client) = builder
let (node, client) = builder
.add_peers(vec![host.into()])
.add_scripts(scripts)
.halt_filter_download()
Expand Down Expand Up @@ -719,7 +719,7 @@ async fn test_signet_syncs() {
set.insert(address);
let host = (IpAddr::from(Ipv4Addr::new(68, 47, 229, 218)), None);
let builder = kyoto::core::builder::NodeBuilder::new(bitcoin::Network::Signet);
let (mut node, client) = builder
let (node, client) = builder
.add_peers(vec![host.into()])
.add_scripts(set)
.build_with_databases(StatelessPeerStore::new(), ());
Expand Down

0 comments on commit e83aedc

Please sign in to comment.