-
Notifications
You must be signed in to change notification settings - Fork 147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Worker v2 #200
Worker v2 #200
Changes from 33 commits
435f9d6
3845128
ea8e5ec
7abcaf2
fb8cf16
ff774ff
defbbba
950060e
e3aebf5
2845afa
a3eb5b9
74d5d1a
a2ba75f
d26e873
19974b7
f292201
3130340
64a08e0
3af881f
8ee5efa
891ced8
1d16320
c5a0bd6
a3dbd0e
1bbeff4
6843d99
3952ebb
75402be
ae2435b
fa79c2c
cc6968f
23a1e27
a2101e1
9a7b422
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,8 @@ | |
**/*.rs.bk | ||
Cargo.lock | ||
|
||
examples/*/dist | ||
|
||
# editor configs | ||
.vscode | ||
.idea | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,4 +44,7 @@ members = [ | |
"crates/history", | ||
"crates/worker", | ||
"crates/net", | ||
|
||
"examples/markdown", | ||
"examples/clock", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
use std::cell::RefCell; | ||
use std::collections::HashMap; | ||
use std::fmt; | ||
use std::marker::PhantomData; | ||
use std::rc::Rc; | ||
use std::rc::Weak; | ||
|
||
use crate::codec::Codec; | ||
use crate::handler_id::HandlerId; | ||
use crate::messages::ToWorker; | ||
use crate::native_worker::NativeWorkerExt; | ||
use crate::traits::Worker; | ||
use crate::{Callback, Shared}; | ||
|
||
pub(crate) type ToWorkerQueue<W> = Vec<ToWorker<W>>; | ||
pub(crate) type CallbackMap<W> = HashMap<HandlerId, Weak<dyn Fn(<W as Worker>::Output)>>; | ||
|
||
struct WorkerBridgeInner<W> | ||
where | ||
W: Worker, | ||
{ | ||
// When worker is loaded, queue becomes None. | ||
pending_queue: Shared<Option<ToWorkerQueue<W>>>, | ||
callbacks: Shared<CallbackMap<W>>, | ||
post_msg: Rc<dyn Fn(ToWorker<W>)>, | ||
} | ||
|
||
impl<W> fmt::Debug for WorkerBridgeInner<W> | ||
where | ||
W: Worker, | ||
{ | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.write_str("WorkerBridgeInner<_>") | ||
} | ||
} | ||
|
||
impl<W> WorkerBridgeInner<W> | ||
where | ||
W: Worker, | ||
{ | ||
/// Send a message to the worker, queuing the message if necessary | ||
fn send_message(&self, msg: ToWorker<W>) { | ||
let mut pending_queue = self.pending_queue.borrow_mut(); | ||
|
||
match pending_queue.as_mut() { | ||
Some(m) => { | ||
m.push(msg); | ||
} | ||
None => { | ||
(self.post_msg)(msg); | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl<W> Drop for WorkerBridgeInner<W> | ||
where | ||
W: Worker, | ||
{ | ||
fn drop(&mut self) { | ||
let destroy = ToWorker::Destroy; | ||
self.send_message(destroy); | ||
} | ||
} | ||
|
||
/// A connection manager for components interaction with workers. | ||
pub struct WorkerBridge<W> | ||
where | ||
W: Worker, | ||
{ | ||
inner: Rc<WorkerBridgeInner<W>>, | ||
id: HandlerId, | ||
_worker: PhantomData<W>, | ||
cb: Option<Rc<dyn Fn(W::Output)>>, | ||
} | ||
|
||
impl<W> WorkerBridge<W> | ||
where | ||
W: Worker, | ||
{ | ||
pub(crate) fn new<CODEC>( | ||
id: HandlerId, | ||
native_worker: web_sys::Worker, | ||
pending_queue: Rc<RefCell<Option<ToWorkerQueue<W>>>>, | ||
callbacks: Rc<RefCell<CallbackMap<W>>>, | ||
callback: Option<Callback<W::Output>>, | ||
) -> Self | ||
where | ||
CODEC: Codec, | ||
{ | ||
let post_msg = | ||
{ move |msg: ToWorker<W>| native_worker.post_packed_message::<_, CODEC>(msg) }; | ||
|
||
Self { | ||
inner: WorkerBridgeInner { | ||
pending_queue, | ||
callbacks, | ||
post_msg: Rc::new(post_msg), | ||
} | ||
.into(), | ||
id, | ||
_worker: PhantomData, | ||
cb: callback, | ||
} | ||
} | ||
|
||
/// Send a message to the current worker. | ||
pub fn send(&self, msg: W::Input) { | ||
let msg = ToWorker::ProcessInput(self.id, msg); | ||
self.inner.send_message(msg); | ||
} | ||
|
||
/// Forks the bridge with a different callback. | ||
/// | ||
/// This creates a new [HandlerID] that helps the worker to differentiate bridges. | ||
pub fn fork<F>(&self, cb: Option<F>) -> Self | ||
where | ||
F: 'static + Fn(W::Output), | ||
{ | ||
let cb = cb.map(|m| Rc::new(m) as Rc<dyn Fn(W::Output)>); | ||
let handler_id = HandlerId::new(); | ||
|
||
if let Some(cb_weak) = cb.as_ref().map(Rc::downgrade) { | ||
self.inner | ||
.callbacks | ||
.borrow_mut() | ||
.insert(handler_id, cb_weak); | ||
} | ||
|
||
Self { | ||
inner: self.inner.clone(), | ||
id: handler_id, | ||
_worker: PhantomData, | ||
cb, | ||
} | ||
} | ||
} | ||
|
||
impl<W> Clone for WorkerBridge<W> | ||
where | ||
W: Worker, | ||
{ | ||
fn clone(&self) -> Self { | ||
Self { | ||
inner: self.inner.clone(), | ||
id: self.id, | ||
_worker: PhantomData, | ||
cb: self.cb.clone(), | ||
} | ||
} | ||
} | ||
|
||
impl<W> Drop for WorkerBridge<W> | ||
where | ||
W: Worker, | ||
{ | ||
fn drop(&mut self) { | ||
let disconnected = ToWorker::Disconnected(self.id); | ||
self.inner.send_message(disconnected); | ||
} | ||
} | ||
|
||
impl<W> fmt::Debug for WorkerBridge<W> | ||
where | ||
W: Worker, | ||
{ | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.write_str("WorkerBridge<_>") | ||
} | ||
} | ||
|
||
impl<W> PartialEq for WorkerBridge<W> | ||
where | ||
W: Worker, | ||
{ | ||
fn eq(&self, rhs: &Self) -> bool { | ||
self.id == rhs.id | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
use js_sys::Uint8Array; | ||
use serde::{Deserialize, Serialize}; | ||
use wasm_bindgen::JsValue; | ||
|
||
/// Message Encoding and Decoding Format | ||
pub trait Codec { | ||
/// Encode an input to JsValue | ||
fn encode<I>(input: I) -> JsValue | ||
where | ||
I: Serialize; | ||
|
||
/// Decode a message to a type | ||
fn decode<O>(input: JsValue) -> O | ||
where | ||
O: for<'de> Deserialize<'de>; | ||
} | ||
|
||
/// Default message encoding with [bincode]. | ||
#[derive(Debug)] | ||
pub struct Bincode {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should bincode be the default, with no way to opt out? Postcard recently hit 1.0, which is no_std so it should have a smaller footprint in the binary. I think we should allow users to customize it. A simple feature flag (enabled by default) will be enough here. Those who want to customize can implement What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If we do not provide a default encoding, it means that users will have to specify Dead code elimination is effective in removing Bincode when it is not selected.
This is already possible by specifying a |
||
|
||
impl Codec for Bincode { | ||
fn encode<I>(input: I) -> JsValue | ||
where | ||
I: Serialize, | ||
{ | ||
let buf = bincode::serialize(&input).expect("can't serialize an worker message"); | ||
Uint8Array::from(buf.as_slice()).into() | ||
} | ||
|
||
fn decode<O>(input: JsValue) -> O | ||
where | ||
O: for<'de> Deserialize<'de>, | ||
{ | ||
let data = Uint8Array::from(input).to_vec(); | ||
bincode::deserialize(&data).expect("can't deserialize an worker message") | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
|
||
use serde::{Deserialize, Serialize}; | ||
|
||
/// Identifier to send output to bridges. | ||
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)] | ||
pub struct HandlerId(usize); | ||
|
||
impl HandlerId { | ||
pub(crate) fn new() -> Self { | ||
static CTR: AtomicUsize = AtomicUsize::new(0); | ||
|
||
let id = CTR.fetch_add(1, Ordering::SeqCst); | ||
|
||
HandlerId(id) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They have different semantics. The current implementation can't be created like:
I think it's better to remove the
{ }
. Adding fields will be a breaking change regardless.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You will never need to instantiate the Bincode struct.
It is supposed to be used with
Worker::spawner().encoding::<Bincode>().spawn()
.But I have removed it anyways.