Skip to content

Commit

Permalink
Merge pull request #2145 from oasisprotocol/kostko/feature/rofl-watchdog
Browse files Browse the repository at this point in the history
runtime-sdk/modules/rofl: Add watchdog task
  • Loading branch information
kostko authored Feb 5, 2025
2 parents 84a1be4 + 81d993a commit 5053110
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rofl-containers/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rofl-containers"
version = "0.3.3"
version = "0.3.5"
edition = "2021"

[dependencies]
Expand Down
1 change: 1 addition & 0 deletions runtime-sdk/src/modules/rofl/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod notifier;
pub mod prelude;
mod processor;
mod registration;
mod watchdog;

pub use crate::modules::rofl::app_id::AppId;
pub use client::Client;
Expand Down
8 changes: 7 additions & 1 deletion runtime-sdk/src/modules/rofl/app/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
};
use rand::rngs::OsRng;

use super::{notifier, registration, App, Environment};
use super::{notifier, registration, watchdog, App, Environment};

/// Size of the processor command queue.
const CMDQ_BACKLOG: usize = 32;
Expand All @@ -31,6 +31,8 @@ pub(super) enum Command {
GetLatestRound(oneshot::Sender<u64>),
/// Notification that initial registration has been completed.
InitialRegistrationCompleted,
/// Registration refreshed.
RegistrationRefreshed,
}

/// Processor state.
Expand All @@ -45,6 +47,7 @@ pub(super) struct State<A: App> {
struct Tasks<A: App> {
registration: registration::Task<A>,
notifier: notifier::Task<A>,
watchdog: watchdog::Task,
}

/// Processor.
Expand Down Expand Up @@ -89,6 +92,7 @@ where
tasks: Tasks {
registration: registration::Task::new(state.clone(), env.clone()),
notifier: notifier::Task::new(state.clone(), env.clone()),
watchdog: watchdog::Task::new(),
},
state,
env,
Expand Down Expand Up @@ -125,6 +129,7 @@ where
// Start the tasks.
self.tasks.registration.start();
self.tasks.notifier.start();
self.tasks.watchdog.start();

slog::info!(self.logger, "entering processor loop");
while let Some(cmd) = self.cmdq.recv().await {
Expand All @@ -146,6 +151,7 @@ where
Command::InitialRegistrationCompleted => {
self.cmd_initial_registration_completed().await
}
Command::RegistrationRefreshed => self.tasks.watchdog.keep_alive().await,
}
}

Expand Down
5 changes: 5 additions & 0 deletions runtime-sdk/src/modules/rofl/app/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ where
}
self.last_registration_epoch = Some(epoch);

// Notify about registration refresh.
self.env
.send_command(processor::Command::RegistrationRefreshed)
.await?;

Ok(())
}
}
77 changes: 77 additions & 0 deletions runtime-sdk/src/modules/rofl/app/watchdog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use anyhow::Result;
use tokio::{sync::mpsc, time};

use crate::core::common::{logger::get_logger, process};

/// Interval, in seconds, within which at least one keep-alive must be delivered to prevent the
/// watchdog from terminating the application.
///
/// The value is converted with `Duration::from_secs` when the watchdog timer is armed.
const WATCHDOG_TRIGGER_INTERVAL: u64 = 6 * 3600; // 6 hours

/// Application watchdog task.
///
/// Owner-side handle: it holds the not-yet-started worker together with the sending half of the
/// keep-alive channel whose receiving half belongs to the worker.
pub(super) struct Task {
/// Watchdog worker; taken out (leaving `None`) when the task is started.
imp: Option<Impl>,
/// Sending half of the keep-alive channel; each `()` sent counts as one keep-alive.
tx: mpsc::Sender<()>,
}

impl Task {
    /// Build a watchdog task that has not been started yet.
    ///
    /// A bounded keep-alive channel (capacity 16) is created; its receiving half is handed to
    /// the worker while the sending half stays with the returned handle.
    pub(super) fn new() -> Self {
        let (keep_alive_tx, keep_alive_rx) = mpsc::channel(16);

        Self {
            imp: Some(Impl {
                logger: get_logger("modules/rofl/app/watchdog"),
                notify: keep_alive_rx,
            }),
            tx: keep_alive_tx,
        }
    }

    /// Start the watchdog worker.
    ///
    /// The worker is moved out of the handle on the first call, so any later call finds nothing
    /// left to start and returns without doing anything.
    pub(super) fn start(&mut self) {
        let Some(worker) = self.imp.take() else {
            return;
        };
        worker.start();
    }

    /// Tell the watchdog that the application is still alive.
    ///
    /// # Errors
    ///
    /// Fails when the keep-alive cannot be queued on the channel, i.e. when the receiving half
    /// has been closed.
    pub(super) async fn keep_alive(&self) -> Result<()> {
        self.tx.send(()).await.map_err(Into::into)
    }
}

/// Watchdog worker state that is moved into the spawned task.
struct Impl {
/// Logger used for the watchdog's lifecycle and trigger messages.
logger: slog::Logger,

/// Receiving half of the keep-alive channel; every received `()` restarts the countdown.
notify: mpsc::Receiver<()>,
}

impl Impl {
/// Start the application watchdog task.
pub(super) fn start(self) {
tokio::task::spawn(self.run());
}

/// Run the application watchdog task.
async fn run(mut self) {
slog::info!(self.logger, "starting watchdog task");

loop {
tokio::select! {
Some(()) = self.notify.recv() => {
// Keep-alive received, reset watchdog.
},

_ = time::sleep(time::Duration::from_secs(WATCHDOG_TRIGGER_INTERVAL)) => {
// Watchdog triggered, kill the process.
slog::error!(self.logger, "keep-alive not received, terminating application");
process::abort();
},

else => break,
}
}

slog::info!(self.logger, "watchdog task stopped");
}
}

0 comments on commit 5053110

Please sign in to comment.