Skip to content

Commit

Permalink
Create new abstractions facilitating pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Feb 15, 2022
1 parent e0ee2ae commit 150cec5
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 0 deletions.
47 changes: 47 additions & 0 deletions rumqttc/examples/new_pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use tokio::{task, time};

use rumqttc::{self, ReqHandler, MqttOptions, QoS, Publisher};
use std::error::Error;
use std::time::Duration;

#[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut sub_handler) = ReqHandler::new(mqttoptions, 10);
task::spawn(async move {
sub_handler.start().await.unwrap();
});

let subscriber = client.subscriber("hello/world", QoS::AtMostOnce).await.unwrap();
let publisher = client.publisher("hello/world");

task::spawn(async move {
requests(publisher).await;
time::sleep(Duration::from_secs(3)).await;
});

loop {
let publish = subscriber.next().await;
println!("{:?}", publish.unwrap());
}
}

async fn requests(mut publisher: Publisher) {
publisher.qos = QoS::ExactlyOnce;
publisher.retain = false;
for i in 1..=10 {
publisher
.publish(vec![1; i])
.await
.unwrap();

time::sleep(Duration::from_secs(1)).await;
}

time::sleep(Duration::from_secs(120)).await;
}
2 changes: 2 additions & 0 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ use std::time::Duration;
mod client;
mod eventloop;
mod framed;
mod new;
mod state;
mod tls;

Expand All @@ -114,6 +115,7 @@ pub use client::{AsyncClient, Client, ClientError, Connection};
pub use eventloop::{ConnectionError, Event, EventLoop};
pub use mqttbytes::v4::*;
pub use mqttbytes::*;
pub use new::{ReqHandler, Publisher, Subscriber};
pub use state::{MqttState, StateError};
pub use tokio_rustls::rustls::ClientConfig;
pub use tls::Error;
Expand Down
130 changes: 130 additions & 0 deletions rumqttc/src/new.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use std::collections::HashMap;

use async_channel::{bounded, Receiver, RecvError, SendError, Sender};
use mqttbytes::{
v4::{Packet, Publish},
QoS,
};
use tokio::select;

use crate::{AsyncClient, ClientError, ConnectionError, Event, EventLoop, MqttOptions};

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Failed to handle request to Client")]
Client(#[from] ClientError),
#[error("Failed to handle request to Eventloop")]
Connection(#[from] ConnectionError),
#[error("Failed to recv")]
Recv(#[from] RecvError),
#[error("Failed to send subscription request")]
Subscribe(#[from] SendError<Subscription>),
#[error("Failed to send publish msg")]
Publish(#[from] SendError<Publish>),
}

type Subscription = (String, Sender<Publish>);

pub struct ReqHandler {
client: AsyncClient,
sub_tx: Sender<Subscription>,
}

impl ReqHandler {
pub fn new(options: MqttOptions, cap: usize) -> (ReqHandler, SubHandler) {
let (client, eventloop) = AsyncClient::new(options, cap);
let (sub_tx, sub_rx) = bounded(10);

(
ReqHandler { client, sub_tx },
SubHandler {
eventloop,
sub_rx,
subscribers: HashMap::new(),
},
)
}

pub async fn subscriber<S: Into<String>>(
&self,
topic: S,
qos: QoS,
) -> Result<Subscriber, Error> {
let topic = topic.into();
self.client.subscribe(topic.clone(), qos).await?;

let (pub_tx, pub_rx) = bounded(10);
self.sub_tx.send((topic.clone(), pub_tx)).await?;

Ok(Subscriber { pub_rx })
}

pub fn publisher<S: Into<String>>(&self, topic: S) -> Publisher {
Publisher {
topic: topic.into(),
client: self.client.clone(),
qos: QoS::AtLeastOnce,
retain: false,
}
}
}

pub struct SubHandler {
eventloop: EventLoop,
sub_rx: Receiver<Subscription>,
subscribers: HashMap<String, Sender<Publish>>,
}

impl SubHandler {
// Start a loop to handle subscriptions
pub async fn start(&mut self) -> Result<(), Error> {
loop {
select! {
e = self.eventloop.poll() => {
if let Event::Incoming(Packet::Publish(publish)) = e? {
if let Some(pub_tx) = self.subscribers.get(&publish.topic) {
pub_tx.send(publish).await?;
}
}
}

s = self.sub_rx.recv() => {
let (topic, pub_tx) = s?;
self.subscribers.insert(topic, pub_tx);
}
}
}
}
}

pub struct Subscriber {
pub_rx: Receiver<Publish>,
}

impl Subscriber {
pub async fn next(&self) -> Result<Publish, Error> {
Ok(self.pub_rx.recv().await?)
}
}

pub struct Publisher {
topic: String,
client: AsyncClient,
pub qos: QoS,
pub retain: bool,
}

impl Publisher {
pub async fn publish<V: Into<Vec<u8>>>(&self, payload: V) -> Result<(), Error> {
self.client
.publish(&self.topic, self.qos, self.retain, payload)
.await?;
Ok(())
}

pub fn try_publish<V: Into<Vec<u8>>>(&self, payload: V) -> Result<(), Error> {
self.client
.try_publish(&self.topic, self.qos, self.retain, payload)?;
Ok(())
}
}

0 comments on commit 150cec5

Please sign in to comment.