Skip to content

Commit

Permalink
Migrate to ntex-0.7 (#142)
Browse files Browse the repository at this point in the history
* Migrate to ntex-0.7
  • Loading branch information
fafhrd91 authored Jun 16, 2023
1 parent 7902800 commit a0f359d
Show file tree
Hide file tree
Showing 23 changed files with 433 additions and 322 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.11.0-beta.0] - 2023-06-17

* Migrate to ntex-0.7

## [0.10.4] - 2023-05-12

* Expose size of prepared packet
Expand Down
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.10.4"
version = "0.11.0-beta.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
edition = "2018"

[dependencies]
ntex = "0.6.5"
ntex = "0.7.0-beta.0"
bitflags = "1.3"
log = "0.4"
pin-project-lite = "0.2"
Expand All @@ -22,11 +22,11 @@ thiserror = "1.0"

[dev-dependencies]
env_logger = "0.10"
ntex-tls = "0.2.1"
rustls = "0.20"
ntex-tls = "0.3.0-beta.0"
rustls = "0.21"
rustls-pemfile = "1.0"
openssl = "0.10"
ntex = { version = "0.6.3", features = ["tokio", "rustls", "openssl"] }
ntex = { version = "0.7.0-beta.0", features = ["tokio", "rustls", "openssl"] }
test-case = "3"

[profile.dev]
Expand Down
45 changes: 28 additions & 17 deletions src/inflight.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Service that limits number of in-flight async requests.
use std::{cell::Cell, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll};

use ntex::{service::Service, task::LocalWaker};
use ntex::{service::Ctx, service::Service, service::ServiceCall, task::LocalWaker};

pub(crate) trait SizedRequest {
fn size(&self) -> u32;
Expand Down Expand Up @@ -45,12 +45,12 @@ where
}

#[inline]
fn call(&self, req: R) -> Self::Future<'_> {
fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> {
let size = if self.count.0.max_size > 0 { req.size() } else { 0 };
InFlightServiceResponse {
_guard: self.count.get(size),
_t: marker::PhantomData,
fut: self.service.call(req),
fut: ctx.call(&self.service, req),
}
}
}
Expand All @@ -61,7 +61,7 @@ pin_project_lite::pin_project! {
where T: 'f, R: 'f
{
#[pin]
fut: T::Future<'f>,
fut: ServiceCall<'f, T, R>,
_guard: CounterGuard,
_t: marker::PhantomData<R>
}
Expand Down Expand Up @@ -155,10 +155,10 @@ impl CounterInner {

#[cfg(test)]
mod tests {
use std::{cell::Cell, rc::Rc, task::Poll, time::Duration};
use std::{cell::Cell, task::Poll, time::Duration};

use ntex::util::{lazy, poll_fn, BoxFuture};
use ntex::{service::Service, time::sleep};
use ntex::{service::Container, service::Service, time::sleep};

use super::*;

Expand All @@ -169,7 +169,7 @@ mod tests {
type Error = ();
type Future<'f> = BoxFuture<'f, Result<(), ()>>;

fn call(&self, _: ()) -> Self::Future<'_> {
fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> {
let fut = sleep(self.0);
Box::pin(async move {
let _ = fut.await;
Expand All @@ -188,13 +188,17 @@ mod tests {
async fn test_inflight() {
let wait_time = Duration::from_millis(50);

let srv = InFlightService::new(1, 0, SleepService(wait_time));
let srv = Container::new(InFlightService::new(1, 0, SleepService(wait_time)));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

let res = srv.call(());
let srv2 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv2.call(()).await;
});
ntex::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

let _ = res.await;
ntex::time::sleep(Duration::from_millis(50)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready());
}
Expand All @@ -203,13 +207,17 @@ mod tests {
async fn test_inflight2() {
let wait_time = Duration::from_millis(50);

let srv = InFlightService::new(0, 10, SleepService(wait_time));
let srv = Container::new(InFlightService::new(0, 10, SleepService(wait_time)));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

let res = srv.call(());
let srv2 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv2.call(()).await;
});
ntex::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

let _ = res.await;
ntex::time::sleep(Duration::from_millis(100)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
}

Expand All @@ -231,7 +239,7 @@ mod tests {
}
}

fn call(&self, _: ()) -> Self::Future<'_> {
fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> {
let fut = sleep(self.dur);
self.cnt.set(true);

Expand All @@ -250,14 +258,18 @@ mod tests {
async fn test_inflight3() {
let wait_time = Duration::from_millis(50);

let srv = Rc::new(InFlightService::new(
let srv = Container::new(InFlightService::new(
1,
10,
Srv2 { dur: wait_time, cnt: Cell::new(false) },
));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

let res = srv.call(());
let srv2 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv2.call(()).await;
});
ntex::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

let srv2 = srv.clone();
Expand All @@ -267,7 +279,6 @@ mod tests {
let _ = tx.send(());
});

let _ = res.await;
let _ = rx.await;
}
}
18 changes: 11 additions & 7 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{cell::RefCell, collections::VecDeque, future::Future, mem, pin::Pin, r

use ntex::codec::{Decoder, Encoder};
use ntex::io::{DispatchItem, IoBoxed, IoRef, IoStatusUpdate, RecvError};
use ntex::service::{IntoService, Service};
use ntex::service::{Container, IntoService, Service, ServiceCall};
use ntex::time::Seconds;
use ntex::util::{ready, Pool};

Expand All @@ -21,11 +21,11 @@ pin_project_lite::pin_project! {
U: 'static,
{
codec: U,
service: Rc<S>,
service: Container<S>,
inner: DispatcherInner<S, U>,
pool: Pool,
#[pin]
response: Option<S::Future<'static>>,
response: Option<ServiceCall<'static, S, DispatchItem<U>>>,
response_idx: usize,
}
}
Expand Down Expand Up @@ -118,7 +118,7 @@ where
Dispatcher {
codec,
pool,
service: Rc::new(service.into_service()),
service: Container::new(service.into_service()),
response: None,
response_idx: 0,
inner: DispatcherInner {
Expand Down Expand Up @@ -509,7 +509,7 @@ mod tests {
use ntex::channel::condition::Condition;
use ntex::time::{sleep, Millis};
use ntex::util::{Bytes, Ready};
use ntex::{codec::BytesCodec, io as nio, testing::Io};
use ntex::{codec::BytesCodec, io as nio, service::Ctx, testing::Io};

use super::*;

Expand Down Expand Up @@ -539,7 +539,7 @@ mod tests {
(
Dispatcher {
codec,
service: Rc::new(service.into_service()),
service: Container::new(service.into_service()),
response: None,
response_idx: 0,
pool: io.memory_pool().pool(),
Expand Down Expand Up @@ -717,7 +717,11 @@ mod tests {
Poll::Ready(Err(()))
}

fn call(&self, _: DispatchItem<BytesCodec>) -> Self::Future<'_> {
fn call<'a>(
&'a self,
_: DispatchItem<BytesCodec>,
_: Ctx<'a, Self>,
) -> Self::Future<'a> {
Ready::Ok(None)
}
}
Expand Down
43 changes: 20 additions & 23 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::task::{Context, Poll};
use std::{convert::TryFrom, fmt, future::Future, io, marker, pin::Pin};

use ntex::io::{Filter, Io, IoBoxed, RecvError};
use ntex::service::{Service, ServiceFactory};
use ntex::service::{Ctx, Service, ServiceCall, ServiceFactory};
use ntex::time::{Deadline, Millis, Seconds};
use ntex::util::{join, ready, BoxFuture, Ready};

Expand Down Expand Up @@ -349,8 +349,9 @@ where
}

#[inline]
fn call(&self, req: IoBoxed) -> Self::Future<'_> {
fn call<'a>(&'a self, req: IoBoxed, ctx: Ctx<'a, Self>) -> Self::Future<'a> {
MqttServerImplResponse {
ctx,
state: MqttServerImplState::Version {
item: Some((req, VersionCodec, Deadline::new(self.handshake_timeout))),
},
Expand Down Expand Up @@ -380,8 +381,8 @@ where
}

#[inline]
fn call(&self, io: Io<F>) -> Self::Future<'_> {
Service::<IoBoxed>::call(self, IoBoxed::from(io))
fn call<'a>(&'a self, io: Io<F>, ctx: Ctx<'a, Self>) -> Self::Future<'a> {
Service::<IoBoxed>::call(self, IoBoxed::from(io), ctx)
}
}

Expand All @@ -404,6 +405,7 @@ pin_project_lite::pin_project! {
#[pin]
state: MqttServerImplState<'f, V3, V5>,
handlers: &'f (V3, V5),
ctx: Ctx<'f, MqttServerImpl<V3, V5, Err>>,
}
}

Expand All @@ -412,8 +414,8 @@ pin_project_lite::pin_project! {
pub(crate) enum MqttServerImplState<'f, V3: Service<(IoBoxed, Deadline)>, V5: Service<(IoBoxed, Deadline)>>
where V3: 'f, V5: 'f
{
V3 { #[pin] fut: V3::Future<'f> },
V5 { #[pin] fut: V5::Future<'f> },
V3 { #[pin] fut: ServiceCall<'f, V3, (IoBoxed, Deadline)> },
V5 { #[pin] fut: ServiceCall<'f, V5, (IoBoxed, Deadline)> },
Version { item: Option<(IoBoxed, VersionCodec, Deadline)> },
}
}
Expand All @@ -426,10 +428,10 @@ where
type Output = Result<(), MqttError<Err>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let mut this = self.as_mut().project();
let mut this = self.as_mut().project();

match this.state.project() {
loop {
match this.state.as_mut().project() {
MqttServerImplStateProject::V3 { fut } => return fut.poll(cx),
MqttServerImplStateProject::V5 { fut } => return fut.poll(cx),
MqttServerImplStateProject::Version { ref mut item } => {
Expand All @@ -442,19 +444,14 @@ where
return match ready!(st.0.poll_recv(&st.1, cx)) {
Ok(ver) => {
let (io, _, delay) = item.take().unwrap();
this = self.as_mut().project();
match ver {
ProtocolVersion::MQTT3 => {
this.state.set(MqttServerImplState::V3 {
fut: this.handlers.0.call((io, delay)),
})
}
ProtocolVersion::MQTT5 => {
this.state.set(MqttServerImplState::V5 {
fut: this.handlers.1.call((io, delay)),
})
}
}
this.state.set(match ver {
ProtocolVersion::MQTT3 => MqttServerImplState::V3 {
fut: this.ctx.call(&this.handlers.0, (io, delay)),
},
ProtocolVersion::MQTT5 => MqttServerImplState::V5 {
fut: this.ctx.call(&this.handlers.1, (io, delay)),
},
});
continue;
}
Err(RecvError::KeepAlive | RecvError::Stop) => {
Expand Down Expand Up @@ -506,7 +503,7 @@ impl<Err, InitErr> Service<(IoBoxed, Deadline)> for DefaultProtocolServer<Err, I
type Error = MqttError<Err>;
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;

fn call(&self, _: (IoBoxed, Deadline)) -> Self::Future<'_> {
fn call<'a>(&'a self, _: (IoBoxed, Deadline), _: Ctx<'a, Self>) -> Self::Future<'a> {
Ready::Err(MqttError::Disconnected(Some(io::Error::new(
io::ErrorKind::Other,
format!("Protocol is not supported: {:?}", self.ver),
Expand Down
Loading

0 comments on commit a0f359d

Please sign in to comment.