Skip to content

Commit

Permalink
ref: register metrics and lock outside collector
Browse files Browse the repository at this point in the history
This helps fix some concurrency problems by registering the metrics once,
outside the collector, and by taking a mutex lock whenever their values are
updated.
  • Loading branch information
wbollock committed Dec 11, 2023
1 parent 5b21991 commit 4450fb3
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 93 deletions.
151 changes: 59 additions & 92 deletions internal/collector/icmp_collector.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package collector

import (
"math"
"net/http"
"strconv"
"strings"
"sync"
"time"

probing "github.com/prometheus-community/pro-bing"
Expand All @@ -14,7 +14,6 @@ import (
)

const (
namespace = "ping_"
defaultTimeout = time.Second * 10
defaultInterval = time.Second
defaultCount = 5
Expand All @@ -26,41 +25,6 @@ const (
minPacketSize = 24
)

// Package-level gauges that hold the result of a ping probe. Each metric name
// is built by prepending the namespace constant to a fixed suffix.
var (
// pingSuccessGauge reports whether the ping succeeded (PingHandler sets it to 1 or 0).
pingSuccessGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "success",
Help: "Returns whether the ping succeeded",
})
// pingTimeoutGauge reports whether the ping failed by timeout (PingHandler sets it to 1 or 0).
pingTimeoutGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "timeout",
Help: "Returns whether the ping failed by timeout",
})
// probeDurationGauge holds how long the probe took, in seconds.
probeDurationGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "duration_seconds",
Help: "Returns how long the probe took to complete in seconds",
})
// minGauge holds the best (smallest) round trip time, in seconds.
minGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "rtt_min_seconds",
Help: "Best round trip time",
})
// maxGauge holds the worst (largest) round trip time, in seconds.
maxGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "rtt_max_seconds",
Help: "Worst round trip time",
})
// avgGauge holds the mean round trip time, in seconds.
avgGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "rtt_avg_seconds",
Help: "Mean round trip time",
})
// stddevGauge holds the standard deviation of the round trip times.
// NOTE(review): unlike the other rtt metrics the name carries no unit suffix,
// and the help text does not state a unit - confirm what unit is exported.
stddevGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "rtt_std_deviation",
Help: "Standard deviation",
})
// lossGauge holds the packet loss.
// NOTE(review): the metric is named "loss_ratio" but the help text describes a
// 0 to 100 range, i.e. a percentage rather than a 0 to 1 ratio.
lossGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "loss_ratio",
Help: "Packet loss from 0 to 100",
})
)

type pingParams struct {
target string
timeout time.Duration
Expand Down Expand Up @@ -146,73 +110,76 @@ func serveMetricsWithError(w http.ResponseWriter, r *http.Request, registry *pro
}
}

// PingHandler handles a probe request: it parses the request parameters with
// parseParams, configures and runs a pinger against the requested target,
// stores the outcome in the ping gauges and finally passes the registry to
// serveMetricsWithError.
//
// NOTE(review): this span is a diff rendering without +/- markers, so two
// variants of the function are interleaved line by line:
//   - the earlier variant is itself the handler (w, r) and creates and fills a
//     new registry on every request;
//   - the later variant is a factory that receives the registry, the gauges
//     and a mutex from its caller and returns the http.HandlerFunc.
//
// Read the two variants separately; the text below is not compilable as is.
func PingHandler(w http.ResponseWriter, r *http.Request) {
p := parseParams(r)
start := time.Now()
func PingHandler(registry *prometheus.Registry, pingSuccessGauge prometheus.Gauge, pingTimeoutGauge prometheus.Gauge, probeDurationGauge prometheus.Gauge, minGauge prometheus.Gauge, maxGauge prometheus.Gauge, avgGauge prometheus.Gauge, stddevGauge prometheus.Gauge, lossGauge prometheus.Gauge, mutex *sync.Mutex) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

registry := prometheus.NewRegistry()
registry.MustRegister(pingSuccessGauge, pingTimeoutGauge, probeDurationGauge, minGauge, maxGauge, avgGauge, stddevGauge, lossGauge)
p := parseParams(r)
start := time.Now()

// assume failure
pingSuccessGauge.Set(0)
pingTimeoutGauge.Set(1)
// TODO use atomic lock and reduce lock duration, dont think this is needed
// NOTE(review): the lock taken here is released again before the ping runs,
// and serveMetricsWithError is called without holding it. The gauges are
// passed in by the caller rather than created per request, so concurrent
// probes can presumably still observe each other's values - confirm.
mutex.Lock()

log.Debugf("Request received with parameters: target=%v, count=%v, size=%v, interval=%v, timeout=%v, ttl=%v, packet=%v",
p.target, p.count, p.size, p.interval, p.timeout, p.ttl, p.packet)
// assume failure
pingSuccessGauge.Set(0)
pingTimeoutGauge.Set(1)

pinger := probing.New(p.target)
mutex.Unlock()

pinger.Count = p.count
pinger.Size = p.size
pinger.Interval = p.interval
pinger.Timeout = p.timeout
pinger.TTL = p.ttl
log.Debugf("Request received with parameters: target=%v, count=%v, size=%v, interval=%v, timeout=%v, ttl=%v, packet=%v",
p.target, p.count, p.size, p.interval, p.timeout, p.ttl, p.packet)

if p.packet == "icmp" {
pinger.SetPrivileged(true)
} else {
pinger.SetPrivileged(false)
}
pinger := probing.New(p.target)

if p.protocol == "v6" || p.protocol == "6" || p.protocol == "ip6" {
pinger.SetNetwork("ip6")
} else {
pinger.SetNetwork("ip4")
}
pinger.Count = p.count
pinger.Size = p.size
pinger.Interval = p.interval
pinger.Timeout = p.timeout
pinger.TTL = p.ttl

pinger.OnRecv = func(pkt *probing.Packet) {
log.Debugf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v\n",
pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.TTL)
}
if p.packet == "icmp" {
pinger.SetPrivileged(true)
} else {
pinger.SetPrivileged(false)
}

pinger.OnDuplicateRecv = func(pkt *probing.Packet) {
log.Debugf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n",
pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.TTL)
}
if p.protocol == "v6" || p.protocol == "6" || p.protocol == "ip6" {
pinger.SetNetwork("ip6")
} else {
pinger.SetNetwork("ip4")
}

pinger.OnFinish = func(stats *probing.Statistics) {
log.Debugf("OnFinish: target=%v, PacketsSent=%d, PacketsRecv=%d, PacketLoss=%f%%, MinRtt=%v, AvgRtt=%v, MaxRtt=%v, StdDevRtt=%v, Duration=%v",
stats.IPAddr, pinger.PacketsSent, pinger.PacketsRecv, stats.PacketLoss, stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt, time.Since(start))

// lock while we assign the probe results to the gauges
mutex.Lock()
// Classify the outcome, checked in this order:
//   success  - at least one packet received and the elapsed time is below the timeout
//   timeout  - the elapsed time exceeds the timeout
//   failure  - no packets received (neither success nor timeout flagged)
if pinger.PacketsRecv > 0 && pinger.Timeout > time.Since(start) {
log.Debugf("Ping successful: target=%v", stats.IPAddr)
pingSuccessGauge.Set(1)
pingTimeoutGauge.Set(0)
} else if pinger.Timeout < time.Since(start) {
log.Debugf("Ping timeout: target=%v", stats.IPAddr)
pingTimeoutGauge.Set(1)
pingSuccessGauge.Set(0)
} else if pinger.PacketsRecv == 0 {
log.Debugf("Ping failed, no packets received: target=%v", stats.IPAddr)
pingSuccessGauge.Set(0)
pingTimeoutGauge.Set(0)
}

pinger.OnFinish = func(stats *probing.Statistics) {
log.Debugf("OnFinish: PacketsSent=%d, PacketsRecv=%d, PacketLoss=%f%%, MinRtt=%v, AvgRtt=%v, MaxRtt=%v, StdDevRtt=%v, Duration=%v",
pinger.PacketsSent, pinger.PacketsRecv, stats.PacketLoss, stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt, time.Since(start))

const tolerance = 0.001 // tolerance for easier floating point comparisons
// success declared if we sent as many packets as intended, we got back all packets sent, and packet loss is not equal to 100%
if pinger.Count == pinger.PacketsRecv && math.Abs(stats.PacketLoss-100) > tolerance {
// no error will be raised if we reach a timeout
// https://github.com/prometheus-community/pro-bing/issues/70
pingSuccessGauge.Set(1)
pingTimeoutGauge.Set(0)
minGauge.Set(stats.MinRtt.Seconds())
avgGauge.Set(stats.AvgRtt.Seconds())
maxGauge.Set(stats.MaxRtt.Seconds())
// NOTE(review): min/avg/max are converted with .Seconds(), but the standard
// deviation is converted with a plain float64() cast. If StdDevRtt is a
// time.Duration like its siblings, this exports nanoseconds - confirm.
stddevGauge.Set(float64(stats.StdDevRtt))
lossGauge.Set(stats.PacketLoss)
probeDurationGauge.Set(time.Since(start).Seconds())
mutex.Unlock()
}
minGauge.Set(stats.MinRtt.Seconds())
avgGauge.Set(stats.AvgRtt.Seconds())
maxGauge.Set(stats.MaxRtt.Seconds())
stddevGauge.Set(float64(stats.StdDevRtt))
lossGauge.Set(stats.PacketLoss)
probeDurationGauge.Set(time.Since(start).Seconds())
}

if err := pinger.Run(); err != nil {
log.Error("Failed to ping target host:", err)
// A failed run is only logged; the metrics are still served afterwards.
if err := pinger.Run(); err != nil {
log.Error("Failed to ping target host:", err)
}
serveMetricsWithError(w, r, registry)
}
serveMetricsWithError(w, r, registry)
}
45 changes: 44 additions & 1 deletion internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"fmt"
"net/http"
"net/http/pprof"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/wbollock/ping_exporter/internal/collector"
Expand All @@ -21,11 +23,52 @@ const (
defaultMetricsPath = "/metrics"
)

// namespace is the prefix prepended to the name of every ping metric.
const namespace = "ping_"

// Package-level gauges that hold the result of a ping probe. Each metric name
// is built by prepending the namespace constant to a fixed suffix. SetupServer
// registers them on one registry and passes them to collector.PingHandler.
var (
// pingSuccessGauge reports whether the ping succeeded.
pingSuccessGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "success",
Help: "Returns whether the ping succeeded",
})
// pingTimeoutGauge reports whether the ping failed by timeout.
pingTimeoutGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "timeout",
Help: "Returns whether the ping failed by timeout",
})
// probeDurationGauge holds how long the probe took, in seconds.
probeDurationGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "duration_seconds",
Help: "Returns how long the probe took to complete in seconds",
})
// minGauge holds the best (smallest) round trip time, in seconds.
minGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "rtt_min_seconds",
Help: "Best round trip time",
})
// maxGauge holds the worst (largest) round trip time, in seconds.
maxGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "rtt_max_seconds",
Help: "Worst round trip time",
})
// avgGauge holds the mean round trip time, in seconds.
avgGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "rtt_avg_seconds",
Help: "Mean round trip time",
})
// stddevGauge holds the standard deviation of the round trip times.
// NOTE(review): unlike the other rtt metrics the name carries no unit suffix,
// and the help text does not state a unit - confirm what unit is exported.
stddevGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "rtt_std_deviation",
Help: "Standard deviation",
})
// lossGauge holds the packet loss.
// NOTE(review): the metric is named "loss_ratio" but the help text describes a
// 0 to 100 range, i.e. a percentage rather than a 0 to 1 ratio.
lossGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: namespace + "loss_ratio",
Help: "Packet loss from 0 to 100",
})
)

func SetupServer() http.Handler {
mux := http.NewServeMux()

mux.Handle(defaultMetricsPath, promhttp.Handler())
mux.HandleFunc("/probe", collector.PingHandler)

var mutex sync.Mutex
registry := prometheus.NewRegistry()
registry.MustRegister(pingSuccessGauge, pingTimeoutGauge, probeDurationGauge, minGauge, maxGauge, avgGauge, stddevGauge, lossGauge)
mux.HandleFunc("/probe", collector.PingHandler(registry, pingSuccessGauge, pingTimeoutGauge, probeDurationGauge, minGauge, maxGauge, avgGauge, stddevGauge, lossGauge, &mutex))

// for non-standard web servers, need to register handlers
mux.HandleFunc("/debug/pprof/", http.HandlerFunc(pprof.Index))
Expand Down

0 comments on commit 4450fb3

Please sign in to comment.