Skip to content

Commit

Permalink
Merge pull request #3298 from weaveworks/conntrack-netlink-upstream
Browse files Browse the repository at this point in the history
Probe: use netlink to talk to conntrack
  • Loading branch information
bboreham authored Nov 20, 2018
2 parents 2e7dff5 + c732fee commit 476ef27
Show file tree
Hide file tree
Showing 21 changed files with 1,193 additions and 741 deletions.
52 changes: 22 additions & 30 deletions probe/endpoint/connection_tracker.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
// +build linux

package endpoint

import (
"strconv"
"time"

log "github.com/sirupsen/logrus"
"github.com/typetypetype/conntrack"

"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
)

// connectionTrackerConfig are the config options for the endpoint tracker.
type connectionTrackerConfig struct {
HostID string
HostName string
SpyProcs bool
UseConntrack bool
WalkProc bool
UseEbpfConn bool
ProcRoot string
BufferSize int
ProcessCache *process.CachingWalker
Scanner procspy.ConnectionScanner
DNSSnooper *DNSSnooper
}

type connectionTracker struct {
conf connectionTrackerConfig
conf ReporterConfig
flowWalker flowWalker // Interface
ebpfTracker *EbpfTracker
reverseResolver *reverseResolver
Expand All @@ -35,7 +24,7 @@ type connectionTracker struct {
ebpfLastFailureTime time.Time
}

func newConnectionTracker(conf connectionTrackerConfig) connectionTracker {
func newConnectionTracker(conf ReporterConfig) connectionTracker {
ct := connectionTracker{
conf: conf,
reverseResolver: newReverseResolver(),
Expand All @@ -53,20 +42,20 @@ func newConnectionTracker(conf connectionTrackerConfig) connectionTracker {
return ct
}

func flowToTuple(f flow) (ft fourTuple) {
func flowToTuple(f conntrack.Conn) (ft fourTuple) {
ft = fourTuple{
f.Original.Layer3.SrcIP,
f.Original.Layer3.DstIP,
uint16(f.Original.Layer4.SrcPort),
uint16(f.Original.Layer4.DstPort),
f.Orig.Src.String(),
f.Orig.Dst.String(),
uint16(f.Orig.SrcPort),
uint16(f.Orig.DstPort),
}
// Handle DNAT-ed connections in the initial state
if f.Original.Layer3.DstIP != f.Reply.Layer3.SrcIP {
if !f.Orig.Dst.Equal(f.Reply.Src) {
ft = fourTuple{
f.Reply.Layer3.DstIP,
f.Reply.Layer3.SrcIP,
uint16(f.Reply.Layer4.DstPort),
uint16(f.Reply.Layer4.SrcPort),
f.Reply.Dst.String(),
f.Reply.Src.String(),
uint16(f.Reply.DstPort),
uint16(f.Reply.SrcPort),
}
}
return ft
Expand All @@ -78,7 +67,7 @@ func (t *connectionTracker) useProcfs() {
t.conf.Scanner = procspy.NewConnectionScanner(t.conf.ProcessCache, t.conf.SpyProcs)
}
if t.flowWalker == nil {
t.flowWalker = newConntrackFlowWalker(t.conf.UseConntrack, t.conf.ProcRoot, t.conf.BufferSize)
t.flowWalker = newConntrackFlowWalker(t.conf.UseConntrack, t.conf.ProcRoot, t.conf.BufferSize, false /* natOnly */)
}
}

Expand Down Expand Up @@ -118,7 +107,7 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) {

// consult the flowWalker for short-lived (conntracked) connections
seenTuples := map[string]fourTuple{}
t.flowWalker.walkFlows(func(f flow, alive bool) {
t.flowWalker.walkFlows(func(f conntrack.Conn, alive bool) {
tuple := flowToTuple(f)
seenTuples[tuple.key()] = tuple
t.addConnection(rpt, false, tuple, "", nil, nil)
Expand All @@ -135,10 +124,13 @@ func (t *connectionTracker) existingFlows() map[string]fourTuple {
// log.Warnf("Not using conntrack: disabled")
} else if err := IsConntrackSupported(t.conf.ProcRoot); err != nil {
log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
} else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil {
} else if existingFlows, err := conntrack.ConnectionsSize(t.conf.BufferSize); err != nil {
log.Errorf("conntrack existingConnections error: %v", err)
} else {
for _, f := range existingFlows {
if (f.Status & conntrack.IPS_NAT_MASK) == 0 {
continue
}
tuple := flowToTuple(f)
seenTuples[tuple.key()] = tuple
}
Expand Down
Loading

0 comments on commit 476ef27

Please sign in to comment.