Skip to content

Commit

Permalink
add new watchdog
Browse files Browse the repository at this point in the history
  • Loading branch information
and-rewsmith committed Jun 25, 2020
1 parent f1b3401 commit bfc781f
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 69 deletions.
146 changes: 146 additions & 0 deletions edge-hub/watchdog/linux/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion edge-hub/watchdog/linux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ edition = "2018"
nix = "0.17.0"
signal-hook = "0.1.15"
tracing = "0.1"
tracing-subscriber = "0.1"
tracing-subscriber = "0.1"
futures = "0.3"
171 changes: 103 additions & 68 deletions edge-hub/watchdog/linux/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,96 +1,131 @@
extern crate nix;

use std::{error::Error, thread};
use std::process::{Command, Stdio, Child};
use std::sync::{Arc};
use futures::executor::block_on;
use futures::join;
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
use signal_hook::{iterator::Signals, SIGINT, SIGTERM};
use std::io::Error;
use std::process::{exit, Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use nix::unistd::Pid;
use nix::sys::signal::{self, Signal};
use signal_hook::{iterator::Signals, SIGTERM};
use tracing::{info};
use tracing_subscriber;
use tracing::{error, info, subscriber, Level};
use tracing_subscriber::fmt::Subscriber;

fn main() -> Result<(), Box<dyn Error>> {
fn main() -> Result<(), Error> {
block_on(async_main())
}

async fn async_main() -> Result<(), Error> {
init_logging();
info!("Starting watchdog");

let signals = Signals::new(&[SIGTERM])?;
let has_received_sigterm = Arc::new(AtomicBool::new(true));
let sigterm_listener = has_received_sigterm.clone();
thread::spawn(move || {
for _sig in signals.forever() {
info!("Received SIGTERM for watchdog");
sigterm_listener.store(false, Ordering::Relaxed);
let should_shutdown = match register_sigterm_listener() {
Ok(should_shutdown) => should_shutdown,
Err(e) => {
error!("Failed to register sigterm listener. Shutting down.");
return Err(e);
}
});
};

// start edgehub and blow up if can't start
let mut edgehub = Command::new("dotnet")
.arg("/app/Microsoft.Azure.Devices.Edge.Hub.Service.dll")
.stdout(Stdio::inherit())
.spawn()
.expect("failed to execute Edge Hub process");

let mut broker = Command::new("/usr/local/bin/mqttd")
.stdout(Stdio::inherit())
.spawn()
.expect("failed to execute MQTT broker process");

.arg("/app/Microsoft.Azure.Devices.Edge.Hub.Service.dll")
.stdout(Stdio::inherit())
.spawn()
.expect("Failed to start Edge Hub process");
info!("Launched Edge Hub process with pid {:?}", edgehub.id());
info!("Launched MQTT Broker process with pid {:?}", broker.id());
let mut is_edgehub_running = is_child_process_running(&mut edgehub);
let mut is_broker_running = is_child_process_running(&mut broker);
while has_received_sigterm.load(Ordering::Relaxed) && is_edgehub_running && is_broker_running {
is_edgehub_running = is_child_process_running(&mut edgehub);
is_broker_running = is_child_process_running(&mut broker);

thread::sleep(Duration::from_secs(4));

// unwrap broker if started, else shutdown edgehub and exit
let maybe_broker = match Command::new("/usr/local/bin/mqttd")
.stdout(Stdio::inherit())
.spawn()
{
Ok(child) => Some(child),
Err(e) => {
error!("Failed to start MQTT Broker process. {:?}", e);
None
}
};
let mut broker = maybe_broker.unwrap_or_else(|| {
info!("Broker process not started, so shutting down EdgeHub and exiting");
let shutdown = shutdown(&mut edgehub);
block_on(shutdown);
exit(1);
});
info!("Launched MQTT Broker process");

while !should_shutdown.load(Ordering::Relaxed) {
if !is_running(&mut edgehub) {
break;
}

if !is_running(&mut broker) {
break;
}

thread::sleep(Duration::from_secs(1));
}

shutdown(&mut edgehub, &mut broker);
let shutdown_edgehub = shutdown(&mut edgehub);
let shutdown_broker = shutdown(&mut broker);
join!(shutdown_edgehub, shutdown_broker);

info!("Exiting");
info!("Stopped");
Ok(())
}

fn init_logging() {
let subscriber = tracing_subscriber::fmt::Subscriber::builder()
.with_max_level(tracing::Level::INFO)
.finish();
let _ = tracing::subscriber::set_global_default(subscriber);

info!("Starting watchdog");
let subscriber = Subscriber::builder().with_max_level(Level::INFO).finish();
let _ = subscriber::set_global_default(subscriber);
}

fn is_child_process_running(child_process: &mut Child) -> bool
{
return !child_process.try_wait().unwrap().is_some();
}
fn register_sigterm_listener() -> Result<Arc<AtomicBool>, Error> {
let signals = Signals::new(&[SIGTERM, SIGINT])?;
let should_shutdown = Arc::new(AtomicBool::new(false));
let sigterm_listener = should_shutdown.clone();
thread::spawn(move || {
for _sig in signals.forever() {
info!("Received shutdown signal for watchdog");
sigterm_listener.store(true, Ordering::Relaxed);
}
});

fn shutdown(mut edgehub: &mut Child, mut broker: &mut Child) {
info!("Initiating shutdown of MQTT Broker and Edge Hub");
Ok(should_shutdown)
}

let mut is_edgehub_running = is_child_process_running(&mut edgehub);
let mut is_broker_running = is_child_process_running(&mut broker);
if is_edgehub_running {
info!("Sending SIGTERM to Edge Hub");
signal::kill(Pid::from_raw(edgehub.id() as i32), Signal::SIGTERM).unwrap();
fn is_running(child_process: &mut Child) -> bool {
match child_process.try_wait() {
Ok(status) => status.is_none(),
Err(e) => {
error!("Error while polling child process. {:?}", e);
false
}
}
if is_broker_running {
info!("Sending SIGTERM to MQTT Broker");
signal::kill(Pid::from_raw(broker.id() as i32), Signal::SIGTERM).unwrap();
}

async fn shutdown(mut child: &mut Child) {
if is_running(&mut child) {
info!("Terminating child process");
terminate(&mut child);
}

thread::sleep(Duration::from_secs(10));
thread::sleep(Duration::from_secs(60));

is_edgehub_running = is_child_process_running(&mut edgehub);
is_broker_running = is_child_process_running(&mut broker);
if is_edgehub_running {
info!("Killing Edge Hub");
edgehub.kill().unwrap();
if is_running(&mut child) {
info!("Killing child process");
kill(child);
}
if !is_broker_running {
info!("Killing MQTT Broker");
broker.kill().unwrap();
}

fn terminate(child: &mut Child) {
if let Err(e) = signal::kill(Pid::from_raw(child.id() as i32), Signal::SIGTERM) {
error!("Failed to send SIGTERM signal to child process. {:?}", e);
}
}

fn kill(child: &mut Child) {
if let Err(e) = child.kill() {
error!("Failed to kill child process. {:?}", e);
}
}

0 comments on commit bfc781f

Please sign in to comment.