From 5466fd42cbd432fc6f949640ee421da6b4ab55f9 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Wed, 3 Jul 2019 18:28:33 -0400 Subject: [PATCH 1/7] model DHT sensor and measurements --- services/sensor_tracker/Cargo.lock | 4 +- services/sensor_tracker/Cargo.toml | 2 +- services/sensor_tracker/src/model.rs | 71 ++++++++++++++++++++++++++-- 3 files changed, 72 insertions(+), 5 deletions(-) diff --git a/services/sensor_tracker/Cargo.lock b/services/sensor_tracker/Cargo.lock index d52b22bf..2b6df651 100644 --- a/services/sensor_tracker/Cargo.lock +++ b/services/sensor_tracker/Cargo.lock @@ -1,3 +1,5 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. [[package]] name = "aho-corasick" version = "0.5.3" @@ -512,7 +514,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "sensor_tracker" -version = "0.2.3" +version = "0.3.0" dependencies = [ "dotenv 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "envy 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/services/sensor_tracker/Cargo.toml b/services/sensor_tracker/Cargo.toml index 48dfc55a..320a2035 100644 --- a/services/sensor_tracker/Cargo.toml +++ b/services/sensor_tracker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sensor_tracker" -version = "0.2.3" +version = "0.3.0" authors = ["Terkwood "] edition = "2018" diff --git a/services/sensor_tracker/src/model.rs b/services/sensor_tracker/src/model.rs index 9fecf41c..3628ce5d 100644 --- a/services/sensor_tracker/src/model.rs +++ b/services/sensor_tracker/src/model.rs @@ -7,6 +7,10 @@ pub struct SensorMessage { pub temp_c: Option, pub ph: Option, pub ph_mv: Option, + pub status: Option, + pub humidity: Option, + pub heat_index_c: Option, + pub heat_index_f: Option, } /// `external_device_id` is usually reported as a @@ -14,7 +18,30 @@ pub struct SensorMessage { impl SensorMessage { pub fn measurements(&self) -> Vec { let mut v: Vec = vec![]; - if let (Some(temp_f), Some(temp_c)) = (self.temp_f, self.temp_c) { + if let ( + Some(humidity), + Some(status), + Some(temp_f), + Some(temp_c), + Some(heat_index_f), + Some(heat_index_c), + ) = ( + self.humidity, + &self.status, + self.temp_f, + self.temp_c, + self.heat_index_f, + self.heat_index_c, + ) { + v.push(Measurement::DHT { + status: status.to_owned(), + humidity, + temp_f, + temp_c, + heat_index_f, + heat_index_c, + }) + } else if let (Some(temp_f), Some(temp_c)) = (self.temp_f, self.temp_c) { v.push(Measurement::Temp { temp_f, temp_c }) } @@ -28,8 +55,23 @@ impl SensorMessage { #[derive(Debug)] pub enum Measurement { - Temp { temp_f: f64, temp_c: f64 }, - PH { ph: f64, ph_mv: f64 }, + Temp { + temp_f: f64, + temp_c: f64, + }, + PH { + ph: f64, + ph_mv: f64, + }, + /// Digital humidity and temp, e.g. DHT11 sensor + DHT { + status: String, + humidity: f64, + temp_f: f64, + temp_c: f64, + heat_index_f: f64, + heat_index_c: f64, + }, } impl Measurement { @@ -40,6 +82,14 @@ impl Measurement { temp_c: _, } => "temp".to_string(), Measurement::PH { ph: _, ph_mv: _ } => "ph".to_string(), + Measurement::DHT { + status: _, + humidity: _, + temp_f: _, + temp_c: _, + heat_index_f: _, + heat_index_c: _, + } => "dht".to_string(), } } @@ -52,6 +102,21 @@ impl Measurement { Measurement::PH { ph, ph_mv } => { vec![("ph", ph.to_string()), ("ph_mv", ph_mv.to_string())] } + Measurement::DHT { + status, + humidity, + temp_f, + temp_c, + heat_index_f, + heat_index_c, + } => vec![ + ("status", status.to_string()), + ("humidity", humidity.to_string()), + ("temp_f", temp_f.to_string()), + ("temp_c", temp_c.to_string()), + ("heat_index_f", heat_index_f.to_string()), + ("heat_index_c", heat_index_c.to_string()), + ], } } } From bcb65c8a83addc1dd32f4c9ab6c4c1a51173197e Mon Sep 17 00:00:00 2001 From: Terkwood Date: Wed, 3 Jul 2019 18:40:08 -0400 Subject: [PATCH 2/7] add area logic --- services/sensor_tracker/src/predis.rs | 71 ++++++++++++++++++++++++--- 1 file changed, 63 insertions(+), 8 deletions(-) diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index efa29d94..4e2031f8 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -33,17 +33,20 @@ pub fn update<'a, 'b>( // lookup associated tank let sensor_hash_key = &format!("{}/sensors/{}/{}", rn, measure.name(), device_id).to_string(); - let tank_and_update_count: Result>, _> = redis_ctx.conn.hget( + let tank_and_area_and_update_count: Result>, _> = redis_ctx.conn.hget( sensor_hash_key, - vec!["tank", &format!("{}_update_count", measure.name())], + vec!["tank", "area", &format!("{}_update_count", measure.name())], ); - if let Ok(v) = tank_and_update_count { - let rando_update = match v.get(0).unwrap_or(&None) { - Some(tank_num) => update_tank_hash(redis_ctx, tank_num, &measure), - None => ensure_sensor_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), + if let Ok(v) = tank_and_area_and_update_count { + // Tank associated with this sensor? + let revent = match (v.get(0).unwrap_or(&None), v.get(1).unwrap_or(&None)) { + (Some(tank_num), _) => update_tank_hash(redis_ctx, tank_num, &measure), + (_, Some(area_num)) => update_area_hash(redis_ctx, area_num, &measure), + (None, None) => ensure_sensor_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), }; - if let Some(ev) = rando_update { + + if let Some(ev) = revent { delta_events.push(ev) } @@ -54,8 +57,9 @@ pub fn update<'a, 'b>( redis_ctx, sensor_hash_key, measure, - v.get(1).unwrap_or(&None), + v.get(2).unwrap_or(&None), ); + if let Some(ev) = sensor_updated { delta_events.push(ev) } @@ -84,6 +88,57 @@ fn update_sensor_set( } } +fn update_area_hash( + redis_ctx: &RedisContext, + area_num: &u64, + measure: &model::Measurement, +) -> Option { + // We found the area associated with this + // sensor ID, so we should update that area's + // current reading. + let area_key = format!("{}/area/{}", redis_ctx.namespace, area_num); + + let area_measure_count: Result, _> = redis_ctx + .conn + .hget(&area_key, &format!("{}_update_count", measure.name())); + + let uc_name = format!("{}_update_count", measure.name()); + let ut_name = format!("{}_update_time", measure.name()); + let update: (Result, Vec<&str>) = { + let mut data: Vec<(&str, String)> = measure.to_redis(); + + data.push(( + &uc_name, + area_measure_count + .unwrap_or(None) + .map(|u| u + 1) + .unwrap_or(1) + .to_string(), + )); + + data.push((&ut_name, epoch_secs().to_string())); + ( + redis_ctx.conn.hset_multiple(&area_key, &data[..]), + data.iter().map(|(a, _)| *a).collect(), + ) + }; + + match update { + (Err(e), _) => { + println!("update fails for {}: {:?}", area_key, e); + None + } + (Ok(_), fields) if fields.len() > 0 => { + let fs = fields.iter().map(|s| s.to_string()).collect(); + Some(REvent::HashUpdated { + key: area_key.to_string(), + fields: fs, + }) + } + _ => None, + } +} + fn update_tank_hash( redis_ctx: &RedisContext, tank_num: &u64, From 90f6a77651d8582610a31f8be47187a05eaf5009 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Wed, 3 Jul 2019 18:48:06 -0400 Subject: [PATCH 3/7] use given device id for DHTs --- services/sensor_tracker/src/predis.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 4e2031f8..5aa20e62 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -19,8 +19,20 @@ pub fn update<'a, 'b>( let mut delta_events: Vec = vec![]; println!("Received redis {} update: {:?}", measure.name(), measure); - let ext_device_namespace = &redis_ctx.get_external_device_namespace(measure.name())?; - let device_id = internal_device_id(ext_device_id, ext_device_namespace).unwrap(); + let device_id = match &measure { + &model::Measurement::DHT { + status: _, + humidity: _, + temp_f: _, + temp_c: _, + heat_index_f: _, + heat_index_c: _, + } => Uuid::parse_str(ext_device_id).unwrap(), + _ => { + let ext_device_namespace = &redis_ctx.get_external_device_namespace(measure.name())?; + internal_device_id(ext_device_id, ext_device_namespace).unwrap() + } + }; println!("\tDevice ID (internal): {}", device_id); let rn = &redis_ctx.namespace; From eb8e414276c7e79b2129385cd473c16f92482e7f Mon Sep 17 00:00:00 2001 From: Terkwood Date: Wed, 3 Jul 2019 19:13:07 -0400 Subject: [PATCH 4/7] use internal device id for DHT sensor --- services/sensor_tracker/src/predis.rs | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 5aa20e62..deed69fe 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -19,20 +19,9 @@ pub fn update<'a, 'b>( let mut delta_events: Vec = vec![]; println!("Received redis {} update: {:?}", measure.name(), measure); - let device_id = match &measure { - &model::Measurement::DHT { - status: _, - humidity: _, - temp_f: _, - temp_c: _, - heat_index_f: _, - heat_index_c: _, - } => Uuid::parse_str(ext_device_id).unwrap(), - _ => { - let ext_device_namespace = &redis_ctx.get_external_device_namespace(measure.name())?; - internal_device_id(ext_device_id, ext_device_namespace).unwrap() - } - }; + + let ext_device_namespace = &redis_ctx.get_external_device_namespace(measure.name())?; + let device_id = internal_device_id(ext_device_id, ext_device_namespace).unwrap(); println!("\tDevice ID (internal): {}", device_id); let rn = &redis_ctx.namespace; From 0232aaaf6cd1cc9706c166192e525ccc3a70c333 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Wed, 3 Jul 2019 19:14:29 -0400 Subject: [PATCH 5/7] correct example --- microcontrollers/area_dht11_esp8266/area_dht11_esp8266.ino | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/microcontrollers/area_dht11_esp8266/area_dht11_esp8266.ino b/microcontrollers/area_dht11_esp8266/area_dht11_esp8266.ino index a8822410..092c5298 100644 --- a/microcontrollers/area_dht11_esp8266/area_dht11_esp8266.ino +++ b/microcontrollers/area_dht11_esp8266/area_dht11_esp8266.ino @@ -13,7 +13,7 @@ // WARNING! YOU MUST REPLACE THE EXAMPLE VALUE BELOW -const char* DEVICE_ID = "b1c8ae88-8622-415f-951e-27a21888fe19"; +const char* DEVICE_ID = "ext_id_..._not_a_uuid"; // WIFI CONFIGURABLE CONSTANTS From b498719db3eb2844631de0f9f8f21edc393cb529 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Wed, 3 Jul 2019 19:29:38 -0400 Subject: [PATCH 6/7] fix typo --- services/sensor_tracker/src/predis.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index deed69fe..ac3563c7 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -97,7 +97,7 @@ fn update_area_hash( // We found the area associated with this // sensor ID, so we should update that area's // current reading. - let area_key = format!("{}/area/{}", redis_ctx.namespace, area_num); + let area_key = format!("{}/areas/{}", redis_ctx.namespace, area_num); let area_measure_count: Result, _> = redis_ctx .conn From 76602b582bbb79d6423600e0cd251ea21e78a7e0 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Wed, 3 Jul 2019 19:39:26 -0400 Subject: [PATCH 7/7] dedupe method --- services/sensor_tracker/src/predis.rs | 91 ++++++++++----------------- 1 file changed, 32 insertions(+), 59 deletions(-) diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index ac3563c7..3d60ad78 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -42,8 +42,12 @@ pub fn update<'a, 'b>( if let Ok(v) = tank_and_area_and_update_count { // Tank associated with this sensor? let revent = match (v.get(0).unwrap_or(&None), v.get(1).unwrap_or(&None)) { - (Some(tank_num), _) => update_tank_hash(redis_ctx, tank_num, &measure), - (_, Some(area_num)) => update_area_hash(redis_ctx, area_num, &measure), + (Some(tank_num), _) => { + update_container_hash(redis_ctx, Container::Tanks, tank_num, &measure) + } + (_, Some(area_num)) => { + update_container_hash(redis_ctx, Container::Areas, area_num, &measure) + } (None, None) => ensure_sensor_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), }; @@ -89,70 +93,39 @@ fn update_sensor_set( } } -fn update_area_hash( - redis_ctx: &RedisContext, - area_num: &u64, - measure: &model::Measurement, -) -> Option { - // We found the area associated with this - // sensor ID, so we should update that area's - // current reading. - let area_key = format!("{}/areas/{}", redis_ctx.namespace, area_num); - - let area_measure_count: Result, _> = redis_ctx - .conn - .hget(&area_key, &format!("{}_update_count", measure.name())); - - let uc_name = format!("{}_update_count", measure.name()); - let ut_name = format!("{}_update_time", measure.name()); - let update: (Result, Vec<&str>) = { - let mut data: Vec<(&str, String)> = measure.to_redis(); - - data.push(( - &uc_name, - area_measure_count - .unwrap_or(None) - .map(|u| u + 1) - .unwrap_or(1) - .to_string(), - )); - - data.push((&ut_name, epoch_secs().to_string())); - ( - redis_ctx.conn.hset_multiple(&area_key, &data[..]), - data.iter().map(|(a, _)| *a).collect(), - ) - }; +enum Container { + Tanks, + Areas, +} - match update { - (Err(e), _) => { - println!("update fails for {}: {:?}", area_key, e); - None +impl Container { + pub fn to_string(self) -> String { + match self { + Container::Tanks => "tanks".to_string(), + Container::Areas => "areas".to_string(), } - (Ok(_), fields) if fields.len() > 0 => { - let fs = fields.iter().map(|s| s.to_string()).collect(); - Some(REvent::HashUpdated { - key: area_key.to_string(), - fields: fs, - }) - } - _ => None, } } -fn update_tank_hash( +fn update_container_hash( redis_ctx: &RedisContext, - tank_num: &u64, + container: Container, + container_num: &u64, measure: &model::Measurement, ) -> Option { - // We found the tank associated with this - // sensor ID, so we should update that tank's + // We found the area associated with this + // sensor ID, so we should update that area's // current reading. - let tank_key = format!("{}/tanks/{}", redis_ctx.namespace, tank_num); + let container_key = format!( + "{}/{}/{}", + redis_ctx.namespace, + container.to_string(), + container_num + ); - let tank_measure_count: Result, _> = redis_ctx + let container_measure_count: Result, _> = redis_ctx .conn - .hget(&tank_key, &format!("{}_update_count", measure.name())); + .hget(&container_key, &format!("{}_update_count", measure.name())); let uc_name = format!("{}_update_count", measure.name()); let ut_name = format!("{}_update_time", measure.name()); @@ -161,7 +134,7 @@ fn update_tank_hash( data.push(( &uc_name, - tank_measure_count + container_measure_count .unwrap_or(None) .map(|u| u + 1) .unwrap_or(1) @@ -170,20 +143,20 @@ fn update_tank_hash( data.push((&ut_name, epoch_secs().to_string())); ( - redis_ctx.conn.hset_multiple(&tank_key, &data[..]), + redis_ctx.conn.hset_multiple(&container_key, &data[..]), data.iter().map(|(a, _)| *a).collect(), ) }; match update { (Err(e), _) => { - println!("update fails for {}: {:?}", tank_key, e); + println!("update fails for {}: {:?}", container_key, e); None } (Ok(_), fields) if fields.len() > 0 => { let fs = fields.iter().map(|s| s.to_string()).collect(); Some(REvent::HashUpdated { - key: tank_key.to_string(), + key: container_key.to_string(), fields: fs, }) }