Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Alfonso Acosta committed Feb 8, 2016
1 parent 3dd2d45 commit 5939ac3
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 93 deletions.
158 changes: 77 additions & 81 deletions probe/endpoint/procspy/background_reader_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package procspy

import (
"bytes"
"fmt"
"io"
"math"
"sync"
Expand All @@ -22,112 +21,109 @@ const (
)

type backgroundReader struct {
walker process.Walker
mtx sync.Mutex
running bool
pleaseStop bool
walkingBuf *bytes.Buffer
readyBuf *bytes.Buffer
readySockets map[uint64]*Proc
stopc chan struct{}
mtx sync.Mutex
latestBuf *bytes.Buffer
latestSockets map[uint64]*Proc
}

// starts a rate-limited background goroutine to read the expensive files from
// proc.
func newBackgroundReader(walker process.Walker) *backgroundReader {
br := &backgroundReader{
walker: walker,
walkingBuf: bytes.NewBuffer(make([]byte, 0, 5000)),
readyBuf: bytes.NewBuffer(make([]byte, 0, 5000)),
latestBuf: bytes.NewBuffer(make([]byte, 0, 5000)),
stopc: make(chan struct{}),
}
go br.loop(walker)
return br
}

// starts a rate-limited background goroutine to read the expensive files from
// proc.
func (br *backgroundReader) start() error {
br.mtx.Lock()
defer br.mtx.Unlock()
if br.running {
return fmt.Errorf("background reader already running")
}
br.running = true
go br.loop()
return nil
func (br *backgroundReader) stop() {
close(br.stopc)
}

func (br *backgroundReader) stop() error {
func (br *backgroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error) {
br.mtx.Lock()
defer br.mtx.Unlock()
if !br.running {
return fmt.Errorf("background reader already not running")
}
br.pleaseStop = true
return nil

_, err := io.Copy(buf, br.latestBuf)

return br.latestSockets, err
}

func (br *backgroundReader) loop() {
const (
maxRateLimitPeriodF = float64(maxRateLimitPeriod)
targetWalkTimeF = float64(targetWalkTime)
func (br *backgroundReader) loop(walker process.Walker) {
var (
begin time.Time // when we started the last performWalk
tickc = time.After(time.Millisecond) // fire immediately
walkc chan map[uint64]*Proc // initially nil, i.e. off
walkBuf = bytes.NewBuffer(make([]byte, 0, 5000))
rateLimitPeriod = initialRateLimitPeriod
ticker = time.NewTicker(rateLimitPeriod)
pWalker = newPidWalker(walker, ticker.C, fdBlockSize)
)

rateLimitPeriod := initialRateLimitPeriod
ticker := time.NewTicker(rateLimitPeriod)
for {
start := time.Now()
sockets, err := walkProcPid(br.walkingBuf, br.walker, ticker.C, fdBlockSize)
if err != nil {
log.Errorf("background /proc reader: error walking /proc: %s", err)
continue
}

br.mtx.Lock()

// Should we stop?
if br.pleaseStop {
br.pleaseStop = false
br.running = false
ticker.Stop()
select {
case <-tickc:
tickc = nil // turn off until the next loop
walkc = make(chan map[uint64]*Proc, 1) // turn on (need buffered so we don't leak performWalk)
begin = time.Now() // reset counter
go performWalk(pWalker, walkBuf, walkc) // do work

case sockets := <-walkc:
// Swap buffers
br.mtx.Lock()
br.latestBuf, walkBuf = walkBuf, br.latestBuf
br.latestSockets = sockets
br.mtx.Unlock()
return
}

// Swap buffers
br.readyBuf, br.walkingBuf = br.walkingBuf, br.readyBuf
br.readySockets = sockets
walkBuf.Reset()

br.mtx.Unlock()
// Schedule next walk and adjust rate limit
walkTime := time.Since(begin)
rateLimitPeriod, nextInterval := scheduleNextWalk(rateLimitPeriod, walkTime)
ticker.Stop()
ticker := time.NewTicker(rateLimitPeriod)
pWalker.ticker = ticker.C

walkTime := time.Now().Sub(start)
walkTimeF := float64(walkTime)
walkc = nil // turn off until the next loop
tickc = time.After(nextInterval) // turn on

log.Debugf("background /proc reader: full pass took %s", walkTime)
if walkTimeF/targetWalkTimeF > 1.5 {
log.Warnf(
"background /proc reader: full pass took %s: 50%% more than expected (%s)",
walkTime,
targetWalkTime,
)
case <-br.stopc:
pWalker.stop()
ticker.Stop()
return // abort
}
}
}

// Adjust rate limit to more-accurately meet the target walk time in next iteration
scaledRateLimitPeriod := targetWalkTimeF / walkTimeF * float64(rateLimitPeriod)
rateLimitPeriod = time.Duration(math.Min(scaledRateLimitPeriod, maxRateLimitPeriodF))
log.Debugf("background /proc reader: new rate limit %s", rateLimitPeriod)

ticker.Stop()
ticker = time.NewTicker(rateLimitPeriod)

br.walkingBuf.Reset()
// Adjust rate limit for next walk and calculate how long to wait until it should be started
func scheduleNextWalk(rateLimitPeriod time.Duration, took time.Duration) (time.Duration, time.Duration) {

// Sleep during spare time
time.Sleep(targetWalkTime - walkTime)
log.Debugf("background /proc reader: full pass took %s", took)
if float64(took)/float64(targetWalkTime) > 1.5 {
log.Warnf(
"background /proc reader: full pass took %s: 50%% more than expected (%s)",
took,
targetWalkTime,
)
}
}

func (br *backgroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error) {
br.mtx.Lock()
defer br.mtx.Unlock()
// Adjust rate limit to more-accurately meet the target walk time in next iteration
scaledRateLimitPeriod := float64(targetWalkTime) / float64(took) * float64(rateLimitPeriod)
rateLimitPeriod = time.Duration(math.Min(scaledRateLimitPeriod, float64(maxRateLimitPeriod)))

_, err := io.Copy(buf, br.readyBuf)
log.Debugf("background /proc reader: new rate limit %s", rateLimitPeriod)

return br.readySockets, err
return rateLimitPeriod, targetWalkTime - took
}

func performWalk(w pidWalker, buf *bytes.Buffer, c chan<- map[uint64]*Proc) {
sockets, err := w.walk(buf)
if err != nil {
log.Errorf("background /proc reader: error walking /proc: %s", err)
buf.Reset()
c <- nil
return
}
c <- sockets
}
4 changes: 2 additions & 2 deletions probe/endpoint/procspy/proc_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func TestWalkProcPid(t *testing.T) {
walker := process.NewWalker(procRoot)
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
fdBlockSize := uint64(1)
have, err := walkProcPid(&buf, walker, ticker.C, fdBlockSize)
pWalker := newPidWalker(walker, ticker.C, 1)
have, err := pWalker.walk(&buf)
if err != nil {
t.Fatal(err)
}
Expand Down
44 changes: 35 additions & 9 deletions probe/endpoint/procspy/proc_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@ var (
netNamespacePathSuffix = ""
)

type pidWalker struct {
walker process.Walker
ticker <-chan time.Time
stopc chan struct{}
fdBlockSize uint64
}

func newPidWalker(walker process.Walker, ticker <-chan time.Time, fdBlockSize uint64) pidWalker {
w := pidWalker{
walker: walker,
ticker: ticker,
fdBlockSize: fdBlockSize,
stopc: make(chan struct{}),
}
return w
}

// SetProcRoot sets the location of the proc filesystem.
func SetProcRoot(root string) {
procRoot = root
Expand Down Expand Up @@ -105,8 +122,8 @@ func readProcessConnections(buf *bytes.Buffer, namespaceProcs []*process.Process

}

// walkNamespacePid does the work of walkProcPid for a single namespace
func walkNamespacePid(buf *bytes.Buffer, sockets map[uint64]*Proc, namespaceProcs []*process.Process, ticker <-chan time.Time, fdBlockSize uint64) error {
// walkNamespace does the work of walk for a single namespace
func (w pidWalker) walkNamespace(buf *bytes.Buffer, sockets map[uint64]*Proc, namespaceProcs []*process.Process) error {

if found, err := readProcessConnections(buf, namespaceProcs); err != nil || !found {
return err
Expand All @@ -120,9 +137,10 @@ func walkNamespacePid(buf *bytes.Buffer, sockets map[uint64]*Proc, namespaceProc
dirName := strconv.Itoa(p.PID)
fdBase := filepath.Join(procRoot, dirName, "fd")

if fdBlockCount > fdBlockSize {
if fdBlockCount > w.fdBlockSize {
// we surpassed the filedescriptor rate limit
<-ticker
// TODO: worth selecting on w.stopc?
<-w.ticker
fdBlockCount = 0

// read the connections again to
Expand Down Expand Up @@ -170,11 +188,11 @@ func walkNamespacePid(buf *bytes.Buffer, sockets map[uint64]*Proc, namespaceProc
return nil
}

// walkProcPid walks over all numerical (PID) /proc entries. It reads
// walk walks over all numerical (PID) /proc entries. It reads
// /proc/PID/net/tcp{,6} for each namespace and sees if the ./fd/* files of each
// process in that namespace are symlinks to sockets. Returns a map from socket
// ID (inode) to PID.
func walkProcPid(buf *bytes.Buffer, walker process.Walker, ticker <-chan time.Time, fdBlockSize uint64) (map[uint64]*Proc, error) {
func (w pidWalker) walk(buf *bytes.Buffer) (map[uint64]*Proc, error) {
var (
sockets = map[uint64]*Proc{} // map socket inode -> process
namespaces = map[uint64][]*process.Process{} // map network namespace id -> processes
Expand All @@ -189,7 +207,7 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker, ticker <-chan time.Ti
// between reading /net/tcp{,6} of each namespace and /proc/PID/fd/* for
// the processes living in that namespace.

walker.Walk(func(p, _ process.Process) {
w.walker.Walk(func(p, _ process.Process) {
dirName := strconv.Itoa(p.PID)

netNamespacePath := filepath.Join(procRoot, dirName, getNetNamespacePathSuffix())
Expand All @@ -202,14 +220,22 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker, ticker <-chan time.Ti
})

for _, procs := range namespaces {
<-ticker
walkNamespacePid(buf, sockets, procs, ticker, fdBlockSize)
select {
case <-w.ticker:
w.walkNamespace(buf, sockets, procs)
case <-w.stopc:
break
}
}

metrics.SetGauge(namespaceKey, float32(len(namespaces)))
return sockets, nil
}

func (w pidWalker) stop() {
close(w.stopc)
}

// readFile reads an arbitrary file into a buffer.
func readFile(filename string, buf *bytes.Buffer) (int64, error) {
f, err := fs.Open(filename)
Expand Down
1 change: 0 additions & 1 deletion probe/endpoint/procspy/spy_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func (c *pnConnIter) Next() *Connection {
// NewConnectionScanner creates a new Linux ConnectionScanner
func NewConnectionScanner(walker process.Walker) ConnectionScanner {
br := newBackgroundReader(walker)
br.start()
return &linuxScanner{br}
}

Expand Down

0 comments on commit 5939ac3

Please sign in to comment.