Skip to content

Commit

Permalink
Worked around closure reference bug
Browse files Browse the repository at this point in the history
  • Loading branch information
w-henderson committed Feb 7, 2022
1 parent 6ce5044 commit 5feddf0
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions humphrey-ws/src/async_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,15 @@ where
/// Represents an asynchronous WebSocket stream.
pub struct AsyncStream {
addr: SocketAddr,
message_queue: Vec<Message>,
message_queue: Arc<Mutex<Vec<Message>>>,
connected: bool,
}

pub trait EventHandler<S>: Fn(&mut AsyncStream, Arc<S>) + Send + Sync + 'static {}
impl<T, S> EventHandler<S> for T where T: Fn(&mut AsyncStream, Arc<S>) + Send + Sync + 'static {}
pub trait EventHandler<S>: Fn(AsyncStream, Arc<S>) + Send + Sync + 'static {}
impl<T, S> EventHandler<S> for T where T: Fn(AsyncStream, Arc<S>) + Send + Sync + 'static {}

pub trait MessageHandler<S>: Fn(&mut AsyncStream, Message, Arc<S>) + Send + Sync + 'static {}
impl<T, S> MessageHandler<S> for T where
T: Fn(&mut AsyncStream, Message, Arc<S>) + Send + Sync + 'static
{
}
pub trait MessageHandler<S>: Fn(AsyncStream, Message, Arc<S>) + Send + Sync + 'static {}
impl<T, S> MessageHandler<S> for T where T: Fn(AsyncStream, Message, Arc<S>) + Send + Sync + 'static {}

impl<State> AsyncWebsocketApp<State>
where
Expand Down Expand Up @@ -90,19 +87,21 @@ where

match stream.recv_nonblocking() {
Restion::Ok(message) => {
let mut async_stream = AsyncStream::new(addr);
let messages = Arc::new(Mutex::new(Vec::new()));
let async_stream = AsyncStream::new(addr, messages.clone());
if let Some(handler) = &self.on_message {
handler(&mut async_stream, message, self.state.clone());
handler(async_stream, message, self.state.clone());
}

for message in async_stream.into_inner() {
for message in messages.lock().unwrap().drain(..) {
stream.send(message).unwrap();
}
}
Restion::Err(_) => {
let mut async_stream = AsyncStream::disconnected(addr);
let messages = Arc::new(Mutex::new(Vec::new()));
let async_stream = AsyncStream::disconnected(addr, messages.clone());
if let Some(handler) = &self.on_disconnect {
handler(&mut async_stream, self.state.clone())
handler(async_stream, self.state.clone());
}

self.streams.remove(&addr);
Expand All @@ -119,12 +118,13 @@ where
.try_iter()
.filter_map(|s| s.peer_addr().map(|a| (a, s)).ok())
{
let mut async_stream = AsyncStream::new(addr);
let messages = Arc::new(Mutex::new(Vec::new()));
let async_stream = AsyncStream::new(addr, messages.clone());
if let Some(handler) = &self.on_connect {
handler(&mut async_stream, self.state.clone());
handler(async_stream, self.state.clone());
}

for message in async_stream.into_inner() {
for message in messages.lock().unwrap().drain(..) {
stream.send(message).unwrap();
}

Expand All @@ -135,18 +135,18 @@ where
}

impl AsyncStream {
pub fn new(addr: SocketAddr) -> Self {
pub fn new(addr: SocketAddr, messages: Arc<Mutex<Vec<Message>>>) -> Self {
Self {
addr,
message_queue: vec![],
message_queue: messages,
connected: true,
}
}

pub fn disconnected(addr: SocketAddr) -> Self {
pub fn disconnected(addr: SocketAddr, messages: Arc<Mutex<Vec<Message>>>) -> Self {
Self {
addr,
message_queue: vec![],
message_queue: messages,
connected: false,
}
}
Expand All @@ -155,12 +155,8 @@ impl AsyncStream {
self.addr
}

pub fn send(&mut self, message: Message) {
pub fn send(&self, message: Message) {
assert!(self.connected);
self.message_queue.push(message);
}

fn into_inner(self) -> Vec<Message> {
self.message_queue
self.message_queue.lock().unwrap().push(message);
}
}

0 comments on commit 5feddf0

Please sign in to comment.