Skip to content

Commit

Permalink
Shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
keesverruijt committed Aug 22, 2024
1 parent 43308be commit 8b37be3
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 54 deletions.
41 changes: 12 additions & 29 deletions src/locator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,14 @@ use log::{debug, error, info, trace, warn};
use miette::Result;
use network_interface::{NetworkInterface, NetworkInterfaceConfig};
use serde::Serialize;
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_graceful_shutdown::SubsystemHandle;

use crate::radar::Radars;
use crate::radar::{RadarError, Radars};
use crate::{navico, util};

#[derive(Error, Debug)]
pub enum LocatorError {
#[error("Multicast socket operation failed")]
Io(#[from] io::Error),
#[error("Interface '{0}' is not available")]
InterfaceNotFound(String),
#[error("Interface '{0}' has no valid IPv4 address")]
InterfaceNoV4(String),
#[error("Cannot detect Ethernet devices")]
EnumerationFailed,
#[error("Timeout")]
Timeout,
#[error("Shutdown")]
Shutdown,
}

#[derive(PartialEq, Eq, Copy, Clone, Serialize, Debug)]
pub enum LocatorId {
GenBR24,
Expand Down Expand Up @@ -141,7 +124,7 @@ impl Locator {
Locator { radars, interface }
}

pub async fn run(self, subsys: SubsystemHandle) -> Result<(), LocatorError> {
pub async fn run(self, subsys: SubsystemHandle) -> Result<(), RadarError> {
let radars = self.radars;
let navico_locator = navico::create_locator();
let navico_br24_locator = navico::create_br24_locator();
Expand Down Expand Up @@ -181,11 +164,11 @@ impl Locator {
}
set.spawn(async move {
cancellation_token.cancelled().await;
Err(LocatorError::Shutdown)
Err(RadarError::Shutdown)
});
set.spawn(async move {
sleep(Duration::from_millis(30000)).await;
Err(LocatorError::Timeout)
Err(RadarError::Timeout)
});

// Now that we're listening to the radars, send any address request (wake) packets
Expand Down Expand Up @@ -216,11 +199,11 @@ impl Locator {
}
Err(e) => {
match e {
LocatorError::Shutdown => {
RadarError::Shutdown => {
info!("Locator shutdown");
return Ok(());
}
LocatorError::Timeout => {
RadarError::Timeout => {
// Loop, reread everything
break;
}
Expand All @@ -240,7 +223,7 @@ impl Locator {
}

fn spawn_receive(
set: &mut JoinSet<Result<(LocatorInfo, SocketAddr, Vec<u8>), LocatorError>>,
set: &mut JoinSet<Result<(LocatorInfo, SocketAddr, Vec<u8>), RadarError>>,
socket: LocatorInfo,
) {
set.spawn(async move {
Expand All @@ -249,7 +232,7 @@ fn spawn_receive(

match res {
Ok((_, addr)) => Ok((socket, addr, buf)),
Err(e) => Err(LocatorError::Io(e)),
Err(e) => Err(RadarError::Io(e)),
}
});
}
Expand Down Expand Up @@ -284,7 +267,7 @@ fn send_multicast_packet(addr: &SocketAddr, msg: &[u8]) {
fn create_multicast_sockets(
listen_addresses: &Vec<RadarListenAddress>,
interface_state: &mut InterfaceState,
) -> Result<Vec<LocatorInfo>, LocatorError> {
) -> Result<Vec<LocatorInfo>, RadarError> {
let only_interface = &interface_state.interface;

match NetworkInterface::show() {
Expand Down Expand Up @@ -361,7 +344,7 @@ fn create_multicast_sockets(
if interface_state.interface.is_some()
&& interface_state.active_nic_addresses.len() == 0
{
return Err(LocatorError::InterfaceNoV4(
return Err(RadarError::InterfaceNoV4(
interface_state.interface.clone().unwrap(),
));
}
Expand Down Expand Up @@ -389,12 +372,12 @@ fn create_multicast_sockets(
if interface_state.interface.is_some()
&& interface_state.active_nic_addresses.len() == 0
{
return Err(LocatorError::InterfaceNotFound(
return Err(RadarError::InterfaceNotFound(
interface_state.interface.clone().unwrap(),
));
}
Ok(sockets)
}
Err(_) => Err(LocatorError::EnumerationFailed),
Err(_) => Err(RadarError::EnumerationFailed),
}
}
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<()> {
s.start(SubsystemBuilder::new("Webserver", |a| web.run(a)));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.handle_shutdown_requests(Duration::from_millis(5000))
.await
.map_err(Into::into)
}
39 changes: 23 additions & 16 deletions src/navico/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,33 +178,40 @@ impl Receive {
}
}

async fn socket_loop(&mut self, subsys: &SubsystemHandle) -> io::Result<()> {
async fn socket_loop(&mut self, subsys: &SubsystemHandle) -> Result<(), RadarError> {
loop {
tokio::select! { biased;
_ = subsys.on_shutdown_requested() => {
break;
},
r = self.sock.as_ref().unwrap().recv_buf_from(&mut self.buf) => {
match r {
Ok(_) => {
self.process_frame();

},
Err(e) => { return Err(e); }
}
},
_ = subsys.on_shutdown_requested() => {
return Err(RadarError::Shutdown);
},
r = self.sock.as_ref().unwrap().recv_buf_from(&mut self.buf) => {
match r {
Ok(_) => {
self.process_frame();
},
Err(e) => {
return Err(RadarError::Io(e));
}
}
},
};
self.buf.clear();
}
Ok(())
}

pub async fn run(mut self, subsys: SubsystemHandle) -> Result<(), io::Error> {
pub async fn run(mut self, subsys: SubsystemHandle) -> Result<(), RadarError> {
debug!("Started receive thread");
self.start_socket().await.unwrap();
loop {
if self.sock.is_some() {
self.socket_loop(&subsys).await.unwrap();
match self.socket_loop(&subsys).await {
Err(RadarError::Shutdown) => {
return Ok(());
}
_ => {
// Ignore, reopen socket
}
}
self.sock = None;
} else {
sleep(Duration::from_millis(1000)).await;
Expand Down
6 changes: 4 additions & 2 deletions src/navico/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,19 @@ fn found(info: RadarInfo, radars: &Arc<RwLock<Radars>>, subsys: &SubsystemHandle
let command_sender = command::Command::new(info.clone(), navico_settings.clone());

// Clone everything moved into future twice or more
let data_name = info.key() + " data";
let report_name = info.key() + " reports";
let info_clone = info.clone();
let navico_settings_clone = navico_settings.clone();

let data_receiver = data::Receive::new(info, navico_settings);
let report_receiver =
report::Receive::new(info_clone, navico_settings_clone, command_sender);

subsys.start(SubsystemBuilder::new("Navico Data Receiver", move |s| {
subsys.start(SubsystemBuilder::new(data_name, move |s| {
data_receiver.run(s)
}));
subsys.start(SubsystemBuilder::new("Navico Report Receiver", |s| {
subsys.start(SubsystemBuilder::new(report_name, |s| {
report_receiver.run(s)
}));
}
Expand Down
19 changes: 13 additions & 6 deletions src/navico/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::net::UdpSocket;
use tokio::time::{sleep, sleep_until, Instant};
use tokio_graceful_shutdown::SubsystemHandle;

use crate::radar::{DopplerMode, RadarInfo};
use crate::radar::{DopplerMode, RadarError, RadarInfo};
use crate::util::{c_wide_string, create_multicast};

use super::command::{self, Command};
Expand Down Expand Up @@ -174,11 +174,12 @@ impl Receive {
}
}

async fn socket_loop(&mut self, subsys: &SubsystemHandle) -> io::Result<()> {
async fn socket_loop(&mut self, subsys: &SubsystemHandle) -> Result<(), RadarError> {
loop {
tokio::select! { biased;
_ = subsys.on_shutdown_requested() => {
break;
info!("{}: shutdown", self.key);
return Err(RadarError::Shutdown);
},
_ = sleep_until(self.subtype_timeout) => {
self.send_report_requests().await?;
Expand All @@ -190,7 +191,6 @@ impl Receive {
},
};
}
Ok(())
}

async fn send_report_requests(&mut self) -> Result<(), io::Error> {
Expand All @@ -203,11 +203,18 @@ impl Receive {
Ok(())
}

pub async fn run(mut self, subsys: SubsystemHandle) -> io::Result<()> {
pub async fn run(mut self, subsys: SubsystemHandle) -> Result<(), RadarError> {
self.start_socket().await?;
loop {
if self.sock.is_some() {
let _ = self.socket_loop(&subsys).await; // Ignore the error, re-open socket
match self.socket_loop(&subsys).await {
Err(RadarError::Shutdown) => {
return Ok(());
}
_ => {
// Ignore, reopen socket
}
}
self.sock = None;
} else {
sleep(Duration::from_millis(1000)).await;
Expand Down
18 changes: 18 additions & 0 deletions src/radar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,33 @@ use enum_primitive_derive::Primitive;
use log::info;
use serde::ser::{SerializeMap, Serializer};
use serde::Serialize;
use std::io;
use std::{
collections::HashMap,
fmt::{self, Display, Write},
net::{Ipv4Addr, SocketAddrV4},
sync::{Arc, RwLock},
};
use thiserror::Error;

use crate::locator::LocatorId;

#[derive(Error, Debug)]
pub enum RadarError {
#[error("Socket operation failed")]
Io(#[from] io::Error),
#[error("Interface '{0}' is not available")]
InterfaceNotFound(String),
#[error("Interface '{0}' has no valid IPv4 address")]
InterfaceNoV4(String),
#[error("Cannot detect Ethernet devices")]
EnumerationFailed,
#[error("Timeout")]
Timeout,
#[error("Shutdown")]
Shutdown,
}

#[derive(Serialize, Clone, Debug)]
enum PixelType {
History,
Expand Down

0 comments on commit 8b37be3

Please sign in to comment.