Skip to content

Commit

Permalink
fixed feature flags
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Jan 1, 2024
1 parent b550ea3 commit 62ee5ac
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 38 deletions.
40 changes: 29 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ categories = ["network-programming"]
readme = "README.md"
edition = "2021"
license = "MPL-2.0"
keywords = [ "MQTT", "IoT", "MQTTv5", "messaging", "client" ]
keywords = ["MQTT", "IoT", "MQTTv5", "messaging", "client"]
description = "Pure rust MQTTv5 client implementation Smol and Tokio"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["smol", "tokio", "tokio_concurrent"]
tokio_concurrent = ["dep:tokio", "tokio/rt"]
tokio = ["dep:tokio"]
default = [
"smol",
"tokio",
]
tokio = ["dep:tokio", "tokio/rt"]
smol = ["dep:smol"]
logs = ["dep:tracing"]

Expand All @@ -30,24 +32,40 @@ tracing = { version = "0.1.40", optional = true }

async-channel = "2.1.1"
#async-mutex = "1.4.0"
futures = { version = "0.3.30", default-features = false, features = ["std", "async-await"] }
futures = { version = "0.3.30", default-features = false, features = [
"std",
"async-await",
] }

# quic feature flag
# quinn = {version = "0.9.0", optional = true }

# tokio feature flag
tokio = { version = "1.35.1", features = ["macros", "io-util", "net", "time"], optional = true }
tokio = { version = "1.35.1", features = [
"macros",
"io-util",
"net",
"time",
], optional = true }

# smol feature flag
smol = { version = "2.0.0", optional = true }

[dev-dependencies]
criterion = {version="0.5.1", features=["async_tokio"]}
criterion = { version = "0.5.1", features = ["async_tokio"] }

tracing-subscriber = {version = "0.3.18", features = ["env-filter"]}
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

smol = { version = "2.0.0" }
tokio = { version = "1.33.0", features = ["rt-multi-thread", "rt", "macros", "sync", "io-util", "net", "time"] }
tokio = { version = "1.33.0", features = [
"rt-multi-thread",
"rt",
"macros",
"sync",
"io-util",
"net",
"time",
] }

rustls = { version = "0.21.7" }
rustls-pemfile = { version = "1.0.3" }
Expand All @@ -59,5 +77,5 @@ rand = "0.8.5"


[[bench]]
name = "bench_main"
harness = false
name = "bench_main"
harness = false
18 changes: 8 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ mod connect_options;
mod mqtt_handler;
mod util;

#[cfg(any(feature = "tokio", feature = "tokio_concurrent"))]
#[cfg(any(feature = "tokio"))]
pub mod tokio;
#[cfg(feature = "smol")]
pub mod smol;
Expand All @@ -190,7 +190,6 @@ pub use event_handlers::*;
pub use client::MqttClient;
pub use connect_options::ConnectOptions;
use mqtt_handler::StateHandler;
use available_packet_ids::AvailablePacketIds;

#[cfg(test)]
pub mod tests;
Expand Down Expand Up @@ -234,11 +233,12 @@ impl<H, S> NetworkBuilder<H, S> {
}
}

#[cfg(feature = "tokio")]
impl<H, S> NetworkBuilder<H, S>
where
H: AsyncEventHandlerMut,
S: ::tokio::io::AsyncReadExt + ::tokio::io::AsyncWriteExt + Sized + Unpin,
{
{
/// Creates the needed components to run the MQTT client using a stream that implements [`tokio::io::AsyncReadExt`] and [`tokio::io::AsyncWriteExt`]
/// This network is supposed to be ran on a single task/thread. The read and write operations happen one after the other.
/// This approach does not give the most speed in terms of reading and writing but provides a simple and easy to use client with low overhead for low throughput clients.
Expand All @@ -257,7 +257,7 @@ where
pub fn tokio_sequential_network(self) -> (tokio::Network<tokio::SequentialHandler, H, S>, MqttClient) where H: AsyncEventHandlerMut {
let (to_network_s, to_network_r) = async_channel::bounded(CHANNEL_SIZE);

let (apkids, apkids_r) = AvailablePacketIds::new(self.options.send_maximum());
let (apkids, apkids_r) = available_packet_ids::AvailablePacketIds::new(self.options.send_maximum());

let max_packet_size = self.options.maximum_packet_size();

Expand All @@ -270,12 +270,12 @@ where
}


#[cfg(feature = "tokio")]
impl<H, S> NetworkBuilder<H, S>
where
H: AsyncEventHandler,
S: ::tokio::io::AsyncReadExt + ::tokio::io::AsyncWriteExt + Sized + Unpin,
{
#[cfg(feature = "tokio_concurrent")]
/// Creates the needed components to run the MQTT client using a stream that implements [`tokio::io::AsyncReadExt`] and [`tokio::io::AsyncWriteExt`]
/// # Example
///
Expand All @@ -290,7 +290,7 @@ where
pub fn tokio_concurrent_network(self) -> (tokio::Network<tokio::ConcurrentHandler, H, S>, MqttClient) {
let (to_network_s, to_network_r) = async_channel::bounded(CHANNEL_SIZE);

let (apkids, apkids_r) = AvailablePacketIds::new(self.options.send_maximum());
let (apkids, apkids_r) = available_packet_ids::AvailablePacketIds::new(self.options.send_maximum());

let max_packet_size = self.options.maximum_packet_size();

Expand Down Expand Up @@ -496,7 +496,7 @@ mod smol_lib_test {
}
}

#[cfg(feature = "tokio_concurrent")]
#[cfg(feature = "tokio")]
#[cfg(test)]
mod tokio_lib_test {
use crate::example_handlers::PingPong;
Expand All @@ -513,7 +513,7 @@ mod tokio_lib_test {
;


#[cfg(feature = "tokio_concurrent")]

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_tokio_tcp() {
use std::hint::black_box;
Expand Down Expand Up @@ -562,9 +562,7 @@ mod tokio_lib_test {
let _ = black_box(read_result);
}



// #[cfg(feature = "tokio_concurrent")]
// #[tokio::test]
// async fn test_tokio_ping_req() {
// let mut client_id: String = rand::thread_rng().sample_iter(&rand::distributions::Alphanumeric).take(7).map(char::from).collect();
Expand Down
2 changes: 1 addition & 1 deletion src/smol/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::packets::ConnAck;
use crate::packets::{
error::ReadBytes,
reason_codes::ConnAckReasonCode,
{FixedHeader, Packet, PacketType},
{FixedHeader, Packet},
};
use crate::{connect_options::ConnectOptions, error::ConnectionError};

Expand Down
12 changes: 5 additions & 7 deletions src/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ pub(crate) mod network;

use futures::Future;
pub use network::Network;
pub use network::NetworkReader;
pub use network::NetworkWriter;
pub use network::{NetworkReader, NetworkWriter};

use crate::AsyncEventHandler;
use crate::AsyncEventHandlerMut;
use crate::error::ConnectionError;
use crate::packets::Packet;

Expand All @@ -23,7 +20,6 @@ pub struct SequentialHandler;
/// This kind of handler is used for both concurrent message handling and concurrent TCP read and write operations.
pub struct ConcurrentHandler;


trait HandlerExt<H>: Sized{
/// Should call the handler in the fashion of the handler.
/// (e.g. spawn a task if or await the handle call)
Expand All @@ -45,7 +41,8 @@ trait HandlerExt<H>: Sized{
;

}
impl<H: AsyncEventHandlerMut + Send> HandlerExt<H> for SequentialHandler {

impl<H: crate::AsyncEventHandlerMut + Send> HandlerExt<H> for SequentialHandler {
#[inline]
fn call_handler(handler: &mut H, incoming_packet: Packet) -> impl Future<Output = ()> + Send{
handler.handle(incoming_packet)
Expand All @@ -67,7 +64,8 @@ impl<H: AsyncEventHandlerMut + Send> HandlerExt<H> for SequentialHandler {
}
}
}
impl<H: AsyncEventHandler + Send + Sync + Clone + 'static> HandlerExt<H> for ConcurrentHandler {

impl<H: crate::AsyncEventHandler + Send + Sync + Clone + 'static> HandlerExt<H> for ConcurrentHandler {
fn call_handler(handler: &mut H, incoming_packet: Packet) -> impl Future<Output = ()> + Send{
let handler_clone = handler.clone();
tokio::spawn(async move {
Expand Down
11 changes: 2 additions & 9 deletions src/tokio/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use crate::packets::{Disconnect, Packet, PacketType};
use crate::{AsyncEventHandlerMut, StateHandler, NetworkStatus};

use super::{SequentialHandler, HandlerExt};
use super::stream::read_half::ReadStream;
use super::stream::write_half::WriteStream;
use super::stream::Stream;

// type StreamType = tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Sized + Unpin + Send + 'static;
Expand Down Expand Up @@ -212,7 +210,6 @@ where
}
}


impl<N, H, S> Network<N, H, S>
where
S: tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Sized + Unpin + Send + 'static,
Expand Down Expand Up @@ -262,21 +259,19 @@ where
}
}

#[cfg(feature = "tokio_concurrent")]
pub struct NetworkReader<N, H, S> {
pub(crate) run_signal: Arc<AtomicBool>,

pub(crate) handler_helper: PhantomData<N>,
pub handler: H,

pub(crate) read_stream: ReadStream<S>,
pub(crate) read_stream: super::stream::read_half::ReadStream<S>,
pub(crate) await_pingresp_atomic: Arc<AtomicBool>,
pub(crate) state_handler: Arc<StateHandler>,
pub(crate) to_writer_s: Sender<Packet>,
pub(crate) join_set: JoinSet<Result<(), ConnectionError>>,
}

#[cfg(feature = "tokio_concurrent")]
impl<N, H, S> NetworkReader<N, H, S>
where
N: HandlerExt<H>,
Expand Down Expand Up @@ -347,11 +342,10 @@ where
}
}

#[cfg(feature = "tokio_concurrent")]
pub struct NetworkWriter<S> {
run_signal: Arc<AtomicBool>,

write_stream: WriteStream<S>,
write_stream: super::stream::write_half::WriteStream<S>,

keep_alive_interval: Duration,

Expand All @@ -366,7 +360,6 @@ pub struct NetworkWriter<S> {
to_network_r: Receiver<Packet>,
}

#[cfg(feature = "tokio_concurrent")]
impl<S> NetworkWriter<S>
where
S: tokio::io::AsyncWriteExt + Sized + Unpin,
Expand Down

0 comments on commit 62ee5ac

Please sign in to comment.