From 62ee5ac68c9e6d8bd7c4b3da35217b00e2a5b881 Mon Sep 17 00:00:00 2001 From: GunnarMorrigan <13799935+GunnarMorrigan@users.noreply.github.com> Date: Mon, 1 Jan 2024 20:51:06 +0100 Subject: [PATCH] fixed feature flags --- Cargo.toml | 40 +++++++++++++++++++++++++++++----------- src/lib.rs | 18 ++++++++---------- src/smol/stream.rs | 2 +- src/tokio/mod.rs | 12 +++++------- src/tokio/network.rs | 11 ++--------- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1494a6e..3a3a529 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,15 +8,17 @@ categories = ["network-programming"] readme = "README.md" edition = "2021" license = "MPL-2.0" -keywords = [ "MQTT", "IoT", "MQTTv5", "messaging", "client" ] +keywords = ["MQTT", "IoT", "MQTTv5", "messaging", "client"] description = "Pure rust MQTTv5 client implementation Smol and Tokio" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["smol", "tokio", "tokio_concurrent"] -tokio_concurrent = ["dep:tokio", "tokio/rt"] -tokio = ["dep:tokio"] +default = [ + "smol", + "tokio", +] +tokio = ["dep:tokio", "tokio/rt"] smol = ["dep:smol"] logs = ["dep:tracing"] @@ -30,24 +32,40 @@ tracing = { version = "0.1.40", optional = true } async-channel = "2.1.1" #async-mutex = "1.4.0" -futures = { version = "0.3.30", default-features = false, features = ["std", "async-await"] } +futures = { version = "0.3.30", default-features = false, features = [ + "std", + "async-await", +] } # quic feature flag # quinn = {version = "0.9.0", optional = true } # tokio feature flag -tokio = { version = "1.35.1", features = ["macros", "io-util", "net", "time"], optional = true } +tokio = { version = "1.35.1", features = [ + "macros", + "io-util", + "net", + "time", +], optional = true } # smol feature flag smol = { version = "2.0.0", optional = true } [dev-dependencies] -criterion = {version="0.5.1", features=["async_tokio"]} +criterion = { version = "0.5.1", features = ["async_tokio"] } -tracing-subscriber = {version = "0.3.18", features = ["env-filter"]} +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } smol = { version = "2.0.0" } -tokio = { version = "1.33.0", features = ["rt-multi-thread", "rt", "macros", "sync", "io-util", "net", "time"] } +tokio = { version = "1.33.0", features = [ + "rt-multi-thread", + "rt", + "macros", + "sync", + "io-util", + "net", + "time", +] } rustls = { version = "0.21.7" } rustls-pemfile = { version = "1.0.3" } @@ -59,5 +77,5 @@ rand = "0.8.5" [[bench]] -name = "bench_main" -harness = false \ No newline at end of file +name = "bench_main" +harness = false diff --git a/src/lib.rs b/src/lib.rs index 2818d21..4bbf36e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -174,7 +174,7 @@ mod connect_options; mod mqtt_handler; mod util; -#[cfg(any(feature = "tokio", feature = "tokio_concurrent"))] +#[cfg(any(feature = "tokio"))] pub mod tokio; #[cfg(feature = "smol")] pub mod smol; @@ -190,7 +190,6 @@ pub use event_handlers::*; pub use client::MqttClient; pub use connect_options::ConnectOptions; use mqtt_handler::StateHandler; -use available_packet_ids::AvailablePacketIds; #[cfg(test)] pub mod tests; @@ -234,11 +233,12 @@ impl NetworkBuilder { } } +#[cfg(feature = "tokio")] impl NetworkBuilder where H: AsyncEventHandlerMut, S: ::tokio::io::AsyncReadExt + ::tokio::io::AsyncWriteExt + Sized + Unpin, -{ +{ /// Creates the needed components to run the MQTT client using a stream that implements [`tokio::io::AsyncReadExt`] and [`tokio::io::AsyncWriteExt`] /// This network is supposed to be ran on a single task/thread. The read and write operations happen one after the other. /// This approach does not give the most speed in terms of reading and writing but provides a simple and easy to use client with low overhead for low throughput clients. @@ -257,7 +257,7 @@ where pub fn tokio_sequential_network(self) -> (tokio::Network, MqttClient) where H: AsyncEventHandlerMut { let (to_network_s, to_network_r) = async_channel::bounded(CHANNEL_SIZE); - let (apkids, apkids_r) = AvailablePacketIds::new(self.options.send_maximum()); + let (apkids, apkids_r) = available_packet_ids::AvailablePacketIds::new(self.options.send_maximum()); let max_packet_size = self.options.maximum_packet_size(); @@ -270,12 +270,12 @@ where } +#[cfg(feature = "tokio")] impl NetworkBuilder where H: AsyncEventHandler, S: ::tokio::io::AsyncReadExt + ::tokio::io::AsyncWriteExt + Sized + Unpin, { - #[cfg(feature = "tokio_concurrent")] /// Creates the needed components to run the MQTT client using a stream that implements [`tokio::io::AsyncReadExt`] and [`tokio::io::AsyncWriteExt`] /// # Example /// @@ -290,7 +290,7 @@ where pub fn tokio_concurrent_network(self) -> (tokio::Network, MqttClient) { let (to_network_s, to_network_r) = async_channel::bounded(CHANNEL_SIZE); - let (apkids, apkids_r) = AvailablePacketIds::new(self.options.send_maximum()); + let (apkids, apkids_r) = available_packet_ids::AvailablePacketIds::new(self.options.send_maximum()); let max_packet_size = self.options.maximum_packet_size(); @@ -496,7 +496,7 @@ mod smol_lib_test { } } -#[cfg(feature = "tokio_concurrent")] +#[cfg(feature = "tokio")] #[cfg(test)] mod tokio_lib_test { use crate::example_handlers::PingPong; @@ -513,7 +513,7 @@ mod tokio_lib_test { ; - #[cfg(feature = "tokio_concurrent")] + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_tokio_tcp() { use std::hint::black_box; @@ -562,9 +562,7 @@ mod tokio_lib_test { let _ = black_box(read_result); } - - // #[cfg(feature = "tokio_concurrent")] // #[tokio::test] // async fn test_tokio_ping_req() { // let mut client_id: String = rand::thread_rng().sample_iter(&rand::distributions::Alphanumeric).take(7).map(char::from).collect(); diff --git a/src/smol/stream.rs b/src/smol/stream.rs index be8a72b..a24c1a6 100644 --- a/src/smol/stream.rs +++ b/src/smol/stream.rs @@ -11,7 +11,7 @@ use crate::packets::ConnAck; use crate::packets::{ error::ReadBytes, reason_codes::ConnAckReasonCode, - {FixedHeader, Packet, PacketType}, + {FixedHeader, Packet}, }; use crate::{connect_options::ConnectOptions, error::ConnectionError}; diff --git a/src/tokio/mod.rs b/src/tokio/mod.rs index 0520267..4e12ac2 100644 --- a/src/tokio/mod.rs +++ b/src/tokio/mod.rs @@ -4,11 +4,8 @@ pub(crate) mod network; use futures::Future; pub use network::Network; -pub use network::NetworkReader; -pub use network::NetworkWriter; +pub use network::{NetworkReader, NetworkWriter}; -use crate::AsyncEventHandler; -use crate::AsyncEventHandlerMut; use crate::error::ConnectionError; use crate::packets::Packet; @@ -23,7 +20,6 @@ pub struct SequentialHandler; /// This kind of handler is used for both concurrent message handling and concurrent TCP read and write operations. pub struct ConcurrentHandler; - trait HandlerExt: Sized{ /// Should call the handler in the fashion of the handler. /// (e.g. spawn a task if or await the handle call) @@ -45,7 +41,8 @@ trait HandlerExt: Sized{ ; } -impl HandlerExt for SequentialHandler { + +impl HandlerExt for SequentialHandler { #[inline] fn call_handler(handler: &mut H, incoming_packet: Packet) -> impl Future + Send{ handler.handle(incoming_packet) @@ -67,7 +64,8 @@ impl HandlerExt for SequentialHandler { } } } -impl HandlerExt for ConcurrentHandler { + +impl HandlerExt for ConcurrentHandler { fn call_handler(handler: &mut H, incoming_packet: Packet) -> impl Future + Send{ let handler_clone = handler.clone(); tokio::spawn(async move { diff --git a/src/tokio/network.rs b/src/tokio/network.rs index cc8d16f..e0d4e50 100644 --- a/src/tokio/network.rs +++ b/src/tokio/network.rs @@ -17,8 +17,6 @@ use crate::packets::{Disconnect, Packet, PacketType}; use crate::{AsyncEventHandlerMut, StateHandler, NetworkStatus}; use super::{SequentialHandler, HandlerExt}; -use super::stream::read_half::ReadStream; -use super::stream::write_half::WriteStream; use super::stream::Stream; // type StreamType = tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Sized + Unpin + Send + 'static; @@ -212,7 +210,6 @@ where } } - impl Network where S: tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Sized + Unpin + Send + 'static, @@ -262,21 +259,19 @@ where } } -#[cfg(feature = "tokio_concurrent")] pub struct NetworkReader { pub(crate) run_signal: Arc, pub(crate) handler_helper: PhantomData, pub handler: H, - pub(crate) read_stream: ReadStream, + pub(crate) read_stream: super::stream::read_half::ReadStream, pub(crate) await_pingresp_atomic: Arc, pub(crate) state_handler: Arc, pub(crate) to_writer_s: Sender, pub(crate) join_set: JoinSet>, } -#[cfg(feature = "tokio_concurrent")] impl NetworkReader where N: HandlerExt, @@ -347,11 +342,10 @@ where } } -#[cfg(feature = "tokio_concurrent")] pub struct NetworkWriter { run_signal: Arc, - write_stream: WriteStream, + write_stream: super::stream::write_half::WriteStream, keep_alive_interval: Duration, @@ -366,7 +360,6 @@ pub struct NetworkWriter { to_network_r: Receiver, } -#[cfg(feature = "tokio_concurrent")] impl NetworkWriter where S: tokio::io::AsyncWriteExt + Sized + Unpin,