Skip to content

Commit

Permalink
Update peer liveness
Browse files Browse the repository at this point in the history
- Updated peer liveness to print test results to the console, one item per row.
- Added a command-line arg option to disable the splash screen; this is useful
  for screen scraping in an automated test environment.
  • Loading branch information
hansieodendaal committed Dec 11, 2024
1 parent b127883 commit 99c8eae
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 46 deletions.
7 changes: 7 additions & 0 deletions applications/minotari_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ pub struct Cli {
/// Watch a command in the non-interactive mode.
#[clap(long)]
pub watch: Option<String>,
/// Run in test profile mode
#[clap(long, alias = "profile")]
pub profile_with_tokio_console: bool,
/// Enable gRPC
#[clap(long, env = "MINOTARI_NODE_ENABLE_GRPC", alias = "enable-grpc")]
pub grpc_enabled: bool,
/// Enable mining
#[clap(long, env = "MINOTARI_NODE_ENABLE_MINING", alias = "enable-mining")]
pub mining_enabled: bool,
/// Enable the second layer gRPC server
#[clap(long, env = "MINOTARI_NODE_SECOND_LAYER_GRPC_ENABLED", alias = "enable-second-layer")]
pub second_layer_grpc_enabled: bool,
/// Disable the splash screen
#[clap(long)]
pub disable_splash_screen: bool,
}

impl ConfigOverrideProvider for Cli {
Expand Down
6 changes: 4 additions & 2 deletions applications/minotari_node/src/commands/cli_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ impl CliLoop {
///
/// ## Returns
/// Doesn't return anything
pub async fn cli_loop(mut self) {
cli::print_banner(self.commands.clone(), 3);
pub async fn cli_loop(mut self, disable_splash_screen: bool) {
if !disable_splash_screen {
cli::print_banner(self.commands.clone(), 3);
}

if self.non_interactive {
self.watch_loop_non_interactive().await;
Expand Down
113 changes: 70 additions & 43 deletions applications/minotari_node/src/commands/command/test_peer_liveness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use tari_comms::{
net_address::{MultiaddressesWithStats, PeerAddressSource},
peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags},
};
use tari_p2p::services::liveness::LivenessEvent;
use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle};
use tokio::{sync::watch, task};

use super::{CommandContext, HandleCommand};
Expand Down Expand Up @@ -107,41 +107,14 @@ impl HandleCommand<ArgsTestPeerLiveness> for CommandContext {
for _ in 0..5 {
if self.dial_peer(node_id.clone()).await.is_ok() {
println!("🏓 Peer ({}, {}) dialed successfully", node_id, public_key);
let mut liveness_events = self.liveness.get_event_stream();
let mut liveness = self.liveness.clone();
let liveness = self.liveness.clone();
task::spawn(async move {
if let Ok(nonce) = liveness.send_ping(node_id.clone()).await {
println!("🏓 Pinging peer ({}, {}) with nonce {} ...", node_id, public_key, nonce);
for _ in 0..5 {
match liveness_events.recv().await {
Ok(event) => {
if let LivenessEvent::ReceivedPong(pong) = &*event {
if pong.node_id == node_id && pong.nonce == nonce {
println!(
"🏓️ Pong: peer ({}, {}) responded with nonce {}, round-trip-time is \
{:.2?}!",
pong.node_id,
public_key,
pong.nonce,
pong.latency.unwrap_or_default()
);
let _ = tx.send(PingResult::Success);
return;
}
}
},
Err(e) => {
println!("🏓 Ping peer ({}, {}) gave error: {}", node_id, public_key, e);
},
}
}
let _ = tx.send(PingResult::Fail);
}
ping_peer_liveness(liveness, node_id, public_key, tx).await;
});
// Break if the dial was successful
break;
} else {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

Expand All @@ -151,15 +124,13 @@ impl HandleCommand<ArgsTestPeerLiveness> for CommandContext {
_ = rx.changed() => {
let test_duration = start.elapsed();
let responsive = *rx.borrow();
println!("\nWhen rx.changed(): {:?}\n", responsive);
if responsive == PingResult::Success {
println!("✅ Peer ({}, {}) is responsive", node_id_clone, public_key_clone);
} else {
println!("❌ Peer ({}, {}) is unresponsive", node_id_clone, public_key_clone);
}
let date_time = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();

print_results_to_console(&date_time, responsive, &public_key_clone, &node_id_clone, &address_clone, test_duration);

if let Some(true) = args.output_to_file {
print_to_file(
&date_time,
responsive,
args.output_directory,
args.refresh_file,
Expand All @@ -168,12 +139,11 @@ impl HandleCommand<ArgsTestPeerLiveness> for CommandContext {
test_duration
).await;
}
println!();

if let Some(true) = args.exit {
println!("The liveness test is complete and base node will now exit\n");
self.shutdown.trigger();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
match responsive {
PingResult::Success => process::exit(0),
_ => process::exit(1),
Expand All @@ -183,15 +153,74 @@ impl HandleCommand<ArgsTestPeerLiveness> for CommandContext {
break;
},

_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {},
_ = tokio::time::sleep(Duration::from_secs(1)) => {},
}
}

Ok(())
}
}

fn print_results_to_console(
date_time: &str,
responsive: PingResult,
public_key: &PublicKey,
node_id: &NodeId,
address: &Multiaddr,
test_duration: Duration,
) {
println!();
if responsive == PingResult::Success {
println!("✅ Peer is responsive");
} else {
println!("❌ Peer is unresponsive");
}
println!(" Date Time: {}", date_time);
println!(" Public Key: {}", public_key);
println!(" Node ID: {}", node_id);
println!(" Address: {}", address);
println!(" Result: {:?}", responsive);
println!(" Test Duration: {:.2?}", test_duration);
println!();
}

async fn ping_peer_liveness(
mut liveness: LivenessHandle,
node_id: NodeId,
public_key: PublicKey,
tx: watch::Sender<PingResult>,
) {
let mut liveness_events = liveness.get_event_stream();
if let Ok(nonce) = liveness.send_ping(node_id.clone()).await {
println!("🏓 Pinging peer ({}, {}) with nonce {} ...", node_id, public_key, nonce);
for _ in 0..5 {
match liveness_events.recv().await {
Ok(event) => {
if let LivenessEvent::ReceivedPong(pong) = &*event {
if pong.node_id == node_id && pong.nonce == nonce {
println!(
"🏓️ Pong: peer ({}, {}) responded with nonce {}, round-trip-time is {:.2?}!",
pong.node_id,
public_key,
pong.nonce,
pong.latency.unwrap_or_default()
);
let _ = tx.send(PingResult::Success);
return;
}
}
},
Err(e) => {
println!("🏓 Ping peer ({}, {}) gave error: {}", node_id, public_key, e);
},
}
}
let _ = tx.send(PingResult::Fail);
}
}

async fn print_to_file(
date_time: &str,
responsive: PingResult,
output_directory: Option<PathBuf>,
refresh_file: Option<bool>,
Expand All @@ -204,8 +233,6 @@ async fn print_to_file(
} else {
"FAIL"
};
let now = Local::now();
let date_time = now.format("%Y-%m-%d %H:%M:%S").to_string();

let file_name = "peer_liveness_test.csv";
let file_path = if let Some(path) = output_directory.clone() {
Expand All @@ -222,7 +249,7 @@ async fn print_to_file(

if let Some(true) = refresh_file {
let _unused = fs::remove_file(&file_path);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
}
let write_header = !file_path.exists();
if let Ok(mut file) = OpenOptions::new().append(true).create(true).open(file_path.clone()) {
Expand Down
3 changes: 2 additions & 1 deletion applications/minotari_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub async fn run_base_node(
grpc_enabled: false,
mining_enabled: false,
second_layer_grpc_enabled: false,
disable_splash_screen: false,
};

run_base_node_with_cli(node_identity, config, cli, shutdown).await
Expand Down Expand Up @@ -170,7 +171,7 @@ pub async fn run_base_node_with_cli(
}

info!(target: LOG_TARGET, "Minotari base node has STARTED");
main_loop.cli_loop().await;
main_loop.cli_loop(cli.disable_splash_screen).await;

ctx.wait_for_shutdown().await;

Expand Down

0 comments on commit 99c8eae

Please sign in to comment.