diff --git a/src/control.rs b/src/control.rs index c76e927..48701b0 100644 --- a/src/control.rs +++ b/src/control.rs @@ -1,51 +1,23 @@ // #![allow(dead_code, unused_imports, unused_variables, unused_mut, unreachable_patterns)] // Please be quiet, I'm coding -use std::{ sync::{ RwLock, Arc }, sync::atomic::Ordering, mem::drop, collections::{ HashMap, VecDeque }, cmp }; -use regex::Regex; +use std::{ sync::{ RwLock, Arc }, sync::atomic::Ordering, mem::drop, collections::HashMap }; use tokio::{ fs, sync }; use petgraph::{ graph, graph::UnGraph, data::FromElements, algo }; -use lazy_static::lazy_static; use rand::seq::SliceRandom; -#[allow(unused_imports)] -use crate::{ Config, Node, Control, Protocol, LogLevel, unixtime, timestamp, timestamp_from, get_local_interfaces, duration_from }; +use crate::{ Config, Control, Data, IntfStats, LogLevel, Node, Path, PingResult, Protocol, get_local_interfaces, duration_from, unixtime, shorten_ipv6 }; #[cfg(feature = "web")] -use http_body_util::Full; +use crate::web::{ run_http, run_https }; #[cfg(feature = "web")] -use hyper::{ Request, Response, body::{ Bytes, Incoming }, service::service_fn }; -#[cfg(feature = "web")] -use hyper_util::rt::TokioIo; -#[cfg(feature = "web")] -use hyper_tungstenite::{HyperWebsocket, tungstenite::Message}; -#[cfg(feature = "web")] -use tokio_stream::wrappers::TcpListenerStream; -#[cfg(feature = "web")] -use futures_util::stream::StreamExt; -#[cfg(feature = "web")] -use rustls_acme::{ AcmeConfig, AccountCache, CertCache }; -#[cfg(feature = "web")] -use serde::Serialize; -#[cfg(feature = "web")] -use tokio::net::TcpListener; -#[cfg(feature = "web")] -use ring::digest::{ Context, SHA256 }; -#[cfg(feature = "web")] -use async_trait::async_trait; -#[cfg(feature = "web")] -use base64::{ Engine as _, engine::general_purpose::STANDARD as base64 }; +use hyper_tungstenite::tungstenite::Message; #[cfg(feature = "tui")] -use termion::{ raw::IntoRawMode, screen::IntoAlternateScreen, screen::AlternateScreen, raw::RawTerminal, input::TermRead, event::Key }; +use crate::{ timestamp, timestamp_from, tui::{ draw, start_tui } }; #[cfg(feature = "tui")] -use tui::{ Terminal, Frame, backend::{ Backend, TermionBackend }, widgets::{ Block, Borders, List, ListItem, Table, Row }, layout::{ Layout, Constraint, Direction, Corner }, text::{ Span, Spans }, style::{ Style, Color } }; +use termion::{ input::TermRead, event::Key }; -static HISTSIZE: usize = 1440; static THRESHOLD: u16 = 4; static MAX_LINGER: u64 = 86400; // Seconds to keep visualising links that are down -#[cfg(feature = "web")] -static INDEX_FILE: &str = include_str!("../web/index.html"); -#[cfg(feature = "web")] -static ICON_FILE: &[u8] = include_bytes!("../web/favicon.ico"); trait GraphExt { fn find_node(&self, name: &str) -> Option; @@ -120,192 +92,6 @@ impl GraphExt for UnGraph { } } -struct PingResult { - node: String, - intf: String, - port: String, - min: u16, - losspct: f32, - last: Option, - hist: VecDeque, - statesince: u64 // Unix timestamp -} -impl PingResult { - fn new(node: String, intf: String, port: String) -> PingResult { - PingResult { node, intf, port, min: u16::MAX, losspct: 0.0, last: None, hist: VecDeque::with_capacity(HISTSIZE), statesince: unixtime() } - } - fn push_hist(&mut self, result: u16) { - if self.hist.len() >= HISTSIZE { self.hist.pop_back().unwrap(); } - self.hist.push_front(result); - } -} -impl Ord for PingResult { - fn cmp(&self, other: &Self) -> cmp::Ordering { - match self.node.cmp(&other.node) { - cmp::Ordering::Equal => { - match self.port.cmp(&other.port) { // TODO: better comparison for ip addresses - cmp::Ordering::Equal => self.intf.cmp(&other.intf), - ord => ord - } - }, - ord => ord - } - } -} -impl Eq for PingResult {} -impl PartialOrd for PingResult { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} -impl PartialEq for PingResult { - fn eq(&self, other: &Self) -> bool { - self.node == other.node && self.port == other.port && self.intf == other.intf - } -} - -#[derive(Default)] -struct Data { - log: RwLock>, - ping: RwLock>, - intf: RwLock>, - results: RwLock>, - pathcache: RwLock> -} -impl Data { - fn push_log(&self, ts: u64, line: String) { - let mut log = self.log.write().unwrap(); - if log.len() >= 50 { log.pop_back().unwrap(); } - log.push_front((ts, line)); - } - fn push_ping(&self, line: String) { - let mut ping = self.ping.write().unwrap(); - if ping.len() >= 50 { ping.pop_back().unwrap(); } - ping.push_front(line); - } -} -struct IntfStats { - #[allow(dead_code)] - symbol: char, - min: u16, - lag: u16 -} - -#[derive(Debug)] -struct Path { - from: String, - to: String, - fromintf: String, - tointf: String, - losspct: u8, - since: u64 -} -impl Path { - fn new(from: String, to: String, fromintf: String, tointf: String, losspct: u8) -> Path { - Path { from, to, fromintf, tointf, losspct, since: unixtime() } - } -} -impl PartialEq for Path { - fn eq(&self, other: &Self) -> bool { - self.from == other.from && self.to == other.to && self.fromintf == other.fromintf && self.tointf == other.tointf - } -} - -#[cfg(feature = "web")] -struct ConfigCache { - config: Arc> -} -#[cfg(feature = "web")] -impl ConfigCache { - fn new(config: &Arc>) -> ConfigCache { - ConfigCache { - config: config.clone() - } - } - fn cached_account_key(&self, contact: &[String], directory_url: impl AsRef) -> String { - let mut ctx = Context::new(&SHA256); - for el in contact { - ctx.update(el.as_ref()); - ctx.update(&[0]) - } - ctx.update(directory_url.as_ref().as_bytes()); - let hash = base64.encode(ctx.finish()); - format!("cached_account_{}", hash) - } - fn cached_cert_key(&self, domains: &[String], directory_url: impl AsRef) -> String { - let mut ctx = Context::new(&SHA256); - for domain in domains { - ctx.update(domain.as_ref()); - ctx.update(&[0]) - } - ctx.update(directory_url.as_ref().as_bytes()); - let hash = base64.encode(ctx.finish()); - format!("cached_cert_{}", hash) - } -} -#[async_trait] -#[cfg(feature = "web")] -impl CertCache for ConfigCache { - type EC = std::io::Error; - async fn load_cert( - &self, - domains: &[String], - directory_url: &str, - ) -> Result>, Self::EC> { - let key = self.cached_cert_key(&domains, directory_url); - Ok(self.config.read().unwrap().cache.get(&key).map(|v| base64.decode(v).unwrap())) - } - async fn store_cert( - &self, - domains: &[String], - directory_url: &str, - cert: &[u8], - ) -> Result<(), Self::EC> { - let key = self.cached_cert_key(&domains, directory_url); - let mut config = self.config.write().unwrap(); - config.cache.insert(key, base64.encode(cert)); - config.modified.store(true, Ordering::Relaxed); - Ok(()) - } -} -#[async_trait] -#[cfg(feature = "web")] -impl AccountCache for ConfigCache { - type EA = std::io::Error; - async fn load_account( - &self, - contact: &[String], - directory_url: &str, - ) -> Result>, Self::EA> { - let key = self.cached_account_key(&contact, directory_url); - Ok(self.config.read().unwrap().cache.get(&key).map(|v| base64.decode(v).unwrap())) - } - - async fn store_account( - &self, - contact: &[String], - directory_url: &str, - account: &[u8], - ) -> Result<(), Self::EA> { - let key = self.cached_account_key(&contact, directory_url); - let mut config = self.config.write().unwrap(); - config.cache.insert(key, base64.encode(account)); - config.modified.store(true, Ordering::Relaxed); - Ok(()) - } -} - -#[cfg(feature = "tui")] -fn start_tui(data: Arc) -> Option>>>> { - let stdout = std::io::stdout().into_raw_mode().unwrap(); - let stdout = stdout.into_alternate_screen().unwrap(); - let backend = TermionBackend::new(stdout); - let mut term = Terminal::new(backend).unwrap(); - term.clear().unwrap(); - term.draw(|f| draw(f, data)).unwrap(); - Some(term) -} - pub async fn run(aconfig: Arc>, mut rx: sync::mpsc::Receiver, ctrltx: sync::mpsc::Sender, udptx: sync::mpsc::Sender) { let mut peers = HashMap::new(); let mynode = graph::NodeIndex::new(0); @@ -327,69 +113,14 @@ pub async fn run(aconfig: Arc>, mut rx: sync::mpsc::Receiver x, - Err(e) => { - ctrltx.send(Control::Log(LogLevel::Error, format!("Failed to start http server on {arg}: {e}"))).await.unwrap(); - return; - } - }; - ctrltx.send(Control::Log(LogLevel::Info, format!("Started HTTP server on {}", arg))).await.unwrap(); - while let Ok((stream, addr)) = tcp_listener.accept().await { - if debug { ctrltx.send(Control::Log(LogLevel::Debug, format!("Incoming HTTP connection from {}", addr))).await.unwrap(); } - let conn = http.serve_connection(TokioIo::new(stream), service.clone()).with_upgrades(); - if let Err(e) = tokio::spawn(async move { conn.await }).await { - ctrltx.send(Control::Log(LogLevel::Error, format!("Error: {e}"))).await.unwrap(); - } - } - }); + run_http(config, data, ctrltx, arg, debug); } #[cfg(feature = "web")] if let Some(arg) = https { let config = aconfig.clone(); let data = data.clone(); let ctrltx = ctrltx.clone(); - tokio::spawn(async move { - let http = hyper::server::conn::http1::Builder::new(); - let aconfig = config.clone(); - let service = service_fn(move |req| { - // if debug { println!("{} Received HTTPS request {} {}", timestamp(), req.method(), req.uri()); } - handle_http(req, aconfig.clone(), data.clone()) - }); - let tcp_listener = match TcpListener::bind(&arg).await { - Ok(x) => x, - Err(e) => { - ctrltx.send(Control::Log(LogLevel::Error, format!("Failed to start https server on {arg}: {e}"))).await.unwrap(); - return; - } - }; - let domain = arg.rsplit_once(':').expect("No colon found in --https argument").0; - if domain.contains(':') || domain.find(char::is_alphabetic).is_none() { - ctrltx.send(Control::Log(LogLevel::Error, format!("Cannot use bare IP address with --https; use a fully qualified domain name"))).await.unwrap(); - return; - } - let tcp_incoming = TcpListenerStream::new(tcp_listener); - let mut tls_incoming = AcmeConfig::new([ &domain ]) - .contact_push(format!("mailto:{}", letsencrypt.unwrap())) - .cache(ConfigCache::new(&config)) - .directory_lets_encrypt(true) - .tokio_incoming(tcp_incoming, Vec::new()); - ctrltx.send(Control::Log(LogLevel::Info, format!("Started HTTPS server on {}", arg))).await.unwrap(); - while let Some(tls) = tls_incoming.next().await { - let stream = tls.unwrap(); - if debug { ctrltx.send(Control::Log(LogLevel::Debug, format!("Incoming HTTPS connection from {}", stream.get_ref().get_ref().0.get_ref().peer_addr().unwrap_or("0.0.0.0:0".parse().unwrap())))).await.unwrap(); } - let conn = http.serve_connection(TokioIo::new(stream), service.clone()).with_upgrades(); - if let Err(e) = tokio::spawn(async move { conn.await }).await { - ctrltx.send(Control::Log(LogLevel::Error, format!("Error: {e}"))).await.unwrap(); - } - } - }); + run_https(config, data, ctrltx, arg, letsencrypt, debug); } #[cfg(feature = "tui")] @@ -1021,262 +752,3 @@ fn check_loss_port(result: &mut PingResult) { result.losspct = 0.0; } } - -#[cfg(feature = "tui")] -fn draw(f: &mut Frame, data: Arc) { - let resultssize = match data.results.read().unwrap().len() { 0 => 3, n => n+2 } as u16; - let vert1 = Layout::default() - .direction(Direction::Vertical) - .constraints([Constraint::Min(12), Constraint::Length(resultssize) ].as_ref()) - .split(f.size()); - let hori = Layout::default() - .direction(Direction::Horizontal) - .constraints([Constraint::Max(105), Constraint::Percentage(50)].as_ref()) - .split(vert1[0]); - let vert2 = Layout::default() - .direction(Direction::Vertical) - .constraints([Constraint::Min(6), Constraint::Length((data.intf.read().unwrap().len()+3) as u16)].as_ref()) - .split(hori[1]); - - let block = Block::default() - .title(" Ping results ") - .borders(Borders::ALL); - let mut content: Vec = vec![]; - for line in data.ping.read().unwrap().iter().take(hori[0].height.into()) { - content.push(ListItem::new(Span::from(line.clone()))); - } - let list = List::new(content).block(block).start_corner(Corner::BottomLeft); - f.render_widget(list, hori[0]); - - let block = Block::default() - .title(" Network log ") - .borders(Borders::ALL); - let mut content: Vec = vec![]; - for (ts, line) in data.log.read().unwrap().iter().take(vert2[0].height.into()) { - content.push(ListItem::new(Span::from(format!("{} {}", timestamp_from(*ts), line)))); - } - let list = List::new(content).block(block).start_corner(Corner::BottomLeft); - f.render_widget(list, vert2[0]); - - let block = Block::default() - .title(" Local interface stats ") - .borders(Borders::ALL); - let mut content: Vec = vec![]; - { - let intf = data.intf.read().unwrap(); - let mut rows: Vec<_> = intf.iter().collect(); - rows.sort_by(|a, b| a.1.symbol.cmp(&b.1.symbol)); - for (intf, stats) in rows { - content.push(Row::new(vec![ format!(" {} ", stats.symbol), shorten_ipv6(intf.clone()), format!("{:^5}", stats.min), format!("{:^5}", stats.lag) ])); - } - } - let table = Table::new(content) - .block(block) - .column_spacing(1) - .header(Row::new(vec![ "Sym", "Interface", "Best", "Lag" ])) - .widths(&[Constraint::Length(3), Constraint::Length(26), Constraint::Length(5), Constraint::Length(5)]); - f.render_widget(table, vert2[1]); - - let block = Block::default() - .title(" Results grid ") - .borders(Borders::ALL); - let mut content: Vec = vec![]; - let mut prev = String::new(); - let mut mark; - for result in data.results.read().unwrap().iter() { - if prev != result.node { - prev = result.node.clone(); - mark = "▔"; - } - else { mark = " "; } - let symbol = match data.intf.read().unwrap().get(&result.intf) { - Some(i) => i.symbol, - None => ' ' - }; - let header = format!("{:10} {:26} {} ", result.node, shorten_ipv6(result.port.to_string()), symbol); - let mut line = Vec::with_capacity((vert1[1].width).into()); - line.push(Span::from(header)); - if let Some(rtt) = result.last { - line.push(draw_mark(rtt, result.min, mark)); - } - else { line.push(Span::raw(" ")); } - for rtt in result.hist.iter().take((vert1[1].width-43).into()) { - line.push(draw_mark(*rtt, result.min, mark)); - } - content.push(ListItem::new(Spans::from(line))); - } - if content.is_empty() { content.push(ListItem::new("No results yet")); } - let list = List::new(content).block(block).start_corner(Corner::TopLeft); - f.render_widget(list, vert1[1]); -} - -#[cfg(feature = "tui")] -fn draw_mark(rtt: u16, min: u16, mark: &'static str) -> Span<'static> { - lazy_static!{ - static ref STYLES: Vec