Skip to content

Commit

Permalink
add configure group method and not pass that in constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
Leonardo Parente committed Feb 10, 2022
1 parent 9d99aba commit b354c61
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 50 deletions.
22 changes: 20 additions & 2 deletions src/AbstractMetricsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

namespace visor {

constexpr size_t GROUP_SIZE = 64;

using json = nlohmann::json;

class PeriodException : public std::runtime_error
Expand Down Expand Up @@ -59,6 +61,8 @@ class AbstractMetricsBucket
bool _recorded_stream = false;

protected:
const std::bitset<GROUP_SIZE> *_groups;

// merge the metrics of the specialized metric bucket
virtual void specialized_merge(const AbstractMetricsBucket &other) = 0;

Expand Down Expand Up @@ -195,6 +199,10 @@ class AbstractMetricsBucket
}
}

void configure_groups(const std::bitset<GROUP_SIZE> *groups) {
_groups = groups;
}

virtual void to_json(json &j) const = 0;
virtual void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const = 0;
};
Expand Down Expand Up @@ -230,6 +238,7 @@ class AbstractMetricsManager
*/
bool _recorded_stream = false;

const std::bitset<GROUP_SIZE> *_groups;
private:
/**
* window maintenance
Expand Down Expand Up @@ -314,7 +323,7 @@ class AbstractMetricsManager
}

public:
AbstractMetricsManager(const Configurable *window_config, const std::bitset<64> groups = std::bitset<64>())
AbstractMetricsManager(const Configurable *window_config)
: _metric_buckets{}
, _deep_sampling_now{true}
, _last_shift_tstamp{0, 0}
Expand All @@ -339,7 +348,7 @@ class AbstractMetricsManager
_next_shift_tstamp = _last_shift_tstamp;
_next_shift_tstamp.tv_sec += AbstractMetricsManager::PERIOD_SEC;

_metric_buckets.emplace_front(std::make_unique<MetricsBucketClass>(groups));
_metric_buckets.emplace_front(std::make_unique<MetricsBucketClass>());

}

Expand Down Expand Up @@ -406,6 +415,15 @@ class AbstractMetricsManager
return _metric_buckets.at(period).get();
}

void configure_groups(const std::bitset<GROUP_SIZE> *groups) {
std::unique_lock wl(_base_mutex);
std::shared_lock rl(_bucket_mutex);
_groups = groups;
for (const auto &bucket : _metric_buckets) {
bucket->configure_groups(groups);
}
}

MetricsBucketClass *live_bucket()
{
// CRITICAL PATH
Expand Down
5 changes: 3 additions & 2 deletions src/StreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class StreamMetricsHandler : public StreamHandler

protected:
std::unique_ptr<MetricsManagerClass> _metrics;
std::bitset<GROUP_SIZE> _groups;

void common_info_json(json &j) const
{
Expand Down Expand Up @@ -76,10 +77,10 @@ class StreamMetricsHandler : public StreamHandler
}

public:
StreamMetricsHandler(const std::string &name, const Configurable *window_config, const std::bitset<64> groups = std::bitset<64>())
StreamMetricsHandler(const std::string &name, const Configurable *window_config)
: StreamHandler(name)
{
_metrics = std::make_unique<MetricsManagerClass>(window_config, groups);
_metrics = std::make_unique<MetricsManagerClass>(window_config);
}

const MetricsManagerClass *metrics() const
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/dhcp/DhcpStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DhcpMetricsBucket final : public visor::AbstractMetricsBucket
counters _counters;

public:
DhcpMetricsBucket(const std::bitset<64> groups = std::bitset<64>())
DhcpMetricsBucket()
{
set_event_rate_info("dhcp", {"rates", "total"}, "Rate of all DHCP wire packets (combined ingress and egress) per second");
set_num_events_info("dhcp", {"wire_packets", "total"}, "Total DHCP wire packets");
Expand All @@ -70,7 +70,7 @@ class DhcpMetricsBucket final : public visor::AbstractMetricsBucket
class DhcpMetricsManager final : public visor::AbstractMetricsManager<DhcpMetricsBucket>
{
public:
DhcpMetricsManager(const Configurable *window_config, const std::bitset<64> groups)
DhcpMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<DhcpMetricsBucket>(window_config)
{
}
Expand Down
47 changes: 22 additions & 25 deletions src/handlers/dns/DnsStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
namespace visor::handler::dns {

DnsStreamHandler::DnsStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler)
: StreamMetricsHandler(name, window_config, _process_dns_groups(window_config))
: StreamMetricsHandler(name, window_config)
{
if (handler) {
throw StreamHandlerException(fmt::format("DnsStreamHandler: unsupported upstream chained stream handler {}", handler->name()));
Expand All @@ -35,8 +35,6 @@ DnsStreamHandler::DnsStreamHandler(const std::string &name, InputStream *stream,
if (!_pcap_stream && !_mock_stream && !_dnstap_stream) {
throw StreamHandlerException(fmt::format("DnsStreamHandler: unsupported input stream {}", stream->name()));
}

_g_enabled = _process_dns_groups(window_config);
}

void DnsStreamHandler::start()
Expand All @@ -45,6 +43,8 @@ void DnsStreamHandler::start()
return;
}

_process_dns_groups();

// Setup Filters
if (config_exists("exclude_noerror") && config_get<bool>("exclude_noerror")) {
_f_enabled.set(Filters::ExcludingRCode);
Expand Down Expand Up @@ -224,13 +224,11 @@ void DnsStreamHandler::tcp_message_ready_cb(int8_t side, const pcpp::TcpStreamDa
// data is freed upon return
};

if (_g_enabled.test(group::DnsMetrics::TopDnsWire)) {
if (!iter->second.sessionData[side]) {
iter->second.sessionData[side] = std::make_unique<TcpSessionData>(got_dns_message);
}

iter->second.sessionData[side]->receive_dns_wire_data(tcpData.getData(), tcpData.getDataLength());
if (!iter->second.sessionData[side]) {
iter->second.sessionData[side] = std::make_unique<TcpSessionData>(got_dns_message);
}

iter->second.sessionData[side]->receive_dns_wire_data(tcpData.getData(), tcpData.getDataLength());
}

void DnsStreamHandler::tcp_connection_start_cb(const pcpp::ConnectionData &connectionData)
Expand Down Expand Up @@ -313,28 +311,27 @@ bool DnsStreamHandler::_filtering(DnsLayer &payload, [[maybe_unused]] PacketDire
return true;
}

const std::bitset<64> DnsStreamHandler::_process_dns_groups(const Configurable *metrics_config)
void DnsStreamHandler::_process_dns_groups()
{
// default enabled groups
std::bitset<64> groups;
groups.set(group::DnsMetrics::Counters);
groups.set(group::DnsMetrics::TopDnsWire);
groups.set(group::DnsMetrics::DnsTransactions);
groups.set(group::DnsMetrics::TopQnames);

if (metrics_config->config_exists("enable")) {
for (const auto &group : metrics_config->config_get<StringList>("enable")) {
groups.set(_group_metrics.at(group));
_groups.set(group::DnsMetrics::Counters);
_groups.set(group::DnsMetrics::TopDnsWire);
_groups.set(group::DnsMetrics::DnsTransactions);
_groups.set(group::DnsMetrics::TopQnames);

if (config_exists("enable")) {
for (const auto &group : config_get<StringList>("enable")) {
_groups.set(_group_metrics.at(group));
}
}

if (metrics_config->config_exists("disable")) {
for (const auto &group : metrics_config->config_get<StringList>("disable")) {
groups.reset(_group_metrics.at(group));
if (config_exists("disable")) {
for (const auto &group : config_get<StringList>("disable")) {
_groups.reset(_group_metrics.at(group));
}
}

return groups;
_metrics->configure_groups(&_groups);
}

void DnsMetricsBucket::specialized_merge(const AbstractMetricsBucket &o)
Expand Down Expand Up @@ -594,7 +591,7 @@ void DnsMetricsBucket::process_dns_layer(bool deep, DnsLayer &payload, bool dnst
_dns_qnameCard.update(name);
_dns_topQType.update(query->getDnsType());

if (_groups.test(group::DnsMetrics::TopQnames)) {
if (_groups->test(group::DnsMetrics::TopQnames)) {
if (payload.getDnsHeader()->queryOrResponse == response) {
switch (payload.getDnsHeader()->responseCode) {
case SrvFail:
Expand Down Expand Up @@ -728,7 +725,7 @@ void DnsMetricsManager::process_dns_layer(DnsLayer &payload, PacketDirection dir
// process in the "live" bucket. this will parse the resources if we are deep sampling
live_bucket()->process_dns_layer(_deep_sampling_now, payload, false, l3, l4, port);

if (_groups.test(group::DnsMetrics::DnsTransactions)) {
if (_groups->test(group::DnsMetrics::DnsTransactions)) {
// handle dns transactions (query/response pairs)
if (payload.getDnsHeader()->queryOrResponse == QR::response) {
auto xact = _qr_pair_manager.maybe_end_transaction(flowkey, payload.getDnsHeader()->transactionID, stamp);
Expand Down
17 changes: 5 additions & 12 deletions src/handlers/dns/DnsStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ class DnsMetricsBucket final : public visor::AbstractMetricsBucket
protected:
mutable std::shared_mutex _mutex;

const std::bitset<64> _groups;

Quantile<uint64_t> _dnsXactFromTimeUs;
Quantile<uint64_t> _dnsXactToTimeUs;

Expand Down Expand Up @@ -102,9 +100,8 @@ class DnsMetricsBucket final : public visor::AbstractMetricsBucket
counters _counters;

public:
DnsMetricsBucket(const std::bitset<64> groups = std::bitset<64>())
: _groups(groups)
, _dnsXactFromTimeUs("dns", {"xact", "out", "quantiles_us"}, "Quantiles of transaction timing (query/reply pairs) when host is client, in microseconds")
DnsMetricsBucket()
: _dnsXactFromTimeUs("dns", {"xact", "out", "quantiles_us"}, "Quantiles of transaction timing (query/reply pairs) when host is client, in microseconds")
, _dnsXactToTimeUs("dns", {"xact", "in", "quantiles_us"}, "Quantiles of transaction timing (query/reply pairs) when host is server, in microseconds")
, _dns_qnameCard("dns", {"cardinality", "qname"}, "Cardinality of unique QNAMES, both ingress and egress")
, _dns_topQname2("dns", "qname", {"top_qname2"}, "Top QNAMES, aggregated at a depth of two labels")
Expand Down Expand Up @@ -161,16 +158,13 @@ class DnsMetricsBucket final : public visor::AbstractMetricsBucket

class DnsMetricsManager final : public visor::AbstractMetricsManager<DnsMetricsBucket>
{

const std::bitset<64> _groups;
QueryResponsePairMgr _qr_pair_manager;
float _to90th{0.0};
float _from90th{0.0};

public:
DnsMetricsManager(const Configurable *window_config, const std::bitset<64> groups)
: visor::AbstractMetricsManager<DnsMetricsBucket>(window_config, groups)
, _groups(groups)
DnsMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<DnsMetricsBucket>(window_config)
{
}

Expand Down Expand Up @@ -280,10 +274,9 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler<DnsMetricsMana
{"top_dns_wire", group::DnsMetrics::TopDnsWire},
{"top_qnames", group::DnsMetrics::TopQnames}};

std::bitset<64> _g_enabled;

bool _filtering(DnsLayer &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, uint16_t port, timespec stamp);
const std::bitset<64> _process_dns_groups(const Configurable *metrics_config);
void _process_dns_groups();

public:
DnsStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config, StreamHandler *handler = nullptr);
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/mock/MockStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class MockMetricsBucket final : public visor::AbstractMetricsBucket
counters _counters;

public:
MockMetricsBucket(const std::bitset<64> groups = std::bitset<64>())
MockMetricsBucket()
{
}

Expand All @@ -57,7 +57,7 @@ class MockMetricsBucket final : public visor::AbstractMetricsBucket
class MockMetricsManager final : public visor::AbstractMetricsManager<MockMetricsBucket>
{
public:
MockMetricsManager(const Configurable *window_config, const std::bitset<64> groups)
MockMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<MockMetricsBucket>(window_config)
{
}
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/net/NetStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
Rate _rate_out;

public:
NetworkMetricsBucket(const std::bitset<64> groups = std::bitset<64>())
NetworkMetricsBucket()
: _srcIPCard("packets", {"cardinality", "src_ips_in"}, "Source IP cardinality")
, _dstIPCard("packets", {"cardinality", "dst_ips_out"}, "Destination IP cardinality")
, _topGeoLoc("packets", "geo_loc", {"top_geoLoc"}, "Top GeoIP locations")
Expand Down Expand Up @@ -105,7 +105,7 @@ class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
class NetworkMetricsManager final : public visor::AbstractMetricsManager<NetworkMetricsBucket>
{
public:
NetworkMetricsManager(const Configurable *window_config, const std::bitset<64> groups)
NetworkMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<NetworkMetricsBucket>(window_config)
{
}
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/pcap/PcapStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PcapMetricsBucket final : public visor::AbstractMetricsBucket
counters _counters;

public:
PcapMetricsBucket(const std::bitset<64> groups = std::bitset<64>())
PcapMetricsBucket()
{
}

Expand All @@ -65,7 +65,7 @@ class PcapMetricsBucket final : public visor::AbstractMetricsBucket
class PcapMetricsManager final : public visor::AbstractMetricsManager<PcapMetricsBucket>
{
public:
PcapMetricsManager(const Configurable *window_config, const std::bitset<64> groups)
PcapMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<PcapMetricsBucket>(window_config)
{
}
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ using namespace visor;
class TestMetricsBucket : public AbstractMetricsBucket
{
public:
TestMetricsBucket(const std::bitset<64> groups = std::bitset<64>())
TestMetricsBucket()
{
}
void specialized_merge([[maybe_unused]] const AbstractMetricsBucket &other)
Expand Down

0 comments on commit b354c61

Please sign in to comment.