diff --git a/debian/postinst b/debian/postinst index 38e5b4a..f0ebb87 100644 --- a/debian/postinst +++ b/debian/postinst @@ -30,6 +30,9 @@ DATALOGGER_INPUT_METHOD=serial # Baudrate for reading telegrams from the serial line. DATALOGGER_SERIAL_BAUDRATE=9600 +# The time in seconds that the datalogger will pause after each telegram written to the DSMR-reader API. +DATALOGGER_SLEEP=5 + EOF # Ensure that the config file has the correct ownership chown ${PKG_USER}:${PKG_USER} ${PKG_CONF} diff --git a/src/dsmr/reader.rs b/src/dsmr/reader.rs index 97abaa7..18d3f72 100644 --- a/src/dsmr/reader.rs +++ b/src/dsmr/reader.rs @@ -3,9 +3,7 @@ use super::settings::ParityBitSetting; use serialport::SerialPort; -use std::borrow::Cow; -use std::io::BufRead; -use std::io::BufReader; +use std::io::{BufRead, BufReader}; use std::str; use std::time::Duration; @@ -42,11 +40,9 @@ fn find_end_of_telegram(buffer: &str, from: usize) -> Option { } // Scans the buffer to see if there is a telegram in it. -// If so, invokes the consumer with that telegram. -// Otherwise, clear the buffer (if there is no telegram unders construction) -// or keep it intact (if a telegram is under construction). -// Returns the new state of the buffer - which may be the unmodified buffer, or a new buffer. -fn eat_telegrams<'a>(buffer: &'a str, consumer: &mut dyn super::TelegramConsumer) -> Cow<'a, str> { +// If so, returns that telegram. +// Otherwise, return nothing +fn extract_telegram(buffer: &str) -> std::option::Option<&str> { let start_index = find_start_of_telegram(buffer); let end_index = find_end_of_telegram(buffer, start_index.unwrap_or(0)); @@ -59,25 +55,22 @@ fn eat_telegrams<'a>(buffer: &'a str, consumer: &mut dyn super::TelegramConsumer end, telegram ); - consumer.consume(telegram); - let new_buffer = &buffer.replace(telegram, ""); - log::trace!("Buffer {} truncated to {}", buffer, new_buffer); - Cow::Owned(String::from(new_buffer)) + Option::Some(telegram) } (None, Some(_)) => { log::trace!("No start of telegram, clearing buffer"); - Cow::Owned(String::new()) + Option::None } (Some(_), _) => { log::trace!("There is no end of the telegram, keeping buffer {}", buffer); - Cow::Borrowed(buffer) + Option::None } (None, None) => { log::trace!( "There is no start and no end of the telegram, keeping buffer {}", buffer ); - Cow::Borrowed(buffer) + Option::None } } } @@ -99,29 +92,14 @@ pub fn read_from_serial_port( log::info!("Read error {}, clearing buffer", error.to_string()); // Just drop this telegram buffer.clear(); - } else { - let clone = buffer.clone(); - - let new_buffer = eat_telegrams(&clone, consumer); - - if buffer != new_buffer { - log::trace!("Replacing buffer {} with {}", buffer, new_buffer); - buffer.clear(); - buffer.push_str(&new_buffer); - } + } else if let Some(telegram) = extract_telegram(&buffer) { + consumer.consume(telegram); + return; } } } -pub fn connect_to_meter(serial_settings: settings::SerialSettings) -> Box { - log::info!( - "Connecting to {} using baud rate {}, byte size {} and parity bit {:#?}", - &serial_settings.port, - &serial_settings.baud_rate, - &serial_settings.byte_size, - &serial_settings.parity_bit - ); - +pub fn connect_to_meter(serial_settings: &settings::SerialSettings) -> Box { serialport::new(&serial_settings.port, serial_settings.baud_rate) .data_bits(to_databits(&serial_settings.byte_size)) .flow_control(serialport::FlowControl::None) @@ -179,98 +157,70 @@ mod tests { ); } - struct TestConsumer { - invoked: bool, - telegram: String, - } - impl TestConsumer { - fn new() -> Self { - TestConsumer { - invoked: false, - telegram: String::new(), - } - } - } - impl super::super::TelegramConsumer for TestConsumer { - fn consume(&mut self, telegram: &str) -> bool { - self.invoked = true; - self.telegram = String::from(telegram); - true - } - } - #[test] - fn eat_telegrams_only_start() { + fn eat_telegrams_only_start_1() { let text = String::from("/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)"); - let mut consumer = TestConsumer::new(); - let result = eat_telegrams(&text, &mut consumer); + let result = extract_telegram(&text); - assert_eq!(result, text); - assert_eq!(consumer.invoked, false); + assert_eq!(result.is_none(), true); + } + #[test] + fn eat_telegrams_only_start_2() { let text = String::from("/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)!"); - let mut consumer = TestConsumer::new(); - let result = eat_telegrams(&text, &mut consumer); + let result = extract_telegram(&text); - assert_eq!(result, text); - assert_eq!(consumer.invoked, false); + assert_eq!(result.is_none(), true); } #[test] fn eat_telegrams_only_end() { let text = String::from("0-1:24.4.0(1)\r\n!522B\r\n"); - let mut consumer = TestConsumer::new(); - let result = eat_telegrams(&text, &mut consumer); + let result = extract_telegram(&text); - assert_eq!(result, ""); - assert_eq!(consumer.invoked, false); + assert_eq!(result.is_none(), true); } #[test] fn eat_telegrams_start_and_end() { - let mut text = String::from( + let text = String::from( "/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)\r\n!522B\r\n\r\n/ISk5\\2MT382-1000", ); - let mut consumer = TestConsumer::new(); - let result = eat_telegrams(&mut text, &mut consumer); + let result = extract_telegram(&text); - assert_eq!(consumer.invoked, true); + assert_eq!(result.is_some(), true); assert_eq!( - consumer.telegram, + result.unwrap(), "/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)\r\n!522B\r\n" ); - assert_eq!(result, "\r\n/ISk5\\2MT382-1000"); } #[test] fn eat_telegrams_without_checksum_start_and_end() { - let mut text = + let text = String::from("/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)\r\n!\r\n\r\n/ISk5\\2MT382-1000"); - let mut consumer = TestConsumer::new(); - let result = eat_telegrams(&mut text, &mut consumer); + let result = extract_telegram(&text); - assert_eq!(consumer.invoked, true); + assert_eq!(result.is_some(), true); assert_eq!( - consumer.telegram, + result.unwrap(), "/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)\r\n!\r\n" ); - assert_eq!(result, "\r\n/ISk5\\2MT382-1000"); } #[test] fn eat_complete_telegram() { - let mut input = read_test_resource("input1.txt".into()); - let mut consumer = TestConsumer::new(); + let input = read_test_resource("input1.txt".into()); - let _result = eat_telegrams(&mut input, &mut consumer); + let result = extract_telegram(&input); - assert_eq!(consumer.invoked, true); - assert_eq!(consumer.telegram, read_test_resource("output1.txt".into()),); + assert_eq!(result.is_some(), true); + assert_eq!(result.unwrap(), read_test_resource("output1.txt".into()),); } fn read_test_resource(path: PathBuf) -> String { @@ -281,7 +231,5 @@ mod tests { let mut binding = fs::read_to_string(test_file).expect("Failed to read file"); let text = binding.as_mut_str(); return String::from(text); - - // return fs::read_to_string(test_file).expect("Failed to read file").as_mut_str(); } } diff --git a/src/dsmr/sender.rs b/src/dsmr/sender.rs index 7443c8d..7eb669d 100644 --- a/src/dsmr/sender.rs +++ b/src/dsmr/sender.rs @@ -52,7 +52,7 @@ pub struct DelegatingConsumer { logger: LoggingConsumer, } impl DelegatingConsumer { - pub fn new(targets: Vec) -> Self { + pub fn new(targets: &Vec) -> Self { let mut delegates: Vec> = Vec::with_capacity(targets.len() + 1); let logger: LoggingConsumer = LoggingConsumer::new(targets.len() as u32); diff --git a/src/main.rs b/src/main.rs index 5982179..1fa8ab9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ -use serialport::SerialPort; - mod dsmr; +mod scheduler; fn init_logger(debug_logging: bool) { let console_level = if debug_logging { @@ -48,11 +47,18 @@ pub fn main() { let debug_logging = settings.get_bool("debug_logging").unwrap_or(false); init_logger(debug_logging); + let read_interval = settings.get_float("sleep").unwrap_or(0.5); + log::info!("dsmr-rs starting..."); let (serial_settings, api_settings) = dsmr::settings::settings(settings).unwrap(); - let mut consumer = dsmr::sender::DelegatingConsumer::new(api_settings.hosts); - let port: Box = dsmr::reader::connect_to_meter(serial_settings); + log::info!( + "Using serial port {} with baud rate {}, byte size {} and parity bit {:#?}", + &serial_settings.port, + &serial_settings.baud_rate, + &serial_settings.byte_size, + &serial_settings.parity_bit + ); - dsmr::reader::read_from_serial_port(port, &mut consumer); + scheduler::main_loop(api_settings, serial_settings, read_interval); } diff --git a/src/scheduler.rs b/src/scheduler.rs new file mode 100644 index 0000000..5b884ac --- /dev/null +++ b/src/scheduler.rs @@ -0,0 +1,25 @@ +use std::{thread, time}; + +use crate::dsmr; + +pub fn main_loop( + api_settings: dsmr::settings::HostSettings, + serial_settings: dsmr::settings::SerialSettings, + read_interval: f64, +) { + let interval = time::Duration::from_millis((read_interval * 1_000.0).round() as u64); + + loop { + { + let port = dsmr::reader::connect_to_meter(&serial_settings); + let mut consumer = dsmr::sender::DelegatingConsumer::new(&api_settings.hosts); + + dsmr::reader::read_from_serial_port(port, &mut consumer); + + // Close a port following the Resource Acquisition Is Initialization (RAII) paradigm + // by explicitly dropping the reference. + } + + thread::sleep(interval); + } +}