diff --git a/src/lib.rs b/src/lib.rs index 4a55f62..232a78b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,12 +120,39 @@ impl IdMap where Type: Eq + Clone, Kind: Clone { } } -type SyncExtSender = Mutex>>; -type WatchersMap = HashMap>; +#[derive(Debug, Copy, Clone, PartialEq)] +enum SendDirection { + Enter, + Exit, +} + +trait RangeChecker { + fn should_send(&self, &Value, SendDirection) -> bool; +} + +impl RangeChecker for Option { + fn should_send(&self, value: &Value, direction: SendDirection) -> bool { + match *self { + None => direction == SendDirection::Enter, // no range means we send only Enter events + Some(ref range) => range.contains(value) + } + } +} + +impl RangeChecker for Range { + fn should_send(&self, value: &Value, _: SendDirection) -> bool { + self.contains(value) + } +} + +type SyncSender = Mutex>>; +type WatchersMap = HashMap>; +type RangedWeakSender = (Option, Weak); +type RangedSyncSender = (Option, Arc); struct Watchers { current_index: usize, map: Arc>, - getter_map: HashMap, Vec>>, + getter_map: HashMap, Vec>, } impl Watchers { @@ -137,7 +164,7 @@ impl Watchers { } } - fn push(&mut self, tax_id: TaxId, watcher: Arc) -> WatcherGuard { + fn push(&mut self, tax_id: TaxId, range: Option, watcher: Arc) -> WatcherGuard { let index = self.current_index; self.current_index += 1; { @@ -146,7 +173,7 @@ impl Watchers { } let entry = self.getter_map.entry(tax_id).or_insert(Vec::new()); - entry.push(Arc::downgrade(&watcher)); + entry.push((range, Arc::downgrade(&watcher))); WatcherGuard { key: index, @@ -154,14 +181,17 @@ impl Watchers { } } - fn get(&self, index: usize) -> Option> { + fn get(&self, index: usize) -> Option> { let map = self.map.lock().unwrap(); map.get(&index).cloned() } - fn get_from_tax_id(&self, tax_id: &TaxId) -> Option>> { + fn get_from_tax_id(&self, tax_id: &TaxId) -> Option> { self.getter_map.get(tax_id).and_then(|vec| { - let vec: Vec<_> = vec.iter().filter_map(|weak_sender| weak_sender.upgrade()).collect(); + let vec: Vec<_> = vec.iter().filter_map(|&(ref range, ref weak_sender)| { + let range = range.clone(); + weak_sender.upgrade().map(|sender| (range, sender)) + }).collect(); if vec.len() == 0 { None } else { Some(vec) } }) } @@ -198,6 +228,8 @@ impl Drop for WatcherGuard { impl AdapterWatchGuard for WatcherGuard {} +type ValueCache = HashMap, Value>; + pub struct OpenzwaveAdapter { id: TaxId, name: String, @@ -208,6 +240,7 @@ pub struct OpenzwaveAdapter { getter_map: IdMap, setter_map: IdMap, watchers: Arc>, + value_cache: Arc>, } fn ensure_directory + ?Sized>(directory: &T) -> Result<(), Error> { @@ -257,6 +290,7 @@ impl OpenzwaveAdapter { getter_map: IdMap::new(), setter_map: IdMap::new(), watchers: Arc::new(Mutex::new(Watchers::new())), + value_cache: Arc::new(Mutex::new(HashMap::new())), }); adapter.spawn_notification_thread(rx, box_manager); @@ -274,9 +308,11 @@ impl OpenzwaveAdapter { let mut getter_map = self.getter_map.clone(); let mut setter_map = self.setter_map.clone(); let watchers = self.watchers.clone(); + let value_cache = self.value_cache.clone(); thread::spawn(move || { for notification in rx { + //debug!("Received notification {:?}", notification); match notification { ZWaveNotification::ControllerReady(controller) => { let service = format!("OpenZWave/{}", controller.get_home_id()); @@ -347,6 +383,11 @@ impl OpenzwaveAdapter { _ => continue }; + let value = match to_open_closed(&value) { + Some(value) => value, + _ => continue + }; + let watchers = watchers.lock().unwrap(); let watchers = match watchers.get_from_tax_id(&tax_id) { @@ -354,11 +395,38 @@ impl OpenzwaveAdapter { _ => continue }; - for sender in &watchers { - let sender = sender.lock().unwrap(); - if let Some(value) = to_open_closed(&value) { + let previous_value = { + let mut cache = value_cache.lock().unwrap(); + let previous = cache.get(&tax_id).cloned(); + cache.insert(tax_id.clone(), value.clone()); + previous + }; + + for &(ref range, ref sender) in &watchers { + debug!("Openzwave::Adapter::ValueChanged iterate over watcher {:?} {:?}", tax_id, range); + + let should_send_value = range.should_send(&value, SendDirection::Enter); + + if let Some(ref previous_value) = previous_value { + let should_send_previous = range.should_send(previous_value, SendDirection::Exit); + // If the new and the old values are both in the same range, we + // need to send nothing. + if should_send_value && should_send_previous { continue } + + if should_send_previous { + debug!("Openzwave::Adapter::ValueChanged Sending event Exit {:?} {:?}", tax_id, value); + let sender = sender.lock().unwrap(); + sender.send( + WatchEvent::Exit { id: tax_id.clone(), value: value.clone() } + ); + } + } + + if should_send_value { + debug!("Openzwave::Adapter::ValueChanged Sending event Enter {:?} {:?}", tax_id, value); + let sender = sender.lock().unwrap(); sender.send( - WatchEvent::Enter { id: tax_id.clone(), value: value } + WatchEvent::Enter { id: tax_id.clone(), value: value.clone() } ); } } @@ -411,12 +479,14 @@ impl taxonomy::adapter::Adapter for OpenzwaveAdapter { unimplemented!() } - fn register_watch(&self, mut values: Vec<(TaxId, Option)>, sender: Box>) -> ResultMap, Box, TaxError> { - let sender = Arc::new(Mutex::new(sender)); // Mutex is necessary because cb is not Sync. - values.drain(..).map(|(id, _)| { + fn register_watch(&self, mut values: Vec<(TaxId, Option, Box>)>) -> Vec<(TaxId, Result, TaxError>)> { + debug!("Openzwave::Adapter::register_watch Should register some watchers"); + values.drain(..).map(|(id, range, sender)| { + let sender = Arc::new(Mutex::new(sender)); // Mutex is necessary because cb is not Sync. + debug!("Openzwave::Adapter::register_watch Should register a watcher for {:?} {:?}", id, range); let watch_guard = { let mut watchers = self.watchers.lock().unwrap(); - watchers.push(id.clone(), sender.clone()) + watchers.push(id.clone(), range.clone(), sender.clone()) }; let value_result: Result, TaxError> = Ok(Box::new(watch_guard)); @@ -425,10 +495,14 @@ impl taxonomy::adapter::Adapter for OpenzwaveAdapter { if let Some(value) = ozw_value { if value.is_set() && value.get_type() == ValueType::ValueType_Bool { if let Some(value) = to_open_closed(&value) { - let sender = sender.lock().unwrap(); - sender.send( - WatchEvent::Enter { id: id.clone(), value: value } - ); + self.value_cache.lock().unwrap().insert(id.clone(), value.clone()); + if range.should_send(&value, SendDirection::Enter) { + debug!("Openzwave::Adapter::register_watch Sending event Enter {:?} {:?}", id, value); + let sender = sender.lock().unwrap(); + sender.send( + WatchEvent::Enter { id: id.clone(), value: value } + ); + } } } }