Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify and fix TCP netprobe probe #690

Merged
merged 4 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/handlers/netprobe/NetProbeStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void NetProbeStreamHandler::probe_signal_send(pcpp::Packet &payload, TestType ty
}
} else if (type == TestType::TCP) {
if (auto tcp = payload.getLayerOfType<pcpp::TcpLayer>(); tcp != nullptr) {
_metrics->process_netprobe_tcp(static_cast<uint32_t>(tcp->getSrcPort()), true, name, stamp);
_metrics->process_netprobe_tcp(true, name, stamp);
}
}
}
Expand All @@ -92,7 +92,7 @@ void NetProbeStreamHandler::probe_signal_recv(pcpp::Packet &payload, TestType ty
}
} else if (type == TestType::TCP) {
if (auto tcp = payload.getLayerOfType<pcpp::TcpLayer>(); tcp != nullptr) {
_metrics->process_netprobe_tcp(static_cast<uint32_t>(tcp->getDstPort()), false, name, stamp);
_metrics->process_netprobe_tcp(false, name, stamp);
}
}
}
Expand All @@ -119,6 +119,7 @@ void NetProbeMetricsBucket::specialized_merge(const AbstractMetricsBucket &o, Me
if (group_enabled(group::NetProbeMetrics::Counters)) {
_targets_metrics[targetId]->attempts += target.second->attempts;
_targets_metrics[targetId]->successes += target.second->successes;
_targets_metrics[targetId]->connect_failures += target.second->connect_failures;
_targets_metrics[targetId]->dns_failures += target.second->dns_failures;
_targets_metrics[targetId]->timed_out += target.second->timed_out;
}
Expand All @@ -143,6 +144,7 @@ void NetProbeMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelM
if (group_enabled(group::NetProbeMetrics::Counters)) {
target.second->attempts.to_prometheus(out, target_labels);
target.second->successes.to_prometheus(out, target_labels);
target.second->connect_failures.to_prometheus(out, target_labels);
target.second->dns_failures.to_prometheus(out, target_labels);
target.second->timed_out.to_prometheus(out, target_labels);
}
Expand Down Expand Up @@ -198,6 +200,7 @@ void NetProbeMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, t
if (group_enabled(group::NetProbeMetrics::Counters)) {
target.second->attempts.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->successes.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->connect_failures.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->dns_failures.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->timed_out.to_opentelemetry(scope, start_ts, end_ts, target_labels);
}
Expand Down Expand Up @@ -252,6 +255,7 @@ void NetProbeMetricsBucket::to_json(json &j) const
if (group_enabled(group::NetProbeMetrics::Counters)) {
target.second->attempts.to_json(j["targets"][targetId]);
target.second->successes.to_json(j["targets"][targetId]);
target.second->connect_failures.to_json(j["targets"][targetId]);
target.second->dns_failures.to_json(j["targets"][targetId]);
target.second->timed_out.to_json(j["targets"][targetId]);
}
Expand Down Expand Up @@ -311,9 +315,12 @@ void NetProbeMetricsBucket::process_failure(ErrorType error, const std::string &
break;
case ErrorType::Timeout:
++_targets_metrics[target]->timed_out;
break;
case ErrorType::SocketError:
case ErrorType::InvalidIp:
case ErrorType::ConnectionFailure:
case ErrorType::ConnectFailure:
++_targets_metrics[target]->connect_failures;
break;
default:
break;
}
Expand Down Expand Up @@ -374,12 +381,14 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const

if (layer->getMessageType() == pcpp::ICMP_ECHO_REQUEST) {
if (auto request = layer->getEchoRequestData(); request != nullptr) {
_request_reply_manager->start_transaction((static_cast<uint32_t>(request->header->id) << 16) | request->header->sequence, {{stamp, {0, 0}}, target});
auto ping_id = (static_cast<uint32_t>(request->header->id) << 16) | request->header->sequence;
_request_reply_manager->start_transaction(std::to_string(ping_id), {{stamp, {0, 0}}, target});
}
live_bucket()->process_attempts(_deep_sampling_now, target);
} else if (layer->getMessageType() == pcpp::ICMP_ECHO_REPLY) {
if (auto reply = layer->getEchoReplyData(); reply != nullptr) {
auto xact = _request_reply_manager->maybe_end_transaction((static_cast<uint32_t>(reply->header->id) << 16) | reply->header->sequence, stamp);
auto ping_id = (static_cast<uint32_t>(reply->header->id) << 16) | reply->header->sequence;
auto xact = _request_reply_manager->maybe_end_transaction(std::to_string(ping_id), stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_transaction(_deep_sampling_now, xact.second);
} else if (xact.first == Result::TimedOut) {
Expand All @@ -389,16 +398,16 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const
}
}

void NetProbeMetricsManager::process_netprobe_tcp(uint32_t port, bool send, const std::string &target, timespec stamp)
void NetProbeMetricsManager::process_netprobe_tcp(bool send, const std::string &target, timespec stamp)
{
// base event
new_event(stamp);

if (send) {
_request_reply_manager->start_transaction(port, {{stamp, {0, 0}}, target});
_request_reply_manager->start_transaction(target, {{stamp, {0, 0}}, target});
live_bucket()->process_attempts(_deep_sampling_now, target);
} else {
auto xact = _request_reply_manager->maybe_end_transaction(port, stamp);
auto xact = _request_reply_manager->maybe_end_transaction(target, stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_transaction(_deep_sampling_now, xact.second);
} else if (xact.first == Result::TimedOut) {
Expand Down
8 changes: 5 additions & 3 deletions src/handlers/netprobe/NetProbeStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct Target {
Counter successes;
Counter minimum;
Counter maximum;
Counter connect_failures;
Counter dns_failures;
Counter timed_out;

Expand All @@ -59,7 +60,8 @@ struct Target {
, successes(NET_PROBE_SCHEMA, {"successes"}, "Total Net Probe successes")
, minimum(NET_PROBE_SCHEMA, {"response_min_us"}, "Minimum response time measured in the reporting interval")
, maximum(NET_PROBE_SCHEMA, {"response_max_us"}, "Maximum response time measured in the reporting interval")
, dns_failures(NET_PROBE_SCHEMA, {"dns_lookup_failures"}, "Total Net Probe failures when performed DNS lookup")
, connect_failures(NET_PROBE_SCHEMA, {"connect_failures"}, "Total Net Probe failures when performing a TCP socket connection")
, dns_failures(NET_PROBE_SCHEMA, {"dns_lookup_failures"}, "Total Net Probe failures when performing a DNS lookup")
, timed_out(NET_PROBE_SCHEMA, {"packets_timeout"}, "Total Net Probe timeout transactions")
{
}
Expand Down Expand Up @@ -98,7 +100,7 @@ class NetProbeMetricsBucket final : public visor::AbstractMetricsBucket

class NetProbeMetricsManager final : public visor::AbstractMetricsManager<NetProbeMetricsBucket>
{
typedef TransactionManager<uint32_t, NetProbeTransaction, std::hash<uint32_t>> NetProbeTransactionManager;
typedef TransactionManager<std::string, NetProbeTransaction, std::hash<std::string>> NetProbeTransactionManager;
std::unique_ptr<NetProbeTransactionManager> _request_reply_manager;

public:
Expand All @@ -122,7 +124,7 @@ class NetProbeMetricsManager final : public visor::AbstractMetricsManager<NetPro
void process_filtered(timespec stamp);
void process_failure(ErrorType error, const std::string &target);
void process_netprobe_icmp(pcpp::IcmpLayer *layer, const std::string &target, timespec stamp);
void process_netprobe_tcp(uint32_t port, bool send, const std::string &target, timespec stamp);
void process_netprobe_tcp(bool send, const std::string &target, timespec stamp);
};

class NetProbeStreamHandler final : public visor::StreamMetricsHandler<NetProbeMetricsManager>
Expand Down
2 changes: 1 addition & 1 deletion src/inputs/netprobe/NetProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ enum class ErrorType {
SocketError,
DnsLookupFailure,
InvalidIp,
ConnectionFailure
ConnectFailure
};

enum class TestType {
Expand Down
3 changes: 3 additions & 0 deletions src/inputs/netprobe/PingProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ bool PingProbe::start(std::shared_ptr<uvw::Loop> io_loop)
throw NetProbeException("PingProbe - unable to initialize AsyncHandle receiver");
}
_recv_handler->on<uvw::AsyncEvent>([this](const auto &, auto &) {
// note this processes received packets across ALL active ping probes (because of the single receiver thread)
// the expectation is that packets which did not originate from this probe will be ignored by the handler attached to this probe,
// since it did not originate from it
for (auto &[packet, stamp] : PingReceiver::recv_packets) {
_recv(packet, TestType::Ping, _name, stamp);
}
Expand Down
54 changes: 13 additions & 41 deletions src/inputs/netprobe/TcpProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,17 @@ bool TcpProbe::start(std::shared_ptr<uvw::Loop> io_loop)
}

_interval_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
_internal_sequence = 0;
_timeout_timer = _io_loop->resource<uvw::TimerHandle>();
if (!_timeout_timer) {
throw NetProbeException("Netprobe - unable to initialize timeout TimerHandle");
}

_timeout_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
_internal_sequence = _config.packets_per_test;
_fail(ErrorType::Timeout, TestType::Ping, _name);
if (_internal_timer) {
_internal_timer->stop();
}
_interval_timer->again();
});

if (!_dns.empty()) {
auto [ip, ipv4] = _resolve_dns();
_ip_str = ip;
_is_ipv4 = ipv4;
if (_ip_str.empty()) {
_fail(ErrorType::DnsLookupFailure, TestType::Ping, _name);
_fail(ErrorType::DnsLookupFailure, TestType::TCP, _name);
return;
}
}

_internal_timer = _io_loop->resource<uvw::TimerHandle>();
_internal_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
if (_internal_sequence < _config.packets_per_test) {
_internal_sequence++;
_timeout_timer->stop();
_timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0});
_perform_tcp_process();
}
});
_timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0});
_perform_tcp_process();
_internal_sequence++;
_internal_timer->start(uvw::TimerHandle::Time{_config.packets_interval_msec}, uvw::TimerHandle::Time{_config.packets_interval_msec});
});

_interval_timer->start(uvw::TimerHandle::Time{0}, uvw::TimerHandle::Time{_config.interval_msec});
Expand All @@ -80,33 +53,32 @@ void TcpProbe::_perform_tcp_process()
{
_client = _io_loop->resource<uvw::TCPHandle>();
_client->on<uvw::ErrorEvent>([this](const auto &, auto &) {
_fail(ErrorType::ConnectionFailure, TestType::Ping, _name);
_fail(ErrorType::ConnectFailure, TestType::TCP, _name);
});
_client->once<uvw::CloseEvent>([this](const uvw::CloseEvent &, uvw::TCPHandle &) {
timespec stamp;
std::timespec_get(&stamp, TIME_UTC);
pcpp::Packet packet;
auto layer = pcpp::TcpLayer(static_cast<uint16_t>(_port), _client_port);
packet.addLayer(&layer);
_recv(packet, TestType::TCP, _name, stamp);
});
_client->once<uvw::ShutdownEvent>([](const uvw::ShutdownEvent &, uvw::TCPHandle &handle) {
_client->once<uvw::ShutdownEvent>([this](const uvw::ShutdownEvent &, uvw::TCPHandle &handle) {
handle.close();
});
_client->once<uvw::ConnectEvent>([this](const uvw::ConnectEvent &, uvw::TCPHandle &handle) {
timespec stamp;
std::timespec_get(&stamp, TIME_UTC);
_client_port = static_cast<uint16_t>(handle.sock().port);
pcpp::Packet packet;
auto layer = pcpp::TcpLayer(_client_port, static_cast<uint16_t>(_port));
auto layer = pcpp::TcpLayer(0, static_cast<uint16_t>(_dst_port));
packet.addLayer(&layer);
_send(packet, TestType::TCP, _name, stamp);
_recv(packet, TestType::TCP, _name, stamp);
handle.shutdown();
});
timespec stamp;
std::timespec_get(&stamp, TIME_UTC);
pcpp::Packet packet;
auto layer = pcpp::TcpLayer(0, static_cast<uint16_t>(_dst_port));
packet.addLayer(&layer);
_send(packet, TestType::TCP, _name, stamp);
if (_is_ipv4) {
_client->connect<uvw::TCPHandle::IPv4>(_ip_str, _port);
_client->connect<uvw::TCPHandle::IPv4>(_ip_str, _dst_port);
} else {
_client->connect<uvw::TCPHandle::IPv6>(_ip_str, _port);
_client->connect<uvw::TCPHandle::IPv6>(_ip_str, _dst_port);
}
}

Expand Down
16 changes: 2 additions & 14 deletions src/inputs/netprobe/TcpProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,13 @@

namespace visor::input::netprobe {

/**
* @class PingProbe
* @brief PingProbe class used for sending ICMP Echo Requests.
*
* This class is created for each specified target. However, it reuses a shared socket per thread (per UV_LOOP).
* I.e. each unique NetProbeInputStream with Ping Type will have a socket to send ICMP Echo Request.
*/
class TcpProbe final : public NetProbe
{
uint32_t _port;
uint16_t _client_port;
uint32_t _dst_port;
bool _init{false};
bool _is_ipv4{false};
uint16_t _internal_sequence{0};
std::string _ip_str;
std::shared_ptr<uvw::TimerHandle> _interval_timer;
std::shared_ptr<uvw::TimerHandle> _internal_timer;
std::shared_ptr<uvw::TimerHandle> _timeout_timer;

std::shared_ptr<uvw::TCPHandle> _client;

Expand All @@ -37,8 +26,7 @@ class TcpProbe final : public NetProbe
public:
TcpProbe(uint16_t id, const std::string &name, const pcpp::IPAddress &ip, const std::string &dns, uint32_t port)
: NetProbe(id, name, ip, dns)
, _port(port)
, _client_port(0){};
, _dst_port(port) {};
~TcpProbe() = default;
bool start(std::shared_ptr<uvw::Loop> io_loop) override;
bool stop() override;
Expand Down
Loading