Skip to content

Commit

Permalink
Check for duplicated in-flight packet ids
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed May 26, 2020
1 parent e6d66e3 commit 65efc03
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 14 deletions.
6 changes: 5 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.1.3] - 2020-05-26

* Check for duplicated in-flight packet ids

## [0.1.2] - 2020-04-20

* Update ntex
Expand Down Expand Up @@ -32,4 +36,4 @@

## [0.1.0] - 2019-09-25

* Initial release
* Initial release
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.1.2"
version = "0.1.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "MQTT v3.1.1 Client/Server framework"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -19,6 +19,7 @@ bytes = "0.5.4"
derive_more = "0.99.5"
either = "1.5.3"
futures = "0.3.4"
fxhash = "0.2.1"
pin-project = "0.4.8"
log = "0.4"
bytestring = "0.1.5"
Expand Down
38 changes: 26 additions & 12 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::future::{join3, ok, Either, FutureExt, LocalBoxFuture, Ready};
use futures::future::{err, join3, ok, Either, FutureExt, LocalBoxFuture, Ready};
use futures::ready;
use fxhash::FxHashSet;
use ntex::service::{boxed, fn_factory_with_config, pipeline, Service, ServiceFactory};
use ntex::util::inflight::InFlightService;
use ntex::util::keepalive::KeepAliveService;
Expand Down Expand Up @@ -75,7 +76,7 @@ where
let (publish, subscribe, unsubscribe) = fut.await;

// mqtt dispatcher
Ok(Dispatcher::new(
Ok(Dispatcher::<_, _, E>::new(
cfg,
// keep-alive connection
pipeline(KeepAliveService::new(timeout, time, || {
Expand All @@ -101,17 +102,18 @@ where
}

/// PUBLIS/SUBSCRIBER/UNSUBSCRIBER packets dispatcher
pub(crate) struct Dispatcher<St, T: Service> {
pub(crate) struct Dispatcher<St, T: Service<Error = MqttError<E>>, E> {
session: Session<St>,
publish: T,
subscribe: boxed::BoxService<Subscribe, SubscribeResult, T::Error>,
unsubscribe: boxed::BoxService<Unsubscribe, (), T::Error>,
disconnect: RefCell<Option<Rc<dyn Fn(&Session<St>, bool)>>>,
inflight: Rc<RefCell<FxHashSet<NonZeroU16>>>,
}

impl<St, T> Dispatcher<St, T>
impl<St, T, E> Dispatcher<St, T, E>
where
T: Service<Request = Publish, Response = ()>,
T: Service<Request = Publish, Response = (), Error = MqttError<E>>,
{
pub(crate) fn new(
session: Session<St>,
Expand All @@ -126,24 +128,25 @@ where
subscribe,
unsubscribe,
disconnect: RefCell::new(disconnect),
inflight: Rc::new(RefCell::new(FxHashSet::default())),
}
}
}

impl<St, T> Service for Dispatcher<St, T>
impl<St, T, E> Service for Dispatcher<St, T, E>
where
T: Service<Request = Publish, Response = ()>,
T::Error: 'static,
T: Service<Request = Publish, Response = (), Error = MqttError<E>>,
E: 'static,
{
type Request = mqtt::Packet;
type Response = Option<mqtt::Packet>;
type Error = T::Error;
type Error = MqttError<E>;
type Future = Either<
Either<
Ready<Result<Self::Response, T::Error>>,
LocalBoxFuture<'static, Result<Self::Response, T::Error>>,
Ready<Result<Self::Response, MqttError<E>>>,
LocalBoxFuture<'static, Result<Self::Response, MqttError<E>>>,
>,
PublishResponse<T::Future, T::Error>,
PublishResponse<T::Future, MqttError<E>>,
>;

fn poll_ready(&self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Expand Down Expand Up @@ -173,9 +176,18 @@ where
}
mqtt::Packet::Disconnect => Either::Left(Either::Left(ok(None))),
mqtt::Packet::Publish(publish) => {
let inflight = self.inflight.clone();
let packet_id = publish.packet_id;

// check for duplicated packet id
if let Some(pid) = packet_id {
if !inflight.borrow_mut().insert(pid) {
return Either::Left(Either::Left(err(MqttError::DuplicatedPacketId)));
}
}
Either::Right(PublishResponse {
packet_id,
inflight,
fut: self.publish.call(Publish::new(publish)),
_t: PhantomData,
})
Expand Down Expand Up @@ -214,6 +226,7 @@ pub(crate) struct PublishResponse<T, E> {
#[pin]
fut: T,
packet_id: Option<NonZeroU16>,
inflight: Rc<RefCell<FxHashSet<NonZeroU16>>>,
_t: PhantomData<E>,
}

Expand All @@ -228,6 +241,7 @@ where

ready!(this.fut.poll(cx))?;
if let Some(packet_id) = this.packet_id {
this.inflight.borrow_mut().remove(&packet_id);
Poll::Ready(Ok(Some(mqtt::Packet::PublishAck {
packet_id: *packet_id,
})))
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub enum MqttError<E> {
Unexpected(crate::codec3::Packet, &'static str),
/// "SUBSCRIBE, UNSUBSCRIBE, and PUBLISH (in cases where QoS > 0) Control Packets MUST contain a non-zero 16-bit Packet Identifier [MQTT-2.3.1-1]."
PacketIdRequired,
/// Multiple in-flight publish packet with same package_id
DuplicatedPacketId,
/// Keep alive timeout
KeepAliveTimeout,
/// Handshake timeout
Expand Down

0 comments on commit 65efc03

Please sign in to comment.