diff --git a/src/navico/data.rs b/src/navico/data.rs index f4b29d0..63f8314 100644 --- a/src/navico/data.rs +++ b/src/navico/data.rs @@ -2,9 +2,9 @@ use bincode::deserialize; use log::{debug, trace, warn}; use serde::Deserialize; use std::io; -use std::sync::Arc; use std::time::Duration; use tokio::net::UdpSocket; +use tokio::sync::mpsc::Receiver; use tokio::time::sleep; use tokio_graceful_shutdown::SubsystemHandle; @@ -12,7 +12,7 @@ use crate::locator::LocatorId; use crate::radar::*; use crate::util::{create_multicast, PrintableSpoke}; -use super::NavicoSettings; +use super::DataUpdate; // Length of a spoke in pixels. Every pixel is 4 bits (one nibble.) const NAVICO_SPOKE_LEN: usize = 1024; @@ -105,55 +105,28 @@ enum LookupSpokeEnum { HighApproaching = 5, } -const LOOKUP_PIXEL_VALUE: [[u8; 256]; 6] = { - let mut lookup: [[u8; 256]; 6] = [[0; 256]; 6]; - // Cannot use for() in const expr, so use while instead - let mut j: usize = 0; - while j < 256 { - let low: u8 = j as u8 & 0x0f; - let high: u8 = (j as u8 >> 4) & 0x0f; - - lookup[LookupSpokeEnum::LowNormal as usize][j] = BLOB_NORMAL_START + low; - lookup[LookupSpokeEnum::LowBoth as usize][j] = match low { - 0x0f => BLOB_DOPPLER_APPROACHING, - 0x0e => BLOB_DOPPLER_RECEDING, - _ => BLOB_NORMAL_START + low, - }; - lookup[LookupSpokeEnum::LowApproaching as usize][j] = match low { - 0x0f => BLOB_DOPPLER_APPROACHING, - _ => BLOB_NORMAL_START + low, - }; - lookup[LookupSpokeEnum::HighNormal as usize][j] = high; - lookup[LookupSpokeEnum::HighBoth as usize][j] = match high { - 0x0f => BLOB_DOPPLER_APPROACHING, - 0x0e => BLOB_DOPPLER_RECEDING, - _ => BLOB_NORMAL_START + high, - }; - lookup[LookupSpokeEnum::HighApproaching as usize][j] = match high { - 0x0f => BLOB_DOPPLER_APPROACHING, - _ => BLOB_NORMAL_START + high, - }; - j += 1; - } - lookup -}; - -pub struct Receive { +pub struct NavicoDataReceiver { statistics: Statistics, info: RadarInfo, buf: Vec, sock: Option, - settings: Arc, + rx: Receiver, + doppler: DopplerMode, + legend: Option, + pixel_to_blob: Option<[[u8; 256]; 6]>, } -impl Receive { - pub fn new(info: RadarInfo, settings: Arc) -> Receive { - Receive { +impl NavicoDataReceiver { + pub fn new(info: RadarInfo, rx: Receiver) -> NavicoDataReceiver { + NavicoDataReceiver { statistics: Statistics { broken_packets: 0 }, info: info, buf: Vec::with_capacity(size_of::()), sock: None, - settings, + rx, + doppler: DopplerMode::None, + legend: None, + pixel_to_blob: None, } } @@ -178,12 +151,61 @@ impl Receive { } } + fn fill_pixel_to_blob(&mut self, legend: &Legend) { + let mut lookup: [[u8; 256]; 6] = [[0; 256]; 6]; + // Cannot use for() in const expr, so use while instead + let mut j: usize = 0; + while j < 256 { + let low: u8 = j as u8 & 0x0f; + let high: u8 = (j as u8 >> 4) & 0x0f; + + lookup[LookupSpokeEnum::LowNormal as usize][j] = low; + lookup[LookupSpokeEnum::LowBoth as usize][j] = match low { + 0x0f => legend.doppler_approaching, + 0x0e => legend.doppler_receding, + _ => low, + }; + lookup[LookupSpokeEnum::LowApproaching as usize][j] = match low { + 0x0f => legend.doppler_approaching, + _ => low, + }; + lookup[LookupSpokeEnum::HighNormal as usize][j] = high; + lookup[LookupSpokeEnum::HighBoth as usize][j] = match high { + 0x0f => legend.doppler_approaching, + 0x0e => legend.doppler_receding, + _ => high, + }; + lookup[LookupSpokeEnum::HighApproaching as usize][j] = match high { + 0x0f => legend.doppler_approaching, + _ => high, + }; + j += 1; + } + self.pixel_to_blob = Some(lookup); + } + + fn handle_data_update(&mut self, r: Option) { + match r { + Some(DataUpdate::Doppler(doppler)) => { + self.doppler = doppler; + } + Some(DataUpdate::Legend(legend)) => { + self.fill_pixel_to_blob(&legend); + self.legend = Some(legend); + } + None => {} + } + } + async fn socket_loop(&mut self, subsys: &SubsystemHandle) -> Result<(), RadarError> { loop { tokio::select! { biased; _ = subsys.on_shutdown_requested() => { return Err(RadarError::Shutdown); }, + r = self.rx.recv() => { + self.handle_data_update(r); + }, r = self.sock.as_ref().unwrap().recv_buf_from(&mut self.buf) => { match r { Ok(_) => { @@ -239,8 +261,6 @@ impl Receive { } } - let doppler_mode = self.settings.doppler.load(); - debug!("Received UDP frame with {} spokes", &scanlines_in_packet); let mut offset: usize = FRAME_HEADER_LENGTH; @@ -255,7 +275,8 @@ impl Receive { Ok(header) => { trace!("Received {:04} header {:?}", scanline, header); - if let Some((range, angle, heading)) = Receive::validate_header(&header) + if let Some((range, angle, heading)) = + NavicoDataReceiver::validate_header(&header) { debug!("range {} angle {} heading {}", range, angle, heading); debug!( @@ -263,13 +284,7 @@ impl Receive { scanline, PrintableSpoke::new(spoke_slice), ); - self.process_spoke( - range, - angle, - heading, - doppler_mode, - spoke_slice, - ); + self.process_spoke(range, angle, heading, spoke_slice); } else { warn!("Invalid spoke: header {:02X?}", &header_slice); self.statistics.broken_packets += 1; @@ -287,7 +302,7 @@ impl Receive { trace!("Received {:04} header {:?}", scanline, header); if let Some((range, angle, heading)) = - Receive::validate_br24_header(&header) + NavicoDataReceiver::validate_br24_header(&header) { debug!("range {} angle {} heading {}", range, angle, heading); trace!( @@ -295,13 +310,7 @@ impl Receive { scanline, PrintableSpoke::new(spoke_slice), ); - self.process_spoke( - range, - angle, - heading, - doppler_mode, - spoke_slice, - ); + self.process_spoke(range, angle, heading, spoke_slice); } else { warn!("Invalid spoke: header {:02X?}", &header_slice); self.statistics.broken_packets += 1; @@ -369,40 +378,35 @@ impl Receive { Some((range, heading, angle)) } - fn process_spoke( - &self, - range: u32, - angle: u16, - heading: i16, - doppler_mode: DopplerMode, - spoke: &[u8], - ) { - // Convert the spoke data to bytes - let mut generic_spoke: Vec = Vec::with_capacity(1024); - let low_nibble_index = match doppler_mode { - DopplerMode::None => LookupSpokeEnum::LowNormal, - DopplerMode::Both => LookupSpokeEnum::LowBoth, - DopplerMode::Approaching => LookupSpokeEnum::LowApproaching, - } as usize; - let high_nibble_index = match doppler_mode { - DopplerMode::None => LookupSpokeEnum::HighNormal, - DopplerMode::Both => LookupSpokeEnum::HighBoth, - DopplerMode::Approaching => LookupSpokeEnum::HighApproaching, - } as usize; - - for pixel in spoke { - let pixel = *pixel as usize; - generic_spoke.push(LOOKUP_PIXEL_VALUE[low_nibble_index][pixel]); - generic_spoke.push(LOOKUP_PIXEL_VALUE[high_nibble_index][pixel]); - } + fn process_spoke(&self, range: u32, angle: u16, heading: i16, spoke: &[u8]) { + if let Some(pixel_to_blob) = self.pixel_to_blob { + // Convert the spoke data to bytes + let mut generic_spoke: Vec = Vec::with_capacity(1024); + let low_nibble_index = match self.doppler { + DopplerMode::None => LookupSpokeEnum::LowNormal, + DopplerMode::Both => LookupSpokeEnum::LowBoth, + DopplerMode::Approaching => LookupSpokeEnum::LowApproaching, + } as usize; + let high_nibble_index = match self.doppler { + DopplerMode::None => LookupSpokeEnum::HighNormal, + DopplerMode::Both => LookupSpokeEnum::HighBoth, + DopplerMode::Approaching => LookupSpokeEnum::HighApproaching, + } as usize; + + for pixel in spoke { + let pixel = *pixel as usize; + generic_spoke.push(pixel_to_blob[low_nibble_index][pixel]); + generic_spoke.push(pixel_to_blob[high_nibble_index][pixel]); + } - trace!( - "Spoke {}/{}/{} len {}", - range, - heading, - angle, - generic_spoke.len() - ); + trace!( + "Spoke {}/{}/{} len {}", + range, + heading, + angle, + generic_spoke.len() + ); + } } } /* diff --git a/src/navico/mod.rs b/src/navico/mod.rs index 808cbc0..7d4aa12 100644 --- a/src/navico/mod.rs +++ b/src/navico/mod.rs @@ -7,10 +7,11 @@ use serde::Deserialize; use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::{Arc, RwLock}; use std::{fmt, io}; +use tokio::sync::mpsc; use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle}; use crate::locator::{LocatorId, RadarListenAddress, RadarLocator}; -use crate::radar::{located, DopplerMode, RadarInfo, Radars}; +use crate::radar::{located, DopplerMode, Legend, RadarInfo, Radars}; use crate::util::c_string; use crate::util::PrintableSlice; @@ -21,6 +22,12 @@ mod report; const NAVICO_BEACON_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(236, 6, 7, 5)), 6878); +// Messages sent to Data receiver +pub enum DataUpdate { + Doppler(DopplerMode), + Legend(Legend), +} + #[derive(Deserialize, Debug, Copy, Clone)] struct NetworkSocketAddrV4 { addr: Ipv4Addr, @@ -224,7 +231,6 @@ impl fmt::Display for Model { #[derive(Debug)] pub struct NavicoSettings { radars: Arc>, - doppler: AtomicCell, model: AtomicCell, } @@ -237,10 +243,10 @@ fn found(info: RadarInfo, radars: &Arc>, subsys: &SubsystemHandle // It's new, start the RadarProcessor thread let navico_settings = Arc::new(NavicoSettings { radars: radars.clone(), - doppler: AtomicCell::new(DopplerMode::None), model: AtomicCell::new(Model::Unknown), }); + let (tx_data, rx_data) = mpsc::channel(10); let command_sender = command::Command::new(info.clone(), navico_settings.clone()); // Clone everything moved into future twice or more @@ -249,9 +255,13 @@ fn found(info: RadarInfo, radars: &Arc>, subsys: &SubsystemHandle let info_clone = info.clone(); let navico_settings_clone = navico_settings.clone(); - let data_receiver = data::Receive::new(info, navico_settings); - let report_receiver = - report::Receive::new(info_clone, navico_settings_clone, command_sender); + let data_receiver = data::NavicoDataReceiver::new(info, rx_data); + let report_receiver = report::NavicoReportReceiver::new( + info_clone, + navico_settings_clone, + command_sender, + tx_data, + ); subsys.start(SubsystemBuilder::new(data_name, move |s| { data_receiver.run(s) diff --git a/src/navico/report.rs b/src/navico/report.rs index 315c80f..8bf3add 100644 --- a/src/navico/report.rs +++ b/src/navico/report.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use std::time::Duration; use std::{fmt, io}; use tokio::net::UdpSocket; +use tokio::sync::mpsc::Sender; use tokio::time::{sleep, sleep_until, Instant}; use tokio_graceful_shutdown::SubsystemHandle; @@ -13,15 +14,16 @@ use crate::radar::{DopplerMode, RadarError, RadarInfo}; use crate::util::{c_wide_string, create_multicast}; use super::command::{self, Command}; -use super::{Model, NavicoSettings}; +use super::{DataUpdate, Model, NavicoSettings}; -pub struct Receive { +pub struct NavicoReportReceiver { info: RadarInfo, key: String, buf: Vec, sock: Option, settings: Arc, command_sender: Command, + data_tx: Sender, subtype_timeout: Instant, subtype_repeat: Duration, } @@ -138,10 +140,15 @@ impl RadarReport8_21 { const REPORT_08_C4_18_OR_21: u8 = 0x08; -impl Receive { - pub fn new(info: RadarInfo, settings: Arc, command_sender: Command) -> Receive { +impl NavicoReportReceiver { + pub fn new( + info: RadarInfo, + settings: Arc, + command_sender: Command, + data_tx: Sender, + ) -> NavicoReportReceiver { let key = info.key(); - Receive { + NavicoReportReceiver { key: key, info: info, buf: Vec::with_capacity(1024), @@ -150,6 +157,7 @@ impl Receive { command_sender, subtype_timeout: Instant::now(), subtype_repeat: Duration::from_millis(5000), + data_tx, } } @@ -223,7 +231,7 @@ impl Receive { } } - fn process_report(&mut self) -> Result<(), Error> { + async fn process_report(&mut self) -> Result<(), Error> { let data = &self.buf; trace!("{}: report received: {:02X?}", self.key, data); @@ -238,13 +246,13 @@ impl Receive { let report_identification = data[0]; match report_identification { REPORT_01_C4_18 => { - return self.process_report_01(); + return self.process_report_01().await; } REPORT_03_C4_129 => { - return self.process_report_03(); + return self.process_report_03().await; } REPORT_08_C4_18_OR_21 => { - return self.process_report_08(); + return self.process_report_08().await; } _ => { bail!( @@ -256,7 +264,7 @@ impl Receive { }; } - fn process_report_01(&mut self) -> Result<(), Error> { + async fn process_report_01(&mut self) -> Result<(), Error> { let report = RadarReport1_18::transmute(&self.buf)?; let status: Result = report.status.try_into(); @@ -267,7 +275,7 @@ impl Receive { Ok(()) } - fn process_report_03(&mut self) -> Result<(), Error> { + async fn process_report_03(&mut self) -> Result<(), Error> { let report = RadarReport3_129::transmute(&self.buf)?; let model = report.model; let hours = u32::from_le_bytes(report.hours); @@ -291,6 +299,11 @@ impl Receive { if let Some(info) = radars.info.get_mut(&self.key) { info.model = Some(format!("{}", model)); info.set_legend(model == Model::HALO); + if let Some(legend) = &info.legend { + self.data_tx + .send(DataUpdate::Legend(legend.clone())) + .await?; + } } } } @@ -301,7 +314,7 @@ impl Receive { Ok(()) } - fn process_report_08(&mut self) -> Result<(), Error> { + async fn process_report_08(&mut self) -> Result<(), Error> { let data = &self.buf; if data.len() != 18 && data.len() != 21 { @@ -342,9 +355,10 @@ impl Receive { } Ok(doppler_mode) => { debug!( - "{}: doppler state={} speed={}", + "{}: doppler mode={} speed={}", self.key, doppler_mode, doppler_speed ); + self.data_tx.send(DataUpdate::Doppler(doppler_mode)).await?; } } } diff --git a/src/radar.rs b/src/radar.rs index a5209d9..1a84e3d 100644 --- a/src/radar.rs +++ b/src/radar.rs @@ -38,24 +38,6 @@ enum PixelType { Normal, } -// The Target Trails code is the same on all radars, and all spoke -// pixel values contain [0..32> as history values. -pub const BLOB_HISTORY_COLORS: u8 = 32; -pub const BLOB_TARGET_BORDER: u8 = 32; -pub const BLOB_DOPPLER_APPROACHING: u8 = 33; -pub const BLOB_DOPPLER_RECEDING: u8 = 34; -pub const BLOB_NORMAL_START: u8 = 35; - -pub fn map_pixel_to_type(p: u8) -> PixelType { - match p { - 0..BLOB_HISTORY_COLORS => PixelType::History, - BLOB_TARGET_BORDER => PixelType::TargetBorder, - BLOB_DOPPLER_APPROACHING => PixelType::DopplerApproaching, - BLOB_DOPPLER_RECEDING => PixelType::DopplerReceding, - _ => PixelType::Normal, - } -} - #[derive(Clone, Debug)] struct Colour { r: u8, @@ -266,6 +248,8 @@ pub fn located(new_info: RadarInfo, radars: &Arc>) -> Option Legend { let mut legend = Legend { pixels: Vec::new(), diff --git a/src/web.rs b/src/web.rs index fa157d2..25162c6 100644 --- a/src/web.rs +++ b/src/web.rs @@ -11,8 +11,8 @@ use log::info; use miette::Result; use serde::Serialize; use std::{ - collections::{BTreeMap, HashMap}, - fmt, io, + collections::HashMap, + io, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{Arc, RwLock}, }; @@ -20,7 +20,7 @@ use thiserror::Error; use tokio::net::TcpListener; use tokio_graceful_shutdown::SubsystemHandle; -use crate::radar::{Legend, Lookup, Radars}; +use crate::radar::{Legend, Radars}; use crate::VERSION; #[derive(Error, Debug)]