Skip to content

Commit

Permalink
v5: Allow to configure ConnectAck::max_qos value
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Nov 4, 2020
1 parent 67887cd commit 1786406
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.17] - 2020-11-04

* v5: Allow to configure ConnectAck::max_qos value

## [0.3.16] - 2020-10-28

* Do not print publish payload in debug fmt
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.3.16"
version = "0.3.17"
authors = ["ntex contributors <team@ntex.rs>"]
description = "MQTT Client/Server framework for v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
1 change: 0 additions & 1 deletion src/v5/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ impl<Io> Connect<Io> {
/// Ack connect message and set state
pub fn ack<St>(self, st: St) -> ConnectAck<Io, St> {
let mut packet = codec::ConnectAck {
max_qos: Some(codec::QoS::AtLeastOnce),
reason_code: codec::ConnectAckReason::Success,
topic_alias_max: self.max_topic_alias,
..codec::ConnectAck::default()
Expand Down
24 changes: 24 additions & 0 deletions src/v5/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ntex_codec::{AsyncRead, AsyncWrite, Framed};
use crate::error::{MqttError, ProtocolError};
use crate::handshake::{Handshake, HandshakeResult};
use crate::service::{FactoryBuilder, FactoryBuilder2};
use crate::types::QoS;

use super::codec as mqtt;
use super::connect::{Connect, ConnectAck};
Expand All @@ -27,6 +28,7 @@ pub struct MqttServer<Io, St, C: ServiceFactory, Cn: ServiceFactory, P: ServiceF
srv_publish: P,
max_size: u32,
max_receive: u16,
max_qos: Option<QoS>,
handshake_timeout: usize,
disconnect_timeout: usize,
max_topic_alias: u16,
Expand Down Expand Up @@ -59,6 +61,7 @@ where
srv_publish: DefaultPublishService::default(),
max_size: 0,
max_receive: 15,
max_qos: None,
handshake_timeout: 0,
disconnect_timeout: 3000,
max_topic_alias: 32,
Expand Down Expand Up @@ -130,6 +133,14 @@ where
self
}

/// Set server max qos setting.
///
/// By default max qos is not set`
pub fn max_qos(mut self, qos: QoS) -> Self {
self.max_qos = Some(qos);
self
}

/// Service to handle control messages
pub fn control<F, Srv>(self, service: F) -> MqttServer<Io, St, C, Srv, P>
where
Expand All @@ -148,6 +159,7 @@ where
max_size: self.max_size,
max_receive: self.max_receive,
max_topic_alias: self.max_topic_alias,
max_qos: self.max_qos,
handshake_timeout: self.handshake_timeout,
disconnect_timeout: self.disconnect_timeout,
pool: self.pool,
Expand All @@ -172,6 +184,7 @@ where
max_size: self.max_size,
max_receive: self.max_receive,
max_topic_alias: self.max_topic_alias,
max_qos: self.max_qos,
handshake_timeout: self.handshake_timeout,
disconnect_timeout: self.disconnect_timeout,
pool: self.pool,
Expand Down Expand Up @@ -218,6 +231,7 @@ where
self.max_size,
self.max_receive,
self.max_topic_alias,
self.max_qos,
self.handshake_timeout,
self.pool,
))
Expand Down Expand Up @@ -249,6 +263,7 @@ where
self.max_size,
self.max_receive,
self.max_topic_alias,
self.max_qos,
self.handshake_timeout,
self.pool,
))
Expand All @@ -263,6 +278,7 @@ fn handshake_service_factory<Io, St, C>(
max_size: u32,
max_receive: u16,
max_topic_alias: u16,
max_qos: Option<QoS>,
handshake_timeout: usize,
pool: Rc<MqttSinkPool>,
) -> impl ServiceFactory<
Expand Down Expand Up @@ -291,6 +307,7 @@ where
max_size,
max_receive,
max_topic_alias,
max_qos,
pool.clone(),
)
})
Expand All @@ -308,6 +325,7 @@ fn handshake_service_factory2<Io, St, C>(
max_size: u32,
max_receive: u16,
max_topic_alias: u16,
max_qos: Option<QoS>,
handshake_timeout: usize,
pool: Rc<MqttSinkPool>,
) -> impl ServiceFactory<
Expand Down Expand Up @@ -336,6 +354,7 @@ where
max_size,
max_receive,
max_topic_alias,
max_qos,
pool.clone(),
)
})
Expand All @@ -354,6 +373,7 @@ async fn handshake<Io, S, St, E>(
max_size: u32,
mut max_receive: u16,
mut max_topic_alias: u16,
max_qos: Option<QoS>,
pool: Rc<MqttSinkPool>,
) -> Result<HandshakeResult<Io, Session<St>, mqtt::Codec>, S::Error>
where
Expand Down Expand Up @@ -415,6 +435,10 @@ where

max_topic_alias = ack.packet.topic_alias_max;

if ack.packet.max_qos.is_none() {
ack.packet.max_qos = max_qos;
}

if let Some(num) = ack.packet.receive_max {
max_receive = num.get();
} else {
Expand Down
1 change: 1 addition & 0 deletions tests/test_server_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ async fn test_max_receive() {
let srv = server::test_server(move || {
MqttServer::new(connect)
.receive_max(1)
.max_qos(codec::QoS::AtLeastOnce)
.publish(|p: Publish| {
delay_for(Duration::from_millis(10000))
.map(move |_| Ok::<_, TestError>(p.ack()))
Expand Down

0 comments on commit 1786406

Please sign in to comment.