diff --git a/.clang-tidy b/.clang-tidy index 8f2e4abb..1d046d0d 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -1,6 +1,8 @@ Checks: > bugprone*, misc-const-correctness, + performance*, + -performance-avoid-endl, -llvmlibc*, -fuchsia-overloaded-operator, -fuchsia-statically-constructed-objects, diff --git a/CMakeLists.txt b/CMakeLists.txt index e3d0acae..e3989639 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.14) project(everest-framework - VERSION 0.19.1 + VERSION 0.19.2 DESCRIPTION "The open operating system for e-mobility charging stations" LANGUAGES CXX C ) diff --git a/everestjs/conversions.cpp b/everestjs/conversions.cpp index 2ed49d76..e3038b4e 100644 --- a/everestjs/conversions.cpp +++ b/everestjs/conversions.cpp @@ -54,6 +54,14 @@ Everest::json convertToJson(const Napi::Value& value) { napi_valuetype_strings[value.Type()])); } +Everest::json convertToConfigMap(const Everest::json& json_config) { + json config_map; + for (auto& entry : json_config.items()) { + config_map[entry.key()] = entry.value().at("value"); + } + return config_map; +} + Everest::TelemetryMap convertToTelemetryMap(const Napi::Object& obj) { BOOST_LOG_FUNCTION(); Everest::TelemetryMap telemetry; diff --git a/everestjs/conversions.hpp b/everestjs/conversions.hpp index edbcdb7d..25798d47 100644 --- a/everestjs/conversions.hpp +++ b/everestjs/conversions.hpp @@ -29,6 +29,7 @@ static const char* const napi_valuetype_strings[] = { }; Everest::json convertToJson(const Napi::Value& value); +Everest::json convertToConfigMap(const Everest::json& json_config); Everest::TelemetryMap convertToTelemetryMap(const Napi::Object& obj); Napi::Value convertToNapiValue(const Napi::Env& env, const Everest::json& value); diff --git a/everestjs/everestjs.cpp b/everestjs/everestjs.cpp index 71cdb1c7..727a2ac1 100644 --- a/everestjs/everestjs.cpp +++ b/everestjs/everestjs.cpp @@ -556,6 +556,8 @@ static Napi::Value is_condition_satisfied_req(const Requirement& req, const Napi static Napi::Value boot_module(const Napi::CallbackInfo& info) { BOOST_LOG_FUNCTION(); + const auto start_time = std::chrono::system_clock::now(); + auto env = info.Env(); auto available_handlers_prop = Napi::Object::New(env); @@ -612,10 +614,10 @@ static Napi::Value boot_module(const Napi::CallbackInfo& info) { "Module with identifier '" << module_id << "' not found in config!")); } - const std::string& module_name = config->get_main_config()[module_id]["module"].get(); - auto module_manifest = config->get_manifests()[module_name]; + const std::string& module_name = config->get_module_name(module_id); + const auto& module_manifest = config->get_manifests().at(module_name); // FIXME (aw): get_classes should be called get_units and should contain the type of class for each unit - auto module_impls = config->get_interfaces()[module_name]; + const auto& module_impls = config->get_interfaces().at(module_name); // initialize everest framework const auto& module_identifier = config->printable_identifier(module_id); @@ -733,9 +735,9 @@ static Napi::Value boot_module(const Napi::CallbackInfo& info) { auto uses_list_reqs_prop = Napi::Object::New(env); auto uses_cmds_prop = Napi::Object::New(env); auto uses_list_cmds_prop = Napi::Object::New(env); - for (auto& requirement : module_manifest["requires"].items()) { + for (const auto& requirement : module_manifest.at("requires").items()) { auto req_prop = Napi::Object::New(env); - auto const& requirement_id = requirement.key(); + const auto& requirement_id = requirement.key(); json req_route_list = config->resolve_requirement(module_id, requirement_id); // if this was a requirement with min_connections == 1 and max_connections == 1, // this will be simply a single connection, but an array of connections otherwise @@ -747,13 +749,13 @@ static Napi::Value boot_module(const Napi::CallbackInfo& info) { auto req_array_prop = Napi::Array::New(env); auto req_mod_cmds_array = Napi::Array::New(env); for (std::size_t i = 0; i < req_route_list.size(); i++) { - auto req_route = req_route_list[i]; + const auto& req_route = req_route_list[i]; const std::string& requirement_module_id = req_route["module_id"]; const std::string& requirement_impl_id = req_route["implementation_id"]; // FIXME (aw): why is const auto& not possible for the following line? // we only want cmds/vars from the required interface to be usable, not from it's child interfaces - std::string interface_name = req_route["required_interface"].get(); - auto requirement_impl_intf = config->get_interface_definition(interface_name); + const std::string& interface_name = req_route["required_interface"].get(); + const auto& requirement_impl_intf = config->get_interface_definition(interface_name); auto requirement_vars = Everest::Config::keys(requirement_impl_intf["vars"]); auto requirement_cmds = Everest::Config::keys(requirement_impl_intf["cmds"]); @@ -885,14 +887,15 @@ static Napi::Value boot_module(const Napi::CallbackInfo& info) { auto module_config_prop = Napi::Object::New(env); auto module_config_impl_prop = Napi::Object::New(env); - for (auto& config_map : module_config.items()) { + for (const auto& config_map : module_config.items()) { + const auto& json_config_map = convertToConfigMap(config_map.value()); if (config_map.key() == "!module") { module_config_prop.DefineProperty(Napi::PropertyDescriptor::Value( - "module", convertToNapiValue(env, config_map.value()), napi_enumerable)); + "module", convertToNapiValue(env, json_config_map), napi_enumerable)); continue; } module_config_impl_prop.DefineProperty(Napi::PropertyDescriptor::Value( - config_map.key(), convertToNapiValue(env, config_map.value()), napi_enumerable)); + config_map.key(), convertToNapiValue(env, json_config_map), napi_enumerable)); } module_config_prop.DefineProperty( Napi::PropertyDescriptor::Value("impl", module_config_impl_prop, napi_enumerable)); @@ -946,6 +949,10 @@ static Napi::Value boot_module(const Napi::CallbackInfo& info) { ctx->js_cb = std::make_unique(env, callback_wrapper); ctx->everest->register_on_ready_handler(framework_ready_handler); + const auto end_time = std::chrono::system_clock::now(); + EVLOG_info << "Module " << fmt::format(Everest::TERMINAL_STYLE_BLUE, "{}", module_id) << " initialized [" + << std::chrono::duration_cast(end_time - start_time).count() << "ms]"; + ctx->everest->spawn_main_loop_thread(); } catch (std::exception& e) { diff --git a/everestpy/src/everest/everestpy.cpp b/everestpy/src/everest/everestpy.cpp index 5ed43c01..2637a103 100644 --- a/everestpy/src/everest/everestpy.cpp +++ b/everestpy/src/everest/everestpy.cpp @@ -22,10 +22,20 @@ namespace py = pybind11; PYBIND11_MODULE(everestpy, m) { + py::class_(m, "MQTTSettings") + .def(py::init<>()) + .def_readwrite("broker_socket_path", &Everest::MQTTSettings::broker_socket_path) + .def_readwrite("broker_host", &Everest::MQTTSettings::broker_host) + .def_readwrite("broker_port", &Everest::MQTTSettings::broker_port) + .def_readwrite("everest_prefix", &Everest::MQTTSettings::everest_prefix) + .def_readwrite("external_prefix", &Everest::MQTTSettings::external_prefix) + .def("uses_socket", &Everest::MQTTSettings::uses_socket); + // FIXME (aw): add m.doc? py::class_(m, "RuntimeSession") .def(py::init<>()) - .def(py::init()); + .def(py::init()) + .def(py::init()); py::class_(m, "ModuleInfoPaths") .def_readonly("etc", &ModuleInfo::Paths::etc) @@ -130,7 +140,7 @@ PYBIND11_MODULE(everestpy, m) { .def(py::init()) .def("say_hello", &Module::say_hello) .def("init_done", py::overload_cast<>(&Module::init_done)) - .def("init_done", py::overload_cast>(&Module::init_done)) + .def("init_done", py::overload_cast&>(&Module::init_done)) .def("call_command", &Module::call_command) .def("publish_variable", &Module::publish_variable) .def("implement_command", &Module::implement_command) diff --git a/everestpy/src/everest/misc.cpp b/everestpy/src/everest/misc.cpp index e2e948b8..d9d06a5b 100644 --- a/everestpy/src/everest/misc.cpp +++ b/everestpy/src/everest/misc.cpp @@ -53,6 +53,17 @@ static Everest::MQTTSettings get_mqtt_settings_from_env() { } } +RuntimeSession::RuntimeSession(const Everest::MQTTSettings& mqtt_settings, const std::string& logging_config) { + this->mqtt_settings = mqtt_settings; + if (logging_config.empty()) { + this->logging_config_file = Everest::assert_dir(Everest::defaults::PREFIX, "Default prefix") / + std::filesystem::path(Everest::defaults::SYSCONF_DIR) / + Everest::defaults::NAMESPACE / Everest::defaults::LOGGING_CONFIG_NAME; + } else { + this->logging_config_file = Everest::assert_file(logging_config, "Default logging config"); + } +} + /// This is just kept for compatibility RuntimeSession::RuntimeSession(const std::string& prefix, const std::string& config_file) { EVLOG_warning @@ -63,18 +74,14 @@ RuntimeSession::RuntimeSession(const std::string& prefix, const std::string& con // We extract the settings from the config file so everest-testing doesn't break const auto ms = Everest::ManagerSettings(prefix, config_file); - Everest::Logging::init(ms.runtime_settings->logging_config_file.string()); + this->logging_config_file = ms.runtime_settings->logging_config_file; this->mqtt_settings = ms.mqtt_settings; } RuntimeSession::RuntimeSession() { - const auto module_id = get_variable_from_env("EV_MODULE"); - - namespace fs = std::filesystem; - const fs::path logging_config_file = + this->logging_config_file = Everest::assert_file(get_variable_from_env("EV_LOG_CONF_FILE"), "Default logging config"); - Everest::Logging::init(logging_config_file.string(), module_id); this->mqtt_settings = get_mqtt_settings_from_env(); } @@ -82,8 +89,8 @@ RuntimeSession::RuntimeSession() { ModuleSetup create_setup_from_config(const std::string& module_id, Everest::Config& config) { ModuleSetup setup; - const std::string& module_name = config.get_main_config().at(module_id).at("module"); - const auto module_manifest = config.get_manifests().at(module_name); + const std::string& module_name = config.get_module_name(module_id); + const auto& module_manifest = config.get_manifests().at(module_name); // setup connections for (const auto& requirement : module_manifest.at("requires").items()) { @@ -107,7 +114,7 @@ ModuleSetup create_setup_from_config(const std::string& module_id, Everest::Conf const auto& req_route = req_route_list[i]; const auto fulfillment = Fulfillment{req_route["module_id"], req_route["implementation_id"], {requirement_id, i}}; - fulfillment_list.emplace_back(std::move(fulfillment)); + fulfillment_list.emplace_back(fulfillment); } } diff --git a/everestpy/src/everest/misc.hpp b/everestpy/src/everest/misc.hpp index dd817b2c..a49073d7 100644 --- a/everestpy/src/everest/misc.hpp +++ b/everestpy/src/everest/misc.hpp @@ -14,16 +14,27 @@ const std::string get_variable_from_env(const std::string& variable, const std:: class RuntimeSession { public: - RuntimeSession(const std::string& prefix, const std::string& config_file); + /// \brief Allows python modules to directly pass \p mqtt_settings as well as a \p logging_config + RuntimeSession(const Everest::MQTTSettings& mqtt_settings, const std::string& logging_config); + [[deprecated("Consider switching to the newer RuntimeSession() or RuntimeSession(mqtt_settings, logging_config) " + "ctors that receive module configuration via MQTT")]] RuntimeSession(const std::string& prefix, + const std::string& config_file); + + /// \brief Get settings and configuration via MQTT based on certain environment variables RuntimeSession(); const Everest::MQTTSettings& get_mqtt_settings() const { return mqtt_settings; } + const std::filesystem::path& get_logging_config_file() const { + return logging_config_file; + } + private: Everest::MQTTSettings mqtt_settings; + std::filesystem::path logging_config_file; }; struct Interface { diff --git a/everestpy/src/everest/module.cpp b/everestpy/src/everest/module.cpp index 82760241..3e0f3582 100644 --- a/everestpy/src/everest/module.cpp +++ b/everestpy/src/everest/module.cpp @@ -12,7 +12,7 @@ std::unique_ptr Module::create_everest_instance(const std::string& module_id, const Everest::Config& config, const Everest::RuntimeSettings& rs, - std::shared_ptr mqtt_abstraction) { + const std::shared_ptr& mqtt_abstraction) { return std::make_unique(module_id, config, rs.validate_schema, mqtt_abstraction, rs.telemetry_prefix, rs.telemetry_enabled); } @@ -21,7 +21,9 @@ Module::Module(const RuntimeSession& session) : Module(get_variable_from_env("EV } Module::Module(const std::string& module_id_, const RuntimeSession& session_) : - module_id(module_id_), session(session_) { + module_id(module_id_), session(session_), start_time(std::chrono::system_clock::now()) { + + Everest::Logging::init(session.get_logging_config_file().string(), module_id); this->mqtt_abstraction = std::make_shared(session.get_mqtt_settings()); this->mqtt_abstraction->connect(); diff --git a/everestpy/src/everest/module.hpp b/everestpy/src/everest/module.hpp index ffd801a7..75152474 100644 --- a/everestpy/src/everest/module.hpp +++ b/everestpy/src/everest/module.hpp @@ -3,6 +3,7 @@ #ifndef EVERESTPY_MODULE_HPP #define EVERESTPY_MODULE_HPP +#include #include #include #include @@ -21,13 +22,18 @@ class Module { ModuleSetup say_hello(); - void init_done(std::function on_ready_handler) { + void init_done(const std::function& on_ready_handler) { this->handle->check_code(); if (on_ready_handler) { - handle->register_on_ready_handler(std::move(on_ready_handler)); + handle->register_on_ready_handler(on_ready_handler); } + const auto end_time = std::chrono::system_clock::now(); + EVLOG_info << "Module " << fmt::format(Everest::TERMINAL_STYLE_BLUE, "{}", this->module_id) << " initialized [" + << std::chrono::duration_cast(end_time - this->start_time).count() + << "ms]"; + handle->signal_ready(); } @@ -78,6 +84,7 @@ class Module { private: const std::string module_id; const RuntimeSession& session; + const std::chrono::time_point start_time; std::unique_ptr rs; std::shared_ptr mqtt_abstraction; std::unique_ptr config_; @@ -94,7 +101,7 @@ class Module { static std::unique_ptr create_everest_instance(const std::string& module_id, const Everest::Config& config, const Everest::RuntimeSettings& rs, - std::shared_ptr mqtt_abstraction); + const std::shared_ptr& mqtt_abstraction); ModuleInfo module_info{}; std::map requirements; diff --git a/include/framework/everest.hpp b/include/framework/everest.hpp index e093125e..38feb466 100644 --- a/include/framework/everest.hpp +++ b/include/framework/everest.hpp @@ -49,7 +49,7 @@ struct ErrorFactory; class Everest { public: Everest(std::string module_id, const Config& config, bool validate_data_with_schema, - std::shared_ptr mqtt_abstraction, const std::string& telemetry_prefix, + const std::shared_ptr& mqtt_abstraction, const std::string& telemetry_prefix, bool telemetry_enabled); // forbid copy assignment and copy construction @@ -66,7 +66,7 @@ class Everest { /// /// \brief Allows a module to indicate that it provides the given command \p cmd /// - void provide_cmd(const std::string impl_id, const std::string cmd_name, const JsonCommand handler); + void provide_cmd(const std::string& impl_id, const std::string cmd_name, const JsonCommand& handler); void provide_cmd(const cmd& cmd); /// @@ -217,7 +217,7 @@ class Everest { bool telemetry_enabled; std::optional module_tier_mappings; - void handle_ready(nlohmann::json data); + void handle_ready(const nlohmann::json& data); void heartbeat(); diff --git a/include/framework/runtime.hpp b/include/framework/runtime.hpp index 3b7c4df0..5488328b 100644 --- a/include/framework/runtime.hpp +++ b/include/framework/runtime.hpp @@ -192,9 +192,10 @@ class ModuleLoader { public: explicit ModuleLoader(int argc, char* argv[], ModuleCallbacks callbacks) : - ModuleLoader(argc, argv, callbacks, {"undefined project", "undefined version", "undefined git version"}){}; + ModuleLoader(argc, argv, std::move(callbacks), + {"undefined project", "undefined version", "undefined git version"}){}; explicit ModuleLoader(int argc, char* argv[], ModuleCallbacks callbacks, - const VersionInformation version_information); + const VersionInformation& version_information); int initialize(); }; diff --git a/include/utils/config.hpp b/include/utils/config.hpp index 0b9cf611..2f3de54d 100644 --- a/include/utils/config.hpp +++ b/include/utils/config.hpp @@ -30,7 +30,7 @@ struct RuntimeSettings; /// /// \brief A structure that contains all available schemas /// -struct schemas { +struct Schemas { nlohmann::json config; ///< The config schema nlohmann::json manifest; ///< The manifest scheme nlohmann::json interface; ///< The interface schema @@ -38,6 +38,19 @@ struct schemas { nlohmann::json error_declaration_list; ///< The error-declaration-list schema }; +struct Validators { + nlohmann::json_schema::json_validator config; + nlohmann::json_schema::json_validator manifest; + nlohmann::json_schema::json_validator type; + nlohmann::json_schema::json_validator interface; + nlohmann::json_schema::json_validator error_declaration_list; +}; + +struct SchemaValidation { + Schemas schemas; + Validators validators; +}; + /// /// \brief Allowed format of a type URI, which are of a format like this /type_file_name#/TypeName /// @@ -50,6 +63,30 @@ struct ImplementationInfo { std::string impl_intf; }; +/// +/// \brief A simple json schema loader that uses the builtin draft7 schema of +/// the json schema validator when it encounters it, throws an exception +/// otherwise +void loader(const nlohmann::json_uri& uri, nlohmann::json& schema); + +/// +/// \brief An extension to the default format checker of the json schema +/// validator supporting uris +void format_checker(const std::string& format, const std::string& value); + +/// +/// \brief loads and validates a json schema at the provided \p path +/// +/// \returns the loaded json schema as a json object as well as a related schema validator +std::tuple load_schema(const fs::path& path); + +/// +/// \brief loads the config.json and manifest.json in the schemes subfolder of +/// the provided \p schemas_dir +/// +/// \returns the loaded configs and related validators +SchemaValidation load_schemas(const fs::path& schemas_dir); + /// /// \brief Base class for configs /// @@ -62,7 +99,7 @@ class ConfigBase { nlohmann::json interfaces; nlohmann::json interface_definitions; nlohmann::json types; - schemas _schemas; + Schemas schemas; std::unordered_map tier_mappings; // experimental caches @@ -190,6 +227,8 @@ class ManagerConfig : public ConfigBase { private: const ManagerSettings& ms; std::unordered_map> telemetry_configs; + Validators validators; + std::unique_ptr draft7_validator; /// /// \brief loads and validates the manifest of the module \p module_id using the provided \p module config @@ -259,10 +298,6 @@ class ManagerConfig : public ConfigBase { /// \brief Create a ManagerConfig from the provided ManagerSettings \p ms explicit ManagerConfig(const ManagerSettings& ms); - /// - /// \brief Serialize the config to json - nlohmann::json serialize(); - /// /// \returns a TelemetryConfig if this has been configured for the given \p module_id std::optional get_telemetry_config(const std::string& module_id); @@ -291,7 +326,7 @@ class Config : public ConfigBase { /// /// \returns the commands that the modules \p module_name implements from the given implementation \p impl_id - nlohmann::json get_module_cmds(const std::string& module_name, const std::string& impl_id); + const nlohmann::json& get_module_cmds(const std::string& module_name, const std::string& impl_id); /// /// \brief A RequirementInitialization contains everything needed to initialize a requirement in user code. This @@ -305,7 +340,7 @@ class Config : public ConfigBase { /// /// \returns a json object that contains the module config options - nlohmann::json get_module_json_config(const std::string& module_id); + const nlohmann::json& get_module_json_config(const std::string& module_id); /// /// \brief assemble basic information about the module (id, name, @@ -328,19 +363,6 @@ class Config : public ConfigBase { /// otherwise void ref_loader(const nlohmann::json_uri& uri, nlohmann::json& schema); - /// - /// \brief loads the config.json and manifest.json in the schemes subfolder of - /// the provided \p schemas_dir - /// - /// \returns the config and manifest schemas - static schemas load_schemas(const fs::path& schemas_dir); - - /// - /// \brief loads and validates a json schema at the provided \p path - /// - /// \returns the loaded json schema as a json object - static nlohmann::json load_schema(const fs::path& path); - /// /// \brief loads all module manifests relative to the \p main_dir /// @@ -352,25 +374,14 @@ class Config : public ConfigBase { /// /// \returns a set of object keys static std::set keys(const nlohmann::json& object); - - /// - /// \brief A simple json schema loader that uses the builtin draft7 schema of - /// the json schema validator when it encounters it, throws an exception - /// otherwise - static void loader(const nlohmann::json_uri& uri, nlohmann::json& schema); - - /// - /// \brief An extension to the default format checker of the json schema - /// validator supporting uris - static void format_checker(const std::string& format, const std::string& value); }; } // namespace Everest NLOHMANN_JSON_NAMESPACE_BEGIN -template <> struct adl_serializer { - static void to_json(nlohmann::json& j, const Everest::schemas& s); +template <> struct adl_serializer { + static void to_json(nlohmann::json& j, const Everest::Schemas& s); - static void from_json(const nlohmann::json& j, Everest::schemas& s); + static void from_json(const nlohmann::json& j, Everest::Schemas& s); }; NLOHMANN_JSON_NAMESPACE_END diff --git a/include/utils/message_queue.hpp b/include/utils/message_queue.hpp index 58168764..44153d28 100644 --- a/include/utils/message_queue.hpp +++ b/include/utils/message_queue.hpp @@ -79,10 +79,10 @@ class MessageHandler { void stop(); /// \brief Adds a \p handler that will receive messages from the queue. - void add_handler(std::shared_ptr handler); + void add_handler(const std::shared_ptr& handler); /// \brief Removes a specific \p handler - void remove_handler(std::shared_ptr handler); + void remove_handler(const std::shared_ptr& handler); /// \brief \returns the number of registered handlers std::size_t count_handlers(); diff --git a/include/utils/module_config.hpp b/include/utils/module_config.hpp index 6808d2bd..a9c7ffa1 100644 --- a/include/utils/module_config.hpp +++ b/include/utils/module_config.hpp @@ -9,7 +9,7 @@ namespace Everest { /// \brief get config from manager via mqtt -nlohmann::json get_module_config(std::shared_ptr mqtt, const std::string& module_id); +nlohmann::json get_module_config(const std::shared_ptr& mqtt, const std::string& module_id); } // namespace Everest #endif // UTILS_MODULE_CONFIG_HPP diff --git a/include/utils/mqtt_abstraction.hpp b/include/utils/mqtt_abstraction.hpp index 5bff00d8..403e2dd8 100644 --- a/include/utils/mqtt_abstraction.hpp +++ b/include/utils/mqtt_abstraction.hpp @@ -65,8 +65,8 @@ class MQTTAbstraction { void unsubscribe(const std::string& topic); /// - /// \copydoc MQTTAbstractionImpl::get(const std::string&, QOS) - nlohmann::json get(const std::string& topic, QOS qos); + /// \copydoc MQTTAbstractionImpl::clear_retained_topics() + void clear_retained_topics(); /// /// \brief Get MQTT topic prefix for the "everest" topic @@ -85,8 +85,8 @@ class MQTTAbstraction { std::shared_future get_main_loop_future(); /// - /// \copydoc MQTTAbstractionImpl::register_handler(const std::string&, std::shared_ptr, QOS) - void register_handler(const std::string& topic, std::shared_ptr handler, QOS qos); + /// \copydoc MQTTAbstractionImpl::register_handler(const std::string&, const std::shared_ptr&, QOS) + void register_handler(const std::string& topic, const std::shared_ptr& handler, QOS qos); /// /// \copydoc MQTTAbstractionImpl::unregister_handler(const std::string&, const Token&) diff --git a/include/utils/mqtt_abstraction_impl.hpp b/include/utils/mqtt_abstraction_impl.hpp index b1e47321..756926af 100644 --- a/include/utils/mqtt_abstraction_impl.hpp +++ b/include/utils/mqtt_abstraction_impl.hpp @@ -20,7 +20,7 @@ #include -constexpr std::size_t MQTT_BUF_SIZE = 500 * 1024; +constexpr std::size_t MQTT_BUF_SIZE = std::size_t{500} * std::size_t{1024}; namespace Everest { /// \brief Contains a payload and the topic it was received on with additional QOS @@ -81,8 +81,8 @@ class MQTTAbstractionImpl { void unsubscribe(const std::string& topic); /// - /// \brief subscribe and wait for value on the subscribed topic - nlohmann::json get(const std::string& topic, QOS qos); + /// \brief clears any previously published topics that had the retain flag set + void clear_retained_topics(); /// /// \brief Spawn a thread running the mqtt main loop @@ -96,7 +96,7 @@ class MQTTAbstractionImpl { /// /// \brief subscribes to the given \p topic and registers a callback \p handler that is called when a message /// arrives on the topic. With \p qos a MQTT Quality of Service level can be set. - void register_handler(const std::string& topic, std::shared_ptr handler, QOS qos); + void register_handler(const std::string& topic, const std::shared_ptr& handler, QOS qos); /// /// \brief unsubscribes a handler identified by its \p token from the given \p topic @@ -121,6 +121,8 @@ class MQTTAbstractionImpl { MessageQueue message_queue; std::vector> messages_before_connected; std::mutex messages_before_connected_mutex; + std::mutex retained_topics_mutex; + std::vector retained_topics; Thread mqtt_mainloop_thread; std::shared_future main_loop_future; diff --git a/include/utils/types.hpp b/include/utils/types.hpp index 80115c53..5e35eb5c 100644 --- a/include/utils/types.hpp +++ b/include/utils/types.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include diff --git a/lib/config.cpp b/lib/config.cpp index 7a097dcf..f4383a00 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -48,9 +48,87 @@ struct ParsedConfigMap { std::set unknown_config_entries; }; +void loader(const json_uri& uri, json& schema) { + BOOST_LOG_FUNCTION(); + + if (uri.location() == "http://json-schema.org/draft-07/schema") { + schema = nlohmann::json_schema::draft7_schema_builtin; + return; + } + + // TODO(kai): think about supporting more urls here + EVTHROW(EverestInternalError(fmt::format("{} is not supported for schema loading at the moment\n", uri.url()))); +} + +void format_checker(const std::string& format, const std::string& value) { + BOOST_LOG_FUNCTION(); + + if (format == "uri") { + if (value.find("://") == std::string::npos) { + EVTHROW(std::invalid_argument("URI does not contain :// - invalid")); + } + } else if (format == "uri-reference") { + if (!std::regex_match(value, type_uri_regex)) { + EVTHROW(std::invalid_argument("Type URI is malformed.")); + } + } else { + nlohmann::json_schema::default_string_format_check(format, value); + } +} + +std::tuple load_schema(const fs::path& path) { + BOOST_LOG_FUNCTION(); + + if (!fs::exists(path)) { + EVLOG_AND_THROW( + EverestInternalError(fmt::format("Schema file does not exist at: {}", fs::absolute(path).string()))); + } + + EVLOG_debug << fmt::format("Loading schema file at: {}", fs::canonical(path).string()); + + json schema = load_yaml(path); + + auto validator = nlohmann::json_schema::json_validator(loader, format_checker); + + try { + validator.set_root_schema(schema); + } catch (const std::exception& e) { + EVLOG_AND_THROW(EverestInternalError( + fmt::format("Validation of schema '{}' failed, here is why: {}", path.string(), e.what()))); + } + + return std::make_tuple(std::move(schema), + std::move(validator)); +} + +SchemaValidation load_schemas(const fs::path& schemas_dir) { + BOOST_LOG_FUNCTION(); + SchemaValidation schema_validation; + + EVLOG_debug << fmt::format("Loading base schema files for config and manifests... from: {}", schemas_dir.string()); + auto [config_schema, config_val] = load_schema(schemas_dir / "config.yaml"); + schema_validation.schemas.config = config_schema; + schema_validation.validators.config = std::move(config_val); + auto [manifest_schema, manifest_val] = load_schema(schemas_dir / "manifest.yaml"); + schema_validation.schemas.manifest = manifest_schema; + schema_validation.validators.manifest = std::move(manifest_val); + auto [interface_schema, interface_val] = load_schema(schemas_dir / "interface.yaml"); + schema_validation.schemas.interface = interface_schema; + schema_validation.validators.interface = std::move(interface_val); + auto [type_schema, type_val] = load_schema(schemas_dir / "type.yaml"); + schema_validation.schemas.type = type_schema; + schema_validation.validators.type = std::move(type_val); + auto [error_declaration_list_schema, error_declaration_list_val] = + load_schema(schemas_dir / "error-declaration-list.yaml"); + schema_validation.schemas.error_declaration_list = error_declaration_list_schema; + schema_validation.validators.error_declaration_list = std::move(error_declaration_list_val); + + return schema_validation; +} + static void validate_config_schema(const json& config_map_schema) { // iterate over every config entry - json_validator validator(Config::loader, Config::format_checker); + json_validator validator(loader, format_checker); for (const auto& config_item : config_map_schema.items()) { if (!config_item.value().contains("default")) { continue; @@ -78,8 +156,8 @@ static ParsedConfigMap parse_config_map(const json& config_map_schema, const jso // validate each config entry for (const auto& config_entry_el : config_map_schema.items()) { - const std::string config_entry_name = config_entry_el.key(); - const json config_entry = config_entry_el.value(); + const std::string& config_entry_name = config_entry_el.key(); + const json& config_entry = config_entry_el.value(); // only convenience exception, would be catched by schema validation below if not thrown here if (!config_entry.contains("default") and !config_map.contains(config_entry_name)) { @@ -92,7 +170,7 @@ static ParsedConfigMap parse_config_map(const json& config_map_schema, const jso } else if (config_entry.contains("default")) { config_entry_value = config_entry.at("default"); // use default value defined in manifest } - json_validator validator(Config::loader, Config::format_checker); + json_validator validator(loader, format_checker); validator.set_root_schema(config_entry); try { auto patch = validator.validate(config_entry_value); @@ -104,7 +182,8 @@ static ParsedConfigMap parse_config_map(const json& config_map_schema, const jso throw ConfigParseException(ConfigParseException::SCHEMA, config_entry_name, err.what()); } - parsed_config_map[config_entry_name] = config_entry_value; + parsed_config_map[config_entry_name] = + json::object({{"value", config_entry_value}, {"type", config_entry.at("type")}}); } return {parsed_config_map, unknown_config_entries}; @@ -125,7 +204,7 @@ static auto get_provides_for_probe_module(const std::string& probe_module_id, co const auto& connections = module_config.value("connections", json::object()); for (const auto& connection : connections.items()) { - const std::string req_id = connection.key(); + const std::string& req_id = connection.key(); const std::string module_name = module_config.at("module"); const auto& module_manifest = manifests.at(module_name); @@ -161,7 +240,7 @@ static auto get_provides_for_probe_module(const std::string& probe_module_id, co static auto get_requirements_for_probe_module(const std::string& probe_module_id, const json& config, const json& manifests) { - const auto probe_module_config = config.at(probe_module_id); + const auto& probe_module_config = config.at(probe_module_id); auto requirements = json::object(); @@ -181,7 +260,7 @@ static auto get_requirements_for_probe_module(const std::string& probe_module_id if (module_config_it == config.end()) { EVLOG_AND_THROW( - EverestConfigError("ProbeModule refers to a non-existent module id '" + module_id + "'")); + EverestConfigError(fmt::format("ProbeModule refers to a non-existent module id '{}'", module_id))); } const auto& module_manifest = manifests.at(module_config_it->at("module")); @@ -189,14 +268,15 @@ static auto get_requirements_for_probe_module(const std::string& probe_module_id const auto& module_provides_it = module_manifest.find("provides"); if (module_provides_it == module_manifest.end()) { - EVLOG_AND_THROW(EverestConfigError("ProbeModule requires something from module id' " + module_id + - "', but it does not provide anything")); + EVLOG_AND_THROW(EverestConfigError(fmt::format( + "ProbeModule requires something from module id '{}' but it does not provide anything", module_id))); } const auto& provide_it = module_provides_it->find(impl_id); if (provide_it == module_provides_it->end()) { - EVLOG_AND_THROW(EverestConfigError("ProbeModule requires something from module id '" + module_id + - "', but it does not provide '" + impl_id + "'")); + EVLOG_AND_THROW(EverestConfigError( + fmt::format("ProbeModule requires something from module id '{}', but it does not provide '{}'", + module_id, impl_id))); } const std::string interface = provide_it->at("interface"); @@ -269,7 +349,7 @@ std::string create_printable_identifier(const ImplementationInfo& info, const st BOOST_LOG_FUNCTION(); // no implementation id yet so only return this kind of string: - const auto module_string = fmt::format("{}:{}", info.module_id, info.module_name); + auto module_string = fmt::format("{}:{}", info.module_id, info.module_name); if (impl_id.empty()) { return module_string; } @@ -339,7 +419,7 @@ const json& ConfigBase::get_settings() const { const json ConfigBase::get_schemas() const { BOOST_LOG_FUNCTION(); - return this->_schemas; + return this->schemas; } json ConfigBase::get_error_types() { @@ -483,9 +563,7 @@ void ManagerConfig::load_and_validate_manifest(const std::string& module_id, con this->manifests[module_name] = load_yaml(manifest_path); } - json_validator validator(Config::loader, Config::format_checker); - validator.set_root_schema(this->_schemas.manifest); - const auto patch = validator.validate(this->manifests[module_name]); + const auto patch = this->validators.manifest.validate(this->manifests[module_name]); if (!patch.is_null()) { // extend manifest with default values this->manifests[module_name] = this->manifests[module_name].patch(patch); @@ -579,7 +657,7 @@ void ManagerConfig::load_and_validate_manifest(const std::string& module_id, con // validate config for !module { - const json config_map = module_config.at("config_module"); + const json& config_map = module_config.at("config_module"); const json config_map_schema = this->manifests[module_config.at("module").get()]["config"]; try { @@ -613,7 +691,7 @@ std::tuple ManagerConfig::load_and_validate_with_schema(const fs: int64_t validation_ms = 0; const auto start_time_validate = std::chrono::system_clock::now(); - json_validator validator(Config::loader, Config::format_checker); + json_validator validator(loader, format_checker); validator.set_root_schema(schema); validator.validate(json_to_validate); const auto end_time_validate = std::chrono::system_clock::now(); @@ -647,9 +725,7 @@ json ManagerConfig::load_interface_file(const std::string& intf_name) { // this subschema can not use allOf with the draft-07 schema because that will cause our validator to // add all draft-07 default values which never validate (the {"not": true} default contradicts everything) // --> validating against draft-07 will be done in an extra step below - json_validator validator(Config::loader, Config::format_checker); - validator.set_root_schema(this->_schemas.interface); - auto patch = validator.validate(interface_json); + auto patch = this->validators.interface.validate(interface_json); if (!patch.is_null()) { // extend config entry with default values interface_json = interface_json.patch(patch); @@ -662,7 +738,6 @@ json ManagerConfig::load_interface_file(const std::string& intf_name) { } // validate every cmd arg/result and var definition against draft-07 schema - validator.set_root_schema(draft07); for (auto& var_entry : interface_json["vars"].items()) { auto& var_value = var_entry.value(); // erase "description" @@ -683,7 +758,7 @@ json ManagerConfig::load_interface_file(const std::string& intf_name) { } } } - validator.validate(var_value); + this->draft7_validator->validate(var_value); } for (auto& cmd_entry : interface_json["cmds"].items()) { auto& cmd = interface_json["cmds"][cmd_entry.key()]; @@ -697,14 +772,14 @@ json ManagerConfig::load_interface_file(const std::string& intf_name) { if (arg_entry.contains("description")) { arg_entry.erase("description"); } - validator.validate(arg_entry); + this->draft7_validator->validate(arg_entry); } auto& result = interface_json["cmds"][cmd_entry.key()]["result"]; // erase "description" if (result.contains("description")) { result.erase("description"); } - validator.validate(result); + this->draft7_validator->validate(result); } return interface_json; @@ -887,7 +962,7 @@ void ManagerConfig::resolve_all_requirements() { } void ManagerConfig::parse(json config) { - this->main = config; + this->main = std::move(config); // load type files if (this->ms.runtime_settings->validate_schema) { int64_t total_time_validation_ms = 0, total_time_parsing_ms = 0; @@ -903,7 +978,7 @@ void ManagerConfig::parse(json config) { EVLOG_verbose << fmt::format("Loading type file at: {}", fs::canonical(type_file_path).c_str()); const auto [type_json, validate_ms] = - load_and_validate_with_schema(type_file_path, this->_schemas.type); + load_and_validate_with_schema(type_file_path, this->schemas.type); total_time_validation_ms += validate_ms; this->types[type_path] = type_json.at("types"); @@ -934,7 +1009,7 @@ void ManagerConfig::parse(json config) { EVLOG_verbose << fmt::format("Loading error file at: {}", fs::canonical(error_file_path).c_str()); const auto [error_json, validate_ms] = - load_and_validate_with_schema(error_file_path, this->_schemas.error_declaration_list); + load_and_validate_with_schema(error_file_path, this->schemas.error_declaration_list); total_time_validation_ms += validate_ms; } catch (const std::exception& e) { @@ -1048,8 +1123,12 @@ ManagerConfig::ManagerConfig(const ManagerSettings& ms) : ConfigBase(ms.mqtt_set this->interfaces = json({}); this->interface_definitions = json({}); this->types = json({}); - this->_schemas = Config::load_schemas(this->ms.schemas_dir); + auto schema_validation = load_schemas(this->ms.schemas_dir); + this->schemas = schema_validation.schemas; + this->validators = std::move(schema_validation.validators); this->error_map = error::ErrorTypeMap(this->ms.errors_dir); + this->draft7_validator = std::make_unique(loader, format_checker); + this->draft7_validator->set_root_schema(draft07); // load and process config file const fs::path config_path = this->ms.config_file; @@ -1071,26 +1150,19 @@ ManagerConfig::ManagerConfig(const ManagerSettings& ms) : ConfigBase(ms.mqtt_set EVLOG_verbose << "No user-config provided."; } - json_validator validator(Config::loader, Config::format_checker); - validator.set_root_schema(this->_schemas.config); - const auto patch = validator.validate(complete_config); + const auto patch = this->validators.config.validate(complete_config); if (!patch.is_null()) { // extend config with default values complete_config = complete_config.patch(patch); } - const auto config = complete_config.at("active_modules"); this->settings = this->ms.get_runtime_settings(); - this->parse(config); + this->parse(complete_config.at("active_modules")); } catch (const std::exception& e) { EVLOG_AND_THROW(EverestConfigError(fmt::format("Failed to load and parse config file: {}", e.what()))); } } -json ManagerConfig::serialize() { - return json::object({{"main", this->main}, {"module_names", this->module_names}}); -} - std::optional ManagerConfig::get_telemetry_config(const std::string& module_id) { BOOST_LOG_FUNCTION(); @@ -1118,7 +1190,7 @@ Config::Config(const MQTTSettings& mqtt_settings, json serialized_config) : Conf this->telemetry_config = serialized_config.at("telemetry_config"); } - this->_schemas = serialized_config.at("schemas"); + this->schemas = serialized_config.at("schemas"); this->error_map = error::ErrorTypeMap(); this->error_map.load_error_types_map(serialized_config.at("error_map")); } @@ -1132,7 +1204,7 @@ bool Config::module_provides(const std::string& module_name, const std::string& return (provides.find(impl_id) != provides.end()); } -json Config::get_module_cmds(const std::string& module_name, const std::string& impl_id) { +const json& Config::get_module_cmds(const std::string& module_name, const std::string& impl_id) { return this->module_config_cache.at(module_name).cmds.at(impl_id); } @@ -1160,14 +1232,12 @@ ModuleConfigs Config::get_module_configs(const std::string& module_id) const { const json manifest = this->manifests.at(module_type); for (const auto& conf_map : config_maps.items()) { - const json config_schema = (conf_map.key() == "!module") - ? manifest.at("config") - : manifest.at("provides").at(conf_map.key()).at("config"); ConfigMap processed_conf_map; for (const auto& entry : conf_map.value().items()) { - const json entry_type = config_schema.at(entry.key()).at("type"); + const auto& entry_value = entry.value(); + const json& entry_type = entry_value.at("type"); ConfigEntry value; - const json data = entry.value(); + const json& data = entry_value.at("value"); if (data.is_string()) { value = data.get(); @@ -1197,9 +1267,9 @@ ModuleConfigs Config::get_module_configs(const std::string& module_id) const { } // FIXME (aw): check if module_id does not exist -json Config::get_module_json_config(const std::string& module_id) { +const json& Config::get_module_json_config(const std::string& module_id) { BOOST_LOG_FUNCTION(); - return this->main[module_id]["config_maps"]; + return this->main.at(module_id).at("config_maps"); } ModuleInfo Config::get_module_info(const std::string& module_id) const { @@ -1239,7 +1309,7 @@ void Config::ref_loader(const json_uri& uri, json& schema) { schema = nlohmann::json_schema::draft7_schema_builtin; return; } else { - const auto path = uri.path(); + const auto& path = uri.path(); if (this->types.contains(path)) { schema = this->types[path]; EVLOG_verbose << fmt::format("ref path \"{}\" schema has been found.", path); @@ -1253,50 +1323,12 @@ void Config::ref_loader(const json_uri& uri, json& schema) { EVTHROW(EverestInternalError(fmt::format("{} is not supported for schema loading at the moment\n", uri.url()))); } -schemas Config::load_schemas(const fs::path& schemas_dir) { - BOOST_LOG_FUNCTION(); - schemas schemas; - - EVLOG_debug << fmt::format("Loading base schema files for config and manifests... from: {}", schemas_dir.string()); - schemas.config = Config::load_schema(schemas_dir / "config.yaml"); - schemas.manifest = Config::load_schema(schemas_dir / "manifest.yaml"); - schemas.interface = Config::load_schema(schemas_dir / "interface.yaml"); - schemas.type = Config::load_schema(schemas_dir / "type.yaml"); - schemas.error_declaration_list = Config::load_schema(schemas_dir / "error-declaration-list.yaml"); - - return schemas; -} - -json Config::load_schema(const fs::path& path) { - BOOST_LOG_FUNCTION(); - - if (!fs::exists(path)) { - EVLOG_AND_THROW( - EverestInternalError(fmt::format("Schema file does not exist at: {}", fs::absolute(path).string()))); - } - - EVLOG_debug << fmt::format("Loading schema file at: {}", fs::canonical(path).string()); - - const json schema = load_yaml(path); - - json_validator validator(Config::loader, Config::format_checker); - - try { - validator.set_root_schema(schema); - } catch (const std::exception& e) { - EVLOG_AND_THROW(EverestInternalError( - fmt::format("Validation of schema '{}' failed, here is why: {}", path.string(), e.what()))); - } - - return schema; -} - json Config::load_all_manifests(const std::string& modules_dir, const std::string& schemas_dir) { BOOST_LOG_FUNCTION(); json manifests = json({}); - const schemas schemas = Config::load_schemas(schemas_dir); + auto schema_validation = load_schemas(schemas_dir); const fs::path modules_path = fs::path(modules_dir); @@ -1312,9 +1344,7 @@ json Config::load_all_manifests(const std::string& modules_dir, const std::strin try { manifests[module_name] = load_yaml(manifest_path); - json_validator validator(Config::loader, Config::format_checker); - validator.set_root_schema(schemas.manifest); - validator.validate(manifests.at(module_name)); + schema_validation.validators.manifest.validate(manifests.at(module_name)); } catch (const std::exception& e) { EVLOG_AND_THROW(EverestConfigError( fmt::format("Failed to load and parse module manifest file of module {}: {}", module_name, e.what()))); @@ -1344,38 +1374,10 @@ std::set Config::keys(const json& object) { return keys; } -void Config::loader(const json_uri& uri, json& schema) { - BOOST_LOG_FUNCTION(); - - if (uri.location() == "http://json-schema.org/draft-07/schema") { - schema = nlohmann::json_schema::draft7_schema_builtin; - return; - } - - // TODO(kai): think about supporting more urls here - EVTHROW(EverestInternalError(fmt::format("{} is not supported for schema loading at the moment\n", uri.url()))); -} - -void Config::format_checker(const std::string& format, const std::string& value) { - BOOST_LOG_FUNCTION(); - - if (format == "uri") { - if (value.find("://") == std::string::npos) { - EVTHROW(std::invalid_argument("URI does not contain :// - invalid")); - } - } else if (format == "uri-reference") { - if (!std::regex_match(value, type_uri_regex)) { - EVTHROW(std::invalid_argument("Type URI is malformed.")); - } - } else { - nlohmann::json_schema::default_string_format_check(format, value); - } -} - } // namespace Everest NLOHMANN_JSON_NAMESPACE_BEGIN -void adl_serializer::to_json(nlohmann::json& j, const Everest::schemas& s) { +void adl_serializer::to_json(nlohmann::json& j, const Everest::Schemas& s) { j = {{"config", s.config}, {"manifest", s.manifest}, {"interface", s.interface}, @@ -1383,7 +1385,7 @@ void adl_serializer::to_json(nlohmann::json& j, const Everest: {"error_declaration_list", s.error_declaration_list}}; } -void adl_serializer::from_json(const nlohmann::json& j, Everest::schemas& s) { +void adl_serializer::from_json(const nlohmann::json& j, Everest::Schemas& s) { s.config = j.at("config"); s.manifest = j.at("manifest"); s.interface = j.at("interface"); diff --git a/lib/everest.cpp b/lib/everest.cpp index 09aa0940..c13f0429 100644 --- a/lib/everest.cpp +++ b/lib/everest.cpp @@ -39,7 +39,7 @@ const auto remote_cmd_res_timeout_seconds = 300; const std::array TELEMETRY_RESERVED_KEYS = {{"connector_id"}}; Everest::Everest(std::string module_id_, const Config& config_, bool validate_data_with_schema, - std::shared_ptr mqtt_abstraction, const std::string& telemetry_prefix, + const std::shared_ptr& mqtt_abstraction, const std::string& telemetry_prefix, bool telemetry_enabled) : mqtt_abstraction(mqtt_abstraction), config(config_), @@ -71,7 +71,8 @@ Everest::Everest(std::string module_id_, const Config& config_, bool validate_da // setup error_manager_req_global if enabled + error_database + error_state_monitor if (this->module_manifest.contains("enable_global_errors") && this->module_manifest.at("enable_global_errors").get()) { - std::shared_ptr global_error_database = std::make_shared(); + const std::shared_ptr& global_error_database = + std::make_shared(); const error::ErrorManagerReqGlobal::SubscribeGlobalAllErrorsFunc subscribe_global_all_errors_func = [this](const error::ErrorCallback& callback, const error::ErrorCallback& clear_callback) { this->subscribe_global_all_errors(callback, clear_callback); @@ -90,7 +91,7 @@ Everest::Everest(std::string module_id_, const Config& config_, bool validate_da // setup error_managers, error_state_monitors, error_factories and error_databases for all implementations for (const std::string& impl : Config::keys(this->module_manifest.at("provides"))) { // setup shared database - std::shared_ptr error_database = std::make_shared(); + const std::shared_ptr error_database = std::make_shared(); // setup error manager const std::string interface_name = this->module_manifest.at("provides").at(impl).at("interface"); @@ -188,7 +189,7 @@ Everest::Everest(std::string module_id_, const Config& config_, bool validate_da } // register handler for global ready signal - const auto handle_ready_wrapper = [this](const std::string&, json data) { this->handle_ready(data); }; + const auto handle_ready_wrapper = [this](const std::string&, const json& data) { this->handle_ready(data); }; const auto everest_ready = std::make_shared(HandlerType::ExternalMQTT, std::make_shared(handle_ready_wrapper)); this->mqtt_abstraction->register_handler(fmt::format("{}ready", mqtt_everest_prefix), everest_ready, QOS::QOS2); @@ -229,11 +230,11 @@ void Everest::heartbeat() { void Everest::publish_metadata() { BOOST_LOG_FUNCTION(); - const auto module_info = this->config.get_module_info(this->module_id); - const auto manifest = this->config.get_manifests().at(module_info.name); + const auto& module_name = this->config.get_module_name(this->module_id); + const auto& manifest = this->config.get_manifests().at(module_name); json metadata = json({}); - metadata["module"] = module_info.name; + metadata["module"] = module_name; if (manifest.contains("provides")) { metadata["provides"] = json({}); @@ -265,7 +266,7 @@ void Everest::check_code() { this->config.get_manifests()[this->config.get_main_config()[this->module_id]["module"].get()]; for (const auto& element : module_manifest.at("provides").items()) { const auto& impl_id = element.key(); - const auto impl_manifest = element.value(); + const auto& impl_manifest = element.value(); const auto interface_definition = this->config.get_interface_definition(impl_manifest.at("interface")); std::set cmds_not_registered; @@ -347,7 +348,7 @@ json Everest::call_cmd(const Requirement& req, const std::string& cmd_name, json try { json_validator validator( [this](const json_uri& uri, json& schema) { this->config.ref_loader(uri, schema); }, - Config::format_checker); + format_checker); validator.set_root_schema(cmd_definition.at("arguments").at(arg_name)); validator.validate(json_args.at(arg_name)); } catch (const std::exception& e) { @@ -437,8 +438,7 @@ void Everest::publish_var(const std::string& impl_id, const std::string& var_nam const auto var_definition = impl_intf.at("vars").at(var_name); try { json_validator validator( - [this](const json_uri& uri, json& schema) { this->config.ref_loader(uri, schema); }, - Config::format_checker); + [this](const json_uri& uri, json& schema) { this->config.ref_loader(uri, schema); }, format_checker); validator.set_root_schema(var_definition); validator.validate(value); } catch (const std::exception& e) { @@ -492,7 +492,7 @@ void Everest::subscribe_var(const Requirement& req, const std::string& var_name, try { json_validator validator( [this](const json_uri& uri, json& schema) { this->config.ref_loader(uri, schema); }, - Config::format_checker); + format_checker); validator.set_root_schema(requirement_manifest_vardef); validator.validate(data); } catch (const std::exception& e) { @@ -666,13 +666,13 @@ void Everest::subscribe_global_all_errors(const error::ErrorCallback& callback, for (const auto& [module_id, module_name] : this->config.get_module_names()) { const json provides = this->config.get_manifests().at(module_name).at("provides"); for (const auto& impl : provides.items()) { - const std::string impl_id = impl.key(); - const std::string interface = impl.value().at("interface"); + const std::string& impl_id = impl.key(); + const std::string& interface = impl.value().at("interface"); const json errors = this->config.get_interface_definition(interface).at("errors"); for (const auto& error_namespace_it : errors.items()) { - const std::string error_type_namespace = error_namespace_it.key(); + const std::string& error_type_namespace = error_namespace_it.key(); for (const auto& error_name_it : error_namespace_it.value().items()) { - const std::string error_type_name = error_name_it.key(); + const std::string& error_type_name = error_name_it.key(); const std::string raise_topic = fmt::format("{}/error/{}/{}", this->config.mqtt_prefix(module_id, impl_id), error_type_namespace, error_type_name); @@ -784,7 +784,7 @@ void Everest::signal_ready() { /// \brief Ready handler for global readyness (e.g. all modules are ready now). /// This will called when receiving the global ready signal from manager. /// -void Everest::handle_ready(json data) { +void Everest::handle_ready(const json& data) { BOOST_LOG_FUNCTION(); EVLOG_debug << fmt::format("handle_ready: {}", data.dump()); @@ -818,7 +818,7 @@ void Everest::handle_ready(json data) { // this->heartbeat_thread = std::thread(&Everest::heartbeat, this); } -void Everest::provide_cmd(const std::string impl_id, const std::string cmd_name, const JsonCommand handler) { +void Everest::provide_cmd(const std::string& impl_id, const std::string cmd_name, const JsonCommand& handler) { BOOST_LOG_FUNCTION(); // extract manifest definition of this command @@ -857,7 +857,7 @@ void Everest::provide_cmd(const std::string impl_id, const std::string cmd_name, } json_validator validator( [this](const json_uri& uri, json& schema) { this->config.ref_loader(uri, schema); }, - Config::format_checker); + format_checker); validator.set_root_schema(cmd_definition.at("arguments").at(arg_name)); validator.validate(data.at("args").at(arg_name)); } @@ -883,7 +883,7 @@ void Everest::provide_cmd(const std::string impl_id, const std::string cmd_name, (!cmd_definition.contains("result") || cmd_definition.at("result").is_null()))) { json_validator validator( [this](const json_uri& uri, json& schema) { this->config.ref_loader(uri, schema); }, - Config::format_checker); + format_checker); validator.set_root_schema(cmd_definition.at("result")); validator.validate(res_data.at("retval")); } @@ -975,7 +975,7 @@ void Everest::provide_cmd(const cmd& cmd) { // call cmd handlers (handle async or normal handlers being both: // methods or functions) // FIXME (aw): this behaviour needs to be checked, i.e. how to distinguish in json between no value and null? - return handler(data).value_or(nullptr); + return handler(std::move(data)).value_or(nullptr); }); } @@ -983,8 +983,8 @@ json Everest::get_cmd_definition(const std::string& module_id, const std::string bool is_call) { BOOST_LOG_FUNCTION(); - const std::string module_name = this->config.get_module_name(module_id); - const auto cmds = this->config.get_module_cmds(module_name, impl_id); + const auto& module_name = this->config.get_module_name(module_id); + const auto& cmds = this->config.get_module_cmds(module_name, impl_id); if (!this->config.module_provides(module_name, impl_id)) { if (!is_call) { diff --git a/lib/message_queue.cpp b/lib/message_queue.cpp index f92ea7ea..d9903722 100644 --- a/lib/message_queue.cpp +++ b/lib/message_queue.cpp @@ -70,7 +70,7 @@ MessageHandler::MessageHandler() : running(true) { std::vector> local_handlers; { const std::lock_guard handlers_lock(handler_list_mutex); - for (auto handler : this->handlers) { + for (const auto& handler : this->handlers) { local_handlers.push_back(handler); } } @@ -129,14 +129,14 @@ void MessageHandler::stop() { this->cv.notify_all(); } -void MessageHandler::add_handler(std::shared_ptr handler) { +void MessageHandler::add_handler(const std::shared_ptr& handler) { { const std::lock_guard lock(this->handler_list_mutex); this->handlers.insert(handler); } } -void MessageHandler::remove_handler(std::shared_ptr handler) { +void MessageHandler::remove_handler(const std::shared_ptr& handler) { { const std::lock_guard lock(this->handler_list_mutex); auto it = std::find(this->handlers.begin(), this->handlers.end(), handler); diff --git a/lib/module_config.cpp b/lib/module_config.cpp index f92e80fe..878394f5 100644 --- a/lib/module_config.cpp +++ b/lib/module_config.cpp @@ -2,6 +2,8 @@ // Copyright Pionix GmbH and Contributors to EVerest #include +#include +#include #include @@ -12,31 +14,43 @@ #include namespace Everest { +struct AsyncReturn { + std::future future; + Token token; +}; + using json = nlohmann::json; inline constexpr int mqtt_get_config_timeout_ms = 5000; +using FutureCallback = std::tuple, std::string>; -json get_module_config(std::shared_ptr mqtt, const std::string& module_id) { - const auto& everest_prefix = mqtt->get_everest_prefix(); +AsyncReturn get_async(const std::shared_ptr& mqtt, const std::string& topic, QOS qos) { + auto res_promise = std::make_shared>(); + std::future res_future = res_promise->get_future(); - const auto get_config_topic = fmt::format("{}modules/{}/get_config", everest_prefix, module_id); - const auto config_topic = fmt::format("{}modules/{}/config", everest_prefix, module_id); + auto res_handler = [res_promise](const std::string& topic, json data) mutable { + res_promise->set_value(std::move(data)); + }; + + const auto res_token = + std::make_shared(HandlerType::GetConfig, std::make_shared(res_handler)); + mqtt->register_handler(topic, res_token, QOS::QOS2); + + return {std::move(res_future), res_token}; +} + +json get(const std::shared_ptr& mqtt, const std::string& topic, QOS qos) { + BOOST_LOG_FUNCTION(); std::promise res_promise; std::future res_future = res_promise.get_future(); - const auto res_handler = [module_id, &res_promise](const std::string& topic, json data) { - EVLOG_verbose << fmt::format("Incoming config for {}", module_id); - + const auto res_handler = [&res_promise](const std::string& topic, json data) { res_promise.set_value(std::move(data)); }; const std::shared_ptr res_token = std::make_shared(HandlerType::GetConfig, std::make_shared(res_handler)); - mqtt->register_handler(config_topic, res_token, QOS::QOS2); - - const json config_publish_data = json::object({{"type", "full"}}); - - mqtt->publish(get_config_topic, config_publish_data, QOS::QOS2); + mqtt->register_handler(topic, res_token, QOS::QOS2); // wait for result future const std::chrono::time_point res_wait = @@ -48,62 +62,174 @@ json get_module_config(std::shared_ptr mqtt, const std::string& json result; if (res_future_status == std::future_status::timeout) { - mqtt->unregister_handler(config_topic, res_token); - EVLOG_AND_THROW( - EverestTimeoutError(fmt::format("Timeout while waiting for result of get_config of {}", module_id))); + mqtt->unregister_handler(topic, res_token); + EVLOG_AND_THROW(EverestTimeoutError(fmt::format("Timeout while waiting for result of get({})", topic))); } if (res_future_status == std::future_status::ready) { result = res_future.get(); } - mqtt->unregister_handler(config_topic, res_token); + mqtt->unregister_handler(topic, res_token); - const auto interface_names_topic = fmt::format("{}interfaces", everest_prefix); - const auto interface_names = mqtt->get(interface_names_topic, QOS::QOS2); - auto interface_definitions = json::object(); - for (const auto& interface : interface_names) { - auto interface_topic = fmt::format("{}interface_definitions/{}", everest_prefix, interface.get()); - auto interface_definition = mqtt->get(interface_topic, QOS::QOS2); - interface_definitions[interface] = interface_definition; + return result; +} + +void populate_future_cbs(std::vector& future_cbs, const std::shared_ptr& mqtt, + const std::string& everest_prefix, const std::string& topic, json& out) { + future_cbs.push_back(std::make_tuple>( + get_async(mqtt, topic, QOS::QOS2), + [topic, &everest_prefix, &out](json result) { + out = std::move(result); + + return topic; + }, + topic)); +} + +json get_with_timeout(std::future future, const std::shared_ptr& mqtt, const std::string& topic, + const Token& token) { + // wait for result future + const std::chrono::time_point wait = + std::chrono::steady_clock::now() + std::chrono::milliseconds(mqtt_get_config_timeout_ms); + std::future_status future_status; + do { + future_status = future.wait_until(wait); + } while (future_status == std::future_status::deferred); + + json result; + if (future_status == std::future_status::timeout) { + mqtt->unregister_handler(topic, token); + EVLOG_AND_THROW(EverestTimeoutError(fmt::format("Timeout while waiting for result of get({})", topic))); + } + if (future_status == std::future_status::ready) { + return future.get(); } - result["interface_definitions"] = interface_definitions; + return result; +} + +void populate_future_cbs_arr(std::vector& future_cbs, const std::shared_ptr& mqtt, + const std::string& everest_prefix, const std::string& topic, + const std::string& inner_topic_part, json& array_out, json& out) { + future_cbs.push_back(std::make_tuple, std::string>( + get_async(mqtt, topic, QOS::QOS2), + [topic, &everest_prefix, &mqtt, &inner_topic_part, &array_out, &out](const json& result_array) { + array_out = result_array; + std::vector array_future_cbs; + std::set keys; + for (const auto& element : result_array) { + keys.insert(element.get()); + } + for (const auto& key : keys) { + auto key_topic = fmt::format("{}{}{}", everest_prefix, inner_topic_part, key); + array_future_cbs.push_back(std::make_tuple, std::string>( + get_async(mqtt, key_topic, QOS::QOS2), + [&key, key_topic, &out](const json& key_response) { + out[key] = key_response; + return key_topic; + }, + fmt::format("{}{}{}", everest_prefix, inner_topic_part, key))); + } + for (auto&& [retval, callback, tpc] : array_future_cbs) { + const auto inner_topic = callback(get_with_timeout(std::move(retval.future), mqtt, tpc, retval.token)); + mqtt->unregister_handler(inner_topic, retval.token); + } + return topic; + }, + fmt::format("{}", topic))); +} +json get_module_config(const std::shared_ptr& mqtt, const std::string& module_id) { + const auto start_time = std::chrono::system_clock::now(); + const auto& everest_prefix = mqtt->get_everest_prefix(); + + const auto get_config_topic = fmt::format("{}modules/{}/get_config", everest_prefix, module_id); + + json result; + + std::vector future_cbs; + + // config + auto config = json::object(); + const auto config_topic = fmt::format("{}modules/{}/config", everest_prefix, module_id); + populate_future_cbs(future_cbs, mqtt, everest_prefix, config_topic, config); + + const json config_publish_data = json::object({{"type", "full"}}); + mqtt->publish(get_config_topic, config_publish_data, QOS::QOS2); + + // interfaces + const auto interface_names_topic = fmt::format("{}interfaces", everest_prefix); + auto interface_names = json::object(); + auto interface_definitions = json::object(); + const std::string interface_inner_topic_part = "interface_definitions/"; + populate_future_cbs_arr(future_cbs, mqtt, everest_prefix, interface_names_topic, interface_inner_topic_part, + interface_names, interface_definitions); + + // manifests + const auto module_names_topic = fmt::format("{}module_names", everest_prefix); + auto module_names_out = json::object(); + auto manifests = json::object(); + const std::string manifests_inner_topic_part = "manifests/"; + populate_future_cbs_arr(future_cbs, mqtt, everest_prefix, module_names_topic, manifests_inner_topic_part, + module_names_out, manifests); + + // types const auto type_names_topic = fmt::format("{}types", everest_prefix); - const auto type_names = mqtt->get(type_names_topic, QOS::QOS2); + auto type_names = json::object(); auto type_definitions = json::object(); - for (const auto& type_name : type_names) { - // type_definition keys already start with a / so omit it in the topic name - auto type_topic = fmt::format("{}type_definitions{}", everest_prefix, type_name.get()); - auto type_definition = mqtt->get(type_topic, QOS::QOS2); - type_definitions[type_name] = type_definition; - } - result["types"] = type_definitions; + // type_definition keys already start with a / so omit it in the topic name + const std::string type_definitions_inner_topic_part = "type_definitions"; + populate_future_cbs_arr(future_cbs, mqtt, everest_prefix, type_names_topic, type_definitions_inner_topic_part, + type_names, type_definitions); + // module provides + auto module_provides = json::object(); const auto module_provides_topic = fmt::format("{}module_provides", everest_prefix); - const auto module_provides = mqtt->get(module_provides_topic, QOS::QOS2); - result["module_provides"] = module_provides; + populate_future_cbs(future_cbs, mqtt, everest_prefix, module_provides_topic, module_provides); + // settings + auto settings = json::object(); const auto settings_topic = fmt::format("{}settings", everest_prefix); - const auto settings = mqtt->get(settings_topic, QOS::QOS2); - result["settings"] = settings; + populate_future_cbs(future_cbs, mqtt, everest_prefix, settings_topic, settings); + // schemas + auto schemas = json::object(); const auto schemas_topic = fmt::format("{}schemas", everest_prefix); - const auto schemas = mqtt->get(schemas_topic, QOS::QOS2); - result["schemas"] = schemas; - - const auto manifests_topic = fmt::format("{}manifests", everest_prefix); - const auto manifests = mqtt->get(manifests_topic, QOS::QOS2); - result["manifests"] = manifests; + populate_future_cbs(future_cbs, mqtt, everest_prefix, schemas_topic, schemas); + // error_types_map + auto error_types_map = json::object(); const auto error_types_map_topic = fmt::format("{}error_types_map", everest_prefix); - const auto error_types_map = mqtt->get(error_types_map_topic, QOS::QOS2); - result["error_map"] = error_types_map; + populate_future_cbs(future_cbs, mqtt, everest_prefix, error_types_map_topic, error_types_map); + // module_config_cache + auto module_config_cache = json::object(); const auto module_config_cache_topic = fmt::format("{}module_config_cache", everest_prefix); - const auto module_config_cache = mqtt->get(module_config_cache_topic, QOS::QOS2); + populate_future_cbs(future_cbs, mqtt, everest_prefix, module_config_cache_topic, module_config_cache); + + for (auto&& [retval, callback, tpc] : future_cbs) { + auto topic = callback(get_with_timeout(std::move(retval.future), mqtt, tpc, retval.token)); + mqtt->unregister_handler(topic, retval.token); + } + + result["mappings"] = config.at("mappings"); + result["module_config"] = config.at("module_config"); + result["module_names"] = module_names_out; + result["interface_definitions"] = interface_definitions; + result["manifests"] = manifests; + result["types"] = type_definitions; + result["module_provides"] = module_provides; + result["settings"] = settings; + result["schemas"] = schemas; + result["error_map"] = error_types_map; result["module_config_cache"] = module_config_cache; + const auto end_time = std::chrono::system_clock::now(); + + EVLOG_debug << "get_module_config(" << module_id + << "): " << std::chrono::duration_cast(end_time - start_time).count() + << "ms"; + return result; } diff --git a/lib/mqtt_abstraction.cpp b/lib/mqtt_abstraction.cpp index 111e00db..09351390 100644 --- a/lib/mqtt_abstraction.cpp +++ b/lib/mqtt_abstraction.cpp @@ -71,9 +71,9 @@ void MQTTAbstraction::unsubscribe(const std::string& topic) { mqtt_abstraction->unsubscribe(topic); } -json MQTTAbstraction::get(const std::string& topic, QOS qos) { +void MQTTAbstraction::clear_retained_topics() { BOOST_LOG_FUNCTION(); - return mqtt_abstraction->get(topic, qos); + mqtt_abstraction->clear_retained_topics(); } const std::string& MQTTAbstraction::get_everest_prefix() const { @@ -96,7 +96,8 @@ std::shared_future MQTTAbstraction::get_main_loop_future() { return mqtt_abstraction->get_main_loop_future(); } -void MQTTAbstraction::register_handler(const std::string& topic, std::shared_ptr handler, QOS qos) { +void MQTTAbstraction::register_handler(const std::string& topic, const std::shared_ptr& handler, + QOS qos) { BOOST_LOG_FUNCTION(); mqtt_abstraction->register_handler(topic, handler, qos); } diff --git a/lib/mqtt_abstraction_impl.cpp b/lib/mqtt_abstraction_impl.cpp index a3b50553..6c4dc9d7 100644 --- a/lib/mqtt_abstraction_impl.cpp +++ b/lib/mqtt_abstraction_impl.cpp @@ -155,6 +155,13 @@ void MQTTAbstractionImpl::publish(const std::string& topic, const std::string& d if (retain) { publish_flags |= MQTT_PUBLISH_RETAIN; + if (not(data.empty() and qos == QOS::QOS0)) { + // topic should be retained, so save the topic in retained_topics + // do not save the topic when the payload is empty and QOS is set to 0 which means a retained topic is to be + // cleared + const std::lock_guard lock(retained_topics_mutex); + this->retained_topics.push_back(topic); + } } if (!this->mqtt_is_connected) { @@ -207,38 +214,16 @@ void MQTTAbstractionImpl::unsubscribe(const std::string& topic) { notify_write_data(); } -json MQTTAbstractionImpl::get(const std::string& topic, QOS qos) { +void MQTTAbstractionImpl::clear_retained_topics() { BOOST_LOG_FUNCTION(); - std::promise res_promise; - std::future res_future = res_promise.get_future(); - - const auto res_handler = [this, &res_promise](const std::string& topic, json data) { - res_promise.set_value(std::move(data)); - }; - - const std::shared_ptr res_token = - std::make_shared(HandlerType::GetConfig, std::make_shared(res_handler)); - this->register_handler(topic, res_token, QOS::QOS2); + const std::lock_guard lock(retained_topics_mutex); - // wait for result future - const std::chrono::time_point res_wait = - std::chrono::steady_clock::now() + std::chrono::milliseconds(mqtt_get_timeout_ms); - std::future_status res_future_status; - do { - res_future_status = res_future.wait_until(res_wait); - } while (res_future_status == std::future_status::deferred); - - json result; - if (res_future_status == std::future_status::timeout) { - this->unregister_handler(topic, res_token); - EVLOG_AND_THROW(EverestTimeoutError(fmt::format("Timeout while waiting for result of get()"))); - } - if (res_future_status == std::future_status::ready) { - result = res_future.get(); + for (const auto& retained_topic : retained_topics) { + this->publish(retained_topic, std::string(), QOS::QOS0, true); + EVLOG_verbose << "Cleared retained topic: " << retained_topic; } - this->unregister_handler(topic, res_token); - return result; + retained_topics.clear(); } void MQTTAbstractionImpl::notify_write_data() { @@ -364,9 +349,11 @@ void MQTTAbstractionImpl::on_mqtt_message(const Message& message) { } lock.unlock(); + // TODO(kai): this should maybe not even be a warning since it can happen that we unsubscribe from a topic and + // have removed the message handler but the MQTT unsubscribe didn't complete yet and we still receive messages + // on this topic that we can just ignore if (!found) { - EVLOG_AND_THROW( - EverestInternalError(fmt::format("Internal error: topic '{}' should have a matching handler!", topic))); + EVLOG_warning << fmt::format("Internal error: topic '{}' should have a matching handler!", topic); } } catch (boost::exception& e) { EVLOG_critical << fmt::format("Caught MQTT on_message boost::exception:\n{}", @@ -409,7 +396,8 @@ void MQTTAbstractionImpl::on_mqtt_disconnect() { EVLOG_AND_THROW(EverestInternalError("Lost connection to MQTT broker")); } -void MQTTAbstractionImpl::register_handler(const std::string& topic, std::shared_ptr handler, QOS qos) { +void MQTTAbstractionImpl::register_handler(const std::string& topic, const std::shared_ptr& handler, + QOS qos) { BOOST_LOG_FUNCTION(); switch (handler->type) { @@ -451,15 +439,15 @@ void MQTTAbstractionImpl::register_handler(const std::string& topic, std::shared if (this->message_handlers.count(topic) == 0) { this->message_handlers.emplace(std::piecewise_construct, std::forward_as_tuple(topic), std::forward_as_tuple()); } - this->message_handlers[topic].add_handler(handler); + this->message_handlers.at(topic).add_handler(handler); // only subscribe for this topic if we aren't already and the mqtt client is connected // if we are not connected the on_mqtt_connect() callback will subscribe to the topic - if (this->mqtt_is_connected && this->message_handlers[topic].count_handlers() == 1) { + if (this->mqtt_is_connected && this->message_handlers.at(topic).count_handlers() == 1) { EVLOG_verbose << fmt::format("Subscribing to {}", topic); this->subscribe(topic, qos); } - EVLOG_verbose << fmt::format("#handler[{}] = {}", topic, this->message_handlers[topic].count_handlers()); + EVLOG_verbose << fmt::format("#handler[{}] = {}", topic, this->message_handlers.at(topic).count_handlers()); } void MQTTAbstractionImpl::unregister_handler(const std::string& topic, const Token& token) { @@ -484,6 +472,7 @@ void MQTTAbstractionImpl::unregister_handler(const std::string& topic, const Tok EVLOG_verbose << fmt::format("Unsubscribing from {}", topic); this->unsubscribe(topic); } + this->message_handlers.erase(topic); } const std::string handler_count = (number_of_handlers == 0) ? "None" : std::to_string(number_of_handlers); diff --git a/lib/runtime.cpp b/lib/runtime.cpp index eaf53ae2..80797ac2 100644 --- a/lib/runtime.cpp +++ b/lib/runtime.cpp @@ -397,8 +397,8 @@ ModuleCallbacks::ModuleCallbacks( } ModuleLoader::ModuleLoader(int argc, char* argv[], ModuleCallbacks callbacks, - const VersionInformation version_information) : - runtime_settings(nullptr), callbacks(callbacks), version_information(version_information) { + const VersionInformation& version_information) : + runtime_settings(nullptr), callbacks(std::move(callbacks)), version_information(version_information) { if (!this->parse_command_line(argc, argv)) { this->should_exit = true; return; @@ -478,11 +478,11 @@ int ModuleLoader::initialize() { ModuleAdapter module_adapter; module_adapter.call = [&everest](const Requirement& req, const std::string& cmd_name, Parameters args) { - return everest.call_cmd(req, cmd_name, args); + return everest.call_cmd(req, cmd_name, std::move(args)); }; module_adapter.publish = [&everest](const std::string& param1, const std::string& param2, Value param3) { - return everest.publish_var(param1, param2, param3); + return everest.publish_var(param1, param2, std::move(param3)); }; module_adapter.subscribe = [&everest](const Requirement& req, const std::string& var_name, diff --git a/lib/types.cpp b/lib/types.cpp index 19b0e30f..ba58c570 100644 --- a/lib/types.cpp +++ b/lib/types.cpp @@ -7,15 +7,15 @@ TypedHandler::TypedHandler(const std::string& name_, const std::string& id_, HandlerType type_, std::shared_ptr handler_) : - name(name_), id(id_), type(type_), handler(handler_) { + name(name_), id(id_), type(type_), handler(std::move(handler_)) { } TypedHandler::TypedHandler(const std::string& name_, HandlerType type_, std::shared_ptr handler_) : - TypedHandler(name_, "", type_, handler_) { + TypedHandler(name_, "", type_, std::move(handler_)) { } TypedHandler::TypedHandler(HandlerType type_, std::shared_ptr handler_) : - TypedHandler("", "", type_, handler_) { + TypedHandler("", "", type_, std::move(handler_)) { } bool operator<(const Requirement& lhs, const Requirement& rhs) { diff --git a/lib/yaml_loader.cpp b/lib/yaml_loader.cpp index fb4a576f..a69cbae8 100644 --- a/lib/yaml_loader.cpp +++ b/lib/yaml_loader.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -11,13 +12,18 @@ #include +static std::streamsize clamp(std::size_t len) { + return (len <= std::numeric_limits::max()) ? static_cast(len) + : std::numeric_limits::max(); +} + static void yaml_error_handler(const char* msg, std::size_t len, ryml::Location loc, void*) { std::stringstream error_msg; error_msg << "YAML parsing error: "; if (loc) { if (not loc.name.empty()) { - error_msg.write(loc.name.str, loc.name.len); + error_msg.write(loc.name.str, clamp(loc.name.len)); error_msg << ":"; } error_msg << loc.line << ":"; @@ -28,7 +34,7 @@ static void yaml_error_handler(const char* msg, std::size_t len, ryml::Location error_msg << " (" << loc.offset << "B):"; } } - error_msg.write(msg, len); + error_msg.write(msg, clamp(len)); throw std::runtime_error(error_msg.str()); } @@ -78,6 +84,11 @@ static nlohmann::ordered_json ryml_to_nlohmann_json(const c4::yml::NodeRef& ryml } } +static std::string load_file_content(const std::filesystem::path& path) { + std::ifstream ifs(path.string()); + return std::string(std::istreambuf_iterator(ifs), std::istreambuf_iterator()); +} + static std::string load_yaml_content(std::filesystem::path path) { namespace fs = std::filesystem; @@ -92,16 +103,14 @@ static std::string load_yaml_content(std::filesystem::path path) { // first check for yaml, if not found try fall back to json and evlog debug deprecated if (fs::exists(path)) { - std::ifstream ifs(path.string()); - return std::string(std::istreambuf_iterator(ifs), std::istreambuf_iterator()); + return load_file_content(path); } path.replace_extension(".json"); if (fs::exists(path)) { EVLOG_info << "Deprecated: loaded file in json format"; - std::ifstream ifs(path.string()); - return std::string(std::istreambuf_iterator(ifs), std::istreambuf_iterator()); + return load_file_content(path); } // failed to find yaml and json diff --git a/src/controller/command_api.cpp b/src/controller/command_api.cpp index 4d4e58d4..88a9aed8 100644 --- a/src/controller/command_api.cpp +++ b/src/controller/command_api.cpp @@ -28,7 +28,7 @@ nlohmann::json CommandApi::handle(const std::string& cmd, const json& params) { if (cmd == "get_modules") { auto modules_list = json::object(); - for (const auto item : fs::directory_iterator(this->config.module_dir)) { + for (const auto& item : fs::directory_iterator(this->config.module_dir)) { if (!fs::is_directory(item)) { continue; } @@ -48,7 +48,7 @@ nlohmann::json CommandApi::handle(const std::string& cmd, const json& params) { } else if (cmd == "get_configs") { auto config_list = json::object(); - for (const auto item : fs::directory_iterator(this->config.configs_dir)) { + for (const auto& item : fs::directory_iterator(this->config.configs_dir)) { if (!fs::is_regular_file(item)) { continue; } @@ -65,7 +65,7 @@ nlohmann::json CommandApi::handle(const std::string& cmd, const json& params) { } else if (cmd == "get_interfaces") { auto interface_list = json::object(); - for (const auto item : fs::directory_iterator(this->config.interface_dir)) { + for (const auto& item : fs::directory_iterator(this->config.interface_dir)) { if (!fs::is_regular_file(item)) { continue; diff --git a/src/manager.cpp b/src/manager.cpp index 6f07645f..8b58f17a 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -229,7 +229,7 @@ static std::map spawn_modules(const std::vector started_modules; - const auto rs = ms.get_runtime_settings(); + const auto& rs = ms.get_runtime_settings(); for (const auto& module : modules) { @@ -265,45 +265,25 @@ struct ModuleReadyInfo { std::map modules_ready; std::mutex modules_ready_mutex; -void cleanup_retained_topics(ManagerConfig& config, MQTTAbstraction& mqtt_abstraction, - const std::string& mqtt_everest_prefix) { - const auto interface_definitions = config.get_interface_definitions(); - - mqtt_abstraction.publish(fmt::format("{}interfaces", mqtt_everest_prefix), std::string(), QOS::QOS2, true); - - for (const auto& interface_definition : interface_definitions.items()) { - mqtt_abstraction.publish( - fmt::format("{}interface_definitions/{}", mqtt_everest_prefix, interface_definition.key()), std::string(), - QOS::QOS2, true); - } - - mqtt_abstraction.publish(fmt::format("{}types", mqtt_everest_prefix), std::string(), QOS::QOS2, true); - - mqtt_abstraction.publish(fmt::format("{}module_provides", mqtt_everest_prefix), std::string(), QOS::QOS2, true); - - mqtt_abstraction.publish(fmt::format("{}settings", mqtt_everest_prefix), std::string(), QOS::QOS2, true); - - mqtt_abstraction.publish(fmt::format("{}schemas", mqtt_everest_prefix), std::string(), QOS::QOS2, true); - - mqtt_abstraction.publish(fmt::format("{}manifests", mqtt_everest_prefix), std::string(), QOS::QOS2, true); - - mqtt_abstraction.publish(fmt::format("{}error_types_map", mqtt_everest_prefix), std::string(), QOS::QOS2, true); - - mqtt_abstraction.publish(fmt::format("{}module_config_cache", mqtt_everest_prefix), std::string(), QOS::QOS2, true); -} - static std::map start_modules(ManagerConfig& config, MQTTAbstraction& mqtt_abstraction, const std::vector& ignored_modules, const std::vector& standalone_modules, - const ManagerSettings& ms, StatusFifo& status_fifo) { + const ManagerSettings& ms, StatusFifo& status_fifo, + bool retain_topics) { BOOST_LOG_FUNCTION(); std::vector modules_to_spawn; - const auto main_config = config.get_main_config(); + const auto& main_config = config.get_main_config(); + const auto& module_names = config.get_module_names(); + const auto number_of_modules = main_config.size(); + if (number_of_modules == 0) { + EVLOG_info << "No modules to start"; + return {}; + } + EVLOG_info << "Starting " << number_of_modules << " modules"; modules_to_spawn.reserve(main_config.size()); - const auto serialized_config = config.serialize(); const auto interface_definitions = config.get_interface_definitions(); std::vector interface_names; for (auto& interface_definition : interface_definitions.items()) { @@ -342,7 +322,13 @@ static std::map start_modules(ManagerConfig& config, MQTTAbs mqtt_abstraction.publish(fmt::format("{}schemas", ms.mqtt_settings.everest_prefix), schemas, QOS::QOS2, true); const auto manifests = config.get_manifests(); - mqtt_abstraction.publish(fmt::format("{}manifests", ms.mqtt_settings.everest_prefix), manifests, QOS::QOS2, true); + + for (const auto& manifest : manifests.items()) { + auto manifest_copy = manifest.value(); + manifest_copy.erase("config"); + mqtt_abstraction.publish(fmt::format("{}manifests/{}", ms.mqtt_settings.everest_prefix, manifest.key()), + manifest_copy, QOS::QOS2, true); + } const auto error_types_map = config.get_error_types(); mqtt_abstraction.publish(fmt::format("{}error_types_map", ms.mqtt_settings.everest_prefix), error_types_map, @@ -352,12 +338,16 @@ static std::map start_modules(ManagerConfig& config, MQTTAbs mqtt_abstraction.publish(fmt::format("{}module_config_cache", ms.mqtt_settings.everest_prefix), module_config_cache, QOS::QOS2, true); - for (const auto& module : serialized_config.at("module_names").items()) { - const std::string module_name = module.key(); - json serialized_mod_config = serialized_config; + mqtt_abstraction.publish(fmt::format("{}module_names", ms.mqtt_settings.everest_prefix), module_names, QOS::QOS2, + true); + + for (const auto& module_name_entry : module_names) { + const auto& module_name = module_name_entry.first; + const auto& module_type = module_name_entry.second; + json serialized_mod_config = json::object(); serialized_mod_config["module_config"] = json::object(); + serialized_mod_config["module_config"][module_name] = main_config.at(module_name); // add mappings of fulfillments - serialized_mod_config["module_config"][module_name] = serialized_config.at("main").at(module_name); const auto fulfillments = config.get_fulfillments(module_name); serialized_mod_config["mappings"] = json::object(); for (const auto& fulfillment_list : fulfillments) { @@ -373,7 +363,6 @@ static std::map start_modules(ManagerConfig& config, MQTTAbs if (mappings.has_value()) { serialized_mod_config["mappings"][module_name] = mappings.value(); } - serialized_mod_config.erase("main"); // FIXME: do not put this "main" config in there in the first place const auto telemetry_config = config.get_telemetry_config(module_name); if (telemetry_config.has_value()) { serialized_mod_config["telemetry_config"] = telemetry_config.value(); @@ -384,11 +373,20 @@ static std::map start_modules(ManagerConfig& config, MQTTAbs continue; } - // FIXME (aw): shall create a ref to main_confit.at(module_name)! - const std::string module_type = main_config.at(module_name).at("module"); // FIXME (aw): implicitely adding ModuleReadyInfo and setting its ready member auto module_it = modules_ready.emplace(module_name, ModuleReadyInfo{false, nullptr, nullptr}).first; + const std::string config_topic = fmt::format("{}/config", config.mqtt_module_prefix(module_name)); + const Handler module_get_config_handler = [module_name, config_topic, serialized_mod_config, + &mqtt_abstraction](const std::string&, const nlohmann::json& json) { + mqtt_abstraction.publish(config_topic, serialized_mod_config.dump(), QOS::QOS2); + }; + + const std::string get_config_topic = fmt::format("{}/get_config", config.mqtt_module_prefix(module_name)); + module_it->second.get_config_token = std::make_shared( + HandlerType::ExternalMQTT, std::make_shared(module_get_config_handler)); + mqtt_abstraction.register_handler(get_config_topic, module_it->second.get_config_token, QOS::QOS2); + const auto capabilities = [&module_config = main_config.at(module_name)]() { const auto cap_it = module_config.find("capabilities"); if (cap_it == module_config.end()) { @@ -404,8 +402,8 @@ static std::map start_modules(ManagerConfig& config, MQTTAbs } const Handler module_ready_handler = [module_name, &mqtt_abstraction, &config, standalone_modules, - mqtt_everest_prefix = ms.mqtt_settings.everest_prefix, - &status_fifo](const std::string&, nlohmann::json json) { + mqtt_everest_prefix = ms.mqtt_settings.everest_prefix, &status_fifo, + retain_topics](const std::string&, const nlohmann::json& json) { EVLOG_debug << fmt::format("received module ready signal for module: {}({})", module_name, json.dump()); const std::unique_lock lock(modules_ready_mutex); // FIXME (aw): here are race conditions, if the ready handler gets called while modules are shut down! @@ -427,11 +425,16 @@ static std::map start_modules(ManagerConfig& config, MQTTAbs [](const auto& element) { return element.second.ready; })) { const auto complete_end_time = std::chrono::system_clock::now(); status_fifo.update(StatusFifo::ALL_MODULES_STARTED); + if (not retain_topics) { + EVLOG_info << "Clearing retained topics published by manager during startup"; + mqtt_abstraction.clear_retained_topics(); + } else { + EVLOG_info << "Keeping retained topics published by manager during startup for inspection"; + } EVLOG_info << fmt::format( TERMINAL_STYLE_OK, "🚙🚙🚙 All modules are initialized. EVerest up and running [{}ms] 🚙🚙🚙", std::chrono::duration_cast(complete_end_time - complete_start_time) .count()); - // cleanup_retained_topics(config, mqtt_abstraction, mqtt_everest_prefix); mqtt_abstraction.publish(fmt::format("{}ready", mqtt_everest_prefix), nlohmann::json(true)); } else if (!standalone_modules.empty()) { if (modules_spawned == modules_ready.size() - standalone_modules.size()) { @@ -447,17 +450,6 @@ static std::map start_modules(ManagerConfig& config, MQTTAbs std::make_shared(HandlerType::ExternalMQTT, std::make_shared(module_ready_handler)); mqtt_abstraction.register_handler(ready_topic, module_it->second.ready_token, QOS::QOS2); - const std::string config_topic = fmt::format("{}/config", config.mqtt_module_prefix(module_name)); - const Handler module_get_config_handler = [module_name, config_topic, serialized_mod_config, - &mqtt_abstraction](const std::string&, nlohmann::json json) { - mqtt_abstraction.publish(config_topic, serialized_mod_config.dump()); - }; - - const std::string get_config_topic = fmt::format("{}/get_config", config.mqtt_module_prefix(module_name)); - module_it->second.get_config_token = std::make_shared( - HandlerType::ExternalMQTT, std::make_shared(module_get_config_handler)); - mqtt_abstraction.register_handler(get_config_topic, module_it->second.get_config_token, QOS::QOS2); - if (std::any_of(standalone_modules.begin(), standalone_modules.end(), [module_name](const auto& element) { return element == module_name; })) { EVLOG_info << "Not starting standalone module: " << fmt::format(TERMINAL_STYLE_BLUE, "{}", module_name); @@ -665,6 +657,8 @@ int boot(const po::variables_map& vm) { return EXIT_SUCCESS; } + const bool retain_topics = (vm.count("retain-topics") != 0); + const auto start_time = std::chrono::system_clock::now(); std::unique_ptr config; try { @@ -720,7 +714,7 @@ int boot(const po::variables_map& vm) { const auto& main_config = config->get_main_config(); for (const auto& module : main_config.items()) { - const std::string module_id = module.key(); + const std::string& module_id = module.key(); // check if standalone parameter is set const auto& module_config = main_config.at(module_id); if (module_config.value("standalone", false)) { @@ -758,7 +752,10 @@ int boot(const po::variables_map& vm) { mqtt_abstraction.spawn_main_loop_thread(); auto module_handles = - start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, status_fifo); + start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, status_fifo, retain_topics); + if (module_handles.empty()) { + return EXIT_FAILURE; + } bool modules_started = true; bool restart_modules = false; @@ -823,8 +820,8 @@ int boot(const po::variables_map& vm) { #ifdef ENABLE_ADMIN_PANEL if (module_handles.size() == 0 && restart_modules) { - module_handles = - start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, status_fifo); + module_handles = start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, + status_fifo, retain_topics); restart_modules = false; modules_started = true; } @@ -888,6 +885,8 @@ int main(int argc, char* argv[]) { "looked up in the default config directory"); desc.add_options()("status-fifo", po::value()->default_value(""), "Path to a named pipe, that shall be used for status updates from the manager"); + desc.add_options()("retain-topics", "Retain configuration MQTT topics setup by manager for inspection, by default " + "these will be cleared after startup"); po::variables_map vm; diff --git a/tests/test_config.cpp b/tests/test_config.cpp index dddd1e79..516ce9c3 100644 --- a/tests/test_config.cpp +++ b/tests/test_config.cpp @@ -171,18 +171,6 @@ SCENARIO("Check Config Constructor", "[!throws]") { }()); } } - GIVEN("A valid config with a valid module serialized") { - auto ms = - Everest::ManagerSettings(bin_dir + "valid_module_config/", bin_dir + "valid_module_config/config.yaml"); - THEN("It should not throw at all") { - CHECK_NOTHROW([&]() { - auto mc = Everest::ManagerConfig(ms); - auto serialized = mc.serialize(); - CHECK(serialized.at("module_names").size() == 1); - CHECK(serialized.at("module_names").at("valid_module") == "TESTValidManifest"); - }()); - } - } GIVEN("A valid config in legacy json format with a valid module") { auto ms = Everest::ManagerSettings(bin_dir + "valid_module_config_json/", bin_dir + "valid_module_config_json/config.json");