Skip to content

Commit

Permalink
Merge pull request #11 from julienw/properly-use-ranges
Browse files Browse the repository at this point in the history
Consume Ranges and send Exit events and adapt to the new taxonomy API
  • Loading branch information
julienw committed Apr 12, 2016
2 parents 60829cb + f0d587c commit a4656b7
Showing 1 changed file with 94 additions and 20 deletions.
114 changes: 94 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,39 @@ impl<Kind, Type> IdMap<Kind, Type> where Type: Eq + Clone, Kind: Clone {
}
}

type SyncExtSender = Mutex<Box<ExtSender<WatchEvent>>>;
type WatchersMap = HashMap<usize, Arc<SyncExtSender>>;
#[derive(Debug, Copy, Clone, PartialEq)]
enum SendDirection {
Enter,
Exit,
}

trait RangeChecker {
fn should_send(&self, &Value, SendDirection) -> bool;
}

impl RangeChecker for Option<Range> {
fn should_send(&self, value: &Value, direction: SendDirection) -> bool {
match *self {
None => direction == SendDirection::Enter, // no range means we send only Enter events
Some(ref range) => range.contains(value)
}
}
}

impl RangeChecker for Range {
fn should_send(&self, value: &Value, _: SendDirection) -> bool {
self.contains(value)
}
}

type SyncSender = Mutex<Box<ExtSender<WatchEvent>>>;
type WatchersMap = HashMap<usize, Arc<SyncSender>>;
type RangedWeakSender = (Option<Range>, Weak<SyncSender>);
type RangedSyncSender = (Option<Range>, Arc<SyncSender>);
struct Watchers {
current_index: usize,
map: Arc<Mutex<WatchersMap>>,
getter_map: HashMap<TaxId<Getter>, Vec<Weak<SyncExtSender>>>,
getter_map: HashMap<TaxId<Getter>, Vec<RangedWeakSender>>,
}

impl Watchers {
Expand All @@ -137,7 +164,7 @@ impl Watchers {
}
}

fn push(&mut self, tax_id: TaxId<Getter>, watcher: Arc<SyncExtSender>) -> WatcherGuard {
fn push(&mut self, tax_id: TaxId<Getter>, range: Option<Range>, watcher: Arc<SyncSender>) -> WatcherGuard {
let index = self.current_index;
self.current_index += 1;
{
Expand All @@ -146,22 +173,25 @@ impl Watchers {
}

let entry = self.getter_map.entry(tax_id).or_insert(Vec::new());
entry.push(Arc::downgrade(&watcher));
entry.push((range, Arc::downgrade(&watcher)));

WatcherGuard {
key: index,
map: self.map.clone()
}
}

fn get(&self, index: usize) -> Option<Arc<SyncExtSender>> {
fn get(&self, index: usize) -> Option<Arc<SyncSender>> {
let map = self.map.lock().unwrap();
map.get(&index).cloned()
}

fn get_from_tax_id(&self, tax_id: &TaxId<Getter>) -> Option<Vec<Arc<SyncExtSender>>> {
fn get_from_tax_id(&self, tax_id: &TaxId<Getter>) -> Option<Vec<RangedSyncSender>> {
self.getter_map.get(tax_id).and_then(|vec| {
let vec: Vec<_> = vec.iter().filter_map(|weak_sender| weak_sender.upgrade()).collect();
let vec: Vec<_> = vec.iter().filter_map(|&(ref range, ref weak_sender)| {
let range = range.clone();
weak_sender.upgrade().map(|sender| (range, sender))
}).collect();
if vec.len() == 0 { None } else { Some(vec) }
})
}
Expand Down Expand Up @@ -198,6 +228,8 @@ impl Drop for WatcherGuard {

impl AdapterWatchGuard for WatcherGuard {}

type ValueCache = HashMap<TaxId<Getter>, Value>;

pub struct OpenzwaveAdapter {
id: TaxId<AdapterId>,
name: String,
Expand All @@ -208,6 +240,7 @@ pub struct OpenzwaveAdapter {
getter_map: IdMap<Getter, ValueID>,
setter_map: IdMap<Setter, ValueID>,
watchers: Arc<Mutex<Watchers>>,
value_cache: Arc<Mutex<ValueCache>>,
}

fn ensure_directory<T: AsRef<Path> + ?Sized>(directory: &T) -> Result<(), Error> {
Expand Down Expand Up @@ -257,6 +290,7 @@ impl OpenzwaveAdapter {
getter_map: IdMap::new(),
setter_map: IdMap::new(),
watchers: Arc::new(Mutex::new(Watchers::new())),
value_cache: Arc::new(Mutex::new(HashMap::new())),
});

adapter.spawn_notification_thread(rx, box_manager);
Expand All @@ -274,9 +308,11 @@ impl OpenzwaveAdapter {
let mut getter_map = self.getter_map.clone();
let mut setter_map = self.setter_map.clone();
let watchers = self.watchers.clone();
let value_cache = self.value_cache.clone();

thread::spawn(move || {
for notification in rx {
//debug!("Received notification {:?}", notification);
match notification {
ZWaveNotification::ControllerReady(controller) => {
let service = format!("OpenZWave/{}", controller.get_home_id());
Expand Down Expand Up @@ -347,18 +383,50 @@ impl OpenzwaveAdapter {
_ => continue
};

let value = match to_open_closed(&value) {
Some(value) => value,
_ => continue
};

let watchers = watchers.lock().unwrap();

let watchers = match watchers.get_from_tax_id(&tax_id) {
Some(watchers) => watchers,
_ => continue
};

for sender in &watchers {
let sender = sender.lock().unwrap();
if let Some(value) = to_open_closed(&value) {
let previous_value = {
let mut cache = value_cache.lock().unwrap();
let previous = cache.get(&tax_id).cloned();
cache.insert(tax_id.clone(), value.clone());
previous
};

for &(ref range, ref sender) in &watchers {
debug!("Openzwave::Adapter::ValueChanged iterate over watcher {:?} {:?}", tax_id, range);

let should_send_value = range.should_send(&value, SendDirection::Enter);

if let Some(ref previous_value) = previous_value {
let should_send_previous = range.should_send(previous_value, SendDirection::Exit);
// If the new and the old values are both in the same range, we
// need to send nothing.
if should_send_value && should_send_previous { continue }

if should_send_previous {
debug!("Openzwave::Adapter::ValueChanged Sending event Exit {:?} {:?}", tax_id, value);
let sender = sender.lock().unwrap();
sender.send(
WatchEvent::Exit { id: tax_id.clone(), value: value.clone() }
);
}
}

if should_send_value {
debug!("Openzwave::Adapter::ValueChanged Sending event Enter {:?} {:?}", tax_id, value);
let sender = sender.lock().unwrap();
sender.send(
WatchEvent::Enter { id: tax_id.clone(), value: value }
WatchEvent::Enter { id: tax_id.clone(), value: value.clone() }
);
}
}
Expand Down Expand Up @@ -411,12 +479,14 @@ impl taxonomy::adapter::Adapter for OpenzwaveAdapter {
unimplemented!()
}

fn register_watch(&self, mut values: Vec<(TaxId<Getter>, Option<Range>)>, sender: Box<ExtSender<WatchEvent>>) -> ResultMap<TaxId<Getter>, Box<AdapterWatchGuard>, TaxError> {
let sender = Arc::new(Mutex::new(sender)); // Mutex is necessary because cb is not Sync.
values.drain(..).map(|(id, _)| {
fn register_watch(&self, mut values: Vec<(TaxId<Getter>, Option<Range>, Box<ExtSender<WatchEvent>>)>) -> Vec<(TaxId<Getter>, Result<Box<AdapterWatchGuard>, TaxError>)> {
debug!("Openzwave::Adapter::register_watch Should register some watchers");
values.drain(..).map(|(id, range, sender)| {
let sender = Arc::new(Mutex::new(sender)); // Mutex is necessary because cb is not Sync.
debug!("Openzwave::Adapter::register_watch Should register a watcher for {:?} {:?}", id, range);
let watch_guard = {
let mut watchers = self.watchers.lock().unwrap();
watchers.push(id.clone(), sender.clone())
watchers.push(id.clone(), range.clone(), sender.clone())
};
let value_result: Result<Box<AdapterWatchGuard>, TaxError> = Ok(Box::new(watch_guard));

Expand All @@ -425,10 +495,14 @@ impl taxonomy::adapter::Adapter for OpenzwaveAdapter {
if let Some(value) = ozw_value {
if value.is_set() && value.get_type() == ValueType::ValueType_Bool {
if let Some(value) = to_open_closed(&value) {
let sender = sender.lock().unwrap();
sender.send(
WatchEvent::Enter { id: id.clone(), value: value }
);
self.value_cache.lock().unwrap().insert(id.clone(), value.clone());
if range.should_send(&value, SendDirection::Enter) {
debug!("Openzwave::Adapter::register_watch Sending event Enter {:?} {:?}", id, value);
let sender = sender.lock().unwrap();
sender.send(
WatchEvent::Enter { id: id.clone(), value: value }
);
}
}
}
}
Expand Down

0 comments on commit a4656b7

Please sign in to comment.