Skip to content

Commit

Permalink
refactor(net): clarify io_util code a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
CBenoit committed Oct 4, 2023
1 parent 110c78d commit f91b372
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions crates/net/src/websocket/io_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ impl WebSocket {
}
}

macro_rules! try_in_poll_io {
($expr:expr) => {{
match $expr {
Ok(o) => o,
// WebSocket is closed, nothing more to read or write
Err(WebSocketError::ConnectionClose(event)) if event.was_clean => {
return Poll::Ready(Ok(0));
}
Err(e) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
}
}};
}

#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
impl AsyncRead for WebSocket {
fn poll_read(
Expand All @@ -33,14 +46,10 @@ impl AsyncRead for WebSocket {
data
} else {
match ready!(self.as_mut().poll_next(cx)) {
Some(Ok(m)) => match m {
Some(item) => match try_in_poll_io!(item) {
WebSocketMessage::Text(s) => s.into_bytes(),
WebSocketMessage::Bytes(data) => data,
},
Some(Err(WebSocketError::ConnectionClose(event))) if event.was_clean == true => {
return Poll::Ready(Ok(0));
}
Some(Err(e)) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
None => return Poll::Ready(Ok(0)),
}
};
Expand All @@ -64,26 +73,14 @@ impl AsyncWrite for WebSocket {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
macro_rules! try_in_poll {
($expr:expr) => {{
match $expr {
Ok(o) => o,
// When using `AsyncWriteExt::write_all`, `io::ErrorKind::WriteZero` will be raised.
// In this case it means "attempted to write on a closed socket".
Err(WebSocketError::ConnectionClose(_)) => return Poll::Ready(Ok(0)),
Err(e) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
}
}};
}

// try flushing preemptively
let _ = AsyncWrite::poll_flush(self.as_mut(), cx);

// make sure sink is ready to send
try_in_poll!(ready!(self.as_mut().poll_ready(cx)));
try_in_poll_io!(ready!(self.as_mut().poll_ready(cx)));

// actually submit new item
try_in_poll!(self.start_send(WebSocketMessage::Bytes(buf.to_vec())));
try_in_poll_io!(self.start_send(WebSocketMessage::Bytes(buf.to_vec())));
// ^ if no error occurred, message is accepted and queued when calling `start_send`
// (i.e.: `to_vec` is called only once)

Expand Down

0 comments on commit f91b372

Please sign in to comment.