Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume Ranges and send Exit events and adapt to the new taxonomy API #11

Merged
merged 1 commit into from
Apr 12, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 94 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,39 @@ impl<Kind, Type> IdMap<Kind, Type> where Type: Eq + Clone, Kind: Clone {
}
}

type SyncExtSender = Mutex<Box<ExtSender<WatchEvent>>>;
type WatchersMap = HashMap<usize, Arc<SyncExtSender>>;
#[derive(Debug, Copy, Clone, PartialEq)]
enum SendDirection {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Not sure that SendDirection is the right name, but I have no better to offer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#12

Enter,
Exit,
}

trait RangeChecker {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1/ Could you document this in a followup?
2/ You actually don't need a trait for that, right? I think that you just want to

impl Option<Range> {
  fn should_send(...) {
     // ...
  }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About 2/ we can't likely implement it here because these objects are not from our crate.

About 1/ #13

fn should_send(&self, &Value, SendDirection) -> bool;
}

impl RangeChecker for Option<Range> {
fn should_send(&self, value: &Value, direction: SendDirection) -> bool {
match *self {
None => direction == SendDirection::Enter, // no range means we send only Enter events
Some(ref range) => range.contains(value)
}
}
}

impl RangeChecker for Range {
fn should_send(&self, value: &Value, _: SendDirection) -> bool {
self.contains(value)
}
}

type SyncSender = Mutex<Box<ExtSender<WatchEvent>>>;
type WatchersMap = HashMap<usize, Arc<SyncSender>>;
type RangedWeakSender = (Option<Range>, Weak<SyncSender>);
type RangedSyncSender = (Option<Range>, Arc<SyncSender>);
struct Watchers {
current_index: usize,
map: Arc<Mutex<WatchersMap>>,
getter_map: HashMap<TaxId<Getter>, Vec<Weak<SyncExtSender>>>,
getter_map: HashMap<TaxId<Getter>, Vec<RangedWeakSender>>,
}

impl Watchers {
Expand All @@ -137,7 +164,7 @@ impl Watchers {
}
}

fn push(&mut self, tax_id: TaxId<Getter>, watcher: Arc<SyncExtSender>) -> WatcherGuard {
fn push(&mut self, tax_id: TaxId<Getter>, range: Option<Range>, watcher: Arc<SyncSender>) -> WatcherGuard {
let index = self.current_index;
self.current_index += 1;
{
Expand All @@ -146,22 +173,25 @@ impl Watchers {
}

let entry = self.getter_map.entry(tax_id).or_insert(Vec::new());
entry.push(Arc::downgrade(&watcher));
entry.push((range, Arc::downgrade(&watcher)));

WatcherGuard {
key: index,
map: self.map.clone()
}
}

fn get(&self, index: usize) -> Option<Arc<SyncExtSender>> {
fn get(&self, index: usize) -> Option<Arc<SyncSender>> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest wrapping these usize as a less fragile type. You can do it in a followup, though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#14

let map = self.map.lock().unwrap();
map.get(&index).cloned()
}

fn get_from_tax_id(&self, tax_id: &TaxId<Getter>) -> Option<Vec<Arc<SyncExtSender>>> {
fn get_from_tax_id(&self, tax_id: &TaxId<Getter>) -> Option<Vec<RangedSyncSender>> {
self.getter_map.get(tax_id).and_then(|vec| {
let vec: Vec<_> = vec.iter().filter_map(|weak_sender| weak_sender.upgrade()).collect();
let vec: Vec<_> = vec.iter().filter_map(|&(ref range, ref weak_sender)| {
let range = range.clone();
weak_sender.upgrade().map(|sender| (range, sender))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a good place as any to actually remove those that you can't upgrade, no?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#15

}).collect();
if vec.len() == 0 { None } else { Some(vec) }
})
}
Expand Down Expand Up @@ -198,6 +228,8 @@ impl Drop for WatcherGuard {

impl AdapterWatchGuard for WatcherGuard {}

type ValueCache = HashMap<TaxId<Getter>, Value>;

pub struct OpenzwaveAdapter {
id: TaxId<AdapterId>,
name: String,
Expand All @@ -208,6 +240,7 @@ pub struct OpenzwaveAdapter {
getter_map: IdMap<Getter, ValueID>,
setter_map: IdMap<Setter, ValueID>,
watchers: Arc<Mutex<Watchers>>,
value_cache: Arc<Mutex<ValueCache>>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This deserves some documentation.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you clean up this cache at some point?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#16

}

fn ensure_directory<T: AsRef<Path> + ?Sized>(directory: &T) -> Result<(), Error> {
Expand Down Expand Up @@ -257,6 +290,7 @@ impl OpenzwaveAdapter {
getter_map: IdMap::new(),
setter_map: IdMap::new(),
watchers: Arc::new(Mutex::new(Watchers::new())),
value_cache: Arc::new(Mutex::new(HashMap::new())),
});

adapter.spawn_notification_thread(rx, box_manager);
Expand All @@ -274,9 +308,11 @@ impl OpenzwaveAdapter {
let mut getter_map = self.getter_map.clone();
let mut setter_map = self.setter_map.clone();
let watchers = self.watchers.clone();
let value_cache = self.value_cache.clone();

thread::spawn(move || {
for notification in rx {
//debug!("Received notification {:?}", notification);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Dead code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's useful to be able to quickly uncomment this.

match notification {
ZWaveNotification::ControllerReady(controller) => {
let service = format!("OpenZWave/{}", controller.get_home_id());
Expand Down Expand Up @@ -347,18 +383,50 @@ impl OpenzwaveAdapter {
_ => continue
};

let value = match to_open_closed(&value) {
Some(value) => value,
_ => continue
};

let watchers = watchers.lock().unwrap();

let watchers = match watchers.get_from_tax_id(&tax_id) {
Some(watchers) => watchers,
_ => continue
};

for sender in &watchers {
let sender = sender.lock().unwrap();
if let Some(value) = to_open_closed(&value) {
let previous_value = {
let mut cache = value_cache.lock().unwrap();
let previous = cache.get(&tax_id).cloned();
cache.insert(tax_id.clone(), value.clone());
previous
};

for &(ref range, ref sender) in &watchers {
debug!("Openzwave::Adapter::ValueChanged iterate over watcher {:?} {:?}", tax_id, range);

let should_send_value = range.should_send(&value, SendDirection::Enter);

if let Some(ref previous_value) = previous_value {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you actually need the previous value or just to know if it was in the range?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#17

let should_send_previous = range.should_send(previous_value, SendDirection::Exit);
// If the new and the old values are both in the same range, we
// need to send nothing.
if should_send_value && should_send_previous { continue }

if should_send_previous {
debug!("Openzwave::Adapter::ValueChanged Sending event Exit {:?} {:?}", tax_id, value);
let sender = sender.lock().unwrap();
sender.send(
WatchEvent::Exit { id: tax_id.clone(), value: value.clone() }
);
}
}

if should_send_value {
debug!("Openzwave::Adapter::ValueChanged Sending event Enter {:?} {:?}", tax_id, value);
let sender = sender.lock().unwrap();
sender.send(
WatchEvent::Enter { id: tax_id.clone(), value: value }
WatchEvent::Enter { id: tax_id.clone(), value: value.clone() }
);
}
}
Expand Down Expand Up @@ -411,12 +479,14 @@ impl taxonomy::adapter::Adapter for OpenzwaveAdapter {
unimplemented!()
}

fn register_watch(&self, mut values: Vec<(TaxId<Getter>, Option<Range>)>, sender: Box<ExtSender<WatchEvent>>) -> ResultMap<TaxId<Getter>, Box<AdapterWatchGuard>, TaxError> {
let sender = Arc::new(Mutex::new(sender)); // Mutex is necessary because cb is not Sync.
values.drain(..).map(|(id, _)| {
fn register_watch(&self, mut values: Vec<(TaxId<Getter>, Option<Range>, Box<ExtSender<WatchEvent>>)>) -> Vec<(TaxId<Getter>, Result<Box<AdapterWatchGuard>, TaxError>)> {
debug!("Openzwave::Adapter::register_watch Should register some watchers");
values.drain(..).map(|(id, range, sender)| {
let sender = Arc::new(Mutex::new(sender)); // Mutex is necessary because cb is not Sync.
debug!("Openzwave::Adapter::register_watch Should register a watcher for {:?} {:?}", id, range);
let watch_guard = {
let mut watchers = self.watchers.lock().unwrap();
watchers.push(id.clone(), sender.clone())
watchers.push(id.clone(), range.clone(), sender.clone())
};
let value_result: Result<Box<AdapterWatchGuard>, TaxError> = Ok(Box::new(watch_guard));

Expand All @@ -425,10 +495,14 @@ impl taxonomy::adapter::Adapter for OpenzwaveAdapter {
if let Some(value) = ozw_value {
if value.is_set() && value.get_type() == ValueType::ValueType_Bool {
if let Some(value) = to_open_closed(&value) {
let sender = sender.lock().unwrap();
sender.send(
WatchEvent::Enter { id: id.clone(), value: value }
);
self.value_cache.lock().unwrap().insert(id.clone(), value.clone());
if range.should_send(&value, SendDirection::Enter) {
debug!("Openzwave::Adapter::register_watch Sending event Enter {:?} {:?}", id, value);
let sender = sender.lock().unwrap();
sender.send(
WatchEvent::Enter { id: id.clone(), value: value }
);
}
}
}
}
Expand Down