Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(refactor): reuse more code in adapters #41

Merged
merged 8 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions src-tauri/src/cpu_miner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::app_config::MiningMode;
use crate::mm_proxy_manager::MmProxyManager;
use crate::process_adapter::ProcessAdapter;
use crate::xmrig::http_api::XmrigHttpApiClient;
use crate::xmrig_adapter::{XmrigAdapter, XmrigNodeConnection};
use crate::{
Expand Down Expand Up @@ -76,17 +78,14 @@ impl CpuMiner {
};
let xmrig = XmrigAdapter::new(
xmrig_node_connection,
"44AFFq5kSiGBoZ4NMDwYtN18obc8AemS33DBLWs3H7otXft3XjrpDtQGv7SqSsaBYBb98uNbr2VBBEt7f2wfn3RVGQBEP3A".to_string()
);
let (mut _rx, mut xmrig_child, client) = xmrig.spawn(
"44AFFq5kSiGBoZ4NMDwYtN18obc8AemS33DBLWs3H7otXft3XjrpDtQGv7SqSsaBYBb98uNbr2VBBEt7f2wfn3RVGQBEP3A".to_string(),
cache_dir,
log_dir,
base_path,
progress_tracker,
cpu_max_percentage,
)?;

self.api_client = Some(client);
);
let (mut xmrig_child, _xmrig_status_monitor) = xmrig.spawn_inner(base_path.clone())?;
self.api_client = Some(xmrig.client);

self.watcher_task = Some(tauri::async_runtime::spawn(async move {
println!("Starting process");
Expand All @@ -97,12 +96,12 @@ impl CpuMiner {
select! {
_ = watch_timer.tick() => {
println!("watching");
if xmrig_child.ping().expect("idk") {
if xmrig_child.ping() {
println!("xmrig is running");
} else {
println!("xmrig is not running");
match xmrig_child.stop().await {
Ok(()) => {
Ok(_) => {
println!("xmrig exited successfully");
}
Err(e) => {
Expand Down
43 changes: 2 additions & 41 deletions src-tauri/src/merge_mining_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use crate::binary_resolver::{Binaries, BinaryResolver};
use crate::process_adapter::{ProcessAdapter, ProcessInstance, StatusMonitor};
use anyhow::Error;
use async_trait::async_trait;
use log::{debug, warn};
use std::fs;
use std::path::PathBuf;
use tari_common_types::tari_address::TariAddress;
use tari_shutdown::Shutdown;
use tokio::runtime::Handle;
use tokio::select;
use tokio::task::JoinHandle;

const LOG_TARGET: &str = "tari::universe::merge_mining_proxy_adapter";

Expand All @@ -26,13 +23,12 @@ impl MergeMiningProxyAdapter {
}

impl ProcessAdapter for MergeMiningProxyAdapter {
type Instance = MergeMiningProxyInstance;
type StatusMonitor = MergeMiningProxyStatusMonitor;

fn spawn_inner(
&self,
data_dir: PathBuf,
) -> Result<(Self::Instance, Self::StatusMonitor), Error> {
) -> Result<(ProcessInstance, Self::StatusMonitor), Error> {
let inner_shutdown = Shutdown::new();
let shutdown_signal = inner_shutdown.to_signal();

Expand All @@ -55,7 +51,7 @@ impl ProcessAdapter for MergeMiningProxyAdapter {
"merge_mining_proxy.wait_for_initial_sync_at_startup=false".to_string(),
];
Ok((
MergeMiningProxyInstance {
ProcessInstance {
shutdown: inner_shutdown,
handle: Some(tokio::spawn(async move {
let file_path = BinaryResolver::current()
Expand Down Expand Up @@ -117,41 +113,6 @@ impl ProcessAdapter for MergeMiningProxyAdapter {
}
}

pub struct MergeMiningProxyInstance {
pub shutdown: Shutdown,
handle: Option<JoinHandle<Result<i32, anyhow::Error>>>,
}

pub struct MergeMiningProxyStatusMonitor {}

#[async_trait]
impl ProcessInstance for MergeMiningProxyInstance {
fn ping(&self) -> bool {
self.handle
.as_ref()
.map(|m| !m.is_finished())
.unwrap_or_else(|| false)
}

async fn stop(&mut self) -> Result<i32, Error> {
self.shutdown.trigger();
let handle = self.handle.take();
let res = handle.unwrap().await??;
Ok(res)
}
}
impl Drop for MergeMiningProxyInstance {
fn drop(&mut self) {
println!("Drop being called");
self.shutdown.trigger();
if let Some(handle) = self.handle.take() {
Handle::current().block_on(async move {
let _ = handle.await.unwrap().map_err(|e| {
warn!(target: LOG_TARGET, "Error in MergeMiningProxyInstance: {}", e);
});
});
}
}
}

impl StatusMonitor for MergeMiningProxyStatusMonitor {}
44 changes: 2 additions & 42 deletions src-tauri/src/node_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::node_manager::NodeIdentity;
use crate::process_adapter::{ProcessAdapter, ProcessInstance, StatusMonitor};
use crate::ProgressTracker;
use anyhow::{anyhow, Error};
use async_trait::async_trait;
use humantime::format_duration;
use log::{debug, info, warn};
use minotari_node_grpc_client::grpc::{Empty, HeightRequest, NewBlockTemplateRequest, PowAlgo};
Expand All @@ -15,9 +14,7 @@ use tari_core::transactions::tari_amount::MicroMinotari;
use tari_crypto::ristretto::RistrettoPublicKey;
use tari_shutdown::Shutdown;
use tari_utilities::ByteArray;
use tokio::runtime::Handle;
use tokio::select;
use tokio::task::JoinHandle;

const LOG_TARGET: &str = "tari::universe::minotari_node_adapter";

Expand All @@ -32,13 +29,12 @@ impl MinotariNodeAdapter {
}

impl ProcessAdapter for MinotariNodeAdapter {
type Instance = MinotariNodeInstance;
type StatusMonitor = MinotariNodeStatusMonitor;

fn spawn_inner(
&self,
data_dir: PathBuf,
) -> Result<(Self::Instance, Self::StatusMonitor), Error> {
) -> Result<(ProcessInstance, Self::StatusMonitor), Error> {
let inner_shutdown = Shutdown::new();
let shutdown_signal = inner_shutdown.to_signal();

Expand Down Expand Up @@ -76,7 +72,7 @@ impl ProcessAdapter for MinotariNodeAdapter {
);
}
Ok((
MinotariNodeInstance {
ProcessInstance {
shutdown: inner_shutdown,
handle: Some(tokio::spawn(async move {
let file_path = BinaryResolver::current()
Expand Down Expand Up @@ -138,42 +134,6 @@ impl ProcessAdapter for MinotariNodeAdapter {
}
}

pub struct MinotariNodeInstance {
pub shutdown: Shutdown,
handle: Option<JoinHandle<Result<i32, anyhow::Error>>>,
}

#[async_trait]
impl ProcessInstance for MinotariNodeInstance {
fn ping(&self) -> bool {
self.handle
.as_ref()
.map(|m| !m.is_finished())
.unwrap_or_else(|| false)
}

async fn stop(&mut self) -> Result<i32, Error> {
self.shutdown.trigger();
let handle = self.handle.take();
let res = handle.unwrap().await??;
Ok(res)
}
}

impl Drop for MinotariNodeInstance {
fn drop(&mut self) {
println!("Drop being called");
self.shutdown.trigger();
if let Some(handle) = self.handle.take() {
Handle::current().block_on(async move {
let _ = handle.await.unwrap().map_err(|e| {
warn!(target: LOG_TARGET, "Error stopping minotari node: {:?}", e);
e
});
});
}
}
}
pub struct MinotariNodeStatusMonitor {}

impl StatusMonitor for MinotariNodeStatusMonitor {}
Expand Down
47 changes: 38 additions & 9 deletions src-tauri/src/process_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
use crate::process_killer::kill_process;
use anyhow::Error;
use async_trait::async_trait;
use log::{info, warn};
use std::fs;
use std::path::PathBuf;
use tari_shutdown::Shutdown;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;

const LOG_TARGET: &str = "tari::universe::process_adapter";

pub trait ProcessAdapter {
type Instance: ProcessInstance;
type StatusMonitor: StatusMonitor;
// fn spawn(&self) -> Result<(Receiver<()>, TInstance), anyhow::Error>;
fn spawn_inner(
&self,
base_folder: PathBuf,
) -> Result<(Self::Instance, Self::StatusMonitor), anyhow::Error>;
) -> Result<(ProcessInstance, Self::StatusMonitor), anyhow::Error>;
fn name(&self) -> &str;

fn spawn(
&self,
base_folder: PathBuf,
) -> Result<(Self::Instance, Self::StatusMonitor), anyhow::Error> {
) -> Result<(ProcessInstance, Self::StatusMonitor), anyhow::Error> {
self.spawn_inner(base_folder)
}

Expand All @@ -34,7 +35,7 @@ pub trait ProcessAdapter {
kill_process(pid)?;
}
Err(e) => {
warn!(target: LOG_TARGET, "Could not read node's pid file: {}", e);
warn!(target: LOG_TARGET, "Could not read {} pid file: {}", self.pid_file_name(), e);
}
}
Ok(())
Expand All @@ -43,8 +44,36 @@ pub trait ProcessAdapter {

pub trait StatusMonitor {}

#[async_trait]
pub trait ProcessInstance: Send + Sync + 'static {
fn ping(&self) -> bool;
async fn stop(&mut self) -> Result<i32, anyhow::Error>;
pub struct ProcessInstance {
pub shutdown: Shutdown,
pub handle: Option<JoinHandle<Result<i32, anyhow::Error>>>,
}

impl ProcessInstance {
pub fn ping(&self) -> bool {
self.handle
.as_ref()
.map(|m| !m.is_finished())
.unwrap_or_else(|| false)
}

pub async fn stop(&mut self) -> Result<i32, anyhow::Error> {
self.shutdown.trigger();
let handle = self.handle.take();
handle.unwrap().await?
}
}

impl Drop for ProcessInstance {
fn drop(&mut self) {
println!("Drop being called");
self.shutdown.trigger();
if let Some(handle) = self.handle.take() {
Handle::current().block_on(async move {
let _ = handle.await.unwrap().map_err(|e| {
warn!(target: LOG_TARGET, "Error in Process Adapter: {}", e);
});
});
}
}
}
2 changes: 1 addition & 1 deletion src-tauri/src/process_killer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn kill_process(pid: u32) -> Result<(), anyhow::Error> {
use nix::unistd::Pid;

let pid = Pid::from_raw(pid as i32);
signal::kill(pid, Signal::SIGTERM);
let _ = signal::kill(pid, Signal::SIGTERM);
}
Ok(())
}
46 changes: 3 additions & 43 deletions src-tauri/src/wallet_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::binary_resolver::{Binaries, BinaryResolver};
use crate::process_adapter::{ProcessAdapter, ProcessInstance, StatusMonitor};
use anyhow::Error;
use async_trait::async_trait;
use log::{debug, info, warn};
use minotari_node_grpc_client::grpc::wallet_client::WalletClient;
use minotari_node_grpc_client::grpc::GetBalanceRequest;
Expand All @@ -12,9 +11,7 @@ use tari_core::transactions::tari_amount::MicroMinotari;
use tari_crypto::ristretto::RistrettoPublicKey;
use tari_shutdown::Shutdown;
use tari_utilities::hex::Hex;
use tokio::runtime::Handle;
use tokio::select;
use tokio::task::JoinHandle;

const LOG_TARGET: &str = "tari::universe::wallet_adapter";

Expand All @@ -39,12 +36,12 @@ impl WalletAdapter {
}

impl ProcessAdapter for WalletAdapter {
type Instance = WalletInstance;
type StatusMonitor = WalletStatusMonitor;

fn spawn_inner(
&self,
data_dir: PathBuf,
) -> Result<(Self::Instance, Self::StatusMonitor), Error> {
) -> Result<(ProcessInstance, Self::StatusMonitor), Error> {
// TODO: This was copied from node_adapter. This should be DRY'ed up
let inner_shutdown = Shutdown::new();
let shutdown_signal = inner_shutdown.to_signal();
Expand Down Expand Up @@ -97,7 +94,7 @@ impl ProcessAdapter for WalletAdapter {
args.push("wallet.p2p.transport.tor.proxy_bypass_for_outbound_tcp=false".to_string())
}
Ok((
WalletInstance {
ProcessInstance {
shutdown: inner_shutdown,
handle: Some(tokio::spawn(async move {
let file_path = BinaryResolver::current()
Expand Down Expand Up @@ -158,43 +155,6 @@ impl ProcessAdapter for WalletAdapter {
}
}

pub struct WalletInstance {
pub shutdown: Shutdown,
handle: Option<JoinHandle<Result<i32, anyhow::Error>>>,
}

#[async_trait]
impl ProcessInstance for WalletInstance {
fn ping(&self) -> bool {
self.handle
.as_ref()
.map(|m| !m.is_finished())
.unwrap_or_else(|| false)
}

async fn stop(&mut self) -> Result<i32, Error> {
self.shutdown.trigger();
let handle = self.handle.take();
let res = handle.unwrap().await??;
Ok(res)
}
}

impl Drop for WalletInstance {
fn drop(&mut self) {
println!("Drop being called");
self.shutdown.trigger();
if let Some(handle) = self.handle.take() {
Handle::current().block_on(async move {
let _ = handle.await.unwrap().map_err(|e| {
warn!(target: LOG_TARGET, "Error stopping wallet: {:?}", e);
e
});
});
}
}
}

pub struct WalletStatusMonitor {}

impl StatusMonitor for WalletStatusMonitor {}
Expand Down
Loading