diff --git a/CHANGES.md b/CHANGES.md index e008f9e..4444c67 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [4.1.0] - 2024-10-10 + +* Do not check readiness for call + +* Handle service readiness errors during shutdown + ## [4.0.0] - 2024-10-05 * Middlewares support for mqtt server diff --git a/Cargo.toml b/Cargo.toml index e4f2336..cd21479 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "4.0.0" +version = "4.1.0" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" diff --git a/src/io.rs b/src/io.rs index e6a722f..8ae11de 100644 --- a/src/io.rs +++ b/src/io.rs @@ -2,7 +2,6 @@ use std::task::{ready, Context, Poll}; use std::{cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc}; -use ntex_bytes::Pool; use ntex_codec::{Decoder, Encoder}; use ntex_io::{ Decoded, DispatchItem, DispatcherConfig, IoBoxed, IoRef, IoStatusUpdate, RecvError, @@ -22,8 +21,7 @@ pin_project_lite::pin_project! { U: Decoder, U: 'static, { - inner: DispatcherInner, - pool: Pool, + inner: DispatcherInner } } @@ -122,11 +120,9 @@ where base: 0, queue: VecDeque::new(), })); - let pool = io.memory_pool().pool(); let keepalive_timeout = config.keepalive_timeout(); Dispatcher { - pool, inner: DispatcherInner { io, codec, @@ -249,14 +245,6 @@ where } } - // handle memory pool pressure - if this.pool.poll_ready(cx).is_pending() { - inner.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT); - inner.io.stop_timer(); - inner.io.pause(); - return Poll::Pending; - } - loop { match inner.st { IoDispatcherState::Processing => { @@ -312,9 +300,7 @@ where IoDispatcherState::Backpressure => { match ready!(inner.poll_service(cx)) { PollService::Ready => (), - PollService::Item(item) => { - inner.call_service(cx, item); - } + PollService::Item(item) => inner.call_service(cx, item), PollService::Continue => continue, }; @@ -334,7 +320,11 @@ where // service may relay on poll_ready for response results if !inner.flags.contains(Flags::READY_ERR) { - let _ = inner.service.poll_ready(cx); + if let Poll::Ready(res) = inner.service.poll_ready(cx) { + if res.is_err() { + inner.flags.insert(Flags::READY_ERR); + } + } } if inner.state.borrow().queue.is_empty() { @@ -394,7 +384,7 @@ where { fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem) { let mut state = self.state.borrow_mut(); - let mut fut = self.service.call(item); + let mut fut = self.service.call_nowait(item); // optimize first call if self.response.is_none() { @@ -429,8 +419,7 @@ where let codec = self.codec.clone(); let state = self.state.clone(); - #[allow(clippy::let_underscore_future)] - let _ = ntex_util::spawn(async move { + ntex_util::spawn(async move { let item = fut.await; state.borrow_mut().handle_result(item, response_idx, &st, &codec, true); }); @@ -502,8 +491,8 @@ where Poll::Ready(Err(err)) => { log::trace!("{}: Service readiness check failed, stopping", self.io.tag()); self.st = IoDispatcherState::Stop; - self.state.borrow_mut().error = Some(IoDispatcherError::Service(err)); self.flags.insert(Flags::READY_ERR); + self.state.borrow_mut().error = Some(IoDispatcherError::Service(err)); Poll::Ready(PollService::Continue) } } @@ -629,7 +618,6 @@ mod tests { ( Dispatcher { - pool: io.memory_pool().pool(), inner: DispatcherInner { codec, state,