diff --git a/rumqttc/examples/new_pub.rs b/rumqttc/examples/new_pub.rs new file mode 100644 index 000000000..b36c00634 --- /dev/null +++ b/rumqttc/examples/new_pub.rs @@ -0,0 +1,31 @@ +use tokio::{task, time}; + +use rumqttc::{self, MqttOptions, QoS, ReqHandler}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main(worker_threads = 1)] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + // color_backtrace::install(); + + let mut mqttoptions = MqttOptions::new("test-pub", "localhost", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut sub_handler) = ReqHandler::new(mqttoptions, 10); + task::spawn(async move { + sub_handler.start().await.unwrap(); + }); + + let mut publisher = client.publisher("hello/world"); + + publisher.qos = QoS::ExactlyOnce; + publisher.retain = false; + for i in 1..=10 { + publisher.publish(vec![1; i]).await.unwrap(); + + time::sleep(Duration::from_secs(1)).await; + } + + Ok(()) +} diff --git a/rumqttc/examples/new_pubsub.rs b/rumqttc/examples/new_pubsub.rs deleted file mode 100644 index b978d57b3..000000000 --- a/rumqttc/examples/new_pubsub.rs +++ /dev/null @@ -1,47 +0,0 @@ -use tokio::{task, time}; - -use rumqttc::{self, ReqHandler, MqttOptions, QoS, Publisher}; -use std::error::Error; -use std::time::Duration; - -#[tokio::main(worker_threads = 1)] -async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - - let (client, mut sub_handler) = ReqHandler::new(mqttoptions, 10); - task::spawn(async move { - sub_handler.start().await.unwrap(); - }); - - let subscriber = client.subscriber("hello/world", QoS::AtMostOnce).await.unwrap(); - let publisher = client.publisher("hello/world"); - - task::spawn(async move { - requests(publisher).await; - time::sleep(Duration::from_secs(3)).await; - }); - - loop { - let publish = subscriber.next().await; - println!("{:?}", publish.unwrap()); - } -} - -async fn requests(mut publisher: Publisher) { - publisher.qos = QoS::ExactlyOnce; - publisher.retain = false; - for i in 1..=10 { - publisher - .publish(vec![1; i]) - .await - .unwrap(); - - time::sleep(Duration::from_secs(1)).await; - } - - time::sleep(Duration::from_secs(120)).await; -} diff --git a/rumqttc/examples/new_sub.rs b/rumqttc/examples/new_sub.rs new file mode 100644 index 000000000..8e7bb8e97 --- /dev/null +++ b/rumqttc/examples/new_sub.rs @@ -0,0 +1,29 @@ +use tokio::task; + +use rumqttc::{self, MqttOptions, QoS, ReqHandler}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main(worker_threads = 1)] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + // color_backtrace::install(); + + let mut mqttoptions = MqttOptions::new("test-sub", "localhost", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut sub_handler) = ReqHandler::new(mqttoptions, 10); + task::spawn(async move { + sub_handler.start().await.unwrap(); + }); + + let mut subscriber = client.subscriber("hello/world", QoS::AtMostOnce).await?; + + loop { + let publish = subscriber.next().await?; + println!("{:?}", publish); + if publish.topic.contains("10") { + subscriber.unsubscribe().await?; + } + } +} diff --git a/rumqttc/src/framed.rs b/rumqttc/src/framed.rs index 995234c70..c45f8d55a 100644 --- a/rumqttc/src/framed.rs +++ b/rumqttc/src/framed.rs @@ -118,5 +118,5 @@ impl Network { } } -pub trait N: AsyncRead + AsyncWrite + Send + Unpin {} -impl N for T where T: AsyncRead + AsyncWrite + Send + Unpin {} +pub trait N: AsyncRead + AsyncWrite + Send + Unpin + Sync {} +impl N for T where T: AsyncRead + AsyncWrite + Send + Unpin + Sync {} diff --git a/rumqttc/src/new.rs b/rumqttc/src/new.rs index 26aff7e4d..aa23f4b6f 100644 --- a/rumqttc/src/new.rs +++ b/rumqttc/src/new.rs @@ -2,12 +2,14 @@ use std::collections::HashMap; use async_channel::{bounded, Receiver, RecvError, SendError, Sender}; use mqttbytes::{ - v4::{Packet, Publish}, + v4::{Packet, Publish, Unsubscribe}, QoS, }; use tokio::select; -use crate::{AsyncClient, ClientError, ConnectionError, Event, EventLoop, MqttOptions}; +use crate::{ + AsyncClient, ClientError, ConnectionError, Event, EventLoop, MqttOptions, Outgoing, Request, +}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -19,8 +21,12 @@ pub enum Error { Recv(#[from] RecvError), #[error("Failed to send subscription request")] Subscribe(#[from] SendError), + #[error("Failed to send unsubscribe request")] + Unsubscribe(#[from] SendError), #[error("Failed to send publish msg")] Publish(#[from] SendError), + #[error("Failed to send request")] + Request(#[from] SendError), } type Subscription = (String, Sender); @@ -28,19 +34,30 @@ type Subscription = (String, Sender); pub struct ReqHandler { client: AsyncClient, sub_tx: Sender, + unsub_tx: Sender, + shutdown_tx: Sender<()>, } impl ReqHandler { pub fn new(options: MqttOptions, cap: usize) -> (ReqHandler, SubHandler) { let (client, eventloop) = AsyncClient::new(options, cap); let (sub_tx, sub_rx) = bounded(10); + let (unsub_tx, unsub_rx) = bounded(10); + let (shutdown_tx, shutdown_rx) = bounded(1); ( - ReqHandler { client, sub_tx }, + ReqHandler { + client, + sub_tx, + unsub_tx, + shutdown_tx, + }, SubHandler { eventloop, sub_rx, subscribers: HashMap::new(), + unsub_rx, + shutdown_rx, }, ) } @@ -56,7 +73,11 @@ impl ReqHandler { let (pub_tx, pub_rx) = bounded(10); self.sub_tx.send((topic.clone(), pub_tx)).await?; - Ok(Subscriber { pub_rx }) + Ok(Subscriber { + topic, + pub_rx, + unsub_tx: self.unsub_tx.clone(), + }) } pub fn publisher>(&self, topic: S) -> Publisher { @@ -69,10 +90,18 @@ impl ReqHandler { } } +impl Drop for ReqHandler { + fn drop(&mut self) { + self.shutdown_tx.try_send(()).unwrap(); + } +} + pub struct SubHandler { eventloop: EventLoop, sub_rx: Receiver, subscribers: HashMap>, + unsub_rx: Receiver, + shutdown_rx: Receiver<()>, } impl SubHandler { @@ -92,19 +121,48 @@ impl SubHandler { let (topic, pub_tx) = s?; self.subscribers.insert(topic, pub_tx); } + + u = self.unsub_rx.recv() => { + let topic = u?; + self.subscribers.remove(&topic); + let req = Request::Unsubscribe(Unsubscribe::new(topic)); + self.eventloop.handle().send(req).await?; + } + + _ = self.shutdown_rx.recv() => { + self.disconnect().await?; + return Ok(()) + } + } + } + } + + async fn disconnect(&mut self) -> Result<(), Error> { + self.eventloop.handle().send(Request::Disconnect).await?; + + loop { + if let Event::Outgoing(Outgoing::Disconnect) = self.eventloop.poll().await? { + return Ok(()); } } } } pub struct Subscriber { + topic: String, pub_rx: Receiver, + unsub_tx: Sender, } impl Subscriber { pub async fn next(&self) -> Result { Ok(self.pub_rx.recv().await?) } + + pub async fn unsubscribe(&mut self) -> Result<(), Error> { + self.unsub_tx.send(self.topic.clone()).await?; + Ok(()) + } } pub struct Publisher { @@ -119,12 +177,14 @@ impl Publisher { self.client .publish(&self.topic, self.qos, self.retain, payload) .await?; + Ok(()) } pub fn try_publish>>(&self, payload: V) -> Result<(), Error> { self.client .try_publish(&self.topic, self.qos, self.retain, payload)?; + Ok(()) } }