Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

web-sys agent conversion #818

Merged
merged 10 commits into from
Jan 9, 2020
219 changes: 175 additions & 44 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::callback::Callback;
use crate::scheduler::{scheduler, Runnable, Shared};
use anymap::{self, AnyMap};
use bincode;
use cfg_if::cfg_if;
use cfg_match::cfg_match;
use log::warn;
use serde::{Deserialize, Serialize};
use slab::Slab;
Expand All @@ -14,9 +16,17 @@ use std::fmt;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use stdweb::Value;
#[allow(unused_imports)]
use stdweb::{_js_impl, js};
cfg_if! {
if #[cfg(feature = "std_web")] {
use stdweb::Value;
#[allow(unused_imports)]
use stdweb::{_js_impl, js};
} else if #[cfg(feature = "web_sys")] {
use js_sys::{Reflect, Uint8Array};
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent, Worker, WorkerOptions};
}
}

/// Serializable messages to worker
#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -162,21 +172,29 @@ where
ToWorker::Destroy => {
let upd = AgentLifecycleEvent::Destroy;
scope.send(upd);
js! {
// Terminates web worker
self.close();
// Terminates web worker
cfg_match! {
daxpedda marked this conversation as resolved.
Show resolved Hide resolved
feature = "std_web" => js! { self.close(); },
feature = "web_sys" => worker_self().close(),
};
}
}
};
let loaded: FromWorker<T::Output> = FromWorker::WorkerLoaded;
let loaded = loaded.pack();
js! {
var handler = @{handler};
self.onmessage = function(event) {
handler(event.data);
};
self.postMessage(@{loaded});
cfg_match! {
feature = "std_web" => js! {
var handler = @{handler};
self.onmessage = function(event) {
handler(event.data);
};
self.postMessage(@{loaded});
},
feature = "web_sys" => ({
let worker = worker_self();
worker.set_onmessage_closure(handler);
worker.post_message_vec(loaded);
}),
};
}
}
Expand Down Expand Up @@ -433,13 +451,20 @@ impl Discoverer for Private {
};
// TODO Need somethig better...
let name_of_resource = AGN::name_of_resource();
let worker = js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data);
};
return worker;
let worker = cfg_match! {
feature = "std_web" => js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data);
};
return worker;
},
feature = "web_sys" => ({
let worker = worker_new(name_of_resource, AGN::is_module());
worker.set_onmessage_closure(handler);
worker
}),
};
let bridge = PrivateBridge {
worker,
Expand All @@ -451,7 +476,10 @@ impl Discoverer for Private {

/// A connection manager for components interaction with workers.
pub struct PrivateBridge<T: Agent> {
#[cfg(feature = "std_web")]
worker: Value,
#[cfg(feature = "web_sys")]
worker: Worker,
_agent: PhantomData<T>,
}

Expand All @@ -467,12 +495,17 @@ impl<AGN: Agent> Bridge<AGN> for PrivateBridge<AGN> {
// Use a queue to collect a messages if an instance is not ready
// and send them to an agent when it will reported readiness.
let msg = ToWorker::ProcessInput(SINGLETON_ID, msg).pack();
let worker = &self.worker;
js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
};
cfg_match! {
feature = "std_web" => ({
let worker = &self.worker;
js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
};
}),
feature = "web_sys" => self.worker.post_message_vec(msg),
}
}
}

Expand All @@ -483,12 +516,19 @@ impl<AGN: Agent> Drop for PrivateBridge<AGN> {
}

struct RemoteAgent<AGN: Agent> {
#[cfg(feature = "std_web")]
worker: Value,
#[cfg(feature = "web_sys")]
worker: Worker,
slab: SharedOutputSlab<AGN>,
}

impl<AGN: Agent> RemoteAgent<AGN> {
pub fn new(worker: Value, slab: SharedOutputSlab<AGN>) -> Self {
pub fn new(
#[cfg(feature = "std_web")] worker: Value,
#[cfg(feature = "web_sys")] worker: Worker,
slab: SharedOutputSlab<AGN>,
) -> Self {
RemoteAgent { worker, slab }
}

Expand Down Expand Up @@ -533,7 +573,9 @@ impl Discoverer for Public {
Rc::new(RefCell::new(Slab::new()));
let handler = {
let slab = slab.clone();
move |data: Vec<u8>, worker: Value| {
move |data: Vec<u8>,
#[cfg(feature = "std_web")] worker: Value,
#[cfg(feature = "web_sys")] worker: &Worker| {
let msg = FromWorker::<AGN::Output>::unpack(&data);
match msg {
FromWorker::WorkerLoaded => {
Expand All @@ -546,8 +588,13 @@ impl Discoverer for Public {
local.borrow_mut().get_mut(&TypeId::of::<AGN>())
{
for msg in msgs.drain(..) {
let worker = &worker;
js! {@{worker}.postMessage(@{msg});};
cfg_match! {
feature = "std_web" => ({
let worker = &worker;
js! {@{worker}.postMessage(@{msg});};
}),
feature = "web_sys" => worker.post_message_vec(msg),
}
}
}
});
Expand All @@ -559,13 +606,23 @@ impl Discoverer for Public {
}
};
let name_of_resource = AGN::name_of_resource();
let worker = js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data, worker);
};
return worker;
let worker = cfg_match! {
feature = "std_web" => js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data, worker);
};
return worker;
},
feature = "web_sys" => ({
let worker = worker_new(name_of_resource, AGN::is_module());
let worker_clone = worker.clone();
worker.set_onmessage_closure(move |data: Vec<u8>| {
handler(data, &worker_clone);
});
worker
}),
};
let launched = RemoteAgent::new(worker, slab);
entry.insert(launched).create_bridge(callback)
Expand All @@ -580,7 +637,10 @@ impl Dispatchable for Public {}

/// A connection manager for components interaction with workers.
pub struct PublicBridge<AGN: Agent> {
#[cfg(feature = "std_web")]
worker: Value,
#[cfg(feature = "web_sys")]
worker: Worker,
id: HandlerId,
_agent: PhantomData<AGN>,
}
Expand Down Expand Up @@ -614,15 +674,22 @@ impl<AGN: Agent> PublicBridge<AGN> {
}
}

fn send_to_remote<AGN: Agent>(worker: &Value, msg: ToWorker<AGN::Input>) {
fn send_to_remote<AGN: Agent>(
#[cfg(feature = "std_web")] worker: &Value,
#[cfg(feature = "web_sys")] worker: &Worker,
msg: ToWorker<AGN::Input>,
) {
// TODO Important! Implement.
// Use a queue to collect a messages if an instance is not ready
// and send them to an agent when it will reported readiness.
let msg = msg.pack();
js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
cfg_match! {
feature = "std_web" => js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
},
feature = "web_sys" => worker.post_message_vec(msg),
};
}

Expand Down Expand Up @@ -705,6 +772,12 @@ pub trait Agent: Sized + 'static {
fn name_of_resource() -> &'static str {
"main.js"
}

/// Signifies if resource is a module.
/// This has pending browser support.
fn is_module() -> bool {
false
}
}

/// This struct holds a reference to a component and to a global scheduler.
Expand Down Expand Up @@ -761,9 +834,12 @@ impl<AGN: Agent> Responder<AGN> for WorkerResponder {
fn respond(&self, id: HandlerId, output: AGN::Output) {
let msg = FromWorker::ProcessOutput(id, output);
let data = msg.pack();
js! {
var data = @{data};
self.postMessage(data);
cfg_match! {
feature = "std_web" => js! {
var data = @{data};
self.postMessage(data);
},
feature = "web_sys" => worker_self().post_message_vec(data),
};
}
}
Expand Down Expand Up @@ -904,3 +980,58 @@ where
}
}
}

#[cfg(feature = "web_sys")]
fn worker_new(name_of_resource: &str, is_module: bool) -> Worker {
if is_module {
let options = WorkerOptions::new();
Reflect::set(
options.as_ref(),
&JsValue::from_str("type"),
&JsValue::from_str("module"),
)
.unwrap();
Worker::new_with_options(name_of_resource, &options).expect("failed to spawn worker")
} else {
Worker::new(name_of_resource).expect("failed to spawn worker")
}
}

#[cfg(feature = "web_sys")]
fn worker_self() -> DedicatedWorkerGlobalScope {
JsValue::from(js_sys::global()).into()
}

#[cfg(feature = "web_sys")]
trait WorkerExt {
fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>));

fn post_message_vec(&self, data: Vec<u8>);
}

#[cfg(feature = "web_sys")]
macro_rules! worker_ext_impl {
($($type:ident),+) => {$(
impl WorkerExt for $type {
fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>)) {
let handler = move |message: MessageEvent| {
let data = Uint8Array::from(message.data()).to_vec();
handler(data);
};
let closure = Closure::wrap(Box::new(handler) as Box<dyn Fn(MessageEvent)>);
self.set_onmessage(Some(closure.as_ref().unchecked_ref()));
closure.forget();
}

fn post_message_vec(&self, data: Vec<u8>) {
self.post_message(&Uint8Array::from(data.as_slice()))
.expect("failed to post message");
}
}
)+};
}

#[cfg(feature = "web_sys")]
worker_ext_impl! {
Worker, DedicatedWorkerGlobalScope
}