diff --git a/src-tauri/src/cpu_miner.rs b/src-tauri/src/cpu_miner.rs index 2dc0bb05d..50e1d67b7 100644 --- a/src-tauri/src/cpu_miner.rs +++ b/src-tauri/src/cpu_miner.rs @@ -1,4 +1,6 @@ use crate::app_config::MiningMode; +use crate::mm_proxy_manager::MmProxyManager; +use crate::process_adapter::ProcessAdapter; use crate::xmrig::http_api::XmrigHttpApiClient; use crate::xmrig_adapter::{XmrigAdapter, XmrigNodeConnection}; use crate::{ @@ -76,17 +78,14 @@ impl CpuMiner { }; let xmrig = XmrigAdapter::new( xmrig_node_connection, - "44AFFq5kSiGBoZ4NMDwYtN18obc8AemS33DBLWs3H7otXft3XjrpDtQGv7SqSsaBYBb98uNbr2VBBEt7f2wfn3RVGQBEP3A".to_string() - ); - let (mut _rx, mut xmrig_child, client) = xmrig.spawn( + "44AFFq5kSiGBoZ4NMDwYtN18obc8AemS33DBLWs3H7otXft3XjrpDtQGv7SqSsaBYBb98uNbr2VBBEt7f2wfn3RVGQBEP3A".to_string(), cache_dir, log_dir, - base_path, progress_tracker, cpu_max_percentage, - )?; - - self.api_client = Some(client); + ); + let (mut xmrig_child, _xmrig_status_monitor) = xmrig.spawn_inner(base_path.clone())?; + self.api_client = Some(xmrig.client); self.watcher_task = Some(tauri::async_runtime::spawn(async move { println!("Starting process"); @@ -97,12 +96,12 @@ impl CpuMiner { select! { _ = watch_timer.tick() => { println!("watching"); - if xmrig_child.ping().expect("idk") { + if xmrig_child.ping() { println!("xmrig is running"); } else { println!("xmrig is not running"); match xmrig_child.stop().await { - Ok(()) => { + Ok(_) => { println!("xmrig exited successfully"); } Err(e) => { diff --git a/src-tauri/src/merge_mining_adapter.rs b/src-tauri/src/merge_mining_adapter.rs index 581e3a16f..dfa4cc5d5 100644 --- a/src-tauri/src/merge_mining_adapter.rs +++ b/src-tauri/src/merge_mining_adapter.rs @@ -1,15 +1,12 @@ use crate::binary_resolver::{Binaries, BinaryResolver}; use crate::process_adapter::{ProcessAdapter, ProcessInstance, StatusMonitor}; use anyhow::Error; -use async_trait::async_trait; use log::{debug, warn}; use std::fs; use std::path::PathBuf; use tari_common_types::tari_address::TariAddress; use tari_shutdown::Shutdown; -use tokio::runtime::Handle; use tokio::select; -use tokio::task::JoinHandle; const LOG_TARGET: &str = "tari::universe::merge_mining_proxy_adapter"; @@ -26,13 +23,12 @@ impl MergeMiningProxyAdapter { } impl ProcessAdapter for MergeMiningProxyAdapter { - type Instance = MergeMiningProxyInstance; type StatusMonitor = MergeMiningProxyStatusMonitor; fn spawn_inner( &self, data_dir: PathBuf, - ) -> Result<(Self::Instance, Self::StatusMonitor), Error> { + ) -> Result<(ProcessInstance, Self::StatusMonitor), Error> { let inner_shutdown = Shutdown::new(); let shutdown_signal = inner_shutdown.to_signal(); @@ -55,7 +51,7 @@ impl ProcessAdapter for MergeMiningProxyAdapter { "merge_mining_proxy.wait_for_initial_sync_at_startup=false".to_string(), ]; Ok(( - MergeMiningProxyInstance { + ProcessInstance { shutdown: inner_shutdown, handle: Some(tokio::spawn(async move { let file_path = BinaryResolver::current() @@ -117,41 +113,6 @@ impl ProcessAdapter for MergeMiningProxyAdapter { } } -pub struct MergeMiningProxyInstance { - pub shutdown: Shutdown, - handle: Option>>, -} - pub struct MergeMiningProxyStatusMonitor {} -#[async_trait] -impl ProcessInstance for MergeMiningProxyInstance { - fn ping(&self) -> bool { - self.handle - .as_ref() - .map(|m| !m.is_finished()) - .unwrap_or_else(|| false) - } - - async fn stop(&mut self) -> Result { - self.shutdown.trigger(); - let handle = self.handle.take(); - let res = handle.unwrap().await??; - Ok(res) - } -} -impl Drop for MergeMiningProxyInstance { - fn drop(&mut self) { - println!("Drop being called"); - self.shutdown.trigger(); - if let Some(handle) = self.handle.take() { - Handle::current().block_on(async move { - let _ = handle.await.unwrap().map_err(|e| { - warn!(target: LOG_TARGET, "Error in MergeMiningProxyInstance: {}", e); - }); - }); - } - } -} - impl StatusMonitor for MergeMiningProxyStatusMonitor {} diff --git a/src-tauri/src/node_adapter.rs b/src-tauri/src/node_adapter.rs index 4768015ed..e0c569fbf 100644 --- a/src-tauri/src/node_adapter.rs +++ b/src-tauri/src/node_adapter.rs @@ -3,7 +3,6 @@ use crate::node_manager::NodeIdentity; use crate::process_adapter::{ProcessAdapter, ProcessInstance, StatusMonitor}; use crate::ProgressTracker; use anyhow::{anyhow, Error}; -use async_trait::async_trait; use humantime::format_duration; use log::{debug, info, warn}; use minotari_node_grpc_client::grpc::{Empty, HeightRequest, NewBlockTemplateRequest, PowAlgo}; @@ -15,9 +14,7 @@ use tari_core::transactions::tari_amount::MicroMinotari; use tari_crypto::ristretto::RistrettoPublicKey; use tari_shutdown::Shutdown; use tari_utilities::ByteArray; -use tokio::runtime::Handle; use tokio::select; -use tokio::task::JoinHandle; const LOG_TARGET: &str = "tari::universe::minotari_node_adapter"; @@ -32,13 +29,12 @@ impl MinotariNodeAdapter { } impl ProcessAdapter for MinotariNodeAdapter { - type Instance = MinotariNodeInstance; type StatusMonitor = MinotariNodeStatusMonitor; fn spawn_inner( &self, data_dir: PathBuf, - ) -> Result<(Self::Instance, Self::StatusMonitor), Error> { + ) -> Result<(ProcessInstance, Self::StatusMonitor), Error> { let inner_shutdown = Shutdown::new(); let shutdown_signal = inner_shutdown.to_signal(); @@ -76,7 +72,7 @@ impl ProcessAdapter for MinotariNodeAdapter { ); } Ok(( - MinotariNodeInstance { + ProcessInstance { shutdown: inner_shutdown, handle: Some(tokio::spawn(async move { let file_path = BinaryResolver::current() @@ -138,42 +134,6 @@ impl ProcessAdapter for MinotariNodeAdapter { } } -pub struct MinotariNodeInstance { - pub shutdown: Shutdown, - handle: Option>>, -} - -#[async_trait] -impl ProcessInstance for MinotariNodeInstance { - fn ping(&self) -> bool { - self.handle - .as_ref() - .map(|m| !m.is_finished()) - .unwrap_or_else(|| false) - } - - async fn stop(&mut self) -> Result { - self.shutdown.trigger(); - let handle = self.handle.take(); - let res = handle.unwrap().await??; - Ok(res) - } -} - -impl Drop for MinotariNodeInstance { - fn drop(&mut self) { - println!("Drop being called"); - self.shutdown.trigger(); - if let Some(handle) = self.handle.take() { - Handle::current().block_on(async move { - let _ = handle.await.unwrap().map_err(|e| { - warn!(target: LOG_TARGET, "Error stopping minotari node: {:?}", e); - e - }); - }); - } - } -} pub struct MinotariNodeStatusMonitor {} impl StatusMonitor for MinotariNodeStatusMonitor {} diff --git a/src-tauri/src/process_adapter.rs b/src-tauri/src/process_adapter.rs index e460d052a..faebbf33c 100644 --- a/src-tauri/src/process_adapter.rs +++ b/src-tauri/src/process_adapter.rs @@ -1,26 +1,27 @@ use crate::process_killer::kill_process; use anyhow::Error; -use async_trait::async_trait; use log::{info, warn}; use std::fs; use std::path::PathBuf; +use tari_shutdown::Shutdown; +use tokio::runtime::Handle; +use tokio::task::JoinHandle; const LOG_TARGET: &str = "tari::universe::process_adapter"; pub trait ProcessAdapter { - type Instance: ProcessInstance; type StatusMonitor: StatusMonitor; // fn spawn(&self) -> Result<(Receiver<()>, TInstance), anyhow::Error>; fn spawn_inner( &self, base_folder: PathBuf, - ) -> Result<(Self::Instance, Self::StatusMonitor), anyhow::Error>; + ) -> Result<(ProcessInstance, Self::StatusMonitor), anyhow::Error>; fn name(&self) -> &str; fn spawn( &self, base_folder: PathBuf, - ) -> Result<(Self::Instance, Self::StatusMonitor), anyhow::Error> { + ) -> Result<(ProcessInstance, Self::StatusMonitor), anyhow::Error> { self.spawn_inner(base_folder) } @@ -34,7 +35,7 @@ pub trait ProcessAdapter { kill_process(pid)?; } Err(e) => { - warn!(target: LOG_TARGET, "Could not read node's pid file: {}", e); + warn!(target: LOG_TARGET, "Could not read {} pid file: {}", self.pid_file_name(), e); } } Ok(()) @@ -43,8 +44,36 @@ pub trait ProcessAdapter { pub trait StatusMonitor {} -#[async_trait] -pub trait ProcessInstance: Send + Sync + 'static { - fn ping(&self) -> bool; - async fn stop(&mut self) -> Result; +pub struct ProcessInstance { + pub shutdown: Shutdown, + pub handle: Option>>, +} + +impl ProcessInstance { + pub fn ping(&self) -> bool { + self.handle + .as_ref() + .map(|m| !m.is_finished()) + .unwrap_or_else(|| false) + } + + pub async fn stop(&mut self) -> Result { + self.shutdown.trigger(); + let handle = self.handle.take(); + handle.unwrap().await? + } +} + +impl Drop for ProcessInstance { + fn drop(&mut self) { + println!("Drop being called"); + self.shutdown.trigger(); + if let Some(handle) = self.handle.take() { + Handle::current().block_on(async move { + let _ = handle.await.unwrap().map_err(|e| { + warn!(target: LOG_TARGET, "Error in Process Adapter: {}", e); + }); + }); + } + } } diff --git a/src-tauri/src/process_killer.rs b/src-tauri/src/process_killer.rs index d21b9a136..83015c0f8 100644 --- a/src-tauri/src/process_killer.rs +++ b/src-tauri/src/process_killer.rs @@ -20,7 +20,7 @@ pub fn kill_process(pid: u32) -> Result<(), anyhow::Error> { use nix::unistd::Pid; let pid = Pid::from_raw(pid as i32); - signal::kill(pid, Signal::SIGTERM); + let _ = signal::kill(pid, Signal::SIGTERM); } Ok(()) } diff --git a/src-tauri/src/wallet_adapter.rs b/src-tauri/src/wallet_adapter.rs index bdf03f7c5..b878fa7cc 100644 --- a/src-tauri/src/wallet_adapter.rs +++ b/src-tauri/src/wallet_adapter.rs @@ -1,7 +1,6 @@ use crate::binary_resolver::{Binaries, BinaryResolver}; use crate::process_adapter::{ProcessAdapter, ProcessInstance, StatusMonitor}; use anyhow::Error; -use async_trait::async_trait; use log::{debug, info, warn}; use minotari_node_grpc_client::grpc::wallet_client::WalletClient; use minotari_node_grpc_client::grpc::GetBalanceRequest; @@ -12,9 +11,7 @@ use tari_core::transactions::tari_amount::MicroMinotari; use tari_crypto::ristretto::RistrettoPublicKey; use tari_shutdown::Shutdown; use tari_utilities::hex::Hex; -use tokio::runtime::Handle; use tokio::select; -use tokio::task::JoinHandle; const LOG_TARGET: &str = "tari::universe::wallet_adapter"; @@ -39,12 +36,12 @@ impl WalletAdapter { } impl ProcessAdapter for WalletAdapter { - type Instance = WalletInstance; type StatusMonitor = WalletStatusMonitor; + fn spawn_inner( &self, data_dir: PathBuf, - ) -> Result<(Self::Instance, Self::StatusMonitor), Error> { + ) -> Result<(ProcessInstance, Self::StatusMonitor), Error> { // TODO: This was copied from node_adapter. This should be DRY'ed up let inner_shutdown = Shutdown::new(); let shutdown_signal = inner_shutdown.to_signal(); @@ -97,7 +94,7 @@ impl ProcessAdapter for WalletAdapter { args.push("wallet.p2p.transport.tor.proxy_bypass_for_outbound_tcp=false".to_string()) } Ok(( - WalletInstance { + ProcessInstance { shutdown: inner_shutdown, handle: Some(tokio::spawn(async move { let file_path = BinaryResolver::current() @@ -158,43 +155,6 @@ impl ProcessAdapter for WalletAdapter { } } -pub struct WalletInstance { - pub shutdown: Shutdown, - handle: Option>>, -} - -#[async_trait] -impl ProcessInstance for WalletInstance { - fn ping(&self) -> bool { - self.handle - .as_ref() - .map(|m| !m.is_finished()) - .unwrap_or_else(|| false) - } - - async fn stop(&mut self) -> Result { - self.shutdown.trigger(); - let handle = self.handle.take(); - let res = handle.unwrap().await??; - Ok(res) - } -} - -impl Drop for WalletInstance { - fn drop(&mut self) { - println!("Drop being called"); - self.shutdown.trigger(); - if let Some(handle) = self.handle.take() { - Handle::current().block_on(async move { - let _ = handle.await.unwrap().map_err(|e| { - warn!(target: LOG_TARGET, "Error stopping wallet: {:?}", e); - e - }); - }); - } - } -} - pub struct WalletStatusMonitor {} impl StatusMonitor for WalletStatusMonitor {} diff --git a/src-tauri/src/xmrig_adapter.rs b/src-tauri/src/xmrig_adapter.rs index 4fc8b5911..9b14d12b8 100644 --- a/src-tauri/src/xmrig_adapter.rs +++ b/src-tauri/src/xmrig_adapter.rs @@ -1,17 +1,15 @@ use crate::cpu_miner::CpuMinerEvent; use crate::download_utils::{download_file, extract}; -use crate::process_killer::kill_process; +use crate::process_adapter::{ProcessAdapter, ProcessInstance, StatusMonitor}; use crate::xmrig::http_api::XmrigHttpApiClient; use crate::xmrig::latest_release::fetch_latest_release; use crate::ProgressTracker; use anyhow::Error; -use log::{debug, info, warn}; +use log::{info, warn}; use std::path::PathBuf; use tari_shutdown::Shutdown; use tokio::fs; -use tokio::runtime::Handle; use tokio::sync::mpsc::Receiver; -use tokio::task::JoinHandle; const LOG_TARGET: &str = "tari::universe::xmrig_adapter"; @@ -41,94 +39,45 @@ pub struct XmrigAdapter { force_download: bool, node_connection: XmrigNodeConnection, monero_address: String, - // TODO: secure http_api_token: String, http_api_port: u16, -} - -pub struct XmrigInstance { - shutdown: Shutdown, - handle: Option>>, + cache_dir: PathBuf, + logs_dir: PathBuf, + cpu_max_percentage: usize, + progress_tracker: ProgressTracker, + rx: Receiver, + pub client: XmrigHttpApiClient, + // TODO: secure } impl XmrigAdapter { - pub fn new(xmrig_node_connection: XmrigNodeConnection, monero_address: String) -> Self { - Self { - force_download: false, - node_connection: xmrig_node_connection, - monero_address, - http_api_token: "pass".to_string(), - http_api_port: 9090, - } - } - pub fn spawn( - &self, + pub fn new( + xmrig_node_connection: XmrigNodeConnection, + monero_address: String, cache_dir: PathBuf, logs_dir: PathBuf, - data_dir: PathBuf, progress_tracker: ProgressTracker, cpu_max_percentage: usize, - ) -> Result<(Receiver, XmrigInstance, XmrigHttpApiClient), anyhow::Error> { - self.kill_previous_instances(data_dir.clone())?; - + ) -> Self { let (_tx, rx) = tokio::sync::mpsc::channel(100); - let force_download = self.force_download; - let xmrig_shutdown = Shutdown::new(); - let mut shutdown_signal = xmrig_shutdown.to_signal(); - let mut args = self.node_connection.generate_args(); - let xmrig_log_file = logs_dir.join("xmrig.log"); - std::fs::create_dir_all(xmrig_log_file.parent().unwrap())?; - args.push(format!("--log-file={}", &xmrig_log_file.to_str().unwrap())); - args.push(format!("--http-port={}", self.http_api_port)); - args.push(format!("--http-access-token={}", self.http_api_token)); - args.push("--donate-level=1".to_string()); - args.push(format!("--user={}", self.monero_address)); - args.push(format!("--threads={}", cpu_max_percentage)); - - let client = XmrigHttpApiClient::new( - format!("http://127.0.0.1:{}", self.http_api_port), - self.http_api_token.clone(), - ); - - Ok(( + let http_api_port = 9090; + let http_api_token = "pass".to_string(); + Self { + force_download: false, + node_connection: xmrig_node_connection, + monero_address, + http_api_token: http_api_token.clone(), + http_api_port: http_api_port.clone(), + cache_dir, + logs_dir, + cpu_max_percentage, + progress_tracker, rx, - XmrigInstance { - shutdown: xmrig_shutdown, - handle: Some(tokio::spawn(async move { - // TODO: Ensure version string is not malicious - let version = - Self::ensure_latest(cache_dir.clone(), force_download, progress_tracker) - .await?; - let xmrig_dir = cache_dir - .join("xmrig") - .join(&version) - .join(format!("xmrig-{}", version)); - let xmrig_bin = xmrig_dir.join("xmrig"); - let mut xmrig = tokio::process::Command::new(xmrig_bin) - .args(args) - .kill_on_drop(true) - .spawn()?; - - if let Some(id) = xmrig.id() { - std::fs::write(data_dir.join("xmrig_pid"), id.to_string())?; - } - shutdown_signal.wait().await; - println!("Stopping xmrig"); - - xmrig.kill().await?; - - match std::fs::remove_file(data_dir.join("xmrig_pid")) { - Ok(_) => {} - Err(_e) => { - debug!(target: LOG_TARGET, "Could not clear xmrig's pid file"); - } - } - - Ok(()) - })), - }, - client, - )) + client: XmrigHttpApiClient::new( + format!("http://127.0.0.1:{}", http_api_port).clone(), + http_api_token.clone(), + ), + } } pub async fn ensure_latest( @@ -177,55 +126,92 @@ impl XmrigAdapter { Ok(latest_release.version) } - - fn kill_previous_instances(&self, base_folder: PathBuf) -> Result<(), Error> { - match std::fs::read_to_string(base_folder.join("xmrig_pid")) { - Ok(pid) => { - let pid = pid.trim().parse::()?; - kill_process(pid)?; - } - Err(e) => { - warn!(target: LOG_TARGET, "Could not read xmrigs pid file: {}", e); - } - } - Ok(()) - } } -impl XmrigInstance { - pub fn ping(&self) -> Result { - Ok(self - .handle - .as_ref() - .map(|m| !m.is_finished()) - .unwrap_or_else(|| false)) - } +impl ProcessAdapter for XmrigAdapter { + type StatusMonitor = XmrigStatusMonitor; - pub async fn stop(&mut self) -> Result<(), anyhow::Error> { - self.shutdown.trigger(); - let handle = self.handle.take(); - handle.unwrap().await? + fn spawn_inner( + &self, + data_dir: PathBuf, + ) -> Result<(ProcessInstance, Self::StatusMonitor), anyhow::Error> { + self.kill_previous_instances(data_dir.clone())?; + + let cache_dir = self.cache_dir.clone(); + let progress_tracker = self.progress_tracker.clone(); + let force_download = self.force_download; + let xmrig_shutdown = Shutdown::new(); + let mut shutdown_signal = xmrig_shutdown.to_signal(); + let mut args = self.node_connection.generate_args(); + let xmrig_log_file = self.logs_dir.join("xmrig.log"); + std::fs::create_dir_all(xmrig_log_file.parent().unwrap())?; + args.push(format!("--log-file={}", &xmrig_log_file.to_str().unwrap())); + args.push(format!("--http-port={}", self.http_api_port)); + args.push(format!("--http-access-token={}", self.http_api_token)); + args.push(format!("--donate-level=1")); + args.push(format!("--user={}", self.monero_address)); + args.push(format!( + "--cpu-max-threads-hint={}", + self.cpu_max_percentage + )); + + Ok(( + ProcessInstance { + shutdown: xmrig_shutdown, + handle: Some(tokio::spawn(async move { + // TODO: Ensure version string is not malicious + let version = Self::ensure_latest( + cache_dir.clone(), + force_download, + progress_tracker.clone(), + ) + .await?; + let xmrig_dir = cache_dir + .clone() + .join("xmrig") + .join(&version) + .join(format!("xmrig-{}", version)); + let xmrig_bin = xmrig_dir.join("xmrig"); + let mut xmrig = tokio::process::Command::new(xmrig_bin) + .args(args) + .kill_on_drop(true) + .spawn()?; + + if let Some(id) = xmrig.id() { + std::fs::write(data_dir.join("xmrig_pid"), id.to_string())?; + } + shutdown_signal.wait().await; + println!("Stopping xmrig"); + + xmrig.kill().await?; + + match std::fs::remove_file(data_dir.join("xmrig_pid")) { + Ok(_) => {} + Err(e) => { + warn!(target: LOG_TARGET, "Could not clear xmrig's pid file - {e}"); + } + } + + Ok(0) + })), + }, + XmrigStatusMonitor {}, + )) } - pub fn kill(&self) -> Result<(), anyhow::Error> { - todo!() - // Ok(()) + + fn name(&self) -> &str { + "xmrig" } -} -impl Drop for XmrigInstance { - fn drop(&mut self) { - println!("Drop being called"); - self.shutdown.trigger(); - if let Some(handle) = self.handle.take() { - Handle::current().block_on(async move { - let _ = handle.await.unwrap().map_err(|e| { - warn!(target: LOG_TARGET, "Error in XmrigInstance: {}", e); - }); - }); - } + fn pid_file_name(&self) -> &str { + "xmrig_pid" } } +pub struct XmrigStatusMonitor {} + +impl StatusMonitor for XmrigStatusMonitor {} + #[allow(unreachable_code)] fn get_os_string_id() -> String { #[cfg(target_os = "windows")]