Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ThreadMonitor for linux systems #230

Merged
27 commits merged into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
06879c0
Implement ThreadMonitor for linux systems
Mar 3, 2022
8140b11
Merge branch 'develop' into feature/cpu-mem-info
Mar 10, 2022
98b8d26
Merge branch 'develop' into feature/cpu-mem-info
Mar 14, 2022
03215a9
Resources metrics as Handler
Mar 14, 2022
fd99353
Implement cpu percentage based on htop program
Mar 17, 2022
dd3fd33
reuse pcap timestamp to check timediff
Mar 17, 2022
89e9295
Add Resources Handler to every new input stream
Mar 17, 2022
034e461
add unit tests for resources handler
Mar 18, 2022
17e3f34
add coverage to prometheus method
Mar 18, 2022
50674b4
Merge branch 'develop' into feature/cpu-mem-info
Mar 21, 2022
bc9e2f6
Merge branch 'develop' into feature/cpu-mem-info
Mar 21, 2022
b4a68a4
rename resources handler to input resources handler
Mar 21, 2022
d2765d2
fix typo
Mar 21, 2022
5ebfb24
create new policy with resource handler for each new input
Mar 21, 2022
b8b490f
remove not necessary include from CoreServer
Mar 21, 2022
0a50c76
Add unique name for resources handler
Mar 22, 2022
e78a13a
add input resources policy to input policy list
Mar 22, 2022
70be023
only add input resource policy if the creation policy succeed
Mar 22, 2022
47f6de0
verify if input resources policy exists before removing it
Mar 22, 2022
95fa310
start input resources policy only after the added policy
Mar 22, 2022
eff1db9
do proper roll back policy
Mar 22, 2022
3856f79
Merge branch 'develop' into feature/cpu-mem-info
Mar 22, 2022
a1cfe97
Add support to count policies and handlers attached to input stream
Mar 23, 2022
165297d
Properly handle policies and handlers count on input resources handler
Mar 23, 2022
e069d25
Only merge the fist bucket which is the more recent one
Mar 24, 2022
09610ce
Change variables name
Mar 25, 2022
3351676
improve code coverage of input resources module
Mar 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/InputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "AbstractModule.h"
#include "StreamHandler.h"
#include <sigslot/signal.hpp>

namespace visor {

Expand All @@ -15,6 +16,11 @@ class InputStream : public AbstractRunnableModule
std::vector<const Policy *> _policies;

public:
enum class Action {
AddPolicy,
RemovePolicy
};

InputStream(const std::string &name)
: AbstractRunnableModule(name)
{
Expand All @@ -26,12 +32,14 @@ class InputStream : public AbstractRunnableModule
{
std::unique_lock lock(_input_mutex);
_policies.push_back(policy);
policy_signal(policy, Action::AddPolicy);
}

void remove_policy(const Policy *policy)
{
std::unique_lock lock(_input_mutex);
_policies.erase(std::remove(_policies.begin(), _policies.end(), policy), _policies.end());
policy_signal(policy, Action::RemovePolicy);
}

size_t policies_count() const
Expand All @@ -40,14 +48,19 @@ class InputStream : public AbstractRunnableModule
return _policies.size();
}

virtual size_t consumer_count() const = 0;
virtual size_t consumer_count() const
{
return policy_signal.slot_count();
}

void common_info_json(json &j) const
{
AbstractModule::common_info_json(j);
j["input"]["running"] = running();
j["input"]["consumers"] = consumer_count();
}

mutable sigslot::signal<const Policy *, Action> policy_signal;
};

}
64 changes: 57 additions & 7 deletions src/Policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,7 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
throw PolicyException(fmt::format("unable to instantiate tap '{}': {}", tap_name, e.what()));
}
}
// Create Policy
auto policy = std::make_unique<Policy>(policy_name, tap);
// if and only if policy succeeds, we will return this in result set
Policy *policy_ptr = policy.get();
policy->set_input_stream(input_ptr);
// Handler Section
// Handler type
if (!it->second["handlers"] || !it->second["handlers"].IsMap()) {
throw PolicyException("missing or invalid handler configuration at key 'handlers'");
}
Expand All @@ -162,6 +157,14 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
} else if (handler_node["modules"].IsSequence()) {
handler_sequence = true;
}

// Create Policy
auto policy = std::make_unique<Policy>(policy_name, tap, handler_sequence);
// if and only if policy succeeds, we will return this in result set
Policy *policy_ptr = policy.get();
policy->set_input_stream(input_ptr);

// Handler Section
Config window_config;
if (handler_node["window_config"] && handler_node["window_config"].IsMap()) {
try {
Expand All @@ -174,6 +177,21 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
window_config.config_set<uint64_t>("deep_sample_rate", _default_deep_sample_rate);
}

std::unique_ptr<Policy> input_resources_policy;
Policy *input_res_policy_ptr;
std::unique_ptr<StreamHandler> resources_module;
if (input_stream) {
weyrick marked this conversation as resolved.
Show resolved Hide resolved
// create new policy with resources handler for input stream
input_resources_policy = std::make_unique<Policy>(input_stream_module_name + "-resources", tap, false);
input_resources_policy->set_input_stream(input_ptr);
auto resources_handler_plugin = _registry->handler_plugins().find("input_resources");
if (resources_handler_plugin != _registry->handler_plugins().end()) {
resources_module = resources_handler_plugin->second->instantiate(input_stream_module_name + "-resources", input_ptr, &window_config);
input_resources_policy->add_module(resources_module.get());
input_res_policy_ptr = input_resources_policy.get();
}
}

std::vector<std::unique_ptr<StreamHandler>> handler_modules;
for (YAML::const_iterator h_it = handler_node["modules"].begin(); h_it != handler_node["modules"].end(); ++h_it) {

Expand Down Expand Up @@ -269,6 +287,9 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
// make sure policy starts before committing
try {
policy->start();
if (input_resources_policy) {
input_resources_policy->start();
weyrick marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (std::runtime_error &e) {
throw PolicyException(fmt::format("policy [{}] failed to start: {}", policy_name, e.what()));
}
Expand All @@ -278,6 +299,9 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
// roll back during exception ensures no modules have been added to any of the managers
try {
module_add(std::move(policy));
if (input_resources_policy) {
module_add(std::move(input_resources_policy));
}
} catch (ModuleException &e) {
throw PolicyException(fmt::format("policy [{}] creation failed (policy): {}", policy_name, e.what()));
}
Expand All @@ -288,6 +312,9 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
} catch (ModuleException &e) {
weyrick marked this conversation as resolved.
Show resolved Hide resolved
// note that if this call excepts, we are in an unknown state and the exception will propagate
module_remove(policy_name);
if (input_res_policy_ptr) {
module_remove(input_res_policy_ptr->name());
}
throw PolicyException(fmt::format("policy [{}] creation failed (input): {}", policy_name, e.what()));
}
std::vector<std::string> added_handlers;
Expand All @@ -298,10 +325,18 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
// if it did not except, add it to the list for rollback upon exception
added_handlers.push_back(hname);
}
if (resources_module) {
auto hname = resources_module->name();
_registry->handler_manager()->module_add(std::move(resources_module));
added_handlers.push_back(hname);
}
} catch (ModuleException &e) {
// note that if any of these calls except, we are in an unknown state and the exception will propagate
// nothing needs to be stopped because it was not started
module_remove(policy_name);
if (input_res_policy_ptr) {
module_remove(input_res_policy_ptr->name());
}
_registry->input_manager()->module_remove(input_stream_module_name);
for (auto &m : added_handlers) {
_registry->handler_manager()->module_remove(m);
Expand All @@ -311,6 +346,9 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
}

// success
if (input_res_policy_ptr) {
input_ptr->add_policy(input_res_policy_ptr);
}
input_ptr->add_policy(policy_ptr);
result.push_back(policy_ptr);
}
Expand All @@ -325,6 +363,7 @@ void PolicyManager::remove_policy(const std::string &name)
}

auto policy = _map[name].get();
auto input_stream = policy->input_stream();
auto input_name = policy->input_stream()->name();
std::vector<std::string> module_names;
for (const auto &mod : policy->modules()) {
Expand All @@ -336,7 +375,18 @@ void PolicyManager::remove_policy(const std::string &name)
_registry->handler_manager()->module_remove(name);
}

if (!policy->input_stream()->policies_count()) {
if (input_stream->policies_count() == 1) {
// if there is only one policy left on the input stream, and that policy is the input resources policy, then remove it
auto input_resources_name = input_name + "-resources";
if (_map.count(input_resources_name) != 0) {
auto resources_policy = _map[input_resources_name].get();
resources_policy->stop();
_registry->handler_manager()->module_remove(input_resources_name);
_map.erase(input_resources_name);
}
}

if (!input_stream->policies_count()) {
_registry->input_manager()->module_remove(input_name);
}

Expand Down
14 changes: 13 additions & 1 deletion src/Policies.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ class PolicyException : public std::runtime_error

class Policy : public AbstractRunnableModule
{
static constexpr size_t HANDLERS_SEQUENCE_SIZE = 1;

Tap *_tap;
bool _modules_sequence;
InputStream *_input_stream;
std::vector<AbstractRunnableModule *> _modules;

public:
Policy(const std::string &name, Tap *tap)
Policy(const std::string &name, Tap *tap, bool modules_sequence)
: AbstractRunnableModule(name)
, _tap(tap)
, _modules_sequence(modules_sequence)
, _input_stream(nullptr)
{
}
Expand Down Expand Up @@ -66,6 +69,15 @@ class Policy : public AbstractRunnableModule
return _modules;
}

size_t get_handlers_list_size() const
{
if (_modules_sequence) {
return HANDLERS_SEQUENCE_SIZE;
} else {
return _modules.size();
}
}

// life cycle
void start() override;
void stop() override;
Expand Down
1 change: 1 addition & 0 deletions src/handlers/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ add_subdirectory(dns)
add_subdirectory(dhcp)
add_subdirectory(pcap)
add_subdirectory(mock)
add_subdirectory(input_resources)

set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} PARENT_SCOPE)
1 change: 1 addition & 0 deletions src/handlers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ See the individual READMEs for more information:
* [Mock](mock/)
* [Network](net/)
* [PCAP](pcap/)
* [Resources](resources/)
28 changes: 28 additions & 0 deletions src/handlers/input_resources/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
message(STATUS "Handler Module: Input Resources")

set_directory_properties(PROPERTIES CORRADE_USE_PEDANTIC_FLAGS ON)

corrade_add_static_plugin(VisorHandlerInputResources
${CMAKE_CURRENT_BINARY_DIR}
InputResourcesHandler.conf
InputResourcesHandlerModulePlugin.cpp
InputResourcesStreamHandler.cpp
)
add_library(Visor::Handler::InputResources ALIAS VisorHandlerInputResources)

target_include_directories(VisorHandlerInputResources
INTERFACE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
)

target_link_libraries(VisorHandlerInputResources
PUBLIC
Visor::Input::Pcap
Visor::Input::Dnstap
Visor::Input::Mock
Visor::Input::Sflow
)

set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} Visor::Handler::InputResources PARENT_SCOPE)

add_subdirectory(tests)
5 changes: 5 additions & 0 deletions src/handlers/input_resources/InputResourcesHandler.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Aliases
provides=input_resources
[data]
desc=Input Resources analyzer
type=handler
31 changes: 31 additions & 0 deletions src/handlers/input_resources/InputResourcesHandlerModulePlugin.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

#include "InputResourcesHandlerModulePlugin.h"
#include "CoreRegistry.h"
#include "InputResourcesStreamHandler.h"
#include "HandlerManager.h"
#include "InputStreamManager.h"
#include <Corrade/PluginManager/AbstractManager.h>
#include <nlohmann/json.hpp>

CORRADE_PLUGIN_REGISTER(VisorHandlerInputResources, visor::handler::resources::InputResourcesHandlerModulePlugin,
"visor.module.handler/1.0")

namespace visor::handler::resources {

using json = nlohmann::json;

void InputResourcesHandlerModulePlugin::setup_routes(HttpServer *svr)
{
}
std::unique_ptr<StreamHandler> InputResourcesHandlerModulePlugin::instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler)
{
// TODO using config as both window config and module config
auto handler_module = std::make_unique<InputResourcesStreamHandler>(name, input_stream, config, stream_handler);
handler_module->config_merge(*config);
return handler_module;
}

}
24 changes: 24 additions & 0 deletions src/handlers/input_resources/InputResourcesHandlerModulePlugin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

#pragma once

#include "HandlerModulePlugin.h"

namespace visor::handler::resources {

class InputResourcesHandlerModulePlugin : public HandlerModulePlugin
{

protected:
void setup_routes(HttpServer *svr) override;

public:
explicit InputResourcesHandlerModulePlugin(Corrade::PluginManager::AbstractManager &manager, const std::string &plugin)
: visor::HandlerModulePlugin{manager, plugin}
{
}
std::unique_ptr<StreamHandler> instantiate(const std::string &name, InputStream *input_stream, const Configurable *config, StreamHandler *stream_handler = nullptr) override;
};
}
Loading