diff --git a/.github/workflows/osx.yml b/.github/workflows/osx.yml
index d575018..fcc0634 100644
--- a/.github/workflows/osx.yml
+++ b/.github/workflows/osx.yml
@@ -11,26 +11,30 @@ jobs:
           - stable
           - nightly
 
-    name: ${{ matrix.version }} - x86_64-apple-darwin
+    name: ${{ matrix.version }} - aarch64-apple-darwin
     runs-on: macOS-latest
 
     steps:
       - uses: actions/checkout@master
 
       - name: Install ${{ matrix.version }}
-        uses: actions-rs/toolchain@v1
+        uses: actions-rust-lang/setup-rust-toolchain@v1
        with:
-          toolchain: ${{ matrix.version }}-x86_64-apple-darwin
+          toolchain: ${{ matrix.version }}-aarch64-apple-darwin
           profile: minimal
           override: true
 
-      - name: Generate Cargo.lock
-        uses: actions-rs/cargo@v1
+      - name: Cache cargo registry
+        uses: actions/cache@v4
         with:
-          command: generate-lockfile
+          path: ~/.cargo/registry
+          key: ${{ matrix.version }}-aarch64-apple-darwin-cargo-registry-trimmed-${{ hashFiles('**/Cargo.lock') }}
 
-      - name: Cache Dependencies
-        uses: Swatinem/rust-cache@v1.0.1
+      - name: Cache cargo index
+        uses: actions/cache@v4
+        with:
+          path: ~/.cargo/git
+          key: ${{ matrix.version }}-aarch64-apple-darwin-cargo-index-trimmed-${{ hashFiles('**/Cargo.lock') }}
 
       - name: Run tests
         uses: actions-rs/cargo@v1
diff --git a/CHANGES.md b/CHANGES.md
index 20ece44..8a3fef9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,6 +1,6 @@
 # Changes
 
-## [2.0.0] - 2024-04-1x
+## [2.0.0] - 2024-05-1x
 
 * Mark `Control` type as `non exhaustive`
 
@@ -8,6 +8,8 @@
 
 * Remove protocol variant services
 
+* Disable keep-alive timer if not configured
+
 ## [1.1.0] - 2024-03-07
 
 * Use MqttService::connect_timeout() only for reading protocol version
diff --git a/Cargo.toml b/Cargo.toml
index 4d1e9fe..cc381a4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@ features = ["ntex/tokio"]
 
 [dependencies]
 ntex = "1.2"
+ntex-io = "1.2"
 bitflags = "2"
 log = "0.4"
 pin-project-lite = "0.2"
diff --git a/src/io.rs b/src/io.rs
index 0eabe57..2895e44 100644
--- a/src/io.rs
+++ b/src/io.rs
@@ -121,6 +121,7 @@ where
             queue: VecDeque::new(),
         }));
         let pool = io.memory_pool().pool();
+        let keepalive_timeout = config.keepalive_timeout();
 
         Dispatcher {
             codec,
@@ -131,13 +132,17 @@ where
             inner: DispatcherInner {
                 io,
                 state,
-                flags: Flags::empty(),
+                keepalive_timeout,
+                flags: if keepalive_timeout.is_zero() {
+                    Flags::empty()
+                } else {
+                    Flags::KA_ENABLED
+                },
                 config: config.clone(),
                 st: IoDispatcherState::Processing,
                 read_remains: 0,
                 read_remains_prev: 0,
                 read_max_timeout: Seconds::ZERO,
-                keepalive_timeout: config.keepalive_timeout(),
             },
         }
     }
@@ -552,24 +557,25 @@ where
                         self.io.start_timer(timeout);
                         return Ok(());
                     }
-                    log::trace!("{}: Max payload timeout has been reached", self.io.tag());
                 }
+                log::trace!("{}: Max payload timeout has been reached", self.io.tag());
                 return Err(DispatchItem::ReadTimeout);
             }
+        } else if self.flags.contains(Flags::KA_TIMEOUT) {
+            log::trace!("{}: Keep-alive error, stopping dispatcher", self.io.tag());
+            return Err(DispatchItem::KeepAliveTimeout);
         }
-
-        log::trace!("{}: Keep-alive error, stopping dispatcher", self.io.tag());
-        Err(DispatchItem::KeepAliveTimeout)
+        Ok(())
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::{cell::Cell, sync::Arc, sync::Mutex};
+    use std::{cell::Cell, io, sync::Arc, sync::Mutex};
 
     use ntex::channel::condition::Condition;
     use ntex::time::{sleep, Millis};
-    use ntex::util::Bytes;
+    use ntex::util::{Bytes, BytesMut};
     use ntex::{codec::BytesCodec, io as nio, service::ServiceCtx, testing::Io};
 
     use super::*;
@@ -587,9 +593,18 @@ mod tests {
             codec: U,
             service: F,
         ) -> (Self, nio::IoRef) {
-            let keepalive_timeout = Seconds(30);
+            Self::new_debug_cfg(io, codec, DispatcherConfig::default(), service)
+        }
+
+        /// Construct new `Dispatcher` instance
+        pub(crate) fn new_debug_cfg<F: IntoService<S, DispatchItem<U>>>(
+            io: nio::Io,
+            codec: U,
+            config: DispatcherConfig,
+            service: F,
+        ) -> (Self, nio::IoRef) {
+            let keepalive_timeout = config.keepalive_timeout();
             let rio = io.get_ref();
-            let config = DispatcherConfig::default();
 
             let state = Rc::new(RefCell::new(DispatcherState {
                 error: None,
@@ -610,7 +625,11 @@
                     keepalive_timeout,
                     io: IoBoxed::from(io),
                     st: IoDispatcherState::Processing,
-                    flags: Flags::KA_ENABLED,
+                    flags: if keepalive_timeout.is_zero() {
+                        Flags::empty()
+                    } else {
+                        Flags::KA_ENABLED
+                    },
                     read_remains: 0,
                     read_remains_prev: 0,
                     read_max_timeout: Seconds::ZERO,
@@ -920,4 +939,138 @@ mod tests {
         assert!(!client.is_closed());
         assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 0, 0]);
     }
+
+    #[derive(Debug, Copy, Clone)]
+    struct BytesLenCodec(usize);
+
+    impl Encoder for BytesLenCodec {
+        type Item = Bytes;
+        type Error = io::Error;
+
+        #[inline]
+        fn encode(&self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
+            dst.extend_from_slice(&item[..]);
+            Ok(())
+        }
+    }
+
+    impl Decoder for BytesLenCodec {
+        type Item = BytesMut;
+        type Error = io::Error;
+
+        fn decode(&self, src: &mut BytesMut) -> Result<Option<BytesMut>, Self::Error> {
+            if src.len() >= self.0 {
+                Ok(Some(src.split_to(self.0)))
+            } else {
+                Ok(None)
+            }
+        }
+    }
+
+    /// Do not use keep-alive timer if not configured
+    #[ntex::test]
+    async fn test_no_keepalive_err_after_frame_timeout() {
+        env_logger::init();
+        let (client, server) = Io::create();
+        client.remote_buffer_cap(1024);
+
+        let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
+        let data2 = data.clone();
+
+        let config = DispatcherConfig::default();
+        config.set_keepalive_timeout(Seconds(0)).set_frame_read_rate(Seconds(1), Seconds(2), 2);
+
+        let (disp, _) = Dispatcher::new_debug_cfg(
+            nio::Io::new(server),
+            BytesLenCodec(2),
+            config,
+            ntex::service::fn_service(move |msg: DispatchItem<BytesLenCodec>| {
+                let data = data2.clone();
+                async move {
+                    match msg {
+                        DispatchItem::Item(bytes) => {
+                            data.lock().unwrap().borrow_mut().push(0);
+                            return Ok::<_, ()>(Some(bytes.freeze()));
+                        }
+                        DispatchItem::KeepAliveTimeout => {
+                            data.lock().unwrap().borrow_mut().push(1);
+                        }
+                        _ => (),
+                    }
+                    Ok(None)
+                }
+            }),
+        );
+        ntex::rt::spawn(async move {
+            let _ = disp.await;
+        });
+
+        client.write("1");
+        sleep(Millis(250)).await;
+        client.write("2");
+        let buf = client.read().await.unwrap();
+        assert_eq!(buf, Bytes::from_static(b"12"));
+        sleep(Millis(2000)).await;
+
+        assert_eq!(&data.lock().unwrap().borrow()[..], &[0]);
+    }
+
+    #[ntex::test]
+    async fn test_read_timeout() {
+        let (client, server) = Io::create();
+        client.remote_buffer_cap(1024);
+
+        let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
+        let data2 = data.clone();
+
+        let config = DispatcherConfig::default();
+        config.set_keepalive_timeout(Seconds::ZERO).set_frame_read_rate(
+            Seconds(1),
+            Seconds(2),
+            2,
+        );
+
+        let (disp, state) = Dispatcher::new_debug_cfg(
+            nio::Io::new(server),
+            BytesLenCodec(8),
+            config,
+            ntex::service::fn_service(move |msg: DispatchItem<BytesLenCodec>| {
+                let data = data2.clone();
+                async move {
+                    match msg {
+                        DispatchItem::Item(bytes) => {
+                            data.lock().unwrap().borrow_mut().push(0);
+                            return Ok::<_, ()>(Some(bytes.freeze()));
+                        }
+                        DispatchItem::ReadTimeout => {
+                            data.lock().unwrap().borrow_mut().push(1);
+                        }
+                        _ => (),
+                    }
+                    Ok(None)
+                }
+            }),
+        );
+        ntex::rt::spawn(async move {
+            let _ = disp.await;
+        });
+
client.write("12345678"); + let buf = client.read().await.unwrap(); + assert_eq!(buf, Bytes::from_static(b"12345678")); + + client.write("1"); + sleep(Millis(1000)).await; + assert!(!state.flags().contains(nio::Flags::IO_STOPPING)); + client.write("23"); + sleep(Millis(1000)).await; + assert!(!state.flags().contains(nio::Flags::IO_STOPPING)); + client.write("4"); + sleep(Millis(2000)).await; + + // write side must be closed, dispatcher should fail with keep-alive + assert!(state.flags().contains(nio::Flags::IO_STOPPING)); + assert!(client.is_closed()); + assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]); + } } diff --git a/src/v5/dispatcher.rs b/src/v5/dispatcher.rs index 2d617ef..138211b 100644 --- a/src/v5/dispatcher.rs +++ b/src/v5/dispatcher.rs @@ -548,6 +548,7 @@ where Err(err) => { // do not handle nested error if error { + inner.sink.drop_sink(); return Err(err); } else { // handle error from control service @@ -562,20 +563,19 @@ where } }; - if error { + let response = if error { if let Some(pkt) = result.packet { let _ = inner.sink.encode_packet(pkt); } - if result.disconnect { - inner.sink.drop_sink(); - } Ok(None) } else { - if result.disconnect { - inner.sink.drop_sink(); - } Ok(result.packet) + }; + + if result.disconnect { + inner.sink.drop_sink(); } + response } #[cfg(test)]