Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-200: Basic flows' agent architecture #2

Merged
merged 2 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ generate: prereqs
go generate ./pkg/...

.PHONY: build
build: prereqs fmt lint
@echo "### Building project"
build: prereqs fmt lint test compile

.PHONY: compile
compile:
@echo "### Compiling project"
GOOS=$(GOOS) go build -ldflags "-X main.version=${VERSION}" -mod vendor -a -o bin/netobserv-agent cmd/netobserv-agent.go

.PHONY: test
Expand Down
60 changes: 27 additions & 33 deletions cmd/netobserv-agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,49 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"sort"
"syscall"
"time"

"github.com/netobserv/netobserv-agent/pkg/connect"
"github.com/netobserv/netobserv-agent/pkg/agent"
"github.com/sirupsen/logrus"
)

var (
interfaceName = flag.String("iface", "eth0", "interface to attach to")
reportFreq = flag.Duration("freq", 5*time.Second, "frequency of on-screen reporting")
// TODO: make configurable. NETOBSERV-201
const (
maxStoredFlowEntries = 1000
maxFlowEvictionPeriod = 1 * time.Second
communicationBufferLength = 20
)

func main() {
flag.Parse()

monitor := connect.NewMonitor(*interfaceName)
if err := monitor.Start(); err != nil {
log.Fatalf("starting monitor: %s", err)
logrus.SetLevel(logrus.DebugLevel)

flowsAgent, err := agent.FlowsAgent(agent.Config{
ExcludeIfaces: []string{"lo"},
BuffersLen: communicationBufferLength,
CacheMaxFlows: maxStoredFlowEntries,
CacheActiveTimeout: maxFlowEvictionPeriod,
})
if err != nil {
logrus.WithError(err).Fatal("can't instantiate netobserv-agent")
}

logrus.Infof("push CTRL+C or send SIGTERM to interrupt execution")
ctx, canceler := context.WithCancel(context.Background())
// Subscribe to signals for terminating the program.
go func() {
for {
time.Sleep(*reportFreq)
fmt.Println("PROTOCOL SOURCE DESTINATION PACKETS BYTES")
stats := monitor.Stats()
sort.SliceStable(stats, func(i, j int) bool {
return stats[i].Bytes > stats[j].Bytes
})
for _, egress := range stats {
fmt.Printf("%-8s %-21s %-21s %-7d %-7s\n",
egress.Protocol,
fmt.Sprintf("%s:%d", egress.SrcIP, egress.SrcPort),
fmt.Sprintf("%s:%d", egress.DstIP, egress.DstPort),
egress.Packets, egress.Bytes)
}
}
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
<-stopper
canceler()
}()

// Subscribe to signals for terminating the program.
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
<-stopper
log.Println("stopping server and closing resources")
if err := monitor.Stop(); err != nil {
log.Printf("error stopping server: %s", err)
if err := flowsAgent.Run(ctx); err != nil {
logrus.WithError(err).Fatal("can't start netobserv-agent")
}
}
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ go 1.17

require (
github.com/cilium/ebpf v0.8.1
github.com/netobserv/gopipes v0.1.1
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/vishvananda/netlink v1.1.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
18 changes: 18 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
github.com/cilium/ebpf v0.8.1 h1:bLSSEbBLqGPXxls55pGr5qWZaTqcmfDJHhou7t254ao=
github.com/cilium/ebpf v0.8.1/go.mod h1:f5zLIM0FSNuAkSyLAN7X+Hy6yznlF1mNiWUMfxMtrgk=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss=
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
Expand All @@ -12,16 +15,31 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/netobserv/gopipes v0.1.1 h1:f8zJsvnMgRFRa2B+1siwRtW0Y4dqeBROmkcI/HgT1gE=
github.com/netobserv/gopipes v0.1.1/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/vishvananda/netlink v1.1.0 h1:1iyaYNBLmP6L0220aDnYQpo1QEV4t4hJ+xEEhhJH8j0=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34 h1:GkvMjFtXUmahfDtashnc1mnrCtuBVcwse5QV2lUk/tI=
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
111 changes: 111 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package agent

import (
"context"
"fmt"

"github.com/netobserv/gopipes/pkg/node"
"github.com/netobserv/netobserv-agent/pkg/ebpf"
"github.com/netobserv/netobserv-agent/pkg/flow"
"github.com/sirupsen/logrus"
)

var alog = logrus.WithField("component", "agent.Flows")

// Flows reporting agent
type Flows struct {
tracers map[string]flowTracer
accounter flowAccounter
exporter flowExporter
}

type flowTracer interface {
Trace(ctx context.Context, forwardFlows chan<- *flow.Record)
Register() error
Unregister() error
}

type flowAccounter interface {
Account(in <-chan *flow.Record, out chan<- *flow.Record)
}

type flowExporter func(in <-chan *flow.Record)

// FlowsAgent instantiates a new agent, given a configuration.
func FlowsAgent(cfg Config) (*Flows, error) {
alog.Info("initializing Flows agent")
interfaces, err := getInterfaces(&cfg)
if err != nil {
return nil, err
}
tracers := map[string]flowTracer{}
for iface := range interfaces {
tracers[iface] = ebpf.NewFlowTracer(iface)
}
return &Flows{
tracers: tracers,
accounter: flow.NewAccounter(cfg.CacheMaxFlows, cfg.BuffersLen, cfg.CacheActiveTimeout),
// For now, just print flows. TODO: NETOBSERV-202
exporter: func(in <-chan *flow.Record) {
for record := range in {
fmt.Printf("%-8s %-21s %-21s %-7d %-7s\n",
record.Protocol,
fmt.Sprintf("%s:%d", record.SrcIP, record.SrcPort),
fmt.Sprintf("%s:%d", record.DstIP, record.DstPort),
record.Packets, record.Bytes)
}
},
}, nil
}

// Run a Flows agent. The function will keep running in the same thread
// until the passed context is canceled
func (f *Flows) Run(ctx context.Context) error {
alog.Info("starting Flows agent")
alog.Debug("registering flow tracers")
var tracers []*node.Init
for i, t := range f.tracers {
// make sure the background/deferred functions use this loop's values
iface, tracer := i, t
tlog := alog.WithField("iface", iface)
tlog.Debug("registering flow tracer")
if err := tracer.Register(); err != nil {
return err
}
defer func() {
if err := tracer.Unregister(); err != nil {
tlog.WithError(err).Warn("error unregistering flow tracer")
}
}()
tracers = append(tracers,
node.AsInit(func(out chan<- *flow.Record) {
tracer.Trace(ctx, out)
tlog.Debug("tracer routine ended")
}))
}
alog.Debug("registering accounter")
accounter := node.AsMiddle(f.accounter.Account)
alog.Debug("registering exporter")
exporter := node.AsTerminal(f.exporter)

alog.Debug("connecting graph")
for _, t := range tracers {
t.SendsTo(accounter)
}
accounter.SendsTo(exporter)

alog.Debug("starting graph")
for _, t := range tracers {
t.Start()
}

alog.Info("Flows agent successfully started")
<-ctx.Done()
alog.Info("stopping Flows agent")

alog.Debug("waiting for all nodes to finish their pending work")
<-exporter.Done()

alog.Info("Flows agent stopped")
return nil
}
50 changes: 50 additions & 0 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package agent

import (
"net"
"time"
)

// TODO: NETOBSERV-201: fill from CLI and env
type Config struct {
// Ifaces contains the interface names where flow traces will be attached. If empty, the agent
// will fetch all the interfaces in the system, excepting the ones listed in ExcludeIfaces
Ifaces []string
// ExcludeIfaces contains the interface names that will be excluded from flow tracing. Default:
// "lo" (loopback)
ExcludeIfaces []string
// BuffersLen establishes the length of communication channels between the different processing
// stages
BuffersLen int
// CacheMaxFlows specifies how many flows can be accumulated in the accounting cache before
// being flushing the cache for its later export
CacheMaxFlows int
// CacheActiveTimeout specifies the maximum duration in which a flow is kept in the accounting
// cache before being flushed for its later export
CacheActiveTimeout time.Duration
// Verbose mode
Verbose bool
}

func getInterfaces(cfg *Config) (map[string]struct{}, error) {
// get interfaces from configuration or acquire them from the system
ifaces := map[string]struct{}{}
if cfg.Ifaces != nil {
for _, iface := range cfg.Ifaces {
ifaces[iface] = struct{}{}
}
return ifaces, nil
}
nifaces, err := net.Interfaces()
if err != nil {
return nil, err
}
for _, iface := range nifaces {
ifaces[iface.Name] = struct{}{}
}
// exclude interfaces
for _, iface := range cfg.ExcludeIfaces {
delete(ifaces, iface)
}
return ifaces, nil
}
4 changes: 3 additions & 1 deletion pkg/connect/bpf_bpfeb.go → pkg/ebpf/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

File renamed without changes.
2 changes: 1 addition & 1 deletion pkg/connect/bpf_bpfel.go → pkg/ebpf/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

File renamed without changes.
Loading