Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track digital temp & humidity for area #88

Merged
merged 7 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion microcontrollers/area_dht11_esp8266/area_dht11_esp8266.ino
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


// WARNING! YOU MUST REPLACE THE EXAMPLE VALUE BELOW
const char* DEVICE_ID = "b1c8ae88-8622-415f-951e-27a21888fe19";
const char* DEVICE_ID = "ext_id_..._not_a_uuid";


// WIFI CONFIGURABLE CONSTANTS
Expand Down
4 changes: 3 additions & 1 deletion services/sensor_tracker/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/sensor_tracker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sensor_tracker"
version = "0.2.3"
version = "0.3.0"
authors = ["Terkwood <metaterkhorn@gmail.com>"]
edition = "2018"

Expand Down
71 changes: 68 additions & 3 deletions services/sensor_tracker/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,41 @@ pub struct SensorMessage {
pub temp_c: Option<f64>,
pub ph: Option<f64>,
pub ph_mv: Option<f64>,
pub status: Option<String>,
pub humidity: Option<f64>,
pub heat_index_c: Option<f64>,
pub heat_index_f: Option<f64>,
}

/// `external_device_id` is usually reported as a
/// e.g. "28654597090000e4"
impl SensorMessage {
pub fn measurements(&self) -> Vec<Measurement> {
let mut v: Vec<Measurement> = vec![];
if let (Some(temp_f), Some(temp_c)) = (self.temp_f, self.temp_c) {
if let (
Some(humidity),
Some(status),
Some(temp_f),
Some(temp_c),
Some(heat_index_f),
Some(heat_index_c),
) = (
self.humidity,
&self.status,
self.temp_f,
self.temp_c,
self.heat_index_f,
self.heat_index_c,
) {
v.push(Measurement::DHT {
status: status.to_owned(),
humidity,
temp_f,
temp_c,
heat_index_f,
heat_index_c,
})
} else if let (Some(temp_f), Some(temp_c)) = (self.temp_f, self.temp_c) {
v.push(Measurement::Temp { temp_f, temp_c })
}

Expand All @@ -28,8 +55,23 @@ impl SensorMessage {

#[derive(Debug)]
pub enum Measurement {
Temp { temp_f: f64, temp_c: f64 },
PH { ph: f64, ph_mv: f64 },
Temp {
temp_f: f64,
temp_c: f64,
},
PH {
ph: f64,
ph_mv: f64,
},
/// Digital humidity and temp, e.g. DHT11 sensor
DHT {
status: String,
humidity: f64,
temp_f: f64,
temp_c: f64,
heat_index_f: f64,
heat_index_c: f64,
},
}

impl Measurement {
Expand All @@ -40,6 +82,14 @@ impl Measurement {
temp_c: _,
} => "temp".to_string(),
Measurement::PH { ph: _, ph_mv: _ } => "ph".to_string(),
Measurement::DHT {
status: _,
humidity: _,
temp_f: _,
temp_c: _,
heat_index_f: _,
heat_index_c: _,
} => "dht".to_string(),
}
}

Expand All @@ -52,6 +102,21 @@ impl Measurement {
Measurement::PH { ph, ph_mv } => {
vec![("ph", ph.to_string()), ("ph_mv", ph_mv.to_string())]
}
Measurement::DHT {
status,
humidity,
temp_f,
temp_c,
heat_index_f,
heat_index_c,
} => vec![
("status", status.to_string()),
("humidity", humidity.to_string()),
("temp_f", temp_f.to_string()),
("temp_c", temp_c.to_string()),
("heat_index_f", heat_index_f.to_string()),
("heat_index_c", heat_index_c.to_string()),
],
}
}
}
67 changes: 48 additions & 19 deletions services/sensor_tracker/src/predis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub fn update<'a, 'b>(
let mut delta_events: Vec<REvent> = vec![];

println!("Received redis {} update: {:?}", measure.name(), measure);

let ext_device_namespace = &redis_ctx.get_external_device_namespace(measure.name())?;
let device_id = internal_device_id(ext_device_id, ext_device_namespace).unwrap();

Expand All @@ -33,17 +34,24 @@ pub fn update<'a, 'b>(
// lookup associated tank
let sensor_hash_key = &format!("{}/sensors/{}/{}", rn, measure.name(), device_id).to_string();

let tank_and_update_count: Result<Vec<Option<u64>>, _> = redis_ctx.conn.hget(
let tank_and_area_and_update_count: Result<Vec<Option<u64>>, _> = redis_ctx.conn.hget(
sensor_hash_key,
vec!["tank", &format!("{}_update_count", measure.name())],
vec!["tank", "area", &format!("{}_update_count", measure.name())],
);

if let Ok(v) = tank_and_update_count {
let rando_update = match v.get(0).unwrap_or(&None) {
Some(tank_num) => update_tank_hash(redis_ctx, tank_num, &measure),
None => ensure_sensor_hash_exists(redis_ctx, sensor_hash_key, ext_device_id),
if let Ok(v) = tank_and_area_and_update_count {
// Tank associated with this sensor?
let revent = match (v.get(0).unwrap_or(&None), v.get(1).unwrap_or(&None)) {
(Some(tank_num), _) => {
update_container_hash(redis_ctx, Container::Tanks, tank_num, &measure)
}
(_, Some(area_num)) => {
update_container_hash(redis_ctx, Container::Areas, area_num, &measure)
}
(None, None) => ensure_sensor_hash_exists(redis_ctx, sensor_hash_key, ext_device_id),
};
if let Some(ev) = rando_update {

if let Some(ev) = revent {
delta_events.push(ev)
}

Expand All @@ -54,8 +62,9 @@ pub fn update<'a, 'b>(
redis_ctx,
sensor_hash_key,
measure,
v.get(1).unwrap_or(&None),
v.get(2).unwrap_or(&None),
);

if let Some(ev) = sensor_updated {
delta_events.push(ev)
}
Expand Down Expand Up @@ -84,19 +93,39 @@ fn update_sensor_set(
}
}

fn update_tank_hash(
enum Container {
Tanks,
Areas,
}

impl Container {
pub fn to_string(self) -> String {
match self {
Container::Tanks => "tanks".to_string(),
Container::Areas => "areas".to_string(),
}
}
}

fn update_container_hash(
redis_ctx: &RedisContext,
tank_num: &u64,
container: Container,
container_num: &u64,
measure: &model::Measurement,
) -> Option<REvent> {
// We found the tank associated with this
// sensor ID, so we should update that tank's
// We found the area associated with this
// sensor ID, so we should update that area's
// current reading.
let tank_key = format!("{}/tanks/{}", redis_ctx.namespace, tank_num);
let container_key = format!(
"{}/{}/{}",
redis_ctx.namespace,
container.to_string(),
container_num
);

let tank_measure_count: Result<Option<u32>, _> = redis_ctx
let container_measure_count: Result<Option<u32>, _> = redis_ctx
.conn
.hget(&tank_key, &format!("{}_update_count", measure.name()));
.hget(&container_key, &format!("{}_update_count", measure.name()));

let uc_name = format!("{}_update_count", measure.name());
let ut_name = format!("{}_update_time", measure.name());
Expand All @@ -105,7 +134,7 @@ fn update_tank_hash(

data.push((
&uc_name,
tank_measure_count
container_measure_count
.unwrap_or(None)
.map(|u| u + 1)
.unwrap_or(1)
Expand All @@ -114,20 +143,20 @@ fn update_tank_hash(

data.push((&ut_name, epoch_secs().to_string()));
(
redis_ctx.conn.hset_multiple(&tank_key, &data[..]),
redis_ctx.conn.hset_multiple(&container_key, &data[..]),
data.iter().map(|(a, _)| *a).collect(),
)
};

match update {
(Err(e), _) => {
println!("update fails for {}: {:?}", tank_key, e);
println!("update fails for {}: {:?}", container_key, e);
None
}
(Ok(_), fields) if fields.len() > 0 => {
let fs = fields.iter().map(|s| s.to_string()).collect();
Some(REvent::HashUpdated {
key: tank_key.to_string(),
key: container_key.to_string(),
fields: fs,
})
}
Expand Down