Skip to content

Commit

Permalink
Allow async yield from epoch interruption callback
Browse files Browse the repository at this point in the history
When an epoch interruption deadline arrives, previously it was possible
to yield to the async executor, or to invoke a callback on the wasm
stack, but not both. This changes the API to allow callbacks to run and
then request yielding to the async executor.
  • Loading branch information
jameysharp committed Jun 2, 2023
1 parent 112e52d commit 00b1c74
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 50 deletions.
4 changes: 4 additions & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ Unreleased.

### Changed

* An `epoch_deadline_callback` now returns an `UpdateDeadline` enum to allow
optionally yielding to the async executor after the callback runs.
[#6464](https://github.com/bytecodealliance/wasmtime/pull/6464)

* The "raw" representation of `funcref` and `externref` in the embedding API has
been updated from a `usize` to a `*mut u8` to be compatible with Rust's
proposed strict provenance rules. This change is additionally reflected into
Expand Down
4 changes: 3 additions & 1 deletion crates/wasmtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,9 @@ pub use crate::profiling::GuestProfiler;
pub use crate::r#ref::ExternRef;
#[cfg(feature = "async")]
pub use crate::store::CallHookHandler;
pub use crate::store::{AsContext, AsContextMut, CallHook, Store, StoreContext, StoreContextMut};
pub use crate::store::{
AsContext, AsContextMut, CallHook, Store, StoreContext, StoreContextMut, UpdateDeadline,
};
pub use crate::trap::*;
pub use crate::types::*;
pub use crate::values::*;
Expand Down
96 changes: 52 additions & 44 deletions crates/wasmtime/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ pub struct StoreInner<T> {

limiter: Option<ResourceLimiterInner<T>>,
call_hook: Option<CallHookInner<T>>,
epoch_deadline_behavior: EpochDeadline<T>,
epoch_deadline_behavior:
Option<Box<dyn FnMut(StoreContextMut<T>) -> Result<UpdateDeadline> + Send + Sync>>,
// for comments about `ManuallyDrop`, see `Store::into_data`
data: ManuallyDrop<T>,
}
Expand All @@ -230,6 +231,18 @@ enum CallHookInner<T> {
Async(Box<dyn CallHookHandler<T> + Send + Sync>),
}

/// What to do after returning from a callback when the engine epoch reaches
/// the deadline for a Store during execution of a function using that store.
pub enum UpdateDeadline {
/// Extend the deadline by the specified number of ticks.
Continue(u64),
/// Extend the deadline by the specified number of ticks after yielding to
/// the async executor loop. This can only be used with an async [`Store`]
/// configured via [`Config::async_support`](crate::Config::async_support).
#[cfg(feature = "async")]
Yield(u64),
}

// Forward methods on `StoreOpaque` to also being on `StoreInner<T>`
impl<T> Deref for StoreInner<T> {
type Target = StoreOpaque;
Expand Down Expand Up @@ -422,21 +435,6 @@ enum OutOfGas {
},
}

/// What to do when the engine epoch reaches the deadline for a Store
/// during execution of a function using that store.
#[derive(Default)]
enum EpochDeadline<T> {
/// Return early with a trap.
#[default]
Trap,
/// Call a custom deadline handler.
Callback(Box<dyn FnMut(StoreContextMut<T>) -> Result<u64> + Send + Sync>),
/// Extend the deadline by the specified number of ticks after
/// yielding to the async executor loop.
#[cfg(feature = "async")]
YieldAndExtendDeadline { delta: u64 },
}

impl<T> Store<T> {
/// Creates a new [`Store`] to be associated with the given [`Engine`] and
/// `data` provided.
Expand Down Expand Up @@ -480,7 +478,7 @@ impl<T> Store<T> {
},
limiter: None,
call_hook: None,
epoch_deadline_behavior: EpochDeadline::Trap,
epoch_deadline_behavior: None,
data: ManuallyDrop::new(data),
});

Expand Down Expand Up @@ -928,10 +926,17 @@ impl<T> Store<T> {
/// store and the epoch deadline is reached before completion, the
/// provided callback function is invoked.
///
/// This function should return a positive `delta`, which is used to
/// update the new epoch, setting it to the current epoch plus
/// `delta` ticks. Alternatively, the callback may return an error,
/// which will terminate execution.
/// This callback should either return an [`UpdateDeadline`], or
/// return an error, which will terminate execution with a trap.
///
/// The [`UpdateDeadline`] is a positive number of ticks to
/// add to the epoch deadline, as well as indicating what
/// to do after the callback returns. If the [`Store`] is
/// configured with async support, then the callback may return
/// [`UpdateDeadline::Yield`] to yield to the async executor before
/// updating the epoch deadline. Alternatively, the callback may
/// return [`UpdateDeadline::Continue`] to update the epoch deadline
/// immediately.
///
/// This setting is intended to allow for coarse-grained
/// interruption, but not a deterministic deadline of a fixed,
Expand All @@ -943,7 +948,7 @@ impl<T> Store<T> {
/// for an introduction to epoch-based interruption.
pub fn epoch_deadline_callback(
&mut self,
callback: impl FnMut(StoreContextMut<T>) -> Result<u64> + Send + Sync + 'static,
callback: impl FnMut(StoreContextMut<T>) -> Result<UpdateDeadline> + Send + Sync + 'static,
) {
self.inner.epoch_deadline_callback(Box::new(callback));
}
Expand Down Expand Up @@ -1945,29 +1950,31 @@ unsafe impl<T> wasmtime_runtime::Store for StoreInner<T> {
fn new_epoch(&mut self) -> Result<u64, anyhow::Error> {
// Temporarily take the configured behavior to avoid mutably borrowing
// multiple times.
let mut behavior = std::mem::take(&mut self.epoch_deadline_behavior);
let mut behavior = self.epoch_deadline_behavior.take();
let delta_result = match &mut behavior {
EpochDeadline::Trap => Err(Trap::Interrupt.into()),
EpochDeadline::Callback(callback) => {
let delta = callback((&mut *self).as_context_mut())?;
None => Err(Trap::Interrupt.into()),
Some(callback) => callback((&mut *self).as_context_mut()).and_then(|update| {
let delta = match update {
UpdateDeadline::Continue(delta) => delta,

#[cfg(feature = "async")]
UpdateDeadline::Yield(delta) => {
assert!(
self.async_support(),
"cannot use `UpdateDeadline::Yield` without enabling async support in the config"
);
// Do the async yield. May return a trap if future was
// canceled while we're yielded.
self.async_yield_impl()?;
delta
}
};

// Set a new deadline and return the new epoch deadline so
// the Wasm code doesn't have to reload it.
self.set_epoch_deadline(delta);
Ok(self.get_epoch_deadline())
}
#[cfg(feature = "async")]
EpochDeadline::YieldAndExtendDeadline { delta } => {
let delta = *delta;
// Do the async yield. May return a trap if future was
// canceled while we're yielded.
self.async_yield_impl()?;
// Set a new deadline.
self.set_epoch_deadline(delta);

// Return the new epoch deadline so the Wasm code
// doesn't have to reload it.
Ok(self.get_epoch_deadline())
}
})
};

// Put back the original behavior which was replaced by `take`.
Expand All @@ -1992,14 +1999,14 @@ impl<T> StoreInner<T> {
}

fn epoch_deadline_trap(&mut self) {
self.epoch_deadline_behavior = EpochDeadline::Trap;
self.epoch_deadline_behavior = None;
}

fn epoch_deadline_callback(
&mut self,
callback: Box<dyn FnMut(StoreContextMut<T>) -> Result<u64> + Send + Sync>,
callback: Box<dyn FnMut(StoreContextMut<T>) -> Result<UpdateDeadline> + Send + Sync>,
) {
self.epoch_deadline_behavior = EpochDeadline::Callback(callback);
self.epoch_deadline_behavior = Some(callback);
}

fn epoch_deadline_async_yield_and_update(&mut self, delta: u64) {
Expand All @@ -2009,7 +2016,8 @@ impl<T> StoreInner<T> {
);
#[cfg(feature = "async")]
{
self.epoch_deadline_behavior = EpochDeadline::YieldAndExtendDeadline { delta };
self.epoch_deadline_behavior =
Some(Box::new(move |_store| Ok(UpdateDeadline::Yield(delta))));
}
let _ = delta; // suppress warning in non-async build
}
Expand Down
6 changes: 3 additions & 3 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::thread;
use std::time::Duration;
use wasmtime::{
AsContextMut, Engine, Func, GuestProfiler, Linker, Module, Store, StoreLimits,
StoreLimitsBuilder, Val, ValType,
StoreLimitsBuilder, UpdateDeadline, Val, ValType,
};
use wasmtime_cli_flags::{CommonOptions, WasiModules};
use wasmtime_wasi::maybe_exit_on_error;
Expand Down Expand Up @@ -467,12 +467,12 @@ impl RunCommand {
if timeout == 0 {
bail!("timeout exceeded");
}
Ok(1)
Ok(UpdateDeadline::Continue(1))
});
} else {
store.epoch_deadline_callback(move |mut store| {
sample(&mut store);
Ok(1)
Ok(UpdateDeadline::Continue(1))
});
}

Expand Down
29 changes: 27 additions & 2 deletions tests/all/epoch_interruption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn make_env<T>(engine: &Engine) -> Linker<T> {

enum InterruptMode {
Trap,
Callback(fn(StoreContextMut<usize>) -> Result<u64>),
Callback(fn(StoreContextMut<usize>) -> Result<UpdateDeadline>),
Yield(u64),
}

Expand Down Expand Up @@ -339,7 +339,32 @@ async fn epoch_callback_continue() {
InterruptMode::Callback(|mut cx| {
let s = cx.data_mut();
*s += 1;
Ok(1)
Ok(UpdateDeadline::Continue(1))
}),
|_| {},
)
.await
);
}

#[tokio::test]
async fn epoch_callback_yield() {
assert_eq!(
Some((1, 1)),
run_and_count_yields_or_trap(
"
(module
(import \"\" \"bump_epoch\" (func $bump))
(func (export \"run\")
call $bump ;; bump epoch
call $subfunc) ;; call func; will notice new epoch and yield
(func $subfunc))
",
1,
InterruptMode::Callback(|mut cx| {
let s = cx.data_mut();
*s += 1;
Ok(UpdateDeadline::Yield(1))
}),
|_| {},
)
Expand Down

0 comments on commit 00b1c74

Please sign in to comment.