Skip to content

Commit

Permalink
go.d.plugin: execute local-listeners periodically (netdata#17160)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyam8 authored Mar 14, 2024
1 parent 9a07f50 commit ddb6190
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) {
slog.String("discoverer", shortName),
),
cfgSource: cfg.Source,
interval: time.Second * 60,
ll: &localListenersExec{
binPath: filepath.Join(dir, "local-listeners"),
timeout: time.Second * 5,
},
interval: time.Minute * 2,
expiryTime: time.Minute * 10,
cache: make(map[uint64]*cacheItem),
started: make(chan struct{}),
}

d.Tags().Merge(tags)

return d, nil
Expand All @@ -72,6 +76,15 @@ type (

interval time.Duration
ll localListeners

expiryTime time.Duration
cache map[uint64]*cacheItem // [target.Hash]

started chan struct{}
}
cacheItem struct {
lastSeenTime time.Time
tgt model.Target
}
localListeners interface {
discover(ctx context.Context) ([]byte, error)
Expand All @@ -83,25 +96,30 @@ func (d *Discoverer) String() string {
}

func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
d.Info("instance is started")
defer func() { d.Info("instance is stopped") }()

close(d.started)

if err := d.discoverLocalListeners(ctx, in); err != nil {
d.Error(err)
return
}

//tk := time.NewTicker(d.interval)
//defer tk.Stop()
//
//for {
// select {
// case <-ctx.Done():
// return
// case <-tk.C:
// if err := d.discoverLocalListeners(ctx, in); err != nil {
// d.Error(err)
// return
// }
// }
//}
tk := time.NewTicker(d.interval)
defer tk.Stop()

for {
select {
case <-ctx.Done():
return
case <-tk.C:
if err := d.discoverLocalListeners(ctx, in); err != nil {
d.Warning(err)
return
}
}
}
}

func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error {
Expand All @@ -113,11 +131,13 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod
return err
}

tggs, err := d.parseLocalListeners(bs)
tgts, err := d.parseLocalListeners(bs)
if err != nil {
return err
}

tggs := d.processTargets(tgts)

select {
case <-ctx.Done():
case in <- tggs:
Expand All @@ -126,7 +146,42 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod
return nil
}

func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error) {
func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup {
tgg := &targetGroup{
provider: fullName,
source: fmt.Sprintf("discoverer=%s,host=localhost", shortName),
}
if d.cfgSource != "" {
tgg.source += fmt.Sprintf(",%s", d.cfgSource)
}

if d.expiryTime.Milliseconds() == 0 {
tgg.targets = tgts
return []model.TargetGroup{tgg}
}

now := time.Now()

for _, tgt := range tgts {
hash := tgt.Hash()
if _, ok := d.cache[hash]; !ok {
d.cache[hash] = &cacheItem{tgt: tgt}
}
d.cache[hash].lastSeenTime = now
}

for k, v := range d.cache {
if now.Sub(v.lastSeenTime) > d.expiryTime {
delete(d.cache, k)
continue
}
tgg.targets = append(tgg.targets, v.tgt)
}

return []model.TargetGroup{tgg}
}

func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) {
var tgts []model.Target

sc := bufio.NewScanner(bytes.NewReader(bs))
Expand Down Expand Up @@ -161,17 +216,7 @@ func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error)
tgts = append(tgts, &tgt)
}

tgg := &targetGroup{
provider: fullName,
source: fmt.Sprintf("discoverer=%s,host=localhost", shortName),
targets: tgts,
}

if d.cfgSource != "" {
tgg.source += fmt.Sprintf(",%s", d.cfgSource)
}

return []model.TargetGroup{tgg}, nil
return tgts, nil
}

type localListenersExec struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,23 @@
package netlisteners

import (
"context"
"errors"
"testing"
"time"

"github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
)

var (
localListenersOutputSample = []byte(`
UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D
TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D
TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D
UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1
`)
)

func TestDiscoverer_Discover(t *testing.T) {
tests := map[string]discoverySim{
"valid response": {
mock: &mockLocalListenersExec{},
wantDoneBeforeCancel: false,
wantTargetGroups: []model.TargetGroup{&targetGroup{
"add listeners": {
listenersCli: func(cli listenersCli, interval, expiry time.Duration) {
cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
time.Sleep(interval * 2)
},
wantGroups: []model.TargetGroup{&targetGroup{
provider: "sd:net_listeners",
source: "discoverer=net_listeners,host=localhost",
targets: []model.Target{
Expand Down Expand Up @@ -59,23 +54,83 @@ func TestDiscoverer_Discover(t *testing.T) {
},
}},
},
"empty response": {
mock: &mockLocalListenersExec{emptyResponse: true},
wantDoneBeforeCancel: false,
wantTargetGroups: []model.TargetGroup{&targetGroup{
"remove listeners; not expired": {
listenersCli: func(cli listenersCli, interval, expiry time.Duration) {
cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
time.Sleep(interval * 2)
cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
time.Sleep(interval * 2)
},
wantGroups: []model.TargetGroup{&targetGroup{
provider: "sd:net_listeners",
source: "discoverer=net_listeners,host=localhost",
targets: []model.Target{
withHash(&target{
Protocol: "UDP6",
Address: "::1",
Port: "8125",
Comm: "netdata",
Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
}),
withHash(&target{
Protocol: "TCP6",
Address: "::1",
Port: "8125",
Comm: "netdata",
Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
}),
withHash(&target{
Protocol: "TCP",
Address: "127.0.0.1",
Port: "8125",
Comm: "netdata",
Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
}),
withHash(&target{
Protocol: "UDP",
Address: "127.0.0.1",
Port: "53768",
Comm: "go.d.plugin",
Cmdline: "/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1",
}),
},
}},
},
"error on exec": {
mock: &mockLocalListenersExec{err: true},
wantDoneBeforeCancel: true,
wantTargetGroups: nil,
},
"invalid data": {
mock: &mockLocalListenersExec{invalidResponse: true},
wantDoneBeforeCancel: true,
wantTargetGroups: nil,
"remove listeners; expired": {
listenersCli: func(cli listenersCli, interval, expiry time.Duration) {
cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
time.Sleep(interval * 2)
cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
time.Sleep(expiry * 2)
},
wantGroups: []model.TargetGroup{&targetGroup{
provider: "sd:net_listeners",
source: "discoverer=net_listeners,host=localhost",
targets: []model.Target{
withHash(&target{
Protocol: "TCP6",
Address: "::1",
Port: "8125",
Comm: "netdata",
Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
}),
withHash(&target{
Protocol: "TCP",
Address: "127.0.0.1",
Port: "8125",
Comm: "netdata",
Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
}),
},
}},
},
}

Expand All @@ -88,26 +143,7 @@ func TestDiscoverer_Discover(t *testing.T) {

func withHash(l *target) *target {
l.hash, _ = calcHash(l)
tags, _ := model.ParseTags("hostnetsocket")
tags, _ := model.ParseTags("netlisteners")
l.Tags().Merge(tags)
return l
}

type mockLocalListenersExec struct {
err bool
emptyResponse bool
invalidResponse bool
}

func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) {
if m.err {
return nil, errors.New("mock discover() error")
}
if m.emptyResponse {
return nil, nil
}
if m.invalidResponse {
return []byte("this is very incorrect data"), nil
}
return localListenersOutputSample, nil
}
Loading

0 comments on commit ddb6190

Please sign in to comment.