Skip to content

Commit

Permalink
resume when connection is lost
Browse files Browse the repository at this point in the history
The miner will pause when the connection is lost and resume when the connection is re-established.
  • Loading branch information
x100111010 committed Jun 22, 2024
1 parent e438270 commit 25ab8bf
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 23 deletions.
65 changes: 45 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use chrono::Local;
use clap::Parser;
use log::{info, warn};
use log::{info, warn, error};
use std::error::Error as StdError;
use std::{
io::Write,
Expand All @@ -12,6 +12,7 @@ use std::{
},
time::Duration,
};
use tokio::sync::mpsc;

// Add sysinfo crate for system information
use sysinfo::System;
Expand Down Expand Up @@ -110,29 +111,53 @@ async fn main() -> Result<(), Error> {
let shutdown = ShutdownHandler(Arc::new(AtomicBool::new(false)));
let _shutdown_when_dropped = shutdown.arm();

let (send_channel, _) = mpsc::channel(100);
let mut miner_manager = MinerManager::new(
send_channel.clone(),
opt.num_threads,
throttle,
shutdown.clone(),
);

while !shutdown.is_shutdown() {
let mut client = SpectredHandler::connect(
match SpectredHandler::connect(
opt.spectred_address.clone(),
opt.mining_address.clone(),
opt.mine_when_not_synced,
)
.await?;
if let Some(devfund_address) = &opt.devfund_address {
client.add_devfund(devfund_address.clone(), opt.devfund_percent);
info!(
"devfund enabled, mining {}.{}% of the time to devfund address: {} ",
opt.devfund_percent / 100,
opt.devfund_percent % 100,
devfund_address
);
).await {
Ok(mut client) => {
if let Some(devfund_address) = &opt.devfund_address {
client.add_devfund(devfund_address.clone(), opt.devfund_percent);
info!(
"devfund enabled, mining {}.{}% of the time to devfund address: {} ",
opt.devfund_percent / 100,
opt.devfund_percent % 100,
devfund_address
);
}
if let Err(e) = client.client_send(NotifyNewBlockTemplateRequestMessage {}).await {
error!("Error sending block template request: {}", e);
}
if let Err(e) = client.client_get_block_template().await {
error!("Error getting block template: {}", e);
}

miner_manager.resume();

if let Err(e) = client.listen(&mut miner_manager, shutdown.clone()).await {
warn!("Connection error: {}. Reconnecting in 10 seconds...", e);
miner_manager.pause();
} else {
warn!("Disconnected from spectred. Reconnecting in 10 seconds...");
miner_manager.pause();
}
}
Err(e) => {
warn!("Failed to connect to spectred: {}. Retrying in 10 seconds...", e);
miner_manager.pause();
}
}
client.client_send(NotifyNewBlockTemplateRequestMessage {}).await?;
client.client_get_block_template().await?;

let mut miner_manager =
MinerManager::new(client.send_channel.clone(), opt.num_threads, throttle, shutdown.clone());
client.listen(&mut miner_manager, shutdown.clone()).await?;
warn!("Disconnected from spectred, retrying");
tokio::time::sleep(Duration::from_secs(10)).await;
}
Ok(())
}
}
24 changes: 22 additions & 2 deletions src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use rand::{thread_rng, RngCore};
use std::{
num::Wrapping,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::Duration,
Expand All @@ -31,6 +31,7 @@ pub struct MinerManager {
is_synced: bool,
hashes_tried: Arc<AtomicU64>,
current_state_id: AtomicUsize,
paused: Arc<AtomicBool>,
}

impl Drop for MinerManager {
Expand All @@ -56,13 +57,15 @@ impl MinerManager {
) -> Self {
let hashes_tried = Arc::new(AtomicU64::new(0));
let watch = WatchSwap::empty();
let paused = Arc::new(AtomicBool::new(false));
let handles = Self::launch_cpu_threads(
send_channel.clone(),
hashes_tried.clone(),
watch.clone(),
shutdown,
n_cpus,
throttle,
paused.clone(),
)
.collect();

Expand All @@ -74,6 +77,7 @@ impl MinerManager {
is_synced: true,
hashes_tried,
current_state_id: AtomicUsize::new(0),
paused,
}
}

Expand All @@ -84,6 +88,7 @@ impl MinerManager {
shutdown: ShutdownHandler,
n_cpus: Option<u16>,
throttle: Option<Duration>,
paused: Arc<AtomicBool>,
) -> impl Iterator<Item = MinerHandler> {
let n_cpus = get_num_cpus(n_cpus);
info!("Launching: {} cpu miners", n_cpus);
Expand All @@ -94,6 +99,7 @@ impl MinerManager {
hashes_tried.clone(),
throttle,
shutdown.clone(),
paused.clone(),
)
})
}
Expand All @@ -117,12 +123,21 @@ impl MinerManager {
Ok(())
}

pub fn pause(&self) {
self.paused.store(true, Ordering::SeqCst);
}

pub fn resume(&self) {
self.paused.store(false, Ordering::SeqCst);
}

pub fn launch_cpu_miner(
send_channel: Sender<SpectredMessage>,
mut block_channel: WatchSwap<pow::State>,
hashes_tried: Arc<AtomicU64>,
throttle: Option<Duration>,
shutdown: ShutdownHandler,
paused: Arc<AtomicBool>,
) -> MinerHandler {
// We mark it cold as the function is not called often, and it's not in the hot path
#[cold]
Expand All @@ -137,6 +152,11 @@ impl MinerManager {
std::thread::spawn(move || {
let mut state = None;
loop {
if paused.load(Ordering::SeqCst) {
std::thread::sleep(Duration::from_millis(100));
continue;
}

if state.is_none() {
state = block_channel.wait_for_change().as_deref().cloned();
}
Expand Down Expand Up @@ -240,4 +260,4 @@ mod benches {
}
});
}
}
}
2 changes: 1 addition & 1 deletion src/pow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,4 @@ mod tests {
hasher.write(buf.0);
assert_eq!(hasher.finalize(), expected_hash);
}
}
}

0 comments on commit 25ab8bf

Please sign in to comment.