Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite packet parsing code #249

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
* Add ReadInput4 with EG4 18k generator data (#239, @pmccut)
* Add ReadInput4 keys to HA discovery (#240, @jgulick48)
* Fix min_chg_curr/max_chg_curr decoding in ReadInputAll packet (#242, @presto8)
* Cache Hold/Input registers internally as they're seen for later use (#248)
* Remove publish_individual_input configuration option (now they always are) (#250)
* Convert HA to use individual inputs messages (#253)
* New configuration open publish_inputs_all_trigger (#256)
* Move time register messages into new parser (start+end times for AC Charge etc) (#259)


# 0.13.0 - 27th October 2023
Expand Down
30 changes: 0 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ env_logger = { version = "~0.10", default-features = false, features = [] }
futures = "~0.3"
log = "~0.4"
net2 = "~0.2"
nom = "~7"
nom-derive = "~0.10"
num_enum = "~0.5"
rumqttc = "~0.20"
serde = { version = "~1 ", features = ["derive"] }
Expand Down
10 changes: 4 additions & 6 deletions src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ pub struct Channels {
pub from_mqtt: broadcast::Sender<mqtt::ChannelData>,
pub to_mqtt: broadcast::Sender<mqtt::ChannelData>,
pub to_influx: broadcast::Sender<influx::ChannelData>,
pub to_database: broadcast::Sender<database::ChannelData>,
pub read_register_cache: broadcast::Sender<register_cache::ChannelData>,
pub to_register_cache: broadcast::Sender<register_cache::ChannelData>,
//pub to_database: broadcast::Sender<database::ChannelData>,
pub register_cache: broadcast::Sender<register_cache::ChannelData>,
}

impl Default for Channels {
Expand All @@ -26,9 +25,8 @@ impl Channels {
from_mqtt: Self::channel(),
to_mqtt: Self::channel(),
to_influx: Self::channel(),
to_database: Self::channel(),
read_register_cache: Self::channel(),
to_register_cache: Self::channel(),
//to_database: Self::channel(),
register_cache: Self::channel(),
}
}

Expand Down
16 changes: 10 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub struct Config {

#[serde(default = "Config::default_loglevel")]
pub loglevel: String,

#[serde(default = "Config::default_publish_inputs_all_trigger")]
pub publish_inputs_all_trigger: u16,
}

// Inverter {{{
Expand Down Expand Up @@ -109,8 +112,6 @@ pub struct Mqtt {

#[serde(default = "Config::default_mqtt_homeassistant")]
pub homeassistant: HomeAssistant,

pub publish_individual_input: Option<bool>,
}
impl Mqtt {
pub fn enabled(&self) -> bool {
Expand Down Expand Up @@ -140,10 +141,6 @@ impl Mqtt {
pub fn homeassistant(&self) -> &HomeAssistant {
&self.homeassistant
}

pub fn publish_individual_input(&self) -> bool {
self.publish_individual_input == Some(true)
}
} // }}}

// Influx {{{
Expand Down Expand Up @@ -329,6 +326,9 @@ impl ConfigWrapper {
pub fn loglevel(&self) -> String {
self.config.borrow().loglevel.to_owned()
}
pub fn publish_inputs_all_trigger(&self) -> u16 {
self.config.borrow().publish_inputs_all_trigger
}
}

impl Config {
Expand Down Expand Up @@ -364,6 +364,10 @@ impl Config {
fn default_loglevel() -> String {
"debug".to_string()
}

pub fn default_publish_inputs_all_trigger() -> u16 {
80
}
}

fn de_serial<'de, D>(deserializer: D) -> Result<Serial, D::Error>
Expand Down
74 changes: 8 additions & 66 deletions src/coordinator/commands/time_register_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,7 @@ use lxp::{
packet::{DeviceFunction, TranslatedData},
};

use serde::Serialize;

pub struct ReadTimeRegister {
channels: Channels,
inverter: config::Inverter,
action: Action,
}

#[derive(Debug, Serialize)]
struct MqttReplyPayload {
start: String,
end: String,
}

#[derive(Clone)]
pub enum Action {
AcCharge(u16),
AcFirst(u16),
Expand All @@ -42,20 +29,15 @@ impl Action {
ForcedDischarge(1) => Ok(84),
ForcedDischarge(2) => Ok(86),
ForcedDischarge(3) => Ok(88),
_ => bail!("unsupported command"),
_ => Err(anyhow!("unsupported command")),
}
}
}

fn mqtt_reply_topic(&self, datalog: Serial) -> String {
use Action::*;
// no need to be defensive about n here, we checked it already in register()
match self {
AcCharge(n) => format!("{}/ac_charge/{}", datalog, n),
AcFirst(n) => format!("{}/ac_first/{}", datalog, n),
ChargePriority(n) => format!("{}/charge_priority/{}", datalog, n),
ForcedDischarge(n) => format!("{}/forced_discharge/{}", datalog, n),
}
}
pub struct ReadTimeRegister {
channels: Channels,
inverter: config::Inverter,
action: Action,
}

impl ReadTimeRegister {
Expand All @@ -76,8 +58,6 @@ impl ReadTimeRegister {
values: vec![2, 0],
});

let mut receiver = self.channels.from_inverter.subscribe();

if self
.channels
.to_inverter
Expand All @@ -87,28 +67,7 @@ impl ReadTimeRegister {
bail!("send(to_inverter) failed - channel closed?");
}

let reply = receiver.wait_for_reply(&packet).await?;

if let Packet::TranslatedData(td) = reply {
let payload = MqttReplyPayload {
start: format!("{:02}:{:02}", td.values[0], td.values[1]),
end: format!("{:02}:{:02}", td.values[2], td.values[3]),
};
let message = mqtt::Message {
topic: self.action.mqtt_reply_topic(td.datalog),
retain: true,
payload: serde_json::to_string(&payload)?,
};
let channel_data = mqtt::ChannelData::Message(message);

if self.channels.to_mqtt.send(channel_data).is_err() {
bail!("send(to_mqtt) failed - channel closed?");
}

Ok(())
} else {
bail!("didn't get expected reply from inverter");
}
Ok(())
}
}

Expand Down Expand Up @@ -140,23 +99,6 @@ impl SetTimeRegister {
self.set_register(self.action.register()? + 1, &self.values[2..4])
.await?;

// FIXME: If we only update one of the two registers, we should probably
// still output the change we did manage to make here.
let payload = MqttReplyPayload {
start: format!("{:02}:{:02}", self.values[0], self.values[1]),
end: format!("{:02}:{:02}", self.values[2], self.values[3]),
};
let message = mqtt::Message {
topic: self.action.mqtt_reply_topic(self.inverter.datalog),
retain: true,
payload: serde_json::to_string(&payload)?,
};
let channel_data = mqtt::ChannelData::Message(message);

if self.channels.to_mqtt.send(channel_data).is_err() {
bail!("send(to_mqtt) failed - channel closed?");
}

Ok(())
}

Expand Down
Loading
Loading