Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/metrics groups #212

Merged
25 commits merged into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
18fbbc4
Add support to metrics in policy and create GroupMetrics in Handler
Feb 4, 2022
c142b8e
Add groups support to handlers and handler groups in DNS Handler
Feb 9, 2022
9562a90
add configure group method and not pass that in constructor
Feb 10, 2022
987c72a
use bitset operator[] to reduce overhead
Feb 10, 2022
8003bba
refactor DNS Handler class to be possible to match groups in one place
Feb 17, 2022
723ca0f
Refactor Net Handler to process input data in one method
Feb 18, 2022
29b25aa
add groups on net handler
Feb 18, 2022
7f04f10
refactor some group handling to base class
weyrick Feb 18, 2022
e0b1ecb
Add support to metrics in policy and create GroupMetrics in Handler
Feb 4, 2022
63e0ed3
Add groups support to handlers and handler groups in DNS Handler
Feb 9, 2022
3331994
add configure group method and not pass that in constructor
Feb 10, 2022
3d04d4b
use bitset operator[] to reduce overhead
Feb 10, 2022
578e6fc
refactor DNS Handler class to be possible to match groups in one place
Feb 17, 2022
242134b
Refactor Net Handler to process input data in one method
Feb 18, 2022
a4615c5
add groups on net handler
Feb 18, 2022
5db8673
refactor some group handling to base class
weyrick Feb 18, 2022
3f09059
change metrics key to metric_groups and add method to check if group …
Feb 21, 2022
6b998bd
Merge branch 'feature/metrics-groups' of github.com:ns1labs/pktvisor …
Feb 21, 2022
e9904a2
remove definition of MetricGroupIntType from streamHandler
Feb 21, 2022
966afd7
remove not necessary validation
Feb 21, 2022
435f900
Merge branch 'develop' into feature/metrics-groups
Feb 21, 2022
9c04582
Add unit tests for DNS handler groups
Feb 21, 2022
b9e817a
add group unit tests for net handler
Feb 22, 2022
e4bea53
validate group when merging metrics
Feb 22, 2022
96a4da8
create unit tests for metric groups in policy class
Feb 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions RFCs/2021-04-16-76-collection-policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ visor:
type: net
filter:
protocols: [ udp ]
metrics:
metric_groups:
enable:
- top_ips
default_dns:
Expand All @@ -72,7 +72,7 @@ visor:
filter:
# must match the available configuration options for this version of this stream handler
qname_suffix: .mydomain.com
metrics:
metric_groups:
disable:
- top_qtypes
- top_udp_ports
Expand Down
7 changes: 0 additions & 7 deletions RFCs/2021-04-16-77-module-reflection.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,6 @@ All interfaces and schemas are versioned.
"metrics": [
"..."
]
},
"top_qnames_by_rcode": {
"title": "Top N QNames (Failing RCodes) ",
"description": "Top QNames across failing result codes",
"metrics": [
"..."
]
}
}
}
Expand Down
39 changes: 36 additions & 3 deletions src/AbstractMetricsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

#pragma once

#include <chrono>
#include <atomic>
#include <chrono>
#include <deque>
#include <exception>
#include <nlohmann/json.hpp>
Expand All @@ -22,6 +22,9 @@

namespace visor {

constexpr size_t GROUP_SIZE = 64;
weyrick marked this conversation as resolved.
Show resolved Hide resolved
typedef uint32_t MetricGroupIntType;

using json = nlohmann::json;

class PeriodException : public std::runtime_error
Expand Down Expand Up @@ -59,6 +62,8 @@ class AbstractMetricsBucket
bool _recorded_stream = false;

protected:
const std::bitset<GROUP_SIZE> *_groups;

// merge the metrics of the specialized metric bucket
virtual void specialized_merge(const AbstractMetricsBucket &other) = 0;

Expand Down Expand Up @@ -180,6 +185,7 @@ class AbstractMetricsBucket
_end_tstamp.tv_sec = other._end_tstamp.tv_sec;
}
_rate_events.merge(other._rate_events);
_groups = other._groups;
weyrick marked this conversation as resolved.
Show resolved Hide resolved
}
specialized_merge(other);
}
Expand All @@ -195,6 +201,17 @@ class AbstractMetricsBucket
}
}

void configure_groups(const std::bitset<GROUP_SIZE> *groups)
{
std::unique_lock lock(_base_mutex);
_groups = groups;
weyrick marked this conversation as resolved.
Show resolved Hide resolved
}

inline bool group_enabled(MetricGroupIntType g) const
{
return (*_groups)[g];
}

virtual void to_json(json &j) const = 0;
virtual void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const = 0;
};
Expand Down Expand Up @@ -230,6 +247,8 @@ class AbstractMetricsManager
*/
bool _recorded_stream = false;

const std::bitset<GROUP_SIZE> *_groups;

private:
/**
* window maintenance
Expand All @@ -253,6 +272,7 @@ class AbstractMetricsManager
std::unique_ptr<MetricsBucketClass> expiring_bucket;
// this changes the live bucket
_metric_buckets.emplace_front(std::make_unique<MetricsBucketClass>());
_metric_buckets[0]->configure_groups(_groups);
_metric_buckets[0]->set_start_tstamp(stamp);
if (_recorded_stream) {
_metric_buckets[0]->set_recorded_stream();
Expand Down Expand Up @@ -303,6 +323,11 @@ class AbstractMetricsManager
_metric_buckets[0]->new_event(_deep_sampling_now);
}

inline bool group_enabled(MetricGroupIntType g) const
{
return (*_groups)[g];
}

/**
* call back when the time window period shift
*
Expand All @@ -320,7 +345,6 @@ class AbstractMetricsManager
, _last_shift_tstamp{0, 0}
, _next_shift_tstamp{0, 0}
{

if (window_config->config_exists("deep_sample_rate")) {
_deep_sample_rate = window_config->config_get<uint64_t>("deep_sample_rate");
}
Expand All @@ -341,7 +365,6 @@ class AbstractMetricsManager
_next_shift_tstamp.tv_sec += AbstractMetricsManager::PERIOD_SEC;

_metric_buckets.emplace_front(std::make_unique<MetricsBucketClass>());

}

virtual ~AbstractMetricsManager() = default;
Expand Down Expand Up @@ -407,6 +430,16 @@ class AbstractMetricsManager
return _metric_buckets.at(period).get();
}

void configure_groups(const std::bitset<GROUP_SIZE> *groups)
{
std::unique_lock wl(_base_mutex);
std::shared_lock rl(_bucket_mutex);
_groups = groups;
for (const auto &bucket : _metric_buckets) {
bucket->configure_groups(groups);
}
}

MetricsBucketClass *live_bucket()
{
// CRITICAL PATH
Expand Down
20 changes: 19 additions & 1 deletion src/Policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
if (!module.IsMap()) {
throw PolicyException("expecting Handler configuration map");
}

if (!module["type"] || !module["type"].IsScalar()) {
module = module[handler_module_name];
if (!module["type"] || !module["type"].IsScalar()) {
Expand Down Expand Up @@ -231,9 +232,26 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
throw PolicyException(fmt::format("invalid stream handler config for handler '{}': {}", handler_module_name, e.what()));
}
}
Config handler_metrics;
if (module["metric_groups"]) {
if (!module["metric_groups"].IsMap()) {
throw PolicyException("stream handler metric groups is not a map");
}

if (!module["metric_groups"]["enable"] && !module["metric_groups"]["disable"]) {
throw PolicyException("stream handler metric groups should contain enable and/or disable tags");
}

try {
handler_config.config_set_yaml(module["metric_groups"]);
} catch (ConfigException &e) {
throw PolicyException(fmt::format("invalid stream handler metrics for handler '{}': {}", handler_module_name, e.what()));
}
}
spdlog::get("visor")->info("policy [{}]: instantiating Handler {} of type {}", policy_name, handler_module_name, handler_module_type);
// note, currently merging the handler config with the window config. do they need to be separate?
// TODO separate filter config
handler_config.config_merge(handler_metrics);
handler_config.config_merge(handler_filter);
handler_config.config_merge(window_config);

Expand Down Expand Up @@ -373,4 +391,4 @@ void Policy::stop()
_running = false;
}

}
}
36 changes: 36 additions & 0 deletions src/StreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "AbstractMetricsManager.h"
#include "AbstractModule.h"
#include <fmt/ostream.h>
#include <nlohmann/json.hpp>
#include <sstream>

Expand Down Expand Up @@ -41,9 +42,44 @@ class StreamHandler : public AbstractRunnableModule
template <class MetricsManagerClass>
class StreamMetricsHandler : public StreamHandler
{
public:
typedef std::map<std::string, MetricGroupIntType> GroupDefType;

private:
MetricGroupIntType _process_group(const GroupDefType &group_defs, const std::string &group)
{
auto it = group_defs.find(group);
if (it == group_defs.end()) {
std::vector<std::string> valid_groups;
for (const auto &defs : group_defs) {
valid_groups.push_back(defs.first);
}
throw StreamHandlerException(fmt::format("{} is an invalid/unsupported metric group. The valid groups are {}", group, fmt::join(valid_groups, ", ")));
}
return it->second;
}

protected:
std::unique_ptr<MetricsManagerClass> _metrics;
std::bitset<GROUP_SIZE> _groups;

void process_groups(const GroupDefType &group_defs)
{

if (config_exists("enable")) {
for (const auto &group : config_get<StringList>("enable")) {
_groups.set(_process_group(group_defs, group));
}
}

if (config_exists("disable")) {
for (const auto &group : config_get<StringList>("disable")) {
_groups.reset(_process_group(group_defs, group));
}
}

_metrics->configure_groups(&_groups);
}

void common_info_json(json &j) const
{
Expand Down
Loading