From 5939ac33621eefdaeb508c16358e2b052a718e31 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 8 Feb 2016 18:01:37 +0000 Subject: [PATCH] Review feedback --- .../procspy/background_reader_linux.go | 158 +++++++++--------- probe/endpoint/procspy/proc_internal_test.go | 4 +- probe/endpoint/procspy/proc_linux.go | 44 ++++- probe/endpoint/procspy/spy_linux.go | 1 - 4 files changed, 114 insertions(+), 93 deletions(-) diff --git a/probe/endpoint/procspy/background_reader_linux.go b/probe/endpoint/procspy/background_reader_linux.go index 7b28f78a91..a7225b3b64 100644 --- a/probe/endpoint/procspy/background_reader_linux.go +++ b/probe/endpoint/procspy/background_reader_linux.go @@ -2,7 +2,6 @@ package procspy import ( "bytes" - "fmt" "io" "math" "sync" @@ -22,112 +21,109 @@ const ( ) type backgroundReader struct { - walker process.Walker - mtx sync.Mutex - running bool - pleaseStop bool - walkingBuf *bytes.Buffer - readyBuf *bytes.Buffer - readySockets map[uint64]*Proc + stopc chan struct{} + mtx sync.Mutex + latestBuf *bytes.Buffer + latestSockets map[uint64]*Proc } +// starts a rate-limited background goroutine to read the expensive files from +// proc. func newBackgroundReader(walker process.Walker) *backgroundReader { br := &backgroundReader{ - walker: walker, - walkingBuf: bytes.NewBuffer(make([]byte, 0, 5000)), - readyBuf: bytes.NewBuffer(make([]byte, 0, 5000)), + latestBuf: bytes.NewBuffer(make([]byte, 0, 5000)), + stopc: make(chan struct{}), } + go br.loop(walker) return br } -// starts a rate-limited background goroutine to read the expensive files from -// proc. -func (br *backgroundReader) start() error { - br.mtx.Lock() - defer br.mtx.Unlock() - if br.running { - return fmt.Errorf("background reader already running") - } - br.running = true - go br.loop() - return nil +func (br *backgroundReader) stop() { + close(br.stopc) } -func (br *backgroundReader) stop() error { +func (br *backgroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error) { br.mtx.Lock() defer br.mtx.Unlock() - if !br.running { - return fmt.Errorf("background reader already not running") - } - br.pleaseStop = true - return nil + + _, err := io.Copy(buf, br.latestBuf) + + return br.latestSockets, err } -func (br *backgroundReader) loop() { - const ( - maxRateLimitPeriodF = float64(maxRateLimitPeriod) - targetWalkTimeF = float64(targetWalkTime) +func (br *backgroundReader) loop(walker process.Walker) { + var ( + begin time.Time // when we started the last performWalk + tickc = time.After(time.Millisecond) // fire immediately + walkc chan map[uint64]*Proc // initially nil, i.e. off + walkBuf = bytes.NewBuffer(make([]byte, 0, 5000)) + rateLimitPeriod = initialRateLimitPeriod + ticker = time.NewTicker(rateLimitPeriod) + pWalker = newPidWalker(walker, ticker.C, fdBlockSize) ) - rateLimitPeriod := initialRateLimitPeriod - ticker := time.NewTicker(rateLimitPeriod) for { - start := time.Now() - sockets, err := walkProcPid(br.walkingBuf, br.walker, ticker.C, fdBlockSize) - if err != nil { - log.Errorf("background /proc reader: error walking /proc: %s", err) - continue - } - - br.mtx.Lock() - - // Should we stop? - if br.pleaseStop { - br.pleaseStop = false - br.running = false - ticker.Stop() + select { + case <-tickc: + tickc = nil // turn off until the next loop + walkc = make(chan map[uint64]*Proc, 1) // turn on (need buffered so we don't leak performWalk) + begin = time.Now() // reset counter + go performWalk(pWalker, walkBuf, walkc) // do work + + case sockets := <-walkc: + // Swap buffers + br.mtx.Lock() + br.latestBuf, walkBuf = walkBuf, br.latestBuf + br.latestSockets = sockets br.mtx.Unlock() - return - } - - // Swap buffers - br.readyBuf, br.walkingBuf = br.walkingBuf, br.readyBuf - br.readySockets = sockets + walkBuf.Reset() - br.mtx.Unlock() + // Schedule next walk and adjust rate limit + walkTime := time.Since(begin) + rateLimitPeriod, nextInterval := scheduleNextWalk(rateLimitPeriod, walkTime) + ticker.Stop() + ticker := time.NewTicker(rateLimitPeriod) + pWalker.ticker = ticker.C - walkTime := time.Now().Sub(start) - walkTimeF := float64(walkTime) + walkc = nil // turn off until the next loop + tickc = time.After(nextInterval) // turn on - log.Debugf("background /proc reader: full pass took %s", walkTime) - if walkTimeF/targetWalkTimeF > 1.5 { - log.Warnf( - "background /proc reader: full pass took %s: 50%% more than expected (%s)", - walkTime, - targetWalkTime, - ) + case <-br.stopc: + pWalker.stop() + ticker.Stop() + return // abort } + } +} - // Adjust rate limit to more-accurately meet the target walk time in next iteration - scaledRateLimitPeriod := targetWalkTimeF / walkTimeF * float64(rateLimitPeriod) - rateLimitPeriod = time.Duration(math.Min(scaledRateLimitPeriod, maxRateLimitPeriodF)) - log.Debugf("background /proc reader: new rate limit %s", rateLimitPeriod) - - ticker.Stop() - ticker = time.NewTicker(rateLimitPeriod) - - br.walkingBuf.Reset() +// Adjust rate limit for next walk and calculate how long to wait until it should be started +func scheduleNextWalk(rateLimitPeriod time.Duration, took time.Duration) (time.Duration, time.Duration) { - // Sleep during spare time - time.Sleep(targetWalkTime - walkTime) + log.Debugf("background /proc reader: full pass took %s", took) + if float64(took)/float64(targetWalkTime) > 1.5 { + log.Warnf( + "background /proc reader: full pass took %s: 50%% more than expected (%s)", + took, + targetWalkTime, + ) } -} -func (br *backgroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error) { - br.mtx.Lock() - defer br.mtx.Unlock() + // Adjust rate limit to more-accurately meet the target walk time in next iteration + scaledRateLimitPeriod := float64(targetWalkTime) / float64(took) * float64(rateLimitPeriod) + rateLimitPeriod = time.Duration(math.Min(scaledRateLimitPeriod, float64(maxRateLimitPeriod))) - _, err := io.Copy(buf, br.readyBuf) + log.Debugf("background /proc reader: new rate limit %s", rateLimitPeriod) - return br.readySockets, err + return rateLimitPeriod, targetWalkTime - took +} + +func performWalk(w pidWalker, buf *bytes.Buffer, c chan<- map[uint64]*Proc) { + sockets, err := w.walk(buf) + if err != nil { + log.Errorf("background /proc reader: error walking /proc: %s", err) + buf.Reset() + c <- nil + return + } + c <- sockets } diff --git a/probe/endpoint/procspy/proc_internal_test.go b/probe/endpoint/procspy/proc_internal_test.go index 362068c412..54cfa20f43 100644 --- a/probe/endpoint/procspy/proc_internal_test.go +++ b/probe/endpoint/procspy/proc_internal_test.go @@ -61,8 +61,8 @@ func TestWalkProcPid(t *testing.T) { walker := process.NewWalker(procRoot) ticker := time.NewTicker(time.Millisecond) defer ticker.Stop() - fdBlockSize := uint64(1) - have, err := walkProcPid(&buf, walker, ticker.C, fdBlockSize) + pWalker := newPidWalker(walker, ticker.C, 1) + have, err := pWalker.walk(&buf) if err != nil { t.Fatal(err) } diff --git a/probe/endpoint/procspy/proc_linux.go b/probe/endpoint/procspy/proc_linux.go index dec3055e63..e475aee188 100644 --- a/probe/endpoint/procspy/proc_linux.go +++ b/probe/endpoint/procspy/proc_linux.go @@ -24,6 +24,23 @@ var ( netNamespacePathSuffix = "" ) +type pidWalker struct { + walker process.Walker + ticker <-chan time.Time + stopc chan struct{} + fdBlockSize uint64 +} + +func newPidWalker(walker process.Walker, ticker <-chan time.Time, fdBlockSize uint64) pidWalker { + w := pidWalker{ + walker: walker, + ticker: ticker, + fdBlockSize: fdBlockSize, + stopc: make(chan struct{}), + } + return w +} + // SetProcRoot sets the location of the proc filesystem. func SetProcRoot(root string) { procRoot = root @@ -105,8 +122,8 @@ func readProcessConnections(buf *bytes.Buffer, namespaceProcs []*process.Process } -// walkNamespacePid does the work of walkProcPid for a single namespace -func walkNamespacePid(buf *bytes.Buffer, sockets map[uint64]*Proc, namespaceProcs []*process.Process, ticker <-chan time.Time, fdBlockSize uint64) error { +// walkNamespace does the work of walk for a single namespace +func (w pidWalker) walkNamespace(buf *bytes.Buffer, sockets map[uint64]*Proc, namespaceProcs []*process.Process) error { if found, err := readProcessConnections(buf, namespaceProcs); err != nil || !found { return err @@ -120,9 +137,10 @@ func walkNamespacePid(buf *bytes.Buffer, sockets map[uint64]*Proc, namespaceProc dirName := strconv.Itoa(p.PID) fdBase := filepath.Join(procRoot, dirName, "fd") - if fdBlockCount > fdBlockSize { + if fdBlockCount > w.fdBlockSize { // we surpassed the filedescriptor rate limit - <-ticker + // TODO: worth selecting on w.stopc? + <-w.ticker fdBlockCount = 0 // read the connections again to @@ -170,11 +188,11 @@ func walkNamespacePid(buf *bytes.Buffer, sockets map[uint64]*Proc, namespaceProc return nil } -// walkProcPid walks over all numerical (PID) /proc entries. It reads +// walk walks over all numerical (PID) /proc entries. It reads // /proc/PID/net/tcp{,6} for each namespace and sees if the ./fd/* files of each // process in that namespace are symlinks to sockets. Returns a map from socket // ID (inode) to PID. -func walkProcPid(buf *bytes.Buffer, walker process.Walker, ticker <-chan time.Time, fdBlockSize uint64) (map[uint64]*Proc, error) { +func (w pidWalker) walk(buf *bytes.Buffer) (map[uint64]*Proc, error) { var ( sockets = map[uint64]*Proc{} // map socket inode -> process namespaces = map[uint64][]*process.Process{} // map network namespace id -> processes @@ -189,7 +207,7 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker, ticker <-chan time.Ti // between reading /net/tcp{,6} of each namespace and /proc/PID/fd/* for // the processes living in that namespace. - walker.Walk(func(p, _ process.Process) { + w.walker.Walk(func(p, _ process.Process) { dirName := strconv.Itoa(p.PID) netNamespacePath := filepath.Join(procRoot, dirName, getNetNamespacePathSuffix()) @@ -202,14 +220,22 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker, ticker <-chan time.Ti }) for _, procs := range namespaces { - <-ticker - walkNamespacePid(buf, sockets, procs, ticker, fdBlockSize) + select { + case <-w.ticker: + w.walkNamespace(buf, sockets, procs) + case <-w.stopc: + break + } } metrics.SetGauge(namespaceKey, float32(len(namespaces))) return sockets, nil } +func (w pidWalker) stop() { + close(w.stopc) +} + // readFile reads an arbitrary file into a buffer. func readFile(filename string, buf *bytes.Buffer) (int64, error) { f, err := fs.Open(filename) diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go index 9cd1e327ea..61f189881c 100644 --- a/probe/endpoint/procspy/spy_linux.go +++ b/probe/endpoint/procspy/spy_linux.go @@ -35,7 +35,6 @@ func (c *pnConnIter) Next() *Connection { // NewConnectionScanner creates a new Linux ConnectionScanner func NewConnectionScanner(walker process.Walker) ConnectionScanner { br := newBackgroundReader(walker) - br.start() return &linuxScanner{br} }