Skip to content

Commit

Permalink
StoreForward updates (#3194)
Browse files Browse the repository at this point in the history
* StoreForward updates
- Send history in "text" variant
- Don't send history the client already got
- Check if PSRAM is full
- More sensible defaults

* Set `TEXT_BROADCAST` or `TEXT_DIRECT` RequestResponse tag

* feat: E-Ink "Dynamic Partial" (#3193)

Use a mixture of full refresh, partial refresh, and skipped updates, balancing urgency and display health.

Co-authored-by: Ben Meadors <benmmeadors@gmail.com>

* [create-pull-request] automated change (#3209)

Co-authored-by: thebentern <thebentern@users.noreply.github.com>

* Reset `last_index` if history was cleared, e.g. by reboot

---------

Co-authored-by: Ben Meadors <benmmeadors@gmail.com>
Co-authored-by: todd-herbert <herbert.todd@gmail.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: thebentern <thebentern@users.noreply.github.com>
Co-authored-by: Garth Vander Houwen <garthvh@yahoo.com>
  • Loading branch information
6 people authored Feb 14, 2024
1 parent d2a74a5 commit d9bd9bd
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 53 deletions.
93 changes: 51 additions & 42 deletions src/modules/esp32/StoreForwardModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,11 @@ int32_t StoreForwardModule::runOnce()
// Only send packets if the channel is less than 25% utilized.
if (airTime->isTxAllowedChannelUtil(true)) {
storeForwardModule->sendPayload(this->busyTo, this->packetHistoryTXQueue_index);
if (this->packetHistoryTXQueue_index == packetHistoryTXQueue_size) {
// Tell the client we're done sending
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_PING;
storeForwardModule->sendMessage(this->busyTo, sf);
LOG_INFO("*** S&F - Done. (ROUTER_PING)\n");
if (this->packetHistoryTXQueue_index < packetHistoryTXQueue_size - 1) {
this->packetHistoryTXQueue_index++;
} else {
this->packetHistoryTXQueue_index = 0;
this->busy = false;
} else {
this->packetHistoryTXQueue_index++;
}
}
} else if ((millis() - lastHeartbeat > (heartbeatInterval * 1000)) && airTime->isTxAllowedChannelUtil(true)) {
Expand All @@ -56,7 +51,7 @@ int32_t StoreForwardModule::runOnce()
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_HEARTBEAT;
sf.which_variant = meshtastic_StoreAndForward_heartbeat_tag;
sf.variant.heartbeat.period = 300;
sf.variant.heartbeat.period = heartbeatInterval;
sf.variant.heartbeat.secondary = 0; // TODO we always have one primary router for now
storeForwardModule->sendMessage(NODENUM_BROADCAST, sf);
}
Expand Down Expand Up @@ -101,10 +96,11 @@ void StoreForwardModule::populatePSRAM()
*
* @param msAgo The number of milliseconds ago from which to start sending messages.
* @param to The recipient ID to send the messages to.
* @param last_request_index The index in the packet history of the last request from this node.
*/
void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to)
void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to, uint32_t last_request_index)
{
uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to);
uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to, &last_request_index);

if (queueSize) {
LOG_INFO("*** S&F - Sending %u message(s)\n", queueSize);
Expand All @@ -118,39 +114,38 @@ void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to)
sf.which_variant = meshtastic_StoreAndForward_history_tag;
sf.variant.history.history_messages = queueSize;
sf.variant.history.window = msAgo;
sf.variant.history.last_request = last_request_index;
storeForwardModule->sendMessage(to, sf);
}

/**
* Creates a new history queue with messages that were received within the specified time frame.
*
* @param msAgo The number of milliseconds ago to start the history queue.
* @param to The maximum number of messages to include in the history queue.
* @param to The NodeNum of the recipient.
* @param last_request_index The index in the packet history of the last request from this node.
* @return The ID of the newly created history queue.
*/
uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index)
{

this->packetHistoryTXQueue_size = 0;
// If our history was cleared, ignore what the client is telling us
uint32_t last_index = *last_request_index >= this->packetHistoryCurrent ? 0 : *last_request_index;

for (int i = 0; i < this->packetHistoryCurrent; i++) {
for (int i = last_index; i < this->packetHistoryCurrent; i++) {
/*
LOG_DEBUG("SF historyQueueCreate\n");
LOG_DEBUG("SF historyQueueCreate - time %d\n", this->packetHistory[i].time);
LOG_DEBUG("SF historyQueueCreate - millis %d\n", millis());
LOG_DEBUG("SF historyQueueCreate - math %d\n", (millis() - msAgo));
*/
if (this->packetHistory[i].time && (this->packetHistory[i].time < (millis() - msAgo))) {
LOG_DEBUG("*** SF historyQueueCreate - Time matches - ok\n");
/*
Copy the messages that were received by the router in the last msAgo
/* Copy the messages that were received by the router in the last msAgo
to the packetHistoryTXQueue structure.
TODO: The condition (this->packetHistory[i].to & NODENUM_BROADCAST) == to) is not tested since
I don't have an easy way to target a specific user. Will need to do this soon.
*/
if ((this->packetHistory[i].to & NODENUM_BROADCAST) == NODENUM_BROADCAST ||
((this->packetHistory[i].to & NODENUM_BROADCAST) == to)) {
Client not interested in packets from itself and only in broadcast packets or packets towards it. */
if (this->packetHistory[i].from != to &&
(this->packetHistory[i].to == NODENUM_BROADCAST || this->packetHistory[i].to == to)) {
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].time = this->packetHistory[i].time;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].to = this->packetHistory[i].to;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].from = this->packetHistory[i].from;
Expand All @@ -159,9 +154,10 @@ uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
memcpy(this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].payload, this->packetHistory[i].payload,
meshtastic_Constants_DATA_PAYLOAD_LEN);
this->packetHistoryTXQueue_size++;
*last_request_index = i + 1; // Set to one higher such that we don't send the same message again

LOG_DEBUG("*** PacketHistoryStruct time=%d\n", this->packetHistory[i].time);
LOG_DEBUG("*** PacketHistoryStruct msg=%s\n", this->packetHistory[i].payload);
LOG_DEBUG("*** PacketHistoryStruct time=%d, msg=%s\n", this->packetHistory[i].time,
this->packetHistory[i].payload);
}
}
}
Expand All @@ -177,15 +173,20 @@ void StoreForwardModule::historyAdd(const meshtastic_MeshPacket &mp)
{
const auto &p = mp.decoded;

this->packetHistory[this->packetHistoryCurrent].time = millis();
this->packetHistory[this->packetHistoryCurrent].to = mp.to;
this->packetHistory[this->packetHistoryCurrent].channel = mp.channel;
this->packetHistory[this->packetHistoryCurrent].from = mp.from;
this->packetHistory[this->packetHistoryCurrent].payload_size = p.payload.size;
memcpy(this->packetHistory[this->packetHistoryCurrent].payload, p.payload.bytes, meshtastic_Constants_DATA_PAYLOAD_LEN);
if (this->packetHistoryCurrent < this->records) {
this->packetHistory[this->packetHistoryCurrent].time = millis();
this->packetHistory[this->packetHistoryCurrent].to = mp.to;
this->packetHistory[this->packetHistoryCurrent].channel = mp.channel;
this->packetHistory[this->packetHistoryCurrent].from = mp.from;
this->packetHistory[this->packetHistoryCurrent].payload_size = p.payload.size;
memcpy(this->packetHistory[this->packetHistoryCurrent].payload, p.payload.bytes, meshtastic_Constants_DATA_PAYLOAD_LEN);

this->packetHistoryCurrent++;
this->packetHistoryMax++;
this->packetHistoryCurrent++;
this->packetHistoryMax++;
} else {
// TODO: Overwrite the oldest message in the history buffer when it is full.
LOG_WARN("*** S&F - PSRAM Full. Packet is not added to the history.\n");
}
}

meshtastic_MeshPacket *StoreForwardModule::allocReply()
Expand Down Expand Up @@ -213,10 +214,19 @@ void StoreForwardModule::sendPayload(NodeNum dest, uint32_t packetHistory_index)
// TODO: Make this configurable.
p->want_ack = false;

p->decoded.payload.size =
this->packetHistoryTXQueue[packetHistory_index].payload_size; // You must specify how many bytes are in the reply
memcpy(p->decoded.payload.bytes, this->packetHistoryTXQueue[packetHistory_index].payload,
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
sf.which_variant = meshtastic_StoreAndForward_text_tag;
sf.variant.text.size = this->packetHistoryTXQueue[packetHistory_index].payload_size;
memcpy(sf.variant.text.bytes, this->packetHistoryTXQueue[packetHistory_index].payload,
this->packetHistoryTXQueue[packetHistory_index].payload_size);
if (this->packetHistoryTXQueue[packetHistory_index].to == NODENUM_BROADCAST) {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_BROADCAST;
} else {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_DIRECT;
}

p->decoded.payload.size =
pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &meshtastic_StoreAndForward_msg, &sf);

service.sendToMesh(p);
}
Expand Down Expand Up @@ -387,7 +397,9 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
LOG_INFO("*** S&F - Busy. Try again shortly.\n");
} else {
if ((p->which_variant == meshtastic_StoreAndForward_history_tag) && (p->variant.history.window > 0)) {
storeForwardModule->historySend(p->variant.history.window * 60000, getFrom(&mp)); // window is in minutes
// window is in minutes
storeForwardModule->historySend(p->variant.history.window * 60000, getFrom(&mp),
p->variant.history.last_request);
} else {
storeForwardModule->historySend(historyReturnWindow * 60000, getFrom(&mp)); // defaults to 4 hours
}
Expand All @@ -406,8 +418,7 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
case meshtastic_StoreAndForward_RequestResponse_CLIENT_PONG:
if (is_server) {
LOG_INFO("*** StoreAndForward_RequestResponse_CLIENT_PONG\n");
// The Client is alive, update NodeDB
nodeDB.updateFrom(mp);
// NodeDB is already updated
}
break;

Expand Down Expand Up @@ -546,9 +557,7 @@ StoreForwardModule::StoreForwardModule()
}

// Client
}
if ((config.device.role == meshtastic_Config_DeviceConfig_Role_CLIENT) ||
(config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER_CLIENT)) {
} else {
is_client = true;
LOG_INFO("*** Initializing Store & Forward Module in Client mode\n");
}
Expand Down
21 changes: 10 additions & 11 deletions src/modules/esp32/StoreForwardModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ struct PacketHistoryStruct {
uint32_t to;
uint32_t from;
uint8_t channel;
bool ack;
uint8_t payload[meshtastic_Constants_DATA_PAYLOAD_LEN];
pb_size_t payload_size;
};
Expand All @@ -32,7 +31,7 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
uint32_t packetHistoryTXQueue_size = 0;
uint32_t packetHistoryTXQueue_index = 0;

uint32_t packetTimeMax = 5000;
uint32_t packetTimeMax = 5000; // Interval between sending history packets as a server.

bool is_client = false;
bool is_server = false;
Expand All @@ -41,17 +40,17 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
StoreForwardModule();

unsigned long lastHeartbeat = 0;
uint32_t heartbeatInterval = 300;
uint32_t heartbeatInterval = default_broadcast_interval_secs;

/**
Update our local reference of when we last saw that node.
@return 0 if we have never seen that node before otherwise return the last time we saw the node.
*/
void historyAdd(const meshtastic_MeshPacket &mp);
void statsSend(uint32_t to);
void historySend(uint32_t msAgo, uint32_t to);
void historySend(uint32_t msAgo, uint32_t to, uint32_t last_request_index = 0);

uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to);
uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index);

/**
* Send our payload into the mesh
Expand Down Expand Up @@ -79,16 +78,16 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
void populatePSRAM();

// S&F Defaults
uint32_t historyReturnMax = 250; // 250 records
uint32_t historyReturnWindow = 240; // 4 hours
uint32_t historyReturnMax = 25; // Return maximum of 25 records by default.
uint32_t historyReturnWindow = 240; // Return history of last 4 hours by default.
uint32_t records = 0; // Calculated
bool heartbeat = false; // No heartbeat.

// stats
uint32_t requests = 0;
uint32_t requests_history = 0;
uint32_t requests = 0; // Number of times any client sent a request to the S&F.
uint32_t requests_history = 0; // Number of times the history was requested.

uint32_t retry_delay = 0;
uint32_t retry_delay = 0; // If server is busy, retry after this delay (in ms).

protected:
virtual int32_t runOnce() override;
Expand All @@ -102,4 +101,4 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
virtual bool handleReceivedProtobuf(const meshtastic_MeshPacket &mp, meshtastic_StoreAndForward *p);
};

extern StoreForwardModule *storeForwardModule;
extern StoreForwardModule *storeForwardModule;

0 comments on commit d9bd9bd

Please sign in to comment.