Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(event streaming): configurable worker path, use SharedWorker #2080

Merged
merged 12 commits into from
Mar 7, 2024
14 changes: 14 additions & 0 deletions mm2src/mm2_event_stream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use serde::Deserialize;
use std::collections::HashMap;
#[cfg(target_arch = "wasm32")] use std::path::PathBuf;

#[cfg(target_arch = "wasm32")]
const DEFAULT_WORKER_PATH: &str = "event_streaming_worker.js";

/// Multi-purpose/generic event type that can easily be used over the event streaming
pub struct Event {
Expand Down Expand Up @@ -34,8 +38,16 @@ pub struct EventStreamConfiguration {
pub access_control_allow_origin: String,
#[serde(default)]
active_events: HashMap<String, EventConfig>,
/// The path to the worker script for event streaming.
#[cfg(target_arch = "wasm32")]
#[serde(default = "default_worker_path")]
pub worker_path: PathBuf,
}

#[cfg(target_arch = "wasm32")]
#[inline]
fn default_worker_path() -> PathBuf { PathBuf::from(DEFAULT_WORKER_PATH) }

/// Represents the configuration for a specific event within the event stream.
#[derive(Clone, Default, Deserialize)]
pub struct EventConfig {
Expand All @@ -51,6 +63,8 @@ impl Default for EventStreamConfiguration {
Self {
access_control_allow_origin: String::from("*"),
active_events: Default::default(),
#[cfg(target_arch = "wasm32")]
worker_path: default_worker_path(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ wasm-bindgen-futures = "0.4.21"
web-sys = { version = "0.3.55", features = ["console", "CloseEvent", "DomException", "ErrorEvent", "IdbDatabase",
"IdbCursor", "IdbCursorWithValue", "IdbFactory", "IdbIndex", "IdbIndexParameters", "IdbObjectStore",
"IdbObjectStoreParameters", "IdbOpenDbRequest", "IdbKeyRange", "IdbTransaction", "IdbTransactionMode",
"IdbVersionChangeEvent", "MessageEvent", "ReadableStreamDefaultReader", "ReadableStream", "WebSocket", "Worker"] }
"IdbVersionChangeEvent", "MessageEvent", "MessagePort", "ReadableStreamDefaultReader", "ReadableStream", "SharedWorker", "WebSocket"] }


[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand Down
35 changes: 31 additions & 4 deletions mm2src/mm2_net/src/wasm_event_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
use mm2_core::mm_ctx::MmArc;
use serde_json::json;
use web_sys::SharedWorker;

struct SendableSharedWorker(SharedWorker);

unsafe impl Send for SendableSharedWorker {}

struct SendableMessagePort(web_sys::MessagePort);

unsafe impl Send for SendableMessagePort {}

/// Handles broadcasted messages from `mm2_event_stream` continuously for WASM.
pub async fn handle_worker_stream(ctx: MmArc) {
Expand All @@ -11,16 +20,34 @@ pub async fn handle_worker_stream(ctx: MmArc) {
let mut channel_controller = ctx.stream_channel_controller.clone();
let mut rx = channel_controller.create_channel(config.total_active_events());

let worker_path = config
.worker_path
.to_str()
.expect("worker_path contains invalid UTF-8 characters");
let worker = SendableSharedWorker(
SharedWorker::new(worker_path).unwrap_or_else(|_| {
panic!(
"Failed to create a new SharedWorker with path '{}'.\n\
This could be due to the file missing or the browser being incompatible.\n\
For more details, please refer to https://developer.mozilla.org/en-US/docs/Web/API/SharedWorker#browser_compatibility",
worker_path
)
}),
);

let port = SendableMessagePort(worker.0.port());
port.0.start();

while let Some(event) = rx.recv().await {
let data = json!({
"_type": event.event_type(),
"message": event.message(),
});

let worker = web_sys::Worker::new("worker.js").expect("Missing worker.js");
let message_js = wasm_bindgen::JsValue::from_str(&data.to_string());

worker.post_message(&message_js)
.expect("Incompatible browser!\nSee https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage#browser_compatibility for details.");
port.0.post_message(&message_js)
.expect("Failed to post a message to the SharedWorker.\n\
This could be due to the browser being incompatible.\n\
For more details, please refer to https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/postMessage#browser_compatibility");
}
}
Loading