Skip to content

Commit

Permalink
Rework shutdown handling
Browse files Browse the repository at this point in the history
  • Loading branch information
keesverruijt committed Aug 21, 2024
1 parent fc1b309 commit c48ae01
Show file tree
Hide file tree
Showing 8 changed files with 522 additions and 203 deletions.
285 changes: 271 additions & 14 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ enum-primitive-derive = "0.3.0"
env_logger = "0.11.5"
libc = "0.2.156"
log = "0.4.22"
miette = { version = "7.2.0", features = ["fancy"] }
network-interface = "2.0.0"
num-traits = "0.2.19"
serde = { version = "1.0.206", features = ["derive", "serde_derive"] }
serde_json = "1.0.125"
socket2 = "0.5.7"
terminal_size = "0.3.0"
thiserror = "1.0.63"
time = { version = "0.3.36", features = ["formatting"] }
tokio = { version = "1", features = ["full"] }
tokio-graceful-shutdown = "0.15.1"
tokio-shutdown = "0.1.4"
tokio-tungstenite = "0.23.1"

237 changes: 135 additions & 102 deletions src/locator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,41 @@
//

use std::collections::HashMap;
use std::io::{self, ErrorKind};
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, RwLock};
use std::time::Duration;

use async_trait::async_trait;
use log::{debug, error, info, trace, warn};
use miette::Result;
use network_interface::{NetworkInterface, NetworkInterfaceConfig};
use serde::Serialize;
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_shutdown;
use tokio_shutdown::Shutdown;
use tokio_graceful_shutdown::SubsystemHandle;

use crate::radar::Radars;
use crate::{navico, util};

#[derive(Error, Debug)]
pub enum LocatorError {
#[error("Multicast socket operation failed")]
Io(#[from] io::Error),
#[error("Interface '{0}' is not available")]
InterfaceNotFound(String),
#[error("Interface '{0}' has no valid IPv4 address")]
InterfaceNoV4(String),
#[error("Cannot detect Ethernet devices")]
EnumerationFailed,
#[error("Timeout")]
Timeout,
#[error("Shutdown")]
Shutdown,
}

#[derive(PartialEq, Eq, Copy, Clone, Serialize, Debug)]
pub enum LocatorId {
GenBR24,
Expand Down Expand Up @@ -53,8 +70,8 @@ pub struct RadarListenAddress {
&SocketAddr, // from
&Ipv4Addr, // nic_addr
&Arc<RwLock<Radars>>,
&Shutdown,
) -> io::Result<()>,
&SubsystemHandle,
) -> Result<(), io::Error>,
}

// The only part of RadioListenAddress that isn't Send is process, but since this is static it really
Expand All @@ -72,8 +89,8 @@ impl RadarListenAddress {
&SocketAddr,
&Ipv4Addr,
&Arc<RwLock<Radars>>,
&Shutdown,
) -> io::Result<()>,
&SubsystemHandle,
) -> Result<(), io::Error>,
) -> RadarListenAddress {
RadarListenAddress {
id,
Expand All @@ -93,8 +110,8 @@ struct LocatorInfo {
&SocketAddr, // from
&Ipv4Addr, // nic_addr
&Arc<RwLock<Radars>>,
&Shutdown,
) -> io::Result<()>,
&SubsystemHandle,
) -> Result<(), io::Error>,
}

// The only part of LocatorInfo that isn't Send is process, but since this is static it really
Expand All @@ -114,100 +131,116 @@ struct InterfaceState {
first_loop: bool,
}

pub async fn new(
radars: &Arc<RwLock<Radars>>,
interface: Option<String>,
shutdown: Shutdown,
) -> io::Result<()> {
let navico_locator = navico::create_locator();
let navico_br24_locator = navico::create_br24_locator();
//let mut garmin_locator = garmin::create_locator();

let mut listen_addresses: Vec<RadarListenAddress> = Vec::new();
navico_locator.update_listen_addresses(&mut listen_addresses);
navico_br24_locator.update_listen_addresses(&mut listen_addresses);
// garmin_locator.update_listen_addresses(&listen_addresses);

info!("Entering loop, listening for radars");
let mut interface_state = InterfaceState {
interface,
active_nic_addresses: Vec::new(),
inactive_nic_names: HashMap::new(),
lost_nic_names: HashMap::new(),
first_loop: true,
};

loop {
// create a list of sockets for all listen addresses
let shutdown_handle = shutdown.clone().handle();
let sockets = create_multicast_sockets(&listen_addresses, &mut interface_state);
let mut set = JoinSet::new();
if sockets.is_err() {
if interface_state.interface.is_some() {
return Err(sockets.err().unwrap());
}
debug!("No NIC addresses found");
sleep(Duration::from_millis(5000)).await;
}
let sockets = sockets.unwrap();
pub struct Locator {
pub radars: Arc<RwLock<Radars>>,
pub interface: Option<String>,
}

for socket in sockets {
spawn_receive(&mut set, socket);
}
set.spawn(async move {
shutdown_handle.await;
Err(io::Error::new(ErrorKind::WriteZero, "shutdown"))
});
set.spawn(async move {
sleep(Duration::from_millis(30000)).await;
Err(io::Error::new(ErrorKind::Other, "timeout"))
});

// Now that we're listening to the radars, send any ping (wake) packets
{
for x in &listen_addresses {
if let Some(ping) = x.adress_request_packet {
send_multicast_packet(&x.address, ping);
impl Locator {
pub fn new(radars: Arc<RwLock<Radars>>, interface: Option<String>) -> Self {
Locator { radars, interface }
}

pub async fn run(self, subsys: SubsystemHandle) -> Result<(), LocatorError> {
let radars = self.radars;
let navico_locator = navico::create_locator();
let navico_br24_locator = navico::create_br24_locator();
//let mut garmin_locator = garmin::create_locator();

let mut listen_addresses: Vec<RadarListenAddress> = Vec::new();
navico_locator.update_listen_addresses(&mut listen_addresses);
navico_br24_locator.update_listen_addresses(&mut listen_addresses);
// garmin_locator.update_listen_addresses(&listen_addresses);

info!("Entering loop, listening for radars");
let mut interface_state = InterfaceState {
interface: self.interface,
active_nic_addresses: Vec::new(),
inactive_nic_names: HashMap::new(),
lost_nic_names: HashMap::new(),
first_loop: true,
};

loop {
let cancellation_token = subsys.create_cancellation_token();

// create a list of sockets for all listen addresses
let sockets = create_multicast_sockets(&listen_addresses, &mut interface_state);
let mut set = JoinSet::new();
if sockets.is_err() {
if interface_state.interface.is_some() {
return Err(sockets.err().unwrap());
}
debug!("No NIC addresses found");
sleep(Duration::from_millis(5000)).await;
}
};
let sockets = sockets.unwrap();

while let Some(join_result) = set.join_next().await {
match join_result {
Ok(join_result) => {
match join_result {
Ok((socket, addr, buf)) => {
trace!("{} via {} -> {:02X?}", &addr, &socket.nic_addr, &buf);

let _ =
(socket.process)(&buf, &addr, &socket.nic_addr, radars, &shutdown);
// Respawn this task
spawn_receive(&mut set, socket);
}
Err(e) => {
if e.kind() == ErrorKind::WriteZero && e.to_string() == "shutdown" {
// Shutdown!
info!("Locator shutdown");
return Ok(());
for socket in sockets {
spawn_receive(&mut set, socket);
}
set.spawn(async move {
cancellation_token.cancelled().await;
Err(LocatorError::Shutdown)
});
set.spawn(async move {
sleep(Duration::from_millis(30000)).await;
Err(LocatorError::Timeout)
});

// Now that we're listening to the radars, send any address request (wake) packets
{
for x in &listen_addresses {
if let Some(address_request) = x.adress_request_packet {
send_multicast_packet(&x.address, address_request);
}
}
};

while let Some(join_result) = set.join_next().await {
match join_result {
Ok(join_result) => {
match join_result {
Ok((socket, addr, buf)) => {
trace!("{} via {} -> {:02X?}", &addr, &socket.nic_addr, &buf);

let _ = (socket.process)(
&buf,
&addr,
&socket.nic_addr,
&radars,
&subsys,
);
// Respawn this task
spawn_receive(&mut set, socket);
}
if e.kind() == ErrorKind::Other && e.to_string() == "timeout" {
// Loop, reread everything
break;
Err(e) => {
match e {
LocatorError::Shutdown => {
info!("Locator shutdown");
return Ok(());
}
LocatorError::Timeout => {
// Loop, reread everything
break;
}
_ => {}
}
debug!("receive error: {}", e);
}
debug!("receive error: {}", e);
}
}
}
Err(e) => {
debug!("JoinError: {}", e);
}
};
Err(e) => {
debug!("JoinError: {}", e);
}
};
}
}
}
}

fn spawn_receive(
set: &mut JoinSet<Result<(LocatorInfo, SocketAddr, Vec<u8>), io::Error>>,
set: &mut JoinSet<Result<(LocatorInfo, SocketAddr, Vec<u8>), LocatorError>>,
socket: LocatorInfo,
) {
set.spawn(async move {
Expand All @@ -216,7 +249,7 @@ fn spawn_receive(

match res {
Ok((_, addr)) => Ok((socket, addr, buf)),
Err(e) => Err(e),
Err(e) => Err(LocatorError::Io(e)),
}
});
}
Expand Down Expand Up @@ -251,7 +284,7 @@ fn send_multicast_packet(addr: &SocketAddr, msg: &[u8]) {
fn create_multicast_sockets(
listen_addresses: &Vec<RadarListenAddress>,
interface_state: &mut InterfaceState,
) -> io::Result<Vec<LocatorInfo>> {
) -> Result<Vec<LocatorInfo>, LocatorError> {
let only_interface = &interface_state.interface;

match NetworkInterface::show() {
Expand Down Expand Up @@ -325,6 +358,13 @@ fn create_multicast_sockets(
}
}
}
if interface_state.interface.is_some()
&& interface_state.active_nic_addresses.len() == 0
{
return Err(LocatorError::InterfaceNoV4(
interface_state.interface.clone().unwrap(),
));
}
}
if !active && only_interface.is_none() {
if interface_state
Expand All @@ -349,19 +389,12 @@ fn create_multicast_sockets(
if interface_state.interface.is_some()
&& interface_state.active_nic_addresses.len() == 0
{
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!(
"No interface '{}' with IPv4 address found",
interface_state.interface.clone().unwrap()
),
return Err(LocatorError::InterfaceNotFound(
interface_state.interface.clone().unwrap(),
));
}
Ok(sockets)
}
Err(e) => {
error!("Unable to list Ethernet interfaces on this platform: {}", e);
Err(io::Error::last_os_error())
}
Err(_) => Err(LocatorError::EnumerationFailed),
}
}
Loading

0 comments on commit c48ae01

Please sign in to comment.