From 45c68d8bf959e151e3dfcaa2b4a261242bedc593 Mon Sep 17 00:00:00 2001 From: Christopher Serr Date: Sun, 24 Apr 2022 12:25:54 +0200 Subject: [PATCH] Introduce a Watchdog in the Auto Splitting Runtime While the auto splitting crate itself provided a way to interrupt the execution, it wasn't actually used by `livesplit-core`. This bumps the version of `wasmtime` which actually removes the support for interrupt handles, but provides an alternative system based on epochs. We now provide our own interrupt handle that uses the epochs internally. Additionally this handle is now actually used. There is now an additional watchdog thread that expects a signal to be sent to it after the time it takes for the next call to `update` to start (which is based on the tick rate) and an additional 5 seconds it is allowed to execute. If it doesn't stop executing after that time, the interrupt handle is triggered and the module gets unloaded. Additionally the handle is used in the `Drop` implementation of the runtime to ensure that it actually shuts down. --- Cargo.toml | 8 +- capi/src/auto_splitting_runtime.rs | 11 +- capi/src/lib.rs | 2 +- crates/livesplit-auto-splitting/Cargo.toml | 2 +- crates/livesplit-auto-splitting/src/lib.rs | 3 +- .../livesplit-auto-splitting/src/runtime.rs | 55 ++-- src/auto_splitting/mod.rs | 269 +++++++++++++----- src/rendering/component/splits.rs | 2 +- 8 files changed, 235 insertions(+), 117 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eff7dffa..826ed435 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,11 @@ splits-io-api = { version = "0.2.0", optional = true } # Auto Splitting livesplit-auto-splitting = { path = "crates/livesplit-auto-splitting", version = "0.1.0", optional = true } -crossbeam-channel = { version = "0.5.1", default-features = false, features = ["std"], optional = true } +tokio = { version = "1.17.0", default-features = false, features = [ + "rt", + "sync", + "time", +], optional = true } log = { version = "0.4.14", default-features = false, optional = true } [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies] @@ -156,7 +160,7 @@ wasm-web = [ "web-sys", ] networking = ["std", "splits-io-api"] -auto-splitting = ["std", "livesplit-auto-splitting", "crossbeam-channel", "log"] +auto-splitting = ["std", "livesplit-auto-splitting", "tokio", "log"] # FIXME: Some targets don't have atomics, but we can't test for this properly # yet. So there's a feature you explicitly have to opt into to deactivate the diff --git a/capi/src/auto_splitting_runtime.rs b/capi/src/auto_splitting_runtime.rs index 7b4c26c0..f74e11ab 100644 --- a/capi/src/auto_splitting_runtime.rs +++ b/capi/src/auto_splitting_runtime.rs @@ -3,8 +3,7 @@ use super::str; use crate::shared_timer::OwnedSharedTimer; -use std::os::raw::c_char; -use std::path::PathBuf; +use std::{os::raw::c_char, path::PathBuf}; #[cfg(feature = "auto-splitting")] use livesplit_core::auto_splitting::Runtime as AutoSplittingRuntime; @@ -23,11 +22,11 @@ impl AutoSplittingRuntime { Self } - pub fn unload_script(&self) -> Result<(), ()> { + pub fn unload_script_blocking(&self) -> Result<(), ()> { Err(()) } - pub fn load_script(&self, _: PathBuf) -> Result<(), ()> { + pub fn load_script_blocking(&self, _: PathBuf) -> Result<(), ()> { Err(()) } } @@ -53,7 +52,7 @@ pub unsafe extern "C" fn AutoSplittingRuntime_load_script( ) -> bool { let path = str(path); if !path.is_empty() { - this.load_script(PathBuf::from(path)).is_ok() + this.load_script_blocking(PathBuf::from(path)).is_ok() } else { false } @@ -62,7 +61,7 @@ pub unsafe extern "C" fn AutoSplittingRuntime_load_script( /// Attempts to unload the auto splitter. Returns true if successful. #[no_mangle] pub extern "C" fn AutoSplittingRuntime_unload_script(this: &AutoSplittingRuntime) -> bool { - this.unload_script().is_ok() + this.unload_script_blocking().is_ok() } /// drop diff --git a/capi/src/lib.rs b/capi/src/lib.rs index e3437728..2d699358 100644 --- a/capi/src/lib.rs +++ b/capi/src/lib.rs @@ -12,9 +12,9 @@ use std::{ }; pub mod analysis; -pub mod auto_splitting_runtime; pub mod atomic_date_time; pub mod attempt; +pub mod auto_splitting_runtime; pub mod blank_space_component; pub mod blank_space_component_state; pub mod component; diff --git a/crates/livesplit-auto-splitting/Cargo.toml b/crates/livesplit-auto-splitting/Cargo.toml index f8ca0774..51c7818a 100644 --- a/crates/livesplit-auto-splitting/Cargo.toml +++ b/crates/livesplit-auto-splitting/Cargo.toml @@ -13,7 +13,7 @@ slotmap = { version = "1.0.2", default-features = false } snafu = "0.7.0" sysinfo = { version = "0.23.0", default-features = false, features = ["multithread"] } time = { version = "0.3.3", default-features = false } -wasmtime = { version = "0.35.1", default-features = false, features = [ +wasmtime = { version = "0.36.0", default-features = false, features = [ "cranelift", "parallel-compilation", ] } diff --git a/crates/livesplit-auto-splitting/src/lib.rs b/crates/livesplit-auto-splitting/src/lib.rs index eed7e9d3..0f28ac94 100644 --- a/crates/livesplit-auto-splitting/src/lib.rs +++ b/crates/livesplit-auto-splitting/src/lib.rs @@ -123,6 +123,5 @@ mod process; mod runtime; mod timer; -pub use runtime::{CreationError, RunError, Runtime}; +pub use runtime::{CreationError, InterruptHandle, RunError, Runtime}; pub use timer::{Timer, TimerState}; -pub use wasmtime::InterruptHandle; diff --git a/crates/livesplit-auto-splitting/src/runtime.rs b/crates/livesplit-auto-splitting/src/runtime.rs index 6d3b84a7..e0247f36 100644 --- a/crates/livesplit-auto-splitting/src/runtime.rs +++ b/crates/livesplit-auto-splitting/src/runtime.rs @@ -1,11 +1,11 @@ -use crate::{process::Process, timer::Timer, InterruptHandle}; +use crate::{process::Process, timer::Timer}; use log::info; use slotmap::{Key, KeyData, SlotMap}; use snafu::{ResultExt, Snafu}; use std::{ path::Path, - result, str, thread, + result, str, time::{Duration, Instant}, }; use sysinfo::{ProcessRefreshKind, RefreshKind, System, SystemExt}; @@ -40,14 +40,14 @@ pub enum CreationError { /// The underlying error. source: anyhow::Error, }, - /// The WebAssembly module has no exported function named `update`, which is + /// The WebAssembly module has no exported function called `update`, which is /// a required function. MissingUpdateFunction { /// The underlying error. source: anyhow::Error, }, - /// The WebAssembly module has no exported function memory called `memory`, - /// which is a requirement. + /// The WebAssembly module has no exported memory called `memory`, which is + /// a requirement. MissingMemory, } @@ -74,6 +74,16 @@ pub struct Context { process_list: ProcessList, } +/// A threadsafe handle used to interrupt the execution of the script. +pub struct InterruptHandle(Engine); + +impl InterruptHandle { + /// Interrupts the execution. + pub fn interrupt(&self) { + self.0.increment_epoch(); + } +} + pub struct ProcessList { system: System, last_check: Instant, @@ -115,20 +125,22 @@ impl ProcessList { pub struct Runtime { store: Store>, update: TypedFunc<(), ()>, - prev_time: Instant, + engine: Engine, } impl Runtime { /// Creates a new runtime with the given path to the WebAssembly module and /// the timer that the module then controls. - pub fn new>(path: P, timer: T) -> Result { + pub fn new(path: &Path, timer: T) -> Result { let engine = Engine::new( Config::new() .cranelift_opt_level(OptLevel::Speed) - .interruptable(true), + .epoch_interruption(true), ) .context(EngineCreation)?; + let module = Module::from_file(&engine, path).context(ModuleLoading)?; + let mut store = Store::new( &engine, Context { @@ -140,7 +152,8 @@ impl Runtime { }, ); - let module = Module::from_file(&engine, path).context(ModuleLoading)?; + store.set_epoch_deadline(1); + let mut linker = Linker::new(&engine); bind_interface(&mut linker)?; let instance = linker @@ -158,9 +171,9 @@ impl Runtime { } Ok(Self { + engine, store, update, - prev_time: Instant::now(), }) } @@ -168,27 +181,15 @@ impl Runtime { /// execution of the WebAssembly module. A WebAssembly module may /// accidentally or maliciously loop forever, which is why this is needed. pub fn interrupt_handle(&self) -> InterruptHandle { - self.store - .interrupt_handle() - .expect("We configured the runtime to produce an interrupt handle.") + InterruptHandle(self.engine.clone()) } /// Runs the exported `update` function of the WebAssembly module a single - /// time. If the module has not been configured yet, this will also call the - /// optional `configure` function beforehand. - pub fn step(&mut self) -> Result<(), RunError> { - self.update.call(&mut self.store, ()).context(RunUpdate) - } - - /// Sleeps until the next tick based on the current tick rate. The auto + /// time and returns the duration to wait until the next execution. The auto /// splitter can change this tick rate. It is 120Hz by default. - pub fn sleep(&mut self) { - let target = self.store.data().tick_rate; - let delta = self.prev_time.elapsed(); - if delta < target { - thread::sleep(target - delta); - } - self.prev_time = Instant::now(); + pub fn step(&mut self) -> Result { + self.update.call(&mut self.store, ()).context(RunUpdate)?; + Ok(self.store.data().tick_rate) } } diff --git a/src/auto_splitting/mod.rs b/src/auto_splitting/mod.rs index a52ee6b8..39de5f37 100644 --- a/src/auto_splitting/mod.rs +++ b/src/auto_splitting/mod.rs @@ -110,16 +110,16 @@ //! ``` use crate::timing::{SharedTimer, TimerPhase}; -use crossbeam_channel::{bounded, unbounded, Sender}; use livesplit_auto_splitting::{ - CreationError, Runtime as ScriptRuntime, Timer as AutoSplitTimer, TimerState, + CreationError, InterruptHandle, Runtime as ScriptRuntime, Timer as AutoSplitTimer, TimerState, }; -use snafu::Snafu; -use std::{ - path::PathBuf, - thread::{self, JoinHandle}, +use snafu::{ErrorCompat, Snafu}; +use std::{fmt, path::PathBuf, thread, time::Duration}; +use tokio::{ + runtime, + sync::{mpsc, oneshot, watch}, + time::{timeout_at, Instant}, }; -use time::Duration; /// An error that the [`Runtime`] can return. #[derive(Debug, Snafu)] @@ -136,14 +136,15 @@ pub enum Error { /// An auto splitter runtime that allows using an auto splitter provided as a /// WebAssembly module to control a timer. pub struct Runtime { - sender: Sender, - join_handle: Option>>, + interrupt_receiver: watch::Receiver>, + sender: mpsc::UnboundedSender, } impl Drop for Runtime { fn drop(&mut self) { - self.sender.send(Request::End).ok(); - self.join_handle.take().unwrap().join().ok(); + if let Some(handle) = &*self.interrupt_receiver.borrow() { + handle.interrupt(); + } } } @@ -151,97 +152,93 @@ impl Runtime { /// Starts the runtime. Doesn't actually load an auto splitter until /// [`load_script`][Runtime::load_script] is called. pub fn new(timer: SharedTimer) -> Self { - let (sender, receiver) = unbounded(); - let join_handle = thread::spawn(move || -> Result<(), Error> { - 'back_to_not_having_a_runtime: loop { - let mut runtime = loop { - match receiver.recv().map_err(|_| Error::ThreadStopped)? { - Request::LoadScript(script, ret) => { - match ScriptRuntime::new(&script, Timer(timer.clone())) { - Ok(r) => { - ret.send(Ok(())).ok(); - break r; - } - Err(source) => ret.send(Err(Error::LoadFailed { source })).unwrap(), - }; - } - Request::UnloadScript(ret) => { - log::warn!(target: "Auto Splitter", "Attempted to unload already unloaded script"); - ret.send(()).ok(); - } - Request::End => return Ok(()), - }; - }; - log::info!(target: "Auto Splitter", "Loaded script"); - loop { - if let Ok(request) = receiver.try_recv() { - match request { - Request::LoadScript(script, ret) => { - match ScriptRuntime::new(&script, Timer(timer.clone())) { - Ok(r) => { - ret.send(Ok(())).ok(); - runtime = r; - log::info!(target: "Auto Splitter", "Reloaded script"); - } - Err(source) => { - ret.send(Err(Error::LoadFailed { source })).ok(); - log::info!(target: "Auto Splitter", "Failed to load"); - } - } - } - Request::UnloadScript(ret) => { - ret.send(()).ok(); - log::info!(target: "Auto Splitter", "Unloaded script"); - continue 'back_to_not_having_a_runtime; - } - Request::End => return Ok(()), - } - } - if let Err(e) = runtime.step() { - log::error!(target: "Auto Splitter", "Unloaded due to failure: {e}"); - continue 'back_to_not_having_a_runtime; - }; - runtime.sleep(); + let (sender, receiver) = mpsc::unbounded_channel(); + let (interrupt_sender, interrupt_receiver) = watch::channel(None); + let (timeout_sender, timeout_receiver) = watch::channel(None); + + thread::Builder::new() + .name("Auto Splitting Runtime".into()) + .spawn(move || { + runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap() + .block_on(run(receiver, timer, timeout_sender, interrupt_sender)) + }) + .unwrap(); + + thread::Builder::new() + .name("Auto Splitting Watchdog".into()) + .spawn({ + let interrupt_receiver = interrupt_receiver.clone(); + move || { + runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap() + .block_on(watchdog(timeout_receiver, interrupt_receiver)) } - } - }); + }) + .unwrap(); Self { + interrupt_receiver, sender, - join_handle: Some(join_handle), } } /// Attempts to load a wasm file containing an auto splitter module. This /// call will block until the auto splitter has either loaded successfully /// or failed. - pub fn load_script(&self, script: PathBuf) -> Result<(), Error> { - // FIXME: replace with `futures::channel::oneshot` - let (sender, receiver) = bounded(1); + pub async fn load_script(&self, script: PathBuf) -> Result<(), Error> { + let (sender, receiver) = oneshot::channel(); self.sender .send(Request::LoadScript(script, sender)) .map_err(|_| Error::ThreadStopped)?; - receiver.recv().map_err(|_| Error::ThreadStopped)??; + + receiver.await.map_err(|_| Error::ThreadStopped)??; + Ok(()) } + /// Attempts to load a wasm file containing an auto splitter module. This + /// call will block until the auto splitter has either loaded successfully + /// or failed. + pub fn load_script_blocking(&self, script: PathBuf) -> Result<(), Error> { + runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap() + .block_on(self.load_script(script)) + } + /// Unloads the current auto splitter. This will _not_ return an error if /// there isn't currently an auto splitter loaded, only if the runtime /// thread stops unexpectedly. - pub fn unload_script(&self) -> Result<(), Error> { - // FIXME: replace with `futures::channel::oneshot` - let (sender, receiver) = bounded(1); + pub async fn unload_script(&self) -> Result<(), Error> { + let (sender, receiver) = oneshot::channel(); self.sender .send(Request::UnloadScript(sender)) .map_err(|_| Error::ThreadStopped)?; - receiver.recv().map_err(|_| Error::ThreadStopped) + + receiver.await.map_err(|_| Error::ThreadStopped) + } + + /// Unloads the current auto splitter. This will _not_ return an error if + /// there isn't currently an auto splitter loaded, only if the runtime + /// thread stops unexpectedly. + pub fn unload_script_blocking(&self) -> Result<(), Error> { + runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap() + .block_on(self.unload_script()) } } enum Request { - LoadScript(PathBuf, Sender>), - UnloadScript(Sender<()>), - End, + LoadScript(PathBuf, oneshot::Sender>), + UnloadScript(oneshot::Sender<()>), } // This newtype is required because SharedTimer is an Arc>, so we @@ -270,7 +267,7 @@ impl AutoSplitTimer for Timer { self.0.write().reset(true) } - fn set_game_time(&mut self, time: Duration) { + fn set_game_time(&mut self, time: time::Duration) { self.0.write().set_game_time(time.into()); } @@ -286,3 +283,121 @@ impl AutoSplitTimer for Timer { self.0.write().set_custom_variable(name, value) } } + +async fn run( + mut receiver: mpsc::UnboundedReceiver, + timer: SharedTimer, + timeout_sender: watch::Sender>, + interrupt_sender: watch::Sender>, +) { + 'back_to_not_having_a_runtime: loop { + interrupt_sender.send(None).ok(); + timeout_sender.send(None).ok(); + + let mut runtime = loop { + match receiver.recv().await { + Some(Request::LoadScript(script, ret)) => { + match ScriptRuntime::new(&script, Timer(timer.clone())) { + Ok(r) => { + ret.send(Ok(())).ok(); + break r; + } + Err(source) => { + ret.send(Err(Error::LoadFailed { source })).ok(); + } + }; + } + Some(Request::UnloadScript(ret)) => { + log::warn!(target: "Auto Splitter", "Attempted to unload already unloaded script"); + ret.send(()).ok(); + } + None => { + return; + } + }; + }; + + log::info!(target: "Auto Splitter", "Loaded script"); + let mut next_step = Instant::now(); + interrupt_sender.send(Some(runtime.interrupt_handle())).ok(); + timeout_sender.send(Some(next_step)).ok(); + + loop { + match timeout_at(next_step, receiver.recv()).await { + Ok(Some(request)) => match request { + Request::LoadScript(script, ret) => { + match ScriptRuntime::new(&script, Timer(timer.clone())) { + Ok(r) => { + ret.send(Ok(())).ok(); + runtime = r; + log::info!(target: "Auto Splitter", "Reloaded script"); + } + Err(source) => { + ret.send(Err(Error::LoadFailed { source })).ok(); + log::info!(target: "Auto Splitter", "Failed to load"); + } + } + } + Request::UnloadScript(ret) => { + ret.send(()).ok(); + log::info!(target: "Auto Splitter", "Unloaded script"); + continue 'back_to_not_having_a_runtime; + } + }, + Ok(None) => return, + Err(_) => match runtime.step() { + Ok(tick_rate) => { + next_step += tick_rate; + timeout_sender.send(Some(next_step)).ok(); + } + Err(e) => { + log::error!(target: "Auto Splitter", "Unloaded due to failure: {}", PrintChain(e)); + continue 'back_to_not_having_a_runtime; + } + }, + } + } + } +} + +async fn watchdog( + mut timeout_receiver: watch::Receiver>, + interrupt_receiver: watch::Receiver>, +) { + const TIMEOUT: Duration = Duration::from_secs(5); + + loop { + let instant = *timeout_receiver.borrow(); + match instant { + Some(time) => match timeout_at(time + TIMEOUT, timeout_receiver.changed()).await { + Ok(Ok(_)) => {} + Ok(Err(_)) => return, + Err(_) => { + if let Some(handle) = &*interrupt_receiver.borrow() { + handle.interrupt(); + } + } + }, + None => { + if timeout_receiver.changed().await.is_err() { + return; + } + } + } + } +} + +struct PrintChain(E); + +impl fmt::Display for PrintChain { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut chain = self.0.iter_chain(); + if let Some(next) = chain.next() { + write!(f, "{}", next)?; + } + for next in chain { + write!(f, " {}", next)?; + } + Ok(()) + } +} diff --git a/src/rendering/component/splits.rs b/src/rendering/component/splits.rs index 5f10d17a..bfdcf0f3 100644 --- a/src/rendering/component/splits.rs +++ b/src/rendering/component/splits.rs @@ -1,6 +1,7 @@ use crate::{ component::splits::State, layout::{LayoutDirection, LayoutState}, + platform::prelude::*, rendering::{ consts::{ vertical_padding, BOTH_PADDINGS, DEFAULT_COMPONENT_HEIGHT, DEFAULT_TEXT_SIZE, PADDING, @@ -13,7 +14,6 @@ use crate::{ solid, RenderContext, }, settings::{Gradient, ListGradient}, - platform::prelude::*, }; pub const COLUMN_WIDTH: f32 = 2.75;