Skip to content

Commit

Permalink
On Web, implement Send and `Sync where appropriate
Browse files Browse the repository at this point in the history
  • Loading branch information
daxpedda committed Jun 1, 2023
1 parent 4ac2006 commit eb4a545
Show file tree
Hide file tree
Showing 22 changed files with 625 additions and 211 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ And please only add new entries to the top of this list, right below the `# Unre
- On Web: fix position of touch events to be relative to the canvas.
- On Web, fix `Window:::set_fullscreen` doing nothing when called outside the event loop but during
a transient activation.
- On Web, `EventLoopProxy` now implements `Send`.
- On Web, `Window` now implements `Send` and `Sync`.

# 0.28.6

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ features = [
]

[target.'cfg(target_family = "wasm")'.dependencies]
atomic-waker = "1"
js-sys = "0.3"
wasm-bindgen = "0.2.45"
wasm-bindgen-futures = "0.4"
Expand Down
3 changes: 3 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
disallowed-methods = [
{ path = "web_sys::window", reason = "is not available in every context" },
]
4 changes: 2 additions & 2 deletions examples/web.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(clippy::single_match)]
#![allow(clippy::disallowed_methods, clippy::single_match)]

use winit::{
event::{Event, WindowEvent},
Expand Down Expand Up @@ -52,7 +52,7 @@ mod wasm {
pub fn insert_canvas_and_create_log_list(window: &Window) -> web_sys::Element {
use winit::platform::web::WindowExtWebSys;

let canvas = window.canvas();
let canvas = window.canvas().unwrap();

let window = web_sys::window().unwrap();
let document = window.document().unwrap();
Expand Down
4 changes: 3 additions & 1 deletion examples/web_aspect_ratio.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::disallowed_methods)]

pub fn main() {
println!("This example must be run with cargo run-wasm --example web_aspect_ratio")
}
Expand Down Expand Up @@ -66,7 +68,7 @@ This example demonstrates the desired future functionality which will possibly b
let body = document.body().unwrap();

// Set a background color for the canvas to make it easier to tell the where the canvas is for debugging purposes.
let canvas = window.canvas();
let canvas = window.canvas().unwrap();
canvas
.style()
.set_css_text("display: block; background-color: crimson; margin: auto; width: 50%; aspect-ratio: 4 / 1;");
Expand Down
3 changes: 2 additions & 1 deletion src/platform/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use crate::window::WindowBuilder;
use web_sys::HtmlCanvasElement;

pub trait WindowExtWebSys {
fn canvas(&self) -> HtmlCanvasElement;
/// Only returns the canvas if called from inside the window.
fn canvas(&self) -> Option<HtmlCanvasElement>;

/// Whether the browser reports the preferred color scheme to be "dark".
fn is_dark_mode(&self) -> bool;
Expand Down
314 changes: 314 additions & 0 deletions src/platform_impl/web/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
use atomic_waker::AtomicWaker;
use once_cell::unsync::Lazy;
use std::future;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::task::Poll;
use wasm_bindgen::prelude::wasm_bindgen;
use wasm_bindgen::{JsCast, JsValue};

// Unsafe wrapper type that allows us to use `T` when it's not `Send` from other threads.
// `value` **must** only be accessed on the main thread.
pub struct MainThreadSafe<const SYNC: bool, T: 'static, E: 'static> {
// We wrap this in an `Arc` to allow it to be safely cloned without accessing the value.
// The `RwLock` lets us safely drop in any thread.
// The `Option` lets us safely drop `T` only in the main thread, while letting other threads drop `None`.
value: Arc<RwLock<Option<T>>>,
handler: fn(&RwLock<Option<T>>, E),
sender: AsyncSender<E>,
// Prevent's `Send` or `Sync` to be automatically implemented.
local: PhantomData<*const ()>,
}

impl<const SYNC: bool, T, E> MainThreadSafe<SYNC, T, E> {
thread_local! {
static MAIN_THREAD: Lazy<bool> = Lazy::new(|| {
#[wasm_bindgen]
extern "C" {
#[derive(Clone)]
type Global;

#[wasm_bindgen(method, getter, js_name = Window)]
fn window(this: &Global) -> JsValue;
}

let global: Global = js_sys::global().unchecked_into();
!global.window().is_undefined()
});
}

#[track_caller]
fn new(value: T, handler: fn(&RwLock<Option<T>>, E)) -> Option<Self> {
Self::MAIN_THREAD.with(|safe| {
if !*safe.deref() {
panic!("only callable from inside the `Window`")
}
});

let value = Arc::new(RwLock::new(Some(value)));

let (sender, receiver) = channel::<E>();

wasm_bindgen_futures::spawn_local({
let value = Arc::clone(&value);
async move {
while let Ok(event) = receiver.next().await {
handler(&value, event)
}

// An error was returned because the channel was closed, which
// happens when all senders are dropped.
value.write().unwrap().take().unwrap();
}
});

Some(Self {
value,
handler,
sender,
local: PhantomData,
})
}

pub fn send(&self, event: E) {
Self::MAIN_THREAD.with(|is_main_thread| {
if *is_main_thread.deref() {
(self.handler)(&self.value, event)
} else {
self.sender.send(event).unwrap()
}
})
}

fn is_main_thread(&self) -> bool {
Self::MAIN_THREAD.with(|is_main_thread| *is_main_thread.deref())
}

pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> Option<R> {
Self::MAIN_THREAD.with(|is_main_thread| {
if *is_main_thread.deref() {
Some(f(self.value.read().unwrap().as_ref().unwrap()))
} else {
None
}
})
}

fn with_mut<R>(&self, f: impl FnOnce(&mut T) -> R) -> Option<R> {
Self::MAIN_THREAD.with(|is_main_thread| {
if *is_main_thread.deref() {
Some(f(self.value.write().unwrap().as_mut().unwrap()))
} else {
None
}
})
}
}

impl<const SYNC: bool, T, E> Clone for MainThreadSafe<SYNC, T, E> {
fn clone(&self) -> Self {
Self {
value: self.value.clone(),
handler: self.handler,
sender: self.sender.clone(),
local: PhantomData,
}
}
}

unsafe impl<const SYNC: bool, T, E> Send for MainThreadSafe<SYNC, T, E> {}
unsafe impl<T, E> Sync for MainThreadSafe<true, T, E> {}

fn channel<T>() -> (AsyncSender<T>, AsyncReceiver<T>) {
let (sender, receiver) = mpsc::channel();
let sender = Arc::new(Mutex::new(sender));
let waker = Arc::new(AtomicWaker::new());
let closed = Arc::new(AtomicBool::new(false));

let sender = AsyncSender {
sender,
closed: closed.clone(),
waker: Arc::clone(&waker),
};
let receiver = AsyncReceiver {
receiver,
closed,
waker,
};

(sender, receiver)
}

struct AsyncSender<T> {
// We need to wrap it into a `Mutex` to make it `Sync`. So the sender can't
// be accessed on the main thread, as it could block. Additionally we need
// to wrap it in an `Arc` to make it clonable on the main thread without
// having to block.
sender: Arc<Mutex<Sender<T>>>,
closed: Arc<AtomicBool>,
waker: Arc<AtomicWaker>,
}

impl<T> AsyncSender<T> {
pub fn send(&self, event: T) -> Result<(), SendError<T>> {
self.sender.lock().unwrap().send(event)?;
self.waker.wake();

Ok(())
}
}

impl<T> Clone for AsyncSender<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
waker: self.waker.clone(),
closed: self.closed.clone(),
}
}
}

impl<T> Drop for AsyncSender<T> {
fn drop(&mut self) {
// If it's the last + the one held by the receiver make sure to wake it
// up and tell it that all receiver have dropped.
if Arc::strong_count(&self.closed) == 2 {
self.closed.store(true, Ordering::Relaxed);
self.waker.wake()
}
}
}

struct AsyncReceiver<T> {
receiver: Receiver<T>,
closed: Arc<AtomicBool>,
waker: Arc<AtomicWaker>,
}

impl<T> AsyncReceiver<T> {
pub async fn next(&self) -> Result<T, RecvError> {
future::poll_fn(|cx| match self.receiver.try_recv() {
Ok(event) => Poll::Ready(Ok(event)),
Err(TryRecvError::Empty) => {
if self.closed.load(Ordering::Relaxed) {
return Poll::Ready(Err(RecvError));
}

self.waker.register(cx.waker());

match self.receiver.try_recv() {
Ok(event) => Poll::Ready(Ok(event)),
Err(TryRecvError::Empty) => {
if self.closed.load(Ordering::Relaxed) {
Poll::Ready(Err(RecvError))
} else {
Poll::Pending
}
}
Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)),
}
}
Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)),
})
.await
}
}

pub struct Dispatcher<T: 'static>(MainThreadSafe<true, T, Closure<T>>);

pub enum Closure<T> {
Ref(Box<dyn FnOnce(&T) + Send>),
RefMut(Box<dyn FnOnce(&mut T) + Send>),
}

impl<T> Dispatcher<T> {
#[track_caller]
pub fn new(value: T) -> Option<Self> {
MainThreadSafe::new(value, |value, closure| match closure {
Closure::Ref(f) => f(value.read().unwrap().as_ref().unwrap()),
Closure::RefMut(f) => f(value.write().unwrap().as_mut().unwrap()),
})
.map(Self)
}

pub fn dispatch(&self, f: impl 'static + FnOnce(&T) + Send) {
if self.is_main_thread() {
self.0.with(f).unwrap()
} else {
self.0.send(Closure::Ref(Box::new(f)))
}
}

pub fn dispatch_mut(&self, f: impl 'static + FnOnce(&mut T) + Send) {
if self.is_main_thread() {
self.0.with_mut(f).unwrap()
} else {
self.0.send(Closure::RefMut(Box::new(f)))
}
}

pub fn queue<R: 'static + Send>(&self, f: impl 'static + FnOnce(&T) -> R + Send) -> R {
if self.is_main_thread() {
self.0.with(f).unwrap()
} else {
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let closure = Closure::Ref(Box::new({
let pair = pair.clone();
move |value| {
*pair.0.lock().unwrap() = Some(f(value));
pair.1.notify_one();
}
}));

self.0.send(closure);

let mut started = pair.0.lock().unwrap();

while started.is_none() {
started = pair.1.wait(started).unwrap();
}

started.take().unwrap()
}
}
}

impl<T> Deref for Dispatcher<T> {
type Target = MainThreadSafe<true, T, Closure<T>>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

type ChannelValue<T, E> = MainThreadSafe<false, (T, fn(&T, E)), E>;

pub struct Channel<T: 'static, E: 'static>(ChannelValue<T, E>);

impl<T, E> Channel<T, E> {
pub fn new(value: T, handler: fn(&T, E)) -> Option<Self> {
MainThreadSafe::new((value, handler), |runner, event| {
let lock = runner.read().unwrap();
let (value, handler) = lock.as_ref().unwrap();
handler(value, event);
})
.map(Self)
}
}

impl<T, E> Clone for Channel<T, E> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<T, E> Deref for Channel<T, E> {
type Target = ChannelValue<T, E>;

fn deref(&self) -> &Self::Target {
&self.0
}
}
Loading

0 comments on commit eb4a545

Please sign in to comment.