Skip to content

Commit

Permalink
Refactor server config
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed May 12, 2024
1 parent fb917b5 commit 2631410
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 19 deletions.
6 changes: 3 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ impl<Err, InitErr> Default
impl<V3, V5, Err, InitErr> MqttServer<V3, V5, Err, InitErr> {
/// Set client timeout reading protocol version.
///
/// Defines a timeout for reading `Connect` frame. If a client does not transmit
/// Defines a timeout for reading protocol version. If a client does not transmit
/// version of the protocol within this time, the connection is terminated with
/// Mqtt::Handshake(HandshakeError::Timeout) error.
///
/// By default, connect timeuot is 5 seconds.
pub fn connect_timeout(mut self, timeout: Seconds) -> Self {
/// By default, timeuot is 5 seconds.
pub fn protocol_version_timeout(mut self, timeout: Seconds) -> Self {

Check warning on line 59 in src/server.rs

View check run for this annotation

Codecov / codecov/patch

src/server.rs#L59

Added line #L59 was not covered by tests
self.connect_timeout = timeout.into();
self
}
Expand Down
21 changes: 10 additions & 11 deletions src/v3/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use super::shared::MqttShared;
use super::sink::MqttSink;

const DEFAULT_KEEPALIVE: Seconds = Seconds(30);
const DEFAULT_OUTGOING_INFLIGHT: u16 = 16;

/// Connect message
pub struct Handshake {
Expand Down Expand Up @@ -67,7 +66,7 @@ impl Handshake {
keepalive,
session_present,
session: Some(st),
inflight: DEFAULT_OUTGOING_INFLIGHT,
outgoing: None,
return_code: mqtt::ConnectAckReason::ConnectionAccepted,
}
}
Expand All @@ -80,7 +79,7 @@ impl Handshake {
session: None,
session_present: false,
keepalive: DEFAULT_KEEPALIVE,
inflight: DEFAULT_OUTGOING_INFLIGHT,
outgoing: None,
return_code: mqtt::ConnectAckReason::IdentifierRejected,
}
}
Expand All @@ -92,8 +91,8 @@ impl Handshake {
shared: self.shared,
session: None,
session_present: false,
outgoing: None,
keepalive: DEFAULT_KEEPALIVE,
inflight: DEFAULT_OUTGOING_INFLIGHT,
return_code: mqtt::ConnectAckReason::BadUserNameOrPassword,
}
}
Expand All @@ -105,8 +104,8 @@ impl Handshake {
shared: self.shared,
session: None,
session_present: false,
outgoing: None,
keepalive: DEFAULT_KEEPALIVE,
inflight: DEFAULT_OUTGOING_INFLIGHT,
return_code: mqtt::ConnectAckReason::NotAuthorized,
}
}
Expand All @@ -118,8 +117,8 @@ impl Handshake {
shared: self.shared,
session: None,
session_present: false,
outgoing: None,
keepalive: DEFAULT_KEEPALIVE,
inflight: DEFAULT_OUTGOING_INFLIGHT,
return_code: mqtt::ConnectAckReason::ServiceUnavailable,
}
}
Expand All @@ -139,7 +138,7 @@ pub struct HandshakeAck<St> {
pub(crate) return_code: mqtt::ConnectAckReason,
pub(crate) shared: Rc<MqttShared>,
pub(crate) keepalive: Seconds,
pub(crate) inflight: u16,
pub(crate) outgoing: Option<u16>,
}

impl<St> HandshakeAck<St> {
Expand All @@ -151,11 +150,11 @@ impl<St> HandshakeAck<St> {
self
}

/// Number of outgoing in-flight concurrent messages.
/// Number of outgoing concurrent messages.
///
/// By default in-flight is set to 16 messages
pub fn inflight(mut self, val: u16) -> Self {
self.inflight = val;
/// By default outgoing is set to 16 messages
pub fn max_outgoing(mut self, val: u16) -> Self {
self.outgoing = Some(val);

Check warning on line 157 in src/v3/handshake.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/handshake.rs#L156-L157

Added lines #L156 - L157 were not covered by tests
self
}
}
43 changes: 39 additions & 4 deletions src/v3/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub struct MqttServer<St, H, C, P> {
max_size: u32,
max_inflight: u16,
max_inflight_size: usize,
max_outgoing: u16,
max_outgoing_size: (u32, u32),
handle_qos_after_disconnect: Option<QoS>,
connect_timeout: Seconds,
config: DispatcherConfig,
Expand Down Expand Up @@ -79,6 +81,8 @@ where
max_size: 0,
max_inflight: 16,
max_inflight_size: 65535,
max_outgoing: 16,
max_outgoing_size: (65535, 512),
handle_qos_after_disconnect: None,
connect_timeout: Seconds::ZERO,
pool: Default::default(),
Expand Down Expand Up @@ -154,19 +158,35 @@ where
/// Number of in-flight concurrent messages.
///
/// By default in-flight is set to 16 messages
pub fn inflight(mut self, val: u16) -> Self {
pub fn max_inflight(mut self, val: u16) -> Self {

Check warning on line 161 in src/v3/server.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/server.rs#L161

Added line #L161 was not covered by tests
self.max_inflight = val;
self
}

/// Total size of in-flight messages.
///
/// By default total in-flight size is set to 64Kb
pub fn inflight_size(mut self, val: usize) -> Self {
pub fn max_inflight_size(mut self, val: usize) -> Self {

Check warning on line 169 in src/v3/server.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/server.rs#L169

Added line #L169 was not covered by tests
self.max_inflight_size = val;
self
}

/// Number of outgoing concurrent messages.
///
/// By default outgoing is set to 16 messages
pub fn max_outgoing(mut self, val: u16) -> Self {
self.max_outgoing = val;
self
}

Check warning on line 180 in src/v3/server.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/server.rs#L177-L180

Added lines #L177 - L180 were not covered by tests

/// Total size of outgoing messages.
///
/// By default total outgoing size is set to 64Kb
pub fn max_outgoing_size(mut self, val: u32) -> Self {
self.max_outgoing_size = (val, val / 10);
self
}

Check warning on line 188 in src/v3/server.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/server.rs#L185-L188

Added lines #L185 - L188 were not covered by tests

/// Handle max received QoS messages after client disconnect.
///
/// By default, messages received before dispatched to the publish service will be dropped if
Expand Down Expand Up @@ -212,6 +232,8 @@ where
max_size: self.max_size,
max_inflight: self.max_inflight,
max_inflight_size: self.max_inflight_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
handle_qos_after_disconnect: self.handle_qos_after_disconnect,
connect_timeout: self.connect_timeout,
pool: self.pool,
Expand All @@ -235,8 +257,10 @@ where
max_size: self.max_size,
max_inflight: self.max_inflight,
max_inflight_size: self.max_inflight_size,
connect_timeout: self.connect_timeout,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
handle_qos_after_disconnect: self.handle_qos_after_disconnect,
connect_timeout: self.connect_timeout,
pool: self.pool,
_t: PhantomData,
}
Expand Down Expand Up @@ -266,6 +290,8 @@ where
HandshakeFactory {
factory: self.handshake,
max_size: self.max_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
connect_timeout: self.connect_timeout,
pool: self.pool.clone(),
_t: PhantomData,
Expand All @@ -286,6 +312,8 @@ where
struct HandshakeFactory<St, H> {
factory: H,
max_size: u32,
max_outgoing: u16,
max_outgoing_size: (u32, u32),
connect_timeout: Seconds,
pool: Rc<MqttSinkPool>,
_t: PhantomData<St>,
Expand All @@ -305,6 +333,8 @@ where
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
Ok(HandshakeService {
max_size: self.max_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
pool: self.pool.clone(),
service: self.factory.create(()).await?,
connect_timeout: self.connect_timeout.into(),
Expand All @@ -316,6 +346,8 @@ where
struct HandshakeService<St, H> {
service: H,
max_size: u32,
max_outgoing: u16,
max_outgoing_size: (u32, u32),
pool: Rc<MqttSinkPool>,
connect_timeout: Millis,
_t: PhantomData<St>,
Expand All @@ -339,6 +371,9 @@ where
) -> Result<Self::Response, Self::Error> {
log::trace!("Starting mqtt v3 handshake");

let (h, l) = self.max_outgoing_size;
io.memory_pool().set_write_params(h, l);

let codec = mqtt::Codec::default();
codec.set_max_size(self.max_size);
let shared = Rc::new(MqttShared::new(io.get_ref(), codec, false, self.pool.clone()));
Expand Down Expand Up @@ -373,7 +408,7 @@ where

log::trace!("Sending success handshake ack: {:#?}", pkt);

ack.shared.set_cap(ack.inflight as usize);
ack.shared.set_cap(ack.outgoing.unwrap_or(self.max_outgoing) as usize);
ack.io.encode(pkt, &ack.shared.codec)?;
Ok((
ack.io,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ async fn handle_or_drop_publish_after_disconnect(
sleep(Millis(1750)).await;
io.close();
drop(io);
sleep(Millis(1000)).await;
sleep(Millis(1500)).await;

assert!(disconnect.load(Relaxed));

Expand Down

0 comments on commit 2631410

Please sign in to comment.