Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read telegram with configurable pause in between #85

Merged
merged 4 commits into from
Nov 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions debian/postinst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ DATALOGGER_INPUT_METHOD=serial
# Baudrate for reading telegrams from the serial line.
DATALOGGER_SERIAL_BAUDRATE=9600

# The time in seconds that the datalogger will pause after each telegram written to the DSMR-reader API.
DATALOGGER_SLEEP=5

EOF
# Ensure that the config file has the correct ownership
chown ${PKG_USER}:${PKG_USER} ${PKG_CONF}
Expand Down
120 changes: 34 additions & 86 deletions src/dsmr/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use super::settings::ParityBitSetting;

use serialport::SerialPort;

use std::borrow::Cow;
use std::io::BufRead;
use std::io::BufReader;
use std::io::{BufRead, BufReader};
use std::str;
use std::time::Duration;

Expand Down Expand Up @@ -42,11 +40,9 @@ fn find_end_of_telegram(buffer: &str, from: usize) -> Option<usize> {
}

// Scans the buffer to see if there is a telegram in it.
// If so, invokes the consumer with that telegram.
// Otherwise, clear the buffer (if there is no telegram unders construction)
// or keep it intact (if a telegram is under construction).
// Returns the new state of the buffer - which may be the unmodified buffer, or a new buffer.
fn eat_telegrams<'a>(buffer: &'a str, consumer: &mut dyn super::TelegramConsumer) -> Cow<'a, str> {
// If so, returns that telegram.
// Otherwise, return nothing
fn extract_telegram(buffer: &str) -> std::option::Option<&str> {
let start_index = find_start_of_telegram(buffer);
let end_index = find_end_of_telegram(buffer, start_index.unwrap_or(0));

Expand All @@ -59,25 +55,22 @@ fn eat_telegrams<'a>(buffer: &'a str, consumer: &mut dyn super::TelegramConsumer
end,
telegram
);
consumer.consume(telegram);
let new_buffer = &buffer.replace(telegram, "");
log::trace!("Buffer {} truncated to {}", buffer, new_buffer);
Cow::Owned(String::from(new_buffer))
Option::Some(telegram)
}
(None, Some(_)) => {
log::trace!("No start of telegram, clearing buffer");
Cow::Owned(String::new())
Option::None
}
(Some(_), _) => {
log::trace!("There is no end of the telegram, keeping buffer {}", buffer);
Cow::Borrowed(buffer)
Option::None
}
(None, None) => {
log::trace!(
"There is no start and no end of the telegram, keeping buffer {}",
buffer
);
Cow::Borrowed(buffer)
Option::None
}
}
}
Expand All @@ -99,29 +92,14 @@ pub fn read_from_serial_port(
log::info!("Read error {}, clearing buffer", error.to_string());
// Just drop this telegram
buffer.clear();
} else {
let clone = buffer.clone();

let new_buffer = eat_telegrams(&clone, consumer);

if buffer != new_buffer {
log::trace!("Replacing buffer {} with {}", buffer, new_buffer);
buffer.clear();
buffer.push_str(&new_buffer);
}
} else if let Some(telegram) = extract_telegram(&buffer) {
consumer.consume(telegram);
return;
}
}
}

pub fn connect_to_meter(serial_settings: settings::SerialSettings) -> Box<dyn SerialPort> {
log::info!(
"Connecting to {} using baud rate {}, byte size {} and parity bit {:#?}",
&serial_settings.port,
&serial_settings.baud_rate,
&serial_settings.byte_size,
&serial_settings.parity_bit
);

pub fn connect_to_meter(serial_settings: &settings::SerialSettings) -> Box<dyn SerialPort> {
serialport::new(&serial_settings.port, serial_settings.baud_rate)
.data_bits(to_databits(&serial_settings.byte_size))
.flow_control(serialport::FlowControl::None)
Expand Down Expand Up @@ -179,98 +157,70 @@ mod tests {
);
}

struct TestConsumer {
invoked: bool,
telegram: String,
}
impl TestConsumer {
fn new() -> Self {
TestConsumer {
invoked: false,
telegram: String::new(),
}
}
}
impl super::super::TelegramConsumer for TestConsumer {
fn consume(&mut self, telegram: &str) -> bool {
self.invoked = true;
self.telegram = String::from(telegram);
true
}
}

#[test]
fn eat_telegrams_only_start() {
fn eat_telegrams_only_start_1() {
let text = String::from("/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)");
let mut consumer = TestConsumer::new();

let result = eat_telegrams(&text, &mut consumer);
let result = extract_telegram(&text);

assert_eq!(result, text);
assert_eq!(consumer.invoked, false);
assert_eq!(result.is_none(), true);
}

#[test]
fn eat_telegrams_only_start_2() {
let text = String::from("/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)!");
let mut consumer = TestConsumer::new();

let result = eat_telegrams(&text, &mut consumer);
let result = extract_telegram(&text);

assert_eq!(result, text);
assert_eq!(consumer.invoked, false);
assert_eq!(result.is_none(), true);
}

#[test]
fn eat_telegrams_only_end() {
let text = String::from("0-1:24.4.0(1)\r\n!522B\r\n");
let mut consumer = TestConsumer::new();

let result = eat_telegrams(&text, &mut consumer);
let result = extract_telegram(&text);

assert_eq!(result, "");
assert_eq!(consumer.invoked, false);
assert_eq!(result.is_none(), true);
}

#[test]
fn eat_telegrams_start_and_end() {
let mut text = String::from(
let text = String::from(
"/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)\r\n!522B\r\n\r\n/ISk5\\2MT382-1000",
);
let mut consumer = TestConsumer::new();

let result = eat_telegrams(&mut text, &mut consumer);
let result = extract_telegram(&text);

assert_eq!(consumer.invoked, true);
assert_eq!(result.is_some(), true);
assert_eq!(
consumer.telegram,
result.unwrap(),
"/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)\r\n!522B\r\n"
);
assert_eq!(result, "\r\n/ISk5\\2MT382-1000");
}

#[test]
fn eat_telegrams_without_checksum_start_and_end() {
let mut text =
let text =
String::from("/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)\r\n!\r\n\r\n/ISk5\\2MT382-1000");
let mut consumer = TestConsumer::new();

let result = eat_telegrams(&mut text, &mut consumer);
let result = extract_telegram(&text);

assert_eq!(consumer.invoked, true);
assert_eq!(result.is_some(), true);
assert_eq!(
consumer.telegram,
result.unwrap(),
"/ISk5\\2MT382-1000\r\n\r\n1-3:0.2.8(40)\r\n!\r\n"
);
assert_eq!(result, "\r\n/ISk5\\2MT382-1000");
}

#[test]
fn eat_complete_telegram() {
let mut input = read_test_resource("input1.txt".into());
let mut consumer = TestConsumer::new();
let input = read_test_resource("input1.txt".into());

let _result = eat_telegrams(&mut input, &mut consumer);
let result = extract_telegram(&input);

assert_eq!(consumer.invoked, true);
assert_eq!(consumer.telegram, read_test_resource("output1.txt".into()),);
assert_eq!(result.is_some(), true);
assert_eq!(result.unwrap(), read_test_resource("output1.txt".into()),);
}

fn read_test_resource(path: PathBuf) -> String {
Expand All @@ -281,7 +231,5 @@ mod tests {
let mut binding = fs::read_to_string(test_file).expect("Failed to read file");
let text = binding.as_mut_str();
return String::from(text);

// return fs::read_to_string(test_file).expect("Failed to read file").as_mut_str();
}
}
2 changes: 1 addition & 1 deletion src/dsmr/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct DelegatingConsumer {
logger: LoggingConsumer,
}
impl DelegatingConsumer {
pub fn new(targets: Vec<settings::Host>) -> Self {
pub fn new(targets: &Vec<settings::Host>) -> Self {
let mut delegates: Vec<Box<dyn TelegramConsumer>> = Vec::with_capacity(targets.len() + 1);

let logger: LoggingConsumer = LoggingConsumer::new(targets.len() as u32);
Expand Down
16 changes: 11 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use serialport::SerialPort;

mod dsmr;
mod scheduler;

fn init_logger(debug_logging: bool) {
let console_level = if debug_logging {
Expand Down Expand Up @@ -48,11 +47,18 @@ pub fn main() {
let debug_logging = settings.get_bool("debug_logging").unwrap_or(false);
init_logger(debug_logging);

let read_interval = settings.get_float("sleep").unwrap_or(0.5);

log::info!("dsmr-rs starting...");
let (serial_settings, api_settings) = dsmr::settings::settings(settings).unwrap();
let mut consumer = dsmr::sender::DelegatingConsumer::new(api_settings.hosts);

let port: Box<dyn SerialPort> = dsmr::reader::connect_to_meter(serial_settings);
log::info!(
"Using serial port {} with baud rate {}, byte size {} and parity bit {:#?}",
&serial_settings.port,
&serial_settings.baud_rate,
&serial_settings.byte_size,
&serial_settings.parity_bit
);

dsmr::reader::read_from_serial_port(port, &mut consumer);
scheduler::main_loop(api_settings, serial_settings, read_interval);
}
25 changes: 25 additions & 0 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::{thread, time};

use crate::dsmr;

pub fn main_loop(
api_settings: dsmr::settings::HostSettings,
serial_settings: dsmr::settings::SerialSettings,
read_interval: f64,
) {
let interval = time::Duration::from_millis((read_interval * 1_000.0).round() as u64);

loop {
{
let port = dsmr::reader::connect_to_meter(&serial_settings);
let mut consumer = dsmr::sender::DelegatingConsumer::new(&api_settings.hosts);

dsmr::reader::read_from_serial_port(port, &mut consumer);

// Close a port following the Resource Acquisition Is Initialization (RAII) paradigm
// by explicitly dropping the reference.
}

thread::sleep(interval);
}
}
Loading