Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpn: rfbridge rules #2302

Merged
merged 22 commits into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
568cc5b
wip rpn rules for rfbridge
mcspr Jul 16, 2020
d17158e
Exact hits for rfb_match condition, purge on rfb_match, tweak rfb_seq…
mcspr Aug 1, 2020
7b24bff
rfb_match_wait, rfb_info, rfb_pop
mcspr Aug 2, 2020
245723f
fix gcc4.8.2 vector push
mcspr Aug 2, 2020
2132df3
only match one-shot with one-shot
mcspr Aug 2, 2020
e788e12
don't try to match rfbON/OFF when there are no relays
mcspr Aug 2, 2020
9afd6eb
drop run_ from runners ops
mcspr Aug 2, 2020
bd125ad
Normalize codes instead of forcing leading zeroes
mcspr Aug 8, 2020
8b6ef70
Tweak logging, fix rpn.test bogus error message
mcspr Aug 8, 2020
af2e5ec
debug->showstack, log->dbgmsg (fix logarithm override)
mcspr Aug 8, 2020
21e2edb
Bump rpnlib to 0.24.0
mcspr Aug 8, 2020
cced11d
fixup! Normalize codes instead of forcing leading zeroes
mcspr Aug 8, 2020
7e1001e
fixup! debug->showstack, log->dbgmsg (fix logarithm override)
mcspr Aug 8, 2020
98480d4
Bump to the latest commit rpnlib
mcspr Aug 9, 2020
a47b8f9
hits->count, terminal rfb.codes, stale & repeat delay settings
mcspr Aug 9, 2020
3d1192c
remove debug prints when code is received
mcspr Aug 9, 2020
bd18be8
configure separate rfb_match window
mcspr Aug 9, 2020
75ed745
Merge branch 'dev' into rpn/rfb-codes-plus
mcspr Aug 10, 2020
0bd83f9
Fix loop overriding previous find
mcspr Aug 11, 2020
7be1446
No default stream without terminal support
mcspr Aug 11, 2020
d98dffa
Merge remote-tracking branch 'origin/dev' into rpn/rfb-codes-plus
mcspr Aug 18, 2020
78c6795
Merge remote-tracking branch 'origin/dev' into rpn/rfb-codes-plus
mcspr Aug 26, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions code/espurna/config/general.h
Original file line number Diff line number Diff line change
Expand Up @@ -1098,12 +1098,8 @@
#endif


#ifndef MQTT_SKIP_RETAINED
#define MQTT_SKIP_RETAINED 1 // Skip retained messages on connection
#endif

#ifndef MQTT_SKIP_TIME
#define MQTT_SKIP_TIME 1000 // Skip messages for 1 second anter connection
#define MQTT_SKIP_TIME 0 // Skip messages for N ms after connection. Disabled by default
#endif

#ifndef MQTT_USE_JSON
Expand Down
29 changes: 16 additions & 13 deletions code/espurna/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
#endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT


bool _mqtt_enabled = MQTT_ENABLED;
bool _mqtt_use_json = false;
unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
unsigned long _mqtt_last_connection = 0;
AsyncClientState _mqtt_state = AsyncClientState::Disconnected;
bool _mqtt_retain_skipped = false;
bool _mqtt_skip_messages = false;
unsigned long _mqtt_skip_time = MQTT_SKIP_TIME;
unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
bool _mqtt_enabled = MQTT_ENABLED;
bool _mqtt_use_json = false;
bool _mqtt_retain = MQTT_RETAIN;
int _mqtt_qos = MQTT_QOS;
int _mqtt_keepalive = MQTT_KEEPALIVE;
Expand Down Expand Up @@ -343,6 +344,9 @@ void _mqttConfigure() {
_mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
}

// Skip messages in a small window right after the connection
_mqtt_skip_time = getSetting("mqttSkipTime", MQTT_SKIP_TIME);

// Custom payload strings
settingsProcessConfig({
{_mqtt_payload_online, "mqttPayloadOnline", MQTT_STATUS_ONLINE},
Expand Down Expand Up @@ -433,6 +437,7 @@ void _mqttInfo() {
);
}
}

}

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -535,7 +540,6 @@ void _mqttOnConnect() {

_mqtt_last_connection = millis();
_mqtt_state = AsyncClientState::Connected;
_mqtt_retain_skipped = false;

DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));

Expand All @@ -554,7 +558,6 @@ void _mqttOnDisconnect() {
// Reset reconnection delay
_mqtt_last_connection = millis();
_mqtt_state = AsyncClientState::Disconnected;
_mqtt_retain_skipped = false;

DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));

Expand All @@ -568,14 +571,12 @@ void _mqttOnDisconnect() {
// Force-skip everything received in a short window right after connecting to avoid syncronization issues.

bool _mqttMaybeSkipRetained(char* topic) {
#if MQTT_SKIP_RETAINED
if (!_mqtt_retain_skipped && (millis() - _mqtt_last_connection < MQTT_SKIP_TIME)) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
return true;
}
#endif
if (_mqtt_skip_messages && (millis() - _mqtt_last_connection < _mqtt_skip_time)) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
return true;
}

_mqtt_retain_skipped = true;
_mqtt_skip_messages = false;
return false;
}

Expand Down Expand Up @@ -1006,6 +1007,8 @@ void _mqttConnect() {

_mqtt_state = AsyncClientState::Connecting;

_mqtt_skip_messages = (_mqtt_skip_time > 0);

#if SECURE_CLIENT != SECURE_CLIENT_NONE
const bool secure = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
#else
Expand Down
233 changes: 229 additions & 4 deletions code/espurna/rpnrules.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ Copyright (C) 2019 by Xose Pérez <xose dot perez at gmail dot com>
#include "relay.h"
#include "rpc.h"
#include "sensor.h"
#include "rfbridge.h"
#include "terminal.h"
#include "ws.h"

#include <list>
#include <vector>

// -----------------------------------------------------------------------------
// Custom commands
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -236,6 +239,214 @@ rpn_error _rpnRelayStatus(rpn_context & ctxt, bool force) {

#endif // RELAY_SUPPORT

#if RFB_SUPPORT

struct rpn_rfbridge_code {
unsigned char protocol;
String raw;
size_t count;
decltype(millis()) last;
};

// TODO: in theory, we could do with forward_list. however, this would require a more complicated removal process,
// as we would no longer know the previous element and would need to track 2 elements at a time
static std::list<rpn_rfbridge_code> _rfb_codes;

static uint32_t _rfb_code_repeat_window;
static uint32_t _rfb_code_stale_delay;

static uint32_t _rfb_code_match_window;

struct rpn_rfbridge_match {
unsigned char protocol;
String raw;
};

rpn_error _rpnRfbSequence(rpn_context& ctxt) {
auto raw_second = rpn_stack_pop(ctxt);
auto proto_second = rpn_stack_pop(ctxt);

auto raw_first = rpn_stack_pop(ctxt);
auto proto_first = rpn_stack_pop(ctxt);

// find 2 codes in the same order and save pointers
rpn_rfbridge_match match[2] {
{static_cast<unsigned char>(proto_first.toUint()), raw_first.toString()},
{static_cast<unsigned char>(proto_second.toUint()), raw_second.toString()}
};
rpn_rfbridge_code* refs[2] {nullptr, nullptr};

for (auto& recent : _rfb_codes) {
if ((refs[0] != nullptr) && (refs[1] != nullptr)) {
break;
}
for (int index = 0; index < 2; ++index) {
if ((refs[index] == nullptr)
&& (match[index].protocol == recent.protocol)
&& (match[index].raw == recent.raw)) {
refs[index] = &recent;
}
}
}

if ((refs[0] == nullptr) || (refs[1] == nullptr)) {
return rpn_operator_error::CannotContinue;
}

// purge codes to avoid matching again on the next rules run
if ((millis() - refs[0]->last) > (millis() - refs[1]->last)) {
_rfb_codes.remove_if([&refs](rpn_rfbridge_code& code) {
return (refs[0] == &code) || (refs[1] == &code);
});
return rpn_operator_error::Ok;
}

return rpn_operator_error::CannotContinue;
}

decltype(_rfb_codes)::iterator _rpnRfbFindCode(unsigned char protocol, const String& match) {
return std::find_if(_rfb_codes.begin(), _rfb_codes.end(), [protocol, &match](const rpn_rfbridge_code& code) {
return (code.protocol == protocol) && (code.raw == match);
});
}

rpn_error _rpnRfbPop(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);

auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}

_rfb_codes.erase(result);
return rpn_operator_error::Ok;
}

rpn_error _rpnRfbInfo(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);

auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}

rpn_stack_push(ctxt, rpn_value(
static_cast<rpn_uint>((*result).count)));
rpn_stack_push(ctxt, rpn_value(
static_cast<rpn_uint>((*result).last)));

return rpn_operator_error::Ok;
}

rpn_error _rpnRfbWaitMatch(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);
auto count = rpn_stack_pop(ctxt);
auto time = rpn_stack_pop(ctxt);

auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}

if ((*result).count < count.toUint()) {
return rpn_operator_error::CannotContinue;
}

// purge code to avoid matching again on the next rules run
if (rpn_operator_error::Ok == _rpnRunnerHandler(ctxt, RpnRunner::Policy::OneShot, time.toUint())) {
_rfb_codes.erase(result);
return rpn_operator_error::Ok;
}

return rpn_operator_error::CannotContinue;
}

rpn_error _rpnRfbMatcher(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);
auto count = rpn_stack_pop(ctxt);

auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}

// only process recent codes, ignore when rule is processing outside of this small window
if (millis() - (*result).last >= _rfb_code_match_window) {
return rpn_operator_error::CannotContinue;
}

// purge code to avoid matching again on the next rules run
if ((*result).count == count.toUint()) {
_rfb_codes.erase(result);
return rpn_operator_error::Ok;
}

return rpn_operator_error::CannotContinue;
}

void _rpnBrokerRfbridgeCallback(unsigned char protocol, const char* raw_code) {

// remove really old codes that we have not seen in a while to avoid memory exhaustion
auto ts = millis();
auto old = std::remove_if(_rfb_codes.begin(), _rfb_codes.end(), [ts](rpn_rfbridge_code& code) {
return (ts - code.last) >= _rfb_code_stale_delay;
});

if (old != _rfb_codes.end()) {
_rfb_codes.erase(old, _rfb_codes.end());
}

auto result = _rpnRfbFindCode(protocol, raw_code);
if (result != _rfb_codes.end()) {
// we also need to reset the counter at a certain point to allow next batch of repeats to go through
if (millis() - (*result).last >= _rfb_code_repeat_window) {
(*result).count = 0;
}
(*result).last = millis();
(*result).count += 1u;
} else {
_rfb_codes.push_back({protocol, raw_code, 1u, millis()});
}

_rpn_run = true;
}

void _rpnRfbSetup() {
// - Repeat window is an arbitrary time, just about 3-4 more times it takes for
// a code to be sent again when holding a generic remote button
// Code counter is reset to 0 when outside of the window.
// - Stale delay allows broker callback to remove really old codes.
// (TODO: can this happen in loop() cb instead?)
_rfb_code_repeat_window = getSetting("rfbRepeatWindow", 2000ul);
_rfb_code_match_window = getSetting("rfbMatchWindow", 2000ul);
_rfb_code_stale_delay = getSetting("rfbStaleDelay", 10000ul);

#if TERMINAL_SUPPORT
terminalRegisterCommand(F("RFB.CODES"), [](const terminal::CommandContext& ctx) {
for (auto& code : _rfb_codes) {
char buffer[128] = {0};
snprintf_P(buffer, sizeof(buffer),
PSTR("\"%s\" proto=%u count=%u last=%u"),
code.protocol,
code.raw.c_str(),
code.count,
code.last
);
ctx.output.println(buffer);
}
});
#endif

// Main bulk of the processing goes on in here
RfbridgeBroker::Register(_rpnBrokerRfbridgeCallback);
}

#endif // RFB_SUPPORT

void _rpnShowStack(Print& print) {
print.println(F("Stack:"));

Expand Down Expand Up @@ -369,6 +580,14 @@ void _rpnInit() {

#endif

#if RFB_SUPPORT
rpn_operator_set(_rpn_ctxt, "rfb_pop", 2, _rpnRfbPop);
rpn_operator_set(_rpn_ctxt, "rfb_info", 2, _rpnRfbInfo);
rpn_operator_set(_rpn_ctxt, "rfb_sequence", 4, _rpnRfbSequence);
rpn_operator_set(_rpn_ctxt, "rfb_match", 3, _rpnRfbMatcher);
rpn_operator_set(_rpn_ctxt, "rfb_match_wait", 4, _rpnRfbWaitMatch);
#endif

#if MQTT_SUPPORT
rpn_operator_set(_rpn_ctxt, "mqtt_send", 2, [](rpn_context & ctxt) -> rpn_error {
rpn_value message;
Expand All @@ -384,10 +603,12 @@ void _rpnInit() {
#endif

// Some debugging. Dump stack contents
rpn_operator_set(_rpn_ctxt, "showstack", 0, [](rpn_context & ctxt) -> rpn_error {
_rpnShowStack(terminalDefaultStream());
return 0;
});
#if TERMINAL_SUPPORT
rpn_operator_set(_rpn_ctxt, "showstack", 0, [](rpn_context & ctxt) -> rpn_error {
_rpnShowStack(terminalDefaultStream());
return 0;
});
#endif

// And, simple string logging
#if DEBUG_SUPPORT
Expand Down Expand Up @@ -591,6 +812,10 @@ void rpnSetup() {

StatusBroker::Register(_rpnBrokerStatus);

#if RFB_SUPPORT
_rpnRfbSetup();
#endif

#if SENSOR_SUPPORT
SensorReadBroker::Register(_rpnBrokerCallback);
#endif
Expand Down