Skip to content

Commit

Permalink
Merge pull request #764 from SignalK/task_queue_producer
Browse files Browse the repository at this point in the history
Refactor TaskQueueProducer
  • Loading branch information
mairas authored Oct 8, 2024
2 parents 982f8fd + 1cf72c9 commit df306c4
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: SensESP Automatic Build

on: [pull_request]
on: [push]

jobs:
build:
Expand Down
2 changes: 1 addition & 1 deletion src/sensesp/signalk/signalk_ws_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class SKWSClient : virtual public FileSystemSaveable,
SKDeltaQueue* sk_delta_queue_;
/// @brief Emits the number of deltas sent since last report
TaskQueueProducer<int> delta_tx_tick_producer_ =
TaskQueueProducer<int>(0, event_loop(), 5, 990);
TaskQueueProducer<int>(0, event_loop(), 990);
Integrator<int, int> delta_tx_count_producer_{1, 0, ""};
Integrator<int, int> delta_rx_count_producer_{1, 0, ""};

Expand Down
25 changes: 6 additions & 19 deletions src/sensesp/system/saveable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@ bool FileSystemSaveable::load() {
return false;
}

ESP_LOGD(__FILENAME__, "Loading configuration for path %s from file %s",
config_path_.c_str(), filename.c_str());

File f = SPIFFS.open(filename, "r");
String str = f.readString();
ESP_LOGD(__FILENAME__, "Configuration file contents: %s", str.c_str());
JsonDocument json_doc;
auto error = deserializeJson(json_doc, str);
auto error = deserializeJson(json_doc, f);
f.close();
if (error) {
ESP_LOGW(__FILENAME__, "Could not parse configuration for %s",
config_path_.c_str());
Expand All @@ -36,8 +32,8 @@ bool FileSystemSaveable::load() {
if (!from_json(obj)) {
ESP_LOGW(__FILENAME__, "Could not convert configuration to Json for %s",
config_path_.c_str());
return false;
}
f.close();
ESP_LOGD(__FILENAME__, "Configuration loaded for %s", config_path_.c_str());
return true;
}
Expand All @@ -46,21 +42,14 @@ bool FileSystemSaveable::save() {
if (config_path_ == "") {
return false;
}
ESP_LOGI(__FILENAME__, "Saving configuration for path %s", config_path_.c_str());

String hash_path = String("/") + Base64Sha1(config_path_);

// Delete any legacy configuration files
// Delete any existing configuration files
String filename;
if (find_config_file(config_path_, filename)) {
ESP_LOGD(__FILENAME__, "Deleting legacy configuration file %s",
filename.c_str());
SPIFFS.remove(filename);
}

ESP_LOGD(__FILENAME__, "Saving configuration path %s to file %s",
config_path_.c_str(), hash_path.c_str());

JsonDocument json_doc;
JsonObject obj = json_doc.as<JsonObject>();
if (!to_json(obj)) {
Expand All @@ -74,16 +63,15 @@ bool FileSystemSaveable::save() {

String str;
serializeJson(json_doc, str);
ESP_LOGD(__FILENAME__, "Configuration saved for %s: %s", config_path_.c_str(),
ESP_LOGI(__FILENAME__, "Configuration saved for %s: %s", config_path_.c_str(),
str.c_str());

return true;
}

bool FileSystemSaveable::remove() {
if (config_path_ == "") {
ESP_LOGD(__FILENAME__,
"Could not clear configuration (config_path not set)");
return false;
}

String filename;
Expand All @@ -94,7 +82,6 @@ bool FileSystemSaveable::remove() {
String hash_path = String("/") + Base64Sha1(config_path_);

if (SPIFFS.exists(hash_path)) {
ESP_LOGD(__FILENAME__, "Deleting configuration file %s", hash_path.c_str());
SPIFFS.remove(hash_path);
}
return true;
Expand Down
5 changes: 4 additions & 1 deletion src/sensesp/system/semaphore_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ namespace sensesp {
*
* SemaphoreValue is primarily useful for synchronizing access to a value.
* It is a regular ValueConsumer that can receive values from a ValueProducer.
* However, the value is wrapped in a mutex, which can be waited on.
* The value is wrapped in a semaphore which can be waited on.
* This allows a thread to wait until the value is updated, making
* SemaphoreValue useful for synchronizing threads.
*
* SemaphoreValue is similar to TaskQueueProducer, but it is not a queue and
* does not poll or emit the values.
*
*/
template <typename T>
class SemaphoreValue : public ValueConsumer<T> {
Expand Down
40 changes: 22 additions & 18 deletions src/sensesp/system/stream_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,36 @@ class StreamLineProducer : public ValueProducer<String> {
reactesp::EventLoop* event_loop = event_loop(),
int max_line_length = 256)
: stream_{stream}, max_line_length_{max_line_length} {
static int buf_pos = 0;
buf_ = new char[max_line_length_ + 1];
read_event_ = event_loop->onAvailable(*stream_, [this]() {
while (stream_->available()) {
char c = stream_->read();
if (c == '\n') {
// Include the newline character in the output
buf_[buf_pos++] = c;
buf_[buf_pos] = '\0';
this->emit(buf_);
buf_pos = 0;
} else {
buf_[buf_pos++] = c;
if (buf_pos >= max_line_length_ - 1) {
buf_pos = 0;
}
}
}
});
read_event_ =
event_loop->onAvailable(*stream_, [this]() { this->receive_line(); });
}

protected:
const int max_line_length_;
int buf_pos = 0;
char* buf_;
Stream* stream_;
reactesp::StreamEvent* read_event_;

void receive_line() {
while (stream_->available()) {
char c = stream_->read();
if (c == '\n') {
// Include the newline character in the output
buf_[buf_pos++] = c;
buf_[buf_pos] = '\0';
ESP_LOGV("StreamLineProducer", "About to emit line: %s", buf_);
this->emit(buf_);
buf_pos = 0;
} else {
buf_[buf_pos++] = c;
if (buf_pos >= max_line_length_ - 1) {
buf_pos = 0;
}
}
}
}
};

} // namespace sensesp
Expand Down
128 changes: 95 additions & 33 deletions src/sensesp/system/task_queue_producer.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,67 @@
#ifndef SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
#define SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_

#include <limits>
#include <queue>

#include "ReactESP.h"
#include "observablevalue.h"
#include "sensesp_base_app.h"

namespace sensesp {

/**
* @brief Thread-safe queue for inter-task communication. Works like std::queue.
*
* @tparam T
*/
template <typename T>
class SafeQueue : public std::queue<T> {
public:
SafeQueue() : std::queue<T>() {
queue_semaphore_ =
xSemaphoreCreateCounting(std::numeric_limits<int>::max(), 0);
write_lock_ = xSemaphoreCreateMutex();
}

void push(const T& value) {
xSemaphoreTake(write_lock_, portMAX_DELAY);
std::queue<T>::push(value);
xSemaphoreGive(queue_semaphore_);
xSemaphoreGive(write_lock_);
}

bool pop(T& value, unsigned int max_duration_ms) {
if (xSemaphoreTake(queue_semaphore_,
max_duration_ms / portTICK_PERIOD_MS) == pdTRUE) {
xSemaphoreTake(write_lock_, portMAX_DELAY);
value = std::queue<T>::front();
std::queue<T>::pop();
xSemaphoreGive(write_lock_);
return true;
}
return false;
}

bool empty() {
xSemaphoreTake(write_lock_, portMAX_DELAY);
bool result = std::queue<T>::empty();
xSemaphoreGive(write_lock_);
return result;
}

size_t size() {
xSemaphoreTake(write_lock_, portMAX_DELAY);
size_t result = std::queue<T>::size();
xSemaphoreGive(write_lock_);
return result;
}

protected:
SemaphoreHandle_t queue_semaphore_; // Mirrors the items in the queue
SemaphoreHandle_t write_lock_; // Lock for writing to the queue
};

/**
* @brief Producer class that works across task boundaries.
*
Expand All @@ -16,54 +71,61 @@ namespace sensesp {
* in another.
*
* @tparam T
* @param consumer_app The app object in which the values should be consumed.
* @param queue_size Size of the queue.
* @param poll_rate How often to poll the queue. Note: in microseconds!
* @param consumer_event_loop The event loop in which the values should be
* consumed.
* @param poll_rate How often to poll the queue. Note: in microseconds! A value
* of 0 means that the queue will be polled on every tick.
*/
template <class T>
class TaskQueueProducer : public ObservableValue<T> {
public:
TaskQueueProducer(const T& value,
reactesp::EventLoop* consumer_app = event_loop(),
int queue_size = 1, unsigned int poll_rate = 990)
: ObservableValue<T>(value), queue_size_{queue_size} {
queue_ = xQueueCreate(queue_size, sizeof(T));
if (queue_ == NULL) {
ESP_LOGE(__FILENAME__, "Failed to create queue");
}

// Create a repeat event that will poll the queue and emit the values
consumer_app->onRepeatMicros(poll_rate, [this]() {
TaskQueueProducer(const T& value, reactesp::EventLoop* consumer_event_loop,
unsigned int poll_rate = 990)
: ObservableValue<T>(value) {
auto func = [this]() {
T value;
while (xQueueReceive(queue_, &value, 0) == pdTRUE) {
while (queue_.pop(value, 0)) {
this->emit(value);
}
});
};

// Create a repeat event that will poll the queue and emit the values
if (poll_rate == 0) {
consumer_event_loop->onTick(func);
} else {
consumer_event_loop->onRepeatMicros(poll_rate, func);
}
}

TaskQueueProducer(const T& value, int queue_size = 1,
unsigned int poll_rate = 990)
: TaskQueueProducer(value, event_loop(), queue_size,
poll_rate) {}
TaskQueueProducer(const T& value, unsigned int poll_rate = 990)
: TaskQueueProducer(value, event_loop(), poll_rate) {}

virtual void set(const T& value) override {
// WARNING: This does not check if the queue is full.
xQueueSend(queue_, &value, 0);
}
virtual void set(const T& value) override { queue_.push(value); }

int push(const T& value) {
int retval;
if (queue_size_ == 1) {
retval = xQueueOverwrite(queue_, &value);
} else {
retval = xQueueSend(queue_, &value, 0);
/**
* @brief Wait for a value to be available in the queue.
*
* This function will block until a value is available in the queue. When a
* value becomes available, it will be returned in the reference and
* emitted to the observers.
*
* @param value Received value if the function returns true.
* @param max_duration_ms Maximum duration to wait for the value.
* @return true Value was received successfully.
* @return false
*/
bool wait(T& value, unsigned int max_duration_ms) {
T received_value;
bool result = queue_.pop(received_value, max_duration_ms);
if (result) {
value = received_value;
this->emit(value);
}
return retval;
return result;
}

private:
int queue_size_;
QueueHandle_t queue_;
SafeQueue<T> queue_;
};

} // namespace sensesp
Expand Down
2 changes: 2 additions & 0 deletions src/sensesp/transforms/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include <functional>

#include "sensesp/transforms/transform.h"

namespace sensesp {

/**
Expand Down

0 comments on commit df306c4

Please sign in to comment.