Skip to content

Commit

Permalink
Merge pull request #212 from ns1labs/feature/metrics-groups
Browse files Browse the repository at this point in the history
Feature/metrics groups
  • Loading branch information
leoparente authored Feb 22, 2022
2 parents 4dcafe0 + 96a4da8 commit a6002ce
Show file tree
Hide file tree
Showing 13 changed files with 942 additions and 409 deletions.
4 changes: 2 additions & 2 deletions RFCs/2021-04-16-76-collection-policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ visor:
type: net
filter:
protocols: [ udp ]
metrics:
metric_groups:
enable:
- top_ips
default_dns:
Expand All @@ -72,7 +72,7 @@ visor:
filter:
# must match the available configuration options for this version of this stream handler
qname_suffix: .mydomain.com
metrics:
metric_groups:
disable:
- top_qtypes
- top_udp_ports
Expand Down
7 changes: 0 additions & 7 deletions RFCs/2021-04-16-77-module-reflection.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,6 @@ All interfaces and schemas are versioned.
"metrics": [
"..."
]
},
"top_qnames_by_rcode": {
"title": "Top N QNames (Failing RCodes) ",
"description": "Top QNames across failing result codes",
"metrics": [
"..."
]
}
}
}
Expand Down
39 changes: 36 additions & 3 deletions src/AbstractMetricsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

#pragma once

#include <chrono>
#include <atomic>
#include <chrono>
#include <deque>
#include <exception>
#include <nlohmann/json.hpp>
Expand All @@ -22,6 +22,9 @@

namespace visor {

constexpr size_t GROUP_SIZE = 64;
typedef uint32_t MetricGroupIntType;

using json = nlohmann::json;

class PeriodException : public std::runtime_error
Expand Down Expand Up @@ -59,6 +62,8 @@ class AbstractMetricsBucket
bool _recorded_stream = false;

protected:
const std::bitset<GROUP_SIZE> *_groups;

// merge the metrics of the specialized metric bucket
virtual void specialized_merge(const AbstractMetricsBucket &other) = 0;

Expand Down Expand Up @@ -180,6 +185,7 @@ class AbstractMetricsBucket
_end_tstamp.tv_sec = other._end_tstamp.tv_sec;
}
_rate_events.merge(other._rate_events);
_groups = other._groups;
}
specialized_merge(other);
}
Expand All @@ -195,6 +201,17 @@ class AbstractMetricsBucket
}
}

void configure_groups(const std::bitset<GROUP_SIZE> *groups)
{
std::unique_lock lock(_base_mutex);
_groups = groups;
}

inline bool group_enabled(MetricGroupIntType g) const
{
return (*_groups)[g];
}

virtual void to_json(json &j) const = 0;
virtual void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const = 0;
};
Expand Down Expand Up @@ -230,6 +247,8 @@ class AbstractMetricsManager
*/
bool _recorded_stream = false;

const std::bitset<GROUP_SIZE> *_groups;

private:
/**
* window maintenance
Expand All @@ -253,6 +272,7 @@ class AbstractMetricsManager
std::unique_ptr<MetricsBucketClass> expiring_bucket;
// this changes the live bucket
_metric_buckets.emplace_front(std::make_unique<MetricsBucketClass>());
_metric_buckets[0]->configure_groups(_groups);
_metric_buckets[0]->set_start_tstamp(stamp);
if (_recorded_stream) {
_metric_buckets[0]->set_recorded_stream();
Expand Down Expand Up @@ -303,6 +323,11 @@ class AbstractMetricsManager
_metric_buckets[0]->new_event(_deep_sampling_now);
}

inline bool group_enabled(MetricGroupIntType g) const
{
return (*_groups)[g];
}

/**
* call back when the time window period shift
*
Expand All @@ -320,7 +345,6 @@ class AbstractMetricsManager
, _last_shift_tstamp{0, 0}
, _next_shift_tstamp{0, 0}
{

if (window_config->config_exists("deep_sample_rate")) {
_deep_sample_rate = window_config->config_get<uint64_t>("deep_sample_rate");
}
Expand All @@ -341,7 +365,6 @@ class AbstractMetricsManager
_next_shift_tstamp.tv_sec += AbstractMetricsManager::PERIOD_SEC;

_metric_buckets.emplace_front(std::make_unique<MetricsBucketClass>());

}

virtual ~AbstractMetricsManager() = default;
Expand Down Expand Up @@ -407,6 +430,16 @@ class AbstractMetricsManager
return _metric_buckets.at(period).get();
}

void configure_groups(const std::bitset<GROUP_SIZE> *groups)
{
std::unique_lock wl(_base_mutex);
std::shared_lock rl(_bucket_mutex);
_groups = groups;
for (const auto &bucket : _metric_buckets) {
bucket->configure_groups(groups);
}
}

MetricsBucketClass *live_bucket()
{
// CRITICAL PATH
Expand Down
20 changes: 19 additions & 1 deletion src/Policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
if (!module.IsMap()) {
throw PolicyException("expecting Handler configuration map");
}

if (!module["type"] || !module["type"].IsScalar()) {
module = module[handler_module_name];
if (!module["type"] || !module["type"].IsScalar()) {
Expand Down Expand Up @@ -231,9 +232,26 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
throw PolicyException(fmt::format("invalid stream handler config for handler '{}': {}", handler_module_name, e.what()));
}
}
Config handler_metrics;
if (module["metric_groups"]) {
if (!module["metric_groups"].IsMap()) {
throw PolicyException("stream handler metric groups is not a map");
}

if (!module["metric_groups"]["enable"] && !module["metric_groups"]["disable"]) {
throw PolicyException("stream handler metric groups should contain enable and/or disable tags");
}

try {
handler_config.config_set_yaml(module["metric_groups"]);
} catch (ConfigException &e) {
throw PolicyException(fmt::format("invalid stream handler metrics for handler '{}': {}", handler_module_name, e.what()));
}
}
spdlog::get("visor")->info("policy [{}]: instantiating Handler {} of type {}", policy_name, handler_module_name, handler_module_type);
// note, currently merging the handler config with the window config. do they need to be separate?
// TODO separate filter config
handler_config.config_merge(handler_metrics);
handler_config.config_merge(handler_filter);
handler_config.config_merge(window_config);

Expand Down Expand Up @@ -373,4 +391,4 @@ void Policy::stop()
_running = false;
}

}
}
36 changes: 36 additions & 0 deletions src/StreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "AbstractMetricsManager.h"
#include "AbstractModule.h"
#include <fmt/ostream.h>
#include <nlohmann/json.hpp>
#include <sstream>

Expand Down Expand Up @@ -41,9 +42,44 @@ class StreamHandler : public AbstractRunnableModule
template <class MetricsManagerClass>
class StreamMetricsHandler : public StreamHandler
{
public:
typedef std::map<std::string, MetricGroupIntType> GroupDefType;

private:
MetricGroupIntType _process_group(const GroupDefType &group_defs, const std::string &group)
{
auto it = group_defs.find(group);
if (it == group_defs.end()) {
std::vector<std::string> valid_groups;
for (const auto &defs : group_defs) {
valid_groups.push_back(defs.first);
}
throw StreamHandlerException(fmt::format("{} is an invalid/unsupported metric group. The valid groups are {}", group, fmt::join(valid_groups, ", ")));
}
return it->second;
}

protected:
std::unique_ptr<MetricsManagerClass> _metrics;
std::bitset<GROUP_SIZE> _groups;

void process_groups(const GroupDefType &group_defs)
{

if (config_exists("enable")) {
for (const auto &group : config_get<StringList>("enable")) {
_groups.set(_process_group(group_defs, group));
}
}

if (config_exists("disable")) {
for (const auto &group : config_get<StringList>("disable")) {
_groups.reset(_process_group(group_defs, group));
}
}

_metrics->configure_groups(&_groups);
}

void common_info_json(json &j) const
{
Expand Down
Loading

0 comments on commit a6002ce

Please sign in to comment.