Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ThreadMonitor for linux systems #230

Merged
27 commits merged into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
06879c0
Implement ThreadMonitor for linux systems
Mar 3, 2022
8140b11
Merge branch 'develop' into feature/cpu-mem-info
Mar 10, 2022
98b8d26
Merge branch 'develop' into feature/cpu-mem-info
Mar 14, 2022
03215a9
Resources metrics as Handler
Mar 14, 2022
fd99353
Implement cpu percentage based on htop program
Mar 17, 2022
dd3fd33
reuse pcap timestamp to check timediff
Mar 17, 2022
89e9295
Add Resources Handler to every new input stream
Mar 17, 2022
034e461
add unit tests for resources handler
Mar 18, 2022
17e3f34
add coverage to prometheus method
Mar 18, 2022
50674b4
Merge branch 'develop' into feature/cpu-mem-info
Mar 21, 2022
bc9e2f6
Merge branch 'develop' into feature/cpu-mem-info
Mar 21, 2022
b4a68a4
rename resources handler to input resources handler
Mar 21, 2022
d2765d2
fix typo
Mar 21, 2022
5ebfb24
create new policy with resource handler for each new input
Mar 21, 2022
b8b490f
remove not necessary include from CoreServer
Mar 21, 2022
0a50c76
Add unique name for resources handler
Mar 22, 2022
e78a13a
add input resources policy to input policy list
Mar 22, 2022
70be023
only add input resource policy if the creation policy succeed
Mar 22, 2022
47f6de0
verify if input resources policy exists before removing it
Mar 22, 2022
95fa310
start input resources policy only after the added policy
Mar 22, 2022
eff1db9
do proper roll back policy
Mar 22, 2022
3856f79
Merge branch 'develop' into feature/cpu-mem-info
Mar 22, 2022
a1cfe97
Add support to count policies and handlers attached to input stream
Mar 23, 2022
165297d
Properly handle policies and handlers count on input resources handler
Mar 23, 2022
e069d25
Only merge the fist bucket which is the more recent one
Mar 24, 2022
09610ce
Change variables name
Mar 25, 2022
3351676
improve code coverage of input resources module
Mar 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/Policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
window_config.config_set<uint64_t>("deep_sample_rate", _default_deep_sample_rate);
}

std::unique_ptr<Policy> input_resources_policy;
std::unique_ptr<StreamHandler> resources_module;
if (input_stream) {
weyrick marked this conversation as resolved.
Show resolved Hide resolved
// create new policy with resources handler for input stream
input_resources_policy = std::make_unique<Policy>(input_stream_module_name + "-resources", tap);
input_resources_policy->set_input_stream(input_ptr);
auto resources_handler_plugin = _registry->handler_plugins().find("input_resources");
resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's put a check to make sure resources_handler_plugin was found, just in case

input_resources_policy->add_module(resources_module.get());
input_stream->add_policy(input_resources_policy.get());
}

std::vector<std::unique_ptr<StreamHandler>> handler_modules;
for (YAML::const_iterator h_it = handler_node["modules"].begin(); h_it != handler_node["modules"].end(); ++h_it) {

Expand Down Expand Up @@ -268,6 +280,9 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)

// make sure policy starts before committing
try {
if (input_resources_policy) {
input_resources_policy->start();
weyrick marked this conversation as resolved.
Show resolved Hide resolved
}
policy->start();
} catch (std::runtime_error &e) {
throw PolicyException(fmt::format("policy [{}] failed to start: {}", policy_name, e.what()));
Expand All @@ -278,12 +293,16 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
// roll back during exception ensures no modules have been added to any of the managers
try {
module_add(std::move(policy));
if (input_resources_policy) {
module_add(std::move(input_resources_policy));
}
} catch (ModuleException &e) {
throw PolicyException(fmt::format("policy [{}] creation failed (policy): {}", policy_name, e.what()));
}
try {
if (input_stream) {
if (input_stream && resources_module) {
_registry->input_manager()->module_add(std::move(input_stream));
_registry->handler_manager()->module_add(std::move(resources_module));
}
} catch (ModuleException &e) {
weyrick marked this conversation as resolved.
Show resolved Hide resolved
// note that if this call excepts, we are in an unknown state and the exception will propagate
Expand Down Expand Up @@ -336,8 +355,13 @@ void PolicyManager::remove_policy(const std::string &name)
_registry->handler_manager()->module_remove(name);
}

if (!policy->input_stream()->policies_count()) {
if (policy->input_stream()->policies_count() <= 1) {
weyrick marked this conversation as resolved.
Show resolved Hide resolved
auto resources_name = input_name + "-resources";
auto resources_policy = _map[resources_name].get();
resources_policy->stop();
_registry->handler_manager()->module_remove(resources_name);
_registry->input_manager()->module_remove(input_name);
_map.erase(resources_name);
}

_map.erase(name);
Expand Down
1 change: 1 addition & 0 deletions src/handlers/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ add_subdirectory(dns)
add_subdirectory(dhcp)
add_subdirectory(pcap)
add_subdirectory(mock)
add_subdirectory(input_resources)

set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} PARENT_SCOPE)
1 change: 1 addition & 0 deletions src/handlers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ See the individual READMEs for more information:
* [Mock](mock/)
* [Network](net/)
* [PCAP](pcap/)
* [Resources](resources/)
28 changes: 28 additions & 0 deletions src/handlers/input_resources/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
message(STATUS "Handler Module: Input Resources")

set_directory_properties(PROPERTIES CORRADE_USE_PEDANTIC_FLAGS ON)

corrade_add_static_plugin(VisorHandlerInputResources
${CMAKE_CURRENT_BINARY_DIR}
InputResourcesHandler.conf
InputResourcesHandlerModulePlugin.cpp
InputResourcesStreamHandler.cpp
)
add_library(Visor::Handler::InputResources ALIAS VisorHandlerInputResources)

target_include_directories(VisorHandlerInputResources
INTERFACE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
)

target_link_libraries(VisorHandlerInputResources
PUBLIC
Visor::Input::Pcap
Visor::Input::Dnstap
Visor::Input::Mock
Visor::Input::Sflow
)

set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} Visor::Handler::InputResources PARENT_SCOPE)

add_subdirectory(tests)
5 changes: 5 additions & 0 deletions src/handlers/input_resources/InputResourcesHandler.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Aliases
provides=input_resources
[data]
desc=Input Resources analyzer
type=handler
31 changes: 31 additions & 0 deletions src/handlers/input_resources/InputResourcesHandlerModulePlugin.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

#include "InputResourcesHandlerModulePlugin.h"
#include "CoreRegistry.h"
#include "InputResourcesStreamHandler.h"
#include "HandlerManager.h"
#include "InputStreamManager.h"
#include <Corrade/PluginManager/AbstractManager.h>
#include <nlohmann/json.hpp>

CORRADE_PLUGIN_REGISTER(VisorHandlerInputResources, visor::handler::resources::InputResourcesHandlerModulePlugin,
"visor.module.handler/1.0")

namespace visor::handler::resources {

using json = nlohmann::json;

void InputResourcesHandlerModulePlugin::setup_routes(HttpServer *svr)
{
}
std::unique_ptr<StreamHandler> InputResourcesHandlerModulePlugin::instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler)
{
// TODO using config as both window config and module config
auto handler_module = std::make_unique<InputResourcesStreamHandler>(name, input_stream, config, stream_handler);
handler_module->config_merge(*config);
return handler_module;
}

}
24 changes: 24 additions & 0 deletions src/handlers/input_resources/InputResourcesHandlerModulePlugin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

#pragma once

#include "HandlerModulePlugin.h"

namespace visor::handler::resources {

class InputResourcesHandlerModulePlugin : public HandlerModulePlugin
{

protected:
void setup_routes(HttpServer *svr) override;

public:
explicit InputResourcesHandlerModulePlugin(Corrade::PluginManager::AbstractManager &manager, const std::string &plugin)
: visor::HandlerModulePlugin{manager, plugin}
{
}
std::unique_ptr<StreamHandler> instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler = nullptr) override;
};
}
158 changes: 158 additions & 0 deletions src/handlers/input_resources/InputResourcesStreamHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

#include "InputResourcesStreamHandler.h"

namespace visor::handler::resources {

InputResourcesStreamHandler::InputResourcesStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler)
: visor::StreamMetricsHandler<InputResourcesMetricsManager>(name, window_config)
, _timer(0)
, _timestamp(timespec())
{
if (handler) {
throw StreamHandlerException(fmt::format("ResourcesStreamHandler: unsupported upstream chained stream handler {}", handler->name()));
}

assert(stream);
// figure out which input stream we have
if (stream) {
_pcap_stream = dynamic_cast<PcapInputStream *>(stream);
_dnstap_stream = dynamic_cast<DnstapInputStream *>(stream);
_mock_stream = dynamic_cast<MockInputStream *>(stream);
_sflow_stream = dynamic_cast<SflowInputStream *>(stream);
if (!_pcap_stream && !_mock_stream && !_dnstap_stream && !_sflow_stream) {
throw StreamHandlerException(fmt::format("NetStreamHandler: unsupported input stream {}", stream->name()));
}
}
}

void InputResourcesStreamHandler::start()
{
if (_running) {
return;
}

if (config_exists("recorded_stream")) {
_metrics->set_recorded_stream();
}

if (_pcap_stream) {
_pkt_connection = _pcap_stream->packet_signal.connect(&InputResourcesStreamHandler::process_packet_cb, this);
} else if (_dnstap_stream) {
_dnstap_connection = _dnstap_stream->dnstap_signal.connect(&InputResourcesStreamHandler::process_dnstap_cb, this);
} else if (_sflow_stream) {
_sflow_connection = _sflow_stream->sflow_signal.connect(&InputResourcesStreamHandler::process_sflow_cb, this);
}

_running = true;
}

void InputResourcesStreamHandler::stop()
{
if (!_running) {
return;
}

if (_pcap_stream) {
_pkt_connection.disconnect();
} else if (_dnstap_stream) {
_dnstap_connection.disconnect();
} else if (_sflow_stream) {
_sflow_connection.disconnect();
}

_running = false;
}

void InputResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSample &)
{
if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) {
_timer = time(NULL);
_metrics->process_resources();
}
}

void InputResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnstap::Dnstap &)
{
if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) {
_timer = time(NULL);
_metrics->process_resources();
}
}

void InputResourcesStreamHandler::process_packet_cb([[maybe_unused]] pcpp::Packet &payload, [[maybe_unused]] PacketDirection dir, [[maybe_unused]] pcpp::ProtocolType l3, [[maybe_unused]] pcpp::ProtocolType l4, [[maybe_unused]] timespec stamp)
{
if (stamp.tv_sec >= MEASURE_INTERVAL + _timestamp.tv_sec) {
_timestamp = stamp;
_metrics->process_resources(stamp);
}
}

void InputResourcesMetricsBucket::specialized_merge(const AbstractMetricsBucket &o)
{
// static because caller guarantees only our own bucket type
const auto &other = static_cast<const InputResourcesMetricsBucket &>(o);

std::shared_lock r_lock(other._mutex);
std::unique_lock w_lock(_mutex);

_cpu_percentage.merge(other._cpu_percentage);
_memory_usage_kb.merge(other._memory_usage_kb);
}

void InputResourcesMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap add_labels) const
{
{
auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe

event_rate->to_prometheus(out, add_labels);
num_events->to_prometheus(out, add_labels);
num_samples->to_prometheus(out, add_labels);
}

std::shared_lock r_lock(_mutex);

_cpu_percentage.to_prometheus(out, add_labels);
_memory_usage_kb.to_prometheus(out, add_labels);
}

void InputResourcesMetricsBucket::to_json(json &j) const
{
bool live_rates = !read_only() && !recorded_stream();

{
auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe

event_rate->to_json(j, live_rates);
num_events->to_json(j);
num_samples->to_json(j);
}

std::shared_lock r_lock(_mutex);

_cpu_percentage.to_json(j);
_memory_usage_kb.to_json(j);
}

void InputResourcesMetricsBucket::process_resources()
{
std::unique_lock lock(_mutex);

_cpu_percentage.update(_monitor.cpu_percentage());
_memory_usage_kb.update(_monitor.memory_usage());
}

void InputResourcesMetricsManager::process_resources(timespec stamp)
{
if(stamp.tv_sec == 0) {
// use now()
std::timespec_get(&stamp, TIME_UTC);
}
// base event
new_event(stamp);
// process in the "live" bucket. this will parse the resources if we are deep sampling
live_bucket()->process_resources();
}
}
Loading