From 8fe5c1fbde8ce92457db10bb12efe2aa98c3fd6a Mon Sep 17 00:00:00 2001 From: Hamza Date: Sun, 2 Jan 2022 01:06:10 +0500 Subject: [PATCH 1/5] move yew-agent to gloo-workers All the code in this commit is moved from the [Yew repository](https://github.com/yewstack/yew) as of commit https://github.com/yewstack/yew/commit/6f6519dab4c5990029fe8d85c47daf439fdc3988 All the credits go to original code authors. --- Cargo.toml | 2 + crates/worker/Cargo.toml | 34 ++++ crates/worker/src/lib.rs | 124 +++++++++++++ crates/worker/src/link.rs | 255 ++++++++++++++++++++++++++ crates/worker/src/pool.rs | 89 +++++++++ crates/worker/src/worker/mod.rs | 165 +++++++++++++++++ crates/worker/src/worker/private.rs | 210 +++++++++++++++++++++ crates/worker/src/worker/public.rs | 274 ++++++++++++++++++++++++++++ crates/worker/src/worker/queue.rs | 52 ++++++ src/lib.rs | 1 + 10 files changed, 1206 insertions(+) create mode 100644 crates/worker/Cargo.toml create mode 100644 crates/worker/src/lib.rs create mode 100644 crates/worker/src/link.rs create mode 100644 crates/worker/src/pool.rs create mode 100644 crates/worker/src/worker/mod.rs create mode 100644 crates/worker/src/worker/private.rs create mode 100644 crates/worker/src/worker/public.rs create mode 100644 crates/worker/src/worker/queue.rs diff --git a/Cargo.toml b/Cargo.toml index 6e850735..4d524229 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ gloo-render = { version = "0.1.0", path = "crates/render" } gloo-console = { version = "0.2.1", path = "crates/console" } gloo-utils = { version = "0.1.1", path = "crates/utils" } gloo-history = { version = "0.1.0", path = "crates/history" } +gloo-worker = { path = "crates/worker" } [features] default = [] @@ -39,4 +40,5 @@ members = [ "crates/console", "crates/utils", "crates/history", + "crates/worker", ] diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml new file mode 100644 index 00000000..873ac9db --- /dev/null +++ b/crates/worker/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "gloo-worker" +version = "0.1.0" +authors = ["Rust and WebAssembly Working Group"] +edition = "2018" +readme = "README.md" +description = "Convenience crate for working with Web Workers" +repository = "https://github.com/rustwasm/gloo/tree/master/crates/worker" +homepage = "https://github.com/rustwasm/gloo" +license = "MIT OR Apache-2.0" +categories = ["api-bindings", "asynchronous", "wasm"] + +[dependencies] +anymap2 = "0.13" +bincode = "1" +gloo-console = "0.2" +gloo-utils = "0.1" +js-sys = "0.3" +serde = { version = "1", features = ["derive"] } +slab = "0.4" +wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4" + +[dependencies.web-sys] +version = "0.3" +features = [ + "Blob", + "BlobPropertyBag", + "DedicatedWorkerGlobalScope", + "MessageEvent", + "Url", + "Worker", + "WorkerOptions", +] diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs new file mode 100644 index 00000000..166fa1ef --- /dev/null +++ b/crates/worker/src/lib.rs @@ -0,0 +1,124 @@ +//! This module contains Yew's web worker implementation. + +// mod hooks; +mod link; +mod pool; +mod worker; + +use std::cell::RefCell; +// pub use hooks::{use_bridge, UseBridgeHandle}; +pub use link::AgentLink; +pub(crate) use link::*; +pub(crate) use pool::*; +pub use pool::{Dispatched, Dispatcher}; +pub use worker::{Private, PrivateAgent, Public, PublicAgent}; + +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use std::rc::Rc; + +/// Alias for Rc> +pub type Shared = Rc>; +pub type Callback = Rc; + +/// Declares the behavior of the agent. +pub trait Agent: Sized + 'static { + /// Reach capability of the agent. + type Reach: Discoverer; + /// Type of an input message. + type Message; + /// Incoming message type. + type Input; + /// Outgoing message type. + type Output; + + /// Creates an instance of an agent. + fn create(link: AgentLink) -> Self; + + /// This method called on every update message. + fn update(&mut self, msg: Self::Message); + + /// This method called on when a new bridge created. + fn connected(&mut self, _id: HandlerId) {} + + /// This method called on every incoming message. + fn handle_input(&mut self, msg: Self::Input, id: HandlerId); + + /// This method called on when a new bridge destroyed. + fn disconnected(&mut self, _id: HandlerId) {} + + /// This method called when the agent is destroyed. + fn destroy(&mut self) {} + + /// Represents the name of loading resorce for remote workers which + /// have to live in a separate files. + fn name_of_resource() -> &'static str { + "main.js" + } + + /// Indicates whether the name of the resource is relative. + /// + /// The default implementation returns `false`, which will cause the result + /// returned by [`Self::name_of_resource`] to be interpreted as an absolute + /// URL. If `true` is returned, it will be interpreted as a relative URL. + fn resource_path_is_relative() -> bool { + false + } + + /// Signifies if resource is a module. + /// This has pending browser support. + fn is_module() -> bool { + false + } +} + +/// Id of responses handler. +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)] +pub struct HandlerId(usize, bool); + +impl HandlerId { + fn new(id: usize, respondable: bool) -> Self { + HandlerId(id, respondable) + } + fn raw_id(self) -> usize { + self.0 + } + /// Indicates if a handler id corresponds to callback in the Agent runtime. + pub fn is_respondable(self) -> bool { + self.1 + } +} + +/// Determine a visibility of an agent. +#[doc(hidden)] +pub trait Discoverer { + type Agent: Agent; + + /// Spawns an agent and returns `Bridge` implementation. + fn spawn_or_join( + _callback: Option::Output>>, + ) -> Box>; +} + +/// Bridge to a specific kind of worker. +pub trait Bridge { + /// Send a message to an agent. + fn send(&mut self, msg: AGN::Input); +} + +/// This trait allows registering or getting the address of a worker. +pub trait Bridged: Agent + Sized + 'static { + /// Creates a messaging bridge between a worker and the component. + fn bridge(callback: Callback) -> Box>; +} + +impl Bridged for T +where + T: Agent, + ::Reach: Discoverer, +{ + fn bridge(callback: Callback) -> Box> { + Self::Reach::spawn_or_join(Some(callback)) + } +} diff --git a/crates/worker/src/link.rs b/crates/worker/src/link.rs new file mode 100644 index 00000000..21255d90 --- /dev/null +++ b/crates/worker/src/link.rs @@ -0,0 +1,255 @@ +use super::*; +use std::cell::RefCell; +use std::fmt; +use std::future::Future; +use std::rc::Rc; +use wasm_bindgen_futures::spawn_local; +use crate::{Shared}; + +/// Defines communication from Worker to Consumers +pub(crate) trait Responder { + /// Implementation for communication channel from Worker to Consumers + fn respond(&self, id: HandlerId, output: AGN::Output); +} + +/// Link to agent's scope for creating callbacks. +pub struct AgentLink { + scope: AgentScope, + responder: Rc>, +} + +impl AgentLink { + /// Create link for a scope. + pub(crate) fn connect(scope: &AgentScope, responder: T) -> Self + where + T: Responder + 'static, + { + AgentLink { + scope: scope.clone(), + responder: Rc::new(responder), + } + } + + /// Send response to an agent. + pub fn respond(&self, id: HandlerId, output: AGN::Output) { + self.responder.respond(id, output); + } + + /// Send a message to the agent + pub fn send_message(&self, msg: T) + where + T: Into, + { + self.scope.send(AgentLifecycleEvent::Message(msg.into())); + } + + /// Send an input to self + pub fn send_input(&self, input: T) + where + T: Into, + { + let handler_id = HandlerId::new(0, false); + self.scope + .send(AgentLifecycleEvent::Input(input.into(), handler_id)); + } + + /// Create a callback which will send a message to the agent when invoked. + pub fn callback(&self, function: F) -> Rc + where + M: Into, + F: Fn(IN) -> M + 'static, + { + let scope = self.scope.clone(); + let closure = move |input| { + let output = function(input).into(); + scope.send(AgentLifecycleEvent::Message(output)); + }; + Rc::new(closure) + } + + /// This method creates a [`Callback`] which returns a Future which + /// returns a message to be sent back to the agent + /// + /// # Panics + /// If the future panics, then the promise will not resolve, and + /// will leak. + pub fn callback_future(&self, function: FN) -> Rc + where + M: Into, + FU: Future + 'static, + FN: Fn(IN) -> FU + 'static, + { + let link = self.clone(); + + let closure = move |input: IN| { + let future: FU = function(input); + link.send_future(future); + }; + + Rc::new(closure) + } + + /// This method processes a Future that returns a message and sends it back to the agent. + /// + /// # Panics + /// If the future panics, then the promise will not resolve, and will leak. + pub fn send_future(&self, future: F) + where + M: Into, + F: Future + 'static, + { + let link: AgentLink = self.clone(); + let js_future = async move { + let message: AGN::Message = future.await.into(); + let cb = link.callback(|m: AGN::Message| m); + (*cb)(message); + }; + spawn_local(js_future); + } +} + +impl fmt::Debug for AgentLink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("AgentLink<_>") + } +} + +impl Clone for AgentLink { + fn clone(&self) -> Self { + AgentLink { + scope: self.scope.clone(), + responder: self.responder.clone(), + } + } +} +/// This struct holds a reference to a component and to a global scheduler. +pub(crate) struct AgentScope { + state: Shared>, +} + +impl fmt::Debug for AgentScope { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("AgentScope<_>") + } +} + +impl Clone for AgentScope { + fn clone(&self) -> Self { + AgentScope { + state: self.state.clone(), + } + } +} + +impl AgentScope { + /// Create agent scope + pub fn new() -> Self { + let state = Rc::new(RefCell::new(AgentState::new())); + AgentScope { state } + } + + /// Schedule message for sending to agent + pub fn send(&self, event: AgentLifecycleEvent) { + let runnable = Box::new(AgentRunnable { + state: self.state.clone(), + event, + }); + runnable.run(); + } +} + +impl Default for AgentScope { + fn default() -> Self { + Self::new() + } +} + +struct AgentState { + agent: Option, + // TODO(#939): Use agent field to control create message this flag + destroyed: bool, +} + +impl AgentState { + fn new() -> Self { + AgentState { + agent: None, + destroyed: false, + } + } +} + +/// Internal Agent lifecycle events +#[derive(Debug)] +pub(crate) enum AgentLifecycleEvent { + /// Request to create link + Create(AgentLink), + /// Internal Agent message + Message(AGN::Message), + /// Client connected + Connected(HandlerId), + /// Received message from Client + Input(AGN::Input, HandlerId), + /// Client disconnected + Disconnected(HandlerId), + /// Request to destroy agent + Destroy, +} + +struct AgentRunnable { + state: Shared>, + event: AgentLifecycleEvent, +} + +impl AgentRunnable +where + AGN: Agent, +{ + fn run(self: Box) { + let mut state = self.state.borrow_mut(); + if state.destroyed { + return; + } + match self.event { + AgentLifecycleEvent::Create(link) => { + state.agent = Some(AGN::create(link)); + } + AgentLifecycleEvent::Message(msg) => { + state + .agent + .as_mut() + .expect("agent was not created to process messages") + .update(msg); + } + AgentLifecycleEvent::Connected(id) => { + state + .agent + .as_mut() + .expect("agent was not created to send a connected message") + .connected(id); + } + AgentLifecycleEvent::Input(inp, id) => { + state + .agent + .as_mut() + .expect("agent was not created to process inputs") + .handle_input(inp, id); + } + AgentLifecycleEvent::Disconnected(id) => { + state + .agent + .as_mut() + .expect("agent was not created to send a disconnected message") + .disconnected(id); + } + AgentLifecycleEvent::Destroy => { + let mut agent = state + .agent + .take() + .expect("trying to destroy not existent agent"); + agent.destroy(); + state.destroyed = true; + } + } + } +} diff --git a/crates/worker/src/pool.rs b/crates/worker/src/pool.rs new file mode 100644 index 00000000..39ef888c --- /dev/null +++ b/crates/worker/src/pool.rs @@ -0,0 +1,89 @@ +use super::*; +use gloo_console as console; +use slab::Slab; +use crate::Shared; + +pub(crate) type Last = bool; + +/// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Agent. +pub(crate) type SharedOutputSlab = Shared::Output>>>>; + +/// The slab contains the callback, the id is used to look up the callback, +/// and the output is the message that will be sent via the callback. +pub(crate) fn locate_callback_and_respond( + slab: &SharedOutputSlab, + id: HandlerId, + output: AGN::Output, +) { + let callback = { + let slab = slab.borrow(); + match slab.get(id.raw_id()).cloned() { + Some(callback) => callback, + None => { + console::warn!(format!( + "Id of handler does not exist in the slab: {}.", + id.raw_id() + )); + return; + } + } + }; + match callback { + Some(callback) => (*callback)(output), + None => console::warn!(format!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id())), + } +} + +/// A newtype around a bridge to indicate that it is distinct from a normal bridge +pub struct Dispatcher(pub(crate) Box>); + +impl fmt::Debug for Dispatcher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Dispatcher<_>") + } +} + +impl Deref for Dispatcher { + type Target = dyn Bridge; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} +impl DerefMut for Dispatcher { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.deref_mut() + } +} + +/// This trait allows the creation of a dispatcher to an existing agent that will not send replies when messages are sent. +pub trait Dispatched: Agent + Sized + 'static { + /// Creates a dispatcher to the agent that will not send messages back. + /// + /// # Note + /// Dispatchers don't have `HandlerId`s and therefore `Agent::handle` will be supplied `None` + /// for the `id` parameter, and `connected` and `disconnected` will not be called. + /// + /// # Important + /// Because the Agents using Context or Public reaches use the number of existing bridges to + /// keep track of if the agent itself should exist, creating dispatchers will not guarantee that + /// an Agent will exist to service requests sent from Dispatchers. You **must** keep at least one + /// bridge around if you wish to use a dispatcher. If you are using agents in a write-only manner, + /// then it is suggested that you create a bridge that handles no-op responses as high up in the + /// component hierarchy as possible - oftentimes the root component for simplicity's sake. + fn dispatcher() -> Dispatcher; +} + +#[doc(hidden)] +pub trait Dispatchable {} + +impl Dispatched for T +where + T: Agent, + ::Reach: Discoverer, + ::Reach: Dispatchable, +{ + fn dispatcher() -> Dispatcher { + Dispatcher(Self::Reach::spawn_or_join(None)) + } +} diff --git a/crates/worker/src/worker/mod.rs b/crates/worker/src/worker/mod.rs new file mode 100644 index 00000000..39202777 --- /dev/null +++ b/crates/worker/src/worker/mod.rs @@ -0,0 +1,165 @@ +mod private; +mod public; +mod queue; + +pub use private::{Private, PrivateAgent}; +pub use public::{Public, PublicAgent}; + +use super::*; +use js_sys::{Array, Reflect, Uint8Array}; +use serde::{Deserialize, Serialize}; +use wasm_bindgen::{closure::Closure, JsCast, JsValue, UnwrapThrowExt}; +use web_sys::{ + Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, Worker, WorkerOptions, +}; + +pub(crate) struct WorkerResponder {} + +impl Responder for WorkerResponder +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn respond(&self, id: HandlerId, output: AGN::Output) { + let msg = FromWorker::ProcessOutput(id, output); + let data = msg.pack(); + worker_self().post_message_vec(data); + } +} + +/// Message packager, based on serde::Serialize/Deserialize +pub trait Packed { + /// Pack serializable message into Vec + fn pack(&self) -> Vec; + /// Unpack deserializable message of byte slice + fn unpack(data: &[u8]) -> Self; +} + +impl Deserialize<'de>> Packed for T { + fn pack(&self) -> Vec { + bincode::serialize(&self).expect("can't serialize an agent message") + } + + fn unpack(data: &[u8]) -> Self { + bincode::deserialize(data).expect("can't deserialize an agent message") + } +} + +/// Serializable messages to worker +#[derive(Serialize, Deserialize, Debug)] +enum ToWorker { + /// Client is connected + Connected(HandlerId), + /// Incoming message to Worker + ProcessInput(HandlerId, T), + /// Client is disconnected + Disconnected(HandlerId), + /// Worker should be terminated + Destroy, +} + +/// Serializable messages sent by worker to consumer +#[derive(Serialize, Deserialize, Debug)] +enum FromWorker { + /// Worker sends this message when `wasm` bundle has loaded. + WorkerLoaded, + /// Outgoing message to consumer + ProcessOutput(HandlerId, T), +} + +fn send_to_remote(worker: &Worker, msg: ToWorker) +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + let msg = msg.pack(); + worker.post_message_vec(msg); +} +fn worker_new(name_of_resource: &str, resource_is_relative: bool, is_module: bool) -> Worker { + let origin = gloo_utils::document() + .location() + .unwrap_throw() + .origin() + .unwrap_throw(); + let pathname = gloo_utils::window().location().pathname().unwrap_throw(); + + let prefix = if resource_is_relative { + pathname + .rfind(|c| c == '/') + .map(|i| &pathname[..i]) + .unwrap_or_default() + } else { + "" + }; + let script_url = format!("{}{}/{}", origin, prefix, name_of_resource); + let wasm_url = format!( + "{}{}/{}", + origin, + prefix, + name_of_resource.replace(".js", "_bg.wasm") + ); + let array = Array::new(); + array.push( + &format!( + r#"importScripts("{}");wasm_bindgen("{}");"#, + script_url, wasm_url + ) + .into(), + ); + let blob = Blob::new_with_str_sequence_and_options( + &array, + BlobPropertyBag::new().type_("application/javascript"), + ) + .unwrap(); + let url = Url::create_object_url_with_blob(&blob).unwrap(); + + if is_module { + let options = WorkerOptions::new(); + Reflect::set( + options.as_ref(), + &JsValue::from_str("type"), + &JsValue::from_str("module"), + ) + .unwrap(); + Worker::new_with_options(&url, &options).expect("failed to spawn worker") + } else { + Worker::new(&url).expect("failed to spawn worker") + } +} + +fn worker_self() -> DedicatedWorkerGlobalScope { + JsValue::from(js_sys::global()).into() +} + +trait WorkerExt { + fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)); + + fn post_message_vec(&self, data: Vec); +} + +macro_rules! worker_ext_impl { + ($($type:ident),+) => {$( + impl WorkerExt for $type { + fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)) { + let handler = move |message: MessageEvent| { + let data = Uint8Array::from(message.data()).to_vec(); + handler(data); + }; + let closure = Closure::wrap(Box::new(handler) as Box); + self.set_onmessage(Some(closure.as_ref().unchecked_ref())); + closure.forget(); + } + + fn post_message_vec(&self, data: Vec) { + self.post_message(&Uint8Array::from(data.as_slice())) + .expect("failed to post message"); + } + } + )+}; +} + +worker_ext_impl! { + Worker, DedicatedWorkerGlobalScope +} diff --git a/crates/worker/src/worker/private.rs b/crates/worker/src/worker/private.rs new file mode 100644 index 00000000..7a8edded --- /dev/null +++ b/crates/worker/src/worker/private.rs @@ -0,0 +1,210 @@ +use super::*; +use queue::Queue; +use std::cell::RefCell; +use std::fmt; +use std::marker::PhantomData; +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use web_sys::Worker; +use crate::Callback; + +thread_local! { + static QUEUE: Queue = Queue::new(); +} + +static PRIVATE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); +const SINGLETON_ID: HandlerId = HandlerId(0, true); + +/// Create a new instance for every bridge. +#[allow(missing_debug_implementations)] +pub struct Private { + _agent: PhantomData, +} + +/// A trait to enable private agents being registered in a web worker. +pub trait PrivateAgent { + /// Executes an agent in the current environment. + /// Uses in `main` function of a worker. + fn register(); +} + +impl PrivateAgent for AGN +where + AGN: Agent>, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn register() { + let scope = AgentScope::::new(); + let responder = WorkerResponder {}; + let link = AgentLink::connect(&scope, responder); + let upd = AgentLifecycleEvent::Create(link); + scope.send(upd); + let handler = move |data: Vec| { + let msg = ToWorker::::unpack(&data); + match msg { + ToWorker::Connected(_id) => { + let upd = AgentLifecycleEvent::Connected(SINGLETON_ID); + scope.send(upd); + } + ToWorker::ProcessInput(_id, value) => { + let upd = AgentLifecycleEvent::Input(value, SINGLETON_ID); + scope.send(upd); + } + ToWorker::Disconnected(_id) => { + let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID); + scope.send(upd); + } + ToWorker::Destroy => { + let upd = AgentLifecycleEvent::Destroy; + scope.send(upd); + // Terminates web worker + worker_self().close(); + } + } + }; + let loaded: FromWorker = FromWorker::WorkerLoaded; + let loaded = loaded.pack(); + let worker = worker_self(); + worker.set_onmessage_closure(handler); + worker.post_message_vec(loaded); + } +} + +impl Discoverer for Private +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + type Agent = AGN; + + fn spawn_or_join(callback: Option>) -> Box> { + let id = PRIVATE_ID_COUNTER.fetch_add(1, Ordering::Relaxed); + let callback = callback.expect("Callback required for Private agents"); + let handler = move |data: Vec, worker: &Worker| { + let msg = FromWorker::::unpack(&data); + match msg { + FromWorker::WorkerLoaded => { + QUEUE.with(|queue| { + queue.insert_loaded_agent(id); + + if let Some(msgs) = queue.remove_msg_queue(&id) { + for msg in msgs { + worker.post_message_vec(msg) + } + } + }); + } + FromWorker::ProcessOutput(id, output) => { + assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); + (*callback)(output); + } + } + }; + + let name_of_resource = AGN::name_of_resource(); + let is_relative = AGN::resource_path_is_relative(); + let handler_cell = Rc::new(RefCell::new(Some(handler))); + + let worker = { + let handler_cell = handler_cell.clone(); + let worker = worker_new(name_of_resource, is_relative, AGN::is_module()); + let worker_clone = worker.clone(); + worker.set_onmessage_closure(move |data: Vec| { + if let Some(handler) = handler_cell.borrow().as_ref() { + handler(data, &worker_clone) + } + }); + worker + }; + let bridge = PrivateBridge { + handler_cell, + worker, + _agent: PhantomData, + id, + }; + bridge.send_message(ToWorker::Connected(SINGLETON_ID)); + Box::new(bridge) + } +} + +/// A connection manager for components interaction with workers. +pub struct PrivateBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &Worker), +{ + handler_cell: Rc>>, + worker: Worker, + _agent: PhantomData, + id: usize, +} + +impl PrivateBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &Worker), +{ + /// Send a message to the worker, queuing the message if necessary + fn send_message(&self, msg: ToWorker) { + QUEUE.with(|queue| { + if queue.is_worker_loaded(&self.id) { + send_to_remote::(&self.worker, msg); + } else { + queue.add_msg_to_queue(msg.pack(), self.id); + } + }); + } +} + +impl fmt::Debug for PrivateBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &Worker), +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("PrivateBridge<_>") + } +} + +impl Bridge for PrivateBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &Worker), +{ + fn send(&mut self, msg: AGN::Input) { + let msg = ToWorker::ProcessInput(SINGLETON_ID, msg); + self.send_message(msg); + } +} + +impl Drop for PrivateBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &Worker), +{ + fn drop(&mut self) { + let disconnected = ToWorker::Disconnected(SINGLETON_ID); + send_to_remote::(&self.worker, disconnected); + + let destroy = ToWorker::Destroy; + send_to_remote::(&self.worker, destroy); + + self.handler_cell.borrow_mut().take(); + + QUEUE.with(|queue| { + queue.remove_agent(&self.id); + }); + } +} diff --git a/crates/worker/src/worker/public.rs b/crates/worker/src/worker/public.rs new file mode 100644 index 00000000..5af5c14e --- /dev/null +++ b/crates/worker/src/worker/public.rs @@ -0,0 +1,274 @@ +use super::WorkerExt; +use super::*; +use anymap2::{self, AnyMap}; +use queue::Queue; +use slab::Slab; +use std::any::TypeId; +use std::cell::RefCell; +use std::fmt; +use std::marker::PhantomData; +use std::rc::Rc; +use web_sys::Worker; +use crate::Callback; +use crate::Shared; + +thread_local! { + static REMOTE_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); + static QUEUE: Queue = Queue::new(); +} + +/// Create a single instance in a tab. +#[allow(missing_debug_implementations)] +pub struct Public { + _agent: PhantomData, +} + +impl Discoverer for Public +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + type Agent = AGN; + + fn spawn_or_join(callback: Option>) -> Box> { + let bridge = REMOTE_AGENTS_POOL.with(|pool| { + let mut pool = pool.borrow_mut(); + match pool.entry::>() { + anymap2::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), + anymap2::Entry::Vacant(entry) => { + let slab: Shared>>> = + Rc::new(RefCell::new(Slab::new())); + let handler = { + let slab = slab.clone(); + move |data: Vec, worker: &Worker| { + let msg = FromWorker::::unpack(&data); + match msg { + FromWorker::WorkerLoaded => { + QUEUE.with(|queue| { + queue.insert_loaded_agent(TypeId::of::()); + + if let Some(msgs) = + queue.remove_msg_queue(&TypeId::of::()) + { + for msg in msgs { + worker.post_message_vec(msg) + } + } + }); + } + FromWorker::ProcessOutput(id, output) => { + locate_callback_and_respond::(&slab, id, output); + } + } + } + }; + let name_of_resource = AGN::name_of_resource(); + let is_relative = AGN::resource_path_is_relative(); + let worker = { + let worker = worker_new(name_of_resource, is_relative, AGN::is_module()); + let worker_clone = worker.clone(); + worker.set_onmessage_closure(move |data: Vec| { + handler(data, &worker_clone); + }); + worker + }; + let launched = RemoteAgent::new(worker, slab); + entry.insert(launched).create_bridge(callback) + } + } + }); + Box::new(bridge) + } +} + +impl Dispatchable for Public +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ +} + +/// A connection manager for components interaction with workers. +pub struct PublicBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + worker: Worker, + id: HandlerId, + _agent: PhantomData, +} + +impl fmt::Debug for PublicBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("PublicBridge<_>") + } +} + +impl PublicBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + /// Send a message to the worker, queuing the message if necessary + fn send_message(&self, msg: ToWorker) { + QUEUE.with(|queue| { + if queue.is_worker_loaded(&TypeId::of::()) { + send_to_remote::(&self.worker, msg); + } else { + queue.add_msg_to_queue(msg.pack(), TypeId::of::()); + } + }); + } +} + +impl Bridge for PublicBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn send(&mut self, msg: AGN::Input) { + let msg = ToWorker::ProcessInput(self.id, msg); + self.send_message(msg); + } +} + +impl Drop for PublicBridge +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn drop(&mut self) { + let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| { + let mut pool = pool.borrow_mut(); + let terminate_worker = { + if let Some(launched) = pool.get_mut::>() { + launched.remove_bridge(self) + } else { + false + } + }; + + if terminate_worker { + pool.remove::>(); + } + + terminate_worker + }); + + let disconnected = ToWorker::Disconnected(self.id); + self.send_message(disconnected); + + if terminate_worker { + let destroy = ToWorker::Destroy; + self.send_message(destroy); + + QUEUE.with(|queue| { + queue.remove_agent(&TypeId::of::()); + }); + } + } +} + +/// A trait to enable public agents being registered in a web worker. +pub trait PublicAgent { + /// Executes an agent in the current environment. + /// Uses in `main` function of a worker. + fn register(); +} + +impl PublicAgent for AGN +where + AGN: Agent>, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn register() { + let scope = AgentScope::::new(); + let responder = WorkerResponder {}; + let link = AgentLink::connect(&scope, responder); + let upd = AgentLifecycleEvent::Create(link); + scope.send(upd); + let handler = move |data: Vec| { + let msg = ToWorker::::unpack(&data); + match msg { + ToWorker::Connected(id) => { + let upd = AgentLifecycleEvent::Connected(id); + scope.send(upd); + } + ToWorker::ProcessInput(id, value) => { + let upd = AgentLifecycleEvent::Input(value, id); + scope.send(upd); + } + ToWorker::Disconnected(id) => { + let upd = AgentLifecycleEvent::Disconnected(id); + scope.send(upd); + } + ToWorker::Destroy => { + let upd = AgentLifecycleEvent::Destroy; + scope.send(upd); + // Terminates web worker + worker_self().close(); + } + } + }; + let loaded: FromWorker = FromWorker::WorkerLoaded; + let loaded = loaded.pack(); + let worker = worker_self(); + worker.set_onmessage_closure(handler); + worker.post_message_vec(loaded); + } +} + +struct RemoteAgent +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + worker: Worker, + slab: SharedOutputSlab, +} + +impl RemoteAgent +where + AGN: Agent, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + pub fn new(worker: Worker, slab: SharedOutputSlab) -> Self { + RemoteAgent { worker, slab } + } + + fn create_bridge(&mut self, callback: Option>) -> PublicBridge { + let respondable = callback.is_some(); + let mut slab = self.slab.borrow_mut(); + let id: usize = slab.insert(callback); + let id = HandlerId::new(id, respondable); + let bridge = PublicBridge { + worker: self.worker.clone(), + id, + _agent: PhantomData, + }; + bridge.send_message(ToWorker::Connected(bridge.id)); + + bridge + } + + fn remove_bridge(&mut self, bridge: &PublicBridge) -> Last { + let mut slab = self.slab.borrow_mut(); + let _ = slab.remove(bridge.id.raw_id()); + slab.is_empty() + } +} diff --git a/crates/worker/src/worker/queue.rs b/crates/worker/src/worker/queue.rs new file mode 100644 index 00000000..cbe2e83b --- /dev/null +++ b/crates/worker/src/worker/queue.rs @@ -0,0 +1,52 @@ +use std::cell::RefCell; +use std::collections::{hash_map, HashMap, HashSet}; +use std::hash::Hash; + +/// Thread-local instance used to queue worker messages +pub struct Queue { + loaded_agents: RefCell>, + msg_queue: RefCell>>>, +} + +impl Queue { + pub fn new() -> Queue { + Queue { + loaded_agents: RefCell::new(HashSet::new()), + msg_queue: RefCell::new(HashMap::new()), + } + } + + #[inline] + pub fn remove_msg_queue(&self, id: &T) -> Option>> { + self.msg_queue.borrow_mut().remove(id) + } + + #[inline] + pub fn insert_loaded_agent(&self, id: T) { + self.loaded_agents.borrow_mut().insert(id); + } + + #[inline] + pub fn is_worker_loaded(&self, id: &T) -> bool { + self.loaded_agents.borrow().contains(id) + } + + pub fn add_msg_to_queue(&self, msg: Vec, id: T) { + let mut queue = self.msg_queue.borrow_mut(); + match queue.entry(id) { + hash_map::Entry::Vacant(record) => { + record.insert(vec![msg]); + } + hash_map::Entry::Occupied(ref mut record) => { + record.get_mut().push(msg); + } + } + } + + /// This is called by a worker's `Drop` implementation in order to remove the worker from the list + /// of loaded workers. + pub fn remove_agent(&self, id: &T) { + self.loaded_agents.borrow_mut().remove(id); + self.msg_queue.borrow_mut().remove(id); + } +} diff --git a/src/lib.rs b/src/lib.rs index de62f288..655f5c61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,3 +13,4 @@ pub use gloo_render as render; pub use gloo_storage as storage; pub use gloo_timers as timers; pub use gloo_utils as utils; +pub use gloo_worker as worker; From 1268174ea7a24ce111b649608097301c7336619b Mon Sep 17 00:00:00 2001 From: Hamza Date: Sun, 2 Jan 2022 21:29:41 +0500 Subject: [PATCH 2/5] Improve documentation The crate level documentation is copied from the Agents docs at yew.rs --- crates/worker/Cargo.toml | 16 ++++++++-- crates/worker/src/lib.rs | 46 +++++++++++++++++++++++++---- crates/worker/src/link.rs | 16 ++++++---- crates/worker/src/pool.rs | 2 +- crates/worker/src/worker/mod.rs | 4 +-- crates/worker/src/worker/private.rs | 9 ++++-- crates/worker/src/worker/public.rs | 11 +++---- crates/worker/src/worker/queue.rs | 6 ++-- 8 files changed, 81 insertions(+), 29 deletions(-) diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 873ac9db..2b5b5949 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -10,16 +10,22 @@ homepage = "https://github.com/rustwasm/gloo" license = "MIT OR Apache-2.0" categories = ["api-bindings", "asynchronous", "wasm"] +[package.metadata.docs.rs] +all-features = true + +rustdoc-args = ["--cfg", "docsrs"] + + [dependencies] anymap2 = "0.13" bincode = "1" -gloo-console = "0.2" -gloo-utils = "0.1" +gloo-console = { path = "../console", version = "0.2" } +gloo-utils = { path = "../utils", version = "0.1" } js-sys = "0.3" serde = { version = "1", features = ["derive"] } slab = "0.4" wasm-bindgen = "0.2" -wasm-bindgen-futures = "0.4" +wasm-bindgen-futures = { version = "0.4", optional = true } [dependencies.web-sys] version = "0.3" @@ -32,3 +38,7 @@ features = [ "Worker", "WorkerOptions", ] + +[features] +default = [] +futures = ["wasm-bindgen-futures"] diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 166fa1ef..104ff357 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -1,16 +1,48 @@ -//! This module contains Yew's web worker implementation. +//! Workers are a way to offload tasks to web workers. These are run concurrently using +//! [web-workers](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers). +//! +//! # Types of Agents +//! +//! ## Reaches +//! +//! * Public - There will exist at most one instance of a Public Agent at any given time. +//! Bridges will spawn or connect to an already spawned agent in a web worker. +//! When no bridges are connected to this agent, the agent will disappear. +//! +//! * Private - Spawn a new agent in a web worker for every new bridge. This is good for +//! moving shared but independent behavior that communicates with the browser out of components. +//! When the the connected bridge is dropped, the agent will disappear. +//! +//! # Communicating with workers +//! +//! ## Bridges +//! +//! A bridge allows bi-directional communication between an agent and a component. +//! Bridges also allow agents to communicate with one another. +//! +//! ## Dispatchers +//! +//! A dispatcher allows uni-directional communication between a component and an agent. +//! A dispatcher allows a component to send messages to an agent. +//! +//! # Overhead +//! +//! Agents use web workers (i.e. Private and Public). They incur a serialization overhead on the +//! messages they send and receive. Agents use [bincode](https://!github.com/servo/bincode) +//! to communicate with other browser worker, so the cost is substantially higher +//! than just calling a function. + +#![cfg_attr(docsrs, feature(doc_cfg))] -// mod hooks; mod link; mod pool; mod worker; -use std::cell::RefCell; -// pub use hooks::{use_bridge, UseBridgeHandle}; pub use link::AgentLink; pub(crate) use link::*; pub(crate) use pool::*; pub use pool::{Dispatched, Dispatcher}; +use std::cell::RefCell; pub use worker::{Private, PrivateAgent, Public, PublicAgent}; use serde::{Deserialize, Serialize}; @@ -18,8 +50,10 @@ use std::fmt; use std::ops::{Deref, DerefMut}; use std::rc::Rc; -/// Alias for Rc> +/// Alias for `Rc>` pub type Shared = Rc>; + +/// Alias for `Rc` pub type Callback = Rc; /// Declares the behavior of the agent. @@ -51,7 +85,7 @@ pub trait Agent: Sized + 'static { /// This method called when the agent is destroyed. fn destroy(&mut self) {} - /// Represents the name of loading resorce for remote workers which + /// Represents the name of loading resource for remote workers which /// have to live in a separate files. fn name_of_resource() -> &'static str { "main.js" diff --git a/crates/worker/src/link.rs b/crates/worker/src/link.rs index 21255d90..68824269 100644 --- a/crates/worker/src/link.rs +++ b/crates/worker/src/link.rs @@ -1,10 +1,10 @@ -use super::*; +use crate::Shared; +use crate::{Agent, HandlerId}; use std::cell::RefCell; use std::fmt; +#[cfg(feature = "futures")] use std::future::Future; use std::rc::Rc; -use wasm_bindgen_futures::spawn_local; -use crate::{Shared}; /// Defines communication from Worker to Consumers pub(crate) trait Responder { @@ -67,12 +67,14 @@ impl AgentLink { Rc::new(closure) } - /// This method creates a [`Callback`] which returns a Future which + /// This method creates a callback which returns a Future which /// returns a message to be sent back to the agent /// /// # Panics /// If the future panics, then the promise will not resolve, and /// will leak. + #[cfg(feature = "futures")] + #[cfg_attr(docsrs, doc(cfg(feature = "futures")))] pub fn callback_future(&self, function: FN) -> Rc where M: Into, @@ -93,6 +95,8 @@ impl AgentLink { /// /// # Panics /// If the future panics, then the promise will not resolve, and will leak. + #[cfg(feature = "futures")] + #[cfg_attr(docsrs, doc(cfg(feature = "futures")))] pub fn send_future(&self, future: F) where M: Into, @@ -104,7 +108,7 @@ impl AgentLink { let cb = link.callback(|m: AGN::Message| m); (*cb)(message); }; - spawn_local(js_future); + wasm_bindgen_futures::spawn_local(js_future); } } @@ -205,7 +209,7 @@ impl AgentRunnable where AGN: Agent, { - fn run(self: Box) { + fn run(self) { let mut state = self.state.borrow_mut(); if state.destroyed { return; diff --git a/crates/worker/src/pool.rs b/crates/worker/src/pool.rs index 39ef888c..41847ccd 100644 --- a/crates/worker/src/pool.rs +++ b/crates/worker/src/pool.rs @@ -1,7 +1,7 @@ use super::*; +use crate::Shared; use gloo_console as console; use slab::Slab; -use crate::Shared; pub(crate) type Last = bool; diff --git a/crates/worker/src/worker/mod.rs b/crates/worker/src/worker/mod.rs index 39202777..0c4563df 100644 --- a/crates/worker/src/worker/mod.rs +++ b/crates/worker/src/worker/mod.rs @@ -5,7 +5,7 @@ mod queue; pub use private::{Private, PrivateAgent}; pub use public::{Public, PublicAgent}; -use super::*; +use crate::{Agent, HandlerId, Responder}; use js_sys::{Array, Reflect, Uint8Array}; use serde::{Deserialize, Serialize}; use wasm_bindgen::{closure::Closure, JsCast, JsValue, UnwrapThrowExt}; @@ -13,7 +13,7 @@ use web_sys::{ Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, Worker, WorkerOptions, }; -pub(crate) struct WorkerResponder {} +pub(crate) struct WorkerResponder; impl Responder for WorkerResponder where diff --git a/crates/worker/src/worker/private.rs b/crates/worker/src/worker/private.rs index 7a8edded..5f7aed8a 100644 --- a/crates/worker/src/worker/private.rs +++ b/crates/worker/src/worker/private.rs @@ -1,12 +1,15 @@ -use super::*; +use crate::worker::*; +use crate::{ + Agent, AgentLifecycleEvent, AgentLink, AgentScope, Bridge, Callback, Discoverer, HandlerId, +}; use queue::Queue; +use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::fmt; use std::marker::PhantomData; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use web_sys::Worker; -use crate::Callback; thread_local! { static QUEUE: Queue = Queue::new(); @@ -36,7 +39,7 @@ where { fn register() { let scope = AgentScope::::new(); - let responder = WorkerResponder {}; + let responder = WorkerResponder; let link = AgentLink::connect(&scope, responder); let upd = AgentLifecycleEvent::Create(link); scope.send(upd); diff --git a/crates/worker/src/worker/public.rs b/crates/worker/src/worker/public.rs index 5af5c14e..99fdd554 100644 --- a/crates/worker/src/worker/public.rs +++ b/crates/worker/src/worker/public.rs @@ -1,5 +1,8 @@ -use super::WorkerExt; -use super::*; +use crate::worker::*; +use crate::{ + locate_callback_and_respond, Agent, AgentLifecycleEvent, AgentLink, AgentScope, Bridge, + Callback, Discoverer, Dispatchable, HandlerId, Last, Shared, SharedOutputSlab, +}; use anymap2::{self, AnyMap}; use queue::Queue; use slab::Slab; @@ -9,8 +12,6 @@ use std::fmt; use std::marker::PhantomData; use std::rc::Rc; use web_sys::Worker; -use crate::Callback; -use crate::Shared; thread_local! { static REMOTE_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); @@ -196,7 +197,7 @@ where { fn register() { let scope = AgentScope::::new(); - let responder = WorkerResponder {}; + let responder = WorkerResponder; let link = AgentLink::connect(&scope, responder); let upd = AgentLifecycleEvent::Create(link); scope.send(upd); diff --git a/crates/worker/src/worker/queue.rs b/crates/worker/src/worker/queue.rs index cbe2e83b..4c306b94 100644 --- a/crates/worker/src/worker/queue.rs +++ b/crates/worker/src/worker/queue.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::collections::{hash_map, HashMap, HashSet}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::hash::Hash; /// Thread-local instance used to queue worker messages @@ -34,10 +34,10 @@ impl Queue { pub fn add_msg_to_queue(&self, msg: Vec, id: T) { let mut queue = self.msg_queue.borrow_mut(); match queue.entry(id) { - hash_map::Entry::Vacant(record) => { + Entry::Vacant(record) => { record.insert(vec![msg]); } - hash_map::Entry::Occupied(ref mut record) => { + Entry::Occupied(ref mut record) => { record.get_mut().push(msg); } } From 12dd0a29683b4c3a00e20c06c75111e5f0c8d5b1 Mon Sep 17 00:00:00 2001 From: Hamza Date: Sun, 2 Jan 2022 21:34:09 +0500 Subject: [PATCH 3/5] please no duplication warning --- crates/worker/src/worker/private.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/worker/src/worker/private.rs b/crates/worker/src/worker/private.rs index 5f7aed8a..ad469cc8 100644 --- a/crates/worker/src/worker/private.rs +++ b/crates/worker/src/worker/private.rs @@ -46,15 +46,15 @@ where let handler = move |data: Vec| { let msg = ToWorker::::unpack(&data); match msg { - ToWorker::Connected(_id) => { + ToWorker::Connected(_) => { let upd = AgentLifecycleEvent::Connected(SINGLETON_ID); scope.send(upd); } - ToWorker::ProcessInput(_id, value) => { + ToWorker::ProcessInput(_, value) => { let upd = AgentLifecycleEvent::Input(value, SINGLETON_ID); scope.send(upd); } - ToWorker::Disconnected(_id) => { + ToWorker::Disconnected(_) => { let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID); scope.send(upd); } From 45bb5da9f57d2d32d0bd119afb04cff76be9f74b Mon Sep 17 00:00:00 2001 From: Hamza Date: Sun, 2 Jan 2022 21:44:48 +0500 Subject: [PATCH 4/5] rename: agents -> workers --- crates/worker/src/lib.rs | 66 +++++------ crates/worker/src/link.rs | 166 +++++++++++++------------- crates/worker/src/pool.rs | 32 ++--- crates/worker/src/worker/mod.rs | 40 +++---- crates/worker/src/worker/private.rs | 133 +++++++++++---------- crates/worker/src/worker/public.rs | 177 ++++++++++++++-------------- crates/worker/src/worker/queue.rs | 14 +-- 7 files changed, 313 insertions(+), 315 deletions(-) diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 104ff357..6f7e9450 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -1,34 +1,34 @@ //! Workers are a way to offload tasks to web workers. These are run concurrently using //! [web-workers](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers). //! -//! # Types of Agents +//! # Types of Workers //! //! ## Reaches //! -//! * Public - There will exist at most one instance of a Public Agent at any given time. -//! Bridges will spawn or connect to an already spawned agent in a web worker. -//! When no bridges are connected to this agent, the agent will disappear. +//! * Public - There will exist at most one instance of a Public Worker at any given time. +//! Bridges will spawn or connect to an already spawned worker in a web worker. +//! When no bridges are connected to this worker, the worker will disappear. //! -//! * Private - Spawn a new agent in a web worker for every new bridge. This is good for +//! * Private - Spawn a new worker in a web worker for every new bridge. This is good for //! moving shared but independent behavior that communicates with the browser out of components. -//! When the the connected bridge is dropped, the agent will disappear. +//! When the the connected bridge is dropped, the worker will disappear. //! //! # Communicating with workers //! //! ## Bridges //! -//! A bridge allows bi-directional communication between an agent and a component. -//! Bridges also allow agents to communicate with one another. +//! A bridge allows bi-directional communication between an worker and a component. +//! Bridges also allow workers to communicate with one another. //! //! ## Dispatchers //! -//! A dispatcher allows uni-directional communication between a component and an agent. -//! A dispatcher allows a component to send messages to an agent. +//! A dispatcher allows uni-directional communication between a component and an worker. +//! A dispatcher allows a component to send messages to an worker. //! //! # Overhead //! -//! Agents use web workers (i.e. Private and Public). They incur a serialization overhead on the -//! messages they send and receive. Agents use [bincode](https://!github.com/servo/bincode) +//! Workers use web workers (i.e. Private and Public). They incur a serialization overhead on the +//! messages they send and receive. Workers use [bincode](https://!github.com/servo/bincode) //! to communicate with other browser worker, so the cost is substantially higher //! than just calling a function. @@ -38,12 +38,12 @@ mod link; mod pool; mod worker; -pub use link::AgentLink; +pub use link::WorkerLink; pub(crate) use link::*; pub(crate) use pool::*; pub use pool::{Dispatched, Dispatcher}; use std::cell::RefCell; -pub use worker::{Private, PrivateAgent, Public, PublicAgent}; +pub use worker::{Private, PrivateWorker, Public, PublicWorker}; use serde::{Deserialize, Serialize}; use std::fmt; @@ -56,10 +56,10 @@ pub type Shared = Rc>; /// Alias for `Rc` pub type Callback = Rc; -/// Declares the behavior of the agent. -pub trait Agent: Sized + 'static { - /// Reach capability of the agent. - type Reach: Discoverer; +/// Declares the behavior of the worker. +pub trait Worker: Sized + 'static { + /// Reach capability of the worker. + type Reach: Discoverer; /// Type of an input message. type Message; /// Incoming message type. @@ -67,8 +67,8 @@ pub trait Agent: Sized + 'static { /// Outgoing message type. type Output; - /// Creates an instance of an agent. - fn create(link: AgentLink) -> Self; + /// Creates an instance of an worker. + fn create(link: WorkerLink) -> Self; /// This method called on every update message. fn update(&mut self, msg: Self::Message); @@ -82,7 +82,7 @@ pub trait Agent: Sized + 'static { /// This method called on when a new bridge destroyed. fn disconnected(&mut self, _id: HandlerId) {} - /// This method called when the agent is destroyed. + /// This method called when the worker is destroyed. fn destroy(&mut self) {} /// Represents the name of loading resource for remote workers which @@ -118,39 +118,39 @@ impl HandlerId { fn raw_id(self) -> usize { self.0 } - /// Indicates if a handler id corresponds to callback in the Agent runtime. + /// Indicates if a handler id corresponds to callback in the Worker runtime. pub fn is_respondable(self) -> bool { self.1 } } -/// Determine a visibility of an agent. +/// Determine a visibility of an worker. #[doc(hidden)] pub trait Discoverer { - type Agent: Agent; + type Worker: Worker; - /// Spawns an agent and returns `Bridge` implementation. + /// Spawns an worker and returns `Bridge` implementation. fn spawn_or_join( - _callback: Option::Output>>, - ) -> Box>; + _callback: Option::Output>>, + ) -> Box>; } /// Bridge to a specific kind of worker. -pub trait Bridge { - /// Send a message to an agent. - fn send(&mut self, msg: AGN::Input); +pub trait Bridge { + /// Send a message to an worker. + fn send(&mut self, msg: W::Input); } /// This trait allows registering or getting the address of a worker. -pub trait Bridged: Agent + Sized + 'static { +pub trait Bridged: Worker + Sized + 'static { /// Creates a messaging bridge between a worker and the component. fn bridge(callback: Callback) -> Box>; } impl Bridged for T where - T: Agent, - ::Reach: Discoverer, + T: Worker, + ::Reach: Discoverer, { fn bridge(callback: Callback) -> Box> { Self::Reach::spawn_or_join(Some(callback)) diff --git a/crates/worker/src/link.rs b/crates/worker/src/link.rs index 68824269..d20ccb57 100644 --- a/crates/worker/src/link.rs +++ b/crates/worker/src/link.rs @@ -1,5 +1,5 @@ use crate::Shared; -use crate::{Agent, HandlerId}; +use crate::{Worker, HandlerId}; use std::cell::RefCell; use std::fmt; #[cfg(feature = "futures")] @@ -7,68 +7,68 @@ use std::future::Future; use std::rc::Rc; /// Defines communication from Worker to Consumers -pub(crate) trait Responder { +pub(crate) trait Responder { /// Implementation for communication channel from Worker to Consumers - fn respond(&self, id: HandlerId, output: AGN::Output); + fn respond(&self, id: HandlerId, output: W::Output); } -/// Link to agent's scope for creating callbacks. -pub struct AgentLink { - scope: AgentScope, - responder: Rc>, +/// Link to worker's scope for creating callbacks. +pub struct WorkerLink { + scope: WorkerScope, + responder: Rc>, } -impl AgentLink { +impl WorkerLink { /// Create link for a scope. - pub(crate) fn connect(scope: &AgentScope, responder: T) -> Self + pub(crate) fn connect(scope: &WorkerScope, responder: T) -> Self where - T: Responder + 'static, + T: Responder + 'static, { - AgentLink { + WorkerLink { scope: scope.clone(), responder: Rc::new(responder), } } - /// Send response to an agent. - pub fn respond(&self, id: HandlerId, output: AGN::Output) { + /// Send response to an worker. + pub fn respond(&self, id: HandlerId, output: W::Output) { self.responder.respond(id, output); } - /// Send a message to the agent + /// Send a message to the worker pub fn send_message(&self, msg: T) where - T: Into, + T: Into, { - self.scope.send(AgentLifecycleEvent::Message(msg.into())); + self.scope.send(WorkerLifecycleEvent::Message(msg.into())); } /// Send an input to self pub fn send_input(&self, input: T) where - T: Into, + T: Into, { let handler_id = HandlerId::new(0, false); self.scope - .send(AgentLifecycleEvent::Input(input.into(), handler_id)); + .send(WorkerLifecycleEvent::Input(input.into(), handler_id)); } - /// Create a callback which will send a message to the agent when invoked. + /// Create a callback which will send a message to the worker when invoked. pub fn callback(&self, function: F) -> Rc where - M: Into, + M: Into, F: Fn(IN) -> M + 'static, { let scope = self.scope.clone(); let closure = move |input| { let output = function(input).into(); - scope.send(AgentLifecycleEvent::Message(output)); + scope.send(WorkerLifecycleEvent::Message(output)); }; Rc::new(closure) } /// This method creates a callback which returns a Future which - /// returns a message to be sent back to the agent + /// returns a message to be sent back to the worker /// /// # Panics /// If the future panics, then the promise will not resolve, and @@ -77,7 +77,7 @@ impl AgentLink { #[cfg_attr(docsrs, doc(cfg(feature = "futures")))] pub fn callback_future(&self, function: FN) -> Rc where - M: Into, + M: Into, FU: Future + 'static, FN: Fn(IN) -> FU + 'static, { @@ -91,7 +91,7 @@ impl AgentLink { Rc::new(closure) } - /// This method processes a Future that returns a message and sends it back to the agent. + /// This method processes a Future that returns a message and sends it back to the worker. /// /// # Panics /// If the future panics, then the promise will not resolve, and will leak. @@ -99,62 +99,62 @@ impl AgentLink { #[cfg_attr(docsrs, doc(cfg(feature = "futures")))] pub fn send_future(&self, future: F) where - M: Into, + M: Into, F: Future + 'static, { - let link: AgentLink = self.clone(); + let link: WorkerLink = self.clone(); let js_future = async move { - let message: AGN::Message = future.await.into(); - let cb = link.callback(|m: AGN::Message| m); + let message: W::Message = future.await.into(); + let cb = link.callback(|m: W::Message| m); (*cb)(message); }; wasm_bindgen_futures::spawn_local(js_future); } } -impl fmt::Debug for AgentLink { +impl fmt::Debug for WorkerLink { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("AgentLink<_>") + f.write_str("WorkerLink<_>") } } -impl Clone for AgentLink { +impl Clone for WorkerLink { fn clone(&self) -> Self { - AgentLink { + WorkerLink { scope: self.scope.clone(), responder: self.responder.clone(), } } } /// This struct holds a reference to a component and to a global scheduler. -pub(crate) struct AgentScope { - state: Shared>, +pub(crate) struct WorkerScope { + state: Shared>, } -impl fmt::Debug for AgentScope { +impl fmt::Debug for WorkerScope { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("AgentScope<_>") + f.write_str("WorkerScope<_>") } } -impl Clone for AgentScope { +impl Clone for WorkerScope { fn clone(&self) -> Self { - AgentScope { + WorkerScope { state: self.state.clone(), } } } -impl AgentScope { - /// Create agent scope +impl WorkerScope { + /// Create worker scope pub fn new() -> Self { - let state = Rc::new(RefCell::new(AgentState::new())); - AgentScope { state } + let state = Rc::new(RefCell::new(WorkerState::new())); + WorkerScope { state } } - /// Schedule message for sending to agent - pub fn send(&self, event: AgentLifecycleEvent) { - let runnable = Box::new(AgentRunnable { + /// Schedule message for sending to worker + pub fn send(&self, event: WorkerLifecycleEvent) { + let runnable = Box::new(WorkerRunnable { state: self.state.clone(), event, }); @@ -162,52 +162,52 @@ impl AgentScope { } } -impl Default for AgentScope { +impl Default for WorkerScope { fn default() -> Self { Self::new() } } -struct AgentState { - agent: Option, - // TODO(#939): Use agent field to control create message this flag +struct WorkerState { + worker: Option, + // TODO: Use worker field to control create message this flag destroyed: bool, } -impl AgentState { +impl WorkerState { fn new() -> Self { - AgentState { - agent: None, + WorkerState { + worker: None, destroyed: false, } } } -/// Internal Agent lifecycle events +/// Internal Worker lifecycle events #[derive(Debug)] -pub(crate) enum AgentLifecycleEvent { +pub(crate) enum WorkerLifecycleEvent { /// Request to create link - Create(AgentLink), - /// Internal Agent message - Message(AGN::Message), + Create(WorkerLink), + /// Internal Worker message + Message(W::Message), /// Client connected Connected(HandlerId), /// Received message from Client - Input(AGN::Input, HandlerId), + Input(W::Input, HandlerId), /// Client disconnected Disconnected(HandlerId), - /// Request to destroy agent + /// Request to destroy worker Destroy, } -struct AgentRunnable { - state: Shared>, - event: AgentLifecycleEvent, +struct WorkerRunnable { + state: Shared>, + event: WorkerLifecycleEvent, } -impl AgentRunnable +impl WorkerRunnable where - AGN: Agent, + W: Worker, { fn run(self) { let mut state = self.state.borrow_mut(); @@ -215,43 +215,43 @@ where return; } match self.event { - AgentLifecycleEvent::Create(link) => { - state.agent = Some(AGN::create(link)); + WorkerLifecycleEvent::Create(link) => { + state.worker = Some(W::create(link)); } - AgentLifecycleEvent::Message(msg) => { + WorkerLifecycleEvent::Message(msg) => { state - .agent + .worker .as_mut() - .expect("agent was not created to process messages") + .expect("worker was not created to process messages") .update(msg); } - AgentLifecycleEvent::Connected(id) => { + WorkerLifecycleEvent::Connected(id) => { state - .agent + .worker .as_mut() - .expect("agent was not created to send a connected message") + .expect("worker was not created to send a connected message") .connected(id); } - AgentLifecycleEvent::Input(inp, id) => { + WorkerLifecycleEvent::Input(inp, id) => { state - .agent + .worker .as_mut() - .expect("agent was not created to process inputs") + .expect("worker was not created to process inputs") .handle_input(inp, id); } - AgentLifecycleEvent::Disconnected(id) => { + WorkerLifecycleEvent::Disconnected(id) => { state - .agent + .worker .as_mut() - .expect("agent was not created to send a disconnected message") + .expect("worker was not created to send a disconnected message") .disconnected(id); } - AgentLifecycleEvent::Destroy => { - let mut agent = state - .agent + WorkerLifecycleEvent::Destroy => { + let mut worker = state + .worker .take() - .expect("trying to destroy not existent agent"); - agent.destroy(); + .expect("trying to destroy not existent worker"); + worker.destroy(); state.destroyed = true; } } diff --git a/crates/worker/src/pool.rs b/crates/worker/src/pool.rs index 41847ccd..65ef40ce 100644 --- a/crates/worker/src/pool.rs +++ b/crates/worker/src/pool.rs @@ -5,15 +5,15 @@ use slab::Slab; pub(crate) type Last = bool; -/// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Agent. -pub(crate) type SharedOutputSlab = Shared::Output>>>>; +/// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Worker. +pub(crate) type SharedOutputSlab = Shared::Output>>>>; /// The slab contains the callback, the id is used to look up the callback, /// and the output is the message that will be sent via the callback. -pub(crate) fn locate_callback_and_respond( - slab: &SharedOutputSlab, +pub(crate) fn locate_callback_and_respond( + slab: &SharedOutputSlab, id: HandlerId, - output: AGN::Output, + output: W::Output, ) { let callback = { let slab = slab.borrow(); @@ -56,19 +56,19 @@ impl DerefMut for Dispatcher { } } -/// This trait allows the creation of a dispatcher to an existing agent that will not send replies when messages are sent. -pub trait Dispatched: Agent + Sized + 'static { - /// Creates a dispatcher to the agent that will not send messages back. +/// This trait allows the creation of a dispatcher to an existing worker that will not send replies when messages are sent. +pub trait Dispatched: Worker + Sized + 'static { + /// Creates a dispatcher to the worker that will not send messages back. /// /// # Note - /// Dispatchers don't have `HandlerId`s and therefore `Agent::handle` will be supplied `None` + /// Dispatchers don't have `HandlerId`s and therefore `Worker::handle` will be supplied `None` /// for the `id` parameter, and `connected` and `disconnected` will not be called. /// /// # Important - /// Because the Agents using Context or Public reaches use the number of existing bridges to - /// keep track of if the agent itself should exist, creating dispatchers will not guarantee that - /// an Agent will exist to service requests sent from Dispatchers. You **must** keep at least one - /// bridge around if you wish to use a dispatcher. If you are using agents in a write-only manner, + /// Because the Workers using Context or Public reaches use the number of existing bridges to + /// keep track of if the worker itself should exist, creating dispatchers will not guarantee that + /// an Worker will exist to service requests sent from Dispatchers. You **must** keep at least one + /// bridge around if you wish to use a dispatcher. If you are using workers in a write-only manner, /// then it is suggested that you create a bridge that handles no-op responses as high up in the /// component hierarchy as possible - oftentimes the root component for simplicity's sake. fn dispatcher() -> Dispatcher; @@ -79,9 +79,9 @@ pub trait Dispatchable {} impl Dispatched for T where - T: Agent, - ::Reach: Discoverer, - ::Reach: Dispatchable, + T: Worker, + ::Reach: Discoverer, + ::Reach: Dispatchable, { fn dispatcher() -> Dispatcher { Dispatcher(Self::Reach::spawn_or_join(None)) diff --git a/crates/worker/src/worker/mod.rs b/crates/worker/src/worker/mod.rs index 0c4563df..6d0bb540 100644 --- a/crates/worker/src/worker/mod.rs +++ b/crates/worker/src/worker/mod.rs @@ -2,26 +2,26 @@ mod private; mod public; mod queue; -pub use private::{Private, PrivateAgent}; -pub use public::{Public, PublicAgent}; +pub use private::{Private, PrivateWorker}; +pub use public::{Public, PublicWorker}; -use crate::{Agent, HandlerId, Responder}; +use crate::{Worker, HandlerId, Responder}; use js_sys::{Array, Reflect, Uint8Array}; use serde::{Deserialize, Serialize}; use wasm_bindgen::{closure::Closure, JsCast, JsValue, UnwrapThrowExt}; use web_sys::{ - Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, Worker, WorkerOptions, + Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, WorkerOptions, }; pub(crate) struct WorkerResponder; -impl Responder for WorkerResponder +impl Responder for WorkerResponder where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { - fn respond(&self, id: HandlerId, output: AGN::Output) { + fn respond(&self, id: HandlerId, output: W::Output) { let msg = FromWorker::ProcessOutput(id, output); let data = msg.pack(); worker_self().post_message_vec(data); @@ -38,11 +38,11 @@ pub trait Packed { impl Deserialize<'de>> Packed for T { fn pack(&self) -> Vec { - bincode::serialize(&self).expect("can't serialize an agent message") + bincode::serialize(&self).expect("can't serialize an worker message") } fn unpack(data: &[u8]) -> Self { - bincode::deserialize(data).expect("can't deserialize an agent message") + bincode::deserialize(data).expect("can't deserialize an worker message") } } @@ -68,16 +68,16 @@ enum FromWorker { ProcessOutput(HandlerId, T), } -fn send_to_remote(worker: &Worker, msg: ToWorker) +fn send_to_remote(worker: &web_sys::Worker, msg: ToWorker) where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { let msg = msg.pack(); worker.post_message_vec(msg); } -fn worker_new(name_of_resource: &str, resource_is_relative: bool, is_module: bool) -> Worker { +fn worker_new(name_of_resource: &str, resource_is_relative: bool, is_module: bool) -> web_sys::Worker { let origin = gloo_utils::document() .location() .unwrap_throw() @@ -123,9 +123,9 @@ fn worker_new(name_of_resource: &str, resource_is_relative: bool, is_module: boo &JsValue::from_str("module"), ) .unwrap(); - Worker::new_with_options(&url, &options).expect("failed to spawn worker") + web_sys::Worker::new_with_options(&url, &options).expect("failed to spawn worker") } else { - Worker::new(&url).expect("failed to spawn worker") + web_sys::Worker::new(&url).expect("failed to spawn worker") } } @@ -140,7 +140,7 @@ trait WorkerExt { } macro_rules! worker_ext_impl { - ($($type:ident),+) => {$( + ($($type:path),+) => {$( impl WorkerExt for $type { fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)) { let handler = move |message: MessageEvent| { @@ -161,5 +161,5 @@ macro_rules! worker_ext_impl { } worker_ext_impl! { - Worker, DedicatedWorkerGlobalScope + web_sys::Worker, DedicatedWorkerGlobalScope } diff --git a/crates/worker/src/worker/private.rs b/crates/worker/src/worker/private.rs index ad469cc8..e599011c 100644 --- a/crates/worker/src/worker/private.rs +++ b/crates/worker/src/worker/private.rs @@ -1,6 +1,6 @@ use crate::worker::*; use crate::{ - Agent, AgentLifecycleEvent, AgentLink, AgentScope, Bridge, Callback, Discoverer, HandlerId, + Worker, WorkerLifecycleEvent, WorkerLink, WorkerScope, Bridge, Callback, Discoverer, HandlerId, }; use queue::Queue; use serde::{Deserialize, Serialize}; @@ -9,7 +9,6 @@ use std::fmt; use std::marker::PhantomData; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; -use web_sys::Worker; thread_local! { static QUEUE: Queue = Queue::new(); @@ -20,53 +19,53 @@ const SINGLETON_ID: HandlerId = HandlerId(0, true); /// Create a new instance for every bridge. #[allow(missing_debug_implementations)] -pub struct Private { - _agent: PhantomData, +pub struct Private { + _worker: PhantomData, } -/// A trait to enable private agents being registered in a web worker. -pub trait PrivateAgent { - /// Executes an agent in the current environment. +/// A trait to enable private workers being registered in a web worker. +pub trait PrivateWorker { + /// Executes an worker in the current environment. /// Uses in `main` function of a worker. fn register(); } -impl PrivateAgent for AGN +impl PrivateWorker for W where - AGN: Agent>, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker>, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn register() { - let scope = AgentScope::::new(); + let scope = WorkerScope::::new(); let responder = WorkerResponder; - let link = AgentLink::connect(&scope, responder); - let upd = AgentLifecycleEvent::Create(link); + let link = WorkerLink::connect(&scope, responder); + let upd = WorkerLifecycleEvent::Create(link); scope.send(upd); let handler = move |data: Vec| { - let msg = ToWorker::::unpack(&data); + let msg = ToWorker::::unpack(&data); match msg { ToWorker::Connected(_) => { - let upd = AgentLifecycleEvent::Connected(SINGLETON_ID); + let upd = WorkerLifecycleEvent::Connected(SINGLETON_ID); scope.send(upd); } ToWorker::ProcessInput(_, value) => { - let upd = AgentLifecycleEvent::Input(value, SINGLETON_ID); + let upd = WorkerLifecycleEvent::Input(value, SINGLETON_ID); scope.send(upd); } ToWorker::Disconnected(_) => { - let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID); + let upd = WorkerLifecycleEvent::Disconnected(SINGLETON_ID); scope.send(upd); } ToWorker::Destroy => { - let upd = AgentLifecycleEvent::Destroy; + let upd = WorkerLifecycleEvent::Destroy; scope.send(upd); // Terminates web worker worker_self().close(); } } }; - let loaded: FromWorker = FromWorker::WorkerLoaded; + let loaded: FromWorker = FromWorker::WorkerLoaded; let loaded = loaded.pack(); let worker = worker_self(); worker.set_onmessage_closure(handler); @@ -74,23 +73,23 @@ where } } -impl Discoverer for Private +impl Discoverer for Private where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { - type Agent = AGN; + type Worker = W; - fn spawn_or_join(callback: Option>) -> Box> { + fn spawn_or_join(callback: Option>) -> Box> { let id = PRIVATE_ID_COUNTER.fetch_add(1, Ordering::Relaxed); - let callback = callback.expect("Callback required for Private agents"); - let handler = move |data: Vec, worker: &Worker| { - let msg = FromWorker::::unpack(&data); + let callback = callback.expect("Callback required for Private workers"); + let handler = move |data: Vec, worker: &web_sys::Worker| { + let msg = FromWorker::::unpack(&data); match msg { FromWorker::WorkerLoaded => { QUEUE.with(|queue| { - queue.insert_loaded_agent(id); + queue.insert_loaded_worker(id); if let Some(msgs) = queue.remove_msg_queue(&id) { for msg in msgs { @@ -106,13 +105,13 @@ where } }; - let name_of_resource = AGN::name_of_resource(); - let is_relative = AGN::resource_path_is_relative(); + let name_of_resource = W::name_of_resource(); + let is_relative = W::resource_path_is_relative(); let handler_cell = Rc::new(RefCell::new(Some(handler))); let worker = { let handler_cell = handler_cell.clone(); - let worker = worker_new(name_of_resource, is_relative, AGN::is_module()); + let worker = worker_new(name_of_resource, is_relative, W::is_module()); let worker_clone = worker.clone(); worker.set_onmessage_closure(move |data: Vec| { if let Some(handler) = handler_cell.borrow().as_ref() { @@ -124,7 +123,7 @@ where let bridge = PrivateBridge { handler_cell, worker, - _agent: PhantomData, + _worker: PhantomData, id, }; bridge.send_message(ToWorker::Connected(SINGLETON_ID)); @@ -133,31 +132,31 @@ where } /// A connection manager for components interaction with workers. -pub struct PrivateBridge +pub struct PrivateBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), { handler_cell: Rc>>, - worker: Worker, - _agent: PhantomData, + worker: web_sys::Worker, + _worker: PhantomData, id: usize, } -impl PrivateBridge +impl PrivateBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), { /// Send a message to the worker, queuing the message if necessary - fn send_message(&self, msg: ToWorker) { + fn send_message(&self, msg: ToWorker) { QUEUE.with(|queue| { if queue.is_worker_loaded(&self.id) { - send_to_remote::(&self.worker, msg); + send_to_remote::(&self.worker, msg); } else { queue.add_msg_to_queue(msg.pack(), self.id); } @@ -165,49 +164,49 @@ where } } -impl fmt::Debug for PrivateBridge +impl fmt::Debug for PrivateBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PrivateBridge<_>") } } -impl Bridge for PrivateBridge +impl Bridge for PrivateBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), { - fn send(&mut self, msg: AGN::Input) { + fn send(&mut self, msg: W::Input) { let msg = ToWorker::ProcessInput(SINGLETON_ID, msg); self.send_message(msg); } } -impl Drop for PrivateBridge +impl Drop for PrivateBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, + HNDL: Fn(Vec, &web_sys::Worker), { fn drop(&mut self) { let disconnected = ToWorker::Disconnected(SINGLETON_ID); - send_to_remote::(&self.worker, disconnected); + send_to_remote::(&self.worker, disconnected); let destroy = ToWorker::Destroy; - send_to_remote::(&self.worker, destroy); + send_to_remote::(&self.worker, destroy); self.handler_cell.borrow_mut().take(); QUEUE.with(|queue| { - queue.remove_agent(&self.id); + queue.remove_worker(&self.id); }); } } diff --git a/crates/worker/src/worker/public.rs b/crates/worker/src/worker/public.rs index 99fdd554..ff67987f 100644 --- a/crates/worker/src/worker/public.rs +++ b/crates/worker/src/worker/public.rs @@ -1,6 +1,6 @@ use crate::worker::*; use crate::{ - locate_callback_and_respond, Agent, AgentLifecycleEvent, AgentLink, AgentScope, Bridge, + locate_callback_and_respond, Worker, WorkerLifecycleEvent, WorkerLink, WorkerScope, Bridge, Callback, Discoverer, Dispatchable, HandlerId, Last, Shared, SharedOutputSlab, }; use anymap2::{self, AnyMap}; @@ -11,46 +11,45 @@ use std::cell::RefCell; use std::fmt; use std::marker::PhantomData; use std::rc::Rc; -use web_sys::Worker; thread_local! { - static REMOTE_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); + static REMOTE_WORKERS_POOL: RefCell = RefCell::new(AnyMap::new()); static QUEUE: Queue = Queue::new(); } /// Create a single instance in a tab. #[allow(missing_debug_implementations)] -pub struct Public { - _agent: PhantomData, +pub struct Public { + _worker: PhantomData, } -impl Discoverer for Public +impl Discoverer for Public where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { - type Agent = AGN; + type Worker = W; - fn spawn_or_join(callback: Option>) -> Box> { - let bridge = REMOTE_AGENTS_POOL.with(|pool| { + fn spawn_or_join(callback: Option>) -> Box> { + let bridge = REMOTE_WORKERS_POOL.with(|pool| { let mut pool = pool.borrow_mut(); - match pool.entry::>() { + match pool.entry::>() { anymap2::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), anymap2::Entry::Vacant(entry) => { - let slab: Shared>>> = + let slab: Shared>>> = Rc::new(RefCell::new(Slab::new())); let handler = { let slab = slab.clone(); - move |data: Vec, worker: &Worker| { - let msg = FromWorker::::unpack(&data); + move |data: Vec, worker: &web_sys::Worker| { + let msg = FromWorker::::unpack(&data); match msg { FromWorker::WorkerLoaded => { QUEUE.with(|queue| { - queue.insert_loaded_agent(TypeId::of::()); + queue.insert_loaded_worker(TypeId::of::()); if let Some(msgs) = - queue.remove_msg_queue(&TypeId::of::()) + queue.remove_msg_queue(&TypeId::of::()) { for msg in msgs { worker.post_message_vec(msg) @@ -59,22 +58,22 @@ where }); } FromWorker::ProcessOutput(id, output) => { - locate_callback_and_respond::(&slab, id, output); + locate_callback_and_respond::(&slab, id, output); } } } }; - let name_of_resource = AGN::name_of_resource(); - let is_relative = AGN::resource_path_is_relative(); + let name_of_resource = W::name_of_resource(); + let is_relative = W::resource_path_is_relative(); let worker = { - let worker = worker_new(name_of_resource, is_relative, AGN::is_module()); + let worker = worker_new(name_of_resource, is_relative, W::is_module()); let worker_clone = worker.clone(); worker.set_onmessage_closure(move |data: Vec| { handler(data, &worker_clone); }); worker }; - let launched = RemoteAgent::new(worker, slab); + let launched = RemoteWorker::new(worker, slab); entry.insert(launched).create_bridge(callback) } } @@ -83,78 +82,78 @@ where } } -impl Dispatchable for Public +impl Dispatchable for Public where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { } /// A connection manager for components interaction with workers. -pub struct PublicBridge +pub struct PublicBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { - worker: Worker, + worker: web_sys::Worker, id: HandlerId, - _agent: PhantomData, + _worker: PhantomData, } -impl fmt::Debug for PublicBridge +impl fmt::Debug for PublicBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PublicBridge<_>") } } -impl PublicBridge +impl PublicBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { /// Send a message to the worker, queuing the message if necessary - fn send_message(&self, msg: ToWorker) { + fn send_message(&self, msg: ToWorker) { QUEUE.with(|queue| { - if queue.is_worker_loaded(&TypeId::of::()) { - send_to_remote::(&self.worker, msg); + if queue.is_worker_loaded(&TypeId::of::()) { + send_to_remote::(&self.worker, msg); } else { - queue.add_msg_to_queue(msg.pack(), TypeId::of::()); + queue.add_msg_to_queue(msg.pack(), TypeId::of::()); } }); } } -impl Bridge for PublicBridge +impl Bridge for PublicBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { - fn send(&mut self, msg: AGN::Input) { + fn send(&mut self, msg: W::Input) { let msg = ToWorker::ProcessInput(self.id, msg); self.send_message(msg); } } -impl Drop for PublicBridge +impl Drop for PublicBridge where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn drop(&mut self) { - let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| { + let terminate_worker = REMOTE_WORKERS_POOL.with(|pool| { let mut pool = pool.borrow_mut(); let terminate_worker = { - if let Some(launched) = pool.get_mut::>() { + if let Some(launched) = pool.get_mut::>() { launched.remove_bridge(self) } else { false @@ -162,7 +161,7 @@ where }; if terminate_worker { - pool.remove::>(); + pool.remove::>(); } terminate_worker @@ -176,55 +175,55 @@ where self.send_message(destroy); QUEUE.with(|queue| { - queue.remove_agent(&TypeId::of::()); + queue.remove_worker(&TypeId::of::()); }); } } } -/// A trait to enable public agents being registered in a web worker. -pub trait PublicAgent { - /// Executes an agent in the current environment. +/// A trait to enable public workers being registered in a web worker. +pub trait PublicWorker { + /// Executes an worker in the current environment. /// Uses in `main` function of a worker. fn register(); } -impl PublicAgent for AGN +impl PublicWorker for W where - AGN: Agent>, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker>, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn register() { - let scope = AgentScope::::new(); + let scope = WorkerScope::::new(); let responder = WorkerResponder; - let link = AgentLink::connect(&scope, responder); - let upd = AgentLifecycleEvent::Create(link); + let link = WorkerLink::connect(&scope, responder); + let upd = WorkerLifecycleEvent::Create(link); scope.send(upd); let handler = move |data: Vec| { - let msg = ToWorker::::unpack(&data); + let msg = ToWorker::::unpack(&data); match msg { ToWorker::Connected(id) => { - let upd = AgentLifecycleEvent::Connected(id); + let upd = WorkerLifecycleEvent::Connected(id); scope.send(upd); } ToWorker::ProcessInput(id, value) => { - let upd = AgentLifecycleEvent::Input(value, id); + let upd = WorkerLifecycleEvent::Input(value, id); scope.send(upd); } ToWorker::Disconnected(id) => { - let upd = AgentLifecycleEvent::Disconnected(id); + let upd = WorkerLifecycleEvent::Disconnected(id); scope.send(upd); } ToWorker::Destroy => { - let upd = AgentLifecycleEvent::Destroy; + let upd = WorkerLifecycleEvent::Destroy; scope.send(upd); // Terminates web worker worker_self().close(); } } }; - let loaded: FromWorker = FromWorker::WorkerLoaded; + let loaded: FromWorker = FromWorker::WorkerLoaded; let loaded = loaded.pack(); let worker = worker_self(); worker.set_onmessage_closure(handler); @@ -232,27 +231,27 @@ where } } -struct RemoteAgent +struct RemoteWorker where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { - worker: Worker, - slab: SharedOutputSlab, + worker: web_sys::Worker, + slab: SharedOutputSlab, } -impl RemoteAgent +impl RemoteWorker where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, + W: Worker, + ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { - pub fn new(worker: Worker, slab: SharedOutputSlab) -> Self { - RemoteAgent { worker, slab } + pub fn new(worker: web_sys::Worker, slab: SharedOutputSlab) -> Self { + RemoteWorker { worker, slab } } - fn create_bridge(&mut self, callback: Option>) -> PublicBridge { + fn create_bridge(&mut self, callback: Option>) -> PublicBridge { let respondable = callback.is_some(); let mut slab = self.slab.borrow_mut(); let id: usize = slab.insert(callback); @@ -260,14 +259,14 @@ where let bridge = PublicBridge { worker: self.worker.clone(), id, - _agent: PhantomData, + _worker: PhantomData, }; bridge.send_message(ToWorker::Connected(bridge.id)); bridge } - fn remove_bridge(&mut self, bridge: &PublicBridge) -> Last { + fn remove_bridge(&mut self, bridge: &PublicBridge) -> Last { let mut slab = self.slab.borrow_mut(); let _ = slab.remove(bridge.id.raw_id()); slab.is_empty() diff --git a/crates/worker/src/worker/queue.rs b/crates/worker/src/worker/queue.rs index 4c306b94..9faf2324 100644 --- a/crates/worker/src/worker/queue.rs +++ b/crates/worker/src/worker/queue.rs @@ -4,14 +4,14 @@ use std::hash::Hash; /// Thread-local instance used to queue worker messages pub struct Queue { - loaded_agents: RefCell>, + loaded_workers: RefCell>, msg_queue: RefCell>>>, } impl Queue { pub fn new() -> Queue { Queue { - loaded_agents: RefCell::new(HashSet::new()), + loaded_workers: RefCell::new(HashSet::new()), msg_queue: RefCell::new(HashMap::new()), } } @@ -22,13 +22,13 @@ impl Queue { } #[inline] - pub fn insert_loaded_agent(&self, id: T) { - self.loaded_agents.borrow_mut().insert(id); + pub fn insert_loaded_worker(&self, id: T) { + self.loaded_workers.borrow_mut().insert(id); } #[inline] pub fn is_worker_loaded(&self, id: &T) -> bool { - self.loaded_agents.borrow().contains(id) + self.loaded_workers.borrow().contains(id) } pub fn add_msg_to_queue(&self, msg: Vec, id: T) { @@ -45,8 +45,8 @@ impl Queue { /// This is called by a worker's `Drop` implementation in order to remove the worker from the list /// of loaded workers. - pub fn remove_agent(&self, id: &T) { - self.loaded_agents.borrow_mut().remove(id); + pub fn remove_worker(&self, id: &T) { + self.loaded_workers.borrow_mut().remove(id); self.msg_queue.borrow_mut().remove(id); } } From d3415840506e4411d5683c12f4acda24d672a699 Mon Sep 17 00:00:00 2001 From: Hamza Date: Sun, 2 Jan 2022 22:04:19 +0500 Subject: [PATCH 5/5] fmt --- crates/worker/src/link.rs | 2 +- crates/worker/src/worker/mod.rs | 8 ++++++-- crates/worker/src/worker/private.rs | 2 +- crates/worker/src/worker/public.rs | 4 ++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/worker/src/link.rs b/crates/worker/src/link.rs index d20ccb57..958b7655 100644 --- a/crates/worker/src/link.rs +++ b/crates/worker/src/link.rs @@ -1,5 +1,5 @@ use crate::Shared; -use crate::{Worker, HandlerId}; +use crate::{HandlerId, Worker}; use std::cell::RefCell; use std::fmt; #[cfg(feature = "futures")] diff --git a/crates/worker/src/worker/mod.rs b/crates/worker/src/worker/mod.rs index 6d0bb540..fcb5d6bc 100644 --- a/crates/worker/src/worker/mod.rs +++ b/crates/worker/src/worker/mod.rs @@ -5,7 +5,7 @@ mod queue; pub use private::{Private, PrivateWorker}; pub use public::{Public, PublicWorker}; -use crate::{Worker, HandlerId, Responder}; +use crate::{HandlerId, Responder, Worker}; use js_sys::{Array, Reflect, Uint8Array}; use serde::{Deserialize, Serialize}; use wasm_bindgen::{closure::Closure, JsCast, JsValue, UnwrapThrowExt}; @@ -77,7 +77,11 @@ where let msg = msg.pack(); worker.post_message_vec(msg); } -fn worker_new(name_of_resource: &str, resource_is_relative: bool, is_module: bool) -> web_sys::Worker { +fn worker_new( + name_of_resource: &str, + resource_is_relative: bool, + is_module: bool, +) -> web_sys::Worker { let origin = gloo_utils::document() .location() .unwrap_throw() diff --git a/crates/worker/src/worker/private.rs b/crates/worker/src/worker/private.rs index e599011c..e990538e 100644 --- a/crates/worker/src/worker/private.rs +++ b/crates/worker/src/worker/private.rs @@ -1,6 +1,6 @@ use crate::worker::*; use crate::{ - Worker, WorkerLifecycleEvent, WorkerLink, WorkerScope, Bridge, Callback, Discoverer, HandlerId, + Bridge, Callback, Discoverer, HandlerId, Worker, WorkerLifecycleEvent, WorkerLink, WorkerScope, }; use queue::Queue; use serde::{Deserialize, Serialize}; diff --git a/crates/worker/src/worker/public.rs b/crates/worker/src/worker/public.rs index ff67987f..0905b98c 100644 --- a/crates/worker/src/worker/public.rs +++ b/crates/worker/src/worker/public.rs @@ -1,7 +1,7 @@ use crate::worker::*; use crate::{ - locate_callback_and_respond, Worker, WorkerLifecycleEvent, WorkerLink, WorkerScope, Bridge, - Callback, Discoverer, Dispatchable, HandlerId, Last, Shared, SharedOutputSlab, + locate_callback_and_respond, Bridge, Callback, Discoverer, Dispatchable, HandlerId, Last, + Shared, SharedOutputSlab, Worker, WorkerLifecycleEvent, WorkerLink, WorkerScope, }; use anymap2::{self, AnyMap}; use queue::Queue;