diff --git a/src/InputStream.h b/src/InputStream.h index bf6cdf50b..2aceb6374 100644 --- a/src/InputStream.h +++ b/src/InputStream.h @@ -6,6 +6,7 @@ #include "AbstractModule.h" #include "StreamHandler.h" +#include namespace visor { @@ -15,6 +16,11 @@ class InputStream : public AbstractRunnableModule std::vector _policies; public: + enum class Action { + AddPolicy, + RemovePolicy + }; + InputStream(const std::string &name) : AbstractRunnableModule(name) { @@ -26,12 +32,14 @@ class InputStream : public AbstractRunnableModule { std::unique_lock lock(_input_mutex); _policies.push_back(policy); + policy_signal(policy, Action::AddPolicy); } void remove_policy(const Policy *policy) { std::unique_lock lock(_input_mutex); _policies.erase(std::remove(_policies.begin(), _policies.end(), policy), _policies.end()); + policy_signal(policy, Action::RemovePolicy); } size_t policies_count() const @@ -40,7 +48,10 @@ class InputStream : public AbstractRunnableModule return _policies.size(); } - virtual size_t consumer_count() const = 0; + virtual size_t consumer_count() const + { + return policy_signal.slot_count(); + } void common_info_json(json &j) const { @@ -48,6 +59,8 @@ class InputStream : public AbstractRunnableModule j["input"]["running"] = running(); j["input"]["consumers"] = consumer_count(); } + + mutable sigslot::signal policy_signal; }; } diff --git a/src/Policies.cpp b/src/Policies.cpp index 44a652743..8ec0dc938 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -146,12 +146,7 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) throw PolicyException(fmt::format("unable to instantiate tap '{}': {}", tap_name, e.what())); } } - // Create Policy - auto policy = std::make_unique(policy_name, tap); - // if and only if policy succeeds, we will return this in result set - Policy *policy_ptr = policy.get(); - policy->set_input_stream(input_ptr); - // Handler Section + // Handler type if (!it->second["handlers"] || !it->second["handlers"].IsMap()) { throw PolicyException("missing or invalid handler configuration at key 'handlers'"); } @@ -162,6 +157,14 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) } else if (handler_node["modules"].IsSequence()) { handler_sequence = true; } + + // Create Policy + auto policy = std::make_unique(policy_name, tap, handler_sequence); + // if and only if policy succeeds, we will return this in result set + Policy *policy_ptr = policy.get(); + policy->set_input_stream(input_ptr); + + // Handler Section Config window_config; if (handler_node["window_config"] && handler_node["window_config"].IsMap()) { try { @@ -174,6 +177,21 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) window_config.config_set("deep_sample_rate", _default_deep_sample_rate); } + std::unique_ptr input_resources_policy; + Policy *input_res_policy_ptr; + std::unique_ptr resources_module; + if (input_stream) { + // create new policy with resources handler for input stream + input_resources_policy = std::make_unique(input_stream_module_name + "-resources", tap, false); + input_resources_policy->set_input_stream(input_ptr); + auto resources_handler_plugin = _registry->handler_plugins().find("input_resources"); + if (resources_handler_plugin != _registry->handler_plugins().end()) { + resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config); + input_resources_policy->add_module(resources_module.get()); + input_res_policy_ptr = input_resources_policy.get(); + } + } + std::vector> handler_modules; for (YAML::const_iterator h_it = handler_node["modules"].begin(); h_it != handler_node["modules"].end(); ++h_it) { @@ -269,6 +287,9 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // make sure policy starts before committing try { policy->start(); + if (input_resources_policy) { + input_resources_policy->start(); + } } catch (std::runtime_error &e) { throw PolicyException(fmt::format("policy [{}] failed to start: {}", policy_name, e.what())); } @@ -278,6 +299,9 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // roll back during exception ensures no modules have been added to any of the managers try { module_add(std::move(policy)); + if (input_resources_policy) { + module_add(std::move(input_resources_policy)); + } } catch (ModuleException &e) { throw PolicyException(fmt::format("policy [{}] creation failed (policy): {}", policy_name, e.what())); } @@ -288,6 +312,9 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) } catch (ModuleException &e) { // note that if this call excepts, we are in an unknown state and the exception will propagate module_remove(policy_name); + if (input_res_policy_ptr) { + module_remove(input_res_policy_ptr->name()); + } throw PolicyException(fmt::format("policy [{}] creation failed (input): {}", policy_name, e.what())); } std::vector added_handlers; @@ -298,10 +325,18 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // if it did not except, add it to the list for rollback upon exception added_handlers.push_back(hname); } + if (resources_module) { + auto hname = resources_module->name(); + _registry->handler_manager()->module_add(std::move(resources_module)); + added_handlers.push_back(hname); + } } catch (ModuleException &e) { // note that if any of these calls except, we are in an unknown state and the exception will propagate // nothing needs to be stopped because it was not started module_remove(policy_name); + if (input_res_policy_ptr) { + module_remove(input_res_policy_ptr->name()); + } _registry->input_manager()->module_remove(input_stream_module_name); for (auto &m : added_handlers) { _registry->handler_manager()->module_remove(m); @@ -311,6 +346,9 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) } // success + if (input_res_policy_ptr) { + input_ptr->add_policy(input_res_policy_ptr); + } input_ptr->add_policy(policy_ptr); result.push_back(policy_ptr); } @@ -325,6 +363,7 @@ void PolicyManager::remove_policy(const std::string &name) } auto policy = _map[name].get(); + auto input_stream = policy->input_stream(); auto input_name = policy->input_stream()->name(); std::vector module_names; for (const auto &mod : policy->modules()) { @@ -336,7 +375,18 @@ void PolicyManager::remove_policy(const std::string &name) _registry->handler_manager()->module_remove(name); } - if (!policy->input_stream()->policies_count()) { + if (input_stream->policies_count() == 1) { + // if there is only one policy left on the input stream, and that policy is the input resources policy, then remove it + auto input_resources_name = input_name + "-resources"; + if (_map.count(input_resources_name) != 0) { + auto resources_policy = _map[input_resources_name].get(); + resources_policy->stop(); + _registry->handler_manager()->module_remove(input_resources_name); + _map.erase(input_resources_name); + } + } + + if (!input_stream->policies_count()) { _registry->input_manager()->module_remove(input_name); } diff --git a/src/Policies.h b/src/Policies.h index 0aa65c241..1b459b383 100644 --- a/src/Policies.h +++ b/src/Policies.h @@ -28,15 +28,18 @@ class PolicyException : public std::runtime_error class Policy : public AbstractRunnableModule { + static constexpr size_t HANDLERS_SEQUENCE_SIZE = 1; Tap *_tap; + bool _modules_sequence; InputStream *_input_stream; std::vector _modules; public: - Policy(const std::string &name, Tap *tap) + Policy(const std::string &name, Tap *tap, bool modules_sequence) : AbstractRunnableModule(name) , _tap(tap) + , _modules_sequence(modules_sequence) , _input_stream(nullptr) { } @@ -66,6 +69,15 @@ class Policy : public AbstractRunnableModule return _modules; } + size_t get_handlers_list_size() const + { + if (_modules_sequence) { + return HANDLERS_SEQUENCE_SIZE; + } else { + return _modules.size(); + } + } + // life cycle void start() override; void stop() override; diff --git a/src/handlers/CMakeLists.txt b/src/handlers/CMakeLists.txt index c5e69c4ba..2ccd20eac 100644 --- a/src/handlers/CMakeLists.txt +++ b/src/handlers/CMakeLists.txt @@ -4,5 +4,6 @@ add_subdirectory(dns) add_subdirectory(dhcp) add_subdirectory(pcap) add_subdirectory(mock) +add_subdirectory(input_resources) set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} PARENT_SCOPE) diff --git a/src/handlers/README.md b/src/handlers/README.md index 7052384e9..07abe59ad 100644 --- a/src/handlers/README.md +++ b/src/handlers/README.md @@ -10,3 +10,4 @@ See the individual READMEs for more information: * [Mock](mock/) * [Network](net/) * [PCAP](pcap/) +* [Resources](resources/) diff --git a/src/handlers/input_resources/CMakeLists.txt b/src/handlers/input_resources/CMakeLists.txt new file mode 100644 index 000000000..e9cb5f915 --- /dev/null +++ b/src/handlers/input_resources/CMakeLists.txt @@ -0,0 +1,28 @@ +message(STATUS "Handler Module: Input Resources") + +set_directory_properties(PROPERTIES CORRADE_USE_PEDANTIC_FLAGS ON) + +corrade_add_static_plugin(VisorHandlerInputResources + ${CMAKE_CURRENT_BINARY_DIR} + InputResourcesHandler.conf + InputResourcesHandlerModulePlugin.cpp + InputResourcesStreamHandler.cpp + ) +add_library(Visor::Handler::InputResources ALIAS VisorHandlerInputResources) + +target_include_directories(VisorHandlerInputResources + INTERFACE + $ + ) + +target_link_libraries(VisorHandlerInputResources + PUBLIC + Visor::Input::Pcap + Visor::Input::Dnstap + Visor::Input::Mock + Visor::Input::Sflow + ) + +set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} Visor::Handler::InputResources PARENT_SCOPE) + +add_subdirectory(tests) \ No newline at end of file diff --git a/src/handlers/input_resources/InputResourcesHandler.conf b/src/handlers/input_resources/InputResourcesHandler.conf new file mode 100644 index 000000000..947adaa02 --- /dev/null +++ b/src/handlers/input_resources/InputResourcesHandler.conf @@ -0,0 +1,5 @@ +# Aliases +provides=input_resources +[data] +desc=Input Resources analyzer +type=handler diff --git a/src/handlers/input_resources/InputResourcesHandlerModulePlugin.cpp b/src/handlers/input_resources/InputResourcesHandlerModulePlugin.cpp new file mode 100644 index 000000000..565dc90c6 --- /dev/null +++ b/src/handlers/input_resources/InputResourcesHandlerModulePlugin.cpp @@ -0,0 +1,31 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#include "InputResourcesHandlerModulePlugin.h" +#include "CoreRegistry.h" +#include "InputResourcesStreamHandler.h" +#include "HandlerManager.h" +#include "InputStreamManager.h" +#include +#include + +CORRADE_PLUGIN_REGISTER(VisorHandlerInputResources, visor::handler::resources::InputResourcesHandlerModulePlugin, + "visor.module.handler/1.0") + +namespace visor::handler::resources { + +using json = nlohmann::json; + +void InputResourcesHandlerModulePlugin::setup_routes(HttpServer *svr) +{ +} +std::unique_ptr InputResourcesHandlerModulePlugin::instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler) +{ + // TODO using config as both window config and module config + auto handler_module = std::make_unique(name, input_stream, config, stream_handler); + handler_module->config_merge(*config); + return handler_module; +} + +} \ No newline at end of file diff --git a/src/handlers/input_resources/InputResourcesHandlerModulePlugin.h b/src/handlers/input_resources/InputResourcesHandlerModulePlugin.h new file mode 100644 index 000000000..06e0aef15 --- /dev/null +++ b/src/handlers/input_resources/InputResourcesHandlerModulePlugin.h @@ -0,0 +1,24 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#pragma once + +#include "HandlerModulePlugin.h" + +namespace visor::handler::resources { + +class InputResourcesHandlerModulePlugin : public HandlerModulePlugin +{ + +protected: + void setup_routes(HttpServer *svr) override; + +public: + explicit InputResourcesHandlerModulePlugin(Corrade::PluginManager::AbstractManager &manager, const std::string &plugin) + : visor::HandlerModulePlugin{manager, plugin} + { + } + std::unique_ptr instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler = nullptr) override; +}; +} diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.cpp b/src/handlers/input_resources/InputResourcesStreamHandler.cpp new file mode 100644 index 000000000..7182a4a69 --- /dev/null +++ b/src/handlers/input_resources/InputResourcesStreamHandler.cpp @@ -0,0 +1,217 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#include "InputResourcesStreamHandler.h" +#include "Policies.h" + +namespace visor::handler::resources { + +InputResourcesStreamHandler::InputResourcesStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler) + : visor::StreamMetricsHandler(name, window_config) + , _timer(0) + , _timestamp(timespec()) +{ + if (handler) { + throw StreamHandlerException(fmt::format("ResourcesStreamHandler: unsupported upstream chained stream handler {}", handler->name())); + } + + assert(stream); + // figure out which input stream we have + if (stream) { + _pcap_stream = dynamic_cast(stream); + _dnstap_stream = dynamic_cast(stream); + _mock_stream = dynamic_cast(stream); + _sflow_stream = dynamic_cast(stream); + if (!_pcap_stream && !_mock_stream && !_dnstap_stream && !_sflow_stream) { + throw StreamHandlerException(fmt::format("NetStreamHandler: unsupported input stream {}", stream->name())); + } + } +} + +void InputResourcesStreamHandler::start() +{ + if (_running) { + return; + } + + if (config_exists("recorded_stream")) { + _metrics->set_recorded_stream(); + } + + if (_pcap_stream) { + _pkt_connection = _pcap_stream->packet_signal.connect(&InputResourcesStreamHandler::process_packet_cb, this); + _policies_connection = _pcap_stream->policy_signal.connect(&InputResourcesStreamHandler::process_policies_cb, this); + } else if (_dnstap_stream) { + _dnstap_connection = _dnstap_stream->dnstap_signal.connect(&InputResourcesStreamHandler::process_dnstap_cb, this); + _policies_connection = _dnstap_stream->policy_signal.connect(&InputResourcesStreamHandler::process_policies_cb, this); + } else if (_sflow_stream) { + _sflow_connection = _sflow_stream->sflow_signal.connect(&InputResourcesStreamHandler::process_sflow_cb, this); + _policies_connection = _sflow_stream->policy_signal.connect(&InputResourcesStreamHandler::process_policies_cb, this); + } + + _running = true; +} + +void InputResourcesStreamHandler::stop() +{ + if (!_running) { + return; + } + + if (_pcap_stream) { + _pkt_connection.disconnect(); + } else if (_dnstap_stream) { + _dnstap_connection.disconnect(); + } else if (_sflow_stream) { + _sflow_connection.disconnect(); + } + _policies_connection.disconnect(); + + _running = false; +} + +void InputResourcesStreamHandler::process_policies_cb(const Policy *policy, InputStream::Action action) +{ + int16_t policies_number = 0; + int16_t handlers_count = 0; + + switch (action) { + case InputStream::Action::AddPolicy: + policies_number = 1; + handlers_count = policy->get_handlers_list_size(); + break; + case InputStream::Action::RemovePolicy: + policies_number = -1; + handlers_count = -policy->get_handlers_list_size(); + break; + } + + _metrics->process_policies(policies_number, handlers_count); +} + +void InputResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSample &) +{ + if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { + _timer = time(NULL); + _metrics->process_resources(_monitor.cpu_percentage(), _monitor.memory_usage()); + } +} + +void InputResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnstap::Dnstap &) +{ + if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { + _timer = time(NULL); + _metrics->process_resources(_monitor.cpu_percentage(), _monitor.memory_usage()); + } +} + +void InputResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packet &payload, [[maybe_unused]] PacketDirection dir, [[maybe_unused]] pcpp::ProtocolType l3, [[maybe_unused]] pcpp::ProtocolType l4, [[maybe_unused]] timespec stamp) +{ + if (stamp.tv_sec >= _timestamp.tv_sec + MEASURE_INTERVAL) { + _timestamp = stamp; + _metrics->process_resources(_monitor.cpu_percentage(), _monitor.memory_usage()); + } +} + +void InputResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket &o) +{ + // static because caller guarantees only our own bucket type + const auto &other = static_cast(o); + + std::shared_lock r_lock(other._mutex); + std::unique_lock w_lock(_mutex); + + _cpu_usage.merge(other._cpu_usage); + _memory_bytes.merge(other._memory_bytes); + + // Merge only the first bucket which is the more recent + if (!_merged) { + _policy_count += other._policy_count; + _handler_count += other._handler_count; + _merged = true; + } +} + +void InputResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap add_labels) const +{ + { + auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe + + event_rate->to_prometheus(out, add_labels); + num_events->to_prometheus(out, add_labels); + num_samples->to_prometheus(out, add_labels); + } + + std::shared_lock r_lock(_mutex); + + _cpu_usage.to_prometheus(out, add_labels); + _memory_bytes.to_prometheus(out, add_labels); + _policy_count.to_prometheus(out, add_labels); + _handler_count.to_prometheus(out, add_labels); +} + +void InputResourcesMetricsBucket::to_json(json &j) const +{ + bool live_rates = !read_only() && !recorded_stream(); + + { + auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe + + event_rate->to_json(j, live_rates); + num_events->to_json(j); + num_samples->to_json(j); + } + + std::shared_lock r_lock(_mutex); + + _cpu_usage.to_json(j); + _memory_bytes.to_json(j); + _policy_count.to_json(j); + _handler_count.to_json(j); +} + +void InputResourcesMetricsBucket::process_resources(double cpu_usage, uint64_t memory_usage) +{ + std::unique_lock lock(_mutex); + + _cpu_usage.update(cpu_usage); + _memory_bytes.update(memory_usage); +} + +void InputResourcesMetricsBucket::process_policies(int16_t policy_count, int16_t handler_count) +{ + std::unique_lock lock(_mutex); + + _policy_count += policy_count; + _handler_count += handler_count; +} + +void InputResourcesMetricsManager::process_resources(double cpu_usage, uint64_t memory_usage, timespec stamp) +{ + if (stamp.tv_sec == 0) { + // use now() + std::timespec_get(&stamp, TIME_UTC); + } + // base event + new_event(stamp); + // process in the "live" bucket. this will parse the resources if we are deep sampling + live_bucket()->process_resources(cpu_usage, memory_usage); +} + +void InputResourcesMetricsManager::process_policies(int16_t policy_count, int16_t handler_count, bool self) +{ + if (!self) { + policy_total += policy_count; + handler_total += handler_count; + } + + timespec stamp; + // use now() + std::timespec_get(&stamp, TIME_UTC); + // base event + new_event(stamp); + // process in the "live" bucket. this will parse the resources if we are deep sampling + live_bucket()->process_policies(policy_count, handler_count); +} +} \ No newline at end of file diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.h b/src/handlers/input_resources/InputResourcesStreamHandler.h new file mode 100644 index 000000000..e3f4f18bc --- /dev/null +++ b/src/handlers/input_resources/InputResourcesStreamHandler.h @@ -0,0 +1,121 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#pragma once + +#include "AbstractMetricsManager.h" +#include "DnstapInputStream.h" +#include "MockInputStream.h" +#include "PcapInputStream.h" +#include "SflowInputStream.h" +#include "StreamHandler.h" +#include "ThreadMonitor.h" +#include +#include +#include + +namespace visor::handler::resources { + +using namespace visor::input::pcap; +using namespace visor::input::dnstap; +using namespace visor::input::mock; +using namespace visor::input::sflow; + +constexpr double MEASURE_INTERVAL = 5; // in seconds + +class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket +{ + +protected: + mutable std::shared_mutex _mutex; + + Quantile _cpu_usage; + Quantile _memory_bytes; + Counter _policy_count; + Counter _handler_count; + bool _merged; + +public: + InputResourcesMetricsBucket() + : _cpu_usage("resources", {"cpu_usage"}, "Quantiles of 5s averages of percent cpu usage by the input stream") + , _memory_bytes("resources", {"memory_bytes"}, "Quantiles of 5s averages of memory usage (in bytes) by the input stream") + , _policy_count("resources", {"policy_count"}, "Total number of policies attached to the input stream") + , _handler_count("resources", {"handler_count"}, "Total number of handlers attached to the input stream") + , _merged(false) + { + } + + // visor::AbstractMetricsBucket + + void specialized_merge(const AbstractMetricsBucket &other) override; + void to_json(json &j) const override; + void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override; + + void process_resources(double cpu_usage, uint64_t memory_usage); + void process_policies(int16_t policy_count, int16_t handler_count); +}; + +class InputResourcesMetricsManager final : public visor::AbstractMetricsManager +{ + uint16_t policy_total; + uint16_t handler_total; + +public: + InputResourcesMetricsManager(const Configurable *window_config) + : visor::AbstractMetricsManager(window_config) + , policy_total(0) + , handler_total(0) + { + } + + void on_period_shift([[maybe_unused]] timespec stamp, [[maybe_unused]] const InputResourcesMetricsBucket *maybe_expiring_bucket) override + { + process_policies(policy_total, handler_total, true); + } + + void process_resources(double cpu_usage, uint64_t memory_usage, timespec stamp = timespec()); + void process_policies(int16_t policy_count, int16_t handler_count, bool self = false); +}; + +class InputResourcesStreamHandler final : public visor::StreamMetricsHandler +{ + ThreadMonitor _monitor; + time_t _timer; + timespec _timestamp; + + PcapInputStream *_pcap_stream{nullptr}; + DnstapInputStream *_dnstap_stream{nullptr}; + MockInputStream *_mock_stream{nullptr}; + SflowInputStream *_sflow_stream{nullptr}; + + sigslot::connection _dnstap_connection; + sigslot::connection _sflow_connection; + sigslot::connection _pkt_connection; + sigslot::connection _policies_connection; + + void process_sflow_cb(const SFSample &); + void process_dnstap_cb(const dnstap::Dnstap &); + void process_policies_cb(const Policy *policy, InputStream::Action action); + void process_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp); + +public: + InputResourcesStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler = nullptr); + ~InputResourcesStreamHandler() = default; + + // visor::AbstractModule + std::string schema_key() const override + { + return "input_resources"; + } + + size_t consumer_count() const override + { + return 0; + } + + void start() override; + void stop() override; +}; + +} diff --git a/src/handlers/input_resources/README.md b/src/handlers/input_resources/README.md new file mode 100644 index 000000000..30263bef5 --- /dev/null +++ b/src/handlers/input_resources/README.md @@ -0,0 +1,7 @@ +# Application Input Resources Metrics Stream Handler + +This directory contains the Input Resources stream handler + +It can attach to input streams and provide system thread information + +[InputResourcesStreamHandler.h](InputResourcesStreamHandler.h) contains the list of metrics. diff --git a/src/handlers/input_resources/ThreadMonitor.h b/src/handlers/input_resources/ThreadMonitor.h new file mode 100644 index 000000000..2a489103a --- /dev/null +++ b/src/handlers/input_resources/ThreadMonitor.h @@ -0,0 +1,95 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#pragma once + +#include +#include + +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) +#elif __APPLE__ +#elif __linux__ +#include +#endif + +namespace visor { + +class ThreadMonitor +{ +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) +#elif __APPLE__ +#elif __linux__ + uint64_t _last_system_time = 0; + uint64_t _last_thread_time = 0; +#endif +public: + ThreadMonitor() = default; + ~ThreadMonitor() = default; + + inline double cpu_percentage() + { +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) + return 0; +#elif __APPLE__ + return 0; +#elif __linux__ + uint64_t stat; + + std::ifstream system_stat("/proc/stat"); + std::string line; + std::getline(system_stat, line); + std::stringstream cpu_times(line.erase(0, 5)); + uint64_t system_total_time = 0; + while (cpu_times >> stat) { + system_total_time += stat; + } + system_total_time = system_total_time / sysconf(_SC_NPROCESSORS_ONLN); + + std::vector stats; + std::ifstream thread_stat("/proc/thread-self/stat"); + thread_stat.ignore(' '); + while (thread_stat >> stat) { + stats.push_back(stat); + } + uint64_t thread_total_time = (stats[8] + stats[9]); + + uint64_t current_thread_time = thread_total_time - _last_thread_time; + _last_thread_time = thread_total_time; + double current_period_time = system_total_time - _last_system_time; + _last_system_time = system_total_time; + + double cpu_usage = (current_thread_time / current_period_time) * 100.0; + if (cpu_usage < 0.0) { + cpu_usage = 0.0; + } + return cpu_usage; +#endif + } + + inline uint64_t memory_usage() + { +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) + return 0; +#elif __APPLE__ + return 0; +#elif __linux__ + uint64_t memory; + std::string token; + std::ifstream file("/proc/thread-self/status"); + while (file >> token) { + if (token == "VmRSS:") { + if (file >> memory) { + return memory * 1024; + } else { + return 0; + } + } + // Ignore the rest of the line + file.ignore(std::numeric_limits::max(), '\n'); + } + return 0; // Nothing found +#endif + } +}; +} diff --git a/src/handlers/input_resources/tests/CMakeLists.txt b/src/handlers/input_resources/tests/CMakeLists.txt new file mode 100644 index 000000000..6e3075cb2 --- /dev/null +++ b/src/handlers/input_resources/tests/CMakeLists.txt @@ -0,0 +1,16 @@ + +## TEST SUITE +add_executable(unit-tests-handler-input-resources + main.cpp + test_resources_layer.cpp + ) + +target_link_libraries(unit-tests-handler-input-resources + PRIVATE + ${CONAN_LIBS_JSON-SCHEMA-VALIDATOR} + Visor::Handler::InputResources) + +add_test(NAME unit-tests-handler-input-resources + WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/src + COMMAND unit-tests-handler-input-resources + ) diff --git a/src/handlers/input_resources/tests/main.cpp b/src/handlers/input_resources/tests/main.cpp new file mode 100644 index 000000000..3b6e7e5b3 --- /dev/null +++ b/src/handlers/input_resources/tests/main.cpp @@ -0,0 +1,24 @@ +#define CATCH_CONFIG_RUNNER +#include +#include +#include +#include + +int main(int argc, char *argv[]) +{ + Catch::Session session; + + auto logger = spdlog::get("visor"); + if (!logger) { + spdlog::stderr_color_mt("visor"); + } + + int result = session.applyCommandLine(argc, argv); + if (result != 0) { + return result; + } + + result = session.run(); + + return (result == 0 ? EXIT_SUCCESS : EXIT_FAILURE); +} diff --git a/src/handlers/input_resources/tests/test_resources_layer.cpp b/src/handlers/input_resources/tests/test_resources_layer.cpp new file mode 100644 index 000000000..0965c8daf --- /dev/null +++ b/src/handlers/input_resources/tests/test_resources_layer.cpp @@ -0,0 +1,106 @@ +#include + +#include "DnstapInputStream.h" +#include "InputResourcesStreamHandler.h" +#include "PcapInputStream.h" +#include "SflowInputStream.h" +#include "Policies.h" + +using namespace visor::handler::resources; + +TEST_CASE("Check resources for pcap input", "[pcap][resources]") +{ + PcapInputStream stream{"pcap-test"}; + stream.config_set("pcap_file", "tests/fixtures/dns_ipv4_udp.pcap"); + stream.config_set("bpf", std::string()); + + visor::Config c; + c.config_set("num_periods", 1); + InputResourcesStreamHandler resources_handler{"resource-test", &stream, &c}; + + resources_handler.start(); + stream.start(); + //add and remove policy + auto policy = std::make_unique("policy-test", nullptr, false); + stream.add_policy(policy.get()); + stream.remove_policy(policy.get()); + resources_handler.stop(); + stream.stop(); + + auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); + + CHECK(resources_handler.metrics()->current_periods() == 1); + CHECK(event_data.num_events->value() >= 1); + + nlohmann::json j; + resources_handler.metrics()->bucket(0)->to_json(j); + + CHECK(j["cpu_usage"]["p50"] != nullptr); + CHECK(j["memory_bytes"]["p50"] != nullptr); + CHECK(j["policy_count"] == 0); + CHECK(j["handler_count"] == 0); + + std::stringstream output; + std::string line; + resources_handler.metrics()->bucket(0)->to_prometheus(output, {{"policy", "default"}}); + std::getline(output, line); + CHECK(line == "# HELP base_total Total number of events"); + std::getline(output, line); + CHECK(line == "# TYPE base_total gauge"); +} + +TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") +{ + DnstapInputStream stream{"dnstap-test"}; + stream.config_set("dnstap_file", "inputs/dnstap/tests/fixtures/fixture.dnstap"); + stream.config_set("only_hosts", {"192.168.0.0/24", "2001:db8::/48"}); + visor::Config c; + c.config_set("num_periods", 1); + InputResourcesStreamHandler resources_handler{"resource-test", &stream, &c}; + + resources_handler.start(); + stream.start(); + stream.stop(); + resources_handler.stop(); + + auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); + + CHECK(resources_handler.metrics()->current_periods() == 1); + CHECK(event_data.num_events->value() == 1); + + nlohmann::json j; + resources_handler.metrics()->bucket(0)->to_json(j); + + CHECK(j["cpu_usage"]["p50"] != nullptr); + CHECK(j["memory_bytes"]["p50"] != nullptr); + CHECK(j["policy_count"] == 0); + CHECK(j["handler_count"] == 0); +} + +TEST_CASE("Check resources for sflow input", "[sflow][resources]") +{ + SflowInputStream stream{"sflow-test"}; + stream.config_set("pcap_file", "tests/fixtures/ecmp.pcap"); + + visor::Config c; + c.config_set("num_periods", 1); + InputResourcesStreamHandler resources_handler{"resource-test", &stream, &c}; + + resources_handler.start(); + stream.start(); + stream.stop(); + resources_handler.stop(); + + auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); + + CHECK(resources_handler.metrics()->current_periods() == 1); + CHECK(event_data.num_events->value() == 1); + + nlohmann::json j; + resources_handler.metrics()->bucket(0)->to_json(j); + + CHECK(j["cpu_usage"]["p50"] != nullptr); + CHECK(j["memory_bytes"]["p50"] != nullptr); + CHECK(j["policy_count"] == 0); + CHECK(j["handler_count"] == 0); +} \ No newline at end of file diff --git a/src/handlers/static_plugins.h b/src/handlers/static_plugins.h index b193c3503..fa2013519 100644 --- a/src/handlers/static_plugins.h +++ b/src/handlers/static_plugins.h @@ -5,6 +5,7 @@ int import_handler_plugins() CORRADE_PLUGIN_IMPORT(VisorHandlerDns); CORRADE_PLUGIN_IMPORT(VisorHandlerDhcp); CORRADE_PLUGIN_IMPORT(VisorHandlerPcap); + CORRADE_PLUGIN_IMPORT(VisorHandlerInputResources); return 0; } diff --git a/src/inputs/dnstap/DnstapInputStream.h b/src/inputs/dnstap/DnstapInputStream.h index ca1f962aa..b98ea5f94 100644 --- a/src/inputs/dnstap/DnstapInputStream.h +++ b/src/inputs/dnstap/DnstapInputStream.h @@ -8,7 +8,6 @@ #include "InputStream.h" #include "dnstap.pb.h" #include -#include #include #include #include @@ -96,7 +95,7 @@ class DnstapInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return dnstap_signal.slot_count(); + return policy_signal.slot_count() + dnstap_signal.slot_count(); } // handler functionality diff --git a/src/inputs/mock/MockInputStream.h b/src/inputs/mock/MockInputStream.h index bcfaa5b5e..bea58f4eb 100644 --- a/src/inputs/mock/MockInputStream.h +++ b/src/inputs/mock/MockInputStream.h @@ -5,7 +5,6 @@ #pragma once #include "InputStream.h" -#include #include #include @@ -31,7 +30,7 @@ class MockInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return random_int_signal.slot_count(); + return policy_signal.slot_count() + random_int_signal.slot_count(); } // handler functionality diff --git a/src/inputs/pcap/PcapInputStream.h b/src/inputs/pcap/PcapInputStream.h index 943d409df..beea87211 100644 --- a/src/inputs/pcap/PcapInputStream.h +++ b/src/inputs/pcap/PcapInputStream.h @@ -15,7 +15,6 @@ #include "utils.h" #include #include -#include #include #include #ifdef __linux__ @@ -87,7 +86,7 @@ class PcapInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return packet_signal.slot_count() + udp_signal.slot_count() + start_tstamp_signal.slot_count() + tcp_message_ready_signal.slot_count() + tcp_connection_start_signal.slot_count() + tcp_connection_end_signal.slot_count() + tcp_reassembly_error_signal.slot_count() + pcap_stats_signal.slot_count(); + return policy_signal.slot_count() + packet_signal.slot_count() + udp_signal.slot_count() + start_tstamp_signal.slot_count() + tcp_message_ready_signal.slot_count() + tcp_connection_start_signal.slot_count() + tcp_connection_end_signal.slot_count() + tcp_reassembly_error_signal.slot_count() + pcap_stats_signal.slot_count(); } // utilities diff --git a/src/inputs/sflow/SflowInputStream.h b/src/inputs/sflow/SflowInputStream.h index 7636ace68..f0b584eeb 100644 --- a/src/inputs/sflow/SflowInputStream.h +++ b/src/inputs/sflow/SflowInputStream.h @@ -6,7 +6,6 @@ #include "InputStream.h" #include "SflowData.h" -#include #include #include @@ -39,7 +38,7 @@ class SflowInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return sflow_signal.slot_count(); + return policy_signal.slot_count() + sflow_signal.slot_count(); } // handler functionality diff --git a/src/tests/test_policies.cpp b/src/tests/test_policies.cpp index 940fbfbcf..d4ee4b208 100644 --- a/src/tests/test_policies.cpp +++ b/src/tests/test_policies.cpp @@ -657,7 +657,7 @@ TEST_CASE("Policies", "[policies]") CHECK(policy->modules()[1]->running()); CHECK(policy->modules()[2]->running()); policy->stop(); - CHECK(!policy->input_stream()->running()); + CHECK(policy->input_stream()->running()); CHECK(!policy->modules()[0]->running()); CHECK(!policy->modules()[1]->running()); CHECK(!policy->modules()[2]->running());