Skip to content

Commit

Permalink
Individual register MQTT export throttling.
Browse files Browse the repository at this point in the history
Since all registers in a block are read on one go on MODBUS, which is probably efficient, it is hard to find a good trade-off between getting fast updates on config changes and events, vs. not being flooded with continuous measurements.

Added a simple throttling option, to set the minimum interval between samples on chosen sensors.

Turned throttling on on fan RPM measurements (10sec) and the 5 temperature sensor registers (20sec)

Added MQTT retain bit per register, default on, and turned it off for the measurement sensors above and the clock registers.

Made it possible to flag in the RegisterManager::Visitor to visit only if the throttle timer is also signaling ready - used by MQTT visitor.

This throttling function required 32bits extra memory per register. Chose to not go full 32 bit on the time stamping to save space, and to reduce the number of divisions on checking, chose a power of 2 factor resolution. So, throttle times have a resolution of 256ms, and timestamps are stored as 16-bit values in units of 256ms. This means the max configurable throttle time is 0xffff * 256ms = 16777s or 4.6hrs.
Remark that the values exported, when throttling, may be up to the polling period of the register block old, but I gather e.g. temperatures should change pretty slowly, while the fan speed jiggling reported are probably inaccuracy on the sensors.

Tested by adding a delta time measurement in a MQTT subscriber and printing, looks OK:

00:24:13;178: systemair/temperature/exhaust           :            13.5 dt 24.2
00:24:13;242: systemair/clock/second                  :               9 dt 1.1
00:24:14;537: systemair/clock/second                  :              10 dt 1.3
00:24:15;423: systemair/clock/second                  :              11 dt 0.9
00:24:17;284: systemair/fan/supply/rpm                :            3120 dt 10.7
00:24:17;345: systemair/fan/extract/rpm               :            2820 dt 10.8
00:24:17;345: systemair/clock/second                  :              13 dt 1.9
00:24:17;962: systemair/temperature/extract           :            20.9 dt 20.3
00:24:18;025: systemair/clock/second                  :              14 dt 0.7
00:24:19;220: systemair/clock/second                  :              15 dt 1.2
00:24:20;126: systemair/clock/second                  :              16 dt 0.9
00:24:21;469: systemair/clock/second                  :              17 dt 1.3
00:24:22;366: systemair/clock/second                  :              18 dt 0.9
00:24:26;472: systemair/clock/second                  :              22 dt 4.1
00:24:27;563: systemair/fan/supply/rpm                :            2820 dt 10.3
00:24:27;610: systemair/fan/extract/rpm               :            2580 dt 10.3
00:24:27;611: systemair/clock/second                  :              23 dt 1.1
00:24:28;225: systemair/clock/second                  :              24 dt 0.6
00:24:29;214: systemair/clock/second                  :              25 dt 1.0
00:24:31;173: systemair/temperature/protection        :            21.3 dt 21.2
00:24:31;221: systemair/clock/second                  :              27 dt 2.0
00:24:32;059: systemair/clock/second                  :              28 dt 0.8
00:24:33;421: systemair/temperature/exhaust           :            13.8 dt 20.2
00:24:33;467: systemair/clock/second                  :              29 dt 1.4
00:24:34;403: systemair/clock/second                  :              30 dt 0.9
00:24:35;759: systemair/clock/second                  :              31 dt 1.4
00:24:36;656: systemair/clock/second                  :              32 dt 0.9
00:24:38;501: systemair/fan/supply/rpm                :            2820 dt 10.9
00:24:38;563: systemair/fan/extract/rpm               :            2580 dt 11.0
00:24:38;563: systemair/temperature/extract           :            21.1 dt 20.6
00:24:38;563: systemair/clock/second                  :              34 dt 1.9
00:24:39;178: systemair/clock/second                  :              35 dt 0.6
00:24:40;553: systemair/clock/second                  :              36 dt 1.4
00:24:41;439: systemair/clock/second                  :              37 dt 0.9
00:24:42;798: systemair/clock/second                  :              38 dt 1.4
00:24:43;697: systemair/clock/second                  :              39 dt 0.9
00:24:45;238: systemair/clock/second                  :              41 dt 1.5
00:24:46;148: systemair/clock/second                  :              42 dt 0.9
00:24:47;505: systemair/clock/second                  :              43 dt 1.4
00:24:48;500: systemair/fan/supply/rpm                :            3060 dt 10.0
00:24:48;564: systemair/fan/extract/rpm               :            2820 dt 10.0
00:24:49;068: systemair/clock/second                  :              45 dt 1.6
00:24:50;049: systemair/clock/second                  :              46 dt 1.0
00:24:52;005: systemair/clock/second                  :              48 dt 2.0
00:24:52;051: systemair/temperature/protection        :            21.6 dt 20.9
00:24:52;888: systemair/clock/second                  :              49 dt 0.9
00:24:53;616: systemair/temperature/exhaust           :            13.8 dt 20.2
00:24:54;230: systemair/clock/second                  :              50 dt 1.3
00:24:55;239: systemair/clock/second                  :              51 dt 1.0
00:24:55;287: systemair/temperature/intake            :            10.3 dt 45.3
00:24:56;588: systemair/clock/second                  :              52 dt 1.3
  • Loading branch information
sigvind committed May 21, 2022
1 parent 27f9b6e commit 7b75d0f
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 63 deletions.
22 changes: 14 additions & 8 deletions src/main.ino
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class MQTTUpdateVisitor : public RegisterManager::Visitor
{
uint32_t m_messages_sent; // num MQTT messages sent.
public:
MQTTUpdateVisitor() : Visitor(0,FLG_VISIT_REG_SINGLE|FLG_VISIT_UPDATED), m_messages_sent(0) {}
MQTTUpdateVisitor() : Visitor(0,FLG_VISIT_REG_SINGLE|FLG_VISIT_UPDATED_THROTTLE), m_messages_sent(0) {}
//~MQTTUpdateVisitor();

uint32_t numMessagesSent() const { return m_messages_sent; }
Expand All @@ -388,24 +388,26 @@ public:
// Implement in your visitor class and return negative to
// interrupt traverse.
virtual int32_t visit( Register &reg ) { return 0; };
virtual int32_t visit( int32_t address, Register &reg );;
virtual int32_t visit( int32_t address, Register &reg, const Register::SingleReg &sreg );

};

// Called to visit updated nodes only, no need to check specifically
int32_t MQTTUpdateVisitor::visit( int32_t address, Register &reg)
int32_t MQTTUpdateVisitor::visit( int32_t address, Register &reg, const Register::SingleReg &sreg)
{
const String &name = reg.getRegisterName( address );
String formatted = reg.getFormattedValue( address );

debugI("visit %s", name.c_str());
bool retain = sreg.mqttRetain();

sendMqttMessage(name, formatted);
//debugI("visit %s retain %s", name.c_str(), retain ? "Y":"N");

sendMqttMessage(name, formatted, retain);

//TODO : could send fail and we need to handle that ?

// Clear updated flag - i.e signal that change has been handled.
reg.confirmUpdate( address );
reg.confirmUpdate( address, millis() );

m_messages_sent++;

Expand Down Expand Up @@ -643,12 +645,16 @@ void mqttMessageReceived(String &topic, String &payload) {

}

void sendMqttMessage(const String &name, String &payload) {
void sendMqttMessage(const String &name, String &payload, bool retain) {

if(!mqtt.connected() || mqttConfig == NULL || strlen(mqttConfig->publishTopic) == 0) return;

String topic = String(mqttConfig->publishTopic) + "/" + name;
debugD("MQTT publish to %s with payload %s", topic.c_str(), payload.c_str());

debugD("MQTT publish %-20s : %s %s", topic.c_str(), payload.c_str(), retain ? "retain" : "");

mqtt.publish(topic.c_str(), payload.c_str(), true, 0);
mqtt.loop();

yield();
}
14 changes: 7 additions & 7 deletions src/register/clockregister.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
class ClockRegister : public Register {
public:
ClockRegister() : Register("clock", 550, 7, 1000, false) {
addRegister(REG_CLK_S, "clock/second");
addRegister(REG_CLK_M, "clock/minute");
addRegister(REG_CLK_H, "clock/hour");
addRegister(REG_CLK_D, "clock/day");
addRegister(REG_CLK_MNTH, "clock/month");
addRegister(REG_CLK_Y, "clock/year");
addRegister(REG_CLK_WD, "clock/weekday", PERM_READ_ONLY);
addRegister(REG_CLK_S, "clock/second", PERM_FULL, false );
addRegister(REG_CLK_M, "clock/minute", PERM_FULL, false );
addRegister(REG_CLK_H, "clock/hour", PERM_FULL, false );
addRegister(REG_CLK_D, "clock/day", PERM_FULL, false);
addRegister(REG_CLK_MNTH, "clock/month", PERM_FULL, false );
addRegister(REG_CLK_Y, "clock/year", PERM_FULL, false);
addRegister(REG_CLK_WD, "clock/weekday", PERM_READ_ONLY, false);
};

String getFormattedValue(int address) const;
Expand Down
4 changes: 2 additions & 2 deletions src/register/fanregister.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ class FanRegister : public Register {
addRegister(REG_FAN_FLOW_UNITS, "fan/flow/unit");
addRegister(REG_FAN_SF_PWM, "fan/supply/pwm", PERM_READ_ONLY);
addRegister(REG_FAN_EF_PWM, "fan/extract/pwm", PERM_READ_ONLY);
addRegister(REG_FAN_SF_RPM, "fan/supply/rpm", PERM_READ_ONLY);
addRegister(REG_FAN_EF_RPM, "fan/extract/rpm", PERM_READ_ONLY);
addRegister(REG_FAN_SF_RPM, "fan/supply/rpm", PERM_READ_ONLY, false, 10000 );
addRegister(REG_FAN_EF_RPM, "fan/extract/rpm", PERM_READ_ONLY, false, 10000 );
addRegister(REG_FAN_SPEED_LVL_CD, "fan/speed/display", PERM_READ_ONLY);
addRegister(REG_FAN_ALLOW_MANUAL_FAN_STOP, "fan/allowstop");
addRegister(REG_FAN_SPEED_LOG_RESET, "fan/speed/log/reset", PERM_WRITE_ONLY);
Expand Down
10 changes: 5 additions & 5 deletions src/register/heaterregister.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ class HeaterRegister : public Register {
addRegister(REG_HC_TEMP_LVL3, "temperature/level3", PERM_READ_ONLY);
addRegister(REG_HC_TEMP_LVL4, "temperature/level4", PERM_READ_ONLY);
addRegister(REG_HC_TEMP_LVL5, "temperature/level5", PERM_READ_ONLY);
addRegister(REG_HC_TEMP_IN1, "temperature/supply", PERM_READ_ONLY);
addRegister(REG_HC_TEMP_IN2, "temperature/extract", PERM_READ_ONLY);
addRegister(REG_HC_TEMP_IN3, "temperature/exhaust", PERM_READ_ONLY);
addRegister(REG_HC_TEMP_IN4, "temperature/protection", PERM_READ_ONLY);
addRegister(REG_HC_TEMP_IN5, "temperature/intake", PERM_READ_ONLY);
addRegister(REG_HC_TEMP_IN1, "temperature/supply", PERM_READ_ONLY, false, 20000);
addRegister(REG_HC_TEMP_IN2, "temperature/extract", PERM_READ_ONLY, false,20000);
addRegister(REG_HC_TEMP_IN3, "temperature/exhaust", PERM_READ_ONLY, false,20000);
addRegister(REG_HC_TEMP_IN4, "temperature/protection", PERM_READ_ONLY, false, 20000);
addRegister(REG_HC_TEMP_IN5, "temperature/intake", PERM_READ_ONLY, false, 20000 );
addRegister(REG_HC_TEMP_STATE, "", PERM_NONE);
addRegister(REG_HC_PREHEATER_TYPE, "preheater/type");
addRegister(REG_HC_HEATER_TEMP_SP, "heater/setpoint", PERM_READ_ONLY);
Expand Down
72 changes: 64 additions & 8 deletions src/register/register.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,44 @@
#include "register.h"
#include "../debugger.h"

/* ******************************************************** */
// set flags, return the ones that were previously 0.
uint8_t Register::SingleReg::setFlags( uint8_t flags )
{
uint8_t prev = getFlags( flags );

m_flags |= flags;

return flags & ~prev;
}

/* ******************************************************** */
// clear flags return the ones that were previusly 1.
uint8_t Register::SingleReg::clearFlags( uint8_t flags )
{
uint32_t prev = getFlags( flags );
m_flags &= ~flags;
return flags & prev;
}

/* ******************************************************** */
bool Register::SingleReg::readyForMqttExport( uint32_t ms_now ) const
{
if ( getFlags( FLG_VALUE_UPDATED ) == 0 )
return false;

if ( m_mqtt_period_256ms==0 ) return true; // no throttling set.
if ( m_mqtt_timestamp_256ms==0 ) return true; // no timestamp set yet.

uint16_t tm = MsTo256ms( ms_now );

uint16_t diff = tm - m_mqtt_timestamp_256ms; // signed diff, negative values will have upper bit set, i.e large.
if ( diff > m_mqtt_period_256ms )
return true;

return false;
}

/* ******************************************************** */
String Register::getName() const {
return m_name;
Expand Down Expand Up @@ -55,29 +93,42 @@ boolean Register::setValue(int address, int value)

bool changed = (current != value) || !isReadable(address);
if ( changed )
reg.m_flags |= FLG_VALUE_UPDATED;

{
if ( reg.setFlags(FLG_VALUE_UPDATED) != 0 )
{
}
}

return changed;
}

/* ******************************************************** */
bool Register::hasUpdatedValue(int32_t address) const
bool Register::hasUpdatedValue(int32_t address, const SingleReg **sreg_pp) const
{
int32_t idx = addr2Index( address, "hasUpdatedValue" );
if( idx == VAL_INVALID )
return false;

return (m_single_regs[idx].m_flags & FLG_VALUE_UPDATED);
SingleReg *sr = &m_single_regs[idx];

if ( sreg_pp ) *sreg_pp = sr; // return single register context pointer

return (sr->getFlags(FLG_VALUE_UPDATED)) ? true : false;

}

/* ******************************************************** */
void Register::confirmUpdate( int32_t address )
void Register::confirmUpdate( int32_t address, uint32_t millis )
{
int idx = addr2Index( address, "confirmUpdate");
if( idx == VAL_INVALID )
return ;

m_single_regs[idx].m_flags &= (~FLG_VALUE_UPDATED);

m_single_regs[idx].clearFlags( FLG_VALUE_UPDATED);

// remember time last exported for possible additional throttling.
m_single_regs[idx].setMqttExportTimestamp( millis );

}


Expand Down Expand Up @@ -135,7 +186,7 @@ void Register::confirmPendingWrite( int32_t address )


/* ******************************************************** */
void Register::addRegister(int address, const String &name, byte permission) {
void Register::addRegister(int address, const String &name, byte permission, bool mqtt_retain, uint32_t mqtt_throttle_time_ms) {
int idx = addr2Index( address, "addRegister");
if( idx == VAL_INVALID )
return;
Expand All @@ -144,6 +195,11 @@ void Register::addRegister(int address, const String &name, byte permission) {
r.m_perm = permission;
r.m_value = isReadable( address ) ? VAL_INVALID : 0;
r.m_flags = 0; // TODO add flags?
r.m_mqtt_timestamp_256ms = 0;

if ( mqtt_retain ) r.setFlags(FLG_MQTT_RETAIN);
r.setMqttThrottlePeriodMs( mqtt_throttle_time_ms);

}

/* ******************************************************** */
Expand Down
108 changes: 91 additions & 17 deletions src/register/register.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,66 @@
*/
class Register
{


public:
enum Flags
{
FLG_VALUE_UPDATED = 0x01,
FLG_PENDING_WRITE = 0x02,
FLG_MQTT_RETAIN = 0x04
};

// ******************************************
// Context info kept for single registers
// *****************************************************************************
// Context info kept for single registers. Only readables public to others than
// Register class.
class SingleReg
{
SingleReg() : m_value(VAL_INVALID), m_pending_set_value(VAL_INVALID),m_perm(0),m_flags(0){}
private:
String m_name;
int32_t m_value;
int32_t m_pending_set_value;
uint8_t m_perm;
uint8_t m_flags;
public:
SingleReg() : m_value(VAL_INVALID), m_pending_set_value(VAL_INVALID),m_perm(0),m_flags(0),m_mqtt_period_256ms(0),m_mqtt_timestamp_256ms(0){}

// Read, typically in a visitor, whether register is due for MQTT export, i.e
// the value is updated compared to the previously exported, and any throttling
// time has passed too.
// PS: visitors can also be flagged to only visit registers that fulfill this
// check, i.e let the RegistryManager check before allowing the visit.
bool readyForMqttExport( uint32_t ms_now ) const;

// Get MQTT retain flag for single-register. Typically, frequently updated
// sensor values may not have retain set, while slow updating fields would
// use retain. Also retain flags may need overriding for test purposes.
bool mqttRetain() const { return getFlags(FLG_MQTT_RETAIN)!=0 ? true : false; }

protected:
// Throttle export interval in addition to the polling period for the register block.
// This can be used if a block is polled for fast response on setting registers, while
// not wanting to be floded by continously changing measurement registers.
// To save memory and ease calcs period is set in resolution of 256ms so we can shift
// the millisecond counter 8 bits down and look at the next 16 next bits.
// storing period as 16 bits yields a max throttle period of 65536*256ms = 16776s = 4.6hrs
void setMqttThrottlePeriod256ms( uint16_t period_256ms ) { m_mqtt_period_256ms=period_256ms; }
void setMqttThrottlePeriodMs( uint32_t ms ) { setMqttThrottlePeriod256ms( MsTo256ms(ms) ); }
uint16_t getMqttThrottlePeriod256ms( uint16_t period ) const { return m_mqtt_period_256ms; }
uint16_t setMqttExportTimestamp( uint32_t ms_now ) { m_mqtt_timestamp_256ms = MsTo256ms(ms_now); return m_mqtt_timestamp_256ms; }

static uint16_t MsTo256ms(uint32_t ms ) { return (uint16_t) ((ms>>8) & 0xffff); }

// set flags, return the ones that were previously 0.
uint8_t setFlags( uint8_t flags );
// clear flags return the ones that were previusly 1.
uint8_t clearFlags( uint8_t flags );
uint8_t getFlags(uint8_t flags=0xff) const { return m_flags & flags; }


friend class Register; // to let Register access private members
private:
String m_name;
int32_t m_value;
int32_t m_pending_set_value;
uint8_t m_perm;
uint8_t m_flags;
uint16_t m_mqtt_period_256ms;
uint16_t m_mqtt_timestamp_256ms;

friend class Register; // to let Register access private members
};

public:
Expand All @@ -56,25 +96,52 @@ class Register
int getLength() const;
bool isCoil() const;

// Use this method to set a new value to be written
// by the registry manager.
// Use this method to set a new value to be written by the registry manager.
// returns true if there was a change.
bool setNewPendingWriteValue( int32_t address, int32_t value );

// check if there is a pending write request
bool hasPendingWrite(int32_t address) const;

// Get value of pending write.
int32_t getPendingWriteValue( int32_t address ) const;

// Clear pending write flag
void confirmPendingWrite( int32_t address );



bool hasUpdatedValue(int32_t address) const; // check update flag.
void confirmUpdate(int32_t address); // acknowledge the updated value.
// Check if value is updated. Returns flag, but also returns
// in ready_for_export whether throttling indicates whether to send MQTT
// given the ms timestamp now.
bool hasUpdatedValue(int32_t address, const SingleReg **sreg_pp=0) const; // check update flag.
void confirmUpdate(int32_t address, uint32_t millis); // acknowledge the updated value.
int getValue(int address) const; // get latest read value.


void addRegister(int address, const String &name, byte permission=PERM_FULL);
/* add a single register definition.
@param address
Address from MODBUS spec SystemAir Villavent
@param name
Topic name on MQTT.
@param permission
Read/Write/FUll permission.
@param mqtt_retain
Whether retain flag should be used when posting MQTT messages for this parameter.
Later a global override rule may be added if required
@param mqtt_throttle_time_ms
Throtling of frequency of MQTT export of this parameter.
For space/performance has a resolution of 256ms steps, if you must
specify at least 256ms, and e.g. 1000ms yields (1000/256)*256 = (1000>>8)*256 = 768ms
Also beware that the polling time of the register block containing the register will
influence how old the value exported could be, but you always get the latest sampled
value.
If having a MODBUS poll period of 500ms on the register block you can choose to set
throttle time on some sub-registers higher, but not lower, to reduce the frequency of
getting continous measurements, f.ex for the fan speed and temperature readings,
that jump up and down and cause most of the traffic.
Some user may prefer a higher interval to reduce storage space in logging databases.
*/
void addRegister(int address, const String &name, byte permission=PERM_FULL, bool mqtt_retain=true, uint32_t mqtt_throttle_time_ms=0 );
const String &getRegisterName(int address) const;
int getRegisterAddress(const String &name) const;
bool isReadable(int address) const;
Expand All @@ -83,14 +150,21 @@ class Register
virtual String getFormattedValue(int address) const;
virtual boolean setFormattedValue(int address, const String &value, bool setpending=false);

// Check if register block needs a new poll on Modbus.
boolean needsUpdate(uint32_t millis) const;
// Set timetamp of poll in milliseconds.
void setLastUpdated(uint32_t millis);
// Get timestamp of last poll in milliseconds.
uint32_t getLastUpdated() const { return m_last_updated; }
// Get configured interval for polling of register block.
int32_t getInterval() const { return m_interval; }

protected:
// Set the value in the cache for a given register address. This is
// typically only done by the RegisterManager after reading a new value on Modbus.
boolean setValue(int address, int value); // set value, return true if changed.

// Map from Modbus address to index to Register block containing that register address.
int addr2Index( int address, const char *source ) const;

private:
Expand Down
Loading

0 comments on commit 7b75d0f

Please sign in to comment.