Skip to content

Commit

Permalink
rpn: rfbridge operators and mqtt fixes (#2302)
Browse files Browse the repository at this point in the history
- cache received rfbridge codes in the internal list, allow to operate on it via the rpn operators
- add `<N> <proto> <code> rfb_match`, matching when we receive specified protocol + code string at least N times
- add `<proto> <code> <proto> <code> rfb_sequence`, checking if specified protocol + code pairs happen in sequence
- add `<TIME> <N> <proto> <code> rfb_match_wait` - similar to `rfb_match`, but waiting for at least `TIME` (ms) via oneshot runner
- add `<proto> <code> rfb_info`, pushes code's latest timestamp and it's counter on the stack
- add `<proto> <code> rfb_pop`, which removes the specified protocol + code from the internal cache
- fix MQTT skip setting making RPN variables absent on initial connection
- default to no skip when receiving MQTT
(small issue still stands with us having non-clean MQTT session, broker will persist variable subscriptions even after unsubscribe event)
  • Loading branch information
mcspr authored Aug 26, 2020
1 parent 82d5de0 commit 1f9479b
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 22 deletions.
6 changes: 1 addition & 5 deletions code/espurna/config/general.h
Original file line number Diff line number Diff line change
Expand Up @@ -1098,12 +1098,8 @@
#endif


#ifndef MQTT_SKIP_RETAINED
#define MQTT_SKIP_RETAINED 1 // Skip retained messages on connection
#endif

#ifndef MQTT_SKIP_TIME
#define MQTT_SKIP_TIME 1000 // Skip messages for 1 second anter connection
#define MQTT_SKIP_TIME 0 // Skip messages for N ms after connection. Disabled by default
#endif

#ifndef MQTT_USE_JSON
Expand Down
29 changes: 16 additions & 13 deletions code/espurna/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
#endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT


bool _mqtt_enabled = MQTT_ENABLED;
bool _mqtt_use_json = false;
unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
unsigned long _mqtt_last_connection = 0;
AsyncClientState _mqtt_state = AsyncClientState::Disconnected;
bool _mqtt_retain_skipped = false;
bool _mqtt_skip_messages = false;
unsigned long _mqtt_skip_time = MQTT_SKIP_TIME;
unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
bool _mqtt_enabled = MQTT_ENABLED;
bool _mqtt_use_json = false;
bool _mqtt_retain = MQTT_RETAIN;
int _mqtt_qos = MQTT_QOS;
int _mqtt_keepalive = MQTT_KEEPALIVE;
Expand Down Expand Up @@ -343,6 +344,9 @@ void _mqttConfigure() {
_mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
}

// Skip messages in a small window right after the connection
_mqtt_skip_time = getSetting("mqttSkipTime", MQTT_SKIP_TIME);

// Custom payload strings
settingsProcessConfig({
{_mqtt_payload_online, "mqttPayloadOnline", MQTT_STATUS_ONLINE},
Expand Down Expand Up @@ -433,6 +437,7 @@ void _mqttInfo() {
);
}
}

}

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -535,7 +540,6 @@ void _mqttOnConnect() {

_mqtt_last_connection = millis();
_mqtt_state = AsyncClientState::Connected;
_mqtt_retain_skipped = false;

DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));

Expand All @@ -554,7 +558,6 @@ void _mqttOnDisconnect() {
// Reset reconnection delay
_mqtt_last_connection = millis();
_mqtt_state = AsyncClientState::Disconnected;
_mqtt_retain_skipped = false;

DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));

Expand All @@ -568,14 +571,12 @@ void _mqttOnDisconnect() {
// Force-skip everything received in a short window right after connecting to avoid syncronization issues.

bool _mqttMaybeSkipRetained(char* topic) {
#if MQTT_SKIP_RETAINED
if (!_mqtt_retain_skipped && (millis() - _mqtt_last_connection < MQTT_SKIP_TIME)) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
return true;
}
#endif
if (_mqtt_skip_messages && (millis() - _mqtt_last_connection < _mqtt_skip_time)) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
return true;
}

_mqtt_retain_skipped = true;
_mqtt_skip_messages = false;
return false;
}

Expand Down Expand Up @@ -1006,6 +1007,8 @@ void _mqttConnect() {

_mqtt_state = AsyncClientState::Connecting;

_mqtt_skip_messages = (_mqtt_skip_time > 0);

#if SECURE_CLIENT != SECURE_CLIENT_NONE
const bool secure = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
#else
Expand Down
233 changes: 229 additions & 4 deletions code/espurna/rpnrules.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ Copyright (C) 2019 by Xose Pérez <xose dot perez at gmail dot com>
#include "relay.h"
#include "rpc.h"
#include "sensor.h"
#include "rfbridge.h"
#include "terminal.h"
#include "ws.h"

#include <list>
#include <vector>

// -----------------------------------------------------------------------------
// Custom commands
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -236,6 +239,214 @@ rpn_error _rpnRelayStatus(rpn_context & ctxt, bool force) {

#endif // RELAY_SUPPORT

#if RFB_SUPPORT

struct rpn_rfbridge_code {
unsigned char protocol;
String raw;
size_t count;
decltype(millis()) last;
};

// TODO: in theory, we could do with forward_list. however, this would require a more complicated removal process,
// as we would no longer know the previous element and would need to track 2 elements at a time
static std::list<rpn_rfbridge_code> _rfb_codes;

static uint32_t _rfb_code_repeat_window;
static uint32_t _rfb_code_stale_delay;

static uint32_t _rfb_code_match_window;

struct rpn_rfbridge_match {
unsigned char protocol;
String raw;
};

rpn_error _rpnRfbSequence(rpn_context& ctxt) {
auto raw_second = rpn_stack_pop(ctxt);
auto proto_second = rpn_stack_pop(ctxt);

auto raw_first = rpn_stack_pop(ctxt);
auto proto_first = rpn_stack_pop(ctxt);

// find 2 codes in the same order and save pointers
rpn_rfbridge_match match[2] {
{static_cast<unsigned char>(proto_first.toUint()), raw_first.toString()},
{static_cast<unsigned char>(proto_second.toUint()), raw_second.toString()}
};
rpn_rfbridge_code* refs[2] {nullptr, nullptr};

for (auto& recent : _rfb_codes) {
if ((refs[0] != nullptr) && (refs[1] != nullptr)) {
break;
}
for (int index = 0; index < 2; ++index) {
if ((refs[index] == nullptr)
&& (match[index].protocol == recent.protocol)
&& (match[index].raw == recent.raw)) {
refs[index] = &recent;
}
}
}

if ((refs[0] == nullptr) || (refs[1] == nullptr)) {
return rpn_operator_error::CannotContinue;
}

// purge codes to avoid matching again on the next rules run
if ((millis() - refs[0]->last) > (millis() - refs[1]->last)) {
_rfb_codes.remove_if([&refs](rpn_rfbridge_code& code) {
return (refs[0] == &code) || (refs[1] == &code);
});
return rpn_operator_error::Ok;
}

return rpn_operator_error::CannotContinue;
}

decltype(_rfb_codes)::iterator _rpnRfbFindCode(unsigned char protocol, const String& match) {
return std::find_if(_rfb_codes.begin(), _rfb_codes.end(), [protocol, &match](const rpn_rfbridge_code& code) {
return (code.protocol == protocol) && (code.raw == match);
});
}

rpn_error _rpnRfbPop(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);

auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}

_rfb_codes.erase(result);
return rpn_operator_error::Ok;
}

rpn_error _rpnRfbInfo(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);

auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}

rpn_stack_push(ctxt, rpn_value(
static_cast<rpn_uint>((*result).count)));
rpn_stack_push(ctxt, rpn_value(
static_cast<rpn_uint>((*result).last)));

return rpn_operator_error::Ok;
}

rpn_error _rpnRfbWaitMatch(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);
auto count = rpn_stack_pop(ctxt);
auto time = rpn_stack_pop(ctxt);

auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}

if ((*result).count < count.toUint()) {
return rpn_operator_error::CannotContinue;
}

// purge code to avoid matching again on the next rules run
if (rpn_operator_error::Ok == _rpnRunnerHandler(ctxt, RpnRunner::Policy::OneShot, time.toUint())) {
_rfb_codes.erase(result);
return rpn_operator_error::Ok;
}

return rpn_operator_error::CannotContinue;
}

rpn_error _rpnRfbMatcher(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);
auto count = rpn_stack_pop(ctxt);

auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}

// only process recent codes, ignore when rule is processing outside of this small window
if (millis() - (*result).last >= _rfb_code_match_window) {
return rpn_operator_error::CannotContinue;
}

// purge code to avoid matching again on the next rules run
if ((*result).count == count.toUint()) {
_rfb_codes.erase(result);
return rpn_operator_error::Ok;
}

return rpn_operator_error::CannotContinue;
}

void _rpnBrokerRfbridgeCallback(unsigned char protocol, const char* raw_code) {

// remove really old codes that we have not seen in a while to avoid memory exhaustion
auto ts = millis();
auto old = std::remove_if(_rfb_codes.begin(), _rfb_codes.end(), [ts](rpn_rfbridge_code& code) {
return (ts - code.last) >= _rfb_code_stale_delay;
});

if (old != _rfb_codes.end()) {
_rfb_codes.erase(old, _rfb_codes.end());
}

auto result = _rpnRfbFindCode(protocol, raw_code);
if (result != _rfb_codes.end()) {
// we also need to reset the counter at a certain point to allow next batch of repeats to go through
if (millis() - (*result).last >= _rfb_code_repeat_window) {
(*result).count = 0;
}
(*result).last = millis();
(*result).count += 1u;
} else {
_rfb_codes.push_back({protocol, raw_code, 1u, millis()});
}

_rpn_run = true;
}

void _rpnRfbSetup() {
// - Repeat window is an arbitrary time, just about 3-4 more times it takes for
// a code to be sent again when holding a generic remote button
// Code counter is reset to 0 when outside of the window.
// - Stale delay allows broker callback to remove really old codes.
// (TODO: can this happen in loop() cb instead?)
_rfb_code_repeat_window = getSetting("rfbRepeatWindow", 2000ul);
_rfb_code_match_window = getSetting("rfbMatchWindow", 2000ul);
_rfb_code_stale_delay = getSetting("rfbStaleDelay", 10000ul);

#if TERMINAL_SUPPORT
terminalRegisterCommand(F("RFB.CODES"), [](const terminal::CommandContext& ctx) {
for (auto& code : _rfb_codes) {
char buffer[128] = {0};
snprintf_P(buffer, sizeof(buffer),
PSTR("\"%s\" proto=%u count=%u last=%u"),
code.protocol,
code.raw.c_str(),
code.count,
code.last
);
ctx.output.println(buffer);
}
});
#endif

// Main bulk of the processing goes on in here
RfbridgeBroker::Register(_rpnBrokerRfbridgeCallback);
}

#endif // RFB_SUPPORT

void _rpnShowStack(Print& print) {
print.println(F("Stack:"));

Expand Down Expand Up @@ -369,6 +580,14 @@ void _rpnInit() {

#endif

#if RFB_SUPPORT
rpn_operator_set(_rpn_ctxt, "rfb_pop", 2, _rpnRfbPop);
rpn_operator_set(_rpn_ctxt, "rfb_info", 2, _rpnRfbInfo);
rpn_operator_set(_rpn_ctxt, "rfb_sequence", 4, _rpnRfbSequence);
rpn_operator_set(_rpn_ctxt, "rfb_match", 3, _rpnRfbMatcher);
rpn_operator_set(_rpn_ctxt, "rfb_match_wait", 4, _rpnRfbWaitMatch);
#endif

#if MQTT_SUPPORT
rpn_operator_set(_rpn_ctxt, "mqtt_send", 2, [](rpn_context & ctxt) -> rpn_error {
rpn_value message;
Expand All @@ -384,10 +603,12 @@ void _rpnInit() {
#endif

// Some debugging. Dump stack contents
rpn_operator_set(_rpn_ctxt, "showstack", 0, [](rpn_context & ctxt) -> rpn_error {
_rpnShowStack(terminalDefaultStream());
return 0;
});
#if TERMINAL_SUPPORT
rpn_operator_set(_rpn_ctxt, "showstack", 0, [](rpn_context & ctxt) -> rpn_error {
_rpnShowStack(terminalDefaultStream());
return 0;
});
#endif

// And, simple string logging
#if DEBUG_SUPPORT
Expand Down Expand Up @@ -591,6 +812,10 @@ void rpnSetup() {

StatusBroker::Register(_rpnBrokerStatus);

#if RFB_SUPPORT
_rpnRfbSetup();
#endif

#if SENSOR_SUPPORT
SensorReadBroker::Register(_rpnBrokerCallback);
#endif
Expand Down

0 comments on commit 1f9479b

Please sign in to comment.