Skip to content

Commit

Permalink
Unsub/disconnect on drop, separate out examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Feb 16, 2022
1 parent 150cec5 commit b25ba08
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 53 deletions.
31 changes: 31 additions & 0 deletions rumqttc/examples/new_pub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use tokio::{task, time};

use rumqttc::{self, MqttOptions, QoS, ReqHandler};
use std::error::Error;
use std::time::Duration;

#[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-pub", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut sub_handler) = ReqHandler::new(mqttoptions, 10);
task::spawn(async move {
sub_handler.start().await.unwrap();
});

let mut publisher = client.publisher("hello/world");

publisher.qos = QoS::ExactlyOnce;
publisher.retain = false;
for i in 1..=10 {
publisher.publish(vec![1; i]).await.unwrap();

time::sleep(Duration::from_secs(1)).await;
}

Ok(())
}
47 changes: 0 additions & 47 deletions rumqttc/examples/new_pubsub.rs

This file was deleted.

29 changes: 29 additions & 0 deletions rumqttc/examples/new_sub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use tokio::task;

use rumqttc::{self, MqttOptions, QoS, ReqHandler};
use std::error::Error;
use std::time::Duration;

#[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-sub", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut sub_handler) = ReqHandler::new(mqttoptions, 10);
task::spawn(async move {
sub_handler.start().await.unwrap();
});

let mut subscriber = client.subscriber("hello/world", QoS::AtMostOnce).await?;

loop {
let publish = subscriber.next().await?;
println!("{:?}", publish);
if publish.topic.contains("10") {
subscriber.unsubscribe().await?;
}
}
}
4 changes: 2 additions & 2 deletions rumqttc/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,5 @@ impl Network {
}
}

pub trait N: AsyncRead + AsyncWrite + Send + Unpin {}
impl<T> N for T where T: AsyncRead + AsyncWrite + Send + Unpin {}
pub trait N: AsyncRead + AsyncWrite + Send + Unpin + Sync {}
impl<T> N for T where T: AsyncRead + AsyncWrite + Send + Unpin + Sync {}
68 changes: 64 additions & 4 deletions rumqttc/src/new.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use std::collections::HashMap;

use async_channel::{bounded, Receiver, RecvError, SendError, Sender};
use mqttbytes::{
v4::{Packet, Publish},
v4::{Packet, Publish, Unsubscribe},
QoS,
};
use tokio::select;

use crate::{AsyncClient, ClientError, ConnectionError, Event, EventLoop, MqttOptions};
use crate::{
AsyncClient, ClientError, ConnectionError, Event, EventLoop, MqttOptions, Outgoing, Request,
};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand All @@ -19,28 +21,43 @@ pub enum Error {
Recv(#[from] RecvError),
#[error("Failed to send subscription request")]
Subscribe(#[from] SendError<Subscription>),
#[error("Failed to send unsubscribe request")]
Unsubscribe(#[from] SendError<String>),
#[error("Failed to send publish msg")]
Publish(#[from] SendError<Publish>),
#[error("Failed to send request")]
Request(#[from] SendError<Request>),
}

type Subscription = (String, Sender<Publish>);

pub struct ReqHandler {
client: AsyncClient,
sub_tx: Sender<Subscription>,
unsub_tx: Sender<String>,
shutdown_tx: Sender<()>,
}

impl ReqHandler {
pub fn new(options: MqttOptions, cap: usize) -> (ReqHandler, SubHandler) {
let (client, eventloop) = AsyncClient::new(options, cap);
let (sub_tx, sub_rx) = bounded(10);
let (unsub_tx, unsub_rx) = bounded(10);
let (shutdown_tx, shutdown_rx) = bounded(1);

(
ReqHandler { client, sub_tx },
ReqHandler {
client,
sub_tx,
unsub_tx,
shutdown_tx,
},
SubHandler {
eventloop,
sub_rx,
subscribers: HashMap::new(),
unsub_rx,
shutdown_rx,
},
)
}
Expand All @@ -56,7 +73,11 @@ impl ReqHandler {
let (pub_tx, pub_rx) = bounded(10);
self.sub_tx.send((topic.clone(), pub_tx)).await?;

Ok(Subscriber { pub_rx })
Ok(Subscriber {
topic,
pub_rx,
unsub_tx: self.unsub_tx.clone(),
})
}

pub fn publisher<S: Into<String>>(&self, topic: S) -> Publisher {
Expand All @@ -69,10 +90,18 @@ impl ReqHandler {
}
}

impl Drop for ReqHandler {
fn drop(&mut self) {
self.shutdown_tx.try_send(()).unwrap();
}
}

pub struct SubHandler {
eventloop: EventLoop,
sub_rx: Receiver<Subscription>,
subscribers: HashMap<String, Sender<Publish>>,
unsub_rx: Receiver<String>,
shutdown_rx: Receiver<()>,
}

impl SubHandler {
Expand All @@ -92,19 +121,48 @@ impl SubHandler {
let (topic, pub_tx) = s?;
self.subscribers.insert(topic, pub_tx);
}

u = self.unsub_rx.recv() => {
let topic = u?;
self.subscribers.remove(&topic);
let req = Request::Unsubscribe(Unsubscribe::new(topic));
self.eventloop.handle().send(req).await?;
}

_ = self.shutdown_rx.recv() => {
self.disconnect().await?;
return Ok(())
}
}
}
}

async fn disconnect(&mut self) -> Result<(), Error> {
self.eventloop.handle().send(Request::Disconnect).await?;

loop {
if let Event::Outgoing(Outgoing::Disconnect) = self.eventloop.poll().await? {
return Ok(());
}
}
}
}

pub struct Subscriber {
topic: String,
pub_rx: Receiver<Publish>,
unsub_tx: Sender<String>,
}

impl Subscriber {
pub async fn next(&self) -> Result<Publish, Error> {
Ok(self.pub_rx.recv().await?)
}

pub async fn unsubscribe(&mut self) -> Result<(), Error> {
self.unsub_tx.send(self.topic.clone()).await?;
Ok(())
}
}

pub struct Publisher {
Expand All @@ -119,12 +177,14 @@ impl Publisher {
self.client
.publish(&self.topic, self.qos, self.retain, payload)
.await?;

Ok(())
}

pub fn try_publish<V: Into<Vec<u8>>>(&self, payload: V) -> Result<(), Error> {
self.client
.try_publish(&self.topic, self.qos, self.retain, payload)?;

Ok(())
}
}

0 comments on commit b25ba08

Please sign in to comment.