Skip to content

Commit

Permalink
#8 Added support for I_TIME requests from sensors
Browse files Browse the repository at this point in the history
  • Loading branch information
tsathishkumar committed Nov 22, 2018
1 parent a56d181 commit 598b8a3
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 31 deletions.
10 changes: 10 additions & 0 deletions src/core/message/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ impl InternalMessage {
payload: String::from(payload),
})
}

pub fn as_response(&mut self, response: String) -> String {
self.sub_type = match self.sub_type {
InternalType::IdRequest => InternalType::IdResponse,
_other_type => _other_type,
};

self.payload = response;
self.to_string()
}
}

impl fmt::Display for InternalMessage {
Expand Down
100 changes: 69 additions & 31 deletions src/core/message_handler/internal.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::channel::{Receiver, Sender};
use crate::core::message::internal::*;
use crate::model::node::nodes::dsl;
use crate::model::node::Node;
use diesel;
use diesel::prelude::*;
use diesel::r2d2::ConnectionManager;
use crate::model::node::nodes::dsl;
use crate::model::node::Node;
use r2d2::*;
use std::time::{SystemTime, UNIX_EPOCH};

const MIN_NODE_ID: u8 = 1;
const MAX_NODE_ID: u8 = 254;
Expand All @@ -18,42 +19,75 @@ pub fn handle(
) {
loop {
match receiver.recv() {
Ok(internal_message_request) => match internal_message_request {
InternalMessage {node_id: 255, child_sensor_id: 255, ack: 0, sub_type: InternalType::IdRequest, ref payload } if payload == "0"
=> match get_next_node_id(&db_connection) {
Some(new_node_id) => {
let mut node_id_response = internal_message_request.clone();
node_id_response.sub_type = InternalType::IdResponse;
node_id_response.payload = new_node_id.to_string();
match create_node(&db_connection, new_node_id as i32) {
Ok(_) => match response_sender.send(node_id_response.to_string()) {
Ok(_) => continue,
Err(_) => error!("Error while sending to node_handler"),
},
Err(_) => error!("Error while creating node with new id"),
}
},
None => error!("There is no free node id! All 254 id's are already reserved!"),
},
InternalMessage {node_id, child_sensor_id: 255, ack: _, sub_type: InternalType::DiscoverResponse, ref payload } => {
let parent_node_id = payload.parse::<u8>().unwrap();
match update_network_topology(&db_connection, node_id as i32, parent_node_id as i32) {
Ok(_) => info!("Updated network topology"),
Err(e) => error!("Update network topology failed {:?}", e),
}
forward_to_controller(controller_forward_sender, internal_message_request)
},
_ => forward_to_controller(controller_forward_sender, internal_message_request),
Ok(message) => match message.sub_type {
InternalType::IdRequest => send_node_id(&db_connection, response_sender, message),
InternalType::Time => send_current_time(response_sender, message),
InternalType::DiscoverResponse => {
send_discover_response(&db_connection, &message);
forward_to_controller(controller_forward_sender, message)
}
_ => forward_to_controller(controller_forward_sender, message),
},
_ => (),
}
}
}

fn send_node_id(
db_connection: &PooledConnection<ConnectionManager<SqliteConnection>>,
response_sender: &Sender<String>,
mut message: InternalMessage,
) -> () {
match get_next_node_id(db_connection) {
Some(new_node_id) => match create_node(db_connection, new_node_id as i32) {
Ok(_) => match response_sender.send(message.as_response(new_node_id.to_string())) {
Ok(_) => (),
Err(_) => error!("Error while sending to node_handler"),
},
Err(_) => error!("Error while creating node with new id"),
},
None => error!("There is no free node id! All 254 id's are already reserved!"),
}
}

fn send_current_time(response_sender: &Sender<String>, mut message: InternalMessage) {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
match response_sender.send(message.as_response(since_the_epoch.as_secs().to_string())) {
Ok(_) => (),
Err(_) => error!("Error while sending to node_handler"),
}
}

fn send_discover_response(
db_connection: &PooledConnection<ConnectionManager<SqliteConnection>>,
message: &InternalMessage,
) {
match message.payload.parse::<u8>() {
Ok(parent_node_id) => match update_network_topology(
&db_connection,
message.node_id as i32,
parent_node_id as i32,
) {
Ok(_) => info!("Updated network topology"),
Err(e) => error!("Update network topology failed {:?}", e),
},
Err(e) => error!(
"Error {:?} while parsing discover message payload {:?}",
e, &message.payload
),
}
}

fn forward_to_controller(controller_sender: &Sender<String>, message: InternalMessage) {
match controller_sender.send(message.to_string()) {
Ok(_) => (),
Err(error) => error!("Error while forwarding internal message to controller {:?}", error),
Err(error) => error!(
"Error while forwarding internal message to controller {:?}",
error
),
}
}

Expand All @@ -75,12 +109,16 @@ pub fn create_node(conn: &SqliteConnection, id: i32) -> Result<usize, diesel::re
.execute(conn)
}

pub fn update_network_topology(conn: &SqliteConnection, _node_id: i32, _parent_node_id: i32) -> Result<usize, diesel::result::Error> {
pub fn update_network_topology(
conn: &SqliteConnection,
_node_id: i32,
_parent_node_id: i32,
) -> Result<usize, diesel::result::Error> {
use crate::model::node::nodes::dsl::*;
diesel::update(nodes)
.filter(node_id.eq(_node_id))
.set(parent_node_id.eq(_parent_node_id))
.execute(conn)
.execute(conn)
}

pub fn get_next_node_id(conn: &SqliteConnection) -> Option<u8> {
Expand Down
6 changes: 6 additions & 0 deletions src/core/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ pub fn start(
gateway_out_sender,
controller_out_receiver,
);
} else {
loop {
match controller_out_receiver.recv() {
_ => debug!("Ignoring messages to controller, as no controller is configured"),
}
}
}
});

Expand Down

0 comments on commit 598b8a3

Please sign in to comment.