Skip to content

Commit

Permalink
Use ntex-service 3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed May 28, 2024
1 parent ec80347 commit cdad5f5
Show file tree
Hide file tree
Showing 54 changed files with 356 additions and 428 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## [3.0.0] - 2024-05-28

* Switch to individual ntex_* crates

* Use ntex-service 3.0

## [2.0.2] - 2024-05-15

* Remove non_exhaustive marker
Expand Down
16 changes: 11 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "2.0.2"
version = "3.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -15,8 +15,13 @@ edition = "2021"
features = ["ntex/tokio"]

[dependencies]
ntex = "1.2"
ntex-io = "1.2"
ntex-io = "2"
ntex-net = "2"
ntex-util = "2"
ntex-service = "3"
ntex-bytes = "0.1"
ntex-codec = "0.6"
ntex-router = "0.5"
bitflags = "2"
log = "0.4"
pin-project-lite = "0.2"
Expand All @@ -27,7 +32,8 @@ thiserror = "1"
[dev-dependencies]
rand = "0.8"
env_logger = "0.11"
ntex-tls = "1.1"
ntex-tls = "2"
ntex-macros = "0.1"
openssl = "0.10"
test-case = "3.2"
ntex = { version = "1", features = ["tokio", "openssl"] }
ntex = { version = "2", features = ["tokio", "openssl"] }
11 changes: 7 additions & 4 deletions examples/mqtt-ws-client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Mqtt-over-WS client
use std::{io, rc::Rc};

use ntex::connect::{openssl::Connector, Connect, ConnectError};
use ntex::connect::{openssl::SslConnector, Connect, ConnectError};
use ntex::time::{sleep, Millis, Seconds};
use ntex::{util::Bytes, ws};
use ntex_mqtt::v3;
Expand All @@ -21,9 +21,12 @@ async fn main() -> std::io::Result<()> {

// we need custom connector that would open ws connection and enable ws transport
let ws_client = Rc::new(
ws::WsClient::with_connector("https://127.0.0.1:8883", Connector::new(builder.build()))
.finish()
.unwrap(),
ws::WsClient::with_connector(
"https://127.0.0.1:8883",
SslConnector::new(builder.build()),
)
.finish()
.unwrap(),
);

// connect to server
Expand Down
4 changes: 2 additions & 2 deletions examples/openssl-client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ntex::connect::openssl::Connector;
use ntex::connect::openssl::SslConnector;
use ntex::time::{sleep, Millis, Seconds};
use ntex_mqtt::v5;
use openssl::ssl;
Expand Down Expand Up @@ -35,7 +35,7 @@ async fn main() -> std::io::Result<()> {

// connect to server
let client = v5::client::MqttConnector::new("127.0.0.1:8883")
.connector(Connector::new(builder.build()))
.connector(SslConnector::new(builder.build()))
.client_id("user")
.keep_alive(Seconds::ONE)
.max_packet_size(30)
Expand Down
4 changes: 2 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{fmt, io, num::NonZeroU16};

use ntex::util::Either;
use ntex_util::future::Either;

use crate::v5::codec::DisconnectReasonCode;

Expand Down Expand Up @@ -195,7 +195,7 @@ pub enum ClientError<T: fmt::Debug> {
Disconnected(Option<std::io::Error>),
/// Connect error
#[error("Connect error: {}", _0)]
Connect(#[from] ntex::connect::ConnectError),
Connect(#[from] ntex_net::connect::ConnectError),
}

impl<T: fmt::Debug> From<EncodeError> for ClientError<T> {
Expand Down
84 changes: 42 additions & 42 deletions src/inflight.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Service that limits number of in-flight async requests.
use std::{cell::Cell, rc::Rc, task::Context, task::Poll};
use std::{cell::Cell, future::poll_fn, rc::Rc, task::Context, task::Poll};

use ntex::service::{Service, ServiceCtx};
use ntex::task::LocalWaker;
use ntex_service::{Service, ServiceCtx};
use ntex_util::task::LocalWaker;

pub(crate) trait SizedRequest {
fn size(&self) -> u32;
Expand All @@ -27,21 +27,13 @@ where
type Response = T::Response;
type Error = T::Error;

ntex::forward_poll_shutdown!(service);
ntex_service::forward_shutdown!(service);

#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let p1 = self.service.poll_ready(cx)?.is_pending();
let p2 = !self.count.available(cx);
if p2 {
log::trace!("InFlight limit exceeded");
}

if p1 || p2 {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
ctx.ready(&self.service).await?;
self.count.available().await;
Ok(())
}

#[inline]
Expand Down Expand Up @@ -77,8 +69,8 @@ impl Counter {
CounterGuard::new(size, self.0.clone())
}

fn available(&self, cx: &Context<'_>) -> bool {
self.0.available(cx)
async fn available(&self) {
poll_fn(|cx| if self.0.available(cx) { Poll::Ready(()) } else { Poll::Pending }).await
}
}

Expand Down Expand Up @@ -134,7 +126,8 @@ impl CounterInner {
mod tests {
use std::{future::poll_fn, time::Duration};

use ntex::{service::Pipeline, time::sleep, util::lazy};
use ntex_service::Pipeline;
use ntex_util::{future::lazy, task::LocalWaker, time::sleep};

use super::*;

Expand All @@ -157,63 +150,69 @@ mod tests {
}
}

#[ntex::test]
#[ntex_macros::rt_test]
async fn test_inflight() {
let wait_time = Duration::from_millis(50);

let srv = Pipeline::new(InFlightService::new(1, 0, SleepService(wait_time)));
let srv = Pipeline::new(InFlightService::new(1, 0, SleepService(wait_time))).bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

let srv2 = srv.clone();
ntex::rt::spawn(async move {
ntex_util::spawn(async move {
let _ = srv2.call(()).await;
});
ntex::time::sleep(Duration::from_millis(25)).await;
ntex_util::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

ntex::time::sleep(Duration::from_millis(50)).await;
ntex_util::time::sleep(Duration::from_millis(50)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready());
}

#[ntex::test]
#[ntex_macros::rt_test]
async fn test_inflight2() {
let wait_time = Duration::from_millis(50);

let srv = Pipeline::new(InFlightService::new(0, 10, SleepService(wait_time)));
let srv = Pipeline::new(InFlightService::new(0, 10, SleepService(wait_time))).bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

let srv2 = srv.clone();
ntex::rt::spawn(async move {
ntex_util::spawn(async move {
let _ = srv2.call(()).await;
});
ntex::time::sleep(Duration::from_millis(25)).await;
ntex_util::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

ntex::time::sleep(Duration::from_millis(100)).await;
ntex_util::time::sleep(Duration::from_millis(100)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
}

struct Srv2 {
dur: Duration,
cnt: Cell<bool>,
waker: LocalWaker,
}

impl Service<()> for Srv2 {
type Response = ();
type Error = ();

fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
if !self.cnt.get() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
poll_fn(|cx| {
if !self.cnt.get() {
Poll::Ready(Ok(()))
} else {
self.waker.register(cx.waker());
Poll::Pending
}
})
.await

Check warning on line 209 in src/inflight.rs

View check run for this annotation

Codecov / codecov/patch

src/inflight.rs#L209

Added line #L209 was not covered by tests
}

async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
let fut = sleep(self.dur);
self.cnt.set(true);
self.waker.wake();

let _ = fut.await;
self.cnt.set(false);
Expand All @@ -224,27 +223,28 @@ mod tests {
/// InflightService::poll_ready() must always register waker,
/// otherwise it can lose wake up if inner service's poll_ready
/// does not wakes dispatcher.
#[ntex::test]
#[ntex_macros::rt_test]
async fn test_inflight3() {
let wait_time = Duration::from_millis(50);

let srv = Pipeline::new(InFlightService::new(
1,
10,
Srv2 { dur: wait_time, cnt: Cell::new(false) },
));
Srv2 { dur: wait_time, cnt: Cell::new(false), waker: LocalWaker::new() },
))
.bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

let srv2 = srv.clone();
ntex::rt::spawn(async move {
ntex_util::spawn(async move {
let _ = srv2.call(()).await;
});
ntex::time::sleep(Duration::from_millis(25)).await;
ntex_util::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

let srv2 = srv.clone();
let (tx, rx) = ntex::channel::oneshot::channel();
ntex::rt::spawn(async move {
let (tx, rx) = ntex_util::channel::oneshot::channel();
ntex_util::spawn(async move {
let _ = poll_fn(|cx| srv2.poll_ready(cx)).await;
let _ = tx.send(());
});
Expand Down
Loading

0 comments on commit cdad5f5

Please sign in to comment.