Skip to content

Commit

Permalink
Do not check readiness for call
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 10, 2024
1 parent 001ae5f commit 5bf8f07
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 23 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## [4.1.0] - 2024-10-10

* Do not check readiness for call

* Handle service readiness errors during shutdown

## [4.0.0] - 2024-10-05

* Middlewares support for mqtt server
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "4.0.0"
version = "4.1.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
32 changes: 10 additions & 22 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use std::task::{ready, Context, Poll};
use std::{cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc};

use ntex_bytes::Pool;
use ntex_codec::{Decoder, Encoder};
use ntex_io::{
Decoded, DispatchItem, DispatcherConfig, IoBoxed, IoRef, IoStatusUpdate, RecvError,
Expand All @@ -22,8 +21,7 @@ pin_project_lite::pin_project! {
U: Decoder,
U: 'static,
{
inner: DispatcherInner<S, U>,
pool: Pool,
inner: DispatcherInner<S, U>
}
}

Expand Down Expand Up @@ -122,11 +120,9 @@ where
base: 0,
queue: VecDeque::new(),
}));
let pool = io.memory_pool().pool();
let keepalive_timeout = config.keepalive_timeout();

Dispatcher {
pool,
inner: DispatcherInner {
io,
codec,
Expand Down Expand Up @@ -249,14 +245,6 @@ where
}
}

// handle memory pool pressure
if this.pool.poll_ready(cx).is_pending() {
inner.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
inner.io.stop_timer();
inner.io.pause();
return Poll::Pending;
}

loop {
match inner.st {
IoDispatcherState::Processing => {
Expand Down Expand Up @@ -312,9 +300,7 @@ where
IoDispatcherState::Backpressure => {
match ready!(inner.poll_service(cx)) {
PollService::Ready => (),
PollService::Item(item) => {
inner.call_service(cx, item);
}
PollService::Item(item) => inner.call_service(cx, item),
PollService::Continue => continue,
};

Expand All @@ -334,7 +320,11 @@ where

// service may relay on poll_ready for response results
if !inner.flags.contains(Flags::READY_ERR) {
let _ = inner.service.poll_ready(cx);
if let Poll::Ready(res) = inner.service.poll_ready(cx) {
if res.is_err() {
inner.flags.insert(Flags::READY_ERR);
}
}
}

if inner.state.borrow().queue.is_empty() {
Expand Down Expand Up @@ -394,7 +384,7 @@ where
{
fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) {
let mut state = self.state.borrow_mut();
let mut fut = self.service.call(item);
let mut fut = self.service.call_nowait(item);

// optimize first call
if self.response.is_none() {
Expand Down Expand Up @@ -429,8 +419,7 @@ where
let codec = self.codec.clone();
let state = self.state.clone();

#[allow(clippy::let_underscore_future)]
let _ = ntex_util::spawn(async move {
ntex_util::spawn(async move {
let item = fut.await;
state.borrow_mut().handle_result(item, response_idx, &st, &codec, true);
});
Expand Down Expand Up @@ -502,8 +491,8 @@ where
Poll::Ready(Err(err)) => {
log::trace!("{}: Service readiness check failed, stopping", self.io.tag());
self.st = IoDispatcherState::Stop;
self.state.borrow_mut().error = Some(IoDispatcherError::Service(err));
self.flags.insert(Flags::READY_ERR);
self.state.borrow_mut().error = Some(IoDispatcherError::Service(err));
Poll::Ready(PollService::Continue)
}
}
Expand Down Expand Up @@ -629,7 +618,6 @@ mod tests {

(
Dispatcher {
pool: io.memory_pool().pool(),
inner: DispatcherInner {
codec,
state,
Expand Down

0 comments on commit 5bf8f07

Please sign in to comment.