Skip to content

Commit

Permalink
better naming
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed May 14, 2024
1 parent aa34aad commit e70568b
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 76 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.0.1] - 2024-05-14

* Better naming

## [2.0.0] - 2024-05-1x

* Mark `Control` type as `non exhaustive`
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "2.0.0"
version = "2.0.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
32 changes: 19 additions & 13 deletions src/v3/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ pub struct MqttConnector<A, T> {
address: A,
connector: Pipeline<T>,
pkt: codec::Connect,
max_size: u32,
max_send: usize,
max_receive: usize,
max_packet_size: u32,
handshake_timeout: Seconds,
config: DispatcherConfig,
pool: Rc<MqttSinkPool>,
Expand All @@ -37,9 +37,9 @@ where
config,
pkt: codec::Connect::default(),
connector: Pipeline::new(Connector::default()),
max_size: 64 * 1024,
max_send: 16,
max_receive: 16,
max_packet_size: 64 * 1024,
handshake_timeout: Seconds::ZERO,
pool: Rc::new(MqttSinkPool::default()),
}
Expand Down Expand Up @@ -102,32 +102,38 @@ where
self
}

#[inline]
/// Max incoming packet size.
///
/// To disable max size limit set value to 0.
pub fn max_size(mut self, val: u32) -> Self {
self.max_size = val;
self
}

#[inline]
/// Set max send packets number
///
/// Number of in-flight outgoing publish packets. By default receive max is set to 16 packets.
/// Number of in-flight outgoing publish packets. By default send max is set to 16 packets.
/// To disable in-flight limit set value to 0.
pub fn max_send(mut self, val: u16) -> Self {
self.max_send = val as usize;
self
}

#[inline]
/// Set max receive packets number
/// Number of inbound in-flight concurrent messages.
///
/// Number of in-flight incoming publish packets. By default receive max is set to 16 packets.
/// To disable in-flight limit set value to 0.
/// By default inbound is set to 16 messages To disable in-flight limit set value to 0.
pub fn max_receive(mut self, val: u16) -> Self {
self.max_receive = val as usize;
self
}

#[inline]
/// Max incoming packet size.
///
/// To disable max size limit set value to 0.
#[deprecated]
#[doc(hidden)]
pub fn max_packet_size(mut self, val: u32) -> Self {
self.max_packet_size = val;
self.max_size = val;
self
}

Expand Down Expand Up @@ -184,9 +190,9 @@ where
pkt: self.pkt,
address: self.address,
config: self.config,
max_size: self.max_size,
max_send: self.max_send,
max_receive: self.max_receive,
max_packet_size: self.max_packet_size,
handshake_timeout: self.handshake_timeout,
pool: self.pool,
}
Expand Down Expand Up @@ -216,7 +222,7 @@ where
let config = self.config.clone();
let pool = self.pool.clone();
let codec = codec::Codec::new();
codec.set_max_size(self.max_packet_size);
codec.set_max_size(self.max_size);

io.encode(pkt.into(), &codec)?;

Expand Down
8 changes: 4 additions & 4 deletions src/v3/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use super::{codec, publish::Publish, shared::Ack, shared::MqttShared, Session};
pub(super) fn factory<St, T, C, E>(
publish: T,
control: C,
inflight: u16,
inflight_size: usize,
inbound: u16,
inbound_size: usize,
max_qos: QoS,
handle_qos_after_disconnect: Option<QoS>,
) -> impl ServiceFactory<
Expand Down Expand Up @@ -62,8 +62,8 @@ where
Ok(
// limit number of in-flight messages
crate::inflight::InFlightService::new(
inflight,
inflight_size,
inbound,
inbound_size,
Dispatcher::<_, _, E>::new(
sink,
publish,
Expand Down
21 changes: 14 additions & 7 deletions src/v3/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Handshake {
keepalive,
session_present,
session: Some(st),
outgoing: None,
max_send: None,
return_code: mqtt::ConnectAckReason::ConnectionAccepted,
}
}
Expand All @@ -79,7 +79,7 @@ impl Handshake {
session: None,
session_present: false,
keepalive: DEFAULT_KEEPALIVE,
outgoing: None,
max_send: None,
return_code: mqtt::ConnectAckReason::IdentifierRejected,
}
}
Expand All @@ -91,7 +91,7 @@ impl Handshake {
shared: self.shared,
session: None,
session_present: false,
outgoing: None,
max_send: None,
keepalive: DEFAULT_KEEPALIVE,
return_code: mqtt::ConnectAckReason::BadUserNameOrPassword,
}
Expand All @@ -104,7 +104,7 @@ impl Handshake {
shared: self.shared,
session: None,
session_present: false,
outgoing: None,
max_send: None,
keepalive: DEFAULT_KEEPALIVE,
return_code: mqtt::ConnectAckReason::NotAuthorized,
}
Expand All @@ -117,7 +117,7 @@ impl Handshake {
shared: self.shared,
session: None,
session_present: false,
outgoing: None,
max_send: None,
keepalive: DEFAULT_KEEPALIVE,
return_code: mqtt::ConnectAckReason::ServiceUnavailable,
}
Expand All @@ -138,7 +138,7 @@ pub struct HandshakeAck<St> {
pub(crate) return_code: mqtt::ConnectAckReason,
pub(crate) shared: Rc<MqttShared>,
pub(crate) keepalive: Seconds,
pub(crate) outgoing: Option<u16>,
pub(crate) max_send: Option<u16>,
}

impl<St> HandshakeAck<St> {
Expand All @@ -153,8 +153,15 @@ impl<St> HandshakeAck<St> {
/// Number of outgoing concurrent messages.
///
/// By default outgoing is set to 16 messages
pub fn max_send(mut self, val: u16) -> Self {
self.max_send = Some(val);
self
}

#[deprecated]
#[doc(hidden)]
pub fn max_outgoing(mut self, val: u16) -> Self {
self.outgoing = Some(val);
self.max_send = Some(val);
self
}
}
90 changes: 52 additions & 38 deletions src/v3/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ pub struct MqttServer<St, H, C, P> {
publish: P,
max_qos: QoS,
max_size: u32,
max_inflight: u16,
max_inflight_size: usize,
max_outgoing: u16,
max_outgoing_size: (u32, u32),
max_receive: u16,
max_receive_size: usize,
max_send: u16,
max_send_size: (u32, u32),
handle_qos_after_disconnect: Option<QoS>,
connect_timeout: Seconds,
config: DispatcherConfig,
Expand Down Expand Up @@ -79,10 +79,10 @@ where
publish: DefaultPublishService::default(),
max_qos: QoS::AtLeastOnce,
max_size: 0,
max_inflight: 16,
max_inflight_size: 65535,
max_outgoing: 16,
max_outgoing_size: (65535, 512),
max_receive: 16,
max_receive_size: 65535,
max_send: 16,
max_send_size: (65535, 512),
handle_qos_after_disconnect: None,
connect_timeout: Seconds::ZERO,
pool: Default::default(),
Expand Down Expand Up @@ -155,35 +155,49 @@ where
self
}

/// Number of in-flight concurrent messages.
/// Number of inbound in-flight concurrent messages.
///
/// By default in-flight is set to 16 messages
/// By default inbound is set to 16 messages
pub fn max_receive(mut self, val: u16) -> Self {
self.max_receive = val;
self
}

#[deprecated]
#[doc(hidden)]
pub fn max_inflight(mut self, val: u16) -> Self {
self.max_inflight = val;
self.max_receive = val;
self
}

/// Total size of in-flight messages.
/// Total size of inbound in-flight messages.
///
/// By default total in-flight size is set to 64Kb
/// By default total inbound in-flight size is set to 64Kb
pub fn max_receive_size(mut self, val: usize) -> Self {
self.max_receive_size = val;
self
}

#[deprecated]
#[doc(hidden)]
pub fn max_inflight_size(mut self, val: usize) -> Self {
self.max_inflight_size = val;
self.max_receive_size = val;
self
}

/// Number of outgoing concurrent messages.
///
/// By default outgoing is set to 16 messages
pub fn max_outgoing(mut self, val: u16) -> Self {
self.max_outgoing = val;
pub fn max_send(mut self, val: u16) -> Self {
self.max_send = val;
self
}

/// Total size of outgoing messages.
///
/// By default total outgoing size is set to 64Kb
pub fn max_outgoing_size(mut self, val: u32) -> Self {
self.max_outgoing_size = (val, val / 10);
pub fn max_send_size(mut self, val: u32) -> Self {
self.max_send_size = (val, val / 10);
self
}

Expand Down Expand Up @@ -230,10 +244,10 @@ where
config: self.config,
max_qos: self.max_qos,
max_size: self.max_size,
max_inflight: self.max_inflight,
max_inflight_size: self.max_inflight_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
max_receive: self.max_receive,
max_receive_size: self.max_receive_size,
max_send: self.max_send,
max_send_size: self.max_send_size,
handle_qos_after_disconnect: self.handle_qos_after_disconnect,
connect_timeout: self.connect_timeout,
pool: self.pool,
Expand All @@ -255,10 +269,10 @@ where
config: self.config,
max_qos: self.max_qos,
max_size: self.max_size,
max_inflight: self.max_inflight,
max_inflight_size: self.max_inflight_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
max_receive: self.max_receive,
max_receive_size: self.max_receive_size,
max_send: self.max_send,
max_send_size: self.max_send_size,
handle_qos_after_disconnect: self.handle_qos_after_disconnect,
connect_timeout: self.connect_timeout,
pool: self.pool,
Expand Down Expand Up @@ -290,17 +304,17 @@ where
HandshakeFactory {
factory: self.handshake,
max_size: self.max_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
max_send: self.max_send,
max_send_size: self.max_send_size,
connect_timeout: self.connect_timeout,
pool: self.pool.clone(),
_t: PhantomData,
},
factory(
self.publish,
self.control,
self.max_inflight,
self.max_inflight_size,
self.max_receive,
self.max_receive_size,
self.max_qos,
self.handle_qos_after_disconnect,
),
Expand All @@ -312,8 +326,8 @@ where
struct HandshakeFactory<St, H> {
factory: H,
max_size: u32,
max_outgoing: u16,
max_outgoing_size: (u32, u32),
max_send: u16,
max_send_size: (u32, u32),
connect_timeout: Seconds,
pool: Rc<MqttSinkPool>,
_t: PhantomData<St>,
Expand All @@ -333,8 +347,8 @@ where
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
Ok(HandshakeService {
max_size: self.max_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
max_send: self.max_send,
max_send_size: self.max_send_size,
pool: self.pool.clone(),
service: self.factory.create(()).await?,
connect_timeout: self.connect_timeout.into(),
Expand All @@ -346,8 +360,8 @@ where
struct HandshakeService<St, H> {
service: H,
max_size: u32,
max_outgoing: u16,
max_outgoing_size: (u32, u32),
max_send: u16,
max_send_size: (u32, u32),
pool: Rc<MqttSinkPool>,
connect_timeout: Millis,
_t: PhantomData<St>,
Expand All @@ -371,7 +385,7 @@ where
) -> Result<Self::Response, Self::Error> {
log::trace!("Starting mqtt v3 handshake");

let (h, l) = self.max_outgoing_size;
let (h, l) = self.max_send_size;
io.memory_pool().set_write_params(h, l);

let codec = mqtt::Codec::default();
Expand Down Expand Up @@ -408,7 +422,7 @@ where

log::trace!("Sending success handshake ack: {:#?}", pkt);

ack.shared.set_cap(ack.outgoing.unwrap_or(self.max_outgoing) as usize);
ack.shared.set_cap(ack.max_send.unwrap_or(self.max_send) as usize);
ack.io.encode(pkt, &ack.shared.codec)?;
Ok((
ack.io,
Expand Down
Loading

0 comments on commit e70568b

Please sign in to comment.