Skip to content

Commit

Permalink
use on_event
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert-Steiner committed Mar 28, 2020
1 parent bd3e913 commit 10d543d
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 70 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ tokio = { version = "0.2.13", features = ["rt-core", "rt-threaded", "sync", "str
ruuvi-sensor-protocol = "0.4.1"
structopt = { version = "0.3", features = [ "paw" ] }
paw = "1.0"
colored = "1.9"

[[bin]]
name = "cli"
Expand Down
53 changes: 21 additions & 32 deletions src/bin/cli.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use btleplug::api::Central;
use btleplug::api::{BDAddr, ParseBDAddrError};
use ruuvitag_sensor_rs::ble::{collect, find_ruuvitags, get_central};
use ruuvitag_sensor_rs::influx::{run_influx_db, InfluxDBConnector};
use std::thread;
use tokio::runtime;

use btleplug::api::{BDAddr, Central, ParseBDAddrError};
use ruuvitag_sensor_rs::ble::{get_central, register_event_handler};
use ruuvitag_sensor_rs::controller::Controller;
use std::str::FromStr;

fn parse_address(address: &str) -> Result<BDAddr, ParseBDAddrError> {
Expand All @@ -22,13 +18,6 @@ enum Args {
parse(try_from_str = parse_address)
)]
ruuvitags_macs: Vec<BDAddr>,
#[structopt(
short = "s",
long = "rate",
default_value = "5",
help = "Scanning rate in seconds."
)]
scanning_rate: u16,
#[structopt(
long = "influxdb_url",
default_value = "http://localhost:8086",
Expand All @@ -43,45 +32,45 @@ enum Args {
influxdb_db_name: String,
#[structopt(
long = "influxdb_measurement_name",
default_value = "ruuvi",
default_value = "ruuvi_measurements",
help = "Name of the measurement."
)]
influxdb_measurement_name: String,
},
Find {},
Show {},
}

#[paw::main]
fn main(args: Args) -> Result<(), std::io::Error> {
let (controller, event_tx) = Controller::new();

let central = get_central();
central.active(false);
central.start_scan().unwrap();
register_event_handler(event_tx, &central);

match args {
Args::Collect {
ruuvitags_macs,
scanning_rate,
influxdb_url,
influxdb_db_name,
influxdb_measurement_name,
} => {
let (influx_client, sender) = InfluxDBConnector::new(&influxdb_url, &influxdb_db_name);

thread::spawn(|| {
let mut rt = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap();
let _ = rt.block_on(async move {
run_influx_db(influx_client, &influxdb_measurement_name).await
});
});

collect(&central, sender, &ruuvitags_macs, scanning_rate);
central.start_scan().unwrap();
controller.collect(
&ruuvitags_macs,
&influxdb_url,
&influxdb_db_name,
&influxdb_measurement_name,
);
}
Args::Find {} => {
find_ruuvitags(&central);
central.start_scan().unwrap();
controller.find();
}
Args::Show {} => {
central.start_scan().unwrap();
controller.show();
}
}
Ok(())
Expand Down
74 changes: 38 additions & 36 deletions src/ble.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use btleplug::api::{BDAddr, Central, CentralEvent::DeviceDiscovered, Peripheral};
use btleplug::bluez::{adapter::ConnectedAdapter, manager::Manager};
use tokio::sync::mpsc::UnboundedSender;

use crate::ruuvitag::{from_manufacturer_data, is_ruuvitag, RuuviTag};

use std::thread;
use std::time::Duration;
use btleplug::api::{
Central,
CentralEvent::{DeviceDiscovered, DeviceUpdated},
Peripheral,
};
use btleplug::bluez::{adapter::ConnectedAdapter, manager::Manager};
use std::sync::mpsc::Sender;

pub fn get_central() -> ConnectedAdapter {
let manager = Manager::new().unwrap();
Expand All @@ -19,48 +19,50 @@ pub fn get_central() -> ConnectedAdapter {
adapter.connect().unwrap()
}

pub fn collect(
central: &ConnectedAdapter,
sender: UnboundedSender<RuuviTag>,
ruuvitags_macs: &Vec<BDAddr>,
scanning_rate: u16,
) {
loop {
thread::sleep(Duration::from_secs(scanning_rate.into()));
for mac in ruuvitags_macs.iter() {
if let Some(peripheral) = central.peripheral(*mac) {
pub enum Event {
DeviceDiscovered(RuuviTag),
DeviceUpdated(RuuviTag),
}

pub fn register_event_handler(event_sender: Sender<Event>, central: &ConnectedAdapter) {
let central_clone = central.clone();
central.on_event(Box::new(move |event| match event {
DeviceDiscovered(bd_addr) => {
if let Some(peripheral) = central_clone.peripheral(bd_addr) {
if let Some(manufacturer_data) = peripheral.properties().manufacturer_data {
let sensor_values = from_manufacturer_data(&manufacturer_data);
match sensor_values {
Ok(data) => {
let _ = sender.send(RuuviTag {
mac: peripheral.properties().address.to_string(),
sensor_values: data,
});
if is_ruuvitag(&manufacturer_data) {
let sensor_values = from_manufacturer_data(&manufacturer_data);
match sensor_values {
Ok(data) => {
let _ = event_sender.send(Event::DeviceDiscovered(RuuviTag {
mac: peripheral.properties().address,
sensor_values: data,
}));
}
Err(_) => eprintln!("Error DeviceDiscovered"),
}
Err(_) => eprintln!("Parse error!"),
}
}
} else {
eprintln!("RuuviTag not found {}!", mac)
}
}
}
}

pub fn find_ruuvitags(central: &ConnectedAdapter) {
let central_clone = central.clone();
central.on_event(Box::new(move |event| match event {
DeviceDiscovered(bd_addr) => {
DeviceUpdated(bd_addr) => {
if let Some(peripheral) = central_clone.peripheral(bd_addr) {
if let Some(manufacturer_data) = peripheral.properties().manufacturer_data {
if is_ruuvitag(&manufacturer_data) {
println!("Found RuuviTag: {}", peripheral.properties().address)
let sensor_values = from_manufacturer_data(&manufacturer_data);
match sensor_values {
Ok(data) => {
let _ = event_sender.send(Event::DeviceUpdated(RuuviTag {
mac: peripheral.properties().address,
sensor_values: data,
}));
}
Err(_) => eprintln!("Error DeviceUpdated"),
}
}
}
}
}
_ => (),
}));
thread::sleep(Duration::from_secs(15));
}
70 changes: 70 additions & 0 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::ble::Event;
use crate::ble::Event::{DeviceDiscovered, DeviceUpdated};
use crate::influx::{run_influx_db, InfluxDBConnector};
use btleplug::api::BDAddr;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
use tokio::runtime;

pub struct Controller {
receiver: Receiver<Event>,
}

impl Controller {
pub fn new() -> (Controller, Sender<Event>) {
let (sender, receiver) = mpsc::channel();
(Controller { receiver }, sender)
}

pub fn collect(
self,
ruuvitags_macs: &Vec<BDAddr>,
influxdb_url: &str,
influxdb_db_name: &str,
influxdb_measurement_name: &str,
) {
let (influx_client, sender) = InfluxDBConnector::new(influxdb_url, influxdb_db_name);
let measurement_name = influxdb_measurement_name.to_owned();
thread::spawn(move || {
let mut rt = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap();
let _ = rt
.block_on(async move { run_influx_db(influx_client, &measurement_name[..]).await });
});

loop {
let event = self.receiver.recv().unwrap();
match event {
DeviceUpdated(tag) => {
if ruuvitags_macs.contains(&tag.mac) {
let _ = sender.send(tag);
}
}
_ => (),
}
}
}

pub fn find(self) {
loop {
let event = self.receiver.recv().unwrap();
match event {
DeviceDiscovered(tag) => println!("Found RuuviTag: {}", tag.mac),
_ => (),
};
}
}

pub fn show(self) {
loop {
let event = self.receiver.recv().unwrap();
match event {
DeviceDiscovered(tag) | DeviceUpdated(tag) => println!("{}", tag),
};
}
}
}
2 changes: 1 addition & 1 deletion src/influx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl From<RuuviTag> for RuuviTagMeasurement {
/ 1000_f64,
movementCounter: tag.sensor_values.movement_counter().unwrap(),
measurementSequenceNumber: tag.sensor_values.measurement_sequence_number().unwrap(),
mac: tag.mac,
mac: tag.mac.to_string(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod ble;
pub mod controller;
pub mod influx;
pub mod ruuvitag;
51 changes: 50 additions & 1 deletion src/ruuvitag.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,60 @@
use btleplug::api::BDAddr;
use colored::*;
use ruuvi_sensor_protocol::{
Acceleration, AccelerationVector, BatteryPotential, Humidity, MeasurementSequenceNumber,
MovementCounter, Pressure, Temperature,
};
use ruuvi_sensor_protocol::{ParseError, SensorValues};
use std::fmt;

#[derive(Debug)]
pub struct RuuviTag {
pub mac: String,
pub mac: BDAddr,
pub sensor_values: SensorValues,
}

impl fmt::Display for RuuviTag {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let AccelerationVector(acc_x, acc_y, acc_z) =
self.sensor_values.acceleration_vector_as_milli_g().unwrap();

let temperature = (self.sensor_values.temperature_as_millicelsius().unwrap() as f64
/ 1000_f64)
.to_string();
let humidity =
(self.sensor_values.humidity_as_ppm().unwrap() as f64 / 10000_f64).to_string();
let pressure = ((self.sensor_values.pressure_as_pascals().unwrap() as f64 / 100_f64)
as u32)
.to_string();
let acceleration_x = acc_x.to_string();
let acceleration_y = acc_y.to_string();
let acceleration_z = acc_z.to_string();
let battery_voltage = (self
.sensor_values
.battery_potential_as_millivolts()
.unwrap() as f64
/ 1000_f64)
.to_string();
let movement_counter = (self.sensor_values.movement_counter().unwrap()).to_string();
let measurement_sequence_number =
(self.sensor_values.measurement_sequence_number().unwrap()).to_string();
let mac = self.mac.to_string();

write!(f, "{} : {:>5} °C | {:>7} RH-% | {:>4} Pa | ACC-X {:>5} G | ACC-Y {:>5} G | ACC-Z {:>5} G | {:>5} mV | movement {:>3} | seq# {:>5}",
mac.bold(),
temperature.blue(),
humidity.yellow(),
pressure.green(),
acceleration_x,
acceleration_y,
acceleration_z,
battery_voltage.cyan(),
movement_counter.red(),
measurement_sequence_number.magenta()
)
}
}

pub fn is_ruuvitag(data: &[u8]) -> bool {
let manufacturer_id = u16::from(data[0]) + (u16::from(data[1]) << 8);
manufacturer_id == 0x0499
Expand Down

0 comments on commit 10d543d

Please sign in to comment.