Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): fast Wasm streaming in Rust #349

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions core/01_core.js
Original file line number Diff line number Diff line change
Expand Up @@ -809,11 +809,6 @@
getProxyDetails: (proxy) => ops.op_get_proxy_details(proxy),
isProxy: (value) => ops.op_is_proxy(value),
memoryUsage: () => ops.op_memory_usage(),
setWasmStreamingCallback: (fn) => ops.op_set_wasm_streaming_callback(fn),
abortWasmStreaming: (
rid,
error,
) => ops.op_abort_wasm_streaming(rid, error),
destructureError: (error) => ops.op_destructure_error(error),
opNames: () => ops.op_op_names(),
eventLoopHasMoreWork: () => ops.op_event_loop_has_more_work(),
Expand Down
2 changes: 2 additions & 0 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub use crate::ops_builtin::op_print;
pub use crate::ops_builtin::op_resources;
pub use crate::ops_builtin::op_void_async;
pub use crate::ops_builtin::op_void_sync;
pub use crate::ops_builtin_v8::set_wasm_streaming_callback;
pub use crate::ops_metrics::merge_op_metrics;
pub use crate::ops_metrics::OpMetricsEvent;
pub use crate::ops_metrics::OpMetricsFactoryFn;
Expand All @@ -134,6 +135,7 @@ pub use crate::runtime::PollEventLoopOptions;
pub use crate::runtime::RuntimeOptions;
pub use crate::runtime::SharedArrayBufferStore;
pub use crate::runtime::Snapshot;
pub use crate::runtime::WasmStreamingFn;
pub use crate::runtime::V8_WRAPPER_OBJECT_INDEX;
pub use crate::runtime::V8_WRAPPER_TYPE_INDEX;
pub use crate::source_map::SourceMapGetter;
Expand Down
51 changes: 0 additions & 51 deletions core/ops_builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::ops_builtin_v8;
use crate::resources::ResourceId;
use crate::JsBuffer;
use crate::OpState;
use crate::Resource;
use anyhow::Error;
use bytes::BytesMut;
use std::cell::RefCell;
Expand All @@ -25,8 +24,6 @@ deno_core::extension!(
op_try_close,
op_print,
op_resources,
op_wasm_streaming_feed,
op_wasm_streaming_set_url,
op_void_sync,
op_error_async,
op_error_async_deferred,
Expand Down Expand Up @@ -63,8 +60,6 @@ deno_core::extension!(
ops_builtin_v8::op_get_non_index_property_names,
ops_builtin_v8::op_get_constructor_name,
ops_builtin_v8::op_memory_usage,
ops_builtin_v8::op_set_wasm_streaming_callback,
ops_builtin_v8::op_abort_wasm_streaming,
ops_builtin_v8::op_destructure_error,
ops_builtin_v8::op_dispatch_exception,
ops_builtin_v8::op_op_names,
Expand Down Expand Up @@ -154,52 +149,6 @@ pub fn op_print(#[string] msg: &str, is_err: bool) -> Result<(), Error> {
Ok(())
}

/// Resource wrapper around a `v8::WasmStreaming` handle.
///
/// The handle is kept in a `RefCell` so that ops holding only a shared
/// reference to the resource can still call the handle's mutating methods
/// via `borrow_mut()`.
pub struct WasmStreamingResource(pub(crate) RefCell<v8::WasmStreaming>);

impl Resource for WasmStreamingResource {
  /// Consumes the resource and finishes the wrapped stream.
  ///
  /// # Panics
  ///
  /// Panics if another `Rc` to this resource is still alive, because the
  /// inner `v8::WasmStreaming` has to be moved out in order to finish it.
  fn close(self: Rc<Self>) {
    // When `close` runs, the resource table no longer holds a clone of this
    // `Rc`, and nothing outside of the stack is expected to keep one either,
    // so `self` should be the sole owner.
    match Rc::try_unwrap(self) {
      Ok(resource) => {
        resource.0.into_inner().finish();
      }
      Err(_) => panic!("Couldn't consume WasmStreamingResource."),
    }
  }
}

/// Forwards `bytes` to the `WasmStreamingResource` stored under `rid`.
///
/// # Errors
///
/// Propagates the error from the resource table when `rid` cannot be
/// fetched as a `WasmStreamingResource`.
#[op2(fast)]
pub fn op_wasm_streaming_feed(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[buffer] bytes: &[u8],
) -> Result<(), Error> {
  // The borrow of `state` ends with this statement, before any bytes are
  // handed to the stream.
  let lookup = state
    .borrow_mut()
    .resource_table
    .get::<WasmStreamingResource>(rid);
  let resource = lookup?;

  resource.0.borrow_mut().on_bytes_received(bytes);

  Ok(())
}

/// Sets `url` on the `WasmStreamingResource` stored under `rid`.
///
/// # Errors
///
/// Propagates the error from the resource table when `rid` cannot be
/// fetched as a `WasmStreamingResource`.
#[op2(fast)]
pub fn op_wasm_streaming_set_url(
  state: &mut OpState,
  #[smi] rid: ResourceId,
  #[string] url: &str,
) -> Result<(), Error> {
  let resource = state.resource_table.get::<WasmStreamingResource>(rid)?;

  let mut streaming = resource.0.borrow_mut();
  streaming.set_url(url);

  Ok(())
}

#[op2(async)]
async fn op_read(
state: Rc<RefCell<OpState>>,
Expand Down
70 changes: 10 additions & 60 deletions core/ops_builtin_v8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::error::range_error;
use crate::error::type_error;
use crate::error::JsError;
use crate::op2;
use crate::ops_builtin::WasmStreamingResource;
use crate::resolve_url;
use crate::runtime::script_origin;
use crate::runtime::JsRealm;
Expand All @@ -14,11 +13,9 @@ use crate::source_map::apply_source_map;
use crate::source_map::SourceMapApplication;
use crate::JsBuffer;
use crate::JsRuntime;
use crate::OpState;
use anyhow::Error;
use serde::Deserialize;
use serde::Serialize;
use std::cell::RefCell;
use std::rc::Rc;
use v8::ValueDeserializerHelper;
use v8::ValueSerializerHelper;
Expand Down Expand Up @@ -703,72 +700,25 @@ pub fn op_memory_usage(scope: &mut v8::HandleScope) -> MemoryUsage {
}
}

#[op2]
pub fn op_set_wasm_streaming_callback(
pub fn set_wasm_streaming_callback(
scope: &mut v8::HandleScope,
#[global] cb: v8::Global<v8::Function>,
) -> Result<(), Error> {
cb: crate::WasmStreamingFn,
) {
let context_state_rc = JsRealm::state_from_scope(scope);
let mut context_state = context_state_rc.borrow_mut();
// The callback to pass to the v8 API has to be a unit type, so it can't
// borrow or move any local variables. Therefore, we're storing the JS
// borrow or move any local variables. Therefore, we're storing the
// callback in a JsRuntimeState slot.
if context_state.js_wasm_streaming_cb.is_some() {
return Err(type_error("op_set_wasm_streaming_callback already called"));
}
context_state.js_wasm_streaming_cb = Some(Rc::new(cb));
context_state.wasm_streaming_cb = Some(Rc::new(cb));

scope.set_wasm_streaming_callback(|scope, arg, wasm_streaming| {
let (cb_handle, streaming_rid) = {
let context_state_rc = JsRealm::state_from_scope(scope);
let cb_handle = context_state_rc
.borrow()
.js_wasm_streaming_cb
.as_ref()
.unwrap()
.clone();
let state = JsRuntime::state_from(scope);
let streaming_rid = state
.op_state
.borrow_mut()
.resource_table
.add(WasmStreamingResource(RefCell::new(wasm_streaming)));
(cb_handle, streaming_rid)
};
let context_state_rc = JsRealm::state_from_scope(scope);
let ctx = context_state_rc.borrow();
let cb = ctx.wasm_streaming_cb.as_ref().unwrap();
let state_rc = JsRuntime::state_from(scope).op_state.clone();

let undefined = v8::undefined(scope);
let rid = serde_v8::to_v8(scope, streaming_rid).unwrap();
cb_handle
.open(scope)
.call(scope, undefined.into(), &[arg, rid]);
cb(state_rc, scope, arg, wasm_streaming);
});
Ok(())
}

// Marked re-entrant because aborting makes a v8 call. It also cannot use the
// fast path, since a JS execution scope is required.
#[allow(clippy::let_and_return)]
#[op2(nofast, reentrant)]
pub fn op_abort_wasm_streaming(
  state: Rc<RefCell<OpState>>,
  rid: u32,
  error: v8::Local<v8::Value>,
) -> Result<(), Error> {
  // NOTE: `state` must not be borrowed while v8::WasmStreaming::abort runs,
  // so the resource is taken out of the table in a statement of its own.
  let taken = state
    .borrow_mut()
    .resource_table
    .take::<WasmStreamingResource>(rid);
  let resource = taken?;

  // The resource table has given up its reference and these resources are
  // never cloned elsewhere, so `resource` should be the only owner left.
  match std::rc::Rc::try_unwrap(resource) {
    Ok(inner) => {
      inner.0.into_inner().abort(Some(error));
    }
    Err(_) => panic!("Couldn't consume WasmStreamingResource."),
  }
  Ok(())
}

// This op calls `op_apply_source_map` re-entrantly.
Expand Down
10 changes: 8 additions & 2 deletions core/runtime/jsrealm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,21 @@ impl Hasher for IdentityHasher {
}
}

/// Signature of the native callback registered through
/// `set_wasm_streaming_callback`.
///
/// It is a plain `fn` pointer, so it cannot capture any environment. The
/// registered callback is stored in `ContextState::wasm_streaming_cb` and is
/// called from the closure installed on the V8 scope.
pub type WasmStreamingFn = fn(
// The runtime's op state (a clone of the `Rc` held by the runtime state).
Rc<RefCell<crate::OpState>>,
// The handle scope the V8 streaming callback was invoked with.
&mut v8::HandleScope,
// The value V8 passed to the streaming callback.
// NOTE(review): presumably the source argument of the streaming
// compile/instantiate call; confirm against the V8 API.
v8::Local<v8::Value>,
// The streaming handle; ownership moves to the callback.
v8::WasmStreaming,
);

#[derive(Default)]
pub(crate) struct ContextState {
pub(crate) task_spawner_factory: Arc<V8TaskSpawnerFactory>,
pub(crate) js_event_loop_tick_cb: Option<Rc<v8::Global<v8::Function>>>,
pub(crate) js_build_custom_error_cb: Option<Rc<v8::Global<v8::Function>>>,
pub(crate) js_promise_reject_cb: Option<Rc<v8::Global<v8::Function>>>,
pub(crate) js_format_exception_cb: Option<Rc<v8::Global<v8::Function>>>,
pub(crate) js_wasm_streaming_cb: Option<Rc<v8::Global<v8::Function>>>,
pub(crate) wasm_streaming_cb: Option<Rc<WasmStreamingFn>>,
pub(crate) pending_promise_rejections:
VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>,
pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>,
Expand Down Expand Up @@ -179,7 +186,6 @@ impl JsRealmInner {
std::mem::take(&mut realm_state.js_build_custom_error_cb);
std::mem::take(&mut realm_state.js_promise_reject_cb);
std::mem::take(&mut realm_state.js_format_exception_cb);
std::mem::take(&mut realm_state.js_wasm_streaming_cb);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably still take this one, just in case there are globals in the closure. Eventually I'd like to get rid of that explicit intermediate take step, but right now I can't guarantee that we won't leak bits of the isolate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Matt here - can you verify end-to-end in the Deno repo that this doesn't cause problems before merging this PR?

// The OpCtx slice may contain a circular reference
std::mem::take(&mut realm_state.op_ctxs);

Expand Down
1 change: 1 addition & 0 deletions core/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub const V8_WRAPPER_OBJECT_INDEX: i32 = 1;

pub(crate) use jsrealm::ContextState;
pub(crate) use jsrealm::JsRealm;
pub use jsrealm::WasmStreamingFn;
pub use jsruntime::CompiledWasmModuleStore;
pub use jsruntime::CreateRealmOptions;
pub use jsruntime::CrossIsolateStore;
Expand Down
32 changes: 23 additions & 9 deletions core/runtime/tests/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use cooked_waker::Wake;
use cooked_waker::WakeRef;
use futures::future::poll_fn;
use std::borrow::Cow;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI8;
Expand Down Expand Up @@ -323,16 +324,29 @@ fn terminate_execution() {
async fn wasm_streaming_op_invocation_in_import() {
let (mut runtime, _dispatch_count) = setup(Mode::Async);

// Run an infinite loop in WebAssembly code, which should be terminated.
runtime.execute_script_static("setup.js",
r#"
Deno.core.setWasmStreamingCallback((source, rid) => {
Deno.core.ops.op_wasm_streaming_set_url(rid, "file:///foo.wasm");
Deno.core.ops.op_wasm_streaming_feed(rid, source);
Deno.core.close(rid);
});
"#).unwrap();
/// Test streaming callback: treats `value` as an `ArrayBufferView`, feeds
/// its bytes to `wasm_streaming` in one chunk and then finishes the stream.
///
/// The bytes are copied through the view, so the view's own byte offset and
/// byte length are honoured (rather than exposing the whole underlying
/// buffer), an empty view is handled without panicking, and no `unsafe`
/// slice over V8-owned memory is needed.
///
/// # Panics
///
/// Panics if `value` is not an `ArrayBufferView`.
fn handle_wasm_streaming(
  _state: Rc<RefCell<OpState>>,
  _scope: &mut v8::HandleScope,
  value: v8::Local<v8::Value>,
  mut wasm_streaming: v8::WasmStreaming,
) {
  wasm_streaming.set_url("file:///foo.wasm");
  let view: v8::Local<v8::ArrayBufferView> =
    value.try_into().expect("value is not an array buffer");
  // Copy exactly the bytes the view covers into an owned buffer.
  let mut contents = vec![0u8; view.byte_length()];
  let copied = view.copy_contents(&mut contents);
  // `copy_contents` reports how many bytes it wrote; never feed more.
  contents.truncate(copied);
  wasm_streaming.on_bytes_received(&contents);
  wasm_streaming.finish();
}

crate::set_wasm_streaming_callback(
&mut runtime.handle_scope(),
handle_wasm_streaming,
);

// Run an infinite loop in WebAssembly code, which should be terminated.
let promise = runtime.execute_script_static("main.js",
r#"
// (module (import "env" "data" (global i64)))
Expand Down