From 8cca32508aca977a2a057a70c1f25654a72e5db7 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Fri, 6 Oct 2023 10:55:31 -0400 Subject: [PATCH 1/4] add connect failure metric --- src/handlers/netprobe/NetProbeStreamHandler.cpp | 11 +++++++++-- src/handlers/netprobe/NetProbeStreamHandler.h | 4 +++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/handlers/netprobe/NetProbeStreamHandler.cpp b/src/handlers/netprobe/NetProbeStreamHandler.cpp index f41c21a50..934229f30 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.cpp +++ b/src/handlers/netprobe/NetProbeStreamHandler.cpp @@ -79,7 +79,7 @@ void NetProbeStreamHandler::probe_signal_send(pcpp::Packet &payload, TestType ty } } else if (type == TestType::TCP) { if (auto tcp = payload.getLayerOfType(); tcp != nullptr) { - _metrics->process_netprobe_tcp(static_cast(tcp->getSrcPort()), true, name, stamp); + _metrics->process_netprobe_tcp(static_cast(tcp->getDstPort()), true, name, stamp); } } } @@ -119,6 +119,7 @@ void NetProbeMetricsBucket::specialized_merge(const AbstractMetricsBucket &o, Me if (group_enabled(group::NetProbeMetrics::Counters)) { _targets_metrics[targetId]->attempts += target.second->attempts; _targets_metrics[targetId]->successes += target.second->successes; + _targets_metrics[targetId]->connect_failures += target.second->connect_failures; _targets_metrics[targetId]->dns_failures += target.second->dns_failures; _targets_metrics[targetId]->timed_out += target.second->timed_out; } @@ -143,6 +144,7 @@ void NetProbeMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelM if (group_enabled(group::NetProbeMetrics::Counters)) { target.second->attempts.to_prometheus(out, target_labels); target.second->successes.to_prometheus(out, target_labels); + target.second->connect_failures.to_prometheus(out, target_labels); target.second->dns_failures.to_prometheus(out, target_labels); target.second->timed_out.to_prometheus(out, target_labels); } @@ -198,6 +200,7 @@ void NetProbeMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, t if (group_enabled(group::NetProbeMetrics::Counters)) { target.second->attempts.to_opentelemetry(scope, start_ts, end_ts, target_labels); target.second->successes.to_opentelemetry(scope, start_ts, end_ts, target_labels); + target.second->connect_failures.to_opentelemetry(scope, start_ts, end_ts, target_labels); target.second->dns_failures.to_opentelemetry(scope, start_ts, end_ts, target_labels); target.second->timed_out.to_opentelemetry(scope, start_ts, end_ts, target_labels); } @@ -252,6 +255,7 @@ void NetProbeMetricsBucket::to_json(json &j) const if (group_enabled(group::NetProbeMetrics::Counters)) { target.second->attempts.to_json(j["targets"][targetId]); target.second->successes.to_json(j["targets"][targetId]); + target.second->connect_failures.to_json(j["targets"][targetId]); target.second->dns_failures.to_json(j["targets"][targetId]); target.second->timed_out.to_json(j["targets"][targetId]); } @@ -311,9 +315,12 @@ void NetProbeMetricsBucket::process_failure(ErrorType error, const std::string & break; case ErrorType::Timeout: ++_targets_metrics[target]->timed_out; + break; case ErrorType::SocketError: case ErrorType::InvalidIp: - case ErrorType::ConnectionFailure: + case ErrorType::ConnectFailure: + ++_targets_metrics[target]->connect_failures; + break; default: break; } diff --git a/src/handlers/netprobe/NetProbeStreamHandler.h b/src/handlers/netprobe/NetProbeStreamHandler.h index 0841fd48f..3b5545753 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.h +++ b/src/handlers/netprobe/NetProbeStreamHandler.h @@ -49,6 +49,7 @@ struct Target { Counter successes; Counter minimum; Counter maximum; + Counter connect_failures; Counter dns_failures; Counter timed_out; @@ -59,7 +60,8 @@ struct Target { , successes(NET_PROBE_SCHEMA, {"successes"}, "Total Net Probe successes") , minimum(NET_PROBE_SCHEMA, {"response_min_us"}, "Minimum response time measured in the reporting interval") , maximum(NET_PROBE_SCHEMA, {"response_max_us"}, "Maximum response time measured in the reporting interval") - , dns_failures(NET_PROBE_SCHEMA, {"dns_lookup_failures"}, "Total Net Probe failures when performed DNS lookup") + , connect_failures(NET_PROBE_SCHEMA, {"connect_failures"}, "Total Net Probe failures when performing a TCP socket connection") + , dns_failures(NET_PROBE_SCHEMA, {"dns_lookup_failures"}, "Total Net Probe failures when performing a DNS lookup") , timed_out(NET_PROBE_SCHEMA, {"packets_timeout"}, "Total Net Probe timeout transactions") { } From bd560f09c9b4059e4a51b037a77dda6b795e9975 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Fri, 6 Oct 2023 12:28:43 -0400 Subject: [PATCH 2/4] simplify tcp probe. use target id for transaction id. --- .../netprobe/NetProbeStreamHandler.cpp | 14 ++--- src/handlers/netprobe/NetProbeStreamHandler.h | 4 +- src/inputs/netprobe/NetProbe.h | 2 +- src/inputs/netprobe/TcpProbe.cpp | 55 +++++-------------- src/inputs/netprobe/TcpProbe.h | 16 +----- 5 files changed, 26 insertions(+), 65 deletions(-) diff --git a/src/handlers/netprobe/NetProbeStreamHandler.cpp b/src/handlers/netprobe/NetProbeStreamHandler.cpp index 934229f30..3ab77b5ae 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.cpp +++ b/src/handlers/netprobe/NetProbeStreamHandler.cpp @@ -79,7 +79,7 @@ void NetProbeStreamHandler::probe_signal_send(pcpp::Packet &payload, TestType ty } } else if (type == TestType::TCP) { if (auto tcp = payload.getLayerOfType(); tcp != nullptr) { - _metrics->process_netprobe_tcp(static_cast(tcp->getDstPort()), true, name, stamp); + _metrics->process_netprobe_tcp(true, name, stamp); } } } @@ -92,7 +92,7 @@ void NetProbeStreamHandler::probe_signal_recv(pcpp::Packet &payload, TestType ty } } else if (type == TestType::TCP) { if (auto tcp = payload.getLayerOfType(); tcp != nullptr) { - _metrics->process_netprobe_tcp(static_cast(tcp->getDstPort()), false, name, stamp); + _metrics->process_netprobe_tcp(false, name, stamp); } } } @@ -381,12 +381,12 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const if (layer->getMessageType() == pcpp::ICMP_ECHO_REQUEST) { if (auto request = layer->getEchoRequestData(); request != nullptr) { - _request_reply_manager->start_transaction((static_cast(request->header->id) << 16) | request->header->sequence, {{stamp, {0, 0}}, target}); + _request_reply_manager->start_transaction(target, {{stamp, {0, 0}}, target}); } live_bucket()->process_attempts(_deep_sampling_now, target); } else if (layer->getMessageType() == pcpp::ICMP_ECHO_REPLY) { if (auto reply = layer->getEchoReplyData(); reply != nullptr) { - auto xact = _request_reply_manager->maybe_end_transaction((static_cast(reply->header->id) << 16) | reply->header->sequence, stamp); + auto xact = _request_reply_manager->maybe_end_transaction(target, stamp); if (xact.first == Result::Valid) { live_bucket()->new_transaction(_deep_sampling_now, xact.second); } else if (xact.first == Result::TimedOut) { @@ -396,16 +396,16 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const } } -void NetProbeMetricsManager::process_netprobe_tcp(uint32_t port, bool send, const std::string &target, timespec stamp) +void NetProbeMetricsManager::process_netprobe_tcp(bool send, const std::string &target, timespec stamp) { // base event new_event(stamp); if (send) { - _request_reply_manager->start_transaction(port, {{stamp, {0, 0}}, target}); + _request_reply_manager->start_transaction(target, {{stamp, {0, 0}}, target}); live_bucket()->process_attempts(_deep_sampling_now, target); } else { - auto xact = _request_reply_manager->maybe_end_transaction(port, stamp); + auto xact = _request_reply_manager->maybe_end_transaction(target, stamp); if (xact.first == Result::Valid) { live_bucket()->new_transaction(_deep_sampling_now, xact.second); } else if (xact.first == Result::TimedOut) { diff --git a/src/handlers/netprobe/NetProbeStreamHandler.h b/src/handlers/netprobe/NetProbeStreamHandler.h index 3b5545753..a12f7444d 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.h +++ b/src/handlers/netprobe/NetProbeStreamHandler.h @@ -100,7 +100,7 @@ class NetProbeMetricsBucket final : public visor::AbstractMetricsBucket class NetProbeMetricsManager final : public visor::AbstractMetricsManager { - typedef TransactionManager> NetProbeTransactionManager; + typedef TransactionManager> NetProbeTransactionManager; std::unique_ptr _request_reply_manager; public: @@ -124,7 +124,7 @@ class NetProbeMetricsManager final : public visor::AbstractMetricsManager diff --git a/src/inputs/netprobe/NetProbe.h b/src/inputs/netprobe/NetProbe.h index 18c24732c..53eb299f9 100644 --- a/src/inputs/netprobe/NetProbe.h +++ b/src/inputs/netprobe/NetProbe.h @@ -24,7 +24,7 @@ enum class ErrorType { SocketError, DnsLookupFailure, InvalidIp, - ConnectionFailure + ConnectFailure }; enum class TestType { diff --git a/src/inputs/netprobe/TcpProbe.cpp b/src/inputs/netprobe/TcpProbe.cpp index 625cdd0bb..a424ad3d1 100644 --- a/src/inputs/netprobe/TcpProbe.cpp +++ b/src/inputs/netprobe/TcpProbe.cpp @@ -11,6 +11,7 @@ #ifdef __GNUC__ #pragma GCC diagnostic pop #endif +#include namespace visor::input::netprobe { bool TcpProbe::start(std::shared_ptr io_loop) @@ -31,44 +32,17 @@ bool TcpProbe::start(std::shared_ptr io_loop) } _interval_timer->on([this](const auto &, auto &) { - _internal_sequence = 0; - _timeout_timer = _io_loop->resource(); - if (!_timeout_timer) { - throw NetProbeException("Netprobe - unable to initialize timeout TimerHandle"); - } - - _timeout_timer->on([this](const auto &, auto &) { - _internal_sequence = _config.packets_per_test; - _fail(ErrorType::Timeout, TestType::Ping, _name); - if (_internal_timer) { - _internal_timer->stop(); - } - _interval_timer->again(); - }); if (!_dns.empty()) { auto [ip, ipv4] = _resolve_dns(); _ip_str = ip; _is_ipv4 = ipv4; if (_ip_str.empty()) { - _fail(ErrorType::DnsLookupFailure, TestType::Ping, _name); + _fail(ErrorType::DnsLookupFailure, TestType::TCP, _name); return; } } - - _internal_timer = _io_loop->resource(); - _internal_timer->on([this](const auto &, auto &) { - if (_internal_sequence < _config.packets_per_test) { - _internal_sequence++; - _timeout_timer->stop(); - _timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0}); - _perform_tcp_process(); - } - }); - _timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0}); _perform_tcp_process(); - _internal_sequence++; - _internal_timer->start(uvw::TimerHandle::Time{_config.packets_interval_msec}, uvw::TimerHandle::Time{_config.packets_interval_msec}); }); _interval_timer->start(uvw::TimerHandle::Time{0}, uvw::TimerHandle::Time{_config.interval_msec}); @@ -80,33 +54,32 @@ void TcpProbe::_perform_tcp_process() { _client = _io_loop->resource(); _client->on([this](const auto &, auto &) { - _fail(ErrorType::ConnectionFailure, TestType::Ping, _name); + _fail(ErrorType::ConnectFailure, TestType::TCP, _name); }); _client->once([this](const uvw::CloseEvent &, uvw::TCPHandle &) { - timespec stamp; - std::timespec_get(&stamp, TIME_UTC); - pcpp::Packet packet; - auto layer = pcpp::TcpLayer(static_cast(_port), _client_port); - packet.addLayer(&layer); - _recv(packet, TestType::TCP, _name, stamp); }); - _client->once([](const uvw::ShutdownEvent &, uvw::TCPHandle &handle) { + _client->once([this](const uvw::ShutdownEvent &, uvw::TCPHandle &handle) { handle.close(); }); _client->once([this](const uvw::ConnectEvent &, uvw::TCPHandle &handle) { timespec stamp; std::timespec_get(&stamp, TIME_UTC); - _client_port = static_cast(handle.sock().port); pcpp::Packet packet; - auto layer = pcpp::TcpLayer(_client_port, static_cast(_port)); + auto layer = pcpp::TcpLayer(0, static_cast(_dst_port)); packet.addLayer(&layer); - _send(packet, TestType::TCP, _name, stamp); + _recv(packet, TestType::TCP, _name, stamp); handle.shutdown(); }); + timespec stamp; + std::timespec_get(&stamp, TIME_UTC); + pcpp::Packet packet; + auto layer = pcpp::TcpLayer(0, static_cast(_dst_port)); + packet.addLayer(&layer); + _send(packet, TestType::TCP, _name, stamp); if (_is_ipv4) { - _client->connect(_ip_str, _port); + _client->connect(_ip_str, _dst_port); } else { - _client->connect(_ip_str, _port); + _client->connect(_ip_str, _dst_port); } } diff --git a/src/inputs/netprobe/TcpProbe.h b/src/inputs/netprobe/TcpProbe.h index fe598db2a..4452601ce 100644 --- a/src/inputs/netprobe/TcpProbe.h +++ b/src/inputs/netprobe/TcpProbe.h @@ -11,24 +11,13 @@ namespace visor::input::netprobe { -/** - * @class PingProbe - * @brief PingProbe class used for sending ICMP Echo Requests. - * - * This class is created for each specified target. However, it reuses a shared socket per thread (per UV_LOOP). - * I.e. each unique NetProbeInputStream with Ping Type will have a socket to send ICMP Echo Request. - */ class TcpProbe final : public NetProbe { - uint32_t _port; - uint16_t _client_port; + uint32_t _dst_port; bool _init{false}; bool _is_ipv4{false}; - uint16_t _internal_sequence{0}; std::string _ip_str; std::shared_ptr _interval_timer; - std::shared_ptr _internal_timer; - std::shared_ptr _timeout_timer; std::shared_ptr _client; @@ -37,8 +26,7 @@ class TcpProbe final : public NetProbe public: TcpProbe(uint16_t id, const std::string &name, const pcpp::IPAddress &ip, const std::string &dns, uint32_t port) : NetProbe(id, name, ip, dns) - , _port(port) - , _client_port(0){}; + , _dst_port(port) {}; ~TcpProbe() = default; bool start(std::shared_ptr io_loop) override; bool stop() override; From bf57ea7b0ca931bf13258d8b409aeb5b3564d27d Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Fri, 6 Oct 2023 14:50:57 -0400 Subject: [PATCH 3/4] remove header, add some comments --- src/handlers/netprobe/NetProbeStreamHandler.cpp | 2 ++ src/inputs/netprobe/PingProbe.cpp | 1 + src/inputs/netprobe/TcpProbe.cpp | 1 - 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/handlers/netprobe/NetProbeStreamHandler.cpp b/src/handlers/netprobe/NetProbeStreamHandler.cpp index 3ab77b5ae..6f0348f26 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.cpp +++ b/src/handlers/netprobe/NetProbeStreamHandler.cpp @@ -381,11 +381,13 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const if (layer->getMessageType() == pcpp::ICMP_ECHO_REQUEST) { if (auto request = layer->getEchoRequestData(); request != nullptr) { + // TODO this may need more work to identify that the packet came from this probe/target _request_reply_manager->start_transaction(target, {{stamp, {0, 0}}, target}); } live_bucket()->process_attempts(_deep_sampling_now, target); } else if (layer->getMessageType() == pcpp::ICMP_ECHO_REPLY) { if (auto reply = layer->getEchoReplyData(); reply != nullptr) { + // TODO this may need more work to identify that the packet came from this probe/target auto xact = _request_reply_manager->maybe_end_transaction(target, stamp); if (xact.first == Result::Valid) { live_bucket()->new_transaction(_deep_sampling_now, xact.second); diff --git a/src/inputs/netprobe/PingProbe.cpp b/src/inputs/netprobe/PingProbe.cpp index 8404515f1..0ee4985a7 100644 --- a/src/inputs/netprobe/PingProbe.cpp +++ b/src/inputs/netprobe/PingProbe.cpp @@ -184,6 +184,7 @@ bool PingProbe::start(std::shared_ptr io_loop) throw NetProbeException("PingProbe - unable to initialize AsyncHandle receiver"); } _recv_handler->on([this](const auto &, auto &) { + // TODO note this processes received packets across ALL active ping probes (because of the single receiver thread) for (auto &[packet, stamp] : PingReceiver::recv_packets) { _recv(packet, TestType::Ping, _name, stamp); } diff --git a/src/inputs/netprobe/TcpProbe.cpp b/src/inputs/netprobe/TcpProbe.cpp index a424ad3d1..3765937d4 100644 --- a/src/inputs/netprobe/TcpProbe.cpp +++ b/src/inputs/netprobe/TcpProbe.cpp @@ -11,7 +11,6 @@ #ifdef __GNUC__ #pragma GCC diagnostic pop #endif -#include namespace visor::input::netprobe { bool TcpProbe::start(std::shared_ptr io_loop) From 103056a48b54ee5b349e9cadac5b6e747e6508be Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Fri, 6 Oct 2023 15:26:23 -0400 Subject: [PATCH 4/4] switch icmp back to port based transaction id (via string) --- src/handlers/netprobe/NetProbeStreamHandler.cpp | 8 ++++---- src/inputs/netprobe/PingProbe.cpp | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/handlers/netprobe/NetProbeStreamHandler.cpp b/src/handlers/netprobe/NetProbeStreamHandler.cpp index 6f0348f26..8fa5ff6b7 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.cpp +++ b/src/handlers/netprobe/NetProbeStreamHandler.cpp @@ -381,14 +381,14 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const if (layer->getMessageType() == pcpp::ICMP_ECHO_REQUEST) { if (auto request = layer->getEchoRequestData(); request != nullptr) { - // TODO this may need more work to identify that the packet came from this probe/target - _request_reply_manager->start_transaction(target, {{stamp, {0, 0}}, target}); + auto ping_id = (static_cast(request->header->id) << 16) | request->header->sequence; + _request_reply_manager->start_transaction(std::to_string(ping_id), {{stamp, {0, 0}}, target}); } live_bucket()->process_attempts(_deep_sampling_now, target); } else if (layer->getMessageType() == pcpp::ICMP_ECHO_REPLY) { if (auto reply = layer->getEchoReplyData(); reply != nullptr) { - // TODO this may need more work to identify that the packet came from this probe/target - auto xact = _request_reply_manager->maybe_end_transaction(target, stamp); + auto ping_id = (static_cast(reply->header->id) << 16) | reply->header->sequence; + auto xact = _request_reply_manager->maybe_end_transaction(std::to_string(ping_id), stamp); if (xact.first == Result::Valid) { live_bucket()->new_transaction(_deep_sampling_now, xact.second); } else if (xact.first == Result::TimedOut) { diff --git a/src/inputs/netprobe/PingProbe.cpp b/src/inputs/netprobe/PingProbe.cpp index 0ee4985a7..2b408ebcd 100644 --- a/src/inputs/netprobe/PingProbe.cpp +++ b/src/inputs/netprobe/PingProbe.cpp @@ -184,7 +184,9 @@ bool PingProbe::start(std::shared_ptr io_loop) throw NetProbeException("PingProbe - unable to initialize AsyncHandle receiver"); } _recv_handler->on([this](const auto &, auto &) { - // TODO note this processes received packets across ALL active ping probes (because of the single receiver thread) + // note this processes received packets across ALL active ping probes (because of the single receiver thread) + // the expectation is that packets which did not originate from this probe will be ignored by the handler attached to this probe, + // since it did not originate from it for (auto &[packet, stamp] : PingReceiver::recv_packets) { _recv(packet, TestType::Ping, _name, stamp); }