From 06879c01645feafe57f4a8dd722820ded592bf5c Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Thu, 3 Mar 2022 17:23:05 -0400 Subject: [PATCH 01/22] Implement ThreadMonitor for linux systems --- src/CoreServer.cpp | 5 +++- src/Policies.h | 7 +++++ src/ThreadMonitor.h | 72 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 src/ThreadMonitor.h diff --git a/src/CoreServer.cpp b/src/CoreServer.cpp index 9b35971cc..8b1c0abaa 100644 --- a/src/CoreServer.cpp +++ b/src/CoreServer.cpp @@ -124,6 +124,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _logger->debug("{} bucket window_json elapsed time: {}", hmod->name(), sw); } } + policy->window_json(j["1m"]); res.set_content(j.dump(), "text/json"); } catch (const std::exception &e) { res.status = 500; @@ -143,15 +144,16 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) try { auto [policy, lock] = _registry->policy_manager()->module_get_locked("default"); uint64_t period(std::stol(req.matches[1])); + auto key = fmt::format("{}m", period); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); if (hmod) { spdlog::stopwatch sw; - auto key = fmt::format("{}m", period); hmod->window_json(j[key], period, true); _logger->debug("{} window_json {} elapsed time: {}", hmod->name(), period, sw); } } + policy->window_json(j[key]); res.set_content(j.dump(), "text/json"); } catch (const std::exception &e) { res.status = 500; @@ -323,6 +325,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) } } } + policy->window_json(j[policy->name()]); _logger->debug("{} policy json metrics elapsed time: {}", policy->name(), psw); } res.set_content(j.dump(), "text/json"); diff --git a/src/Policies.h b/src/Policies.h index 0aa65c241..dd96e67fc 100644 --- a/src/Policies.h +++ b/src/Policies.h @@ -10,6 +10,7 @@ #include "HandlerModulePlugin.h" #include "InputModulePlugin.h" #include "Taps.h" +#include "ThreadMonitor.h" #include #include @@ -66,6 +67,12 @@ class Policy : public AbstractRunnableModule return _modules; } + void window_json(json &j) + { + j["resources"]["cpu"] = ThreadMonitor::cpu_percentage(); + j["resources"]["memory"] = ThreadMonitor::memory_usage(); + } + // life cycle void start() override; void stop() override; diff --git a/src/ThreadMonitor.h b/src/ThreadMonitor.h new file mode 100644 index 000000000..f7a2b1f8e --- /dev/null +++ b/src/ThreadMonitor.h @@ -0,0 +1,72 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#pragma once + +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) +#elif __APPLE__ +#elif __linux__ +#include +#endif + +#include +#include + +namespace visor { + +class ThreadMonitor +{ +public: + static inline double cpu_percentage() + { +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) + return 0; +#elif __APPLE__ + return 0; +#elif __linux__ + double up_time; + std::ifstream uptime("/proc/uptime"); + uptime >> up_time; + if (up_time <= 0) { + return 0; + } + uint64_t token; + std::vector stats; + std::ifstream stat("/proc/thread-self/stat"); + stat.ignore(' '); + while (stat >> token) { + stats.push_back(token); + } + double total_time = (stats[8] + stats[9] + stats[10] + stats[11]) / sysconf(_SC_CLK_TCK);; + uint32_t seconds = up_time - (stats[16] / sysconf(_SC_CLK_TCK)); + return 100 * (total_time / seconds); +#endif + } + + static inline uint64_t memory_usage() + { +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) + return 0; +#elif __APPLE__ + return 0; +#elif __linux__ + uint64_t memory; + std::string token; + std::ifstream file("/proc/thread-self/status"); + while (file >> token) { + if (token == "VmRSS:") { + if (file >> memory) { + return memory; + } else { + return 0; + } + } + // Ignore the rest of the line + file.ignore(std::numeric_limits::max(), '\n'); + } + return 0; // Nothing found +#endif + } +}; +} From 03215a908a4b915e72b9e7357f23a509ca8c497e Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Mon, 14 Mar 2022 14:57:08 -0400 Subject: [PATCH 02/22] Resources metrics as Handler --- src/CoreServer.cpp | 3 - src/Policies.h | 7 - src/handlers/CMakeLists.txt | 1 + src/handlers/README.md | 1 + src/handlers/resources/CMakeLists.txt | 25 +++ src/handlers/resources/README.md | 7 + src/handlers/resources/ResourcesHandler.conf | 5 + .../ResourcesHandlerModulePlugin.cpp | 31 ++++ .../resources/ResourcesHandlerModulePlugin.h | 24 +++ .../resources/ResourcesStreamHandler.cpp | 157 ++++++++++++++++++ .../resources/ResourcesStreamHandler.h | 101 +++++++++++ src/{ => handlers/resources}/ThreadMonitor.h | 25 +++ src/handlers/static_plugins.h | 1 + 13 files changed, 378 insertions(+), 10 deletions(-) create mode 100644 src/handlers/resources/CMakeLists.txt create mode 100644 src/handlers/resources/README.md create mode 100644 src/handlers/resources/ResourcesHandler.conf create mode 100644 src/handlers/resources/ResourcesHandlerModulePlugin.cpp create mode 100644 src/handlers/resources/ResourcesHandlerModulePlugin.h create mode 100644 src/handlers/resources/ResourcesStreamHandler.cpp create mode 100644 src/handlers/resources/ResourcesStreamHandler.h rename src/{ => handlers/resources}/ThreadMonitor.h (73%) diff --git a/src/CoreServer.cpp b/src/CoreServer.cpp index 8b1c0abaa..b7989e707 100644 --- a/src/CoreServer.cpp +++ b/src/CoreServer.cpp @@ -124,7 +124,6 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _logger->debug("{} bucket window_json elapsed time: {}", hmod->name(), sw); } } - policy->window_json(j["1m"]); res.set_content(j.dump(), "text/json"); } catch (const std::exception &e) { res.status = 500; @@ -153,7 +152,6 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _logger->debug("{} window_json {} elapsed time: {}", hmod->name(), period, sw); } } - policy->window_json(j[key]); res.set_content(j.dump(), "text/json"); } catch (const std::exception &e) { res.status = 500; @@ -325,7 +323,6 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) } } } - policy->window_json(j[policy->name()]); _logger->debug("{} policy json metrics elapsed time: {}", policy->name(), psw); } res.set_content(j.dump(), "text/json"); diff --git a/src/Policies.h b/src/Policies.h index dd96e67fc..0aa65c241 100644 --- a/src/Policies.h +++ b/src/Policies.h @@ -10,7 +10,6 @@ #include "HandlerModulePlugin.h" #include "InputModulePlugin.h" #include "Taps.h" -#include "ThreadMonitor.h" #include #include @@ -67,12 +66,6 @@ class Policy : public AbstractRunnableModule return _modules; } - void window_json(json &j) - { - j["resources"]["cpu"] = ThreadMonitor::cpu_percentage(); - j["resources"]["memory"] = ThreadMonitor::memory_usage(); - } - // life cycle void start() override; void stop() override; diff --git a/src/handlers/CMakeLists.txt b/src/handlers/CMakeLists.txt index c5e69c4ba..de3a656ba 100644 --- a/src/handlers/CMakeLists.txt +++ b/src/handlers/CMakeLists.txt @@ -4,5 +4,6 @@ add_subdirectory(dns) add_subdirectory(dhcp) add_subdirectory(pcap) add_subdirectory(mock) +add_subdirectory(resources) set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} PARENT_SCOPE) diff --git a/src/handlers/README.md b/src/handlers/README.md index 7052384e9..07abe59ad 100644 --- a/src/handlers/README.md +++ b/src/handlers/README.md @@ -10,3 +10,4 @@ See the individual READMEs for more information: * [Mock](mock/) * [Network](net/) * [PCAP](pcap/) +* [Resources](resources/) diff --git a/src/handlers/resources/CMakeLists.txt b/src/handlers/resources/CMakeLists.txt new file mode 100644 index 000000000..52f404df5 --- /dev/null +++ b/src/handlers/resources/CMakeLists.txt @@ -0,0 +1,25 @@ +message(STATUS "Handler Module: Resources") + +set_directory_properties(PROPERTIES CORRADE_USE_PEDANTIC_FLAGS ON) + +corrade_add_static_plugin(VisorHandlerResources + ${CMAKE_CURRENT_BINARY_DIR} + ResourcesHandler.conf + ResourcesHandlerModulePlugin.cpp + ResourcesStreamHandler.cpp) +add_library(Visor::Handler::Resources ALIAS VisorHandlerResources) + +target_include_directories(VisorHandlerResources + INTERFACE + $ + ) + +target_link_libraries(VisorHandlerResources + PUBLIC + Visor::Input::Pcap + Visor::Input::Dnstap + Visor::Input::Mock + Visor::Input::Sflow + ) + +set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} Visor::Handler::Resources PARENT_SCOPE) \ No newline at end of file diff --git a/src/handlers/resources/README.md b/src/handlers/resources/README.md new file mode 100644 index 000000000..976e7da82 --- /dev/null +++ b/src/handlers/resources/README.md @@ -0,0 +1,7 @@ +# Application Resources Metrics Stream Handler + +This directory contains the Resources stream handler + +It can attach to input streams and provide system thread information + +[ResourcesStreamHandler.h](ResourcesStreamHandler.h) contains the list of metrics. diff --git a/src/handlers/resources/ResourcesHandler.conf b/src/handlers/resources/ResourcesHandler.conf new file mode 100644 index 000000000..432eb1c78 --- /dev/null +++ b/src/handlers/resources/ResourcesHandler.conf @@ -0,0 +1,5 @@ +# Aliases +provides=resources +[data] +desc=Resources analyzer +type=handler diff --git a/src/handlers/resources/ResourcesHandlerModulePlugin.cpp b/src/handlers/resources/ResourcesHandlerModulePlugin.cpp new file mode 100644 index 000000000..2b442b9e6 --- /dev/null +++ b/src/handlers/resources/ResourcesHandlerModulePlugin.cpp @@ -0,0 +1,31 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#include "ResourcesHandlerModulePlugin.h" +#include "CoreRegistry.h" +#include "ResourcesStreamHandler.h" +#include "HandlerManager.h" +#include "InputStreamManager.h" +#include +#include + +CORRADE_PLUGIN_REGISTER(VisorHandlerResources, visor::handler::resources::ResourcesHandlerModulePlugin, + "visor.module.handler/1.0") + +namespace visor::handler::resources { + +using json = nlohmann::json; + +void ResourcesHandlerModulePlugin::setup_routes(HttpServer *svr) +{ +} +std::unique_ptr ResourcesHandlerModulePlugin::instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler) +{ + // TODO using config as both window config and module config + auto handler_module = std::make_unique(name, input_stream, config, stream_handler); + handler_module->config_merge(*config); + return handler_module; +} + +} \ No newline at end of file diff --git a/src/handlers/resources/ResourcesHandlerModulePlugin.h b/src/handlers/resources/ResourcesHandlerModulePlugin.h new file mode 100644 index 000000000..d9abe2a5a --- /dev/null +++ b/src/handlers/resources/ResourcesHandlerModulePlugin.h @@ -0,0 +1,24 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#pragma once + +#include "HandlerModulePlugin.h" + +namespace visor::handler::resources { + +class ResourcesHandlerModulePlugin : public HandlerModulePlugin +{ + +protected: + void setup_routes(HttpServer *svr) override; + +public: + explicit ResourcesHandlerModulePlugin(Corrade::PluginManager::AbstractManager &manager, const std::string &plugin) + : visor::HandlerModulePlugin{manager, plugin} + { + } + std::unique_ptr instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler = nullptr) override; +}; +} diff --git a/src/handlers/resources/ResourcesStreamHandler.cpp b/src/handlers/resources/ResourcesStreamHandler.cpp new file mode 100644 index 000000000..ff7e4183b --- /dev/null +++ b/src/handlers/resources/ResourcesStreamHandler.cpp @@ -0,0 +1,157 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#include "ResourcesStreamHandler.h" +#include "ThreadMonitor.h" + +namespace visor::handler::resources { + +ResourcesStreamHandler::ResourcesStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler) + : visor::StreamMetricsHandler(name, window_config) + , _timer(time(NULL)) +{ + if (handler) { + throw StreamHandlerException(fmt::format("ResourcesStreamHandler: unsupported upstream chained stream handler {}", handler->name())); + } + + assert(stream); + // figure out which input stream we have + if (stream) { + _pcap_stream = dynamic_cast(stream); + _dnstap_stream = dynamic_cast(stream); + _mock_stream = dynamic_cast(stream); + _sflow_stream = dynamic_cast(stream); + if (!_pcap_stream && !_mock_stream && !_dnstap_stream && !_sflow_stream) { + throw StreamHandlerException(fmt::format("NetStreamHandler: unsupported input stream {}", stream->name())); + } + } +} + +void ResourcesStreamHandler::start() +{ + if (_running) { + return; + } + + if (config_exists("recorded_stream")) { + _metrics->set_recorded_stream(); + } + + if (_pcap_stream) { + _pkt_connection = _pcap_stream->packet_signal.connect(&ResourcesStreamHandler::process_packet_cb, this); + } else if (_dnstap_stream) { + _dnstap_connection = _dnstap_stream->dnstap_signal.connect(&ResourcesStreamHandler::process_dnstap_cb, this); + } else if (_sflow_stream) { + _sflow_connection = _sflow_stream->sflow_signal.connect(&ResourcesStreamHandler::process_sflow_cb, this); + } + + _running = true; +} + +void ResourcesStreamHandler::stop() +{ + if (!_running) { + return; + } + + if (_pcap_stream) { + _pkt_connection.disconnect(); + } else if (_dnstap_stream) { + _dnstap_connection.disconnect(); + } else if (_sflow_stream) { + _sflow_connection.disconnect(); + } + + _running = false; +} + +void ResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSample &) +{ + if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { + _timer = time(NULL); + _metrics->process_resources(); + } +} + +void ResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnstap::Dnstap &) +{ + if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { + _timer = time(NULL); + _metrics->process_resources(); + } +} + +void ResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packet &payload, [[maybe_unused]] PacketDirection dir, [[maybe_unused]] pcpp::ProtocolType l3, [[maybe_unused]] pcpp::ProtocolType l4, [[maybe_unused]] timespec stamp) +{ + if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { + _timer = time(NULL); + _metrics->process_resources(); + } +} + +void ResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket &o) +{ + // static because caller guarantees only our own bucket type + const auto &other = static_cast(o); + + std::shared_lock r_lock(other._mutex); + std::unique_lock w_lock(_mutex); + + _cpu_percentage.merge(other._cpu_percentage); + _memory_usage_kb.merge(other._memory_usage_kb); +} + +void ResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap add_labels) const +{ + { + auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe + + event_rate->to_prometheus(out, add_labels); + num_events->to_prometheus(out, add_labels); + num_samples->to_prometheus(out, add_labels); + } + + std::shared_lock r_lock(_mutex); + + _cpu_percentage.to_prometheus(out, add_labels); + _memory_usage_kb.to_prometheus(out, add_labels); +} + +void ResourcesMetricsBucket::to_json(json &j) const +{ + bool live_rates = !read_only() && !recorded_stream(); + + { + auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe + + event_rate->to_json(j, live_rates); + num_events->to_json(j); + num_samples->to_json(j); + } + + std::shared_lock r_lock(_mutex); + + _cpu_percentage.to_json(j); + _memory_usage_kb.to_json(j); +} + +void ResourcesMetricsBucket::process_resources() +{ + std::unique_lock lock(_mutex); + + _cpu_percentage.update(ThreadMonitor::cpu_percentage()); + _memory_usage_kb.update(ThreadMonitor::memory_usage()); +} + +void ResourcesMetricsManager::process_resources() +{ + timespec stamp; + // use now() + std::timespec_get(&stamp, TIME_UTC); + // base event + new_event(stamp); + // process in the "live" bucket. this will parse the resources if we are deep sampling + live_bucket()->process_resources(); +} +} \ No newline at end of file diff --git a/src/handlers/resources/ResourcesStreamHandler.h b/src/handlers/resources/ResourcesStreamHandler.h new file mode 100644 index 000000000..ad1996f48 --- /dev/null +++ b/src/handlers/resources/ResourcesStreamHandler.h @@ -0,0 +1,101 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#pragma once + +#include "AbstractMetricsManager.h" +#include "PcapInputStream.h" +#include "DnstapInputStream.h" +#include "MockInputStream.h" +#include "SflowInputStream.h" +#include "StreamHandler.h" +#include +#include +#include + +namespace visor::handler::resources { + +using namespace visor::input::pcap; +using namespace visor::input::dnstap; +using namespace visor::input::mock; +using namespace visor::input::sflow; + +constexpr double MEASURE_INTERVAL = 10; //in seconds + +class ResourcesMetricsBucket final : public visor::AbstractMetricsBucket +{ + +protected: + mutable std::shared_mutex _mutex; + + // total numPackets is tracked in base class num_events + Quantile _cpu_percentage; + Quantile _memory_usage_kb; + uint32_t thread_id; + +public: + ResourcesMetricsBucket() + : _cpu_percentage("resources", {"xact", "out", "quantiles_us"}, "Quantiles of thread cpu usage") + , _memory_usage_kb("resources", {"xact", "in", "quantiles_us"}, "Quantiles of thead memory usage in kilobytes") + { + } + + // visor::AbstractMetricsBucket + void specialized_merge(const AbstractMetricsBucket &other) override; + void to_json(json &j) const override; + void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override; + + void process_resources(); + +}; + +class ResourcesMetricsManager final : public visor::AbstractMetricsManager +{ +public: + ResourcesMetricsManager(const Configurable *window_config) + : visor::AbstractMetricsManager(window_config) + { + } + + void process_resources(); +}; + +class ResourcesStreamHandler final : public visor::StreamMetricsHandler +{ + + // the input stream sources we support (only one will be in use at a time) + time_t _timer; + PcapInputStream *_pcap_stream{nullptr}; + DnstapInputStream *_dnstap_stream{nullptr}; + MockInputStream *_mock_stream{nullptr}; + SflowInputStream *_sflow_stream{nullptr}; + + sigslot::connection _dnstap_connection; + sigslot::connection _sflow_connection; + sigslot::connection _pkt_connection; + + void process_sflow_cb(const SFSample &); + void process_dnstap_cb(const dnstap::Dnstap &); + void process_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp); + +public: + ResourcesStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler = nullptr); + ~ResourcesStreamHandler() = default; + + // visor::AbstractModule + std::string schema_key() const override + { + return "resources"; + } + + size_t consumer_count() const override + { + return 0; + } + + void start() override; + void stop() override; +}; + +} diff --git a/src/ThreadMonitor.h b/src/handlers/resources/ThreadMonitor.h similarity index 73% rename from src/ThreadMonitor.h rename to src/handlers/resources/ThreadMonitor.h index f7a2b1f8e..6d1eb0057 100644 --- a/src/ThreadMonitor.h +++ b/src/handlers/resources/ThreadMonitor.h @@ -18,6 +18,31 @@ namespace visor { class ThreadMonitor { public: + static inline uint32_t thread_id() + { +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) + return 0; +#elif __APPLE__ + return 0; +#elif __linux__ + uint32_t thread_id; + std::string token; + std::ifstream file("/proc/thread-self/status"); + while (file >> token) { + if (token == "Tgid:") { + if (file >> thread_id) { + return thread_id; + } else { + return 0; + } + } + // Ignore the rest of the line + file.ignore(std::numeric_limits::max(), '\n'); + } + return 0; // Nothing found +#endif + } + static inline double cpu_percentage() { #if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) diff --git a/src/handlers/static_plugins.h b/src/handlers/static_plugins.h index b193c3503..62b37f3ee 100644 --- a/src/handlers/static_plugins.h +++ b/src/handlers/static_plugins.h @@ -5,6 +5,7 @@ int import_handler_plugins() CORRADE_PLUGIN_IMPORT(VisorHandlerDns); CORRADE_PLUGIN_IMPORT(VisorHandlerDhcp); CORRADE_PLUGIN_IMPORT(VisorHandlerPcap); + CORRADE_PLUGIN_IMPORT(VisorHandlerResources); return 0; } From fd99353248c05f82881598cc256b7f9a81ed3f32 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Thu, 17 Mar 2022 14:30:21 -0400 Subject: [PATCH 03/22] Implement cpu percentage based on htop program --- .../resources/ResourcesStreamHandler.cpp | 5 +- .../resources/ResourcesStreamHandler.h | 14 ++-- src/handlers/resources/ThreadMonitor.h | 78 +++++++++---------- 3 files changed, 48 insertions(+), 49 deletions(-) diff --git a/src/handlers/resources/ResourcesStreamHandler.cpp b/src/handlers/resources/ResourcesStreamHandler.cpp index ff7e4183b..e296dae6f 100644 --- a/src/handlers/resources/ResourcesStreamHandler.cpp +++ b/src/handlers/resources/ResourcesStreamHandler.cpp @@ -3,7 +3,6 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ #include "ResourcesStreamHandler.h" -#include "ThreadMonitor.h" namespace visor::handler::resources { @@ -140,8 +139,8 @@ void ResourcesMetricsBucket::process_resources() { std::unique_lock lock(_mutex); - _cpu_percentage.update(ThreadMonitor::cpu_percentage()); - _memory_usage_kb.update(ThreadMonitor::memory_usage()); + _cpu_percentage.update(_monitor.cpu_percentage()); + _memory_usage_kb.update(_monitor.memory_usage()); } void ResourcesMetricsManager::process_resources() diff --git a/src/handlers/resources/ResourcesStreamHandler.h b/src/handlers/resources/ResourcesStreamHandler.h index ad1996f48..83b52dfd3 100644 --- a/src/handlers/resources/ResourcesStreamHandler.h +++ b/src/handlers/resources/ResourcesStreamHandler.h @@ -5,11 +5,12 @@ #pragma once #include "AbstractMetricsManager.h" -#include "PcapInputStream.h" #include "DnstapInputStream.h" #include "MockInputStream.h" +#include "PcapInputStream.h" #include "SflowInputStream.h" #include "StreamHandler.h" +#include "ThreadMonitor.h" #include #include #include @@ -21,7 +22,7 @@ using namespace visor::input::dnstap; using namespace visor::input::mock; using namespace visor::input::sflow; -constexpr double MEASURE_INTERVAL = 10; //in seconds +constexpr double MEASURE_INTERVAL = 10; // in seconds class ResourcesMetricsBucket final : public visor::AbstractMetricsBucket { @@ -30,14 +31,16 @@ class ResourcesMetricsBucket final : public visor::AbstractMetricsBucket mutable std::shared_mutex _mutex; // total numPackets is tracked in base class num_events - Quantile _cpu_percentage; + Quantile _cpu_percentage; Quantile _memory_usage_kb; uint32_t thread_id; + ThreadMonitor _monitor; + public: ResourcesMetricsBucket() - : _cpu_percentage("resources", {"xact", "out", "quantiles_us"}, "Quantiles of thread cpu usage") - , _memory_usage_kb("resources", {"xact", "in", "quantiles_us"}, "Quantiles of thead memory usage in kilobytes") + : _cpu_percentage("resources", {"cpu_percentage"}, "Quantiles of thread cpu usage") + , _memory_usage_kb("resources", {"memory_bytes"}, "Quantiles of thead memory usage in bytes") { } @@ -47,7 +50,6 @@ class ResourcesMetricsBucket final : public visor::AbstractMetricsBucket void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override; void process_resources(); - }; class ResourcesMetricsManager final : public visor::AbstractMetricsManager diff --git a/src/handlers/resources/ThreadMonitor.h b/src/handlers/resources/ThreadMonitor.h index 6d1eb0057..2a489103a 100644 --- a/src/handlers/resources/ThreadMonitor.h +++ b/src/handlers/resources/ThreadMonitor.h @@ -4,72 +4,70 @@ #pragma once +#include +#include + #if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) #elif __APPLE__ #elif __linux__ #include #endif -#include -#include - namespace visor { class ThreadMonitor { -public: - static inline uint32_t thread_id() - { #if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) - return 0; #elif __APPLE__ - return 0; #elif __linux__ - uint32_t thread_id; - std::string token; - std::ifstream file("/proc/thread-self/status"); - while (file >> token) { - if (token == "Tgid:") { - if (file >> thread_id) { - return thread_id; - } else { - return 0; - } - } - // Ignore the rest of the line - file.ignore(std::numeric_limits::max(), '\n'); - } - return 0; // Nothing found + uint64_t _last_system_time = 0; + uint64_t _last_thread_time = 0; #endif - } +public: + ThreadMonitor() = default; + ~ThreadMonitor() = default; - static inline double cpu_percentage() + inline double cpu_percentage() { #if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) return 0; #elif __APPLE__ return 0; #elif __linux__ - double up_time; - std::ifstream uptime("/proc/uptime"); - uptime >> up_time; - if (up_time <= 0) { - return 0; + uint64_t stat; + + std::ifstream system_stat("/proc/stat"); + std::string line; + std::getline(system_stat, line); + std::stringstream cpu_times(line.erase(0, 5)); + uint64_t system_total_time = 0; + while (cpu_times >> stat) { + system_total_time += stat; } - uint64_t token; + system_total_time = system_total_time / sysconf(_SC_NPROCESSORS_ONLN); + std::vector stats; - std::ifstream stat("/proc/thread-self/stat"); - stat.ignore(' '); - while (stat >> token) { - stats.push_back(token); + std::ifstream thread_stat("/proc/thread-self/stat"); + thread_stat.ignore(' '); + while (thread_stat >> stat) { + stats.push_back(stat); + } + uint64_t thread_total_time = (stats[8] + stats[9]); + + uint64_t current_thread_time = thread_total_time - _last_thread_time; + _last_thread_time = thread_total_time; + double current_period_time = system_total_time - _last_system_time; + _last_system_time = system_total_time; + + double cpu_usage = (current_thread_time / current_period_time) * 100.0; + if (cpu_usage < 0.0) { + cpu_usage = 0.0; } - double total_time = (stats[8] + stats[9] + stats[10] + stats[11]) / sysconf(_SC_CLK_TCK);; - uint32_t seconds = up_time - (stats[16] / sysconf(_SC_CLK_TCK)); - return 100 * (total_time / seconds); + return cpu_usage; #endif } - static inline uint64_t memory_usage() + inline uint64_t memory_usage() { #if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) return 0; @@ -82,7 +80,7 @@ class ThreadMonitor while (file >> token) { if (token == "VmRSS:") { if (file >> memory) { - return memory; + return memory * 1024; } else { return 0; } From dd3fd337cfd48043ec6a39a95068b24e480c1c9c Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Thu, 17 Mar 2022 16:16:51 -0400 Subject: [PATCH 04/22] reuse pcap timestamp to check timediff --- src/handlers/resources/ResourcesStreamHandler.cpp | 6 ++++-- src/handlers/resources/ResourcesStreamHandler.h | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/handlers/resources/ResourcesStreamHandler.cpp b/src/handlers/resources/ResourcesStreamHandler.cpp index e296dae6f..e1ca37439 100644 --- a/src/handlers/resources/ResourcesStreamHandler.cpp +++ b/src/handlers/resources/ResourcesStreamHandler.cpp @@ -10,6 +10,8 @@ ResourcesStreamHandler::ResourcesStreamHandler(const std::string &name, InputStr : visor::StreamMetricsHandler(name, window_config) , _timer(time(NULL)) { + std::timespec_get(&_timestamp, TIME_UTC); + if (handler) { throw StreamHandlerException(fmt::format("ResourcesStreamHandler: unsupported upstream chained stream handler {}", handler->name())); } @@ -83,8 +85,8 @@ void ResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnstap::Dn void ResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packet &payload, [[maybe_unused]] PacketDirection dir, [[maybe_unused]] pcpp::ProtocolType l3, [[maybe_unused]] pcpp::ProtocolType l4, [[maybe_unused]] timespec stamp) { - if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { - _timer = time(NULL); + if (stamp.tv_sec >= MEASURE_INTERVAL + _timestamp.tv_sec) { + _timestamp = stamp; _metrics->process_resources(); } } diff --git a/src/handlers/resources/ResourcesStreamHandler.h b/src/handlers/resources/ResourcesStreamHandler.h index 83b52dfd3..6666edcb4 100644 --- a/src/handlers/resources/ResourcesStreamHandler.h +++ b/src/handlers/resources/ResourcesStreamHandler.h @@ -68,6 +68,7 @@ class ResourcesStreamHandler final : public visor::StreamMetricsHandler Date: Thu, 17 Mar 2022 17:54:19 -0400 Subject: [PATCH 05/22] Add Resources Handler to every new input stream --- src/CoreServer.cpp | 5 +++++ src/InputStream.h | 11 +++++++++++ src/Policies.cpp | 15 ++++++++++++++- 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/CoreServer.cpp b/src/CoreServer.cpp index b7989e707..944709a63 100644 --- a/src/CoreServer.cpp +++ b/src/CoreServer.cpp @@ -4,6 +4,7 @@ #include "CoreServer.h" #include "HandlerManager.h" +#include "InputStream.h" #include "Metrics.h" #include "Policies.h" #include "Taps.h" @@ -323,6 +324,8 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) } } } + auto resources_handler = policy->input_stream()->resources_handler(); + resources_handler->window_json(j[policy->name()][resources_handler->name()], period, req.matches[2] == "window"); _logger->debug("{} policy json metrics elapsed time: {}", policy->name(), psw); } res.set_content(j.dump(), "text/json"); @@ -363,6 +366,8 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _logger->debug("{} window_prometheus elapsed time: {}", hmod->name(), sw); } } + auto resources_handler = policy->input_stream()->resources_handler(); + resources_handler->window_prometheus(output, {{"policy", p_mname}, {"module", resources_handler->name()}}); } catch (const std::exception &e) { res.status = 500; res.set_content(e.what(), "text/plain"); diff --git a/src/InputStream.h b/src/InputStream.h index bf6cdf50b..aa158cab7 100644 --- a/src/InputStream.h +++ b/src/InputStream.h @@ -13,6 +13,7 @@ class InputStream : public AbstractRunnableModule { mutable std::shared_mutex _input_mutex; std::vector _policies; + StreamHandler *_resources_handler; public: InputStream(const std::string &name) @@ -40,6 +41,16 @@ class InputStream : public AbstractRunnableModule return _policies.size(); } + void set_resources_handler(StreamHandler *resources_handler) + { + _resources_handler = resources_handler; + } + + StreamHandler *resources_handler() const + { + return _resources_handler; + } + virtual size_t consumer_count() const = 0; void common_info_json(json &j) const diff --git a/src/Policies.cpp b/src/Policies.cpp index 44a652743..c4f19b83d 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -174,6 +174,14 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) window_config.config_set("deep_sample_rate", _default_deep_sample_rate); } + std::unique_ptr resources_module; + if (input_stream) { + //create resources handler for input stream + auto resources_handler_plugin = _registry->handler_plugins().find("resources"); + resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config); + input_stream->set_resources_handler(resources_module.get()); + } + std::vector> handler_modules; for (YAML::const_iterator h_it = handler_node["modules"].begin(); h_it != handler_node["modules"].end(); ++h_it) { @@ -282,8 +290,9 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) throw PolicyException(fmt::format("policy [{}] creation failed (policy): {}", policy_name, e.what())); } try { - if (input_stream) { + if (input_stream && resources_module) { _registry->input_manager()->module_add(std::move(input_stream)); + _registry->handler_manager()->module_add(std::move(resources_module)); } } catch (ModuleException &e) { // note that if this call excepts, we are in an unknown state and the exception will propagate @@ -326,6 +335,7 @@ void PolicyManager::remove_policy(const std::string &name) auto policy = _map[name].get(); auto input_name = policy->input_stream()->name(); + auto resource_handler_name = policy->input_stream()->resources_handler()->name(); std::vector module_names; for (const auto &mod : policy->modules()) { module_names.push_back(mod->name()); @@ -338,6 +348,7 @@ void PolicyManager::remove_policy(const std::string &name) if (!policy->input_stream()->policies_count()) { _registry->input_manager()->module_remove(input_name); + _registry->handler_manager()->module_remove(resource_handler_name); } _map.erase(name); @@ -365,6 +376,7 @@ void Policy::start() // from handlers in the same thread we are starting the policy from spdlog::get("visor")->info("policy [{}]: starting input instance: {}", _name, _input_stream->name()); _input_stream->start(); + _input_stream->resources_handler()->start(); _running = true; } void Policy::stop() @@ -377,6 +389,7 @@ void Policy::stop() if (_input_stream->policies_count() <= 1) { spdlog::get("visor")->info("policy [{}]: stopping input instance: {}", _name, _input_stream->name()); _input_stream->stop(); + _input_stream->resources_handler()->stop(); } else { spdlog::get("visor")->info("policy [{}]: input instance {} not stopped because it is in use by another policy.", _name, _input_stream->name()); } From 034e461889f9d69fbebbca4952a21f58e5742c24 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Fri, 18 Mar 2022 15:32:15 -0400 Subject: [PATCH 06/22] add unit tests for resources handler --- src/handlers/resources/CMakeLists.txt | 4 +- .../resources/ResourcesStreamHandler.cpp | 16 ++-- .../resources/ResourcesStreamHandler.h | 2 +- src/handlers/resources/tests/CMakeLists.txt | 16 ++++ src/handlers/resources/tests/main.cpp | 24 +++++ .../resources/tests/test_resources_layer.cpp | 87 +++++++++++++++++++ 6 files changed, 139 insertions(+), 10 deletions(-) create mode 100644 src/handlers/resources/tests/CMakeLists.txt create mode 100644 src/handlers/resources/tests/main.cpp create mode 100644 src/handlers/resources/tests/test_resources_layer.cpp diff --git a/src/handlers/resources/CMakeLists.txt b/src/handlers/resources/CMakeLists.txt index 52f404df5..cea8b7188 100644 --- a/src/handlers/resources/CMakeLists.txt +++ b/src/handlers/resources/CMakeLists.txt @@ -22,4 +22,6 @@ target_link_libraries(VisorHandlerResources Visor::Input::Sflow ) -set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} Visor::Handler::Resources PARENT_SCOPE) \ No newline at end of file +set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} Visor::Handler::Resources PARENT_SCOPE) + +add_subdirectory(tests) \ No newline at end of file diff --git a/src/handlers/resources/ResourcesStreamHandler.cpp b/src/handlers/resources/ResourcesStreamHandler.cpp index e1ca37439..10d97bd1c 100644 --- a/src/handlers/resources/ResourcesStreamHandler.cpp +++ b/src/handlers/resources/ResourcesStreamHandler.cpp @@ -8,10 +8,9 @@ namespace visor::handler::resources { ResourcesStreamHandler::ResourcesStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler) : visor::StreamMetricsHandler(name, window_config) - , _timer(time(NULL)) + , _timer(0) + , _timestamp(timespec()) { - std::timespec_get(&_timestamp, TIME_UTC); - if (handler) { throw StreamHandlerException(fmt::format("ResourcesStreamHandler: unsupported upstream chained stream handler {}", handler->name())); } @@ -87,7 +86,7 @@ void ResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packet &pa { if (stamp.tv_sec >= MEASURE_INTERVAL + _timestamp.tv_sec) { _timestamp = stamp; - _metrics->process_resources(); + _metrics->process_resources(stamp); } } @@ -145,11 +144,12 @@ void ResourcesMetricsBucket::process_resources() _memory_usage_kb.update(_monitor.memory_usage()); } -void ResourcesMetricsManager::process_resources() +void ResourcesMetricsManager::process_resources(timespec stamp) { - timespec stamp; - // use now() - std::timespec_get(&stamp, TIME_UTC); + if(stamp.tv_sec == 0) { + // use now() + std::timespec_get(&stamp, TIME_UTC); + } // base event new_event(stamp); // process in the "live" bucket. this will parse the resources if we are deep sampling diff --git a/src/handlers/resources/ResourcesStreamHandler.h b/src/handlers/resources/ResourcesStreamHandler.h index 6666edcb4..beef4de92 100644 --- a/src/handlers/resources/ResourcesStreamHandler.h +++ b/src/handlers/resources/ResourcesStreamHandler.h @@ -60,7 +60,7 @@ class ResourcesMetricsManager final : public visor::AbstractMetricsManager diff --git a/src/handlers/resources/tests/CMakeLists.txt b/src/handlers/resources/tests/CMakeLists.txt new file mode 100644 index 000000000..7585d130d --- /dev/null +++ b/src/handlers/resources/tests/CMakeLists.txt @@ -0,0 +1,16 @@ + +## TEST SUITE +add_executable(unit-tests-handler-resources + main.cpp + test_resources_layer.cpp + ) + +target_link_libraries(unit-tests-handler-resources + PRIVATE + ${CONAN_LIBS_JSON-SCHEMA-VALIDATOR} + Visor::Handler::Resources) + +add_test(NAME unit-tests-handler-resources + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/src + COMMAND unit-tests-handler-resources + ) diff --git a/src/handlers/resources/tests/main.cpp b/src/handlers/resources/tests/main.cpp new file mode 100644 index 000000000..3b6e7e5b3 --- /dev/null +++ b/src/handlers/resources/tests/main.cpp @@ -0,0 +1,24 @@ +#define CATCH_CONFIG_RUNNER +#include +#include +#include +#include + +int main(int argc, char *argv[]) +{ + Catch::Session session; + + auto logger = spdlog::get("visor"); + if (!logger) { + spdlog::stderr_color_mt("visor"); + } + + int result = session.applyCommandLine(argc, argv); + if (result != 0) { + return result; + } + + result = session.run(); + + return (result == 0 ? EXIT_SUCCESS : EXIT_FAILURE); +} diff --git a/src/handlers/resources/tests/test_resources_layer.cpp b/src/handlers/resources/tests/test_resources_layer.cpp new file mode 100644 index 000000000..6b5584e50 --- /dev/null +++ b/src/handlers/resources/tests/test_resources_layer.cpp @@ -0,0 +1,87 @@ +#include + +#include "DnstapInputStream.h" +#include "PcapInputStream.h" +#include "ResourcesStreamHandler.h" +#include "SflowInputStream.h" + +using namespace visor::handler::resources; + +TEST_CASE("Check resources for pcap input", "[pcap][resources]") +{ + PcapInputStream stream{"pcap-test"}; + stream.config_set("pcap_file", "tests/fixtures/dns_ipv4_udp.pcap"); + stream.config_set("bpf", std::string()); + + visor::Config c; + c.config_set("num_periods", 1); + ResourcesStreamHandler resources_handler{"net-test", &stream, &c}; + + resources_handler.start(); + stream.start(); + resources_handler.stop(); + stream.stop(); + + auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); + + CHECK(resources_handler.metrics()->current_periods() == 1); + CHECK(event_data.num_events->value() == 1); + + nlohmann::json j; + resources_handler.metrics()->bucket(0)->to_json(j); + + CHECK(j["cpu_percentage"]["p50"] == 0.0); + CHECK(j["memory_bytes"]["p50"] != nullptr); +} + +TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") +{ + DnstapInputStream stream{"dnstap-test"}; + stream.config_set("dnstap_file", "inputs/dnstap/tests/fixtures/fixture.dnstap"); + stream.config_set("only_hosts", {"192.168.0.0/24", "2001:db8::/48"}); + visor::Config c; + c.config_set("num_periods", 1); + ResourcesStreamHandler resources_handler{"net-test", &stream, &c}; + + resources_handler.start(); + stream.start(); + stream.stop(); + resources_handler.stop(); + + auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); + + CHECK(resources_handler.metrics()->current_periods() == 1); + CHECK(event_data.num_events->value() == 1); + + nlohmann::json j; + resources_handler.metrics()->bucket(0)->to_json(j); + + CHECK(j["cpu_percentage"]["p50"] == 0.0); + CHECK(j["memory_bytes"]["p50"] != nullptr); +} + +TEST_CASE("Check resources for sflow input", "[sflow][resources]") +{ + SflowInputStream stream{"sflow-test"}; + stream.config_set("pcap_file", "tests/fixtures/ecmp.pcap"); + + visor::Config c; + c.config_set("num_periods", 1); + ResourcesStreamHandler resources_handler{"net-test", &stream, &c}; + + resources_handler.start(); + stream.start(); + stream.stop(); + resources_handler.stop(); + + auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); + + CHECK(resources_handler.metrics()->current_periods() == 1); + CHECK(event_data.num_events->value() == 1); + + nlohmann::json j; + resources_handler.metrics()->bucket(0)->to_json(j); + + CHECK(j["cpu_percentage"]["p50"] == 0.0); + CHECK(j["memory_bytes"]["p50"] != nullptr); +} \ No newline at end of file From 17e3f349dbdd1e2109ec9353504b8a6bc26d64b6 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Fri, 18 Mar 2022 18:08:17 -0400 Subject: [PATCH 07/22] add coverage to prometheus method --- src/handlers/resources/tests/test_resources_layer.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/handlers/resources/tests/test_resources_layer.cpp b/src/handlers/resources/tests/test_resources_layer.cpp index 6b5584e50..9ab738d94 100644 --- a/src/handlers/resources/tests/test_resources_layer.cpp +++ b/src/handlers/resources/tests/test_resources_layer.cpp @@ -32,6 +32,16 @@ TEST_CASE("Check resources for pcap input", "[pcap][resources]") CHECK(j["cpu_percentage"]["p50"] == 0.0); CHECK(j["memory_bytes"]["p50"] != nullptr); + + std::stringstream output; + std::string line; + resources_handler.metrics()->bucket(0)->to_prometheus(output, {{"policy", "default"}}); + std::getline(output, line); + CHECK(line == "# HELP base_total Total number of events"); + std::getline(output, line); + CHECK(line == "# TYPE base_total gauge"); + std::getline(output, line); + CHECK(line == R"(base_total{policy="default"} 1)"); } TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") From b4a68a499536bea1d9a23b3cddb24d4b4b80f3db Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Mon, 21 Mar 2022 17:46:44 -0400 Subject: [PATCH 08/22] rename resources handler to input resources handler --- src/Policies.cpp | 2 +- src/handlers/CMakeLists.txt | 2 +- src/handlers/input_resources/CMakeLists.txt | 28 +++++++++++++++ .../InputResourcesHandler.conf | 5 +++ .../InputResourcesHandlerModulePlugin.cpp} | 12 +++---- .../InputResourcesHandlerModulePlugin.h} | 4 +-- .../InputResourcesStreamHandler.cpp} | 34 +++++++++---------- .../InputResourcesStreamHandler.h} | 16 ++++----- src/handlers/input_resources/README.md | 7 ++++ .../ThreadMonitor.h | 0 .../input_resources/tests/CMakeLists.txt | 16 +++++++++ .../tests/main.cpp | 0 .../tests/test_resources_layer.cpp | 8 ++--- src/handlers/resources/CMakeLists.txt | 27 --------------- src/handlers/resources/README.md | 7 ---- src/handlers/resources/ResourcesHandler.conf | 5 --- src/handlers/resources/tests/CMakeLists.txt | 16 --------- src/handlers/static_plugins.h | 2 +- 18 files changed, 96 insertions(+), 95 deletions(-) create mode 100644 src/handlers/input_resources/CMakeLists.txt create mode 100644 src/handlers/input_resources/InputResourcesHandler.conf rename src/handlers/{resources/ResourcesHandlerModulePlugin.cpp => input_resources/InputResourcesHandlerModulePlugin.cpp} (51%) rename src/handlers/{resources/ResourcesHandlerModulePlugin.h => input_resources/InputResourcesHandlerModulePlugin.h} (76%) rename src/handlers/{resources/ResourcesStreamHandler.cpp => input_resources/InputResourcesStreamHandler.cpp} (71%) rename src/handlers/{resources/ResourcesStreamHandler.h => input_resources/InputResourcesStreamHandler.h} (78%) create mode 100644 src/handlers/input_resources/README.md rename src/handlers/{resources => input_resources}/ThreadMonitor.h (100%) create mode 100644 src/handlers/input_resources/tests/CMakeLists.txt rename src/handlers/{resources => input_resources}/tests/main.cpp (100%) rename src/handlers/{resources => input_resources}/tests/test_resources_layer.cpp (91%) delete mode 100644 src/handlers/resources/CMakeLists.txt delete mode 100644 src/handlers/resources/README.md delete mode 100644 src/handlers/resources/ResourcesHandler.conf delete mode 100644 src/handlers/resources/tests/CMakeLists.txt diff --git a/src/Policies.cpp b/src/Policies.cpp index c4f19b83d..d5269c05f 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -177,7 +177,7 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) std::unique_ptr resources_module; if (input_stream) { //create resources handler for input stream - auto resources_handler_plugin = _registry->handler_plugins().find("resources"); + auto resources_handler_plugin = _registry->handler_plugins().find("input_resources"); resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config); input_stream->set_resources_handler(resources_module.get()); } diff --git a/src/handlers/CMakeLists.txt b/src/handlers/CMakeLists.txt index de3a656ba..2ccd20eac 100644 --- a/src/handlers/CMakeLists.txt +++ b/src/handlers/CMakeLists.txt @@ -4,6 +4,6 @@ add_subdirectory(dns) add_subdirectory(dhcp) add_subdirectory(pcap) add_subdirectory(mock) -add_subdirectory(resources) +add_subdirectory(input_resources) set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} PARENT_SCOPE) diff --git a/src/handlers/input_resources/CMakeLists.txt b/src/handlers/input_resources/CMakeLists.txt new file mode 100644 index 000000000..e9cb5f915 --- /dev/null +++ b/src/handlers/input_resources/CMakeLists.txt @@ -0,0 +1,28 @@ +message(STATUS "Handler Module: Input Resources") + +set_directory_properties(PROPERTIES CORRADE_USE_PEDANTIC_FLAGS ON) + +corrade_add_static_plugin(VisorHandlerInputResources + ${CMAKE_CURRENT_BINARY_DIR} + InputResourcesHandler.conf + InputResourcesHandlerModulePlugin.cpp + InputResourcesStreamHandler.cpp + ) +add_library(Visor::Handler::InputResources ALIAS VisorHandlerInputResources) + +target_include_directories(VisorHandlerInputResources + INTERFACE + $ + ) + +target_link_libraries(VisorHandlerInputResources + PUBLIC + Visor::Input::Pcap + Visor::Input::Dnstap + Visor::Input::Mock + Visor::Input::Sflow + ) + +set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} Visor::Handler::InputResources PARENT_SCOPE) + +add_subdirectory(tests) \ No newline at end of file diff --git a/src/handlers/input_resources/InputResourcesHandler.conf b/src/handlers/input_resources/InputResourcesHandler.conf new file mode 100644 index 000000000..947adaa02 --- /dev/null +++ b/src/handlers/input_resources/InputResourcesHandler.conf @@ -0,0 +1,5 @@ +# Aliases +provides=input_resources +[data] +desc=Input Resources analyzer +type=handler diff --git a/src/handlers/resources/ResourcesHandlerModulePlugin.cpp b/src/handlers/input_resources/InputResourcesHandlerModulePlugin.cpp similarity index 51% rename from src/handlers/resources/ResourcesHandlerModulePlugin.cpp rename to src/handlers/input_resources/InputResourcesHandlerModulePlugin.cpp index 2b442b9e6..565dc90c6 100644 --- a/src/handlers/resources/ResourcesHandlerModulePlugin.cpp +++ b/src/handlers/input_resources/InputResourcesHandlerModulePlugin.cpp @@ -2,28 +2,28 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -#include "ResourcesHandlerModulePlugin.h" +#include "InputResourcesHandlerModulePlugin.h" #include "CoreRegistry.h" -#include "ResourcesStreamHandler.h" +#include "InputResourcesStreamHandler.h" #include "HandlerManager.h" #include "InputStreamManager.h" #include #include -CORRADE_PLUGIN_REGISTER(VisorHandlerResources, visor::handler::resources::ResourcesHandlerModulePlugin, +CORRADE_PLUGIN_REGISTER(VisorHandlerInputResources, visor::handler::resources::InputResourcesHandlerModulePlugin, "visor.module.handler/1.0") namespace visor::handler::resources { using json = nlohmann::json; -void ResourcesHandlerModulePlugin::setup_routes(HttpServer *svr) +void InputResourcesHandlerModulePlugin::setup_routes(HttpServer *svr) { } -std::unique_ptr ResourcesHandlerModulePlugin::instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler) +std::unique_ptr InputResourcesHandlerModulePlugin::instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler) { // TODO using config as both window config and module config - auto handler_module = std::make_unique(name, input_stream, config, stream_handler); + auto handler_module = std::make_unique(name, input_stream, config, stream_handler); handler_module->config_merge(*config); return handler_module; } diff --git a/src/handlers/resources/ResourcesHandlerModulePlugin.h b/src/handlers/input_resources/InputResourcesHandlerModulePlugin.h similarity index 76% rename from src/handlers/resources/ResourcesHandlerModulePlugin.h rename to src/handlers/input_resources/InputResourcesHandlerModulePlugin.h index d9abe2a5a..06e0aef15 100644 --- a/src/handlers/resources/ResourcesHandlerModulePlugin.h +++ b/src/handlers/input_resources/InputResourcesHandlerModulePlugin.h @@ -8,14 +8,14 @@ namespace visor::handler::resources { -class ResourcesHandlerModulePlugin : public HandlerModulePlugin +class InputResourcesHandlerModulePlugin : public HandlerModulePlugin { protected: void setup_routes(HttpServer *svr) override; public: - explicit ResourcesHandlerModulePlugin(Corrade::PluginManager::AbstractManager &manager, const std::string &plugin) + explicit InputResourcesHandlerModulePlugin(Corrade::PluginManager::AbstractManager &manager, const std::string &plugin) : visor::HandlerModulePlugin{manager, plugin} { } diff --git a/src/handlers/resources/ResourcesStreamHandler.cpp b/src/handlers/input_resources/InputResourcesStreamHandler.cpp similarity index 71% rename from src/handlers/resources/ResourcesStreamHandler.cpp rename to src/handlers/input_resources/InputResourcesStreamHandler.cpp index 10d97bd1c..1cf04bc10 100644 --- a/src/handlers/resources/ResourcesStreamHandler.cpp +++ b/src/handlers/input_resources/InputResourcesStreamHandler.cpp @@ -2,12 +2,12 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -#include "ResourcesStreamHandler.h" +#include "InputResourcesStreamHandler.h" namespace visor::handler::resources { -ResourcesStreamHandler::ResourcesStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler) - : visor::StreamMetricsHandler(name, window_config) +InputResourcesStreamHandler::InputResourcesStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler) + : visor::StreamMetricsHandler(name, window_config) , _timer(0) , _timestamp(timespec()) { @@ -28,7 +28,7 @@ ResourcesStreamHandler::ResourcesStreamHandler(const std::string &name, InputStr } } -void ResourcesStreamHandler::start() +void InputResourcesStreamHandler::start() { if (_running) { return; @@ -39,17 +39,17 @@ void ResourcesStreamHandler::start() } if (_pcap_stream) { - _pkt_connection = _pcap_stream->packet_signal.connect(&ResourcesStreamHandler::process_packet_cb, this); + _pkt_connection = _pcap_stream->packet_signal.connect(&InputResourcesStreamHandler::process_packet_cb, this); } else if (_dnstap_stream) { - _dnstap_connection = _dnstap_stream->dnstap_signal.connect(&ResourcesStreamHandler::process_dnstap_cb, this); + _dnstap_connection = _dnstap_stream->dnstap_signal.connect(&InputResourcesStreamHandler::process_dnstap_cb, this); } else if (_sflow_stream) { - _sflow_connection = _sflow_stream->sflow_signal.connect(&ResourcesStreamHandler::process_sflow_cb, this); + _sflow_connection = _sflow_stream->sflow_signal.connect(&InputResourcesStreamHandler::process_sflow_cb, this); } _running = true; } -void ResourcesStreamHandler::stop() +void InputResourcesStreamHandler::stop() { if (!_running) { return; @@ -66,7 +66,7 @@ void ResourcesStreamHandler::stop() _running = false; } -void ResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSample &) +void InputResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSample &) { if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { _timer = time(NULL); @@ -74,7 +74,7 @@ void ResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSample &) } } -void ResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnstap::Dnstap &) +void InputResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnstap::Dnstap &) { if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { _timer = time(NULL); @@ -82,7 +82,7 @@ void ResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnstap::Dn } } -void ResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packet &payload, [[maybe_unused]] PacketDirection dir, [[maybe_unused]] pcpp::ProtocolType l3, [[maybe_unused]] pcpp::ProtocolType l4, [[maybe_unused]] timespec stamp) +void InputResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packet &payload, [[maybe_unused]] PacketDirection dir, [[maybe_unused]] pcpp::ProtocolType l3, [[maybe_unused]] pcpp::ProtocolType l4, [[maybe_unused]] timespec stamp) { if (stamp.tv_sec >= MEASURE_INTERVAL + _timestamp.tv_sec) { _timestamp = stamp; @@ -90,10 +90,10 @@ void ResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packet &pa } } -void ResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket &o) +void InputResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket &o) { // static because caller guarantees only our own bucket type - const auto &other = static_cast(o); + const auto &other = static_cast(o); std::shared_lock r_lock(other._mutex); std::unique_lock w_lock(_mutex); @@ -102,7 +102,7 @@ void ResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket &o) _memory_usage_kb.merge(other._memory_usage_kb); } -void ResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap add_labels) const +void InputResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap add_labels) const { { auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe @@ -118,7 +118,7 @@ void ResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric::Label _memory_usage_kb.to_prometheus(out, add_labels); } -void ResourcesMetricsBucket::to_json(json &j) const +void InputResourcesMetricsBucket::to_json(json &j) const { bool live_rates = !read_only() && !recorded_stream(); @@ -136,7 +136,7 @@ void ResourcesMetricsBucket::to_json(json &j) const _memory_usage_kb.to_json(j); } -void ResourcesMetricsBucket::process_resources() +void InputResourcesMetricsBucket::process_resources() { std::unique_lock lock(_mutex); @@ -144,7 +144,7 @@ void ResourcesMetricsBucket::process_resources() _memory_usage_kb.update(_monitor.memory_usage()); } -void ResourcesMetricsManager::process_resources(timespec stamp) +void InputResourcesMetricsManager::process_resources(timespec stamp) { if(stamp.tv_sec == 0) { // use now() diff --git a/src/handlers/resources/ResourcesStreamHandler.h b/src/handlers/input_resources/InputResourcesStreamHandler.h similarity index 78% rename from src/handlers/resources/ResourcesStreamHandler.h rename to src/handlers/input_resources/InputResourcesStreamHandler.h index beef4de92..3f522749f 100644 --- a/src/handlers/resources/ResourcesStreamHandler.h +++ b/src/handlers/input_resources/InputResourcesStreamHandler.h @@ -24,7 +24,7 @@ using namespace visor::input::sflow; constexpr double MEASURE_INTERVAL = 10; // in seconds -class ResourcesMetricsBucket final : public visor::AbstractMetricsBucket +class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket { protected: @@ -38,7 +38,7 @@ class ResourcesMetricsBucket final : public visor::AbstractMetricsBucket ThreadMonitor _monitor; public: - ResourcesMetricsBucket() + InputResourcesMetricsBucket() : _cpu_percentage("resources", {"cpu_percentage"}, "Quantiles of thread cpu usage") , _memory_usage_kb("resources", {"memory_bytes"}, "Quantiles of thead memory usage in bytes") { @@ -52,18 +52,18 @@ class ResourcesMetricsBucket final : public visor::AbstractMetricsBucket void process_resources(); }; -class ResourcesMetricsManager final : public visor::AbstractMetricsManager +class InputResourcesMetricsManager final : public visor::AbstractMetricsManager { public: - ResourcesMetricsManager(const Configurable *window_config) - : visor::AbstractMetricsManager(window_config) + InputResourcesMetricsManager(const Configurable *window_config) + : visor::AbstractMetricsManager(window_config) { } void process_resources(timespec stamp = timespec()); }; -class ResourcesStreamHandler final : public visor::StreamMetricsHandler +class InputResourcesStreamHandler final : public visor::StreamMetricsHandler { // the input stream sources we support (only one will be in use at a time) @@ -83,8 +83,8 @@ class ResourcesStreamHandler final : public visor::StreamMetricsHandler("num_periods", 1); - ResourcesStreamHandler resources_handler{"net-test", &stream, &c}; + InputResourcesStreamHandler resources_handler{"net-test", &stream, &c}; resources_handler.start(); stream.start(); @@ -51,7 +51,7 @@ TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") stream.config_set("only_hosts", {"192.168.0.0/24", "2001:db8::/48"}); visor::Config c; c.config_set("num_periods", 1); - ResourcesStreamHandler resources_handler{"net-test", &stream, &c}; + InputResourcesStreamHandler resources_handler{"net-test", &stream, &c}; resources_handler.start(); stream.start(); @@ -77,7 +77,7 @@ TEST_CASE("Check resources for sflow input", "[sflow][resources]") visor::Config c; c.config_set("num_periods", 1); - ResourcesStreamHandler resources_handler{"net-test", &stream, &c}; + InputResourcesStreamHandler resources_handler{"net-test", &stream, &c}; resources_handler.start(); stream.start(); diff --git a/src/handlers/resources/CMakeLists.txt b/src/handlers/resources/CMakeLists.txt deleted file mode 100644 index cea8b7188..000000000 --- a/src/handlers/resources/CMakeLists.txt +++ /dev/null @@ -1,27 +0,0 @@ -message(STATUS "Handler Module: Resources") - -set_directory_properties(PROPERTIES CORRADE_USE_PEDANTIC_FLAGS ON) - -corrade_add_static_plugin(VisorHandlerResources - ${CMAKE_CURRENT_BINARY_DIR} - ResourcesHandler.conf - ResourcesHandlerModulePlugin.cpp - ResourcesStreamHandler.cpp) -add_library(Visor::Handler::Resources ALIAS VisorHandlerResources) - -target_include_directories(VisorHandlerResources - INTERFACE - $ - ) - -target_link_libraries(VisorHandlerResources - PUBLIC - Visor::Input::Pcap - Visor::Input::Dnstap - Visor::Input::Mock - Visor::Input::Sflow - ) - -set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} Visor::Handler::Resources PARENT_SCOPE) - -add_subdirectory(tests) \ No newline at end of file diff --git a/src/handlers/resources/README.md b/src/handlers/resources/README.md deleted file mode 100644 index 976e7da82..000000000 --- a/src/handlers/resources/README.md +++ /dev/null @@ -1,7 +0,0 @@ -# Application Resources Metrics Stream Handler - -This directory contains the Resources stream handler - -It can attach to input streams and provide system thread information - -[ResourcesStreamHandler.h](ResourcesStreamHandler.h) contains the list of metrics. diff --git a/src/handlers/resources/ResourcesHandler.conf b/src/handlers/resources/ResourcesHandler.conf deleted file mode 100644 index 432eb1c78..000000000 --- a/src/handlers/resources/ResourcesHandler.conf +++ /dev/null @@ -1,5 +0,0 @@ -# Aliases -provides=resources -[data] -desc=Resources analyzer -type=handler diff --git a/src/handlers/resources/tests/CMakeLists.txt b/src/handlers/resources/tests/CMakeLists.txt deleted file mode 100644 index 7585d130d..000000000 --- a/src/handlers/resources/tests/CMakeLists.txt +++ /dev/null @@ -1,16 +0,0 @@ - -## TEST SUITE -add_executable(unit-tests-handler-resources - main.cpp - test_resources_layer.cpp - ) - -target_link_libraries(unit-tests-handler-resources - PRIVATE - ${CONAN_LIBS_JSON-SCHEMA-VALIDATOR} - Visor::Handler::Resources) - -add_test(NAME unit-tests-handler-resources - WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/src - COMMAND unit-tests-handler-resources - ) diff --git a/src/handlers/static_plugins.h b/src/handlers/static_plugins.h index 62b37f3ee..fa2013519 100644 --- a/src/handlers/static_plugins.h +++ b/src/handlers/static_plugins.h @@ -5,7 +5,7 @@ int import_handler_plugins() CORRADE_PLUGIN_IMPORT(VisorHandlerDns); CORRADE_PLUGIN_IMPORT(VisorHandlerDhcp); CORRADE_PLUGIN_IMPORT(VisorHandlerPcap); - CORRADE_PLUGIN_IMPORT(VisorHandlerResources); + CORRADE_PLUGIN_IMPORT(VisorHandlerInputResources); return 0; } From d2765d25ea48532c6a088c0d677c8626743c5b78 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Mon, 21 Mar 2022 17:49:14 -0400 Subject: [PATCH 09/22] fix typo --- src/handlers/input_resources/InputResourcesStreamHandler.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.h b/src/handlers/input_resources/InputResourcesStreamHandler.h index 3f522749f..4b879519c 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.h +++ b/src/handlers/input_resources/InputResourcesStreamHandler.h @@ -40,7 +40,7 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket public: InputResourcesMetricsBucket() : _cpu_percentage("resources", {"cpu_percentage"}, "Quantiles of thread cpu usage") - , _memory_usage_kb("resources", {"memory_bytes"}, "Quantiles of thead memory usage in bytes") + , _memory_usage_kb("resources", {"memory_bytes"}, "Quantiles of thread memory usage in bytes") { } From 5ebfb24ba75dec8c154d654176226bf10e81c236 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Mon, 21 Mar 2022 18:41:53 -0400 Subject: [PATCH 10/22] create new policy with resource handler for each new input --- src/CoreServer.cpp | 6 +----- src/InputStream.h | 11 ----------- src/Policies.cpp | 24 +++++++++++++++++------- 3 files changed, 18 insertions(+), 23 deletions(-) diff --git a/src/CoreServer.cpp b/src/CoreServer.cpp index 944709a63..b1a221aec 100644 --- a/src/CoreServer.cpp +++ b/src/CoreServer.cpp @@ -144,11 +144,11 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) try { auto [policy, lock] = _registry->policy_manager()->module_get_locked("default"); uint64_t period(std::stol(req.matches[1])); - auto key = fmt::format("{}m", period); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); if (hmod) { spdlog::stopwatch sw; + auto key = fmt::format("{}m", period); hmod->window_json(j[key], period, true); _logger->debug("{} window_json {} elapsed time: {}", hmod->name(), period, sw); } @@ -324,8 +324,6 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) } } } - auto resources_handler = policy->input_stream()->resources_handler(); - resources_handler->window_json(j[policy->name()][resources_handler->name()], period, req.matches[2] == "window"); _logger->debug("{} policy json metrics elapsed time: {}", policy->name(), psw); } res.set_content(j.dump(), "text/json"); @@ -366,8 +364,6 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _logger->debug("{} window_prometheus elapsed time: {}", hmod->name(), sw); } } - auto resources_handler = policy->input_stream()->resources_handler(); - resources_handler->window_prometheus(output, {{"policy", p_mname}, {"module", resources_handler->name()}}); } catch (const std::exception &e) { res.status = 500; res.set_content(e.what(), "text/plain"); diff --git a/src/InputStream.h b/src/InputStream.h index aa158cab7..bf6cdf50b 100644 --- a/src/InputStream.h +++ b/src/InputStream.h @@ -13,7 +13,6 @@ class InputStream : public AbstractRunnableModule { mutable std::shared_mutex _input_mutex; std::vector _policies; - StreamHandler *_resources_handler; public: InputStream(const std::string &name) @@ -41,16 +40,6 @@ class InputStream : public AbstractRunnableModule return _policies.size(); } - void set_resources_handler(StreamHandler *resources_handler) - { - _resources_handler = resources_handler; - } - - StreamHandler *resources_handler() const - { - return _resources_handler; - } - virtual size_t consumer_count() const = 0; void common_info_json(json &j) const diff --git a/src/Policies.cpp b/src/Policies.cpp index d5269c05f..4b06337dc 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -174,12 +174,15 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) window_config.config_set("deep_sample_rate", _default_deep_sample_rate); } + std::unique_ptr input_resources_policy; std::unique_ptr resources_module; if (input_stream) { - //create resources handler for input stream + // create new policy with resources handler for input stream + input_resources_policy = std::make_unique(input_stream_module_name + "-resources", tap); + input_resources_policy->set_input_stream(input_ptr); auto resources_handler_plugin = _registry->handler_plugins().find("input_resources"); - resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config); - input_stream->set_resources_handler(resources_module.get()); + resources_module = resources_handler_plugin->second->instantiate("resources", input_ptr, &window_config); + input_resources_policy->add_module(resources_module.get()); } std::vector> handler_modules; @@ -276,6 +279,9 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // make sure policy starts before committing try { + if (input_resources_policy) { + input_resources_policy->start(); + } policy->start(); } catch (std::runtime_error &e) { throw PolicyException(fmt::format("policy [{}] failed to start: {}", policy_name, e.what())); @@ -285,6 +291,9 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // If the modules created above go out of scope before this step, they will destruct so the key is to make sure // roll back during exception ensures no modules have been added to any of the managers try { + if (input_resources_policy) { + module_add(std::move(input_resources_policy)); + } module_add(std::move(policy)); } catch (ModuleException &e) { throw PolicyException(fmt::format("policy [{}] creation failed (policy): {}", policy_name, e.what())); @@ -335,7 +344,6 @@ void PolicyManager::remove_policy(const std::string &name) auto policy = _map[name].get(); auto input_name = policy->input_stream()->name(); - auto resource_handler_name = policy->input_stream()->resources_handler()->name(); std::vector module_names; for (const auto &mod : policy->modules()) { module_names.push_back(mod->name()); @@ -347,8 +355,12 @@ void PolicyManager::remove_policy(const std::string &name) } if (!policy->input_stream()->policies_count()) { + auto resources_name = input_name + "-resources"; + auto resources_policy = _map[resources_name].get(); + resources_policy->stop(); + _registry->handler_manager()->module_remove("resources"); _registry->input_manager()->module_remove(input_name); - _registry->handler_manager()->module_remove(resource_handler_name); + _map.erase(resources_name); } _map.erase(name); @@ -376,7 +388,6 @@ void Policy::start() // from handlers in the same thread we are starting the policy from spdlog::get("visor")->info("policy [{}]: starting input instance: {}", _name, _input_stream->name()); _input_stream->start(); - _input_stream->resources_handler()->start(); _running = true; } void Policy::stop() @@ -389,7 +400,6 @@ void Policy::stop() if (_input_stream->policies_count() <= 1) { spdlog::get("visor")->info("policy [{}]: stopping input instance: {}", _name, _input_stream->name()); _input_stream->stop(); - _input_stream->resources_handler()->stop(); } else { spdlog::get("visor")->info("policy [{}]: input instance {} not stopped because it is in use by another policy.", _name, _input_stream->name()); } From b8b490f028845c8ce52dd49875b59ab33503c117 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Mon, 21 Mar 2022 18:43:30 -0400 Subject: [PATCH 11/22] remove not necessary include from CoreServer --- src/CoreServer.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/CoreServer.cpp b/src/CoreServer.cpp index b1a221aec..9b35971cc 100644 --- a/src/CoreServer.cpp +++ b/src/CoreServer.cpp @@ -4,7 +4,6 @@ #include "CoreServer.h" #include "HandlerManager.h" -#include "InputStream.h" #include "Metrics.h" #include "Policies.h" #include "Taps.h" From 0a50c76ace31006f368494bdb3dc13d428e0b8a8 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Tue, 22 Mar 2022 11:02:49 -0400 Subject: [PATCH 12/22] Add unique name for resources handler --- src/Policies.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Policies.cpp b/src/Policies.cpp index 4b06337dc..1ecb5ad75 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -181,7 +181,7 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) input_resources_policy = std::make_unique(input_stream_module_name + "-resources", tap); input_resources_policy->set_input_stream(input_ptr); auto resources_handler_plugin = _registry->handler_plugins().find("input_resources"); - resources_module = resources_handler_plugin->second->instantiate("resources", input_ptr, &window_config); + resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config); input_resources_policy->add_module(resources_module.get()); } @@ -358,7 +358,7 @@ void PolicyManager::remove_policy(const std::string &name) auto resources_name = input_name + "-resources"; auto resources_policy = _map[resources_name].get(); resources_policy->stop(); - _registry->handler_manager()->module_remove("resources"); + _registry->handler_manager()->module_remove(resources_name); _registry->input_manager()->module_remove(input_name); _map.erase(resources_name); } From e78a13a692ab435835ef5d6a449e9da7bffb6f60 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Tue, 22 Mar 2022 12:01:50 -0400 Subject: [PATCH 13/22] add input resources policy to input policy list --- src/Policies.cpp | 3 ++- src/tests/test_policies.cpp | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Policies.cpp b/src/Policies.cpp index 1ecb5ad75..f9788dd7e 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -183,6 +183,7 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) auto resources_handler_plugin = _registry->handler_plugins().find("input_resources"); resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config); input_resources_policy->add_module(resources_module.get()); + input_stream->add_policy(input_resources_policy.get()); } std::vector> handler_modules; @@ -354,7 +355,7 @@ void PolicyManager::remove_policy(const std::string &name) _registry->handler_manager()->module_remove(name); } - if (!policy->input_stream()->policies_count()) { + if (policy->input_stream()->policies_count() <= 1) { auto resources_name = input_name + "-resources"; auto resources_policy = _map[resources_name].get(); resources_policy->stop(); diff --git a/src/tests/test_policies.cpp b/src/tests/test_policies.cpp index 940fbfbcf..d4ee4b208 100644 --- a/src/tests/test_policies.cpp +++ b/src/tests/test_policies.cpp @@ -657,7 +657,7 @@ TEST_CASE("Policies", "[policies]") CHECK(policy->modules()[1]->running()); CHECK(policy->modules()[2]->running()); policy->stop(); - CHECK(!policy->input_stream()->running()); + CHECK(policy->input_stream()->running()); CHECK(!policy->modules()[0]->running()); CHECK(!policy->modules()[1]->running()); CHECK(!policy->modules()[2]->running()); From 70be023bc7cd26e2bea1af6690fd2727631e6cbe Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Tue, 22 Mar 2022 13:46:10 -0400 Subject: [PATCH 14/22] only add input resource policy if the creation policy succeed --- src/Policies.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Policies.cpp b/src/Policies.cpp index f9788dd7e..d61fedead 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -292,10 +292,10 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // If the modules created above go out of scope before this step, they will destruct so the key is to make sure // roll back during exception ensures no modules have been added to any of the managers try { + module_add(std::move(policy)); if (input_resources_policy) { module_add(std::move(input_resources_policy)); } - module_add(std::move(policy)); } catch (ModuleException &e) { throw PolicyException(fmt::format("policy [{}] creation failed (policy): {}", policy_name, e.what())); } From 47f6de0275b4db0e65992fd5bf63a94aa1ab0c2e Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Tue, 22 Mar 2022 14:44:35 -0400 Subject: [PATCH 15/22] verify if input resources policy exists before removing it --- src/Policies.cpp | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/Policies.cpp b/src/Policies.cpp index d61fedead..d1534ec0d 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -181,9 +181,11 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) input_resources_policy = std::make_unique(input_stream_module_name + "-resources", tap); input_resources_policy->set_input_stream(input_ptr); auto resources_handler_plugin = _registry->handler_plugins().find("input_resources"); - resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config); - input_resources_policy->add_module(resources_module.get()); - input_stream->add_policy(input_resources_policy.get()); + if (resources_handler_plugin != _registry->handler_plugins().end()) { + resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config); + input_resources_policy->add_module(resources_module.get()); + input_stream->add_policy(input_resources_policy.get()); + } } std::vector> handler_modules; @@ -355,13 +357,17 @@ void PolicyManager::remove_policy(const std::string &name) _registry->handler_manager()->module_remove(name); } - if (policy->input_stream()->policies_count() <= 1) { - auto resources_name = input_name + "-resources"; - auto resources_policy = _map[resources_name].get(); - resources_policy->stop(); - _registry->handler_manager()->module_remove(resources_name); + if (policy->input_stream()->policies_count() == 1) { + auto input_resources_name = input_name + "-resources"; + if (_map.count(input_resources_name) != 0) { + auto resources_policy = _map[input_resources_name].get(); + resources_policy->stop(); + _registry->handler_manager()->module_remove(input_resources_name); + _registry->input_manager()->module_remove(input_name); + _map.erase(input_resources_name); + } + } else if (!policy->input_stream()->policies_count()) { _registry->input_manager()->module_remove(input_name); - _map.erase(resources_name); } _map.erase(name); From 95fa310b9357d7503e632b6b86278fcf0517ce61 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Tue, 22 Mar 2022 14:45:48 -0400 Subject: [PATCH 16/22] start input resources policy only after the added policy --- src/Policies.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Policies.cpp b/src/Policies.cpp index d1534ec0d..9e01802da 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -282,10 +282,10 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // make sure policy starts before committing try { + policy->start(); if (input_resources_policy) { input_resources_policy->start(); } - policy->start(); } catch (std::runtime_error &e) { throw PolicyException(fmt::format("policy [{}] failed to start: {}", policy_name, e.what())); } From eff1db9ff445f729100eb0e6d0e36970eca5a416 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Tue, 22 Mar 2022 16:24:04 -0400 Subject: [PATCH 17/22] do proper roll back policy --- src/Policies.cpp | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/Policies.cpp b/src/Policies.cpp index 9e01802da..afa72beb5 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -175,6 +175,7 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) } std::unique_ptr input_resources_policy; + Policy *input_res_policy_ptr; std::unique_ptr resources_module; if (input_stream) { // create new policy with resources handler for input stream @@ -184,7 +185,7 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) if (resources_handler_plugin != _registry->handler_plugins().end()) { resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config); input_resources_policy->add_module(resources_module.get()); - input_stream->add_policy(input_resources_policy.get()); + input_res_policy_ptr = input_resources_policy.get(); } } @@ -302,13 +303,15 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) throw PolicyException(fmt::format("policy [{}] creation failed (policy): {}", policy_name, e.what())); } try { - if (input_stream && resources_module) { + if (input_stream) { _registry->input_manager()->module_add(std::move(input_stream)); - _registry->handler_manager()->module_add(std::move(resources_module)); } } catch (ModuleException &e) { // note that if this call excepts, we are in an unknown state and the exception will propagate module_remove(policy_name); + if (input_res_policy_ptr) { + module_remove(input_res_policy_ptr->name()); + } throw PolicyException(fmt::format("policy [{}] creation failed (input): {}", policy_name, e.what())); } std::vector added_handlers; @@ -319,10 +322,18 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // if it did not except, add it to the list for rollback upon exception added_handlers.push_back(hname); } + if (resources_module) { + auto hname = resources_module->name(); + _registry->handler_manager()->module_add(std::move(resources_module)); + added_handlers.push_back(hname); + } } catch (ModuleException &e) { // note that if any of these calls except, we are in an unknown state and the exception will propagate // nothing needs to be stopped because it was not started module_remove(policy_name); + if (input_res_policy_ptr) { + module_remove(input_res_policy_ptr->name()); + } _registry->input_manager()->module_remove(input_stream_module_name); for (auto &m : added_handlers) { _registry->handler_manager()->module_remove(m); @@ -332,6 +343,9 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) } // success + if (input_res_policy_ptr) { + input_ptr->add_policy(input_res_policy_ptr); + } input_ptr->add_policy(policy_ptr); result.push_back(policy_ptr); } @@ -346,6 +360,7 @@ void PolicyManager::remove_policy(const std::string &name) } auto policy = _map[name].get(); + auto input_stream = policy->input_stream(); auto input_name = policy->input_stream()->name(); std::vector module_names; for (const auto &mod : policy->modules()) { @@ -357,16 +372,18 @@ void PolicyManager::remove_policy(const std::string &name) _registry->handler_manager()->module_remove(name); } - if (policy->input_stream()->policies_count() == 1) { + if (input_stream->policies_count() == 1) { + // if there is only one policy left on the input stream, and that policy is the input resources policy, then remove it auto input_resources_name = input_name + "-resources"; if (_map.count(input_resources_name) != 0) { auto resources_policy = _map[input_resources_name].get(); resources_policy->stop(); _registry->handler_manager()->module_remove(input_resources_name); - _registry->input_manager()->module_remove(input_name); _map.erase(input_resources_name); } - } else if (!policy->input_stream()->policies_count()) { + } + + if (!input_stream->policies_count()) { _registry->input_manager()->module_remove(input_name); } From a1cfe973e10085a7dc6ea7b4ad74ec0707a1c756 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Wed, 23 Mar 2022 15:03:13 -0400 Subject: [PATCH 18/22] Add support to count policies and handlers attached to input stream --- src/InputStream.h | 21 ++++++-- src/Policies.cpp | 9 +++- .../InputResourcesStreamHandler.cpp | 52 +++++++++++++++---- .../InputResourcesStreamHandler.h | 18 ++++--- .../tests/test_resources_layer.cpp | 25 ++++++--- src/inputs/dnstap/DnstapInputStream.h | 3 +- src/inputs/mock/MockInputStream.h | 3 +- src/inputs/pcap/PcapInputStream.h | 3 +- src/inputs/sflow/SflowInputStream.h | 3 +- 9 files changed, 99 insertions(+), 38 deletions(-) diff --git a/src/InputStream.h b/src/InputStream.h index bf6cdf50b..eb17531bd 100644 --- a/src/InputStream.h +++ b/src/InputStream.h @@ -6,13 +6,14 @@ #include "AbstractModule.h" #include "StreamHandler.h" +#include namespace visor { class InputStream : public AbstractRunnableModule { mutable std::shared_mutex _input_mutex; - std::vector _policies; + std::map _policies; public: InputStream(const std::string &name) @@ -22,16 +23,21 @@ class InputStream : public AbstractRunnableModule virtual ~InputStream(){}; - void add_policy(const Policy *policy) + void add_policy(const Policy *policy, uint16_t handlers) { std::unique_lock lock(_input_mutex); - _policies.push_back(policy); + _policies[policy] = handlers; + attached_policies(1, handlers); } void remove_policy(const Policy *policy) { std::unique_lock lock(_input_mutex); - _policies.erase(std::remove(_policies.begin(), _policies.end(), policy), _policies.end()); + auto iterator = _policies.find(policy); + if (iterator != _policies.end()) { + attached_policies(-1, -iterator->second); + _policies.erase(iterator); + } } size_t policies_count() const @@ -40,7 +46,10 @@ class InputStream : public AbstractRunnableModule return _policies.size(); } - virtual size_t consumer_count() const = 0; + virtual size_t consumer_count() const + { + return attached_policies.slot_count(); + } void common_info_json(json &j) const { @@ -48,6 +57,8 @@ class InputStream : public AbstractRunnableModule j["input"]["running"] = running(); j["input"]["consumers"] = consumer_count(); } + + mutable sigslot::signal attached_policies; }; } diff --git a/src/Policies.cpp b/src/Policies.cpp index afa72beb5..05270ba6d 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -344,9 +344,14 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // success if (input_res_policy_ptr) { - input_ptr->add_policy(input_res_policy_ptr); + input_ptr->add_policy(input_res_policy_ptr, input_res_policy_ptr->modules().size()); } - input_ptr->add_policy(policy_ptr); + if (handler_sequence) { + input_ptr->add_policy(policy_ptr, 1); + } else { + input_ptr->add_policy(policy_ptr, policy_ptr->modules().size()); + } + result.push_back(policy_ptr); } return result; diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.cpp b/src/handlers/input_resources/InputResourcesStreamHandler.cpp index 1cf04bc10..c1eb76f4d 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.cpp +++ b/src/handlers/input_resources/InputResourcesStreamHandler.cpp @@ -40,10 +40,13 @@ void InputResourcesStreamHandler::start() if (_pcap_stream) { _pkt_connection = _pcap_stream->packet_signal.connect(&InputResourcesStreamHandler::process_packet_cb, this); + _policies_connection = _pcap_stream->attached_policies.connect(&InputResourcesStreamHandler::process_policies_cb, this); } else if (_dnstap_stream) { _dnstap_connection = _dnstap_stream->dnstap_signal.connect(&InputResourcesStreamHandler::process_dnstap_cb, this); + _policies_connection = _dnstap_stream->attached_policies.connect(&InputResourcesStreamHandler::process_policies_cb, this); } else if (_sflow_stream) { _sflow_connection = _sflow_stream->sflow_signal.connect(&InputResourcesStreamHandler::process_sflow_cb, this); + _policies_connection = _sflow_stream->attached_policies.connect(&InputResourcesStreamHandler::process_policies_cb, this); } _running = true; @@ -62,15 +65,21 @@ void InputResourcesStreamHandler::stop() } else if (_sflow_stream) { _sflow_connection.disconnect(); } + _policies_connection.disconnect(); _running = false; } +void InputResourcesStreamHandler::process_policies_cb(int16_t policies_number, int16_t handlers_count) +{ + _metrics->process_policies(policies_number, handlers_count); +} + void InputResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSample &) { if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { _timer = time(NULL); - _metrics->process_resources(); + _metrics->process_resources(_monitor.cpu_percentage(), _monitor.memory_usage()); } } @@ -78,7 +87,7 @@ void InputResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnsta { if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) { _timer = time(NULL); - _metrics->process_resources(); + _metrics->process_resources(_monitor.cpu_percentage(), _monitor.memory_usage()); } } @@ -86,7 +95,7 @@ void InputResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packe { if (stamp.tv_sec >= MEASURE_INTERVAL + _timestamp.tv_sec) { _timestamp = stamp; - _metrics->process_resources(stamp); + _metrics->process_resources(_monitor.cpu_percentage(), _monitor.memory_usage()); } } @@ -100,6 +109,8 @@ void InputResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket _cpu_percentage.merge(other._cpu_percentage); _memory_usage_kb.merge(other._memory_usage_kb); + _policies_number += other._policies_number; + _handlers_count += other._handlers_count; } void InputResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap add_labels) const @@ -116,6 +127,8 @@ void InputResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric:: _cpu_percentage.to_prometheus(out, add_labels); _memory_usage_kb.to_prometheus(out, add_labels); + _policies_number.to_prometheus(out, add_labels); + _handlers_count.to_prometheus(out, add_labels); } void InputResourcesMetricsBucket::to_json(json &j) const @@ -134,25 +147,46 @@ void InputResourcesMetricsBucket::to_json(json &j) const _cpu_percentage.to_json(j); _memory_usage_kb.to_json(j); + _policies_number.to_json(j); + _handlers_count.to_json(j); +} + +void InputResourcesMetricsBucket::process_resources(double cpu_usage, uint64_t memory_usage) +{ + std::unique_lock lock(_mutex); + + _cpu_percentage.update(cpu_usage); + _memory_usage_kb.update(memory_usage); } -void InputResourcesMetricsBucket::process_resources() +void InputResourcesMetricsBucket::process_policies(int16_t policies_number, int16_t handlers_count) { std::unique_lock lock(_mutex); - _cpu_percentage.update(_monitor.cpu_percentage()); - _memory_usage_kb.update(_monitor.memory_usage()); + _policies_number += policies_number; + _handlers_count += handlers_count; } -void InputResourcesMetricsManager::process_resources(timespec stamp) +void InputResourcesMetricsManager::process_resources(double cpu_usage, uint64_t memory_usage, timespec stamp) { - if(stamp.tv_sec == 0) { + if (stamp.tv_sec == 0) { // use now() std::timespec_get(&stamp, TIME_UTC); } // base event new_event(stamp); // process in the "live" bucket. this will parse the resources if we are deep sampling - live_bucket()->process_resources(); + live_bucket()->process_resources(cpu_usage, memory_usage); +} + +void InputResourcesMetricsManager::process_policies(int16_t policies_number, int16_t handlers_count) +{ + timespec stamp; + // use now() + std::timespec_get(&stamp, TIME_UTC); + // base event + new_event(stamp); + // process in the "live" bucket. this will parse the resources if we are deep sampling + live_bucket()->process_policies(policies_number, handlers_count); } } \ No newline at end of file diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.h b/src/handlers/input_resources/InputResourcesStreamHandler.h index 4b879519c..49b94b3a5 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.h +++ b/src/handlers/input_resources/InputResourcesStreamHandler.h @@ -33,14 +33,15 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket // total numPackets is tracked in base class num_events Quantile _cpu_percentage; Quantile _memory_usage_kb; - uint32_t thread_id; - - ThreadMonitor _monitor; + Counter _policies_number; + Counter _handlers_count; public: InputResourcesMetricsBucket() : _cpu_percentage("resources", {"cpu_percentage"}, "Quantiles of thread cpu usage") , _memory_usage_kb("resources", {"memory_bytes"}, "Quantiles of thread memory usage in bytes") + , _policies_number("resources", {"policies_attached"}, "Total number of policies attached to the input stream") + , _handlers_count("resources", {"handlers_attached"}, "Total number of handlers attached to the input stream") { } @@ -49,7 +50,8 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket void to_json(json &j) const override; void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override; - void process_resources(); + void process_resources(double cpu_usage, uint64_t memory_usage); + void process_policies(int16_t policies_number, int16_t handlers_count); }; class InputResourcesMetricsManager final : public visor::AbstractMetricsManager @@ -60,13 +62,15 @@ class InputResourcesMetricsManager final : public visor::AbstractMetricsManager< { } - void process_resources(timespec stamp = timespec()); + void process_resources(double cpu_usage, uint64_t memory_usage, timespec stamp = timespec()); + void process_policies(int16_t policies_number, int16_t handlers_count); }; class InputResourcesStreamHandler final : public visor::StreamMetricsHandler { // the input stream sources we support (only one will be in use at a time) + ThreadMonitor _monitor; time_t _timer; timespec _timestamp; PcapInputStream *_pcap_stream{nullptr}; @@ -77,9 +81,11 @@ class InputResourcesStreamHandler final : public visor::StreamMetricsHandler #include "DnstapInputStream.h" -#include "PcapInputStream.h" #include "InputResourcesStreamHandler.h" +#include "PcapInputStream.h" #include "SflowInputStream.h" using namespace visor::handler::resources; @@ -15,23 +15,26 @@ TEST_CASE("Check resources for pcap input", "[pcap][resources]") visor::Config c; c.config_set("num_periods", 1); - InputResourcesStreamHandler resources_handler{"net-test", &stream, &c}; + InputResourcesStreamHandler resources_handler{"resource-test", &stream, &c}; resources_handler.start(); stream.start(); + stream.attached_policies(1, 2); resources_handler.stop(); stream.stop(); auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); CHECK(resources_handler.metrics()->current_periods() == 1); - CHECK(event_data.num_events->value() == 1); + CHECK(event_data.num_events->value() == 2); nlohmann::json j; resources_handler.metrics()->bucket(0)->to_json(j); CHECK(j["cpu_percentage"]["p50"] == 0.0); CHECK(j["memory_bytes"]["p50"] != nullptr); + CHECK(j["policies_attached"] == 1); + CHECK(j["handlers_attached"] == 2); std::stringstream output; std::string line; @@ -41,7 +44,7 @@ TEST_CASE("Check resources for pcap input", "[pcap][resources]") std::getline(output, line); CHECK(line == "# TYPE base_total gauge"); std::getline(output, line); - CHECK(line == R"(base_total{policy="default"} 1)"); + CHECK(line == R"(base_total{policy="default"} 2)"); } TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") @@ -51,23 +54,26 @@ TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") stream.config_set("only_hosts", {"192.168.0.0/24", "2001:db8::/48"}); visor::Config c; c.config_set("num_periods", 1); - InputResourcesStreamHandler resources_handler{"net-test", &stream, &c}; + InputResourcesStreamHandler resources_handler{"resource-test", &stream, &c}; resources_handler.start(); stream.start(); + stream.attached_policies(1, 2); stream.stop(); resources_handler.stop(); auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); CHECK(resources_handler.metrics()->current_periods() == 1); - CHECK(event_data.num_events->value() == 1); + CHECK(event_data.num_events->value() == 2); nlohmann::json j; resources_handler.metrics()->bucket(0)->to_json(j); CHECK(j["cpu_percentage"]["p50"] == 0.0); CHECK(j["memory_bytes"]["p50"] != nullptr); + CHECK(j["policies_attached"] == 1); + CHECK(j["handlers_attached"] == 2); } TEST_CASE("Check resources for sflow input", "[sflow][resources]") @@ -77,21 +83,24 @@ TEST_CASE("Check resources for sflow input", "[sflow][resources]") visor::Config c; c.config_set("num_periods", 1); - InputResourcesStreamHandler resources_handler{"net-test", &stream, &c}; + InputResourcesStreamHandler resources_handler{"resource-test", &stream, &c}; resources_handler.start(); stream.start(); + stream.attached_policies(1, 2); stream.stop(); resources_handler.stop(); auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); CHECK(resources_handler.metrics()->current_periods() == 1); - CHECK(event_data.num_events->value() == 1); + CHECK(event_data.num_events->value() == 2); nlohmann::json j; resources_handler.metrics()->bucket(0)->to_json(j); CHECK(j["cpu_percentage"]["p50"] == 0.0); CHECK(j["memory_bytes"]["p50"] != nullptr); + CHECK(j["policies_attached"] == 1); + CHECK(j["handlers_attached"] == 2); } \ No newline at end of file diff --git a/src/inputs/dnstap/DnstapInputStream.h b/src/inputs/dnstap/DnstapInputStream.h index ca1f962aa..1c220652d 100644 --- a/src/inputs/dnstap/DnstapInputStream.h +++ b/src/inputs/dnstap/DnstapInputStream.h @@ -8,7 +8,6 @@ #include "InputStream.h" #include "dnstap.pb.h" #include -#include #include #include #include @@ -96,7 +95,7 @@ class DnstapInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return dnstap_signal.slot_count(); + return attached_policies.slot_count() + dnstap_signal.slot_count(); } // handler functionality diff --git a/src/inputs/mock/MockInputStream.h b/src/inputs/mock/MockInputStream.h index bcfaa5b5e..fda649d05 100644 --- a/src/inputs/mock/MockInputStream.h +++ b/src/inputs/mock/MockInputStream.h @@ -5,7 +5,6 @@ #pragma once #include "InputStream.h" -#include #include #include @@ -31,7 +30,7 @@ class MockInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return random_int_signal.slot_count(); + return attached_policies.slot_count() + random_int_signal.slot_count(); } // handler functionality diff --git a/src/inputs/pcap/PcapInputStream.h b/src/inputs/pcap/PcapInputStream.h index 943d409df..3b6aacccd 100644 --- a/src/inputs/pcap/PcapInputStream.h +++ b/src/inputs/pcap/PcapInputStream.h @@ -15,7 +15,6 @@ #include "utils.h" #include #include -#include #include #include #ifdef __linux__ @@ -87,7 +86,7 @@ class PcapInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return packet_signal.slot_count() + udp_signal.slot_count() + start_tstamp_signal.slot_count() + tcp_message_ready_signal.slot_count() + tcp_connection_start_signal.slot_count() + tcp_connection_end_signal.slot_count() + tcp_reassembly_error_signal.slot_count() + pcap_stats_signal.slot_count(); + return attached_policies.slot_count() + packet_signal.slot_count() + udp_signal.slot_count() + start_tstamp_signal.slot_count() + tcp_message_ready_signal.slot_count() + tcp_connection_start_signal.slot_count() + tcp_connection_end_signal.slot_count() + tcp_reassembly_error_signal.slot_count() + pcap_stats_signal.slot_count(); } // utilities diff --git a/src/inputs/sflow/SflowInputStream.h b/src/inputs/sflow/SflowInputStream.h index 7636ace68..7ef9b56fc 100644 --- a/src/inputs/sflow/SflowInputStream.h +++ b/src/inputs/sflow/SflowInputStream.h @@ -6,7 +6,6 @@ #include "InputStream.h" #include "SflowData.h" -#include #include #include @@ -39,7 +38,7 @@ class SflowInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return sflow_signal.slot_count(); + return attached_policies.slot_count() + sflow_signal.slot_count(); } // handler functionality From 165297d72d402862a23f8844e2aedd3abd6792e5 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Wed, 23 Mar 2022 17:59:09 -0400 Subject: [PATCH 19/22] Properly handle policies and handlers count on input resources handler --- src/InputStream.h | 24 ++++++----- src/Policies.cpp | 26 ++++++------ src/Policies.h | 14 ++++++- .../InputResourcesStreamHandler.cpp | 40 +++++++++++++++---- .../InputResourcesStreamHandler.h | 15 ++++++- .../tests/test_resources_layer.cpp | 23 +++++------ src/inputs/dnstap/DnstapInputStream.h | 2 +- src/inputs/mock/MockInputStream.h | 2 +- src/inputs/pcap/PcapInputStream.h | 2 +- src/inputs/sflow/SflowInputStream.h | 2 +- 10 files changed, 97 insertions(+), 53 deletions(-) diff --git a/src/InputStream.h b/src/InputStream.h index eb17531bd..2aceb6374 100644 --- a/src/InputStream.h +++ b/src/InputStream.h @@ -13,9 +13,14 @@ namespace visor { class InputStream : public AbstractRunnableModule { mutable std::shared_mutex _input_mutex; - std::map _policies; + std::vector _policies; public: + enum class Action { + AddPolicy, + RemovePolicy + }; + InputStream(const std::string &name) : AbstractRunnableModule(name) { @@ -23,21 +28,18 @@ class InputStream : public AbstractRunnableModule virtual ~InputStream(){}; - void add_policy(const Policy *policy, uint16_t handlers) + void add_policy(const Policy *policy) { std::unique_lock lock(_input_mutex); - _policies[policy] = handlers; - attached_policies(1, handlers); + _policies.push_back(policy); + policy_signal(policy, Action::AddPolicy); } void remove_policy(const Policy *policy) { std::unique_lock lock(_input_mutex); - auto iterator = _policies.find(policy); - if (iterator != _policies.end()) { - attached_policies(-1, -iterator->second); - _policies.erase(iterator); - } + _policies.erase(std::remove(_policies.begin(), _policies.end(), policy), _policies.end()); + policy_signal(policy, Action::RemovePolicy); } size_t policies_count() const @@ -48,7 +50,7 @@ class InputStream : public AbstractRunnableModule virtual size_t consumer_count() const { - return attached_policies.slot_count(); + return policy_signal.slot_count(); } void common_info_json(json &j) const @@ -58,7 +60,7 @@ class InputStream : public AbstractRunnableModule j["input"]["consumers"] = consumer_count(); } - mutable sigslot::signal attached_policies; + mutable sigslot::signal policy_signal; }; } diff --git a/src/Policies.cpp b/src/Policies.cpp index 05270ba6d..8ec0dc938 100644 --- a/src/Policies.cpp +++ b/src/Policies.cpp @@ -146,12 +146,7 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) throw PolicyException(fmt::format("unable to instantiate tap '{}': {}", tap_name, e.what())); } } - // Create Policy - auto policy = std::make_unique(policy_name, tap); - // if and only if policy succeeds, we will return this in result set - Policy *policy_ptr = policy.get(); - policy->set_input_stream(input_ptr); - // Handler Section + // Handler type if (!it->second["handlers"] || !it->second["handlers"].IsMap()) { throw PolicyException("missing or invalid handler configuration at key 'handlers'"); } @@ -162,6 +157,14 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) } else if (handler_node["modules"].IsSequence()) { handler_sequence = true; } + + // Create Policy + auto policy = std::make_unique(policy_name, tap, handler_sequence); + // if and only if policy succeeds, we will return this in result set + Policy *policy_ptr = policy.get(); + policy->set_input_stream(input_ptr); + + // Handler Section Config window_config; if (handler_node["window_config"] && handler_node["window_config"].IsMap()) { try { @@ -179,7 +182,7 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) std::unique_ptr resources_module; if (input_stream) { // create new policy with resources handler for input stream - input_resources_policy = std::make_unique(input_stream_module_name + "-resources", tap); + input_resources_policy = std::make_unique(input_stream_module_name + "-resources", tap, false); input_resources_policy->set_input_stream(input_ptr); auto resources_handler_plugin = _registry->handler_plugins().find("input_resources"); if (resources_handler_plugin != _registry->handler_plugins().end()) { @@ -344,14 +347,9 @@ std::vector PolicyManager::load(const YAML::Node &policy_yaml) // success if (input_res_policy_ptr) { - input_ptr->add_policy(input_res_policy_ptr, input_res_policy_ptr->modules().size()); + input_ptr->add_policy(input_res_policy_ptr); } - if (handler_sequence) { - input_ptr->add_policy(policy_ptr, 1); - } else { - input_ptr->add_policy(policy_ptr, policy_ptr->modules().size()); - } - + input_ptr->add_policy(policy_ptr); result.push_back(policy_ptr); } return result; diff --git a/src/Policies.h b/src/Policies.h index 0aa65c241..0fc49cd07 100644 --- a/src/Policies.h +++ b/src/Policies.h @@ -28,15 +28,18 @@ class PolicyException : public std::runtime_error class Policy : public AbstractRunnableModule { + static constexpr size_t HANDLERS_SEQUENCE_SIZE = 1; Tap *_tap; InputStream *_input_stream; + bool _modules_sequence; std::vector _modules; public: - Policy(const std::string &name, Tap *tap) + Policy(const std::string &name, Tap *tap, bool modules_sequence) : AbstractRunnableModule(name) , _tap(tap) + , _modules_sequence(modules_sequence) , _input_stream(nullptr) { } @@ -66,6 +69,15 @@ class Policy : public AbstractRunnableModule return _modules; } + size_t get_handlers_list_size() const + { + if (_modules_sequence) { + return HANDLERS_SEQUENCE_SIZE; + } else { + return _modules.size(); + } + } + // life cycle void start() override; void stop() override; diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.cpp b/src/handlers/input_resources/InputResourcesStreamHandler.cpp index c1eb76f4d..5cea45ea5 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.cpp +++ b/src/handlers/input_resources/InputResourcesStreamHandler.cpp @@ -3,6 +3,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ #include "InputResourcesStreamHandler.h" +#include "Policies.h" namespace visor::handler::resources { @@ -40,13 +41,13 @@ void InputResourcesStreamHandler::start() if (_pcap_stream) { _pkt_connection = _pcap_stream->packet_signal.connect(&InputResourcesStreamHandler::process_packet_cb, this); - _policies_connection = _pcap_stream->attached_policies.connect(&InputResourcesStreamHandler::process_policies_cb, this); + _policies_connection = _pcap_stream->policy_signal.connect(&InputResourcesStreamHandler::process_policies_cb, this); } else if (_dnstap_stream) { _dnstap_connection = _dnstap_stream->dnstap_signal.connect(&InputResourcesStreamHandler::process_dnstap_cb, this); - _policies_connection = _dnstap_stream->attached_policies.connect(&InputResourcesStreamHandler::process_policies_cb, this); + _policies_connection = _dnstap_stream->policy_signal.connect(&InputResourcesStreamHandler::process_policies_cb, this); } else if (_sflow_stream) { _sflow_connection = _sflow_stream->sflow_signal.connect(&InputResourcesStreamHandler::process_sflow_cb, this); - _policies_connection = _sflow_stream->attached_policies.connect(&InputResourcesStreamHandler::process_policies_cb, this); + _policies_connection = _sflow_stream->policy_signal.connect(&InputResourcesStreamHandler::process_policies_cb, this); } _running = true; @@ -70,9 +71,23 @@ void InputResourcesStreamHandler::stop() _running = false; } -void InputResourcesStreamHandler::process_policies_cb(int16_t policies_number, int16_t handlers_count) +void InputResourcesStreamHandler::process_policies_cb(const Policy *policy, InputStream::Action action) { - _metrics->process_policies(policies_number, handlers_count); + int16_t policies_number = 0; + int16_t handlers_count = 0; + + switch (action) { + case InputStream::Action::AddPolicy: + policies_number = 1; + handlers_count = policy->get_handlers_list_size(); + break; + case InputStream::Action::RemovePolicy: + policies_number = -1; + handlers_count = -policy->get_handlers_list_size(); + break; + } + + _metrics->process_policies(policies_number, handlers_count, false); } void InputResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSample &) @@ -109,8 +124,12 @@ void InputResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket _cpu_percentage.merge(other._cpu_percentage); _memory_usage_kb.merge(other._memory_usage_kb); - _policies_number += other._policies_number; - _handlers_count += other._handlers_count; + if (other._policies_number.value() > _policies_number.value()) { + _policies_number += other._policies_number; + } + if (other._handlers_count.value() > _handlers_count.value()) { + _handlers_count += other._handlers_count; + } } void InputResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap add_labels) const @@ -179,8 +198,13 @@ void InputResourcesMetricsManager::process_resources(double cpu_usage, uint64_t live_bucket()->process_resources(cpu_usage, memory_usage); } -void InputResourcesMetricsManager::process_policies(int16_t policies_number, int16_t handlers_count) +void InputResourcesMetricsManager::process_policies(int16_t policies_number, int16_t handlers_count, bool self) { + if (!self) { + policies_total += policies_number; + handlers_total += handlers_count; + } + timespec stamp; // use now() std::timespec_get(&stamp, TIME_UTC); diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.h b/src/handlers/input_resources/InputResourcesStreamHandler.h index 49b94b3a5..c97d75220 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.h +++ b/src/handlers/input_resources/InputResourcesStreamHandler.h @@ -46,6 +46,7 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket } // visor::AbstractMetricsBucket + void specialized_merge(const AbstractMetricsBucket &other) override; void to_json(json &j) const override; void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override; @@ -56,14 +57,24 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket class InputResourcesMetricsManager final : public visor::AbstractMetricsManager { + uint16_t policies_total; + uint16_t handlers_total; + public: InputResourcesMetricsManager(const Configurable *window_config) : visor::AbstractMetricsManager(window_config) + , policies_total(0) + , handlers_total(0) + { + } + + void on_period_shift([[maybe_unused]] timespec stamp, [[maybe_unused]] const InputResourcesMetricsBucket *maybe_expiring_bucket) override { + process_policies(policies_total, handlers_total, true); } void process_resources(double cpu_usage, uint64_t memory_usage, timespec stamp = timespec()); - void process_policies(int16_t policies_number, int16_t handlers_count); + void process_policies(int16_t policies_number, int16_t handlers_count, bool self); }; class InputResourcesStreamHandler final : public visor::StreamMetricsHandler @@ -85,7 +96,7 @@ class InputResourcesStreamHandler final : public visor::StreamMetricsHandlerbucket(0)->event_data_locked(); CHECK(resources_handler.metrics()->current_periods() == 1); - CHECK(event_data.num_events->value() == 2); + CHECK(event_data.num_events->value() == 1); nlohmann::json j; resources_handler.metrics()->bucket(0)->to_json(j); CHECK(j["cpu_percentage"]["p50"] == 0.0); CHECK(j["memory_bytes"]["p50"] != nullptr); - CHECK(j["policies_attached"] == 1); - CHECK(j["handlers_attached"] == 2); + CHECK(j["policies_attached"] == 0); + CHECK(j["handlers_attached"] == 0); std::stringstream output; std::string line; @@ -44,7 +43,7 @@ TEST_CASE("Check resources for pcap input", "[pcap][resources]") std::getline(output, line); CHECK(line == "# TYPE base_total gauge"); std::getline(output, line); - CHECK(line == R"(base_total{policy="default"} 2)"); + CHECK(line == R"(base_total{policy="default"} 1)"); } TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") @@ -58,22 +57,21 @@ TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") resources_handler.start(); stream.start(); - stream.attached_policies(1, 2); stream.stop(); resources_handler.stop(); auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); CHECK(resources_handler.metrics()->current_periods() == 1); - CHECK(event_data.num_events->value() == 2); + CHECK(event_data.num_events->value() == 1); nlohmann::json j; resources_handler.metrics()->bucket(0)->to_json(j); CHECK(j["cpu_percentage"]["p50"] == 0.0); CHECK(j["memory_bytes"]["p50"] != nullptr); - CHECK(j["policies_attached"] == 1); - CHECK(j["handlers_attached"] == 2); + CHECK(j["policies_attached"] == 0); + CHECK(j["handlers_attached"] == 0); } TEST_CASE("Check resources for sflow input", "[sflow][resources]") @@ -87,20 +85,19 @@ TEST_CASE("Check resources for sflow input", "[sflow][resources]") resources_handler.start(); stream.start(); - stream.attached_policies(1, 2); stream.stop(); resources_handler.stop(); auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); CHECK(resources_handler.metrics()->current_periods() == 1); - CHECK(event_data.num_events->value() == 2); + CHECK(event_data.num_events->value() == 1); nlohmann::json j; resources_handler.metrics()->bucket(0)->to_json(j); CHECK(j["cpu_percentage"]["p50"] == 0.0); CHECK(j["memory_bytes"]["p50"] != nullptr); - CHECK(j["policies_attached"] == 1); - CHECK(j["handlers_attached"] == 2); + CHECK(j["policies_attached"] == 0); + CHECK(j["handlers_attached"] == 0); } \ No newline at end of file diff --git a/src/inputs/dnstap/DnstapInputStream.h b/src/inputs/dnstap/DnstapInputStream.h index 1c220652d..b98ea5f94 100644 --- a/src/inputs/dnstap/DnstapInputStream.h +++ b/src/inputs/dnstap/DnstapInputStream.h @@ -95,7 +95,7 @@ class DnstapInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return attached_policies.slot_count() + dnstap_signal.slot_count(); + return policy_signal.slot_count() + dnstap_signal.slot_count(); } // handler functionality diff --git a/src/inputs/mock/MockInputStream.h b/src/inputs/mock/MockInputStream.h index fda649d05..bea58f4eb 100644 --- a/src/inputs/mock/MockInputStream.h +++ b/src/inputs/mock/MockInputStream.h @@ -30,7 +30,7 @@ class MockInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return attached_policies.slot_count() + random_int_signal.slot_count(); + return policy_signal.slot_count() + random_int_signal.slot_count(); } // handler functionality diff --git a/src/inputs/pcap/PcapInputStream.h b/src/inputs/pcap/PcapInputStream.h index 3b6aacccd..beea87211 100644 --- a/src/inputs/pcap/PcapInputStream.h +++ b/src/inputs/pcap/PcapInputStream.h @@ -86,7 +86,7 @@ class PcapInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return attached_policies.slot_count() + packet_signal.slot_count() + udp_signal.slot_count() + start_tstamp_signal.slot_count() + tcp_message_ready_signal.slot_count() + tcp_connection_start_signal.slot_count() + tcp_connection_end_signal.slot_count() + tcp_reassembly_error_signal.slot_count() + pcap_stats_signal.slot_count(); + return policy_signal.slot_count() + packet_signal.slot_count() + udp_signal.slot_count() + start_tstamp_signal.slot_count() + tcp_message_ready_signal.slot_count() + tcp_connection_start_signal.slot_count() + tcp_connection_end_signal.slot_count() + tcp_reassembly_error_signal.slot_count() + pcap_stats_signal.slot_count(); } // utilities diff --git a/src/inputs/sflow/SflowInputStream.h b/src/inputs/sflow/SflowInputStream.h index 7ef9b56fc..f0b584eeb 100644 --- a/src/inputs/sflow/SflowInputStream.h +++ b/src/inputs/sflow/SflowInputStream.h @@ -38,7 +38,7 @@ class SflowInputStream : public visor::InputStream void info_json(json &j) const override; size_t consumer_count() const override { - return attached_policies.slot_count() + sflow_signal.slot_count(); + return policy_signal.slot_count() + sflow_signal.slot_count(); } // handler functionality From e069d25f299516f232fb7eed28475622cb228cee Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Thu, 24 Mar 2022 18:25:10 -0400 Subject: [PATCH 20/22] Only merge the fist bucket which is the more recent one --- src/Policies.h | 2 +- .../input_resources/InputResourcesStreamHandler.cpp | 7 ++++--- src/handlers/input_resources/InputResourcesStreamHandler.h | 2 ++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Policies.h b/src/Policies.h index 0fc49cd07..1b459b383 100644 --- a/src/Policies.h +++ b/src/Policies.h @@ -31,8 +31,8 @@ class Policy : public AbstractRunnableModule static constexpr size_t HANDLERS_SEQUENCE_SIZE = 1; Tap *_tap; - InputStream *_input_stream; bool _modules_sequence; + InputStream *_input_stream; std::vector _modules; public: diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.cpp b/src/handlers/input_resources/InputResourcesStreamHandler.cpp index 5cea45ea5..9786f93cc 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.cpp +++ b/src/handlers/input_resources/InputResourcesStreamHandler.cpp @@ -124,11 +124,12 @@ void InputResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket _cpu_percentage.merge(other._cpu_percentage); _memory_usage_kb.merge(other._memory_usage_kb); - if (other._policies_number.value() > _policies_number.value()) { + + // Merge only the first bucket which is the more recent + if (!_merged) { _policies_number += other._policies_number; - } - if (other._handlers_count.value() > _handlers_count.value()) { _handlers_count += other._handlers_count; + _merged = true; } } diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.h b/src/handlers/input_resources/InputResourcesStreamHandler.h index c97d75220..0808b1b0a 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.h +++ b/src/handlers/input_resources/InputResourcesStreamHandler.h @@ -35,6 +35,7 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket Quantile _memory_usage_kb; Counter _policies_number; Counter _handlers_count; + bool _merged; public: InputResourcesMetricsBucket() @@ -42,6 +43,7 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket , _memory_usage_kb("resources", {"memory_bytes"}, "Quantiles of thread memory usage in bytes") , _policies_number("resources", {"policies_attached"}, "Total number of policies attached to the input stream") , _handlers_count("resources", {"handlers_attached"}, "Total number of handlers attached to the input stream") + , _merged(false) { } From 09610ce523380ae076f4bc4f282fdbd813456c51 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Fri, 25 Mar 2022 13:09:04 -0400 Subject: [PATCH 21/22] Change variables name --- .../InputResourcesStreamHandler.cpp | 42 +++++++++---------- .../InputResourcesStreamHandler.h | 36 ++++++++-------- .../tests/test_resources_layer.cpp | 22 +++++----- 3 files changed, 48 insertions(+), 52 deletions(-) diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.cpp b/src/handlers/input_resources/InputResourcesStreamHandler.cpp index 9786f93cc..cd87785b2 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.cpp +++ b/src/handlers/input_resources/InputResourcesStreamHandler.cpp @@ -122,13 +122,13 @@ void InputResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket std::shared_lock r_lock(other._mutex); std::unique_lock w_lock(_mutex); - _cpu_percentage.merge(other._cpu_percentage); - _memory_usage_kb.merge(other._memory_usage_kb); + _cpu_usage.merge(other._cpu_usage); + _memory_bytes.merge(other._memory_bytes); // Merge only the first bucket which is the more recent if (!_merged) { - _policies_number += other._policies_number; - _handlers_count += other._handlers_count; + _policy_count += other._policy_count; + _handler_count += other._handler_count; _merged = true; } } @@ -145,10 +145,10 @@ void InputResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric:: std::shared_lock r_lock(_mutex); - _cpu_percentage.to_prometheus(out, add_labels); - _memory_usage_kb.to_prometheus(out, add_labels); - _policies_number.to_prometheus(out, add_labels); - _handlers_count.to_prometheus(out, add_labels); + _cpu_usage.to_prometheus(out, add_labels); + _memory_bytes.to_prometheus(out, add_labels); + _policy_count.to_prometheus(out, add_labels); + _handler_count.to_prometheus(out, add_labels); } void InputResourcesMetricsBucket::to_json(json &j) const @@ -165,26 +165,26 @@ void InputResourcesMetricsBucket::to_json(json &j) const std::shared_lock r_lock(_mutex); - _cpu_percentage.to_json(j); - _memory_usage_kb.to_json(j); - _policies_number.to_json(j); - _handlers_count.to_json(j); + _cpu_usage.to_json(j); + _memory_bytes.to_json(j); + _policy_count.to_json(j); + _handler_count.to_json(j); } void InputResourcesMetricsBucket::process_resources(double cpu_usage, uint64_t memory_usage) { std::unique_lock lock(_mutex); - _cpu_percentage.update(cpu_usage); - _memory_usage_kb.update(memory_usage); + _cpu_usage.update(cpu_usage); + _memory_bytes.update(memory_usage); } -void InputResourcesMetricsBucket::process_policies(int16_t policies_number, int16_t handlers_count) +void InputResourcesMetricsBucket::process_policies(int16_t policy_count, int16_t handler_count) { std::unique_lock lock(_mutex); - _policies_number += policies_number; - _handlers_count += handlers_count; + _policy_count += policy_count; + _handler_count += handler_count; } void InputResourcesMetricsManager::process_resources(double cpu_usage, uint64_t memory_usage, timespec stamp) @@ -199,11 +199,11 @@ void InputResourcesMetricsManager::process_resources(double cpu_usage, uint64_t live_bucket()->process_resources(cpu_usage, memory_usage); } -void InputResourcesMetricsManager::process_policies(int16_t policies_number, int16_t handlers_count, bool self) +void InputResourcesMetricsManager::process_policies(int16_t policy_count, int16_t handler_count, bool self) { if (!self) { - policies_total += policies_number; - handlers_total += handlers_count; + policy_total += policy_count; + handler_total += handler_count; } timespec stamp; @@ -212,6 +212,6 @@ void InputResourcesMetricsManager::process_policies(int16_t policies_number, int // base event new_event(stamp); // process in the "live" bucket. this will parse the resources if we are deep sampling - live_bucket()->process_policies(policies_number, handlers_count); + live_bucket()->process_policies(policy_count, handler_count); } } \ No newline at end of file diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.h b/src/handlers/input_resources/InputResourcesStreamHandler.h index 0808b1b0a..440f2d832 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.h +++ b/src/handlers/input_resources/InputResourcesStreamHandler.h @@ -22,7 +22,7 @@ using namespace visor::input::dnstap; using namespace visor::input::mock; using namespace visor::input::sflow; -constexpr double MEASURE_INTERVAL = 10; // in seconds +constexpr double MEASURE_INTERVAL = 5; // in seconds class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket { @@ -30,19 +30,18 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket protected: mutable std::shared_mutex _mutex; - // total numPackets is tracked in base class num_events - Quantile _cpu_percentage; - Quantile _memory_usage_kb; - Counter _policies_number; - Counter _handlers_count; + Quantile _cpu_usage; + Quantile _memory_bytes; + Counter _policy_count; + Counter _handler_count; bool _merged; public: InputResourcesMetricsBucket() - : _cpu_percentage("resources", {"cpu_percentage"}, "Quantiles of thread cpu usage") - , _memory_usage_kb("resources", {"memory_bytes"}, "Quantiles of thread memory usage in bytes") - , _policies_number("resources", {"policies_attached"}, "Total number of policies attached to the input stream") - , _handlers_count("resources", {"handlers_attached"}, "Total number of handlers attached to the input stream") + : _cpu_usage("resources", {"cpu_usage"}, "Quantiles of 5s averages of percent cpu usage by the input stream") + , _memory_bytes("resources", {"memory_bytes"}, "Quantiles of 5s averages of memory usage (in bytes) by the input stream") + , _policy_count("resources", {"policy_count"}, "Total number of policies attached to the input stream") + , _handler_count("resources", {"handler_count"}, "Total number of handlers attached to the input stream") , _merged(false) { } @@ -54,38 +53,37 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override; void process_resources(double cpu_usage, uint64_t memory_usage); - void process_policies(int16_t policies_number, int16_t handlers_count); + void process_policies(int16_t policy_count, int16_t handler_count); }; class InputResourcesMetricsManager final : public visor::AbstractMetricsManager { - uint16_t policies_total; - uint16_t handlers_total; + uint16_t policy_total; + uint16_t handler_total; public: InputResourcesMetricsManager(const Configurable *window_config) : visor::AbstractMetricsManager(window_config) - , policies_total(0) - , handlers_total(0) + , policy_total(0) + , handler_total(0) { } void on_period_shift([[maybe_unused]] timespec stamp, [[maybe_unused]] const InputResourcesMetricsBucket *maybe_expiring_bucket) override { - process_policies(policies_total, handlers_total, true); + process_policies(policy_total, handler_total, true); } void process_resources(double cpu_usage, uint64_t memory_usage, timespec stamp = timespec()); - void process_policies(int16_t policies_number, int16_t handlers_count, bool self); + void process_policies(int16_t policy_count, int16_t handler_count, bool self); }; class InputResourcesStreamHandler final : public visor::StreamMetricsHandler { - - // the input stream sources we support (only one will be in use at a time) ThreadMonitor _monitor; time_t _timer; timespec _timestamp; + PcapInputStream *_pcap_stream{nullptr}; DnstapInputStream *_dnstap_stream{nullptr}; MockInputStream *_mock_stream{nullptr}; diff --git a/src/handlers/input_resources/tests/test_resources_layer.cpp b/src/handlers/input_resources/tests/test_resources_layer.cpp index c26f7d91f..1a0ba6ee5 100644 --- a/src/handlers/input_resources/tests/test_resources_layer.cpp +++ b/src/handlers/input_resources/tests/test_resources_layer.cpp @@ -25,15 +25,15 @@ TEST_CASE("Check resources for pcap input", "[pcap][resources]") auto event_data = resources_handler.metrics()->bucket(0)->event_data_locked(); CHECK(resources_handler.metrics()->current_periods() == 1); - CHECK(event_data.num_events->value() == 1); + CHECK(event_data.num_events->value() >= 1); nlohmann::json j; resources_handler.metrics()->bucket(0)->to_json(j); - CHECK(j["cpu_percentage"]["p50"] == 0.0); + CHECK(j["cpu_usage"]["p50"] != nullptr); CHECK(j["memory_bytes"]["p50"] != nullptr); - CHECK(j["policies_attached"] == 0); - CHECK(j["handlers_attached"] == 0); + CHECK(j["policy_count"] == 0); + CHECK(j["handler_count"] == 0); std::stringstream output; std::string line; @@ -42,8 +42,6 @@ TEST_CASE("Check resources for pcap input", "[pcap][resources]") CHECK(line == "# HELP base_total Total number of events"); std::getline(output, line); CHECK(line == "# TYPE base_total gauge"); - std::getline(output, line); - CHECK(line == R"(base_total{policy="default"} 1)"); } TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") @@ -68,10 +66,10 @@ TEST_CASE("Check resources for dnstap input", "[dnstap][resources]") nlohmann::json j; resources_handler.metrics()->bucket(0)->to_json(j); - CHECK(j["cpu_percentage"]["p50"] == 0.0); + CHECK(j["cpu_usage"]["p50"] != nullptr); CHECK(j["memory_bytes"]["p50"] != nullptr); - CHECK(j["policies_attached"] == 0); - CHECK(j["handlers_attached"] == 0); + CHECK(j["policy_count"] == 0); + CHECK(j["handler_count"] == 0); } TEST_CASE("Check resources for sflow input", "[sflow][resources]") @@ -96,8 +94,8 @@ TEST_CASE("Check resources for sflow input", "[sflow][resources]") nlohmann::json j; resources_handler.metrics()->bucket(0)->to_json(j); - CHECK(j["cpu_percentage"]["p50"] == 0.0); + CHECK(j["cpu_usage"]["p50"] != nullptr); CHECK(j["memory_bytes"]["p50"] != nullptr); - CHECK(j["policies_attached"] == 0); - CHECK(j["handlers_attached"] == 0); + CHECK(j["policy_count"] == 0); + CHECK(j["handler_count"] == 0); } \ No newline at end of file From 3351676b91e02612817627b97f3918f3a2ce166d Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Mon, 28 Mar 2022 11:55:49 -0400 Subject: [PATCH 22/22] improve code coverage of input resources module --- src/handlers/input_resources/InputResourcesStreamHandler.cpp | 4 ++-- src/handlers/input_resources/InputResourcesStreamHandler.h | 2 +- src/handlers/input_resources/tests/test_resources_layer.cpp | 5 +++++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.cpp b/src/handlers/input_resources/InputResourcesStreamHandler.cpp index cd87785b2..7182a4a69 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.cpp +++ b/src/handlers/input_resources/InputResourcesStreamHandler.cpp @@ -87,7 +87,7 @@ void InputResourcesStreamHandler::process_policies_cb(const Policy *policy, Inpu break; } - _metrics->process_policies(policies_number, handlers_count, false); + _metrics->process_policies(policies_number, handlers_count); } void InputResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSample &) @@ -108,7 +108,7 @@ void InputResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnsta void InputResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packet &payload, [[maybe_unused]] PacketDirection dir, [[maybe_unused]] pcpp::ProtocolType l3, [[maybe_unused]] pcpp::ProtocolType l4, [[maybe_unused]] timespec stamp) { - if (stamp.tv_sec >= MEASURE_INTERVAL + _timestamp.tv_sec) { + if (stamp.tv_sec >= _timestamp.tv_sec + MEASURE_INTERVAL) { _timestamp = stamp; _metrics->process_resources(_monitor.cpu_percentage(), _monitor.memory_usage()); } diff --git a/src/handlers/input_resources/InputResourcesStreamHandler.h b/src/handlers/input_resources/InputResourcesStreamHandler.h index 440f2d832..e3f4f18bc 100644 --- a/src/handlers/input_resources/InputResourcesStreamHandler.h +++ b/src/handlers/input_resources/InputResourcesStreamHandler.h @@ -75,7 +75,7 @@ class InputResourcesMetricsManager final : public visor::AbstractMetricsManager< } void process_resources(double cpu_usage, uint64_t memory_usage, timespec stamp = timespec()); - void process_policies(int16_t policy_count, int16_t handler_count, bool self); + void process_policies(int16_t policy_count, int16_t handler_count, bool self = false); }; class InputResourcesStreamHandler final : public visor::StreamMetricsHandler diff --git a/src/handlers/input_resources/tests/test_resources_layer.cpp b/src/handlers/input_resources/tests/test_resources_layer.cpp index 1a0ba6ee5..0965c8daf 100644 --- a/src/handlers/input_resources/tests/test_resources_layer.cpp +++ b/src/handlers/input_resources/tests/test_resources_layer.cpp @@ -4,6 +4,7 @@ #include "InputResourcesStreamHandler.h" #include "PcapInputStream.h" #include "SflowInputStream.h" +#include "Policies.h" using namespace visor::handler::resources; @@ -19,6 +20,10 @@ TEST_CASE("Check resources for pcap input", "[pcap][resources]") resources_handler.start(); stream.start(); + //add and remove policy + auto policy = std::make_unique("policy-test", nullptr, false); + stream.add_policy(policy.get()); + stream.remove_policy(policy.get()); resources_handler.stop(); stream.stop();