Skip to content

Commit

Permalink
Moving to using a home grown ping engine. (#739)
Browse files Browse the repository at this point in the history
* bump to golang 1.22
  • Loading branch information
i3149 authored Aug 23, 2024
1 parent 9a43250 commit 83a2bb7
Show file tree
Hide file tree
Showing 23 changed files with 611 additions and 87 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/create-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Get Deps
shell: bash
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Set up QEMU
uses: docker/setup-qemu-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-eapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Set up QEMU
uses: docker/setup-qemu-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-kentik.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Set up QEMU
uses: docker/setup-qemu-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-next.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Set up QEMU
uses: docker/setup-qemu-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-packages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Get Deps
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Set up QEMU
uses: docker/setup-qemu-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Set up QEMU
uses: docker/setup-qemu-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Install dependencies
run: sudo apt-get install make libpcap-dev
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-on-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.21.6'
go-version: '1.22.6'

- name: Install dependencies
run: sudo apt-get install make libpcap-dev
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# build ktranslate
FROM golang:1.21-alpine as build
FROM golang:1.22-alpine as build
RUN apk add -U libpcap-dev alpine-sdk bash libcap
COPY . /src
WORKDIR /src
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/kentik/ktranslate

go 1.21
go 1.22

toolchain go1.21.6
toolchain go1.22.1

require (
cloud.google.com/go/pubsub v1.36.1
Expand Down Expand Up @@ -58,6 +58,8 @@ require (
go.opentelemetry.io/otel/sdk v1.25.0
go.opentelemetry.io/otel/sdk/metric v1.25.0
go.starlark.net v0.0.0-20220926145019-14b050677505
golang.org/x/net v0.26.0
gonum.org/v1/gonum v0.15.1
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
Expand Down Expand Up @@ -142,7 +144,6 @@ require (
go.opentelemetry.io/otel/trace v1.25.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -1281,6 +1283,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0=
gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
Expand Down
19 changes: 8 additions & 11 deletions pkg/inputs/snmp/metrics/device_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type DeviceMetrics struct {
profileName string
oids map[string]*kt.Mib
missing map[string]bool
lastSent int
}

func NewDeviceMetrics(gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, metrics *kt.SnmpDeviceMetric, profileMetrics map[string]*kt.Mib, profile *mibs.Profile, log logger.ContextL) *DeviceMetrics {
Expand Down Expand Up @@ -406,7 +405,14 @@ func (dm *DeviceMetrics) GetPingStats(ctx context.Context, pinger *ping.Pinger)
}

mib, oid := dm.profile.GetMibAndOid()
stats := pinger.Statistics()
stats, err := pinger.Ping()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") { // We are exiting.
return nil, false, nil
}
return nil, false, err
}

dst := kt.NewJCHF()
dst.CustomStr = map[string]string{}
dst.CustomInt = map[string]int32{}
Expand All @@ -426,15 +432,6 @@ func (dm *DeviceMetrics) GetPingStats(ctx context.Context, pinger *ping.Pinger)
dst.CustomBigInt["StdDevRtt"] = stats.StdDevRtt.Microseconds()
dst.CustomMetrics["StdDevRtt"] = kt.MetricInfo{Oid: oid, Mib: mib, Format: kt.FloatMS, Profile: "ping", Type: "ping"}

// Guard on case we have 1 over the normal number of pings sent.
if dm.lastSent > 0 {
if dm.lastSent-stats.PacketsSent == -1 { // There's a bug where were sometimes we sent one too many pings on the boundry.
stats.PacketsSent = dm.lastSent
}
}

dm.lastSent = stats.PacketsSent

percnt := 0.0
if stats.PacketsSent > 0 && stats.PacketsRecv <= stats.PacketsSent { // Make sure that if there's more packets recieved than sent we don't get confused.
percnt = float64(stats.PacketsSent-stats.PacketsRecv) / float64(stats.PacketsSent) * 100.
Expand Down
77 changes: 47 additions & 30 deletions pkg/inputs/snmp/metrics/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
)

const (
STATUS_CHECK_TIME = 60 * time.Second
STATUS_CHECK_TIME = 60 * time.Second
DefaultPingTimeout = 1000 * time.Millisecond
)

type Poller struct {
Expand Down Expand Up @@ -101,6 +102,12 @@ func NewPollerForPing(gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, jch
pingSec = 60
}

// How long to wait for a response back.
timeout := time.Millisecond * time.Duration(conf.TimeoutMS)
if timeout == 0 {
timeout = DefaultPingTimeout
}

poller := Poller{
jchfChan: jchfChan,
log: log,
Expand All @@ -112,12 +119,12 @@ func NewPollerForPing(gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, jch
gconf: gconf,
}

p, err := ping.NewPinger(log, conf.DeviceIP, pingSec)
p, err := ping.NewPinger(log, conf.DeviceIP, pingSec, timeout)
if err != nil {
log.Errorf("Cannot setup ping service for %s -> %s: %v", err, conf.DeviceIP, conf.DeviceName)
} else {
poller.pinger = p
log.Infof("Enabling response time service for %s -> %s", conf.DeviceIP, conf.DeviceName)
log.Infof("Enabling response time service for %s -> %s with timeout %v", conf.DeviceIP, conf.DeviceName, timeout)
}

return &poller
Expand Down Expand Up @@ -271,6 +278,7 @@ func (p *Poller) StartPingOnlyLoop(ctx context.Context) {
return
}

p.pinger.Start(ctx)
counterAlignment := time.Duration(p.counterTimeSec) * time.Second
jitterWindow := time.Duration(p.jitterTimeSec) * time.Second
firstCollection := time.Now().Truncate(counterAlignment).Add(counterAlignment).Add(time.Duration(rand.Int63n(int64(jitterWindow))))
Expand All @@ -279,36 +287,44 @@ func (p *Poller) StartPingOnlyLoop(ctx context.Context) {
fastTick := time.Duration(kt.LookupEnvInt("KENTIK_FAST_PING_TICK_SEC", 10)) * time.Second
slowTick := time.Duration(p.pingSec) * time.Second

p.log.Infof("snmpPing: First run will be at %v. Running every %v", firstCollection, counterAlignment)
p.pinger.Reset(slowTick, p.counterTimeSec/p.pingSec)
p.log.Infof("snmpPing: First run will be at %v. Running every %v, Sending %d probes", firstCollection, counterAlignment, p.counterTimeSec/p.pingSec)

go func() {
seenGoodPacketLoss := true
runningFast := false
for {
select {
case _ = <-counterCheck.C:
flows, isTotalLoss, err := p.deviceMetrics.GetPingStats(ctx, p.pinger)
if err != nil {
p.log.Warnf("There was an error when getting ping stats: %v.", err)
continue
}

// Reset so that we don't keep avg/min/max across checks.
p.pinger.Reset(slowTick)

// Send data on.
p.jchfChan <- flows

if !isTotalLoss { // We don't want to go back into fast polling unless we get <100% packet loss at some point.
seenGoodPacketLoss = true
}

// If there's total loss, go to fast polling but only if we haven't been here before.
if p.gconf.FastPoll && isTotalLoss && seenGoodPacketLoss {
p.log.Warnf("Starting fast ping operation due to 100% packet loss.")
ctxT, cancel := context.WithTimeout(ctx, fastDuration)
p.runFastPoll(ctxT, fastTick, fastDuration, slowTick)
cancel() // Done with fast polling.
seenGoodPacketLoss = false
}
go func() {
if runningFast {
return
}

flows, isTotalLoss, err := p.deviceMetrics.GetPingStats(ctx, p.pinger)
if err != nil {
p.log.Warnf("There was an error when getting ping stats: %v.", err)
return
}

// Send data on.
p.jchfChan <- flows

if !isTotalLoss { // We don't want to go back into fast polling unless we get <100% packet loss at some point.
seenGoodPacketLoss = true
}

// If there's total loss, go to fast polling but only if we haven't been here before.
if p.gconf.FastPoll && isTotalLoss && seenGoodPacketLoss && !runningFast {
p.log.Warnf("Starting fast ping operation due to 100 percent packet loss.")
runningFast = true
ctxT, cancel := context.WithTimeout(ctx, fastDuration)
p.runFastPoll(ctxT, fastTick, fastDuration, slowTick)
cancel() // Done with fast polling.
seenGoodPacketLoss = false
runningFast = false
}
}()

case <-ctx.Done():
p.log.Infof("Metrics PingOnly Done")
Expand All @@ -327,11 +343,12 @@ func (p *Poller) runFastPoll(ctx context.Context, fastTick time.Duration, fastDu

defer func() { // When we leave this loop, return to slow polling.
fastCheck.Stop()
p.pinger.Reset(slowTick)
p.pinger.Reset(slowTick, p.counterTimeSec/p.pingSec)
p.log.Infof("Fast ping done.")
}()

// But for now we need fast polling.
p.pinger.Reset(fastTick)
p.pinger.Reset(1*time.Second, int(fastTick.Seconds())) // Run every second.

for {
select {
Expand Down
17 changes: 17 additions & 0 deletions pkg/inputs/snmp/ping/kaping/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kaping

import "net/netip"

type Config struct {
BindAddr4 netip.Addr
BindAddr6 netip.Addr
RawSocket bool
}

func DefaultConfig() Config {
return Config{
BindAddr4: netip.MustParseAddr("0.0.0.0"),
BindAddr6: netip.MustParseAddr("::"),
RawSocket: false,
}
}
Loading

0 comments on commit 83a2bb7

Please sign in to comment.