diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c916609..d1972715 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ # Unreleased (0.20.2) +- Fix read-predominant auto pong responses not flushing when hitting WouldBlock errors. - Improve `FrameHeader::format` write correctness. - Up minimum _rustls_ to `0.21.6`. diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index dd8e279d..21c996a9 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -13,13 +13,10 @@ use self::{ }, message::{IncompleteMessage, IncompleteMessageType}, }; -use crate::{ - error::{Error, ProtocolError, Result}, - util::NonBlockingResult, -}; +use crate::error::{Error, ProtocolError, Result}; use log::*; use std::{ - io::{ErrorKind as IoErrorKind, Read, Write}, + io::{self, Read, Write}, mem::replace, }; @@ -313,6 +310,9 @@ pub struct WebSocketContext { incomplete: Option, /// Send in addition to regular messages E.g. "pong" or "close". additional_send: Option, + /// True indicates there is an additional message (like a pong) + /// that failed to flush previously and we should try again. + unflushed_additional: bool, /// The configuration for the websocket session. config: WebSocketConfig, } @@ -344,6 +344,7 @@ impl WebSocketContext { state: WebSocketState::Active, incomplete: None, additional_send: None, + unflushed_additional: false, config, } } @@ -391,10 +392,16 @@ impl WebSocketContext { self.state.check_not_terminated()?; loop { - if self.additional_send.is_some() { + if self.additional_send.is_some() || self.unflushed_additional { // Since we may get ping or close, we need to reply to the messages even during read. - // Thus we flush but ignore its blocking. - self.flush(stream).no_block()?; + match self.flush(stream) { + Ok(_) => {} + Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => { + // If blocked continue reading, but try again later + self.unflushed_additional = true; + } + Err(err) => return Err(err), + } } else if self.role == Role::Server && !self.state.can_read() { self.state = WebSocketState::Terminated; return Err(Error::ConnectionClosed); @@ -462,7 +469,9 @@ impl WebSocketContext { { self._write(stream, None)?; self.frame.write_out_buffer(stream)?; - Ok(stream.flush()?) + stream.flush()?; + self.unflushed_additional = false; + Ok(()) } /// Writes any data in the out_buffer, `additional_send` and given `data`. @@ -495,7 +504,7 @@ impl WebSocketContext { Ok(_) => true, } } else { - false + self.unflushed_additional }; // If we're closing and there is nothing to send anymore, we should close the connection. @@ -774,7 +783,7 @@ impl CheckConnectionReset for Result { fn check_connection_reset(self, state: WebSocketState) -> Self { match self { Err(Error::Io(io_error)) => Err({ - if !state.can_read() && io_error.kind() == IoErrorKind::ConnectionReset { + if !state.can_read() && io_error.kind() == io::ErrorKind::ConnectionReset { Error::ConnectionClosed } else { Error::Io(io_error) diff --git a/tests/auto_pong_flush.rs b/tests/auto_pong_flush.rs new file mode 100644 index 00000000..ccddb11e --- /dev/null +++ b/tests/auto_pong_flush.rs @@ -0,0 +1,124 @@ +use std::{ + io::{self, Cursor, Read, Write}, + mem, +}; +use tungstenite::{ + protocol::frame::{ + coding::{Control, OpCode}, + Frame, FrameHeader, + }, + Message, WebSocket, +}; + +const NUMBER_OF_FLUSHES_TO_GET_IT_TO_WORK: usize = 3; + +/// `Read`/`Write` mock. +/// * Reads a single ping, then returns `WouldBlock` forever after. +/// * Writes work fine. +/// * Flush `WouldBlock` twice then works on the 3rd attempt. +#[derive(Debug, Default)] +struct MockWrite { + /// Data written, but not flushed. + written_data: Vec, + /// The latest successfully flushed data. + flushed_data: Vec, + write_calls: usize, + flush_calls: usize, + read_calls: usize, +} + +impl Read for MockWrite { + fn read(&mut self, mut buf: &mut [u8]) -> io::Result { + self.read_calls += 1; + if self.read_calls == 1 { + let ping = Frame::ping(vec![]); + let len = ping.len(); + ping.format(&mut buf).expect("format failed"); + Ok(len) + } else { + Err(io::Error::new(io::ErrorKind::WouldBlock, "nothing else to read")) + } + } +} +impl Write for MockWrite { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.write_calls += 1; + self.written_data.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.flush_calls += 1; + if self.flush_calls % NUMBER_OF_FLUSHES_TO_GET_IT_TO_WORK == 0 { + mem::swap(&mut self.written_data, &mut self.flushed_data); + self.written_data.clear(); + eprintln!("flush success"); + Ok(()) + } else { + eprintln!("flush would block"); + Err(io::Error::new(io::ErrorKind::WouldBlock, "try again")) + } + } +} + +/// Test for auto pong write & flushing behaviour. +/// +/// In read-only/read-predominant usage auto pong responses should be written and flushed +/// even if WouldBlock errors are encountered. +#[test] +fn read_usage_auto_pong_flush() { + let mut ws = + WebSocket::from_raw_socket(MockWrite::default(), tungstenite::protocol::Role::Client, None); + + // Receiving a ping should auto scheduled a pong on next read or write (but not written yet). + let msg = ws.read().unwrap(); + assert!(matches!(msg, Message::Ping(_)), "Unexpected msg {:?}", msg); + assert_eq!(ws.get_ref().read_calls, 1); + assert!(ws.get_ref().written_data.is_empty(), "Unexpected {:?}", ws.get_ref()); + assert!(ws.get_ref().flushed_data.is_empty(), "Unexpected {:?}", ws.get_ref()); + + // Next read fails as there is nothing else to read. + // This read call should have tried to write & flush a pong response, with the flush WouldBlock-ing + let next = ws.read().unwrap_err(); + assert!( + matches!(next, tungstenite::Error::Io(ref err) if err.kind() == io::ErrorKind::WouldBlock), + "Unexpected read err {:?}", + next + ); + assert_eq!(ws.get_ref().read_calls, 2); + assert!(!ws.get_ref().written_data.is_empty(), "Should have written a pong frame"); + assert_eq!(ws.get_ref().write_calls, 1); + + let pong_header = + FrameHeader::parse(&mut Cursor::new(&ws.get_ref().written_data)).unwrap().unwrap().0; + assert_eq!(pong_header.opcode, OpCode::Control(Control::Pong)); + let written_data = ws.get_ref().written_data.clone(); + + assert_eq!(ws.get_ref().flush_calls, 1); + assert!(ws.get_ref().flushed_data.is_empty(), "Unexpected {:?}", ws.get_ref()); + + // Next read fails as before. + // This read call should try to flush the pong again, which again WouldBlock + let next = ws.read().unwrap_err(); + assert!( + matches!(next, tungstenite::Error::Io(ref err) if err.kind() == io::ErrorKind::WouldBlock), + "Unexpected read err {:?}", + next + ); + assert_eq!(ws.get_ref().read_calls, 3); + assert_eq!(ws.get_ref().write_calls, 1); + assert_eq!(ws.get_ref().flush_calls, 2); + assert!(ws.get_ref().flushed_data.is_empty(), "Unexpected {:?}", ws.get_ref()); + + // Next read fails as before. + // This read call should try to flush the pong again, 3rd flush attempt is the charm + let next = ws.read().unwrap_err(); + assert!( + matches!(next, tungstenite::Error::Io(ref err) if err.kind() == io::ErrorKind::WouldBlock), + "Unexpected read err {:?}", + next + ); + assert_eq!(ws.get_ref().read_calls, 4); + assert_eq!(ws.get_ref().write_calls, 1); + assert_eq!(ws.get_ref().flush_calls, 3); + assert!(ws.get_ref().flushed_data == written_data, "Unexpected {:?}", ws.get_ref()); +}