Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send all the eqLogics/cmds to the daemon #275

Open
github-actions bot opened this issue Nov 12, 2023 · 0 comments
Open

Send all the eqLogics/cmds to the daemon #275

github-actions bot opened this issue Nov 12, 2023 · 0 comments
Assignees
Labels
enhancement New feature or request php Pull requests that update Php code todo

Comments

@github-actions
Copy link

$all_data = jMQTT::full_export();

return json_encode($all_data, JSON_UNESCAPED_UNICODE);

Remove PID file

Remove PID file

$broker = jMQTT::byId($id); // Don't use getBrokerFromId here!

$broker = jMQTT::byId($id); // Don't use getBrokerFromId here!

Need to check if statusCmd exists, because during Remove cmd are destroyed first by eqLogic::remove()

Need to check if statusCmd exists, because during Remove cmd are destroyed first by eqLogic::remove()

// TODO: Send all the eqLogics/cmds to the daemon

class jMQTTComFromDaemon {

    /**
     * Daemon callback to tell Jeedom it is started
     */
    public static function daemonUp($ruid) {
        // If we get here, apikey is OK!
        //jMQTT::logger('debug', 'daemonUp(ruid='.$ruid.')');
        // Verify that daemon RemoteUID contains ':' or die
        if (is_null($ruid) || !is_string($ruid) || (strpos($ruid, ':') === false)) {
            jMQTT::logger(
                'warning',
                sprintf(
                    __("Démon [%s] : Inconsistant", __FILE__),
                    $ruid
                )
            );
            return '';
        }
        // Verify that this daemon is not already initialized
        $cuid = @cache::byKey('jMQTT::'.jMQTTConst::CACHE_DAEMON_UID)->getValue("0:0");
        if ($cuid == $ruid) {
            jMQTT::logger(
                'info',
                sprintf(
                    __("Démon [%s] : Déjà initialisé", __FILE__),
                    $ruid
                )
            );
            return '';
        }
        list($rpid, $rport) = array_map('intval', explode(":", $ruid));
        // Verify Remote UID coherence
        if ($rpid == 0) {
            // If Remote PID is NOT available
            jMQTT::logger(
                'warning',
                sprintf(
                    __("Démon [%s] : Pas d'identifiant d'exécution", __FILE__),
                    $ruid
                )
            );
            return '';
        }
        if (!@posix_getsid($rpid)) {
            // Remote PID is not running
            jMQTT::logger(
                'warning',
                sprintf(
                    __("Démon [%s] : Mauvais identifiant d'exécution", __FILE__),
                    $ruid
                )
            );
            return '';
        }
        // Searching a match for RemoteUID (PID and PORT) in listening ports
        $retval = 255;
        exec("ss -Htulpn 'sport = :" . $rport ."' 2> /dev/null | grep -E '[:]" . $rport . "[ \t]+.*[:][*][ \t]+.+pid=" . $rpid . "' 2> /dev/null", $output, $retval);
        // Execution issue with ss (too new)? Try (the good old) netstat!
        if ($retval != 0) {
            // Be sure to clear $output first
            unset($output);
            exec("netstat -lntp 2> /dev/null | grep -E '[:]" . $rport . "[ \t]+.*[:][*][ \t]+.+[ \t]+" . $rpid . "/python3' 2> /dev/null", $output, $retval);
        }
        // Execution issue with netstat? Try (the slow) lsof!
        if ($retval != 0) {
            // Be sure to clear $output first
            unset($output);
            exec("lsof -nP -iTCP -sTCP:LISTEN | grep -E 'python3[ \t]+" . $rpid . "[ \t]+.+[:]" . $rport ."[ \t]+' 2> /dev/null", $output, $retval);
        }
        if ($retval != 0 || count($output) == 0) {
            // Execution issue, could not get a match
            jMQTT::logger(
                'warning',
                sprintf(
                    __("Démon [%s] : N'a pas pu être authentifié", __FILE__),
                    $ruid
                )
            );
            return '';
        }
        // Verify if another daemon is not running
        list($cpid, $cport) = array_map('intval', explode(":", $cuid));
        if ($cpid != 0) { // Cached PID is available
            if (!@posix_getsid($cpid)) { // Cached PID is NOT running
                jMQTT::logger(
                    'warning',
                    sprintf(
                        __("Démon [%1\$s] va remplacer le Démon [%2\$s] !", __FILE__),
                        $ruid,
                        $cuid
                    )
                );
                // Must NOT `return ''` here, new daemon still needs to be accepted
                jMQTTDaemon::stop();
            } else { // Cached PID IS running
                jMQTT::logger(
                    'warning',
                    sprintf(
                        __("Démon [%1\$s] essaye de remplacer le Démon [%2\$s] !", __FILE__),
                        $ruid,
                        $cuid
                    )
                );
                exec(system::getCmdSudo() . 'fuser ' . $cport . '/tcp 2> /dev/null', $output, $retval);
                if ($retval != 0 || count($output) == 0) {
                    // Execution issue, could not get a match
                    jMQTT::logger(
                        'warning',
                        sprintf(
                            __("Démon [%s] : N'a pas pu être identifié", __FILE__),
                            $cuid
                        )
                    );
                    // Must NOT `return ''` here, new daemon still needs to be accepted
                    jMQTTDaemon::stop();
                } elseif (intval(trim($output[0])) != $cpid) {
                    // No match for old daemon
                    jMQTT::logger(
                        'warning',
                        sprintf(
                            __("Démon [%s] : Reprend la main", __FILE__),
                            $ruid
                        )
                    );
                    // Must NOT `return ''` here, new daemon still needs to be accepted
                    jMQTTDaemon::stop();
                } else {
                    // Old daemon is still alive. If Daemon is semi-dead, it may die by missing enough heartbeats
                    jMQTT::logger(
                        'warning',
                        sprintf(
                            __("Démon [%1\$s] va survivre au Démon [%2\$s] !", __FILE__),
                            $cuid,
                            $ruid
                        )
                    );
                    posix_kill($rpid, 15);
                    return '';
                }
            }
        }
        // VERY VERBOSE (1/5s to 1/m): Do not activate if not needed!
        //jMQTT::logger('debug', sprintf(__("Démon [%s] est vivant", __FILE__), $ruid));
        // Save in cache the daemon RemoteUID (as it is connected)
        cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_UID, $ruid);
        cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_PORT, $rport);
        cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_LAST_RCV, time());
        cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_LAST_SND, time());
        jMQTTDaemon::sendMqttDaemonStateEvent(true);
        // Launch MQTT Clients
        jMQTTDaemon::checkAllMqttClients();
        // Active listeners
        jMQTT::listenersAddAll();
        // Prepare and send initial data
        // TODO: Send all the eqLogics/cmds to the daemon
        //  labels: enhancement, php
        // $all_data = jMQTT::full_export();
        // return json_encode($all_data, JSON_UNESCAPED_UNICODE);
    }

    /**
     * Daemon callback to tell Jeedom it is OK
     */
    public static function hb($uid) {
        jMQTT::logger('debug', sprintf(__("Démon [%s] est en vie", __FILE__), $uid));
        cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_LAST_RCV, time());
    }

    /**
     * Daemon callback to tell Jeedom it is stopped
     */
    public static function daemonDown($uid) {
        //jMQTT::logger('debug', 'daemonDown(uid='.$uid.')');
        // Remove PID file
        if (file_exists($pid_file = jeedom::getTmpFolder(jMQTT::class) . '/jmqttd.py.pid'))
            shell_exec(system::getCmdSudo() . 'rm -rf ' . $pid_file . ' 2>&1 > /dev/null');
        // Delete in cache the daemon uid (as it is disconnected)
        @cache::delete('jMQTT::' . jMQTTConst::CACHE_DAEMON_UID);
        // Send state to WebUI
        jMQTTDaemon::sendMqttDaemonStateEvent(false);
        // Remove listeners
        jMQTT::listenersRemoveAll();
        // Get all brokers and set them as disconnected
        foreach(jMQTT::getBrokers() as $broker) {
            try {
                jMQTTComFromDaemon::brkDown($broker->getId());
            } catch (Throwable $e) {
                if (log::getLogLevel(jMQTT::class) > 100) {
                    jMQTT::logger(
                        'error',
                        sprintf(
                            __("%1\$s() a levé l'Exception: %2\$s", __FILE__),
                            __METHOD__,
                            $e->getMessage()
                        )
                    );
                } else {
                    jMQTT::logger(
                        'error',
                        str_replace(
                            "\n",
                            ' <br/> ',
                            sprintf(
                                __("%1\$s() a levé l'Exception: %2\$s", __FILE__).
                                ",<br/>@Stack: %3\$s,<br/>@BrkId: %4\$s.",
                                __METHOD__,
                                $e->getMessage(),
                                $e->getTraceAsString(),
                                $broker->getId()
                            )
                        )
                    );
                }
            }
        }
    }

    public static function brkUp($id) {
        try { // Catch if broker is unknown / deleted
            $broker = jMQTT::getBrokerFromId(intval($id));
            // Save in cache that Mqtt Client is connected
            $broker->setCache(jMQTTConst::CACHE_MQTTCLIENT_CONNECTED, true);
            // If not existing at brkUp, create it
            $broker->checkAndUpdateCmd(
                $broker->getMqttClientStatusCmd(true),
                jMQTTConst::CLIENT_STATUS_ONLINE
            );
            // If not existing at brkUp, create it
            $broker->checkAndUpdateCmd(
                $broker->getMqttClientConnectedCmd(true),
                1
            );
            $broker->setStatus('warning', null);
            cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_LAST_RCV, time());
            $broker->log('info', __('Client MQTT connecté au Broker', __FILE__));
            $broker->sendMqttClientStateEvent();
            // Subscribe to topics
            foreach (jMQTT::byBrkId($id) as $eq) {
                if ($eq->getIsEnable() && $eq->getId() != $broker->getId()) {
                    $eq->subscribeTopic($eq->getTopic(), $eq->getQos());
                }
            }

            // Enable Interactions
            if ($broker->getConf(jMQTTConst::CONF_KEY_MQTT_INT)) {
                $broker->log(
                    'info',
                    sprintf(
                        __("Souscription au topic d'Interaction '%s'", __FILE__),
                        $broker->getConf(jMQTTConst::CONF_KEY_MQTT_INT_TOPIC)
                    )
                );
                $broker->subscribeTopic($broker->getConf(jMQTTConst::CONF_KEY_MQTT_INT_TOPIC), '1');
                $broker->subscribeTopic($broker->getConf(jMQTTConst::CONF_KEY_MQTT_INT_TOPIC) . '/advanced', '1');
            } else
                $broker->log('debug', __("L'accès aux Interactions est désactivé", __FILE__));

            // Enable API
            if ($broker->getConf(jMQTTConst::CONF_KEY_MQTT_API)) {
                $broker->log(
                    'info',
                    sprintf(
                        __("Souscription au topic API '%s'", __FILE__),
                        $broker->getConf(jMQTTConst::CONF_KEY_MQTT_API_TOPIC)
                    )
                );
                $broker->subscribeTopic($broker->getConf(jMQTTConst::CONF_KEY_MQTT_API_TOPIC), '1');
            } else
                $broker->log('debug', __("L'accès à l'API est désactivé", __FILE__));

            // Active listeners
            jMQTT::listenersAddAll();
        } catch (Throwable $e) {
            if (log::getLogLevel(jMQTT::class) > 100)
                jMQTT::logger(
                    'error',
                    sprintf(
                        __("%1\$s() a levé l'Exception: %2\$s", __FILE__),
                        __METHOD__,
                        $e->getMessage()
                    )
                );
            else
                jMQTT::logger(
                    'error',
                    str_replace(
                        "\n",
                        ' <br/> ',
                        sprintf(
                            __("%1\$s() a levé l'Exception: %2\$s", __FILE__).
                            ",<br/>@Stack: %3\$s,<br/>@BrkId: %4\$s.",
                            __METHOD__,
                            $e->getMessage(),
                            $e->getTraceAsString(),
                            $id
                        )
                    )
                );
        }
    }

    public static function brkDown($id) {
        try { // Catch if broker is unknown / deleted
            $broker = jMQTT::byId($id); // Don't use getBrokerFromId here!
            if (!is_object($broker)) {
                jMQTT::logger(
                    'debug',
                    sprintf(
                        __("Pas d'équipement avec l'id %s (il vient probablement d'être supprimé)", __FILE__),
                        $id
                    )
                );
                return;
            }
            if ($broker->getType() != jMQTTConst::TYP_BRK) {
                jMQTT::logger(
                    'error',
                    sprintf(
                        __("L'équipement %s n'est pas de type Broker", __FILE__),
                        $id
                    )
                );
                return;
            }
            // Save in cache that Mqtt Client is disconnected
            $broker->setCache(jMQTTConst::CACHE_MQTTCLIENT_CONNECTED, false);

            // If command exists update the status (used to get broker connection status inside Jeedom)
            // Need to check if statusCmd exists, because during Remove cmd are destroyed first by eqLogic::remove()
            $broker->checkAndUpdateCmd(
                $broker->getMqttClientStatusCmd(),
                jMQTTConst::CLIENT_STATUS_OFFLINE
            );
            // Need to check if connectedCmd exists, because during Remove cmd are destroyed first by eqLogic::remove()
            $broker->checkAndUpdateCmd(
                $broker->getMqttClientConnectedCmd(),
                0
            );
            // Also set a warning if eq is enabled (should be always true)
            $broker->setStatus('warning', $broker->getIsEnable() ? 1 : null);

            // Clear Real Time mode
            $broker->setCache(jMQTTConst::CACHE_REALTIME_MODE, 0);

            cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_LAST_RCV, time());
            $broker->log('info', __('Client MQTT déconnecté du Broker', __FILE__));
            $broker->sendMqttClientStateEvent();
        } catch (Throwable $e) {
            if (log::getLogLevel(jMQTT::class) > 100)
                jMQTT::logger(
                    'error',
                    sprintf(
                        __("%1\$s() a levé l'Exception: %2\$s", __FILE__),
                        __METHOD__,
                        $e->getMessage()
                    )
                );
            else
                jMQTT::logger(
                    'error',
                    str_replace(
                        "\n",
                        ' <br/> ',
                        sprintf(
                            __("%1\$s() a levé l'Exception: %2\$s", __FILE__).
                            ",<br/>@Stack: %3\$s,<br/>@BrkId: %4\$s.",
                            __METHOD__,
                            $e->getMessage(),
                            $e->getTraceAsString(),
                            $id
                        )
                    )
                );
        }
    }

    public static function msgIn($id, $topic, $payload, $qos, $retain) {
        try {
            $broker = jMQTT::getBrokerFromId(intval($id));
            $broker->brokerMessageCallback($topic, $payload, $qos, $retain);
            cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_LAST_RCV, time());
        } catch (Throwable $e) {
            if (log::getLogLevel(jMQTT::class) > 100)
                jMQTT::logger(
                    'error',
                    sprintf(
                        __("%1\$s() a levé l'Exception: %2\$s", __FILE__),
                        __METHOD__,
                        $e->getMessage()
                    )
                );
            else
                jMQTT::logger(
                    'error',
                    str_replace(
                        "\n",
                        ' <br/> ',
                        sprintf(
                            __("%1\$s() a levé l'Exception: %2\$s", __FILE__).
                            ",<br/>@Stack: %3\$s,<br/>@BrkId: %4\$s,".
                            "<br/>@Topic: %5\$s,<br/>@Payload: %6\$s,".
                            "<br/>@Qos: %7\$s,<br/>@Retain: %8\$s.",
                            __METHOD__,
                            $e->getMessage(),
                            $e->getTraceAsString(),
                            $id, $topic, $payload, $qos, $retain
                        )
                    )
                );
        }
    }

    public static function value($cmdId, $value) {
        try {
            $cmd = jMQTTCmd::byId(intval($cmdId));
            if (!is_object($cmd)) {
                jMQTT::logger('debug', sprintf(
                    __("Pas de commande avec l'id %s", __FILE__),
                    $cmdId
                ));
                return;
            }
            $cmd->getEqLogic()->getBroker()->setStatus(array(
                'lastCommunication' => date('Y-m-d H:i:s'),
                'timeout' => 0
            ));
            $cmd->updateCmdValue($value);
            cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_LAST_RCV, time());
        } catch (Throwable $e) {
            if (log::getLogLevel(jMQTT::class) > 100)
                jMQTT::logger(
                    'error',
                    sprintf(
                        __("%1\$s() a levé l'Exception: %2\$s", __FILE__),
                        __METHOD__,
                        $e->getMessage()
                    )
                );
            else
                jMQTT::logger(
                    'error',
                    str_replace(
                        "\n",
                        ' <br/> ',
                        sprintf(
                            __("%1\$s() a levé l'Exception: %2\$s", __FILE__).
                            ",<br/>@Stack: %3\$s,<br/>@cmdId: %4\$s,".
                            "<br/>@value: %5\$s.",
                            __METHOD__,
                            $e->getMessage(),
                            $e->getTraceAsString(),
                            $cmdId,
                            $value
                        )
                    )
                );
        }
    }

    public static function realTimeStarted($id) {
        $brk = jMQTT::getBrokerFromId(intval($id));
        // Update cache
        cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_LAST_RCV, time());
        $brk->setCache(jMQTTConst::CACHE_REALTIME_MODE, 1);
        // Send event to WebUI
        $brk->log('info', __("Mode Temps Réel activé", __FILE__));
        $brk->sendMqttClientStateEvent();
    }

    public static function realTimeStopped($id, $nbMsgs) {
        $brk = jMQTT::getBrokerFromId(intval($id));
        // Update cache
        cache::set('jMQTT::'.jMQTTConst::CACHE_DAEMON_LAST_RCV, time());
        $brk->setCache(jMQTTConst::CACHE_REALTIME_MODE, 0);
        // Send event to WebUI
        $brk->log(
            'info',
            sprintf(
                __("Mode Temps Réel désactivé, %s messages disponibles", __FILE__),
                $nbMsgs
            )
        );
        $brk->sendMqttClientStateEvent();
    }

}
@github-actions github-actions bot added enhancement New feature or request php Pull requests that update Php code todo labels Nov 12, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request php Pull requests that update Php code todo
Projects
None yet
Development

No branches or pull requests

1 participant