Skip to content

Commit

Permalink
poller: introduce a communication channel with the poller thread
Browse files Browse the repository at this point in the history
We'll need to ask the poller thread another thing besides to shut down,
so it's cleaner to start using proper messages.

The mpsc channel in the std lib was buggy for awhile but since they
merged crossbeam and are using this behind the hood now it should be
fine starting with Rust 1.67. That's (slightly) higher than our MSRV but
it's what we use for releases so that's reasonable. See
rust-lang/rust#39364 for details.
  • Loading branch information
darosior committed Mar 15, 2024
1 parent 6c1e558 commit f53a40b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 40 deletions.
48 changes: 34 additions & 14 deletions src/bitcoin/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ mod looper;
use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors};

use std::{
sync::{self, atomic},
thread, time,
sync::{self, mpsc},
time,
};

use miniscript::bitcoin::secp256k1;

#[derive(Debug, Clone)]
pub enum PollerMessage {
Shutdown,
}

/// The Bitcoin poller handler.
pub struct Poller {
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
Expand Down Expand Up @@ -50,29 +55,43 @@ impl Poller {
pub fn poll_forever(
&self,
poll_interval: time::Duration,
shutdown: sync::Arc<atomic::AtomicBool>,
receiver: mpsc::Receiver<PollerMessage>,
) {
let mut last_poll = None;
let mut synced = false;

while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() {
let now = time::Instant::now();

if let Some(last_poll) = last_poll {
let time_since_poll = now.duration_since(last_poll);
loop {
// How long to wait before the next poll.
let time_before_poll = if let Some(last_poll) = last_poll {
let time_since_poll = time::Instant::now().duration_since(last_poll);
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
let poll_interval = if synced {
poll_interval
} else {
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
looper::sync_poll_interval()
};
if time_since_poll < poll_interval {
thread::sleep(time::Duration::from_millis(500));
continue;
poll_interval.saturating_sub(time_since_poll)
} else {
// Don't wait before doing the first poll.
time::Duration::ZERO
};

// Wait for the duration of the interval between polls, but listen to messages in the
// meantime.
match receiver.recv_timeout(time_before_poll) {
Ok(PollerMessage::Shutdown) => {
log::info!("Bitcoin poller was told to shut down.");
return;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// It's been long enough since the last poll.
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
log::error!("Bitcoin poller communication channel got disconnected. Exiting.");
return;
}
}
last_poll = Some(now);

// Don't poll until the Bitcoin backend is fully synced.
if !synced {
Expand All @@ -89,6 +108,7 @@ impl Poller {
}
}

last_poll = Some(time::Instant::now());
looper::poll(&self.bit, &self.db, &self.secp, &self.descs);
}
}
Expand Down
49 changes: 23 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ use crate::{
},
};

use std::{error, fmt, fs, io, path, sync, thread};
use std::{
error, fmt, fs, io, path,
sync::{self, mpsc},
thread,
};

use miniscript::bitcoin::secp256k1;

Expand Down Expand Up @@ -284,12 +288,12 @@ impl DaemonControl {
/// JSONRPC server or one which exposes its API through a `DaemonControl`.
pub enum DaemonHandle {
Controller {
poller_shutdown: sync::Arc<sync::atomic::AtomicBool>,
poller_sender: mpsc::Sender<poller::PollerMessage>,
poller_handle: thread::JoinHandle<()>,
control: DaemonControl,
},
Server {
poller_shutdown: sync::Arc<sync::atomic::AtomicBool>,
poller_sender: mpsc::Sender<poller::PollerMessage>,
poller_handle: thread::JoinHandle<()>,
rpcserver_shutdown: sync::Arc<sync::atomic::AtomicBool>,
rpcserver_handle: thread::JoinHandle<Result<(), io::Error>>,
Expand Down Expand Up @@ -368,15 +372,14 @@ impl DaemonHandle {
// an atomic to be able to stop it.
let bitcoin_poller =
poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone());
let poller_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false));
let (poller_sender, poller_receiver) = mpsc::channel();
let poller_handle = thread::Builder::new()
.name("Bitcoin Network poller".to_string())
.spawn({
let poll_interval = config.bitcoin_config.poll_interval_secs;
let shutdown = poller_shutdown.clone();
move || {
log::info!("Bitcoin poller started.");
bitcoin_poller.poll_forever(poll_interval, shutdown);
bitcoin_poller.poll_forever(poll_interval, poller_receiver);
log::info!("Bitcoin poller stopped.");
}
})
Expand Down Expand Up @@ -406,14 +409,14 @@ impl DaemonHandle {
.expect("Spawning the RPC server thread should never fail.");

DaemonHandle::Server {
poller_shutdown,
poller_sender,
poller_handle,
rpcserver_shutdown,
rpcserver_handle,
}
} else {
DaemonHandle::Controller {
poller_shutdown,
poller_sender,
poller_handle,
control,
}
Expand Down Expand Up @@ -454,21 +457,25 @@ impl DaemonHandle {
pub fn stop(self) -> Result<(), Box<dyn error::Error>> {
match self {
Self::Controller {
poller_shutdown,
poller_sender,
poller_handle,
..
} => {
poller_shutdown.store(true, sync::atomic::Ordering::Relaxed);
poller_sender
.send(poller::PollerMessage::Shutdown)
.expect("The other end should never have hung up before this.");
poller_handle.join().expect("Poller thread must not panic");
Ok(())
}
Self::Server {
poller_shutdown,
poller_sender,
poller_handle,
rpcserver_shutdown,
rpcserver_handle,
} => {
poller_shutdown.store(true, sync::atomic::Ordering::Relaxed);
poller_sender
.send(poller::PollerMessage::Shutdown)
.expect("The other end should never have hung up before this.");
rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed);
rpcserver_handle
.join()
Expand Down Expand Up @@ -656,18 +663,6 @@ mod tests {
stream.flush().unwrap();
}

// Send them a response to 'getblockchaininfo' saying we are far from being synced
fn complete_sync_check(server: &net::TcpListener) {
let net_resp = [
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"verificationprogress\":0.1,\"headers\":1000,\"blocks\":100}}\n".as_bytes(),
]
.concat();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(&net_resp).unwrap();
stream.flush().unwrap();
}

// TODO: we could move the dummy bitcoind thread stuff to the bitcoind module to test the
// bitcoind interface, and use the DummyLiana from testutils to sanity check the startup.
// Note that startup as checked by this unit test is also tested in the functional test
Expand Down Expand Up @@ -744,7 +739,8 @@ mod tests {
complete_wallet_check(&server, &wo_path);
complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string());
complete_tip_init(&server);
complete_sync_check(&server);
// We don't have to complete the sync check as the poller checks whether it needs to stop
// before checking the bitcoind sync status.
t.join().unwrap();

// The datadir is created now, so if we restart it it won't create the wo wallet.
Expand All @@ -761,7 +757,8 @@ mod tests {
complete_wallet_loading(&server);
complete_wallet_check(&server, &wo_path);
complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string());
complete_sync_check(&server);
// We don't have to complete the sync check as the poller checks whether it needs to stop
// before checking the bitcoind sync status.
t.join().unwrap();

fs::remove_dir_all(&tmp_dir).unwrap();
Expand Down

0 comments on commit f53a40b

Please sign in to comment.