Skip to content

Commit

Permalink
Continuous Websocket Message Reading on Readiness Event (#2852)
Browse files Browse the repository at this point in the history
* ws connection read messages fix

* fix fmt

* optim handling

* close ws when read error

* optimize error handling

---------

Co-authored-by: Kasper Ziemianek <kasper.ziemianek@gmail.com>
Co-authored-by: Jayanring <junjie@liteng.io>
  • Loading branch information
3 people authored Jul 3, 2024
1 parent e5f9e40 commit 6691a68
Showing 1 changed file with 40 additions and 19 deletions.
59 changes: 40 additions & 19 deletions bitacross-worker/core/tls-websocket-server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ use mio::{event::Event, net::TcpStream, Poll, Ready, Token};
use rustls::{ServerSession, Session};
use std::{
format,
io::ErrorKind,
string::{String, ToString},
sync::Arc,
time::Instant,
vec,
};
use tungstenite::Message;

Expand Down Expand Up @@ -131,31 +133,51 @@ where
/// Read from a web-socket, or initiate handshake if websocket is not initialized yet.
///
/// Returns a boolean 'connection should be closed'.
fn read_or_initialize_websocket(&mut self) -> WebSocketResult<bool> {
fn drain_message_or_initialize_websocket(&mut self) -> WebSocketResult<bool> {
if let StreamState::Established(web_socket) = &mut self.stream_state {
trace!(
"Read is possible for connection {}: {}",
self.connection_token.0,
web_socket.can_read()
);
match web_socket.read_message() {
Ok(m) =>
if let Err(e) = self.handle_message(m) {
error!(
"Failed to handle web-socket message (connection {}): {:?}",
self.connection_token.0, e
);

let mut messages = vec![];
let mut is_closing = false;

// Looping over 'read_message' is merely a workaround for the unexpected behavior of mio event triggering.
// Final solution will be applied in P-907.
loop {
match web_socket.read_message() {
Ok(m) => messages.push(m),
Err(e) => {
match e {
tungstenite::Error::Io(e)
if matches!(e.kind(), ErrorKind::WouldBlock) => {}, // no message to read
_ => {
trace!(
"Failed to read message from web-socket (connection {}): {:?}",
self.connection_token.0,
e
);
is_closing = true;
},
}
break
},
Err(e) => match e {
tungstenite::Error::ConnectionClosed => return Ok(true),
tungstenite::Error::AlreadyClosed => return Ok(true),
_ => error!(
"Failed to read message from web-socket (connection {}): {:?}",
self.connection_token.0, e
),
},
}
}

messages.into_iter().for_each(|m| {
if let Err(e) = self.handle_message(m) {
error!(
"Failed to handle web-socket message (connection {}): {:?}",
self.connection_token.0, e
);
}
});

trace!("Read successful for connection {}", self.connection_token.0);
Ok(is_closing)
} else {
trace!("Initialize connection {}", self.connection_token.0);
self.stream_state = std::mem::take(&mut self.stream_state).attempt_handshake();
Expand All @@ -164,9 +186,8 @@ where
return Ok(true)
}
debug!("Initialized connection {} successfully", self.connection_token.0);
Ok(false)
}

Ok(false)
}

fn handle_message(&mut self, message: Message) -> WebSocketResult<()> {
Expand Down Expand Up @@ -283,7 +304,7 @@ where
let connection_state = self.maybe_do_tls_read();

if connection_state.is_alive() {
is_closing = self.read_or_initialize_websocket()?;
is_closing = self.drain_message_or_initialize_websocket()?;
} else {
is_closing = connection_state.is_closing();
}
Expand Down

0 comments on commit 6691a68

Please sign in to comment.