Skip to content

Commit

Permalink
Trigger a flow start by REST API or MQTT (#1648)
Browse files Browse the repository at this point in the history
* Trigger flow start by Rest API

* Increase handlers

* Update

* Update

* Update

* Change max handlers

* Add debug message

* Trigger flow start by MQTT

* Update

* Remove unused function

* Remove handler_doflow + routines

* Cleanup

* MergeCheck
  • Loading branch information
Slider0007 authored Dec 21, 2022
1 parent e54d914 commit f6bf7e3
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 134 deletions.
97 changes: 56 additions & 41 deletions code/components/jomjol_mqtt/interface_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
static const char *TAG = "MQTT IF";

std::map<std::string, std::function<void()>>* connectFunktionMap = NULL;
std::map<std::string, std::function<bool(std::string, char*, int)>>* subscribeFunktionMap = NULL;

std::map<std::string, std::function<bool(std::string, char*, int)>>* subscribeFunktionMap = NULL;

int failedOnRound = -1;

Expand Down Expand Up @@ -86,7 +85,6 @@ bool MQTTPublish(std::string _key, std::string _content, int retained_flag)


static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) {
int msg_id;
std::string topic = "";
switch (event->event_id) {
case MQTT_EVENT_BEFORE_CONNECT:
Expand All @@ -106,8 +104,6 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) {
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
ESP_LOGD(TAG, "sent publish successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGD(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
Expand All @@ -117,12 +113,12 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) {
break;
case MQTT_EVENT_DATA:
ESP_LOGD(TAG, "MQTT_EVENT_DATA");
ESP_LOGD(TAG, "TOPIC=%.*s\r\n", event->topic_len, event->topic);
ESP_LOGD(TAG, "DATA=%.*s\r\n", event->data_len, event->data);
ESP_LOGD(TAG, "TOPIC=%.*s", event->topic_len, event->topic);
ESP_LOGD(TAG, "DATA=%.*s", event->data_len, event->data);
topic.assign(event->topic, event->topic_len);
if (subscribeFunktionMap != NULL) {
if (subscribeFunktionMap->find(topic) != subscribeFunktionMap->end()) {
ESP_LOGD(TAG, "call handler function\r\n");
ESP_LOGD(TAG, "call subcribe function for topic %s", topic.c_str());
(*subscribeFunktionMap)[topic](topic, event->data, event->data_len);
}
} else {
Expand All @@ -148,6 +144,7 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) {
return ESP_OK;
}


static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
mqtt_event_handler_cb((esp_mqtt_event_handle_t) event_data);
Expand Down Expand Up @@ -276,24 +273,70 @@ int MQTT_Init() {
void MQTTdestroy_client() {
if (client) {
if (mqtt_connected) {
MQTTdestroySubscribeFunction();
esp_mqtt_client_disconnect(client);
mqtt_connected = false;
}
}
esp_mqtt_client_stop(client);
esp_mqtt_client_destroy(client);
client = NULL;
mqtt_initialized = false;
}
}


bool getMQTTisEnabled() {
return mqtt_enabled;
}


bool getMQTTisConnected() {
return mqtt_connected;
}


bool mqtt_handler_flow_start(std::string _topic, char* _data, int _data_len) {
ESP_LOGD(TAG, "Handler called: topic %s, data %.*s", _topic.c_str(), _data_len, _data);

if (_data_len > 0) {
MQTTCtrlFlowStart(_topic);
}

return ESP_OK;
}


void MQTTconnected(){
if (mqtt_connected) {
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Connected to broker");
MQTTPublish(lwt_topic, lwt_connected, true); // Publish "connected" to maintopic/connection

if (connectFunktionMap != NULL) {
for(std::map<std::string, std::function<void()>>::iterator it = connectFunktionMap->begin(); it != connectFunktionMap->end(); ++it) {
it->second();
ESP_LOGD(TAG, "call connect function %s", it->first.c_str());
}
}

/* Subcribe to topics */
std::function<bool(std::string topic, char* data, int data_len)> subHandler = mqtt_handler_flow_start;
MQTTregisterSubscribeFunction(maintopic + "/ctrl/flow_start", subHandler); // subcribe to maintopic/ctrl/flow_start

if (subscribeFunktionMap != NULL) {
for(std::map<std::string, std::function<bool(std::string, char*, int)>>::iterator it = subscribeFunktionMap->begin(); it != subscribeFunktionMap->end(); ++it) {
int msg_id = esp_mqtt_client_subscribe(client, it->first.c_str(), 0);
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "topic " + it->first + " subscribe successful, msg_id=" + std::to_string(msg_id));
}
}
}

vTaskDelay(10000 / portTICK_PERIOD_MS); // Delay execution of callback routine after connection got established
if (callbackOnConnected) { // Call onConnected callback routine --> mqtt_server
callbackOnConnected(maintopic, SetRetainFlag);
}
}


void MQTTregisterConnectFunction(std::string name, std::function<void()> func){
ESP_LOGD(TAG, "MQTTregisteronnectFunction %s\r\n", name.c_str());
if (connectFunktionMap == NULL) {
Expand All @@ -312,15 +355,17 @@ void MQTTregisterConnectFunction(std::string name, std::function<void()> func){
}
}


void MQTTunregisterConnectFunction(std::string name){
ESP_LOGD(TAG, "unregisterConnnectFunction %s\r\n", name.c_str());
if ((connectFunktionMap != NULL) && (connectFunktionMap->find(name) != connectFunktionMap->end())) {
connectFunktionMap->erase(name);
}
}


void MQTTregisterSubscribeFunction(std::string topic, std::function<bool(std::string, char*, int)> func){
ESP_LOGD(TAG, "registerSubscribeFunction %s\r\n", topic.c_str());
ESP_LOGD(TAG, "registerSubscribeFunction %s", topic.c_str());
if (subscribeFunktionMap == NULL) {
subscribeFunktionMap = new std::map<std::string, std::function<bool(std::string, char*, int)>>();
}
Expand All @@ -331,45 +376,15 @@ void MQTTregisterSubscribeFunction(std::string topic, std::function<bool(std::st
}

(*subscribeFunktionMap)[topic] = func;

if (mqtt_connected) {
int msg_id = esp_mqtt_client_subscribe(client, topic.c_str(), 0);
ESP_LOGD(TAG, "topic %s subscribe successful, msg_id=%d", topic.c_str(), msg_id);
}
}

void MQTTconnected(){
if (mqtt_connected) {
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Connected to broker");

MQTTPublish(lwt_topic, lwt_connected, true);

if (connectFunktionMap != NULL) {
for(std::map<std::string, std::function<void()>>::iterator it = connectFunktionMap->begin(); it != connectFunktionMap->end(); ++it) {
it->second();
ESP_LOGD(TAG, "call connect function %s", it->first.c_str());
}
}

if (subscribeFunktionMap != NULL) {
for(std::map<std::string, std::function<bool(std::string, char*, int)>>::iterator it = subscribeFunktionMap->begin(); it != subscribeFunktionMap->end(); ++it) {
int msg_id = esp_mqtt_client_subscribe(client, it->first.c_str(), 0);
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "topic " + it->first + " subscribe successful, msg_id=" + std::to_string(msg_id));
}
}

if (callbackOnConnected) {
callbackOnConnected(maintopic, SetRetainFlag);
}
}
}

void MQTTdestroySubscribeFunction(){
if (subscribeFunktionMap != NULL) {
if (mqtt_connected) {
for(std::map<std::string, std::function<bool(std::string, char*, int)>>::iterator it = subscribeFunktionMap->begin(); it != subscribeFunktionMap->end(); ++it) {
int msg_id = esp_mqtt_client_unsubscribe(client, it->first.c_str());
ESP_LOGI(TAG, "topic %s unsubscribe successful, msg_id=%d", it->first.c_str(), msg_id);
ESP_LOGD(TAG, "topic %s unsubscribe successful, msg_id=%d", it->first.c_str(), msg_id);
}
}

Expand Down
1 change: 0 additions & 1 deletion code/components/jomjol_mqtt/interface_mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@ void MQTTregisterSubscribeFunction(std::string topic, std::function<bool(std::st
void MQTTdestroySubscribeFunction();
void MQTTconnected();

void MQTTdisable();
#endif //INTERFACE_MQTT_H
#endif //#ENABLE_MQTT
1 change: 0 additions & 1 deletion code/components/jomjol_mqtt/server_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ esp_err_t sendDiscovery_and_static_Topics(httpd_req_t *req) {
}

void GotConnected(std::string maintopic, int retainFlag) {
vTaskDelay(10000 / portTICK_PERIOD_MS); // Delay execution by 10s after connection got established
if (HomeassistantDiscovery) {
MQTThomeassistantDiscovery();
}
Expand Down
Loading

0 comments on commit f6bf7e3

Please sign in to comment.