Skip to content

Commit

Permalink
rdx changes for async, pollable, and tasks
Browse files Browse the repository at this point in the history
this commit has various experimental changes in it which will likely be removed down the road

I tried to get pollable working but it blocks on native and panics on web

the async was worked around and should be cleaned up at a future date
  • Loading branch information
DougAnderson444 committed Jan 8, 2025
1 parent 66a9c5c commit cf6b5c9
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 80 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[target.wasm32-unknown-unknown]
rustflags = ["--deny", "warnings"]
5 changes: 4 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ check: build
./check.sh

check32:
cargo check --target wasm32-unknown-unknown
RUSTFLAGS="--deny warnings" cargo check --target wasm32-unknown-unknown

build32:
cargo +nightly build -Z build-std --target wasm32-unknown-unknown

force:
cargo run --bin force-build-wasm-bins
98 changes: 73 additions & 25 deletions src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use send_wrapper::SendWrapper;
pub mod resource_table;

use std::any::Any;
use std::cell::{Ref, RefMut};
use std::cell::RefMut;
use std::collections::HashMap;
use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
#[cfg(not(target_arch = "wasm32"))]
pub use std::time::{Duration, Instant, SystemTime};
Expand All @@ -44,28 +46,47 @@ pub use wasmtime_runtime_layer as runtime_layer;
pub use js_wasm_runtime_layer as runtime_layer;

pub use crate::Error;
use parking_lot::{lock_api::MutexGuard, RawMutex};

pub enum ScopeRef<'a> {
Borrowed(&'a rhai::Scope<'static>),
Refcell(Ref<'a, rhai::Scope<'static>>),
/// Immutable reference to the [rhai::Scope]
pub enum ScopeRef {
Borrowed(Arc<parking_lot::Mutex<rhai::Scope<'static>>>),
Refcell(Rc<RefCell<rhai::Scope<'static>>>),
}

pub enum ScopeRefMut<'a> {
Borrowed(&'a mut rhai::Scope<'static>),
Refcell(RefMut<'a, rhai::Scope<'static>>),
#[cfg(not(target_arch = "wasm32"))]
impl Deref for ScopeRef {
type Target = Arc<parking_lot::Mutex<rhai::Scope<'static>>>;

fn deref(&self) -> &Self::Target {
match self {
ScopeRef::Borrowed(scope) => scope,
ScopeRef::Refcell(_ref_scope) => {
unreachable!()
}
}
}
}

impl Deref for ScopeRef<'_> {
type Target = rhai::Scope<'static>;
#[cfg(target_arch = "wasm32")]
impl Deref for ScopeRef {
type Target = Rc<RefCell<rhai::Scope<'static>>>;

fn deref(&self) -> &Self::Target {
match self {
ScopeRef::Borrowed(scope) => scope,
ScopeRef::Borrowed(_) => {
unreachable!()
}
ScopeRef::Refcell(ref_scope) => ref_scope,
}
}
}

pub enum ScopeRefMut<'a> {
Borrowed(MutexGuard<'a, RawMutex, rhai::Scope<'static>>),
Refcell(RefMut<'a, rhai::Scope<'static>>),
}

impl Deref for ScopeRefMut<'_> {
type Target = rhai::Scope<'static>;

Expand All @@ -87,7 +108,7 @@ impl DerefMut for ScopeRefMut<'_> {
}
pub trait Inner {
/// Update the state with the given key and value
fn update(&mut self, key: &str, value: impl Into<rhai::Dynamic> + Copy);
fn update(&mut self, key: &str, value: impl Into<rhai::Dynamic> + Clone);

/// Return the [rhai::Scope]
fn scope(&self) -> ScopeRef;
Expand All @@ -110,17 +131,27 @@ impl poll::Subscribe for Sleep {
async fn ready(&mut self) {
#[cfg(not(target_arch = "wasm32"))]
{
tracing::info!("Sleeping until {:?} at {:?}", self.end, Instant::now());
tracing::info!(
"READY: Sleeping until {:?} at {:?}",
self.end,
Instant::now()
);
tokio::time::sleep_until(self.end.into()).await;
tracing::info!("Woke up at {:?}", Instant::now());
}

#[cfg(target_arch = "wasm32")]
{
tracing::info!(
"READY Sleeping until {:?} at {:?}",
self.end,
Instant::now()
);
send_wrapper::SendWrapper::new(async move {
js_sleep(self.end.elapsed().as_millis() as i32)
.await
.unwrap();
if let Err(e) = js_sleep(self.end.elapsed().as_millis() as i32).await {
tracing::error!("Error sleeping: {:?}", e);
}
tracing::info!("READY Woke up at {:?}", Instant::now());
})
.await;
}
Expand Down Expand Up @@ -154,14 +185,24 @@ impl Subscribe for Deadline {
Deadline::Instant(instant) => {
#[cfg(not(target_arch = "wasm32"))]
{
tracing::info!(
"Deadline: Sleeping until {:?} at {:?}",
instant,
Instant::now()
);
tokio::time::sleep_until((*instant).into()).await;
}
#[cfg(target_arch = "wasm32")]
{
tracing::info!(
"Deadline: Sleeping until {:?} at {:?}",
instant,
Instant::now()
);
send_wrapper::SendWrapper::new(async move {
js_sleep(instant.elapsed().as_millis() as i32)
.await
.unwrap();
if let Err(e) = js_sleep(instant.elapsed().as_millis() as i32).await {
tracing::error!("Error sleeping: {:?}", e);
}
})
.await;
}
Expand All @@ -176,13 +217,16 @@ fn subscribe_to_duration(
duration: Duration,
) -> anyhow::Result<Resource<Pollable>> {
let sleep = if duration.is_zero() {
tracing::info!("subscribe_to_duration: Duration is zero, returning Past");
table.lock().unwrap().push(Deadline::Past)?
} else if let Some(deadline) = Instant::now().checked_add(duration) {
tracing::info!("subscribe_to_duration: Duration is not zero, returning Instant");
// NB: this resource created here is not actually exposed to wasm, it's
// only an internal implementation detail used to match the signature
// expected by `subscribe`.
table.lock().unwrap().push(Deadline::Instant(deadline))?
} else {
tracing::info!("subscribe_to_duration: Duration is too far in the future, returning Never");
// If the user specifies a time so far in the future we can't
// represent it, wait forever rather than trap.
table.lock().unwrap().push(Deadline::Never)?
Expand Down Expand Up @@ -537,13 +581,17 @@ pub fn instantiate_instance<T: Inner + 'static>(
panic!("Incorrect input type.")
};

tracing::info!("Subscribing to duration: {:?}", millis);

let resource_pollable =
subscribe_to_duration(table_clone.clone(), Duration::from_millis(millis))
.map_err(|e| {
tracing::error!("Error subscribing to duration: {:?}", e);
e
})?;

tracing::info!("Subscribed to duration");

let pollable_resource = ResourceOwn::new(
&mut store,
resource_pollable,
Expand Down Expand Up @@ -641,29 +689,29 @@ mod tests {
#[derive(Default)]
struct State {
count: rhai::Dynamic,
scope: rhai::Scope<'static>,
scope: Arc<parking_lot::Mutex<rhai::Scope<'static>>>,
}

impl Inner for State {
fn update(&mut self, key: &str, value: impl Into<rhai::Dynamic> + Copy) {
println!("updating {}: {}", key, value.into());
fn update(&mut self, key: &str, value: impl Into<rhai::Dynamic> + Clone) {
println!("updating {}: {}", key, value.clone().into());
// set count to value
// TODO: Chg hard code into rhai scope change
if key == "count" {
self.count = value.into();
self.count = value.clone().into();
}
}

fn scope(&self) -> ScopeRef {
ScopeRef::Borrowed(&self.scope)
ScopeRef::Borrowed(self.scope.clone())
}

fn scope_mut(&mut self) -> ScopeRefMut {
ScopeRefMut::Borrowed(&mut self.scope)
ScopeRefMut::Borrowed(self.scope.lock())
}

fn into_scope(self) -> rhai::Scope<'static> {
self.scope
self.scope.lock().clone()
}
}

Expand Down
69 changes: 41 additions & 28 deletions src/layer/resource.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(dead_code, unused)]
#[cfg(not(target_arch = "wasm32"))]
use core::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::marker;

Expand All @@ -9,10 +9,16 @@ pub struct Resource<T> {
/// Dear rust please consider `T` used even though it's not actually used.
_marker: marker::PhantomData<fn() -> T>,

#[cfg(not(target_arch = "wasm32"))]
state: AtomicResourceState,
#[cfg(target_arch = "wasm32")]
state: AtomicResourceState,
}

#[cfg(not(target_arch = "wasm32"))]
struct AtomicResourceState(AtomicU64);
#[cfg(target_arch = "wasm32")]
struct AtomicResourceState(u64);

impl<T> Resource<T>
where
Expand Down Expand Up @@ -56,19 +62,25 @@ enum ResourceState {
Index(HostResourceIndex),
}

#[cfg(target_arch = "wasm32")]
impl AtomicResourceState {
const NOT_IN_TABLE: Self = Self(ResourceState::NOT_IN_TABLE);

/// get
fn get(&self) -> ResourceState {
// the u64 equivalent of AtomicU64 load
ResourceState::decode(self.0)
}
}

#[cfg(not(target_arch = "wasm32"))]
impl AtomicResourceState {
#[allow(clippy::declare_interior_mutable_const)]
const BORROW: Self = Self(AtomicU64::new(ResourceState::BORROW));
#[allow(clippy::declare_interior_mutable_const)]
const NOT_IN_TABLE: Self = Self(AtomicU64::new(ResourceState::NOT_IN_TABLE));

fn get(&self) -> ResourceState {
ResourceState::decode(self.0.load(Relaxed))
}

fn swap(&self, state: ResourceState) -> ResourceState {
ResourceState::decode(self.0.swap(state.encode(), Relaxed))
}
}

impl ResourceState {
Expand All @@ -86,14 +98,15 @@ impl ResourceState {
}
}

fn encode(&self) -> u64 {
match self {
Self::Borrow => Self::BORROW,
Self::NotInTable => Self::NOT_IN_TABLE,
Self::Taken => Self::TAKEN,
Self::Index(index) => index.0,
}
}
// unused
//fn encode(&self) -> u64 {
// match self {
// Self::Borrow => Self::BORROW,
// Self::NotInTable => Self::NOT_IN_TABLE,
// Self::Taken => Self::TAKEN,
// Self::Index(index) => index.0,
// }
//}
}

/// Host representation of an index into a table slot.
Expand All @@ -105,16 +118,16 @@ impl ResourceState {
#[repr(transparent)]
pub struct HostResourceIndex(u64);

impl HostResourceIndex {
fn new(idx: u32, gen: u32) -> HostResourceIndex {
HostResourceIndex(u64::from(idx) | (u64::from(gen) << 32))
}

fn index(&self) -> u32 {
u32::try_from(self.0 & 0xffffffff).unwrap()
}

fn gen(&self) -> u32 {
u32::try_from(self.0 >> 32).unwrap()
}
}
//impl HostResourceIndex {
// fn new(idx: u32, gen: u32) -> HostResourceIndex {
// HostResourceIndex(u64::from(idx) | (u64::from(gen) << 32))
// }
//
// fn index(&self) -> u32 {
// u32::try_from(self.0 & 0xffffffff).unwrap()
// }
//
// fn gen(&self) -> u32 {
// u32::try_from(self.0 >> 32).unwrap()
// }
//}
Loading

0 comments on commit cf6b5c9

Please sign in to comment.