From 5ccf76438924a9151e89b87bea2bd15af900feae Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Sat, 8 May 2021 17:54:07 +0200 Subject: [PATCH 01/13] move bench_large_message and cleanup bench --- cli/bench/main.rs | 2 +- .../bench_large_message.ts} | 12 ++++-------- ...rge_message_worker.js => worker_large_message.js} | 0 3 files changed, 5 insertions(+), 9 deletions(-) rename cli/tests/{workers_large_message_bench.ts => workers/bench_large_message.ts} (66%) rename cli/tests/workers/{large_message_worker.js => worker_large_message.js} (100%) diff --git a/cli/bench/main.rs b/cli/bench/main.rs index f2ade54d80142f..c3346bc88b748a 100644 --- a/cli/bench/main.rs +++ b/cli/bench/main.rs @@ -75,7 +75,7 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option)] = &[ &[ "run", "--allow-read", - "cli/tests/workers_large_message_bench.ts", + "cli/tests/workers/bench_large_message.ts", ], None, ), diff --git a/cli/tests/workers_large_message_bench.ts b/cli/tests/workers/bench_large_message.ts similarity index 66% rename from cli/tests/workers_large_message_bench.ts rename to cli/tests/workers/bench_large_message.ts index 9cda5a40d623ef..53076e7113adf2 100644 --- a/cli/tests/workers_large_message_bench.ts +++ b/cli/tests/workers/bench_large_message.ts @@ -1,14 +1,10 @@ // Copyright 2020 the Deno authors. All rights reserved. MIT license. -// deno-lint-ignore-file - -import { deferred } from "../../test_util/std/async/deferred.ts"; - -function oneWorker(i: any): Promise { +function oneWorker(i: number) { return new Promise((resolve) => { let countDown = 10; const worker = new Worker( - new URL("workers/large_message_worker.js", import.meta.url).href, + new URL("worker_large_message.js", import.meta.url).href, { type: "module" }, ); worker.onmessage = (e): void => { @@ -23,8 +19,8 @@ function oneWorker(i: any): Promise { }); } -function bench(): Promise { - let promises = []; +function bench() { + const promises = []; for (let i = 0; i < 50; i++) { promises.push(oneWorker(i)); } diff --git a/cli/tests/workers/large_message_worker.js b/cli/tests/workers/worker_large_message.js similarity index 100% rename from cli/tests/workers/large_message_worker.js rename to cli/tests/workers/worker_large_message.js From b6ce4fb4ef8dc9edc868095641b97eecf612615f Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Sat, 8 May 2021 17:54:33 +0200 Subject: [PATCH 02/13] deserialize wrong error message --- core/bindings.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/bindings.rs b/core/bindings.rs index edf115d2753e33..f6c94b335cb043 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -553,7 +553,7 @@ fn deserialize( match value { Some(deserialized) => rv.set(deserialized), None => { - let msg = v8::String::new(scope, "string too long").unwrap(); + let msg = v8::String::new(scope, "could not deserialize value").unwrap(); let exception = v8::Exception::range_error(scope, msg); scope.throw_exception(exception); } From 63dea20c650b14c605c1345454062081be340b28 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Sun, 9 May 2021 15:15:16 +0200 Subject: [PATCH 03/13] use structured cloning and improve worker handles --- cli/tests/workers/racy_worker.js | 4 +- runtime/js/11_workers.js | 41 +---- runtime/js/99_main.js | 98 ++++++----- runtime/ops/web_worker.rs | 48 ++++-- runtime/ops/worker_host.rs | 38 ++--- runtime/web_worker.rs | 272 +++++++++++++++---------------- runtime/worker.rs | 2 +- 7 files changed, 247 insertions(+), 256 deletions(-) diff --git a/cli/tests/workers/racy_worker.js b/cli/tests/workers/racy_worker.js index 83756b791d0f2a..a7c75be883ee7b 100644 --- a/cli/tests/workers/racy_worker.js +++ b/cli/tests/workers/racy_worker.js @@ -14,7 +14,9 @@ setTimeout(() => { while (true) { await new Promise((done) => { setTimeout(() => { - postMessage({ buf: new Array(999999) }); + postMessage({ + buf: [...new Array(999999)].map((element, index) => index), + }); done(); }); }); diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js index 508dd46d4811ed..728d4a9b04e645 100644 --- a/runtime/js/11_workers.js +++ b/runtime/js/11_workers.js @@ -39,26 +39,8 @@ return core.opAsync("op_host_get_message", id); } - const encoder = new TextEncoder(); const decoder = new TextDecoder(); - function encodeMessage(data) { - const dataJson = JSON.stringify(data); - return encoder.encode(dataJson); - } - - function decodeMessage(dataIntArray) { - // Temporary solution until structured clone arrives in v8. - // Current clone is made by parsing json to byte array and from byte array back to json. - // In that case "undefined" transforms to empty byte array, but empty byte array does not transform back to undefined. - // Thats why this special is statement is needed. - if (dataIntArray.length == 0) { - return undefined; - } - const dataJson = decoder.decode(dataIntArray); - return JSON.parse(dataJson); - } - /** * @param {string} permission * @return {boolean} @@ -211,18 +193,7 @@ this.#poll(); } - #handleMessage = (msgData) => { - let data; - try { - data = decodeMessage(new Uint8Array(msgData)); - } catch (e) { - const msgErrorEvent = new MessageEvent("messageerror", { - cancelable: false, - data, - }); - return; - } - + #handleMessage = (data) => { const msgEvent = new MessageEvent("message", { cancelable: false, data, @@ -269,7 +240,7 @@ throw new Error("Unhandled error event reached main worker."); } else { core.opSync( - "op_host_unhandled_error", + "op_worker_unhandled_error", event.error.message, ); } @@ -278,7 +249,8 @@ } if (type === "msg") { - this.#handleMessage(event.data); + const data = core.deserialize(new Uint8Array(event.data)); + this.#handleMessage(data); continue; } @@ -288,7 +260,7 @@ throw new Error("Unhandled error event reached main worker."); } else { core.opSync( - "op_host_unhandled_error", + "op_worker_unhandled_error", event.error.message, ); } @@ -317,7 +289,8 @@ return; } - hostPostMessage(this.#id, encodeMessage(message)); + const bufferMsg = core.serialize(message); + hostPostMessage(this.#id, bufferMsg); } terminate() { diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index fee7cd2d7bb21e..305ce211648ae3 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -75,60 +75,71 @@ delete Object.prototype.__proto__; const onerror = () => {}; function postMessage(data) { - const dataJson = JSON.stringify(data); - const dataIntArray = encoder.encode(dataJson); + const dataIntArray = core.serialize(data); opPostMessage(dataIntArray); } let isClosing = false; - async function workerMessageRecvCallback(data) { - const msgEvent = new MessageEvent("message", { - cancelable: false, - data, - }); - - try { - if (globalThis["onmessage"]) { - const result = globalThis.onmessage(msgEvent); - if (result && "then" in result) { - await result; - } - } - globalThis.dispatchEvent(msgEvent); - } catch (e) { - let handled = false; - - const errorEvent = new ErrorEvent("error", { - cancelable: true, - message: e.message, - lineno: e.lineNumber ? e.lineNumber + 1 : undefined, - colno: e.columnNumber ? e.columnNumber + 1 : undefined, - filename: e.fileName, - error: null, + async function pollForMessages() { + while (!isClosing) { + const bufferMsg = await opGetMessage(); + const data = core.deserialize(new Uint8Array(bufferMsg)); + + const msgEvent = new MessageEvent("message", { + cancelable: false, + data, }); - if (globalThis["onerror"]) { - const ret = globalThis.onerror( - e.message, - e.fileName, - e.lineNumber, - e.columnNumber, - e, - ); - handled = ret === true; - } + try { + if (globalThis["onmessage"]) { + const result = globalThis.onmessage(msgEvent); + if (result && "then" in result) { + await result; + } + } + globalThis.dispatchEvent(msgEvent); + } catch (e) { + let handled = false; + + const errorEvent = new ErrorEvent("error", { + cancelable: true, + message: e.message, + lineno: e.lineNumber ? e.lineNumber + 1 : undefined, + colno: e.columnNumber ? e.columnNumber + 1 : undefined, + filename: e.fileName, + error: null, + }); + + if (globalThis["onerror"]) { + const ret = globalThis.onerror( + e.message, + e.fileName, + e.lineNumber, + e.columnNumber, + e, + ); + handled = ret === true; + } - globalThis.dispatchEvent(errorEvent); - if (errorEvent.defaultPrevented) { - handled = true; - } + globalThis.dispatchEvent(errorEvent); + if (errorEvent.defaultPrevented) { + handled = true; + } - if (!handled) { - throw e; + if (!handled) { + core.opSync( + "op_worker_unhandled_error", + e.message, + ); + } } } } + function opGetMessage() { + return core.opAsync("op_worker_get_message"); + } + function opPostMessage(data) { core.opSync("op_worker_post_message", null, data); } @@ -371,7 +382,6 @@ delete Object.prototype.__proto__; // TODO(bartlomieju): should be readonly? close: util.nonEnumerable(workerClose), postMessage: util.writable(postMessage), - workerMessageRecvCallback: util.nonEnumerable(workerMessageRecvCallback), }; let hasBootstrapped = false; @@ -482,6 +492,8 @@ delete Object.prototype.__proto__; location.setLocationHref(locationHref); registerErrors(); + pollForMessages(); + const internalSymbol = Symbol("Deno.internal"); const finalDenoNs = { diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs index 1689b258706564..fd5cd6103c7852 100644 --- a/runtime/ops/web_worker.rs +++ b/runtime/ops/web_worker.rs @@ -1,9 +1,10 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::web_worker::WebWorkerHandle; +use crate::web_worker::WebWorkerInternalHandle; use crate::web_worker::WorkerEvent; +use deno_core::error::generic_error; use deno_core::error::null_opbuf; -use deno_core::futures::channel::mpsc; +use deno_core::op_async; use deno_core::op_sync; use deno_core::Extension; use deno_core::ZeroCopyBuf; @@ -16,26 +17,53 @@ pub fn init() -> Extension { op_sync(move |state, _args: (), buf: Option| { let buf = buf.ok_or_else(null_opbuf)?; let msg_buf: Box<[u8]> = (*buf).into(); - let mut sender = state.borrow::>().clone(); - sender - .try_send(WorkerEvent::Message(msg_buf)) + let handle = state.borrow::().clone(); + handle + .post_event(WorkerEvent::Message(msg_buf)) .expect("Failed to post message to host"); Ok(()) }), ), + ( + "op_worker_get_message", + op_async(move |state, _: (), _: ()| async move { + let temp = { + let a = state.borrow(); + a.borrow::().clone() + }; + + let maybe_data = temp.get_message().await; + + if let Some(data) = maybe_data { + return Ok(data); + } + + Ok(Box::new([])) + }), + ), // Notify host that guest worker closes. ( "op_worker_close", - op_sync(move |state, _: (), _: ()| { + op_sync(|state, _: (), _: ()| { // Notify parent that we're finished - let mut sender = state.borrow::>().clone(); - sender.close_channel(); - // Terminate execution of current worker - let handle = state.borrow::(); + let mut handle = + state.borrow_mut::().clone(); + handle.terminate(); Ok(()) }), ), + // Notify host that guest worker has unhandled error. + ( + "op_worker_unhandled_error", + op_sync(|state, message: String, _: ()| { + let sender = state.borrow::().clone(); + sender + .post_event(WorkerEvent::Error(generic_error(message))) + .expect("Failed to propagate error event to parent worker"); + Ok(true) + }), + ), ]) .build() } diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index f8d03850d89265..cf708758d974eb 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -15,12 +15,11 @@ use crate::web_worker::run_web_worker; use crate::web_worker::WebWorker; use crate::web_worker::WebWorkerHandle; use crate::web_worker::WorkerEvent; +use crate::web_worker::WorkerId; use deno_core::error::custom_error; -use deno_core::error::generic_error; use deno_core::error::null_opbuf; use deno_core::error::AnyError; use deno_core::error::JsError; -use deno_core::futures::channel::mpsc; use deno_core::op_async; use deno_core::op_sync; use deno_core::serde::de; @@ -46,7 +45,7 @@ use std::thread::JoinHandle; pub struct CreateWebWorkerArgs { pub name: String, - pub worker_id: u32, + pub worker_id: WorkerId, pub parent_permissions: Permissions, pub permissions: Permissions, pub main_module: ModuleSpecifier, @@ -68,13 +67,9 @@ pub struct WorkerThread { worker_handle: WebWorkerHandle, } -pub type WorkersTable = HashMap; -pub type WorkerId = u32; +pub type WorkersTable = HashMap; -pub fn init( - is_main_worker: bool, - create_web_worker_cb: Arc, -) -> Extension { +pub fn init(create_web_worker_cb: Arc) -> Extension { Extension::builder() .state(move |state| { state.put::(WorkersTable::default()); @@ -94,20 +89,6 @@ pub fn init( ), ("op_host_post_message", op_sync(op_host_post_message)), ("op_host_get_message", op_async(op_host_get_message)), - ( - "op_host_unhandled_error", - op_sync(move |state, message: String, _: ()| { - if is_main_worker { - return Err(generic_error("Cannot be called from main worker.")); - } - - let mut sender = state.borrow::>().clone(); - sender - .try_send(WorkerEvent::Error(generic_error(message))) - .expect("Failed to propagate error event to parent worker"); - Ok(true) - }), - ), ]) .build() } @@ -501,7 +482,7 @@ fn op_create_worker( use_deno_namespace, }); - // Send thread safe handle to newly created worker to host thread + // Send thread safe handle from newly created worker to host thread handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); drop(handle_sender); @@ -512,6 +493,7 @@ fn op_create_worker( run_web_worker(worker, module_specifier, maybe_source_code) })?; + // Receive WebWorkerHandle from newly created worker let worker_handle = handle_receiver.recv().unwrap()?; let worker_thread = WorkerThread { @@ -534,7 +516,7 @@ fn op_host_terminate_worker( id: WorkerId, _: (), ) -> Result<(), AnyError> { - let worker_thread = state + let mut worker_thread = state .borrow_mut::() .remove(&id) .expect("No worker handle found"); @@ -590,11 +572,11 @@ fn serialize_worker_event(event: WorkerEvent) -> Value { /// Try to remove worker from workers table - NOTE: `Worker.terminate()` /// might have been called already meaning that we won't find worker in /// table - in that case ignore. -fn try_remove_and_close(state: Rc>, id: u32) { +fn try_remove_and_close(state: Rc>, id: WorkerId) { let mut s = state.borrow_mut(); let workers = s.borrow_mut::(); if let Some(mut worker_thread) = workers.remove(&id) { - worker_thread.worker_handle.sender.close_channel(); + worker_thread.worker_handle.terminate(); worker_thread .join_handle .join() @@ -642,7 +624,7 @@ fn op_host_post_message( data: Option, ) -> Result<(), AnyError> { let data = data.ok_or_else(null_opbuf)?; - let msg = Vec::from(&*data).into_boxed_slice(); + let msg: Box<[u8]> = (*data).into(); debug!("post message to worker {}", id); let worker_thread = state diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 690b6fb58a4f36..7c531b4bbed777 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -13,7 +13,6 @@ use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; -use deno_core::futures::task::AtomicWaker; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url::Url; @@ -22,11 +21,13 @@ use deno_core::Extension; use deno_core::GetErrorClassFn; use deno_core::JsErrorCreateFn; use deno_core::JsRuntime; +use deno_core::ModuleId; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; use deno_file::BlobUrlStore; use log::debug; +use std::cell::RefCell; use std::env; use std::rc::Rc; use std::sync::atomic::AtomicBool; @@ -36,38 +37,83 @@ use std::task::Context; use std::task::Poll; use tokio::sync::Mutex as AsyncMutex; +pub type WorkerId = u32; +pub type WorkerMessage = Box<[u8]>; + /// Events that are sent to host from child /// worker. pub enum WorkerEvent { - Message(Box<[u8]>), + Message(WorkerMessage), Error(AnyError), TerminalError(AnyError), } -pub struct WorkerChannelsInternal { - pub sender: mpsc::Sender, - pub receiver: mpsc::Receiver>, +// Channels used for communication with worker's parent +#[derive(Clone)] +pub struct WebWorkerInternalHandle { + sender: mpsc::Sender, + receiver: Rc>>, + terminated: Arc, + isolate_handle: v8::IsolateHandle, +} + +impl WebWorkerInternalHandle { + /// Post WorkerEvent to parent as a worker + pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> { + let mut sender = self.sender.clone(); + // If the channel is closed, + // the worker must have terminated but the termination message has not yet been recieved. + // + // Therefore just treat it as if the worker has terminated and return. + if sender.is_closed() { + self.terminated.store(true, Ordering::SeqCst); + return Ok(()); + } + sender.try_send(event)?; + Ok(()) + } + + /// Get the WorkerEvent with lock + /// Panic if more than one listener tries to get event + pub async fn get_message(&self) -> Option { + let mut receiver = self.receiver.borrow_mut(); + receiver.next().await + } + + /// Check if this worker is terminated or being terminated + pub fn is_terminated(&self) -> bool { + self.terminated.load(Ordering::SeqCst) + } + + /// Terminate the worker + /// This function will set terminated to true, terminate the isolate and close the message channel + pub fn terminate(&mut self) { + // This function can be called multiple times by whomever holds + // the handle. However only a single "termination" should occur so + // we need a guard here. + let already_terminated = self.terminated.swap(true, Ordering::SeqCst); + + if !already_terminated { + // Stop javascript execution + self.isolate_handle.terminate_execution(); + } + + // Wake parent by closing the channel + self.sender.close_channel(); + } } -/// Wrapper for `WorkerHandle` that adds functionality -/// for terminating workers. -/// -/// This struct is used by host as well as worker itself. -/// -/// Host uses it to communicate with worker and terminate it, -/// while worker uses it only to finish execution on `self.close()`. #[derive(Clone)] pub struct WebWorkerHandle { - pub sender: mpsc::Sender>, - pub receiver: Arc>>, - terminate_tx: mpsc::Sender<()>, + sender: mpsc::Sender, + receiver: Arc>>, terminated: Arc, isolate_handle: v8::IsolateHandle, } impl WebWorkerHandle { - /// Post message to worker as a host. - pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> { + /// Post WorkerMessage to worker as a host + pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> { let mut sender = self.sender.clone(); // If the channel is closed, // the worker must have terminated but the termination message has not yet been recieved. @@ -81,47 +127,50 @@ impl WebWorkerHandle { Ok(()) } - /// Get the event with lock. + /// Get the WorkerEvent with lock /// Return error if more than one listener tries to get event pub async fn get_event(&self) -> Result, AnyError> { let mut receiver = self.receiver.try_lock()?; Ok(receiver.next().await) } - pub fn terminate(&self) { + /// Terminate the worker + /// This function will set terminated to true, terminate the isolate and close the message channel + pub fn terminate(&mut self) { // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. let already_terminated = self.terminated.swap(true, Ordering::SeqCst); if !already_terminated { + // Stop javascript execution self.isolate_handle.terminate_execution(); - let mut sender = self.terminate_tx.clone(); - // This call should be infallible hence the `expect`. - // This might change in the future. - sender.try_send(()).expect("Failed to terminate"); } + + // Wake web worker by closing the channel + self.sender.close_channel(); } } -fn create_channels( +fn create_handles( isolate_handle: v8::IsolateHandle, - terminate_tx: mpsc::Sender<()>, -) -> (WorkerChannelsInternal, WebWorkerHandle) { - let (in_tx, in_rx) = mpsc::channel::>(1); +) -> (WebWorkerInternalHandle, WebWorkerHandle) { + let (in_tx, in_rx) = mpsc::channel::(1); let (out_tx, out_rx) = mpsc::channel::(1); - let internal_channels = WorkerChannelsInternal { + let terminated = Arc::new(AtomicBool::new(false)); + let internal_handle = WebWorkerInternalHandle { sender: out_tx, - receiver: in_rx, + receiver: Rc::new(RefCell::new(in_rx)), + terminated: terminated.clone(), + isolate_handle: isolate_handle.clone(), }; - let external_channels = WebWorkerHandle { + let external_handle = WebWorkerHandle { sender: in_tx, receiver: Arc::new(AsyncMutex::new(out_rx)), - terminated: Arc::new(AtomicBool::new(false)), - terminate_tx, + terminated, isolate_handle, }; - (internal_channels, external_channels) + (internal_handle, external_handle) } /// This struct is an implementation of `Worker` Web API @@ -129,17 +178,12 @@ fn create_channels( /// Each `WebWorker` is either a child of `MainWorker` or other /// `WebWorker`. pub struct WebWorker { - id: u32, + id: WorkerId, inspector: Option>, - // Following fields are pub because they are accessed - // when creating a new WebWorker instance. - pub(crate) internal_channels: WorkerChannelsInternal, pub js_runtime: JsRuntime, pub name: String, - waker: AtomicWaker, - event_loop_idle: bool, - terminate_rx: mpsc::Receiver<()>, - handle: WebWorkerHandle, + internal_handle: WebWorkerInternalHandle, + external_handle: WebWorkerHandle, pub use_deno_namespace: bool, pub main_module: ModuleSpecifier, } @@ -174,7 +218,7 @@ impl WebWorker { name: String, permissions: Permissions, main_module: ModuleSpecifier, - worker_id: u32, + worker_id: WorkerId, options: &WebWorkerOptions, ) -> Self { // Permissions: many ops depend on this @@ -218,7 +262,7 @@ impl WebWorker { let runtime_exts = vec![ ops::web_worker::init(), ops::runtime::init(main_module.clone()), - ops::worker_host::init(false, options.create_web_worker_cb.clone()), + ops::worker_host::init(options.create_web_worker_cb.clone()), ops::io::init(), ]; @@ -264,38 +308,24 @@ impl WebWorker { None }; - let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); - let isolate_handle = js_runtime.v8_isolate().thread_safe_handle(); - let (internal_channels, handle) = - create_channels(isolate_handle, terminate_tx); + let (internal_handle, external_handle) = { + let handle = js_runtime.v8_isolate().thread_safe_handle(); + let (internal_handle, external_handle) = create_handles(handle); + let op_state = js_runtime.op_state(); + let mut op_state = op_state.borrow_mut(); + op_state.put(internal_handle.clone()); + (internal_handle, external_handle) + }; - let mut worker = Self { + Self { id: worker_id, inspector, - internal_channels, js_runtime, name, - waker: AtomicWaker::new(), - event_loop_idle: false, - terminate_rx, - handle, + internal_handle, + external_handle, use_deno_namespace: options.use_deno_namespace, main_module, - }; - - // Setup worker-dependant OpState and return worker - { - let handle = worker.thread_safe_handle(); - let sender = worker.internal_channels.sender.clone(); - let js_runtime = &mut worker.js_runtime; - let op_state = js_runtime.op_state(); - let mut op_state = op_state.borrow_mut(); - - // Required by runtime::ops::worker_host/web_worker - op_state.put(handle); - op_state.put(sender); - - worker } } @@ -338,12 +368,20 @@ impl WebWorker { self.js_runtime.execute(url.as_str(), js_source) } + /// Loads and instantiates specified JavaScript module. + pub async fn preload_module( + &mut self, + module_specifier: &ModuleSpecifier, + ) -> Result { + self.js_runtime.load_module(module_specifier, None).await + } + /// Loads, instantiates and executes specified JavaScript module. pub async fn execute_module( &mut self, module_specifier: &ModuleSpecifier, ) -> Result<(), AnyError> { - let id = self.js_runtime.load_module(module_specifier, None).await?; + let id = self.preload_module(module_specifier).await?; let mut receiver = self.js_runtime.mod_evaluate(id); tokio::select! { @@ -357,8 +395,8 @@ impl WebWorker { } event_loop_result = self.run_event_loop() => { - if self.has_been_terminated() { - return Ok(()); + if self.internal_handle.is_terminated() { + return Ok(()); } event_loop_result?; let maybe_result = receiver.next().await; @@ -370,82 +408,42 @@ impl WebWorker { /// Returns a way to communicate with the Worker from other threads. pub fn thread_safe_handle(&self) -> WebWorkerHandle { - self.handle.clone() - } - - pub fn has_been_terminated(&self) -> bool { - self.handle.terminated.load(Ordering::SeqCst) + self.external_handle.clone() } pub fn poll_event_loop( &mut self, cx: &mut Context, ) -> Poll> { - if self.has_been_terminated() { + // If awakened because we are terminating, just return Ok + if self.internal_handle.is_terminated() { return Poll::Ready(Ok(())); } - if !self.event_loop_idle { - let poll_result = { - // We always poll the inspector if it exists. - let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); - self.waker.register(cx.waker()); - self.js_runtime.poll_event_loop(cx) - }; - - if let Poll::Ready(r) = poll_result { - if self.has_been_terminated() { + // We always poll the inspector if it exists. + let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); + match self.js_runtime.poll_event_loop(cx) { + Poll::Ready(r) => { + // If js ended because we are terminating, just return Ok + if self.internal_handle.is_terminated() { return Poll::Ready(Ok(())); } + // In case of an error, pass to parent without terminating worker if let Err(e) = r { print_worker_error(e.to_string(), &self.name); - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) + let handle = self.internal_handle.clone(); + handle + .post_event(WorkerEvent::Error(e)) .expect("Failed to post message to host"); - } - self.event_loop_idle = true; - } - } - - if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) { - // terminate_rx should never be closed - assert!(r.is_some()); - return Poll::Ready(Ok(())); - } - let maybe_msg_poll_result = - self.internal_channels.receiver.poll_next_unpin(cx); - - if let Poll::Ready(maybe_msg) = maybe_msg_poll_result { - let msg = - maybe_msg.expect("Received `None` instead of message in worker"); - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = format!("workerMessageRecvCallback({})", msg); - - // TODO(bartlomieju): set proper script name like "deno:runtime/web_worker.js" - // so it's dimmed in stack trace instead of using "__anonymous__" - if let Err(e) = self.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if self.has_been_terminated() { - return Poll::Ready(Ok(())); + return Poll::Pending; } - // Otherwise forward error to host - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); + panic!("coding error: either js is polling or the worker is terminted"); } - - // Let event loop be polled again - self.event_loop_idle = false; - self.waker.wake(); + Poll::Pending => Poll::Pending, } - - Poll::Pending } pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { @@ -495,18 +493,18 @@ pub fn run_web_worker( rt.block_on(load_future) }; - let mut sender = worker.internal_channels.sender.clone(); + let internal_handle = worker.internal_handle.clone(); // If sender is closed it means that worker has already been closed from // within using "globalThis.close()" - if sender.is_closed() { + if internal_handle.is_terminated() { return Ok(()); } if let Err(e) = result { print_worker_error(e.to_string(), &name); - sender - .try_send(WorkerEvent::TerminalError(e)) + internal_handle + .post_event(WorkerEvent::TerminalError(e)) .expect("Failed to post message to host"); // Failure to execute script is a terminal error, bye, bye. @@ -522,7 +520,6 @@ pub fn run_web_worker( mod tests { use super::*; use crate::tokio_util; - use deno_core::serde_json::json; fn create_test_web_worker() -> WebWorker { let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap(); @@ -589,7 +586,7 @@ mod tests { let mut handle = handle_receiver.recv().unwrap(); - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded let r = handle.post_message(msg.clone()); assert!(r.is_ok()); @@ -603,15 +600,12 @@ mod tests { assert!(maybe_msg.is_some()); match maybe_msg { Some(WorkerEvent::Message(buf)) => { - assert_eq!(*buf, *b"[1,2,3]"); + assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]); } _ => unreachable!(), } - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); + let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded let r = handle.post_message(msg); assert!(r.is_ok()); let event = handle.get_event().await.unwrap(); @@ -636,7 +630,7 @@ mod tests { let mut handle = handle_receiver.recv().unwrap(); - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded let r = handle.post_message(msg.clone()); assert!(r.is_ok()); let event = handle.get_event().await.unwrap(); diff --git a/runtime/worker.rs b/runtime/worker.rs index b4c27b4f4a6206..6742802625e83c 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -111,7 +111,7 @@ impl MainWorker { metrics::init(), // Runtime ops ops::runtime::init(main_module), - ops::worker_host::init(true, options.create_web_worker_cb.clone()), + ops::worker_host::init(options.create_web_worker_cb.clone()), ops::fs_events::init(), ops::fs::init(), ops::http::init(), From 31f5da6d61fc7dd20e7fa64e3dd71cc168b33345 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Sun, 9 May 2021 21:08:10 +0200 Subject: [PATCH 04/13] fix racy worker --- cli/tests/workers/racy_worker.js | 36 +++++++++++++++++--------------- cli/tests/workers/test.ts | 5 +---- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/cli/tests/workers/racy_worker.js b/cli/tests/workers/racy_worker.js index a7c75be883ee7b..0f66c6278521fd 100644 --- a/cli/tests/workers/racy_worker.js +++ b/cli/tests/workers/racy_worker.js @@ -1,23 +1,25 @@ // See issue for details // https://github.com/denoland/deno/issues/4080 // -// After first call to `postMessage() this worker schedules -// [close(), postMessage()] ops on the same turn of microtask queue -// (because message is rather big). -// Only single `postMessage()` call should make it -// to host, ie. after calling `close()` no more code should be run. +// After first received message, this worker schedules +// [assert(), close(), assert()] ops on the same turn of microtask queue +// All tasks after close should not make it -setTimeout(() => { - close(); -}, 50); - -while (true) { - await new Promise((done) => { +onmessage = async function () { + let stage = 0; + await new Promise((_) => { + setTimeout(() => { + if (stage !== 0) throw "Unexpected stage"; + stage = 1; + }, 50); + setTimeout(() => { + if (stage !== 1) throw "Unexpected stage"; + stage = 2; + postMessage("DONE"); + close(); + }, 50); setTimeout(() => { - postMessage({ - buf: [...new Array(999999)].map((element, index) => index), - }); - done(); - }); + throw "This should not be run"; + }, 50); }); -} +}; diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts index c3ccebfbb9b2c4..402978da968372 100644 --- a/cli/tests/workers/test.ts +++ b/cli/tests/workers/test.ts @@ -198,15 +198,12 @@ Deno.test({ ); racyWorker.onmessage = (e): void => { - assertEquals(e.data.buf.length, 999999); - racyWorker.onmessage = (_e): void => { - throw new Error("unreachable"); - }; setTimeout(() => { promise.resolve(); }, 100); }; + racyWorker.postMessage("START"); await promise; }, }); From 4aa3ffeb8b133b1128a0c0a3d29e0faa0fdc164b Mon Sep 17 00:00:00 2001 From: Tim Ramlot <42113979+inteon@users.noreply.github.com> Date: Mon, 10 May 2021 10:55:51 +0200 Subject: [PATCH 05/13] Apply suggestions from code review Co-authored-by: Ben Noordhuis --- runtime/ops/web_worker.rs | 6 +----- runtime/web_worker.rs | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs index fd5cd6103c7852..184729f6cc0ea3 100644 --- a/runtime/ops/web_worker.rs +++ b/runtime/ops/web_worker.rs @@ -34,11 +34,7 @@ pub fn init() -> Extension { let maybe_data = temp.get_message().await; - if let Some(data) = maybe_data { - return Ok(data); - } - - Ok(Box::new([])) + Ok(maybe_data.unwrap_or_default()) }), ), // Notify host that guest worker closes. diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 7c531b4bbed777..ff74eed3e0246b 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -62,7 +62,7 @@ impl WebWorkerInternalHandle { pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> { let mut sender = self.sender.clone(); // If the channel is closed, - // the worker must have terminated but the termination message has not yet been recieved. + // the worker must have terminated but the termination message has not yet been received. // // Therefore just treat it as if the worker has terminated and return. if sender.is_closed() { From 676f4baf95094c4c72a5a85c6269a4e761043509 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Mon, 10 May 2021 11:39:47 +0200 Subject: [PATCH 06/13] define WorkerId as new struct instead of type alias --- runtime/ops/worker_host.rs | 4 ++-- runtime/web_worker.rs | 25 +++++++++++++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index cf708758d974eb..b24294ccbbd474 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -454,7 +454,7 @@ fn op_create_worker( let worker_id = state.take::(); let create_module_loader = state.take::(); state.put::(create_module_loader.clone()); - state.put::(worker_id + 1); + state.put::(worker_id.next().unwrap()); let module_specifier = deno_core::resolve_url(&specifier)?; let worker_name = args_name.unwrap_or_else(|| "".to_string()); @@ -464,7 +464,7 @@ fn op_create_worker( // Setup new thread let thread_builder = - std::thread::Builder::new().name(format!("deno-worker-{}", worker_id)); + std::thread::Builder::new().name(format!("{}", worker_id)); // Spawn it let join_handle = thread_builder.spawn(move || { diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index ff74eed3e0246b..2571efedab5cbc 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -13,6 +13,8 @@ use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; +use deno_core::serde::Deserialize; +use deno_core::serde::Serialize; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url::Url; @@ -29,6 +31,7 @@ use deno_file::BlobUrlStore; use log::debug; use std::cell::RefCell; use std::env; +use std::fmt; use std::rc::Rc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -37,8 +40,22 @@ use std::task::Context; use std::task::Poll; use tokio::sync::Mutex as AsyncMutex; -pub type WorkerId = u32; -pub type WorkerMessage = Box<[u8]>; +#[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, +)] +pub struct WorkerId(u32); +impl fmt::Display for WorkerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "worker-{}", self.0) + } +} +impl WorkerId { + pub fn next(&self) -> Option { + self.0.checked_add(1).map(WorkerId) + } +} + +type WorkerMessage = Box<[u8]>; /// Events that are sent to host from child /// worker. @@ -351,7 +368,7 @@ impl WebWorker { // Instead of using name for log we use `worker-${id}` because // WebWorkers can have empty string as name. let script = format!( - "bootstrap.workerRuntime({}, \"{}\", {}, \"worker-{}\")", + "bootstrap.workerRuntime({}, \"{}\", {}, \"{}\")", runtime_options_str, self.name, options.use_deno_namespace, self.id ); self @@ -551,7 +568,7 @@ mod tests { "TEST".to_string(), Permissions::allow_all(), main_module, - 1, + WorkerId(1), &options, ); worker.bootstrap(&options); From 16014592f34c03b8ff3bdcb2fb7c830689b57e4e Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Mon, 10 May 2021 11:46:49 +0200 Subject: [PATCH 07/13] added TODO comments --- runtime/web_worker.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 2571efedab5cbc..603447da6c3293 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -603,6 +603,7 @@ mod tests { let mut handle = handle_receiver.recv().unwrap(); + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded let r = handle.post_message(msg.clone()); assert!(r.is_ok()); @@ -617,11 +618,13 @@ mod tests { assert!(maybe_msg.is_some()); match maybe_msg { Some(WorkerEvent::Message(buf)) => { + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]); } _ => unreachable!(), } + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded let r = handle.post_message(msg); assert!(r.is_ok()); @@ -647,6 +650,7 @@ mod tests { let mut handle = handle_receiver.recv().unwrap(); + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded let r = handle.post_message(msg.clone()); assert!(r.is_ok()); From 1256524aa9da3b844d1eee0d0d3a358f35453946 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Mon, 10 May 2021 23:51:21 +0200 Subject: [PATCH 08/13] optimize worker <-> host message passing ops --- runtime/js/11_workers.js | 69 ++++++++++-------------- runtime/js/99_main.js | 2 +- runtime/ops/web_worker.rs | 101 ++++++++++++++++++++--------------- runtime/ops/worker_host.rs | 85 ++++++++++++++--------------- runtime/web_worker.rs | 12 +++-- serde_v8/src/magic/buffer.rs | 13 ++++- 6 files changed, 147 insertions(+), 135 deletions(-) diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js index 728d4a9b04e645..dca83c818172d1 100644 --- a/runtime/js/11_workers.js +++ b/runtime/js/11_workers.js @@ -224,57 +224,44 @@ #poll = async () => { while (!this.#terminated) { - const event = await hostGetMessage(this.#id); + const [type, data] = await hostGetMessage(this.#id); // If terminate was called then we ignore all messages if (this.#terminated) { return; } - const type = event.type; - - if (type === "terminalError") { - this.#terminated = true; - if (!this.#handleError(event.error)) { - if (globalThis instanceof Window) { - throw new Error("Unhandled error event reached main worker."); - } else { - core.opSync( - "op_worker_unhandled_error", - event.error.message, - ); - } + switch (type) { + case 0: { // Message + const msg = core.deserialize(data); + this.#handleMessage(msg); + break; } - continue; - } - - if (type === "msg") { - const data = core.deserialize(new Uint8Array(event.data)); - this.#handleMessage(data); - continue; - } - - if (type === "error") { - if (!this.#handleError(event.error)) { - if (globalThis instanceof Window) { - throw new Error("Unhandled error event reached main worker."); - } else { - core.opSync( - "op_worker_unhandled_error", - event.error.message, - ); + case 1: { // TerminalError + this.#terminated = true; + } /* falls through */ + case 2: { // Error + if (!this.#handleError(data)) { + if (globalThis instanceof Window) { + throw new Error("Unhandled error event reached main worker."); + } else { + core.opSync( + "op_worker_unhandled_error", + data.message, + ); + } } + break; + } + case 3: { // Close + log(`Host got "close" message from worker: ${this.#name}`); + this.#terminated = true; + return; + } + default: { + throw new Error(`Unknown worker event: "${type}"`); } - continue; - } - - if (type === "close") { - log(`Host got "close" message from worker: ${this.#name}`); - this.#terminated = true; - return; } - - throw new Error(`Unknown worker event: "${type}"`); } }; diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 305ce211648ae3..28788f27ddec5d 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -83,7 +83,7 @@ delete Object.prototype.__proto__; async function pollForMessages() { while (!isClosing) { const bufferMsg = await opGetMessage(); - const data = core.deserialize(new Uint8Array(bufferMsg)); + const data = core.deserialize(bufferMsg); const msgEvent = new MessageEvent("message", { cancelable: false, diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs index 184729f6cc0ea3..374134b86153cd 100644 --- a/runtime/ops/web_worker.rs +++ b/runtime/ops/web_worker.rs @@ -4,62 +4,77 @@ use crate::web_worker::WebWorkerInternalHandle; use crate::web_worker::WorkerEvent; use deno_core::error::generic_error; use deno_core::error::null_opbuf; +use deno_core::error::AnyError; use deno_core::op_async; use deno_core::op_sync; use deno_core::Extension; +use deno_core::OpState; use deno_core::ZeroCopyBuf; +use std::cell::RefCell; +use std::rc::Rc; pub fn init() -> Extension { Extension::builder() .ops(vec![ - ( - "op_worker_post_message", - op_sync(move |state, _args: (), buf: Option| { - let buf = buf.ok_or_else(null_opbuf)?; - let msg_buf: Box<[u8]> = (*buf).into(); - let handle = state.borrow::().clone(); - handle - .post_event(WorkerEvent::Message(msg_buf)) - .expect("Failed to post message to host"); - Ok(()) - }), - ), - ( - "op_worker_get_message", - op_async(move |state, _: (), _: ()| async move { - let temp = { - let a = state.borrow(); - a.borrow::().clone() - }; - - let maybe_data = temp.get_message().await; - - Ok(maybe_data.unwrap_or_default()) - }), - ), + ("op_worker_post_message", op_sync(op_worker_post_message)), + ("op_worker_get_message", op_async(op_worker_get_message)), // Notify host that guest worker closes. - ( - "op_worker_close", - op_sync(|state, _: (), _: ()| { - // Notify parent that we're finished - let mut handle = - state.borrow_mut::().clone(); - - handle.terminate(); - Ok(()) - }), - ), + ("op_worker_close", op_sync(op_worker_close)), // Notify host that guest worker has unhandled error. ( "op_worker_unhandled_error", - op_sync(|state, message: String, _: ()| { - let sender = state.borrow::().clone(); - sender - .post_event(WorkerEvent::Error(generic_error(message))) - .expect("Failed to propagate error event to parent worker"); - Ok(true) - }), + op_sync(op_worker_unhandled_error), ), ]) .build() } + +fn op_worker_post_message( + state: &mut OpState, + _: (), + buf: Option, +) -> Result<(), AnyError> { + let buf = buf.ok_or_else(null_opbuf)?; + let handle = state.borrow::().clone(); + handle + .post_event(WorkerEvent::Message(buf)) + .expect("Failed to post message to host"); + Ok(()) +} + +async fn op_worker_get_message( + state: Rc>, + _: (), + _: (), +) -> Result { + let temp = { + let a = state.borrow(); + a.borrow::().clone() + }; + + let maybe_data = temp.get_message().await; + + Ok(maybe_data.unwrap_or_default()) +} + +#[allow(clippy::unnecessary_wraps)] +fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> { + // Notify parent that we're finished + let mut handle = state.borrow_mut::().clone(); + + handle.terminate(); + Ok(()) +} + +#[allow(clippy::unnecessary_wraps)] +fn op_worker_unhandled_error( + state: &mut OpState, + message: String, + _: (), +) -> Result<(), AnyError> { + let sender = state.borrow::().clone(); + sender + .post_event(WorkerEvent::Error(generic_error(message))) + .expect("Failed to propagate error event to parent worker"); + Ok(()) +} diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index b24294ccbbd474..a5698fa6e7cfc5 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -27,7 +27,6 @@ use deno_core::serde::de::SeqAccess; use deno_core::serde::Deserialize; use deno_core::serde::Deserializer; use deno_core::serde_json::json; -use deno_core::serde_json::Value; use deno_core::Extension; use deno_core::ModuleSpecifier; use deno_core::OpState; @@ -529,43 +528,42 @@ fn op_host_terminate_worker( Ok(()) } -fn serialize_worker_event(event: WorkerEvent) -> Value { - match event { - WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }), - WorkerEvent::TerminalError(error) => match error.downcast::() { - Ok(js_error) => json!({ - "type": "terminalError", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "terminalError", - "error": { - "message": error.to_string(), - } - }), - }, - WorkerEvent::Error(error) => match error.downcast::() { - Ok(js_error) => json!({ - "type": "error", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "error", - "error": { - "message": error.to_string(), - } - }), - }, +use deno_core::serde::Serialize; +use deno_core::serde::Serializer; + +impl Serialize for WorkerEvent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let type_id = match &self { + WorkerEvent::Message(_) => 0_i32, + WorkerEvent::TerminalError(_) => 1_i32, + WorkerEvent::Error(_) => 2_i32, + WorkerEvent::Close => 3_i32, + }; + + match self { + WorkerEvent::Message(buf) => { + Serialize::serialize(&(type_id, buf), serializer) + } + WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => { + let value = match error.downcast_ref::() { + Some(js_error) => json!({ + "message": js_error.message, + "fileName": js_error.script_resource_name, + "lineNumber": js_error.line_number, + "columnNumber": js_error.start_column, + }), + None => json!({ + "message": error.to_string(), + }), + }; + + Serialize::serialize(&(type_id, value), serializer) + } + _ => Serialize::serialize(&(type_id, ()), serializer), + } } } @@ -590,7 +588,7 @@ async fn op_host_get_message( state: Rc>, id: WorkerId, _: (), -) -> Result { +) -> Result { let worker_handle = { let s = state.borrow(); let workers_table = s.borrow::(); @@ -599,7 +597,7 @@ async fn op_host_get_message( handle.worker_handle.clone() } else { // If handle was not found it means worker has already shutdown - return Ok(json!({ "type": "close" })); + return Ok(WorkerEvent::Close); } }; @@ -609,12 +607,12 @@ async fn op_host_get_message( if let WorkerEvent::TerminalError(_) = &event { try_remove_and_close(state, id); } - return Ok(serialize_worker_event(event)); + return Ok(event); } // If there was no event from worker it means it has already been closed. try_remove_and_close(state, id); - Ok(json!({ "type": "close" })) + Ok(WorkerEvent::Close) } /// Post message to guest worker as host @@ -623,8 +621,7 @@ fn op_host_post_message( id: WorkerId, data: Option, ) -> Result<(), AnyError> { - let data = data.ok_or_else(null_opbuf)?; - let msg: Box<[u8]> = (*data).into(); + let msg = data.ok_or_else(null_opbuf)?; debug!("post message to worker {}", id); let worker_thread = state diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 603447da6c3293..9a25ef940f0e7f 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -27,6 +27,7 @@ use deno_core::ModuleId; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; +use deno_core::ZeroCopyBuf; use deno_file::BlobUrlStore; use log::debug; use std::cell::RefCell; @@ -55,7 +56,7 @@ impl WorkerId { } } -type WorkerMessage = Box<[u8]>; +type WorkerMessage = ZeroCopyBuf; /// Events that are sent to host from child /// worker. @@ -63,6 +64,7 @@ pub enum WorkerEvent { Message(WorkerMessage), Error(AnyError), TerminalError(AnyError), + Close, } // Channels used for communication with worker's parent @@ -605,13 +607,13 @@ mod tests { // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded - let r = handle.post_message(msg.clone()); + let r = handle.post_message(msg.clone().into()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await.unwrap(); assert!(maybe_msg.is_some()); - let r = handle.post_message(msg.clone()); + let r = handle.post_message(msg.clone().into()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await.unwrap(); @@ -626,7 +628,7 @@ mod tests { // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded - let r = handle.post_message(msg); + let r = handle.post_message(msg.into()); assert!(r.is_ok()); let event = handle.get_event().await.unwrap(); assert!(event.is_none()); @@ -652,7 +654,7 @@ mod tests { // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded - let r = handle.post_message(msg.clone()); + let r = handle.post_message(msg.clone().into()); assert!(r.is_ok()); let event = handle.get_event().await.unwrap(); assert!(event.is_none()); diff --git a/serde_v8/src/magic/buffer.rs b/serde_v8/src/magic/buffer.rs index 893bf35e1c0226..c14284c7774482 100644 --- a/serde_v8/src/magic/buffer.rs +++ b/serde_v8/src/magic/buffer.rs @@ -14,6 +14,8 @@ pub enum MagicBuffer { ToV8(Cell>>), } +unsafe impl Sync for MagicBuffer {} + impl MagicBuffer { pub fn new<'s>( scope: &mut v8::HandleScope<'s>, @@ -32,6 +34,12 @@ impl Clone for MagicBuffer { } } +impl Default for MagicBuffer { + fn default() -> Self { + MagicBuffer::ToV8(Cell::new(Some(vec![0_u8; 0].into_boxed_slice()))) + } +} + impl AsRef<[u8]> for MagicBuffer { fn as_ref(&self) -> &[u8] { &*self @@ -88,7 +96,10 @@ impl serde::Serialize for MagicBuffer { let mut s = serializer.serialize_struct(BUF_NAME, 1)?; let boxed: Box<[u8]> = match self { - Self::FromV8(_) => unreachable!(), + Self::FromV8(buf) => { + let value: &[u8] = &*buf; + value.into() + } Self::ToV8(x) => x.take().expect("MagicBuffer was empty"), }; let hack: [usize; 2] = unsafe { std::mem::transmute(boxed) }; From 921f354ce248f8b5330d6ee5a2ee07e3655bcc6f Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Tue, 11 May 2021 09:45:13 +0200 Subject: [PATCH 09/13] add comment to op_worker_unhandled_error --- runtime/ops/web_worker.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs index 374134b86153cd..54d31fc4c4ba9f 100644 --- a/runtime/ops/web_worker.rs +++ b/runtime/ops/web_worker.rs @@ -66,6 +66,11 @@ fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> { Ok(()) } +/// A worker that encounters an uncaught error will pass this error +/// to its parent worker using this op. The parent worker will use +/// this same op to pass the error to its own parent (in case +/// `e.preventDefault()` was not called in `worker.onerror`). This +/// is done until the error reaches the root/ main worker. #[allow(clippy::unnecessary_wraps)] fn op_worker_unhandled_error( state: &mut OpState, From e95db553a5cc808704294b4350b19d9fc4308780 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 11 May 2021 17:42:50 +0200 Subject: [PATCH 10/13] review comments --- runtime/js/99_main.js | 18 +++--------------- runtime/ops/web_worker.rs | 2 +- runtime/web_worker.rs | 4 +++- serde_v8/src/magic/buffer.rs | 20 ++++++++------------ 4 files changed, 15 insertions(+), 29 deletions(-) diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 28788f27ddec5d..1c88931c4c128b 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -66,7 +66,7 @@ delete Object.prototype.__proto__; } isClosing = true; - opCloseWorker(); + core.opSync("op_worker_close"); } // TODO(bartlomieju): remove these functions @@ -76,13 +76,13 @@ delete Object.prototype.__proto__; function postMessage(data) { const dataIntArray = core.serialize(data); - opPostMessage(dataIntArray); + core.opSync("op_worker_post_message", null, dataIntArray); } let isClosing = false; async function pollForMessages() { while (!isClosing) { - const bufferMsg = await opGetMessage(); + const bufferMsg = await core.opAsync("op_worker_get_message"); const data = core.deserialize(bufferMsg); const msgEvent = new MessageEvent("message", { @@ -136,18 +136,6 @@ delete Object.prototype.__proto__; } } - function opGetMessage() { - return core.opAsync("op_worker_get_message"); - } - - function opPostMessage(data) { - core.opSync("op_worker_post_message", null, data); - } - - function opCloseWorker() { - core.opSync("op_worker_close"); - } - function opMainModule() { return core.opSync("op_main_module"); } diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs index 54d31fc4c4ba9f..e3ede869da0122 100644 --- a/runtime/ops/web_worker.rs +++ b/runtime/ops/web_worker.rs @@ -54,7 +54,7 @@ async fn op_worker_get_message( let maybe_data = temp.get_message().await; - Ok(maybe_data.unwrap_or_default()) + Ok(maybe_data.unwrap_or_else(ZeroCopyBuf::empty)) } #[allow(clippy::unnecessary_wraps)] diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 9a25ef940f0e7f..5b731a0f514edd 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -459,7 +459,9 @@ impl WebWorker { return Poll::Pending; } - panic!("coding error: either js is polling or the worker is terminted"); + panic!( + "coding error: either js is polling or the worker is terminated" + ); } Poll::Pending => Poll::Pending, } diff --git a/serde_v8/src/magic/buffer.rs b/serde_v8/src/magic/buffer.rs index c14284c7774482..6bda7024cd84fc 100644 --- a/serde_v8/src/magic/buffer.rs +++ b/serde_v8/src/magic/buffer.rs @@ -1,9 +1,9 @@ use rusty_v8 as v8; -use std::cell::Cell; use std::fmt; use std::ops::Deref; use std::ops::DerefMut; +use std::sync::Mutex; use super::zero_copy_buf::ZeroCopyBuf; @@ -11,11 +11,9 @@ use super::zero_copy_buf::ZeroCopyBuf; // allowing us to use a single type for familiarity pub enum MagicBuffer { FromV8(ZeroCopyBuf), - ToV8(Cell>>), + ToV8(Mutex>>), } -unsafe impl Sync for MagicBuffer {} - impl MagicBuffer { pub fn new<'s>( scope: &mut v8::HandleScope<'s>, @@ -23,6 +21,10 @@ impl MagicBuffer { ) -> Self { Self::FromV8(ZeroCopyBuf::new(scope, view)) } + + pub fn empty() -> Self { + MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice()))) + } } impl Clone for MagicBuffer { @@ -34,12 +36,6 @@ impl Clone for MagicBuffer { } } -impl Default for MagicBuffer { - fn default() -> Self { - MagicBuffer::ToV8(Cell::new(Some(vec![0_u8; 0].into_boxed_slice()))) - } -} - impl AsRef<[u8]> for MagicBuffer { fn as_ref(&self) -> &[u8] { &*self @@ -73,7 +69,7 @@ impl DerefMut for MagicBuffer { impl From> for MagicBuffer { fn from(buf: Box<[u8]>) -> Self { - MagicBuffer::ToV8(Cell::new(Some(buf))) + MagicBuffer::ToV8(Mutex::new(Some(buf))) } } @@ -100,7 +96,7 @@ impl serde::Serialize for MagicBuffer { let value: &[u8] = &*buf; value.into() } - Self::ToV8(x) => x.take().expect("MagicBuffer was empty"), + Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"), }; let hack: [usize; 2] = unsafe { std::mem::transmute(boxed) }; let f1: u64 = hack[0] as u64; From db000ce774363a44f946ccf7d32fa5340af94205 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Tue, 11 May 2021 18:14:53 +0200 Subject: [PATCH 11/13] nit --- runtime/js/99_main.js | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 1c88931c4c128b..12f61c64dddd0e 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -91,11 +91,8 @@ delete Object.prototype.__proto__; }); try { - if (globalThis["onmessage"]) { - const result = globalThis.onmessage(msgEvent); - if (result && "then" in result) { - await result; - } + if (globalThis.onmessage) { + await globalThis.onmessage(msgEvent); } globalThis.dispatchEvent(msgEvent); } catch (e) { From 0e7705d7a5b9b3efdb99feaa3ec6b847baed6b02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 11 May 2021 18:56:34 +0200 Subject: [PATCH 12/13] fix buf copy --- serde_v8/src/magic/buffer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serde_v8/src/magic/buffer.rs b/serde_v8/src/magic/buffer.rs index 6bda7024cd84fc..1fcfffc723dad2 100644 --- a/serde_v8/src/magic/buffer.rs +++ b/serde_v8/src/magic/buffer.rs @@ -93,7 +93,7 @@ impl serde::Serialize for MagicBuffer { let mut s = serializer.serialize_struct(BUF_NAME, 1)?; let boxed: Box<[u8]> = match self { Self::FromV8(buf) => { - let value: &[u8] = &*buf; + let value: &[u8] = &buf; value.into() } Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"), From ba897c251958c49b1f3e5920b65c7221a2c9d38b Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Tue, 11 May 2021 20:01:39 +0200 Subject: [PATCH 13/13] added postMessage structured cloning regression test --- cli/tests/workers/test.ts | 35 +++++++++++++++++++ .../workers/worker_structured_cloning.ts | 15 ++++++++ 2 files changed, 50 insertions(+) create mode 100644 cli/tests/workers/worker_structured_cloning.ts diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts index 402978da968372..41988d204a9467 100644 --- a/cli/tests/workers/test.ts +++ b/cli/tests/workers/test.ts @@ -723,3 +723,38 @@ Deno.test({ worker.terminate(); }, }); + +Deno.test({ + name: "structured cloning postMessage", + fn: async function (): Promise { + const result = deferred(); + const worker = new Worker( + new URL("worker_structured_cloning.ts", import.meta.url).href, + { type: "module" }, + ); + + worker.onmessage = (e): void => { + // self field should reference itself (circular ref) + const value = e.data.self.self.self; + + // fields a and b refer to the same array + assertEquals(value.a, ["a", true, 432]); + assertEquals(value.a, ["a", true, 432]); + value.b[0] = "b"; + value.a[2] += 5; + assertEquals(value.a, ["b", true, 437]); + assertEquals(value.b, ["b", true, 437]); + + const len = value.c.size; + value.c.add(1); // This value is already in the set. + value.c.add(2); + assertEquals(len + 1, value.c.size); + + result.resolve(); + }; + + worker.postMessage("START"); + await result; + worker.terminate(); + }, +}); diff --git a/cli/tests/workers/worker_structured_cloning.ts b/cli/tests/workers/worker_structured_cloning.ts new file mode 100644 index 00000000000000..eb1719a9a84c92 --- /dev/null +++ b/cli/tests/workers/worker_structured_cloning.ts @@ -0,0 +1,15 @@ +// More info on structured cloning can be found here: +// https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm + +self.onmessage = () => { + const arr = ["a", true, 432]; + const set = new Set([1, 3, 5, 7, 9]); + const selfReference = { + a: arr, + b: arr, + c: set, + }; + // deno-lint-ignore no-explicit-any + (selfReference as any).self = selfReference; + self.postMessage(selfReference); +};