From 2e1030e93134b5b26ccd41007bd5b400b29b0001 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 28 Feb 2019 10:37:53 -0800 Subject: [PATCH] Rewrite the parallel raytracing example with `rayon` One of the best parts about concurrency in Rust is using `rayon` and how easy it makes parallelization of tasks, so it's the ideal example for parallel Rust on the web! Previously we've been unable to use `rayon` because there wasn't a way to customize how rayon threads themselves are spawned, but [that's now being developed for us][rayon]! This commit uses that PR to rewrite the `raytrace-parallel` example in this repository. While not a perfect idiomatic representation of using `rayon` I think this is far more idiomatic than the previous iteration of `raytrace-parallel`! I'm hoping that we can continue to iterate on this, but otherwise show it off as a good example of parallel Rust on the web. [rayon]: https://github.com/rayon-rs/rayon/pull/636 --- Cargo.toml | 2 + azure-pipelines.yml | 7 +- examples/raytrace-parallel/Cargo.toml | 2 + examples/raytrace-parallel/index.js | 24 +- examples/raytrace-parallel/src/lib.rs | 384 +++++++------------------ examples/raytrace-parallel/src/pool.rs | 295 +++++++++++++++++++ examples/raytrace-parallel/worker.js | 6 +- 7 files changed, 418 insertions(+), 302 deletions(-) create mode 100644 examples/raytrace-parallel/src/pool.rs diff --git a/Cargo.toml b/Cargo.toml index b7c9b81e9f1d..6fdad2720486 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,3 +90,5 @@ wasm-bindgen = { path = '.' } wasm-bindgen-futures = { path = 'crates/futures' } js-sys = { path = 'crates/js-sys' } web-sys = { path = 'crates/web-sys' } +rayon = { git = 'https://github.com/rayon-rs/rayon' } +rayon-core = { git = 'https://github.com/rayon-rs/rayon' } diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 70a05351dd9f..6cf524c4e7d9 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -164,14 +164,11 @@ jobs: steps: - template: ci/azure-install-rust.yml parameters: - toolchain: nightly-2019-04-01 + toolchain: nightly - template: ci/azure-install-sccache.yml - script: rustup component add rust-src displayName: "install rust-src" - - script: | - set -e - curl -L https://github.com/japaric/xargo/releases/download/v0.3.13/xargo-v0.3.13-x86_64-unknown-linux-musl.tar.gz | tar xzf - - echo "##vso[task.prependpath]$PWD" + - script: cargo install xargo displayName: "install xargo" - script: | set -e diff --git a/examples/raytrace-parallel/Cargo.toml b/examples/raytrace-parallel/Cargo.toml index bf9531a4e6c9..6d15c19b4791 100644 --- a/examples/raytrace-parallel/Cargo.toml +++ b/examples/raytrace-parallel/Cargo.toml @@ -11,6 +11,8 @@ crate-type = ["cdylib"] console_error_panic_hook = "0.1" futures = "0.1" js-sys = "0.3.22" +rayon = "1.0.3" +rayon-core = "1.0.3" raytracer = { git = 'https://github.com/alexcrichton/raytracer', branch = 'update-deps' } wasm-bindgen = { version = "0.2.45", features = ['serde-serialize'] } wasm-bindgen-futures = "0.3.22" diff --git a/examples/raytrace-parallel/index.js b/examples/raytrace-parallel/index.js index 7a8a39c682bd..8017c2f576db 100644 --- a/examples/raytrace-parallel/index.js +++ b/examples/raytrace-parallel/index.js @@ -44,6 +44,7 @@ function run() { // Configure various buttons and such. button.onclick = function() { + button.disabled = true; console.time('render'); let json; try { @@ -82,22 +83,28 @@ class State { this.running = true; this.counter = 1; - this.interval = setInterval(() => this.updateTimer(), 100); + this.interval = setInterval(() => this.updateTimer(true), 100); wasm.promise() - .then(() => { - this.updateTimer(); + .then(data => { + this.updateTimer(false); + this.updateImage(data); this.stop(); }) .catch(console.error); } - updateTimer() { + updateTimer(updateImage) { const dur = performance.now() - this.start; timingVal.innerText = `${dur}ms`; this.counter += 1; - if (this.wasm && this.counter % 3 == 0) - this.wasm.requestUpdate(); + + if (updateImage && this.wasm && this.counter % 3 == 0) + this.updateImage(this.wasm.imageSoFar()); + } + + updateImage(data) { + ctx.putImageData(data, 0, 0); } stop() { @@ -105,9 +112,9 @@ class State { return; console.timeEnd('render'); this.running = false; - pool = this.wasm.cancel(); // this frees `wasm`, returning the worker pool this.wasm = null; clearInterval(this.interval); + button.disabled = false; } } @@ -116,6 +123,5 @@ function render(scene) { rendering.stop(); rendering = null; } - rendering = new State(scene.render(parseInt(concurrency.value), pool, ctx)); - pool = null; // previous call took ownership of `pool`, zero it out here too + rendering = new State(scene.render(parseInt(concurrency.value), pool)); } diff --git a/examples/raytrace-parallel/src/lib.rs b/examples/raytrace-parallel/src/lib.rs index d944416f2bcb..8f99ed9236c9 100644 --- a/examples/raytrace-parallel/src/lib.rs +++ b/examples/raytrace-parallel/src/lib.rs @@ -1,25 +1,21 @@ -use std::cell::RefCell; -use std::cmp; -use std::rc::Rc; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}; -use std::sync::{Arc, Mutex, MutexGuard}; - -use futures::sync::oneshot; use futures::Future; -use js_sys::{Array, Error, Promise, Uint8ClampedArray, WebAssembly}; +use js_sys::{Promise, Uint8ClampedArray, WebAssembly}; +use rayon::prelude::*; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; -use web_sys::{CanvasRenderingContext2d, ErrorEvent, Event, Worker}; -use web_sys::{DedicatedWorkerGlobalScope, MessageEvent}; macro_rules! console_log { - ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) + ($($t:tt)*) => (crate::log(&format_args!($($t)*).to_string())) } +mod pool; + #[wasm_bindgen] extern "C" { #[wasm_bindgen(js_namespace = console)] fn log(s: &str); + #[wasm_bindgen(js_namespace = console, js_name = log)] + fn logv(x: &JsValue); } #[wasm_bindgen] @@ -27,10 +23,10 @@ pub struct Scene { inner: raytracer::scene::Scene, } -static NEXT_ID: AtomicUsize = AtomicUsize::new(0); - #[wasm_bindgen] impl Scene { + /// Creates a new scene from the JSON description in `object`, which we + /// deserialize here into an actual scene. #[wasm_bindgen(constructor)] pub fn new(object: &JsValue) -> Result { console_error_panic_hook::set_once(); @@ -41,304 +37,118 @@ impl Scene { }) } + /// Renders this scene with the provided concurrency and worker pool. + /// + /// This will spawn up to `concurrency` workers which are loaded from or + /// spawned into `pool`. The `RenderingScene` state contains information to + /// get notifications when the render has completed. pub fn render( self, concurrency: usize, - pool: WorkerPool, - ctx: CanvasRenderingContext2d, + pool: &pool::WorkerPool, ) -> Result { - let (tx, rx) = oneshot::channel(); - let rx = rx.then(|_| Ok(JsValue::undefined())); - - let data = Rc::new(RefCell::new(None::)); - - let pixels = (self.inner.width * self.inner.height) as usize; - let mut r = Render { - tx: Some(tx), - callback: None, - shared: Arc::new(Shared { - id: NEXT_ID.fetch_add(1, SeqCst), - need_update: AtomicBool::new(false), - scene: self.inner, - next_pixel: AtomicUsize::new(0), - remaining: AtomicUsize::new(concurrency), - rgb_data: Mutex::new(vec![0; 4 * pixels]), - }), - ctx, - }; - - let data2 = data.clone(); - let callback = Closure::wrap(Box::new(move |msg: Event| -> Result<(), JsValue> { - let mut slot = data2.borrow_mut(); - if let Some(mut data) = slot.take() { - match data.event(&msg) { - Ok(true) => {} - Ok(false) => *slot = Some(data), - Err(e) => { - *slot = Some(data); - return Err(e); - } - } - } - Ok(()) - }) as Box Result<(), JsValue>>); - - for worker in &pool.workers[..concurrency] { - let ptr_to_send = Arc::into_raw(r.shared.clone()) as u32; - let ptr_to_send = JsValue::from(ptr_to_send); - worker.post_message(&ptr_to_send)?; - worker.set_onmessage(Some(callback.as_ref().unchecked_ref())); - worker.set_onerror(Some(callback.as_ref().unchecked_ref())); - } - - r.callback = Some(callback); - *data.borrow_mut() = Some(r); + let scene = self.inner; + let height = scene.height; + let width = scene.width; + + // Allocate the pixel data which our threads will be writing into. + let pixels = (width * height) as usize; + let mut rgb_data = vec![0; 4 * pixels]; + let base = rgb_data.as_ptr() as usize; + let len = rgb_data.len(); + + // Configure a rayon thread pool which will pull web workers from + // `pool`. + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(concurrency - 1) + .spawn_handler(|thread| Ok(pool.run(|| thread.run()).unwrap())) + .build() + .unwrap(); + + // And now execute the render! The entire render happens on our worker + // threads so we don't lock up the main thread, so we ship off a thread + // which actually does the whole rayon business. When our returned + // future is resolved we can pull out the final version of the image. + let done = pool + .run_notify(move || { + thread_pool.install(|| { + rgb_data + .par_chunks_mut(4) + .enumerate() + .for_each(|(i, chunk)| { + let i = i as u32; + let x = i % width; + let y = i / width; + let ray = raytracer::Ray::create_prime(x, y, &scene); + let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba(); + chunk[0] = result.data[0]; + chunk[1] = result.data[1]; + chunk[2] = result.data[2]; + chunk[3] = result.data[3]; + }); + }); + rgb_data + })? + .map(move |_data| image_data(base, len, width, height).into()); Ok(RenderingScene { - inner: data, - promise: wasm_bindgen_futures::future_to_promise(rx), - pool, + promise: wasm_bindgen_futures::future_to_promise(done), + base, + len, + height, + width, }) } } -#[wasm_bindgen] -pub struct WorkerPool { - workers: Vec, - callback: Closure, -} - -#[wasm_bindgen] -impl WorkerPool { - #[wasm_bindgen(constructor)] - pub fn new(max: u32) -> Result { - let callback = Closure::wrap(Box::new(|event: Event| { - console_log!("unhandled event: {}", event.type_()); - }) as Box); - let mut workers = Vec::new(); - for _ in 0..max { - // TODO: what do do about `./worker.js`: - // - // * the path is only known by the bundler. How can we, as a - // library, know what's going on? - // * How do we not fetch a script N times? It internally then - // causes another script to get fetched N times... - let worker = Worker::new("./worker.js")?; - let array = js_sys::Array::new(); - array.push(&wasm_bindgen::module()); - - // TODO: memory allocation error handling here is hard: - // - // * How to we make sure that our strong ref made it to a client - // thread? - // * Need to handle the `?` on `post_message` as well. - array.push(&wasm_bindgen::memory()); - worker.post_message(&array)?; - worker.set_onmessage(Some(callback.as_ref().unchecked_ref())); - worker.set_onerror(Some(callback.as_ref().unchecked_ref())); - workers.push(worker); - } - - Ok(WorkerPool { workers, callback }) - } -} - -impl Drop for WorkerPool { - fn drop(&mut self) { - for worker in self.workers.iter() { - worker.terminate(); - } - } -} - #[wasm_bindgen] pub struct RenderingScene { - inner: Rc>>, + base: usize, + len: usize, promise: Promise, - pool: WorkerPool, -} - -#[wasm_bindgen] -impl RenderingScene { - pub fn promise(&self) -> Promise { - self.promise.clone() - } - - #[wasm_bindgen(js_name = requestUpdate)] - pub fn request_update(&self) { - if let Some(render) = self.inner.borrow().as_ref() { - render.shared.need_update.store(true, SeqCst); - } - } - - pub fn cancel(self) -> WorkerPool { - if let Some(render) = self.inner.borrow_mut().take() { - // drain the rest of the pixels to cause all workers to cancel ASAP. - let pixels = render.shared.scene.width * render.shared.scene.height; - render.shared.next_pixel.fetch_add(pixels as usize, SeqCst); - } - for worker in self.pool.workers.iter() { - worker.set_onmessage(Some(&self.pool.callback.as_ref().unchecked_ref())); - worker.set_onerror(Some(&self.pool.callback.as_ref().unchecked_ref())); - } - self.pool - } -} - -struct Render { - callback: Option Result<(), JsValue>>>, - tx: Option>, - shared: Arc, - ctx: CanvasRenderingContext2d, -} - -struct Shared { - id: usize, - need_update: AtomicBool, - scene: raytracer::scene::Scene, - next_pixel: AtomicUsize, - remaining: AtomicUsize, - rgb_data: Mutex>, + width: u32, + height: u32, } +// Inline the definition of `ImageData` here because `web_sys` uses +// `&Clamped>`, whereas we want to pass in a JS object here. #[wasm_bindgen] extern "C" { - type ImageData; + pub type ImageData; #[wasm_bindgen(constructor, catch)] fn new(data: &Uint8ClampedArray, width: f64, height: f64) -> Result; } -impl Render { - fn event(&mut self, event: &Event) -> Result { - if let Some(error) = event.dyn_ref::() { - let msg = format!("error in worker: {}", error.message()); - return Err(Error::new(&msg).into()); - } - - if let Some(msg) = event.dyn_ref::() { - let data = msg.data(); - if let Some(data) = data.dyn_ref::() { - let id = data.pop(); - let done = data.pop(); - let image = data.pop(); - if let Some(id) = id.as_f64() { - if id == self.shared.id as f64 { - self.ctx.put_image_data(image.unchecked_ref(), 0.0, 0.0)?; - return Ok(done.as_bool() == Some(true)); - } - } - } - console_log!("unhandled message: {:?}", data); - return Ok(false); - } - - console_log!("unhandled event: {}", event.type_()); - - Ok(false) - } -} - #[wasm_bindgen] -pub fn child_entry_point(ptr: u32) -> Result<(), JsValue> { - let ptr = unsafe { Arc::from_raw(ptr as *const Shared) }; - assert_send(&ptr); - - let global = js_sys::global().unchecked_into::(); - ptr.work(&global)?; - - return Ok(()); - - fn assert_send(_: &T) {} -} - -impl Shared { - fn work(&self, global: &DedicatedWorkerGlobalScope) -> Result<(), JsValue> { - // Once we're done raytracing a pixel we need to actually write its rgb - // value into the shared memory buffer for our image. This, however, - // requires synchronization with other threads (as currently - // implemented). To help amortize the cost of synchronization each - // thread processes a chunk of pixels at a time, and this number is how - // many pixes will be rendered synchronously before committing them back - // to memory. - const BLOCK: usize = 1024; - - let width = self.scene.width as usize; - let height = self.scene.height as usize; - let end = width * height; - - // Thread-local storage for our RGB data, commited back in one batch to - // the main image memory. - let mut local_rgb = [0; BLOCK * 4]; - - loop { - // First up, grab a block of pixels to render using an atomic add. - // If we're beyond the end then we're done! - let start = self.next_pixel.fetch_add(BLOCK, SeqCst); - if start >= end { - break; - } - - // Raytrace all our pixels synchronously, writing all the results - // into our local memory buffer. - let len = cmp::min(end, start + BLOCK) - start; - for (i, dst) in local_rgb.chunks_mut(4).enumerate().take(len) { - let x = (start + i) % width; - let y = (start + i) / width; - let ray = raytracer::Ray::create_prime(x as u32, y as u32, &self.scene); - let result = raytracer::cast_ray(&self.scene, &ray, 0).to_rgba(); - dst[0] = result.data[0]; - dst[1] = result.data[1]; - dst[2] = result.data[2]; - dst[3] = result.data[3]; - } - - // Ok, time to synchronize and commit this data back into the main - // image buffer for other threads and the main thread to see. - let mut data = self.rgb_data.lock().unwrap(); - data[start * 4..(start + len) * 4].copy_from_slice(&mut local_rgb[..len * 4]); - - // As a "nifty feature" we try to have a live progressive rendering. - // That means that we need to periodically send an `ImageData` to - // the main thread. Do so whenever the main thread requests it. - if self.need_update.swap(false, SeqCst) { - self.update_image(false, data, global)?; - } - } - - // If we're the last thread out, be sure to update the main thread's - // image as this is the last chance we'll get! - if self.remaining.fetch_sub(1, SeqCst) == 1 { - let data = self.rgb_data.lock().unwrap(); - self.update_image(true, data, global)?; - } - - Ok(()) +impl RenderingScene { + /// Returns the JS promise object which resolves when the render is complete + pub fn promise(&self) -> Promise { + self.promise.clone() } - fn update_image( - &self, - done: bool, - data: MutexGuard<'_, Vec>, - global: &DedicatedWorkerGlobalScope, - ) -> Result<(), JsValue> { - // This is pretty icky. We can't create an `ImageData` backed by - // `SharedArrayBuffer`, so we need to copy the memory into a local - // JS array using `slice`. This means we can't use - // `web_sys::ImageData` right now but rather we have to use our own - // binding. - let mem = wasm_bindgen::memory().unchecked_into::(); - let mem = Uint8ClampedArray::new(&mem.buffer()).slice( - data.as_ptr() as u32, - data.as_ptr() as u32 + data.len() as u32, - ); - drop(data); // unlock the lock, we've copied the data now - let data = ImageData::new(&mem, self.scene.width as f64, self.scene.height as f64)?; - let arr = Array::new(); - arr.push(&data); - arr.push(&JsValue::from(done)); - arr.push(&JsValue::from(self.id as f64)); - global.post_message(&arr)?; - Ok(()) + /// Return a progressive rendering of the image so far + #[wasm_bindgen(js_name = imageSoFar)] + pub fn image_so_far(&self) -> ImageData { + image_data(self.base, self.len, self.width, self.height) } } + +fn image_data(base: usize, len: usize, width: u32, height: u32) -> ImageData { + // Use the raw access available through `memory.buffer`, but be sure to + // use `slice` instead of `subarray` to create a copy that isn't backed + // by `SharedArrayBuffer`. Currently `ImageData` rejects a view of + // `Uint8ClampedArray` that's backed by a shared buffer. + // + // FIXME: that this may or may not be UB based on Rust's rules. For example + // threads may be doing unsynchronized writes to pixel data as we read it + // off here. In the context of wasm this may or may not be UB, we're + // unclear! In any case for now it seems to work and produces a nifty + // progressive rendering. A more production-ready application may prefer to + // instead use some form of signaling here to request an update from the + // workers instead of synchronously acquiring an update, and that way we + // could ensure that even on the Rust side of things it's not UB. + let mem = wasm_bindgen::memory().unchecked_into::(); + let mem = Uint8ClampedArray::new(&mem.buffer()).slice(base as u32, (base + len) as u32); + ImageData::new(&mem, width as f64, height as f64).unwrap() +} diff --git a/examples/raytrace-parallel/src/pool.rs b/examples/raytrace-parallel/src/pool.rs new file mode 100644 index 000000000000..921d3c1611fb --- /dev/null +++ b/examples/raytrace-parallel/src/pool.rs @@ -0,0 +1,295 @@ +//! A small module that's intended to provide an example of creating a pool of +//! web workers which can be used to execute `rayon`-style work. + +use futures::sync::oneshot; +use futures::Future; +use std::cell::{RefCell, UnsafeCell}; +use std::mem; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; +use std::sync::Arc; +use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; +use web_sys::{DedicatedWorkerGlobalScope, MessageEvent}; +use web_sys::{ErrorEvent, Event, Worker}; + +#[wasm_bindgen] +pub struct WorkerPool { + state: Rc, +} + +struct PoolState { + workers: RefCell>, + callback: Closure, +} + +struct Work { + func: Box, +} + +#[wasm_bindgen] +impl WorkerPool { + /// Creates a new `WorkerPool` which immediately creates `initial` workers. + /// + /// The pool created here can be used over a long period of time, and it + /// will be initially primed with `initial` workers. Currently workers are + /// never released or gc'd until the whole pool is destroyed. + /// + /// # Errors + /// + /// Returns any error that may happen while a JS web worker is created and a + /// message is sent to it. + #[wasm_bindgen(constructor)] + pub fn new(initial: usize) -> Result { + let pool = WorkerPool { + state: Rc::new(PoolState { + workers: RefCell::new(Vec::with_capacity(initial)), + callback: Closure::wrap(Box::new(|event: Event| { + console_log!("unhandled event: {}", event.type_()); + crate::logv(&event); + }) as Box), + }), + }; + for _ in 0..initial { + let worker = pool.spawn()?; + pool.state.push(worker); + } + + Ok(pool) + } + + /// Unconditionally spawns a new worker + /// + /// The worker isn't registered with this `WorkerPool` but is capable of + /// executing work for this wasm module. + /// + /// # Errors + /// + /// Returns any error that may happen while a JS web worker is created and a + /// message is sent to it. + fn spawn(&self) -> Result { + console_log!("spawning new worker"); + // TODO: what do do about `./worker.js`: + // + // * the path is only known by the bundler. How can we, as a + // library, know what's going on? + // * How do we not fetch a script N times? It internally then + // causes another script to get fetched N times... + let worker = Worker::new("./worker.js")?; + + // With a worker spun up send it the module/memory so it can start + // instantiating the wasm module. Later it might receive further + // messages about code to run on the wasm module. + let array = js_sys::Array::new(); + array.push(&wasm_bindgen::module()); + array.push(&wasm_bindgen::memory()); + worker.post_message(&array)?; + + Ok(worker) + } + + /// Fetches a worker from this pool, spawning one if necessary. + /// + /// This will attempt to pull an already-spawned web worker from our cache + /// if one is available, otherwise it will spawn a new worker and return the + /// newly spawned worker. + /// + /// # Errors + /// + /// Returns any error that may happen while a JS web worker is created and a + /// message is sent to it. + fn worker(&self) -> Result { + match self.state.workers.borrow_mut().pop() { + Some(worker) => Ok(worker), + None => self.spawn(), + } + } + + /// Executes the work `f` in a web worker, spawning a web worker if + /// necessary. + /// + /// This will acquire a web worker and then send the closure `f` to the + /// worker to execute. The worker won't be usable for anything else while + /// `f` is executing, and no callbacks are registered for when the worker + /// finishes. + /// + /// # Errors + /// + /// Returns any error that may happen while a JS web worker is created and a + /// message is sent to it. + fn execute(&self, f: impl FnOnce() + Send + 'static) -> Result { + let worker = self.worker()?; + let work = Box::new(Work { func: Box::new(f) }); + let ptr = Box::into_raw(work); + match worker.post_message(&JsValue::from(ptr as u32)) { + Ok(()) => Ok(worker), + Err(e) => { + unsafe { + drop(Box::from_raw(ptr)); + } + Err(e) + } + } + } + + /// Configures an `onmessage` callback for the `worker` specified for the + /// web worker to be reclaimed and re-inserted into this pool when a message + /// is received. + /// + /// Currently this `WorkerPool` abstraction is intended to execute one-off + /// style work where the work itself doesn't send any notifications and + /// whatn it's done the worker is ready to execute more work. This method is + /// used for all spawned workers to ensure that when the work is finished + /// the worker is reclaimed back into this pool. + fn reclaim_on_message(&self, worker: Worker, on_finish: impl FnOnce() + 'static) { + let state = Rc::downgrade(&self.state); + let worker2 = worker.clone(); + let reclaim_slot = Rc::new(RefCell::new(None)); + let slot2 = reclaim_slot.clone(); + let mut on_finish = Some(on_finish); + let reclaim = Closure::wrap(Box::new(move |event: Event| { + if let Some(error) = event.dyn_ref::() { + console_log!("error in worker: {}", error.message()); + // TODO: this probably leaks memory somehow? It's sort of + // unclear what to do about errors in workers right now. + return; + } + + // If this is a completion event then we can execute our `on_finish` + // callback and we can also deallocate our own callback by clearing + // out `slot2` which contains our own closure. + if let Some(_msg) = event.dyn_ref::() { + on_finish.take().unwrap()(); + if let Some(state) = state.upgrade() { + state.push(worker2.clone()); + } + *slot2.borrow_mut() = None; + return; + } + + console_log!("unhandled event: {}", event.type_()); + crate::logv(&event); + // TODO: like above, maybe a memory leak here? + }) as Box); + worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref())); + *reclaim_slot.borrow_mut() = Some(reclaim); + } +} + +impl WorkerPool { + /// Executes `f` in a web worker. + /// + /// This pool manages a set of web workers to draw from, and `f` will be + /// spawned quickly into one if the worker is idle. If no idle workers are + /// available then a new web worker will be spawned. + /// + /// Once `f` returns the worker assigned to `f` is automatically reclaimed + /// by this `WorkerPool`. This method provides no method of learning when + /// `f` completes, and for that you'll need to use `run_notify`. + /// + /// # Errors + /// + /// If an error happens while spawning a web worker or sending a message to + /// a web worker, that error is returned. + pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> { + let worker = self.execute(f)?; + self.reclaim_on_message(worker, || {}); + Ok(()) + } + + /// Executes the closure `f` in a web worker, returning a future of the + /// value that `f` produces. + /// + /// This method is the same as `run` execept that it allows recovering the + /// return value of the closure `f` in a nonblocking fashion with the future + /// returned. + /// + /// # Errors + /// + /// If an error happens while spawning a web worker or sending a message to + /// a web worker, that error is returned. + pub fn run_notify( + &self, + f: impl FnOnce() -> T + Send + 'static, + ) -> Result + 'static, JsValue> + where + T: Send + 'static, + { + // FIXME(#1379) we should just use the `oneshot` directly as the future, + // but we have to use JS callbacks to ensure we don't have futures cross + // threads as that's currently not safe to do so. + let (tx, rx) = oneshot::channel(); + let storage = Arc::new(AtomicValue::new(None)); + let storage2 = storage.clone(); + let worker = self.execute(move || { + assert!(storage2.replace(Some(f())).is_ok()); + })?; + self.reclaim_on_message(worker, move || match storage.replace(None) { + Ok(Some(val)) => drop(tx.send(val)), + _ => unreachable!(), + }); + + Ok(rx.map_err(|_| JsValue::undefined())) + } +} + +/// A small helper struct representing atomic access to an internal value `T` +/// +/// This struct only supports one API, `replace`, which will either succeed and +/// replace the internal value with another (returning the previous one), or it +/// will fail returning the value passed in. Failure happens when two threads +/// try to `replace` at the same time. +/// +/// This is only really intended to help safely transfer information between +/// threads, it doesn't provide any synchronization capabilities itself other +/// than a guaranteed safe API. +struct AtomicValue { + modifying: AtomicBool, + slot: UnsafeCell, +} + +unsafe impl Send for AtomicValue {} +unsafe impl Sync for AtomicValue {} + +impl AtomicValue { + fn new(val: T) -> AtomicValue { + AtomicValue { + modifying: AtomicBool::new(false), + slot: UnsafeCell::new(val), + } + } + + fn replace(&self, val: T) -> Result { + if self.modifying.swap(true, SeqCst) { + return Err(val); + } + let ret = unsafe { mem::replace(&mut *self.slot.get(), val) }; + self.modifying.store(false, SeqCst); + Ok(ret) + } +} + +impl PoolState { + fn push(&self, worker: Worker) { + worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref())); + worker.set_onerror(Some(self.callback.as_ref().unchecked_ref())); + let mut workers = self.workers.borrow_mut(); + for prev in workers.iter() { + let prev: &JsValue = prev; + let worker: &JsValue = &worker; + assert!(prev != worker); + } + workers.push(worker); + } +} + +/// Entry point invoked by `worker.js`, a bit of a hack but see the "TODO" above +/// about `worker.js` in general. +#[wasm_bindgen] +pub fn child_entry_point(ptr: u32) -> Result<(), JsValue> { + let ptr = unsafe { Box::from_raw(ptr as *mut Work) }; + let global = js_sys::global().unchecked_into::(); + (ptr.func)(); + global.post_message(&JsValue::undefined())?; + Ok(()) +} diff --git a/examples/raytrace-parallel/worker.js b/examples/raytrace-parallel/worker.js index dae9ae087240..05ca4f0076a7 100644 --- a/examples/raytrace-parallel/worker.js +++ b/examples/raytrace-parallel/worker.js @@ -28,5 +28,9 @@ function run(ptr) { return; } lastPtr = null; - wasm_bindgen.child_entry_point(ptr); + try { + wasm_bindgen.child_entry_point(ptr); + } catch (e) { + throw new Error(e.message + "\n\n" + e.stack); + } }