diff --git a/Cargo.toml b/Cargo.toml index 6e850735..4d524229 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ gloo-render = { version = "0.1.0", path = "crates/render" } gloo-console = { version = "0.2.1", path = "crates/console" } gloo-utils = { version = "0.1.1", path = "crates/utils" } gloo-history = { version = "0.1.0", path = "crates/history" } +gloo-worker = { path = "crates/worker" } [features] default = [] @@ -39,4 +40,5 @@ members = [ "crates/console", "crates/utils", "crates/history", + "crates/worker", ] diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml new file mode 100644 index 00000000..2b5b5949 --- /dev/null +++ b/crates/worker/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "gloo-worker" +version = "0.1.0" +authors = ["Rust and WebAssembly Working Group"] +edition = "2018" +readme = "README.md" +description = "Convenience crate for working with Web Workers" +repository = "https://github.com/rustwasm/gloo/tree/master/crates/worker" +homepage = "https://github.com/rustwasm/gloo" +license = "MIT OR Apache-2.0" +categories = ["api-bindings", "asynchronous", "wasm"] + +[package.metadata.docs.rs] +all-features = true + +rustdoc-args = ["--cfg", "docsrs"] + + +[dependencies] +anymap2 = "0.13" +bincode = "1" +gloo-console = { path = "../console", version = "0.2" } +gloo-utils = { path = "../utils", version = "0.1" } +js-sys = "0.3" +serde = { version = "1", features = ["derive"] } +slab = "0.4" +wasm-bindgen = "0.2" +wasm-bindgen-futures = { version = "0.4", optional = true } + +[dependencies.web-sys] +version = "0.3" +features = [ + "Blob", + "BlobPropertyBag", + "DedicatedWorkerGlobalScope", + "MessageEvent", + "Url", + "Worker", + "WorkerOptions", +] + +[features] +default = [] +futures = ["wasm-bindgen-futures"] diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs new file mode 100644 index 00000000..6f7e9450 --- /dev/null +++ b/crates/worker/src/lib.rs @@ -0,0 +1,158 @@ +//! Workers are a way to offload tasks to web workers. These are run concurrently using +//! [web-workers](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers). +//! +//! # Types of Workers +//! +//! ## Reaches +//! +//! * Public - There will exist at most one instance of a Public Worker at any given time. +//! Bridges will spawn or connect to an already spawned worker in a web worker. +//! When no bridges are connected to this worker, the worker will disappear. +//! +//! * Private - Spawn a new worker in a web worker for every new bridge. This is good for +//! moving shared but independent behavior that communicates with the browser out of components. +//! When the the connected bridge is dropped, the worker will disappear. +//! +//! # Communicating with workers +//! +//! ## Bridges +//! +//! A bridge allows bi-directional communication between an worker and a component. +//! Bridges also allow workers to communicate with one another. +//! +//! ## Dispatchers +//! +//! A dispatcher allows uni-directional communication between a component and an worker. +//! A dispatcher allows a component to send messages to an worker. +//! +//! # Overhead +//! +//! Workers use web workers (i.e. Private and Public). They incur a serialization overhead on the +//! messages they send and receive. Workers use [bincode](https://!github.com/servo/bincode) +//! to communicate with other browser worker, so the cost is substantially higher +//! than just calling a function. + +#![cfg_attr(docsrs, feature(doc_cfg))] + +mod link; +mod pool; +mod worker; + +pub use link::WorkerLink; +pub(crate) use link::*; +pub(crate) use pool::*; +pub use pool::{Dispatched, Dispatcher}; +use std::cell::RefCell; +pub use worker::{Private, PrivateWorker, Public, PublicWorker}; + +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use std::rc::Rc; + +/// Alias for `Rc>` +pub type Shared = Rc>; + +/// Alias for `Rc` +pub type Callback = Rc; + +/// Declares the behavior of the worker. +pub trait Worker: Sized + 'static { + /// Reach capability of the worker. + type Reach: Discoverer; + /// Type of an input message. + type Message; + /// Incoming message type. + type Input; + /// Outgoing message type. + type Output; + + /// Creates an instance of an worker. + fn create(link: WorkerLink) -> Self; + + /// This method called on every update message. + fn update(&mut self, msg: Self::Message); + + /// This method called on when a new bridge created. + fn connected(&mut self, _id: HandlerId) {} + + /// This method called on every incoming message. + fn handle_input(&mut self, msg: Self::Input, id: HandlerId); + + /// This method called on when a new bridge destroyed. + fn disconnected(&mut self, _id: HandlerId) {} + + /// This method called when the worker is destroyed. + fn destroy(&mut self) {} + + /// Represents the name of loading resource for remote workers which + /// have to live in a separate files. + fn name_of_resource() -> &'static str { + "main.js" + } + + /// Indicates whether the name of the resource is relative. + /// + /// The default implementation returns `false`, which will cause the result + /// returned by [`Self::name_of_resource`] to be interpreted as an absolute + /// URL. If `true` is returned, it will be interpreted as a relative URL. + fn resource_path_is_relative() -> bool { + false + } + + /// Signifies if resource is a module. + /// This has pending browser support. + fn is_module() -> bool { + false + } +} + +/// Id of responses handler. +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)] +pub struct HandlerId(usize, bool); + +impl HandlerId { + fn new(id: usize, respondable: bool) -> Self { + HandlerId(id, respondable) + } + fn raw_id(self) -> usize { + self.0 + } + /// Indicates if a handler id corresponds to callback in the Worker runtime. + pub fn is_respondable(self) -> bool { + self.1 + } +} + +/// Determine a visibility of an worker. +#[doc(hidden)] +pub trait Discoverer { + type Worker: Worker; + + /// Spawns an worker and returns `Bridge` implementation. + fn spawn_or_join( + _callback: Option::Output>>, + ) -> Box>; +} + +/// Bridge to a specific kind of worker. +pub trait Bridge { + /// Send a message to an worker. + fn send(&mut self, msg: W::Input); +} + +/// This trait allows registering or getting the address of a worker. +pub trait Bridged: Worker + Sized + 'static { + /// Creates a messaging bridge between a worker and the component. + fn bridge(callback: Callback) -> Box>; +} + +impl Bridged for T +where + T: Worker, + ::Reach: Discoverer, +{ + fn bridge(callback: Callback) -> Box> { + Self::Reach::spawn_or_join(Some(callback)) + } +} diff --git a/crates/worker/src/link.rs b/crates/worker/src/link.rs new file mode 100644 index 00000000..958b7655 --- /dev/null +++ b/crates/worker/src/link.rs @@ -0,0 +1,259 @@ +use crate::Shared; +use crate::{HandlerId, Worker}; +use std::cell::RefCell; +use std::fmt; +#[cfg(feature = "futures")] +use std::future::Future; +use std::rc::Rc; + +/// Defines communication from Worker to Consumers +pub(crate) trait Responder { + /// Implementation for communication channel from Worker to Consumers + fn respond(&self, id: HandlerId, output: W::Output); +} + +/// Link to worker's scope for creating callbacks. +pub struct WorkerLink { + scope: WorkerScope, + responder: Rc>, +} + +impl WorkerLink { + /// Create link for a scope. + pub(crate) fn connect(scope: &WorkerScope, responder: T) -> Self + where + T: Responder + 'static, + { + WorkerLink { + scope: scope.clone(), + responder: Rc::new(responder), + } + } + + /// Send response to an worker. + pub fn respond(&self, id: HandlerId, output: W::Output) { + self.responder.respond(id, output); + } + + /// Send a message to the worker + pub fn send_message(&self, msg: T) + where + T: Into, + { + self.scope.send(WorkerLifecycleEvent::Message(msg.into())); + } + + /// Send an input to self + pub fn send_input(&self, input: T) + where + T: Into, + { + let handler_id = HandlerId::new(0, false); + self.scope + .send(WorkerLifecycleEvent::Input(input.into(), handler_id)); + } + + /// Create a callback which will send a message to the worker when invoked. + pub fn callback(&self, function: F) -> Rc + where + M: Into, + F: Fn(IN) -> M + 'static, + { + let scope = self.scope.clone(); + let closure = move |input| { + let output = function(input).into(); + scope.send(WorkerLifecycleEvent::Message(output)); + }; + Rc::new(closure) + } + + /// This method creates a callback which returns a Future which + /// returns a message to be sent back to the worker + /// + /// # Panics + /// If the future panics, then the promise will not resolve, and + /// will leak. + #[cfg(feature = "futures")] + #[cfg_attr(docsrs, doc(cfg(feature = "futures")))] + pub fn callback_future(&self, function: FN) -> Rc + where + M: Into, + FU: Future + 'static, + FN: Fn(IN) -> FU + 'static, + { + let link = self.clone(); + + let closure = move |input: IN| { + let future: FU = function(input); + link.send_future(future); + }; + + Rc::new(closure) + } + + /// This method processes a Future that returns a message and sends it back to the worker. + /// + /// # Panics + /// If the future panics, then the promise will not resolve, and will leak. + #[cfg(feature = "futures")] + #[cfg_attr(docsrs, doc(cfg(feature = "futures")))] + pub fn send_future(&self, future: F) + where + M: Into, + F: Future + 'static, + { + let link: WorkerLink = self.clone(); + let js_future = async move { + let message: W::Message = future.await.into(); + let cb = link.callback(|m: W::Message| m); + (*cb)(message); + }; + wasm_bindgen_futures::spawn_local(js_future); + } +} + +impl fmt::Debug for WorkerLink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("WorkerLink<_>") + } +} + +impl Clone for WorkerLink { + fn clone(&self) -> Self { + WorkerLink { + scope: self.scope.clone(), + responder: self.responder.clone(), + } + } +} +/// This struct holds a reference to a component and to a global scheduler. +pub(crate) struct WorkerScope { + state: Shared>, +} + +impl fmt::Debug for WorkerScope { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("WorkerScope<_>") + } +} + +impl Clone for WorkerScope { + fn clone(&self) -> Self { + WorkerScope { + state: self.state.clone(), + } + } +} + +impl WorkerScope { + /// Create worker scope + pub fn new() -> Self { + let state = Rc::new(RefCell::new(WorkerState::new())); + WorkerScope { state } + } + + /// Schedule message for sending to worker + pub fn send(&self, event: WorkerLifecycleEvent) { + let runnable = Box::new(WorkerRunnable { + state: self.state.clone(), + event, + }); + runnable.run(); + } +} + +impl Default for WorkerScope { + fn default() -> Self { + Self::new() + } +} + +struct WorkerState { + worker: Option, + // TODO: Use worker field to control create message this flag + destroyed: bool, +} + +impl WorkerState { + fn new() -> Self { + WorkerState { + worker: None, + destroyed: false, + } + } +} + +/// Internal Worker lifecycle events +#[derive(Debug)] +pub(crate) enum WorkerLifecycleEvent { + /// Request to create link + Create(WorkerLink), + /// Internal Worker message + Message(W::Message), + /// Client connected + Connected(HandlerId), + /// Received message from Client + Input(W::Input, HandlerId), + /// Client disconnected + Disconnected(HandlerId), + /// Request to destroy worker + Destroy, +} + +struct WorkerRunnable { + state: Shared>, + event: WorkerLifecycleEvent, +} + +impl WorkerRunnable +where + W: Worker, +{ + fn run(self) { + let mut state = self.state.borrow_mut(); + if state.destroyed { + return; + } + match self.event { + WorkerLifecycleEvent::Create(link) => { + state.worker = Some(W::create(link)); + } + WorkerLifecycleEvent::Message(msg) => { + state + .worker + .as_mut() + .expect("worker was not created to process messages") + .update(msg); + } + WorkerLifecycleEvent::Connected(id) => { + state + .worker + .as_mut() + .expect("worker was not created to send a connected message") + .connected(id); + } + WorkerLifecycleEvent::Input(inp, id) => { + state + .worker + .as_mut() + .expect("worker was not created to process inputs") + .handle_input(inp, id); + } + WorkerLifecycleEvent::Disconnected(id) => { + state + .worker + .as_mut() + .expect("worker was not created to send a disconnected message") + .disconnected(id); + } + WorkerLifecycleEvent::Destroy => { + let mut worker = state + .worker + .take() + .expect("trying to destroy not existent worker"); + worker.destroy(); + state.destroyed = true; + } + } + } +} diff --git a/crates/worker/src/pool.rs b/crates/worker/src/pool.rs new file mode 100644 index 00000000..65ef40ce --- /dev/null +++ b/crates/worker/src/pool.rs @@ -0,0 +1,89 @@ +use super::*; +use crate::Shared; +use gloo_console as console; +use slab::Slab; + +pub(crate) type Last = bool; + +/// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Worker. +pub(crate) type SharedOutputSlab = Shared::Output>>>>; + +/// The slab contains the callback, the id is used to look up the callback, +/// and the output is the message that will be sent via the callback. +pub(crate) fn locate_callback_and_respond( + slab: &SharedOutputSlab, + id: HandlerId, + output: W::Output, +) { + let callback = { + let slab = slab.borrow(); + match slab.get(id.raw_id()).cloned() { + Some(callback) => callback, + None => { + console::warn!(format!( + "Id of handler does not exist in the slab: {}.", + id.raw_id() + )); + return; + } + } + }; + match callback { + Some(callback) => (*callback)(output), + None => console::warn!(format!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id())), + } +} + +/// A newtype around a bridge to indicate that it is distinct from a normal bridge +pub struct Dispatcher(pub(crate) Box>); + +impl fmt::Debug for Dispatcher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Dispatcher<_>") + } +} + +impl Deref for Dispatcher { + type Target = dyn Bridge; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} +impl DerefMut for Dispatcher { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.deref_mut() + } +} + +/// This trait allows the creation of a dispatcher to an existing worker that will not send replies when messages are sent. +pub trait Dispatched: Worker + Sized + 'static { + /// Creates a dispatcher to the worker that will not send messages back. + /// + /// # Note + /// Dispatchers don't have `HandlerId`s and therefore `Worker::handle` will be supplied `None` + /// for the `id` parameter, and `connected` and `disconnected` will not be called. + /// + /// # Important + /// Because the Workers using Context or Public reaches use the number of existing bridges to + /// keep track of if the worker itself should exist, creating dispatchers will not guarantee that + /// an Worker will exist to service requests sent from Dispatchers. You **must** keep at least one + /// bridge around if you wish to use a dispatcher. If you are using workers in a write-only manner, + /// then it is suggested that you create a bridge that handles no-op responses as high up in the + /// component hierarchy as possible - oftentimes the root component for simplicity's sake. + fn dispatcher() -> Dispatcher; +} + +#[doc(hidden)] +pub trait Dispatchable {} + +impl Dispatched for T +where + T: Worker, + ::Reach: Discoverer, + ::Reach: Dispatchable, +{ + fn dispatcher() -> Dispatcher { + Dispatcher(Self::Reach::spawn_or_join(None)) + } +} diff --git a/crates/worker/src/worker/mod.rs b/crates/worker/src/worker/mod.rs new file mode 100644 index 00000000..fcb5d6bc --- /dev/null +++ b/crates/worker/src/worker/mod.rs @@ -0,0 +1,169 @@ +mod private; +mod public; +mod queue; + +pub use private::{Private, PrivateWorker}; +pub use public::{Public, PublicWorker}; + +use crate::{HandlerId, Responder, Worker}; +use js_sys::{Array, Reflect, Uint8Array}; +use serde::{Deserialize, Serialize}; +use wasm_bindgen::{closure::Closure, JsCast, JsValue, UnwrapThrowExt}; +use web_sys::{ + Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, WorkerOptions, +}; + +pub(crate) struct WorkerResponder; + +impl Responder for WorkerResponder +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn respond(&self, id: HandlerId, output: W::Output) { + let msg = FromWorker::ProcessOutput(id, output); + let data = msg.pack(); + worker_self().post_message_vec(data); + } +} + +/// Message packager, based on serde::Serialize/Deserialize +pub trait Packed { + /// Pack serializable message into Vec + fn pack(&self) -> Vec; + /// Unpack deserializable message of byte slice + fn unpack(data: &[u8]) -> Self; +} + +impl Deserialize<'de>> Packed for T { + fn pack(&self) -> Vec { + bincode::serialize(&self).expect("can't serialize an worker message") + } + + fn unpack(data: &[u8]) -> Self { + bincode::deserialize(data).expect("can't deserialize an worker message") + } +} + +/// Serializable messages to worker +#[derive(Serialize, Deserialize, Debug)] +enum ToWorker { + /// Client is connected + Connected(HandlerId), + /// Incoming message to Worker + ProcessInput(HandlerId, T), + /// Client is disconnected + Disconnected(HandlerId), + /// Worker should be terminated + Destroy, +} + +/// Serializable messages sent by worker to consumer +#[derive(Serialize, Deserialize, Debug)] +enum FromWorker { + /// Worker sends this message when `wasm` bundle has loaded. + WorkerLoaded, + /// Outgoing message to consumer + ProcessOutput(HandlerId, T), +} + +fn send_to_remote(worker: &web_sys::Worker, msg: ToWorker) +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + let msg = msg.pack(); + worker.post_message_vec(msg); +} +fn worker_new( + name_of_resource: &str, + resource_is_relative: bool, + is_module: bool, +) -> web_sys::Worker { + let origin = gloo_utils::document() + .location() + .unwrap_throw() + .origin() + .unwrap_throw(); + let pathname = gloo_utils::window().location().pathname().unwrap_throw(); + + let prefix = if resource_is_relative { + pathname + .rfind(|c| c == '/') + .map(|i| &pathname[..i]) + .unwrap_or_default() + } else { + "" + }; + let script_url = format!("{}{}/{}", origin, prefix, name_of_resource); + let wasm_url = format!( + "{}{}/{}", + origin, + prefix, + name_of_resource.replace(".js", "_bg.wasm") + ); + let array = Array::new(); + array.push( + &format!( + r#"importScripts("{}");wasm_bindgen("{}");"#, + script_url, wasm_url + ) + .into(), + ); + let blob = Blob::new_with_str_sequence_and_options( + &array, + BlobPropertyBag::new().type_("application/javascript"), + ) + .unwrap(); + let url = Url::create_object_url_with_blob(&blob).unwrap(); + + if is_module { + let options = WorkerOptions::new(); + Reflect::set( + options.as_ref(), + &JsValue::from_str("type"), + &JsValue::from_str("module"), + ) + .unwrap(); + web_sys::Worker::new_with_options(&url, &options).expect("failed to spawn worker") + } else { + web_sys::Worker::new(&url).expect("failed to spawn worker") + } +} + +fn worker_self() -> DedicatedWorkerGlobalScope { + JsValue::from(js_sys::global()).into() +} + +trait WorkerExt { + fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)); + + fn post_message_vec(&self, data: Vec); +} + +macro_rules! worker_ext_impl { + ($($type:path),+) => {$( + impl WorkerExt for $type { + fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)) { + let handler = move |message: MessageEvent| { + let data = Uint8Array::from(message.data()).to_vec(); + handler(data); + }; + let closure = Closure::wrap(Box::new(handler) as Box); + self.set_onmessage(Some(closure.as_ref().unchecked_ref())); + closure.forget(); + } + + fn post_message_vec(&self, data: Vec) { + self.post_message(&Uint8Array::from(data.as_slice())) + .expect("failed to post message"); + } + } + )+}; +} + +worker_ext_impl! { + web_sys::Worker, DedicatedWorkerGlobalScope +} diff --git a/crates/worker/src/worker/private.rs b/crates/worker/src/worker/private.rs new file mode 100644 index 00000000..e990538e --- /dev/null +++ b/crates/worker/src/worker/private.rs @@ -0,0 +1,212 @@ +use crate::worker::*; +use crate::{ + Bridge, Callback, Discoverer, HandlerId, Worker, WorkerLifecycleEvent, WorkerLink, WorkerScope, +}; +use queue::Queue; +use serde::{Deserialize, Serialize}; +use std::cell::RefCell; +use std::fmt; +use std::marker::PhantomData; +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +thread_local! { + static QUEUE: Queue = Queue::new(); +} + +static PRIVATE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); +const SINGLETON_ID: HandlerId = HandlerId(0, true); + +/// Create a new instance for every bridge. +#[allow(missing_debug_implementations)] +pub struct Private { + _worker: PhantomData, +} + +/// A trait to enable private workers being registered in a web worker. +pub trait PrivateWorker { + /// Executes an worker in the current environment. + /// Uses in `main` function of a worker. + fn register(); +} + +impl PrivateWorker for W +where + W: Worker>, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn register() { + let scope = WorkerScope::::new(); + let responder = WorkerResponder; + let link = WorkerLink::connect(&scope, responder); + let upd = WorkerLifecycleEvent::Create(link); + scope.send(upd); + let handler = move |data: Vec| { + let msg = ToWorker::::unpack(&data); + match msg { + ToWorker::Connected(_) => { + let upd = WorkerLifecycleEvent::Connected(SINGLETON_ID); + scope.send(upd); + } + ToWorker::ProcessInput(_, value) => { + let upd = WorkerLifecycleEvent::Input(value, SINGLETON_ID); + scope.send(upd); + } + ToWorker::Disconnected(_) => { + let upd = WorkerLifecycleEvent::Disconnected(SINGLETON_ID); + scope.send(upd); + } + ToWorker::Destroy => { + let upd = WorkerLifecycleEvent::Destroy; + scope.send(upd); + // Terminates web worker + worker_self().close(); + } + } + }; + let loaded: FromWorker = FromWorker::WorkerLoaded; + let loaded = loaded.pack(); + let worker = worker_self(); + worker.set_onmessage_closure(handler); + worker.post_message_vec(loaded); + } +} + +impl Discoverer for Private +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + type Worker = W; + + fn spawn_or_join(callback: Option>) -> Box> { + let id = PRIVATE_ID_COUNTER.fetch_add(1, Ordering::Relaxed); + let callback = callback.expect("Callback required for Private workers"); + let handler = move |data: Vec, worker: &web_sys::Worker| { + let msg = FromWorker::::unpack(&data); + match msg { + FromWorker::WorkerLoaded => { + QUEUE.with(|queue| { + queue.insert_loaded_worker(id); + + if let Some(msgs) = queue.remove_msg_queue(&id) { + for msg in msgs { + worker.post_message_vec(msg) + } + } + }); + } + FromWorker::ProcessOutput(id, output) => { + assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); + (*callback)(output); + } + } + }; + + let name_of_resource = W::name_of_resource(); + let is_relative = W::resource_path_is_relative(); + let handler_cell = Rc::new(RefCell::new(Some(handler))); + + let worker = { + let handler_cell = handler_cell.clone(); + let worker = worker_new(name_of_resource, is_relative, W::is_module()); + let worker_clone = worker.clone(); + worker.set_onmessage_closure(move |data: Vec| { + if let Some(handler) = handler_cell.borrow().as_ref() { + handler(data, &worker_clone) + } + }); + worker + }; + let bridge = PrivateBridge { + handler_cell, + worker, + _worker: PhantomData, + id, + }; + bridge.send_message(ToWorker::Connected(SINGLETON_ID)); + Box::new(bridge) + } +} + +/// A connection manager for components interaction with workers. +pub struct PrivateBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), +{ + handler_cell: Rc>>, + worker: web_sys::Worker, + _worker: PhantomData, + id: usize, +} + +impl PrivateBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), +{ + /// Send a message to the worker, queuing the message if necessary + fn send_message(&self, msg: ToWorker) { + QUEUE.with(|queue| { + if queue.is_worker_loaded(&self.id) { + send_to_remote::(&self.worker, msg); + } else { + queue.add_msg_to_queue(msg.pack(), self.id); + } + }); + } +} + +impl fmt::Debug for PrivateBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("PrivateBridge<_>") + } +} + +impl Bridge for PrivateBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), +{ + fn send(&mut self, msg: W::Input) { + let msg = ToWorker::ProcessInput(SINGLETON_ID, msg); + self.send_message(msg); + } +} + +impl Drop for PrivateBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), +{ + fn drop(&mut self) { + let disconnected = ToWorker::Disconnected(SINGLETON_ID); + send_to_remote::(&self.worker, disconnected); + + let destroy = ToWorker::Destroy; + send_to_remote::(&self.worker, destroy); + + self.handler_cell.borrow_mut().take(); + + QUEUE.with(|queue| { + queue.remove_worker(&self.id); + }); + } +} diff --git a/crates/worker/src/worker/public.rs b/crates/worker/src/worker/public.rs new file mode 100644 index 00000000..0905b98c --- /dev/null +++ b/crates/worker/src/worker/public.rs @@ -0,0 +1,274 @@ +use crate::worker::*; +use crate::{ + locate_callback_and_respond, Bridge, Callback, Discoverer, Dispatchable, HandlerId, Last, + Shared, SharedOutputSlab, Worker, WorkerLifecycleEvent, WorkerLink, WorkerScope, +}; +use anymap2::{self, AnyMap}; +use queue::Queue; +use slab::Slab; +use std::any::TypeId; +use std::cell::RefCell; +use std::fmt; +use std::marker::PhantomData; +use std::rc::Rc; + +thread_local! { + static REMOTE_WORKERS_POOL: RefCell = RefCell::new(AnyMap::new()); + static QUEUE: Queue = Queue::new(); +} + +/// Create a single instance in a tab. +#[allow(missing_debug_implementations)] +pub struct Public { + _worker: PhantomData, +} + +impl Discoverer for Public +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + type Worker = W; + + fn spawn_or_join(callback: Option>) -> Box> { + let bridge = REMOTE_WORKERS_POOL.with(|pool| { + let mut pool = pool.borrow_mut(); + match pool.entry::>() { + anymap2::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), + anymap2::Entry::Vacant(entry) => { + let slab: Shared>>> = + Rc::new(RefCell::new(Slab::new())); + let handler = { + let slab = slab.clone(); + move |data: Vec, worker: &web_sys::Worker| { + let msg = FromWorker::::unpack(&data); + match msg { + FromWorker::WorkerLoaded => { + QUEUE.with(|queue| { + queue.insert_loaded_worker(TypeId::of::()); + + if let Some(msgs) = + queue.remove_msg_queue(&TypeId::of::()) + { + for msg in msgs { + worker.post_message_vec(msg) + } + } + }); + } + FromWorker::ProcessOutput(id, output) => { + locate_callback_and_respond::(&slab, id, output); + } + } + } + }; + let name_of_resource = W::name_of_resource(); + let is_relative = W::resource_path_is_relative(); + let worker = { + let worker = worker_new(name_of_resource, is_relative, W::is_module()); + let worker_clone = worker.clone(); + worker.set_onmessage_closure(move |data: Vec| { + handler(data, &worker_clone); + }); + worker + }; + let launched = RemoteWorker::new(worker, slab); + entry.insert(launched).create_bridge(callback) + } + } + }); + Box::new(bridge) + } +} + +impl Dispatchable for Public +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ +} + +/// A connection manager for components interaction with workers. +pub struct PublicBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + worker: web_sys::Worker, + id: HandlerId, + _worker: PhantomData, +} + +impl fmt::Debug for PublicBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("PublicBridge<_>") + } +} + +impl PublicBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + /// Send a message to the worker, queuing the message if necessary + fn send_message(&self, msg: ToWorker) { + QUEUE.with(|queue| { + if queue.is_worker_loaded(&TypeId::of::()) { + send_to_remote::(&self.worker, msg); + } else { + queue.add_msg_to_queue(msg.pack(), TypeId::of::()); + } + }); + } +} + +impl Bridge for PublicBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn send(&mut self, msg: W::Input) { + let msg = ToWorker::ProcessInput(self.id, msg); + self.send_message(msg); + } +} + +impl Drop for PublicBridge +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn drop(&mut self) { + let terminate_worker = REMOTE_WORKERS_POOL.with(|pool| { + let mut pool = pool.borrow_mut(); + let terminate_worker = { + if let Some(launched) = pool.get_mut::>() { + launched.remove_bridge(self) + } else { + false + } + }; + + if terminate_worker { + pool.remove::>(); + } + + terminate_worker + }); + + let disconnected = ToWorker::Disconnected(self.id); + self.send_message(disconnected); + + if terminate_worker { + let destroy = ToWorker::Destroy; + self.send_message(destroy); + + QUEUE.with(|queue| { + queue.remove_worker(&TypeId::of::()); + }); + } + } +} + +/// A trait to enable public workers being registered in a web worker. +pub trait PublicWorker { + /// Executes an worker in the current environment. + /// Uses in `main` function of a worker. + fn register(); +} + +impl PublicWorker for W +where + W: Worker>, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn register() { + let scope = WorkerScope::::new(); + let responder = WorkerResponder; + let link = WorkerLink::connect(&scope, responder); + let upd = WorkerLifecycleEvent::Create(link); + scope.send(upd); + let handler = move |data: Vec| { + let msg = ToWorker::::unpack(&data); + match msg { + ToWorker::Connected(id) => { + let upd = WorkerLifecycleEvent::Connected(id); + scope.send(upd); + } + ToWorker::ProcessInput(id, value) => { + let upd = WorkerLifecycleEvent::Input(value, id); + scope.send(upd); + } + ToWorker::Disconnected(id) => { + let upd = WorkerLifecycleEvent::Disconnected(id); + scope.send(upd); + } + ToWorker::Destroy => { + let upd = WorkerLifecycleEvent::Destroy; + scope.send(upd); + // Terminates web worker + worker_self().close(); + } + } + }; + let loaded: FromWorker = FromWorker::WorkerLoaded; + let loaded = loaded.pack(); + let worker = worker_self(); + worker.set_onmessage_closure(handler); + worker.post_message_vec(loaded); + } +} + +struct RemoteWorker +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + worker: web_sys::Worker, + slab: SharedOutputSlab, +} + +impl RemoteWorker +where + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + pub fn new(worker: web_sys::Worker, slab: SharedOutputSlab) -> Self { + RemoteWorker { worker, slab } + } + + fn create_bridge(&mut self, callback: Option>) -> PublicBridge { + let respondable = callback.is_some(); + let mut slab = self.slab.borrow_mut(); + let id: usize = slab.insert(callback); + let id = HandlerId::new(id, respondable); + let bridge = PublicBridge { + worker: self.worker.clone(), + id, + _worker: PhantomData, + }; + bridge.send_message(ToWorker::Connected(bridge.id)); + + bridge + } + + fn remove_bridge(&mut self, bridge: &PublicBridge) -> Last { + let mut slab = self.slab.borrow_mut(); + let _ = slab.remove(bridge.id.raw_id()); + slab.is_empty() + } +} diff --git a/crates/worker/src/worker/queue.rs b/crates/worker/src/worker/queue.rs new file mode 100644 index 00000000..9faf2324 --- /dev/null +++ b/crates/worker/src/worker/queue.rs @@ -0,0 +1,52 @@ +use std::cell::RefCell; +use std::collections::{hash_map::Entry, HashMap, HashSet}; +use std::hash::Hash; + +/// Thread-local instance used to queue worker messages +pub struct Queue { + loaded_workers: RefCell>, + msg_queue: RefCell>>>, +} + +impl Queue { + pub fn new() -> Queue { + Queue { + loaded_workers: RefCell::new(HashSet::new()), + msg_queue: RefCell::new(HashMap::new()), + } + } + + #[inline] + pub fn remove_msg_queue(&self, id: &T) -> Option>> { + self.msg_queue.borrow_mut().remove(id) + } + + #[inline] + pub fn insert_loaded_worker(&self, id: T) { + self.loaded_workers.borrow_mut().insert(id); + } + + #[inline] + pub fn is_worker_loaded(&self, id: &T) -> bool { + self.loaded_workers.borrow().contains(id) + } + + pub fn add_msg_to_queue(&self, msg: Vec, id: T) { + let mut queue = self.msg_queue.borrow_mut(); + match queue.entry(id) { + Entry::Vacant(record) => { + record.insert(vec![msg]); + } + Entry::Occupied(ref mut record) => { + record.get_mut().push(msg); + } + } + } + + /// This is called by a worker's `Drop` implementation in order to remove the worker from the list + /// of loaded workers. + pub fn remove_worker(&self, id: &T) { + self.loaded_workers.borrow_mut().remove(id); + self.msg_queue.borrow_mut().remove(id); + } +} diff --git a/src/lib.rs b/src/lib.rs index de62f288..655f5c61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,3 +13,4 @@ pub use gloo_render as render; pub use gloo_storage as storage; pub use gloo_timers as timers; pub use gloo_utils as utils; +pub use gloo_worker as worker;