Skip to content

Commit

Permalink
Implement creating fibers in the instance allocator.
Browse files Browse the repository at this point in the history
This commit implements creating fibers in the instance allocator.

The allocator will return "raw" (platform-dependent) fibers that `Store` can
use to run async code on.

The future pooling allocator will use these changes to create fibers from
preallocated regions of memory on Linux and macOS.

On Windows, native fiber implementations are always used.
  • Loading branch information
peterhuene committed Feb 4, 2021
1 parent e7e98cb commit 0f7431c
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 23 additions & 4 deletions crates/fiber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ mod unix;
#[cfg(unix)]
use unix as imp;

pub use imp::RawFiber;

pub struct Fiber<'a, Resume, Yield, Return> {
inner: imp::Fiber,
inner: RawFiber,
done: Cell<bool>,
_phantom: PhantomData<&'a (Resume, Yield, Return)>,
}
Expand All @@ -37,18 +39,35 @@ impl<'a, Resume, Yield, Return> Fiber<'a, Resume, Yield, Return> {
/// Creates a new fiber which will execute `func` on a new native stack of
/// size `stack_size`.
///
/// A stack size of `0` will use the default stack size for the current platform.
///
/// This function returns a `Fiber` which, when resumed, will execute `func`
/// to completion. When desired the `func` can suspend itself via
/// `Fiber::suspend`.
pub fn new(
stack_size: usize,
func: impl FnOnce(Resume, &Suspend<Resume, Yield, Return>) -> Return + 'a,
) -> io::Result<Fiber<'a, Resume, Yield, Return>> {
Ok(Fiber {
inner: imp::Fiber::new(stack_size, func)?,
let fiber = RawFiber::new(stack_size)?;
Ok(Self::new_from_raw(fiber, func))
}

/// Creates a new fiber which will execute `func` on the given raw fiber.
///
/// This function returns a `Fiber` which, when resumed, will execute `func`
/// to completion. When desired the `func` can suspend itself via
/// `Fiber::suspend`.
pub fn new_from_raw(
fiber: RawFiber,
func: impl FnOnce(Resume, &Suspend<Resume, Yield, Return>) -> Return + 'a,
) -> Self {
fiber.init(func);

Self {
inner: fiber,
done: Cell::new(false),
_phantom: PhantomData,
})
}
}

/// Resumes execution of this fiber.
Expand Down
59 changes: 34 additions & 25 deletions crates/fiber/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ use std::cell::Cell;
use std::io;
use std::ptr;

pub struct Fiber {
// Description of the mmap region we own. This should be abstracted
// eventually so we aren't personally mmap-ing this region.
mmap: *mut libc::c_void,
mmap_len: usize,
pub struct RawFiber {
// The top of the stack; for stacks allocated by the fiber implementation itself,
// the base address of the allocation will be `top.sub(alloc_len.unwrap())`
top_of_stack: *mut u8,
alloc_len: Option<usize>,
}

pub struct Suspend {
Expand All @@ -65,28 +65,37 @@ where
}
}

impl Fiber {
pub fn new<F, A, B, C>(stack_size: usize, func: F) -> io::Result<Fiber>
impl RawFiber {
pub fn new(stack_size: usize) -> io::Result<Self> {
Self::alloc_with_stack(stack_size)
}

pub fn from_stack(top: *mut u8) -> Self {
Self {
top_of_stack: top,
alloc_len: None,
}
}

pub(crate) fn init<F, A, B, C>(&self, func: F)
where
F: FnOnce(A, &super::Suspend<A, B, C>) -> C,
{
let fiber = Fiber::alloc_with_stack(stack_size)?;
unsafe {
// Initialize the top of the stack to be resumed from
let top_of_stack = fiber.top_of_stack();
let data = Box::into_raw(Box::new(func)).cast();
wasmtime_fiber_init(top_of_stack, fiber_start::<F, A, B, C>, data);
Ok(fiber)
wasmtime_fiber_init(self.top_of_stack, fiber_start::<F, A, B, C>, data);
}
}

fn alloc_with_stack(stack_size: usize) -> io::Result<Fiber> {
fn alloc_with_stack(stack_size: usize) -> io::Result<Self> {
unsafe {
// Round up our stack size request to the nearest multiple of the
// page size.
let page_size = libc::sysconf(libc::_SC_PAGESIZE) as usize;
let stack_size = if stack_size == 0 {
page_size
// Default to an 8 MB stack size (common default for 64-bit Linux and macOS)
// TODO: respect stack rlimit
8 * 1024 * 1024
} else {
(stack_size + (page_size - 1)) & (!(page_size - 1))
};
Expand All @@ -104,7 +113,10 @@ impl Fiber {
if mmap == libc::MAP_FAILED {
return Err(io::Error::last_os_error());
}
let ret = Fiber { mmap, mmap_len };
let ret = Self {
top_of_stack: mmap.cast::<u8>().add(mmap_len),
alloc_len: Some(mmap_len),
};
let res = libc::mprotect(
mmap.cast::<u8>().add(page_size).cast(),
stack_size,
Expand All @@ -124,27 +136,24 @@ impl Fiber {
// stack, otherwise known as our reserved slot for this information.
//
// In the diagram above this is updating address 0xAff8
let top_of_stack = self.top_of_stack();
let addr = top_of_stack.cast::<usize>().offset(-1);
let addr = self.top_of_stack.cast::<usize>().offset(-1);
addr.write(result as *const _ as usize);

wasmtime_fiber_switch(top_of_stack);
wasmtime_fiber_switch(self.top_of_stack);

// null this out to help catch use-after-free
addr.write(0);
}
}

unsafe fn top_of_stack(&self) -> *mut u8 {
self.mmap.cast::<u8>().add(self.mmap_len)
}
}

impl Drop for Fiber {
impl Drop for RawFiber {
fn drop(&mut self) {
unsafe {
let ret = libc::munmap(self.mmap, self.mmap_len);
debug_assert!(ret == 0);
if let Some(alloc_len) = self.alloc_len {
let ret = libc::munmap(self.top_of_stack.sub(alloc_len) as _, alloc_len);
debug_assert!(ret == 0);
}
}
}
}
Expand Down
25 changes: 16 additions & 9 deletions crates/fiber/src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use winapi::shared::minwindef::*;
use winapi::um::fibersapi::*;
use winapi::um::winbase::*;

pub struct Fiber {
pub struct RawFiber {
fiber: LPVOID,
state: Box<StartState>,
}
Expand Down Expand Up @@ -37,14 +37,11 @@ where
super::Suspend::<A, B, C>::execute(suspend, initial, *func);
}

impl Fiber {
pub fn new<F, A, B, C>(stack_size: usize, func: F) -> io::Result<Fiber>
where
F: FnOnce(A, &super::Suspend<A, B, C>) -> C,
{
impl RawFiber {
pub fn new(stack_size: usize) -> io::Result<Self> {
unsafe {
let state = Box::new(StartState {
initial_closure: Cell::new(Box::into_raw(Box::new(func)).cast()),
initial_closure: Cell::new(ptr::null_mut()),
parent: Cell::new(ptr::null_mut()),
result_location: Cell::new(ptr::null()),
});
Expand All @@ -57,11 +54,21 @@ impl Fiber {
drop(Box::from_raw(state.initial_closure.get().cast::<F>()));
Err(io::Error::last_os_error())
} else {
Ok(Fiber { fiber, state })
Ok(Self { fiber, state })
}
}
}

pub(crate) fn init<F, A, B, C>(&self, func: F)
where
F: FnOnce(A, &super::Suspend<A, B, C>) -> C,
{
assert!(self.state.initial_closure.is_null());
self.state
.initial_closure
.set(Box::into_raw(Box::new(func)).cast());
}

pub(crate) fn resume<A, B, C>(&self, result: &Cell<RunResult<A, B, C>>) {
unsafe {
let is_fiber = IsThreadAFiber() != 0;
Expand Down Expand Up @@ -89,7 +96,7 @@ impl Fiber {
}
}

impl Drop for Fiber {
impl Drop for RawFiber {
fn drop(&mut self) {
unsafe {
DeleteFiber(self.fiber);
Expand Down
7 changes: 7 additions & 0 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ edition = "2018"

[dependencies]
wasmtime-environ = { path = "../environ", version = "0.22.0" }
wasmtime-fiber = { path = "../fiber", version = "0.22.0", optional = true }
region = "2.1.0"
libc = { version = "0.2.82", default-features = false }
log = "0.4.8"
Expand All @@ -33,3 +34,9 @@ cc = "1.0"

[badges]
maintenance = { status = "actively-developed" }

[features]
default = ["async"]

# Enables support for "async" fibers in the instance allocator
async = ["wasmtime-fiber"]
22 changes: 22 additions & 0 deletions crates/runtime/src/instance/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ pub enum InstantiationError {
Trap(Trap),
}

/// An error while creating a fiber.
#[derive(Error, Debug)]
pub enum FiberError {
/// An I/O error occured while creating the fiber.
#[error("I/O error: {0}")]
Io(std::io::Error),

/// A limit on how many fibers are supported has been reached.
#[error("Limit of {0} concurrent fibers has been reached")]
Limit(u32),
}

/// Represents a runtime instance allocator.
///
/// # Safety
Expand Down Expand Up @@ -127,6 +139,10 @@ pub unsafe trait InstanceAllocator: Send + Sync {
///
/// Use extreme care when deallocating an instance so that there are no dangling instance pointers.
unsafe fn deallocate(&self, handle: &InstanceHandle);

/// Creates a fiber for executing async instances on.
#[cfg(feature = "async")]
fn create_fiber(&self) -> Result<wasmtime_fiber::RawFiber, FiberError>;
}

unsafe fn initialize_vmcontext(
Expand Down Expand Up @@ -544,4 +560,10 @@ unsafe impl InstanceAllocator for OnDemandInstanceAllocator {
ptr::drop_in_place(instance as *const Instance as *mut Instance);
alloc::dealloc(instance as *const Instance as *mut _, layout);
}

/// Creates a fiber for executing async instances on.
#[cfg(feature = "async")]
fn create_fiber(&self) -> Result<wasmtime_fiber::RawFiber, FiberError> {
wasmtime_fiber::RawFiber::new(0).map_err(FiberError::Io)
}
}
2 changes: 1 addition & 1 deletion crates/wasmtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ experimental_x64 = ["wasmtime-jit/experimental_x64"]

# Enables support for "async stores" as well as defining host functions as
# `async fn` and calling functions asynchronously.
async = ["wasmtime-fiber"]
async = ["wasmtime-runtime/async", "wasmtime-fiber"]
15 changes: 10 additions & 5 deletions crates/wasmtime/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,10 +713,16 @@ impl Store {
pub(crate) async fn on_fiber<R>(&self, func: impl FnOnce() -> R) -> Result<R, Trap> {
debug_assert!(self.is_async());

// TODO: allocation of a fiber should be much more abstract where we
// shouldn't be allocating huge stacks on every async wasm function call.
// Create the raw fiber to run the function on
let raw_fiber = self
.engine()
.config()
.instance_allocator()
.create_fiber()
.map_err(|e| Trap::from(anyhow::Error::from(e)))?;

let mut slot = None;
let fiber = wasmtime_fiber::Fiber::new(10 * 1024 * 1024, |keep_going, suspend| {
let fiber = wasmtime_fiber::Fiber::new_from_raw(raw_fiber, |keep_going, suspend| {
// First check and see if we were interrupted/dropped, and only
// continue if we haven't been.
keep_going?;
Expand All @@ -734,8 +740,7 @@ impl Store {

slot = Some(func());
Ok(())
})
.map_err(|e| Trap::from(anyhow::Error::from(e)))?;
});

// Once we have the fiber representing our synchronous computation, we
// wrap that in a custom future implementation which does the
Expand Down

0 comments on commit 0f7431c

Please sign in to comment.