From e07fb6cd66f055381d06489066a9be31db78d431 Mon Sep 17 00:00:00 2001 From: Hanif Ariffin Date: Thu, 19 Nov 2020 21:23:35 +0800 Subject: [PATCH 1/4] Cleaned up the code in prepation for outfile feature. Now logs the pongs in a buffer. Next is to use some kinda resize-able ring buffer of sort to put the pongs in and have a consumer thread. Signed-off-by: Hanif Ariffin --- src/main.rs | 73 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 57 insertions(+), 16 deletions(-) diff --git a/src/main.rs b/src/main.rs index c0519b678..75944ee6f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,6 +130,13 @@ enum Event { Input(KeyEvent), } +pub fn clone_ping(ping: &PingResult) -> PingResult { + match ping { + PingResult::Pong(r) => PingResult::Pong(*r), + PingResult::Timeout => PingResult::Timeout, + } +} + fn main() -> Result<()> { let args = Args::from_args(); let mut app = App::new(args.hosts.len(), args.buffer); @@ -142,21 +149,43 @@ fn main() -> Result<()> { terminal.clear()?; - let (key_tx, rx) = mpsc::channel(); + let (tx, rx) = mpsc::channel(); let mut threads = vec![]; - - let killed = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let host_pongs = std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::< + usize, + Vec, + >::new())); + let host_iterations = std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::< + usize, + usize, + >::new())); + let killed_signal = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let hosts_len = args.hosts.len(); for (host_id, host) in args.hosts.iter().cloned().enumerate() { - let ping_tx = key_tx.clone(); - - let killed_ping = std::sync::Arc::clone(&killed); + let ping_tx = tx.clone(); + let host_pongs = std::sync::Arc::clone(&host_pongs); + let host_iterations = std::sync::Arc::clone(&host_iterations); + let killed_signal = std::sync::Arc::clone(&killed_signal); // Pump ping messages into the queue let ping_thread = thread::spawn(move || -> Result<()> { let stream = ping(host)?; - while !killed_ping.load(Ordering::Acquire) { - ping_tx.send(Event::Update(host_id, stream.recv()?))?; + let mut iteration = 0; + while !killed_signal.load(Ordering::Acquire) { + let pong = stream.recv()?; + ping_tx.send(Event::Update(host_id, clone_ping(&pong)))?; + let mut host_pongs = host_pongs.lock().unwrap(); + let mut host_iterations = host_iterations.lock().unwrap(); + host_pongs + .entry(iteration) + .and_modify(|vec| vec.push(pong)) + .or_insert(Vec::with_capacity(hosts_len)); + host_iterations + .entry(host_id) + .and_modify(|e| *e = iteration) + .or_insert(iteration); + iteration += 1; } Ok(()) }); @@ -164,12 +193,12 @@ fn main() -> Result<()> { } // Pump keyboard messages into the queue - let killed_thread = std::sync::Arc::clone(&killed); + let killed_thread = std::sync::Arc::clone(&killed_signal); let key_thread = thread::spawn(move || -> Result<()> { while !killed_thread.load(Ordering::Acquire) { if event::poll(Duration::from_secs(1))? { if let CEvent::Key(key) = event::read()? { - key_tx.send(Event::Input(key))?; + tx.send(Event::Input(key))?; } } } @@ -177,6 +206,7 @@ fn main() -> Result<()> { }); threads.push(key_thread); + let host_iterations = std::sync::Arc::clone(&host_iterations); loop { match rx.recv()? { Event::Update(host_id, ping_result) => { @@ -196,6 +226,7 @@ fn main() -> Result<()> { .as_ref(), ) .split(f.size()); + let host_iterations = host_iterations.lock().unwrap(); for (((host_id, host), stats), &style) in args .hosts .iter() @@ -207,10 +238,11 @@ fn main() -> Result<()> { .direction(Direction::Horizontal) .constraints( [ - Constraint::Percentage(25), - Constraint::Percentage(25), - Constraint::Percentage(25), - Constraint::Percentage(25), + Constraint::Percentage(20), + Constraint::Percentage(20), + Constraint::Percentage(20), + Constraint::Percentage(20), + Constraint::Percentage(20), ] .as_ref(), ) @@ -245,6 +277,15 @@ fn main() -> Result<()> { .style(style), header_layout[3], ); + + f.render_widget( + Paragraph::new(format!( + "iteration {:?}", + host_iterations.get(&host_id).unwrap_or(&0) + )) + .style(style), + header_layout[4], + ); } let datasets: Vec<_> = app @@ -280,11 +321,11 @@ fn main() -> Result<()> { } Event::Input(input) => match input.code { KeyCode::Char('q') | KeyCode::Esc => { - killed.store(true, Ordering::Release); + killed_signal.store(true, Ordering::Release); break; } KeyCode::Char('c') if input.modifiers == KeyModifiers::CONTROL => { - killed.store(true, Ordering::Release); + killed_signal.store(true, Ordering::Release); break; } _ => {} From 42a8233030822e528f4686751f53aee8f97964cb Mon Sep 17 00:00:00 2001 From: Hanif Ariffin Date: Thu, 19 Nov 2020 22:22:16 +0800 Subject: [PATCH 2/4] Now writes to the specified file. For the moment it will not clear out its buffer when we exit the program. Will do that in the future. Signed-off-by: Hanif Ariffin --- src/main.rs | 91 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 71 insertions(+), 20 deletions(-) diff --git a/src/main.rs b/src/main.rs index 75944ee6f..b27707787 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,6 +37,8 @@ struct Args { help = "Determines the number pings to display." )] buffer: usize, + #[structopt(short, long, help = "Optionally the file to write to.")] + outfile: Option, } struct App { @@ -137,6 +139,13 @@ pub fn clone_ping(ping: &PingResult) -> PingResult { } } +pub fn get_value_pong(ping: &PingResult) -> u128 { + match ping { + PingResult::Pong(r) => r.as_micros(), + PingResult::Timeout => 0, + } +} + fn main() -> Result<()> { let args = Args::from_args(); let mut app = App::new(args.hosts.len(), args.buffer); @@ -151,20 +160,21 @@ fn main() -> Result<()> { let (tx, rx) = mpsc::channel(); + let outfile = args.outfile.clone(); let mut threads = vec![]; let host_pongs = std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::< usize, - Vec, + Vec>, >::new())); let host_iterations = std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::< usize, usize, >::new())); let killed_signal = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); - let hosts_len = args.hosts.len(); + let hosts_count = args.hosts.len(); for (host_id, host) in args.hosts.iter().cloned().enumerate() { - let ping_tx = tx.clone(); + let tx = tx.clone(); let host_pongs = std::sync::Arc::clone(&host_pongs); let host_iterations = std::sync::Arc::clone(&host_iterations); let killed_signal = std::sync::Arc::clone(&killed_signal); @@ -174,17 +184,18 @@ fn main() -> Result<()> { let mut iteration = 0; while !killed_signal.load(Ordering::Acquire) { let pong = stream.recv()?; - ping_tx.send(Event::Update(host_id, clone_ping(&pong)))?; + tx.send(Event::Update(host_id, clone_ping(&pong)))?; let mut host_pongs = host_pongs.lock().unwrap(); let mut host_iterations = host_iterations.lock().unwrap(); host_pongs .entry(iteration) - .and_modify(|vec| vec.push(pong)) - .or_insert(Vec::with_capacity(hosts_len)); - host_iterations - .entry(host_id) - .and_modify(|e| *e = iteration) - .or_insert(iteration); + .and_modify(|vec| vec[host_id] = Some(get_value_pong(&pong))) + .or_insert({ + let mut result = vec![None; hosts_count]; + result[host_id] = Some(get_value_pong(&pong)); + result + }); + host_iterations.insert(host_id, iteration); iteration += 1; } Ok(()) @@ -193,18 +204,58 @@ fn main() -> Result<()> { } // Pump keyboard messages into the queue - let killed_thread = std::sync::Arc::clone(&killed_signal); - let key_thread = thread::spawn(move || -> Result<()> { - while !killed_thread.load(Ordering::Acquire) { - if event::poll(Duration::from_secs(1))? { - if let CEvent::Key(key) = event::read()? { - tx.send(Event::Input(key))?; + { + let killed_signal = std::sync::Arc::clone(&killed_signal); + let key_tx = tx.clone(); + let key_thread = thread::spawn(move || -> Result<()> { + while !killed_signal.load(Ordering::Acquire) { + if event::poll(Duration::from_secs(1))? { + if let CEvent::Key(key) = event::read()? { + key_tx.send(Event::Input(key))?; + } } } - } - Ok(()) - }); - threads.push(key_thread); + Ok(()) + }); + threads.push(key_thread); + } + + if let Some(outfile) = outfile { + let killed_signal = std::sync::Arc::clone(&killed_signal); + let writer_thread = thread::spawn(move || -> Result<()> { + let mut iteration = 0; + let host_pongs = std::sync::Arc::clone(&host_pongs); + let file = std::fs::OpenOptions::new() + .append(true) + .create(true) + .open(&outfile)?; + let mut bufwriter = std::io::BufWriter::new(file); + while !killed_signal.load(Ordering::Acquire) { + let mut host_pongs = host_pongs.lock().unwrap(); + let should_write = host_pongs + .get(&iteration) + .map(|pongs| pongs.iter().fold(true, |acc, x| acc && x.is_some())) + .unwrap_or(false); + if should_write { + // SAFETY: We have checked above that this is true. + let result = host_pongs.remove(&iteration).unwrap(); + let csv_row = result + .iter() + // SAFETY: We have checked that they all contain something. + .map(|x| x.unwrap()) + .map(|x| format!("{},", x)) + .fold(format!("{},", iteration), |acc, x| acc + &x) + + "\n"; + bufwriter.write_all(csv_row.as_bytes())?; + iteration += 1; + bufwriter.flush()?; + } + } + // Go through the entire thing and write everything else. + Ok(()) + }); + threads.push(writer_thread); + } let host_iterations = std::sync::Arc::clone(&host_iterations); loop { From 754a35d85e1fdca5d2a09300124d9f36ce215cf7 Mon Sep 17 00:00:00 2001 From: Hanif Ariffin Date: Thu, 19 Nov 2020 22:40:33 +0800 Subject: [PATCH 3/4] Some minor performance(?) improvements 1. Only hold the lock as necessary. 2. Returns immediately from polling for input. Signed-off-by: Hanif Ariffin --- src/main.rs | 116 ++++++++++++++++++++++++++-------------------------- 1 file changed, 59 insertions(+), 57 deletions(-) diff --git a/src/main.rs b/src/main.rs index b27707787..bca2ab16f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -209,7 +209,7 @@ fn main() -> Result<()> { let key_tx = tx.clone(); let key_thread = thread::spawn(move || -> Result<()> { while !killed_signal.load(Ordering::Acquire) { - if event::poll(Duration::from_secs(1))? { + if event::poll(Duration::from_secs(0))? { if let CEvent::Key(key) = event::read()? { key_tx.send(Event::Input(key))?; } @@ -277,66 +277,68 @@ fn main() -> Result<()> { .as_ref(), ) .split(f.size()); - let host_iterations = host_iterations.lock().unwrap(); - for (((host_id, host), stats), &style) in args - .hosts - .iter() - .enumerate() - .zip(app.stats()) - .zip(&app.styles) { - let header_layout = Layout::default() - .direction(Direction::Horizontal) - .constraints( - [ - Constraint::Percentage(20), - Constraint::Percentage(20), - Constraint::Percentage(20), - Constraint::Percentage(20), - Constraint::Percentage(20), - ] - .as_ref(), - ) - .split(chunks[host_id]); + let host_iterations = host_iterations.lock().unwrap(); + for (((host_id, host), stats), &style) in args + .hosts + .iter() + .enumerate() + .zip(app.stats()) + .zip(&app.styles) + { + let header_layout = Layout::default() + .direction(Direction::Horizontal) + .constraints( + [ + Constraint::Percentage(20), + Constraint::Percentage(20), + Constraint::Percentage(20), + Constraint::Percentage(20), + Constraint::Percentage(20), + ] + .as_ref(), + ) + .split(chunks[host_id]); - f.render_widget( - Paragraph::new(format!("Pinging {}", host)).style(style), - header_layout[0], - ); + f.render_widget( + Paragraph::new(format!("Pinging {}", host)).style(style), + header_layout[0], + ); - f.render_widget( - Paragraph::new(format!( - "min {:?}", - Duration::from_micros(stats.minimum().unwrap_or(0)) - )) - .style(style), - header_layout[1], - ); - f.render_widget( - Paragraph::new(format!( - "max {:?}", - Duration::from_micros(stats.maximum().unwrap_or(0)) - )) - .style(style), - header_layout[2], - ); - f.render_widget( - Paragraph::new(format!( - "p95 {:?}", - Duration::from_micros(stats.percentile(95.0).unwrap_or(0)) - )) - .style(style), - header_layout[3], - ); + f.render_widget( + Paragraph::new(format!( + "min {:?}", + Duration::from_micros(stats.minimum().unwrap_or(0)) + )) + .style(style), + header_layout[1], + ); + f.render_widget( + Paragraph::new(format!( + "max {:?}", + Duration::from_micros(stats.maximum().unwrap_or(0)) + )) + .style(style), + header_layout[2], + ); + f.render_widget( + Paragraph::new(format!( + "p95 {:?}", + Duration::from_micros(stats.percentile(95.0).unwrap_or(0)) + )) + .style(style), + header_layout[3], + ); - f.render_widget( - Paragraph::new(format!( - "iteration {:?}", - host_iterations.get(&host_id).unwrap_or(&0) - )) - .style(style), - header_layout[4], - ); + f.render_widget( + Paragraph::new(format!( + "iteration {:?}", + host_iterations.get(&host_id).unwrap_or(&0) + )) + .style(style), + header_layout[4], + ); + } } let datasets: Vec<_> = app From 31c467f964fd56362c7748876669ab02a5b77a13 Mon Sep 17 00:00:00 2001 From: Hanif Ariffin Date: Fri, 20 Nov 2020 22:22:13 +0800 Subject: [PATCH 4/4] Removed unnecessary widget. Draw shouldn't lock now. Signed-off-by: Hanif Ariffin --- src/main.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/src/main.rs b/src/main.rs index bca2ab16f..a998514d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -257,7 +257,6 @@ fn main() -> Result<()> { threads.push(writer_thread); } - let host_iterations = std::sync::Arc::clone(&host_iterations); loop { match rx.recv()? { Event::Update(host_id, ping_result) => { @@ -278,7 +277,6 @@ fn main() -> Result<()> { ) .split(f.size()); { - let host_iterations = host_iterations.lock().unwrap(); for (((host_id, host), stats), &style) in args .hosts .iter() @@ -290,11 +288,10 @@ fn main() -> Result<()> { .direction(Direction::Horizontal) .constraints( [ - Constraint::Percentage(20), - Constraint::Percentage(20), - Constraint::Percentage(20), - Constraint::Percentage(20), - Constraint::Percentage(20), + Constraint::Percentage(25), + Constraint::Percentage(25), + Constraint::Percentage(25), + Constraint::Percentage(25), ] .as_ref(), ) @@ -329,15 +326,6 @@ fn main() -> Result<()> { .style(style), header_layout[3], ); - - f.render_widget( - Paragraph::new(format!( - "iteration {:?}", - host_iterations.get(&host_id).unwrap_or(&0) - )) - .style(style), - header_layout[4], - ); } }