diff --git a/src/main.ino b/src/main.ino index 2afefd5..9aaf917 100644 --- a/src/main.ino +++ b/src/main.ino @@ -379,7 +379,7 @@ class MQTTUpdateVisitor : public RegisterManager::Visitor { uint32_t m_messages_sent; // num MQTT messages sent. public: - MQTTUpdateVisitor() : Visitor(0,FLG_VISIT_REG_SINGLE|FLG_VISIT_UPDATED), m_messages_sent(0) {} + MQTTUpdateVisitor() : Visitor(0,FLG_VISIT_REG_SINGLE|FLG_VISIT_UPDATED_THROTTLE), m_messages_sent(0) {} //~MQTTUpdateVisitor(); uint32_t numMessagesSent() const { return m_messages_sent; } @@ -388,24 +388,26 @@ public: // Implement in your visitor class and return negative to // interrupt traverse. virtual int32_t visit( Register ® ) { return 0; }; - virtual int32_t visit( int32_t address, Register ® );; + virtual int32_t visit( int32_t address, Register ®, const Register::SingleReg &sreg ); }; // Called to visit updated nodes only, no need to check specifically -int32_t MQTTUpdateVisitor::visit( int32_t address, Register ®) +int32_t MQTTUpdateVisitor::visit( int32_t address, Register ®, const Register::SingleReg &sreg) { const String &name = reg.getRegisterName( address ); String formatted = reg.getFormattedValue( address ); - debugI("visit %s", name.c_str()); + bool retain = sreg.mqttRetain(); - sendMqttMessage(name, formatted); + //debugI("visit %s retain %s", name.c_str(), retain ? "Y":"N"); + + sendMqttMessage(name, formatted, retain); //TODO : could send fail and we need to handle that ? // Clear updated flag - i.e signal that change has been handled. - reg.confirmUpdate( address ); + reg.confirmUpdate( address, millis() ); m_messages_sent++; @@ -643,12 +645,16 @@ void mqttMessageReceived(String &topic, String &payload) { } -void sendMqttMessage(const String &name, String &payload) { +void sendMqttMessage(const String &name, String &payload, bool retain) { + if(!mqtt.connected() || mqttConfig == NULL || strlen(mqttConfig->publishTopic) == 0) return; String topic = String(mqttConfig->publishTopic) + "/" + name; - debugD("MQTT publish to %s with payload %s", topic.c_str(), payload.c_str()); + + debugD("MQTT publish %-20s : %s %s", topic.c_str(), payload.c_str(), retain ? "retain" : ""); + mqtt.publish(topic.c_str(), payload.c_str(), true, 0); mqtt.loop(); + yield(); } \ No newline at end of file diff --git a/src/register/clockregister.h b/src/register/clockregister.h index 58f2e40..ff21e0a 100644 --- a/src/register/clockregister.h +++ b/src/register/clockregister.h @@ -14,13 +14,13 @@ class ClockRegister : public Register { public: ClockRegister() : Register("clock", 550, 7, 1000, false) { - addRegister(REG_CLK_S, "clock/second"); - addRegister(REG_CLK_M, "clock/minute"); - addRegister(REG_CLK_H, "clock/hour"); - addRegister(REG_CLK_D, "clock/day"); - addRegister(REG_CLK_MNTH, "clock/month"); - addRegister(REG_CLK_Y, "clock/year"); - addRegister(REG_CLK_WD, "clock/weekday", PERM_READ_ONLY); + addRegister(REG_CLK_S, "clock/second", PERM_FULL, false ); + addRegister(REG_CLK_M, "clock/minute", PERM_FULL, false ); + addRegister(REG_CLK_H, "clock/hour", PERM_FULL, false ); + addRegister(REG_CLK_D, "clock/day", PERM_FULL, false); + addRegister(REG_CLK_MNTH, "clock/month", PERM_FULL, false ); + addRegister(REG_CLK_Y, "clock/year", PERM_FULL, false); + addRegister(REG_CLK_WD, "clock/weekday", PERM_READ_ONLY, false); }; String getFormattedValue(int address) const; diff --git a/src/register/fanregister.h b/src/register/fanregister.h index bfee120..ccfd494 100644 --- a/src/register/fanregister.h +++ b/src/register/fanregister.h @@ -55,8 +55,8 @@ class FanRegister : public Register { addRegister(REG_FAN_FLOW_UNITS, "fan/flow/unit"); addRegister(REG_FAN_SF_PWM, "fan/supply/pwm", PERM_READ_ONLY); addRegister(REG_FAN_EF_PWM, "fan/extract/pwm", PERM_READ_ONLY); - addRegister(REG_FAN_SF_RPM, "fan/supply/rpm", PERM_READ_ONLY); - addRegister(REG_FAN_EF_RPM, "fan/extract/rpm", PERM_READ_ONLY); + addRegister(REG_FAN_SF_RPM, "fan/supply/rpm", PERM_READ_ONLY, false, 10000 ); + addRegister(REG_FAN_EF_RPM, "fan/extract/rpm", PERM_READ_ONLY, false, 10000 ); addRegister(REG_FAN_SPEED_LVL_CD, "fan/speed/display", PERM_READ_ONLY); addRegister(REG_FAN_ALLOW_MANUAL_FAN_STOP, "fan/allowstop"); addRegister(REG_FAN_SPEED_LOG_RESET, "fan/speed/log/reset", PERM_WRITE_ONLY); diff --git a/src/register/heaterregister.h b/src/register/heaterregister.h index 1913f51..1ba9f9e 100644 --- a/src/register/heaterregister.h +++ b/src/register/heaterregister.h @@ -41,11 +41,11 @@ class HeaterRegister : public Register { addRegister(REG_HC_TEMP_LVL3, "temperature/level3", PERM_READ_ONLY); addRegister(REG_HC_TEMP_LVL4, "temperature/level4", PERM_READ_ONLY); addRegister(REG_HC_TEMP_LVL5, "temperature/level5", PERM_READ_ONLY); - addRegister(REG_HC_TEMP_IN1, "temperature/supply", PERM_READ_ONLY); - addRegister(REG_HC_TEMP_IN2, "temperature/extract", PERM_READ_ONLY); - addRegister(REG_HC_TEMP_IN3, "temperature/exhaust", PERM_READ_ONLY); - addRegister(REG_HC_TEMP_IN4, "temperature/protection", PERM_READ_ONLY); - addRegister(REG_HC_TEMP_IN5, "temperature/intake", PERM_READ_ONLY); + addRegister(REG_HC_TEMP_IN1, "temperature/supply", PERM_READ_ONLY, false, 20000); + addRegister(REG_HC_TEMP_IN2, "temperature/extract", PERM_READ_ONLY, false,20000); + addRegister(REG_HC_TEMP_IN3, "temperature/exhaust", PERM_READ_ONLY, false,20000); + addRegister(REG_HC_TEMP_IN4, "temperature/protection", PERM_READ_ONLY, false, 20000); + addRegister(REG_HC_TEMP_IN5, "temperature/intake", PERM_READ_ONLY, false, 20000 ); addRegister(REG_HC_TEMP_STATE, "", PERM_NONE); addRegister(REG_HC_PREHEATER_TYPE, "preheater/type"); addRegister(REG_HC_HEATER_TEMP_SP, "heater/setpoint", PERM_READ_ONLY); diff --git a/src/register/register.cpp b/src/register/register.cpp index 41db91c..42e0984 100644 --- a/src/register/register.cpp +++ b/src/register/register.cpp @@ -1,6 +1,44 @@ #include "register.h" #include "../debugger.h" +/* ******************************************************** */ + // set flags, return the ones that were previously 0. +uint8_t Register::SingleReg::setFlags( uint8_t flags ) +{ + uint8_t prev = getFlags( flags ); + + m_flags |= flags; + + return flags & ~prev; +} + +/* ******************************************************** */ +// clear flags return the ones that were previusly 1. +uint8_t Register::SingleReg::clearFlags( uint8_t flags ) +{ + uint32_t prev = getFlags( flags ); + m_flags &= ~flags; + return flags & prev; +} + +/* ******************************************************** */ +bool Register::SingleReg::readyForMqttExport( uint32_t ms_now ) const +{ + if ( getFlags( FLG_VALUE_UPDATED ) == 0 ) + return false; + + if ( m_mqtt_period_256ms==0 ) return true; // no throttling set. + if ( m_mqtt_timestamp_256ms==0 ) return true; // no timestamp set yet. + + uint16_t tm = MsTo256ms( ms_now ); + + uint16_t diff = tm - m_mqtt_timestamp_256ms; // signed diff, negative values will have upper bit set, i.e large. + if ( diff > m_mqtt_period_256ms ) + return true; + + return false; +} + /* ******************************************************** */ String Register::getName() const { return m_name; @@ -55,29 +93,42 @@ boolean Register::setValue(int address, int value) bool changed = (current != value) || !isReadable(address); if ( changed ) - reg.m_flags |= FLG_VALUE_UPDATED; - + { + if ( reg.setFlags(FLG_VALUE_UPDATED) != 0 ) + { + } + } + return changed; } /* ******************************************************** */ -bool Register::hasUpdatedValue(int32_t address) const +bool Register::hasUpdatedValue(int32_t address, const SingleReg **sreg_pp) const { int32_t idx = addr2Index( address, "hasUpdatedValue" ); if( idx == VAL_INVALID ) return false; - return (m_single_regs[idx].m_flags & FLG_VALUE_UPDATED); + SingleReg *sr = &m_single_regs[idx]; + + if ( sreg_pp ) *sreg_pp = sr; // return single register context pointer + + return (sr->getFlags(FLG_VALUE_UPDATED)) ? true : false; + } /* ******************************************************** */ -void Register::confirmUpdate( int32_t address ) +void Register::confirmUpdate( int32_t address, uint32_t millis ) { int idx = addr2Index( address, "confirmUpdate"); if( idx == VAL_INVALID ) return ; - - m_single_regs[idx].m_flags &= (~FLG_VALUE_UPDATED); + + m_single_regs[idx].clearFlags( FLG_VALUE_UPDATED); + + // remember time last exported for possible additional throttling. + m_single_regs[idx].setMqttExportTimestamp( millis ); + } @@ -135,7 +186,7 @@ void Register::confirmPendingWrite( int32_t address ) /* ******************************************************** */ -void Register::addRegister(int address, const String &name, byte permission) { +void Register::addRegister(int address, const String &name, byte permission, bool mqtt_retain, uint32_t mqtt_throttle_time_ms) { int idx = addr2Index( address, "addRegister"); if( idx == VAL_INVALID ) return; @@ -144,6 +195,11 @@ void Register::addRegister(int address, const String &name, byte permission) { r.m_perm = permission; r.m_value = isReadable( address ) ? VAL_INVALID : 0; r.m_flags = 0; // TODO add flags? + r.m_mqtt_timestamp_256ms = 0; + + if ( mqtt_retain ) r.setFlags(FLG_MQTT_RETAIN); + r.setMqttThrottlePeriodMs( mqtt_throttle_time_ms); + } /* ******************************************************** */ diff --git a/src/register/register.h b/src/register/register.h index e8b15be..efb2a48 100644 --- a/src/register/register.h +++ b/src/register/register.h @@ -19,26 +19,66 @@ */ class Register { - + +public: enum Flags { FLG_VALUE_UPDATED = 0x01, FLG_PENDING_WRITE = 0x02, + FLG_MQTT_RETAIN = 0x04 }; - // ****************************************** - // Context info kept for single registers + // ***************************************************************************** + // Context info kept for single registers. Only readables public to others than + // Register class. class SingleReg { - SingleReg() : m_value(VAL_INVALID), m_pending_set_value(VAL_INVALID),m_perm(0),m_flags(0){} - private: - String m_name; - int32_t m_value; - int32_t m_pending_set_value; - uint8_t m_perm; - uint8_t m_flags; + public: + SingleReg() : m_value(VAL_INVALID), m_pending_set_value(VAL_INVALID),m_perm(0),m_flags(0),m_mqtt_period_256ms(0),m_mqtt_timestamp_256ms(0){} + + // Read, typically in a visitor, whether register is due for MQTT export, i.e + // the value is updated compared to the previously exported, and any throttling + // time has passed too. + // PS: visitors can also be flagged to only visit registers that fulfill this + // check, i.e let the RegistryManager check before allowing the visit. + bool readyForMqttExport( uint32_t ms_now ) const; + + // Get MQTT retain flag for single-register. Typically, frequently updated + // sensor values may not have retain set, while slow updating fields would + // use retain. Also retain flags may need overriding for test purposes. + bool mqttRetain() const { return getFlags(FLG_MQTT_RETAIN)!=0 ? true : false; } + + protected: + // Throttle export interval in addition to the polling period for the register block. + // This can be used if a block is polled for fast response on setting registers, while + // not wanting to be floded by continously changing measurement registers. + // To save memory and ease calcs period is set in resolution of 256ms so we can shift + // the millisecond counter 8 bits down and look at the next 16 next bits. + // storing period as 16 bits yields a max throttle period of 65536*256ms = 16776s = 4.6hrs + void setMqttThrottlePeriod256ms( uint16_t period_256ms ) { m_mqtt_period_256ms=period_256ms; } + void setMqttThrottlePeriodMs( uint32_t ms ) { setMqttThrottlePeriod256ms( MsTo256ms(ms) ); } + uint16_t getMqttThrottlePeriod256ms( uint16_t period ) const { return m_mqtt_period_256ms; } + uint16_t setMqttExportTimestamp( uint32_t ms_now ) { m_mqtt_timestamp_256ms = MsTo256ms(ms_now); return m_mqtt_timestamp_256ms; } + + static uint16_t MsTo256ms(uint32_t ms ) { return (uint16_t) ((ms>>8) & 0xffff); } + + // set flags, return the ones that were previously 0. + uint8_t setFlags( uint8_t flags ); + // clear flags return the ones that were previusly 1. + uint8_t clearFlags( uint8_t flags ); + uint8_t getFlags(uint8_t flags=0xff) const { return m_flags & flags; } + - friend class Register; // to let Register access private members + private: + String m_name; + int32_t m_value; + int32_t m_pending_set_value; + uint8_t m_perm; + uint8_t m_flags; + uint16_t m_mqtt_period_256ms; + uint16_t m_mqtt_timestamp_256ms; + + friend class Register; // to let Register access private members }; public: @@ -56,25 +96,52 @@ class Register int getLength() const; bool isCoil() const; - // Use this method to set a new value to be written - // by the registry manager. + // Use this method to set a new value to be written by the registry manager. // returns true if there was a change. bool setNewPendingWriteValue( int32_t address, int32_t value ); + // check if there is a pending write request bool hasPendingWrite(int32_t address) const; + // Get value of pending write. int32_t getPendingWriteValue( int32_t address ) const; + // Clear pending write flag void confirmPendingWrite( int32_t address ); - - bool hasUpdatedValue(int32_t address) const; // check update flag. - void confirmUpdate(int32_t address); // acknowledge the updated value. + // Check if value is updated. Returns flag, but also returns + // in ready_for_export whether throttling indicates whether to send MQTT + // given the ms timestamp now. + bool hasUpdatedValue(int32_t address, const SingleReg **sreg_pp=0) const; // check update flag. + void confirmUpdate(int32_t address, uint32_t millis); // acknowledge the updated value. int getValue(int address) const; // get latest read value. - void addRegister(int address, const String &name, byte permission=PERM_FULL); + /* add a single register definition. + @param address + Address from MODBUS spec SystemAir Villavent + @param name + Topic name on MQTT. + @param permission + Read/Write/FUll permission. + @param mqtt_retain + Whether retain flag should be used when posting MQTT messages for this parameter. + Later a global override rule may be added if required + @param mqtt_throttle_time_ms + Throtling of frequency of MQTT export of this parameter. + For space/performance has a resolution of 256ms steps, if you must + specify at least 256ms, and e.g. 1000ms yields (1000/256)*256 = (1000>>8)*256 = 768ms + Also beware that the polling time of the register block containing the register will + influence how old the value exported could be, but you always get the latest sampled + value. + If having a MODBUS poll period of 500ms on the register block you can choose to set + throttle time on some sub-registers higher, but not lower, to reduce the frequency of + getting continous measurements, f.ex for the fan speed and temperature readings, + that jump up and down and cause most of the traffic. + Some user may prefer a higher interval to reduce storage space in logging databases. + */ + void addRegister(int address, const String &name, byte permission=PERM_FULL, bool mqtt_retain=true, uint32_t mqtt_throttle_time_ms=0 ); const String &getRegisterName(int address) const; int getRegisterAddress(const String &name) const; bool isReadable(int address) const; @@ -83,14 +150,21 @@ class Register virtual String getFormattedValue(int address) const; virtual boolean setFormattedValue(int address, const String &value, bool setpending=false); + // Check if register block needs a new poll on Modbus. boolean needsUpdate(uint32_t millis) const; + // Set timetamp of poll in milliseconds. void setLastUpdated(uint32_t millis); + // Get timestamp of last poll in milliseconds. uint32_t getLastUpdated() const { return m_last_updated; } + // Get configured interval for polling of register block. int32_t getInterval() const { return m_interval; } protected: + // Set the value in the cache for a given register address. This is + // typically only done by the RegisterManager after reading a new value on Modbus. boolean setValue(int address, int value); // set value, return true if changed. + // Map from Modbus address to index to Register block containing that register address. int addr2Index( int address, const char *source ) const; private: diff --git a/src/registermanager.cpp b/src/registermanager.cpp index badb7fd..c9140c1 100644 --- a/src/registermanager.cpp +++ b/src/registermanager.cpp @@ -1,4 +1,3 @@ -#include "registermanager.h" #include "register/fanregister.h" @@ -15,6 +14,7 @@ #include "register/alarmcoil.h" #include "register/alarmregister.h" +#include "registermanager.h" #include "debugger.h" @@ -112,6 +112,23 @@ void RegisterManager::unprotect() m_sem.give(); } +/* *************************************************** */ +void RegisterManager::simulateChangesOnNotConnected() +{ +/* TEST CODE TO SIMULATE CHANGES WHEN NOT CONNECTED: + Register *r = findRegisterByAddress( REG_FAN_SF_RPM ); + if ( r ) + { + uint16_t v = millis() & 0xffff; + const Register::SingleReg *rp; + bool was_changed = r->hasUpdatedValue(REG_FAN_SF_RPM, &rp ); + bool ch = r->setValue( REG_FAN_SF_RPM, v); // set value, return true if changed. + bool now_changed = r->hasUpdatedValue(REG_FAN_SF_RPM, &rp ); + bool rdy = rp->readyForMqttExport( millis() ); + debugI("Set SF fan %d upd %s, changed %s => %s rdy %s", v, ch?"Y":"N", was_changed?"Y":"N", now_changed ? "Y":"NO", rdy ? "Y":"N"); + } + END TEST CODE */ +} /* *************************************************** */ /* The main thread loop. @@ -155,6 +172,9 @@ void RegisterManager::taskCode() } else { + // In this method one can add simulation of changes on a not-connected test device. + simulateChangesOnNotConnected(); + switchState( STATE_CONNECTING ); backoff_time_ms = 1000; } @@ -411,12 +431,12 @@ class PendingWriteVisitor : public RegisterManager::Visitor // Implement in your visitor class and return negative to // interrupt traverse. virtual int32_t visit( Register ® ) { return 0; }; - virtual int32_t visit( int32_t address, Register ® );; + virtual int32_t visit( int32_t address, Register ®, const Register::SingleReg &sreg );; }; /* *************************************************** */ -int32_t PendingWriteVisitor::visit( int32_t address, Register ® ) +int32_t PendingWriteVisitor::visit( int32_t address, Register ®, const Register::SingleReg &sreg ) { if ( reg.hasPendingWrite(address)== false ) // uneeded extra check.. return 0; @@ -526,7 +546,8 @@ int32_t RegisterManager::acceptNoProtect(Visitor &visitor) int32_t visitor_res = 0; - // debugI("Accept traverse count %d start at %d", count, visitor.currIndex() ); + // debugI("Accept traverse count %d start at %d", count, visitor.currIndex() ); + uint32_t ms_now = millis(); // visit each reg at most once. Break if visitor returns negative. for ( i=0; visitor_res>=0 && ihasUpdatedValue(address); // whether a new values is available. + const Register::SingleReg *srp = 0; + + bool updated = reg->hasUpdatedValue(address, &srp ); // whether a new values is available. + bool ready_for_export = updated; + if ( srp ) + ready_for_export = srp->readyForMqttExport(ms_now); // checks both update falgs and throttle timer. + bool pnd_wr = reg->hasPendingWrite(address); // whether there is a pending write on register. if ( (visitor.getFlags(Visitor::FLG_VISIT_WRITE_PENDING) && pnd_wr) || (visitor.getFlags(Visitor::FLG_VISIT_UPDATED) && updated) || + (visitor.getFlags(Visitor::FLG_VISIT_UPDATED_THROTTLE) && ready_for_export ) || (visitor.getFlags(Visitor::FLG_VISIT_UNCHANGED) ) ) { - visitor_res = visitor.visit( address, *reg ); + visitor_res = visitor.visit( address, *reg, *srp ); if ( visitor_res < 0 ) break; } diff --git a/src/registermanager.h b/src/registermanager.h index f5ee6a3..8c9d788 100644 --- a/src/registermanager.h +++ b/src/registermanager.h @@ -73,12 +73,13 @@ class RegisterManager : public OSAPI_Task public: // Flags enum FilterMasks { - FLG_VISIT_REG_BLOCK = 0x01, - FLG_VISIT_REG_SINGLE = 0x02, - FLG_VISIT_UNCHANGED = 0x04, - FLG_VISIT_UPDATED = 0x08, - FLG_VISIT_WRITE_PENDING = 0x10, - FLG_VISIT_ALL= FLG_VISIT_REG_BLOCK|FLG_VISIT_REG_SINGLE|FLG_VISIT_UNCHANGED|FLG_VISIT_UPDATED|FLG_VISIT_WRITE_PENDING + FLG_VISIT_REG_BLOCK = 0x01, + FLG_VISIT_REG_SINGLE = 0x02, + FLG_VISIT_UNCHANGED = 0x04, + FLG_VISIT_UPDATED = 0x08, + FLG_VISIT_UPDATED_THROTTLE = 0x10, // only visit single registers if updated and throttle timer ready. + FLG_VISIT_WRITE_PENDING = 0x20, + FLG_VISIT_ALL= FLG_VISIT_REG_BLOCK|FLG_VISIT_REG_SINGLE|FLG_VISIT_UNCHANGED|FLG_VISIT_UPDATED|FLG_VISIT_UPDATED_THROTTLE|FLG_VISIT_WRITE_PENDING }; // Visitor. start_idx can be set to start from previous value to implement @@ -99,7 +100,7 @@ class RegisterManager : public OSAPI_Task // Implement this to visit individual single registers ///return negative will interrupt traverse. - virtual int32_t visit( int32_t address, Register ® ) { return 0; }; + virtual int32_t visit( int32_t address, Register ®, const Register::SingleReg &sreg ) { return 0; }; int32_t currIndex() const { return m_curr_idx; } void resetIndex() { m_curr_idx=0; } @@ -140,6 +141,9 @@ class RegisterManager : public OSAPI_Task void clearStats(); + void simulateChangesOnNotConnected(); + + private: OSAPI_Semaphore m_sem; bool m_enabled; @@ -166,4 +170,4 @@ class RegisterManager : public OSAPI_Task -#endif // sentinel \ No newline at end of file +#endif // sentinel diff --git a/src/web/InternalWebServer.cpp b/src/web/InternalWebServer.cpp index c824084..b7a5c65 100644 --- a/src/web/InternalWebServer.cpp +++ b/src/web/InternalWebServer.cpp @@ -704,7 +704,7 @@ void InternalWebServer::applicationJs() { } // Called on every single register -int32_t InternalWebServer::RegisterVisitor::visit( int32_t address, Register ®) +int32_t InternalWebServer::RegisterVisitor::visit( int32_t address, Register ®, const Register::SingleReg &sreg) { const String &name = reg.getRegisterName( address ); diff --git a/src/web/InternalWebServer.h b/src/web/InternalWebServer.h index 429aa92..1717770 100644 --- a/src/web/InternalWebServer.h +++ b/src/web/InternalWebServer.h @@ -53,7 +53,7 @@ class InternalWebServer { // Implement in your visitor class and return negative to // interrupt traverse. virtual int32_t visit( Register ® ) { return 0; }; - virtual int32_t visit( int32_t address, Register ® );; + virtual int32_t visit( int32_t address, Register ® , const Register::SingleReg &sreg); };