Skip to content

Commit

Permalink
feat(event streaming): configurable worker path, use SharedWorker (#2080
Browse files Browse the repository at this point in the history
)

This commit allows any worker path to be used instead of hardcoded worker.js file. The worker path can be specified using worker_path in event_stream_configuration, if not specified worker.js will be used as default. Also, shared worker is used to allow using event streaming worker from multiple contexts.
  • Loading branch information
shamardy authored Mar 7, 2024
1 parent 7a8770c commit 10fe7fa
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 5 deletions.
14 changes: 14 additions & 0 deletions mm2src/mm2_event_stream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use serde::Deserialize;
use std::collections::HashMap;
#[cfg(target_arch = "wasm32")] use std::path::PathBuf;

#[cfg(target_arch = "wasm32")]
const DEFAULT_WORKER_PATH: &str = "event_streaming_worker.js";

/// Multi-purpose/generic event type that can easily be used over the event streaming
pub struct Event {
Expand Down Expand Up @@ -34,8 +38,16 @@ pub struct EventStreamConfiguration {
pub access_control_allow_origin: String,
#[serde(default)]
active_events: HashMap<String, EventConfig>,
/// The path to the worker script for event streaming.
#[cfg(target_arch = "wasm32")]
#[serde(default = "default_worker_path")]
pub worker_path: PathBuf,
}

#[cfg(target_arch = "wasm32")]
#[inline]
fn default_worker_path() -> PathBuf { PathBuf::from(DEFAULT_WORKER_PATH) }

/// Represents the configuration for a specific event within the event stream.
#[derive(Clone, Default, Deserialize)]
pub struct EventConfig {
Expand All @@ -51,6 +63,8 @@ impl Default for EventStreamConfiguration {
Self {
access_control_allow_origin: String::from("*"),
active_events: Default::default(),
#[cfg(target_arch = "wasm32")]
worker_path: default_worker_path(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ wasm-bindgen-futures = "0.4.21"
web-sys = { version = "0.3.55", features = ["console", "CloseEvent", "DomException", "ErrorEvent", "IdbDatabase",
"IdbCursor", "IdbCursorWithValue", "IdbFactory", "IdbIndex", "IdbIndexParameters", "IdbObjectStore",
"IdbObjectStoreParameters", "IdbOpenDbRequest", "IdbKeyRange", "IdbTransaction", "IdbTransactionMode",
"IdbVersionChangeEvent", "MessageEvent", "ReadableStreamDefaultReader", "ReadableStream", "WebSocket", "Worker"] }
"IdbVersionChangeEvent", "MessageEvent", "MessagePort", "ReadableStreamDefaultReader", "ReadableStream", "SharedWorker", "WebSocket"] }


[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand Down
35 changes: 31 additions & 4 deletions mm2src/mm2_net/src/wasm_event_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
use mm2_core::mm_ctx::MmArc;
use serde_json::json;
use web_sys::SharedWorker;

struct SendableSharedWorker(SharedWorker);

unsafe impl Send for SendableSharedWorker {}

struct SendableMessagePort(web_sys::MessagePort);

unsafe impl Send for SendableMessagePort {}

/// Handles broadcasted messages from `mm2_event_stream` continuously for WASM.
pub async fn handle_worker_stream(ctx: MmArc) {
Expand All @@ -11,16 +20,34 @@ pub async fn handle_worker_stream(ctx: MmArc) {
let mut channel_controller = ctx.stream_channel_controller.clone();
let mut rx = channel_controller.create_channel(config.total_active_events());

let worker_path = config
.worker_path
.to_str()
.expect("worker_path contains invalid UTF-8 characters");
let worker = SendableSharedWorker(
SharedWorker::new(worker_path).unwrap_or_else(|_| {
panic!(
"Failed to create a new SharedWorker with path '{}'.\n\
This could be due to the file missing or the browser being incompatible.\n\
For more details, please refer to https://developer.mozilla.org/en-US/docs/Web/API/SharedWorker#browser_compatibility",
worker_path
)
}),
);

let port = SendableMessagePort(worker.0.port());
port.0.start();

while let Some(event) = rx.recv().await {
let data = json!({
"_type": event.event_type(),
"message": event.message(),
});

let worker = web_sys::Worker::new("worker.js").expect("Missing worker.js");
let message_js = wasm_bindgen::JsValue::from_str(&data.to_string());

worker.post_message(&message_js)
.expect("Incompatible browser!\nSee https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage#browser_compatibility for details.");
port.0.post_message(&message_js)
.expect("Failed to post a message to the SharedWorker.\n\
This could be due to the browser being incompatible.\n\
For more details, please refer to https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/postMessage#browser_compatibility");
}
}

0 comments on commit 10fe7fa

Please sign in to comment.