Skip to content

Commit

Permalink
[WIP] pillar/watcher: Add goroutine leak detection.
Browse files Browse the repository at this point in the history
Introduce goroutine leak detection functionality that monitors the
number of goroutines over a 24-hour period to identify potential leaks.
The detection algorithm employs a moving average smoothing technique and
linear regression to analyze trends. If an upward trend is statistically
significant, it flags the anomaly. This approach helps identify subtle
issues in goroutine management, leveraging a t-statistic for confidence
testing.

Signed-off-by: Nikolay Martyanov <nikolay@zededa.com>
  • Loading branch information
OhmSpectator committed Nov 1, 2024
1 parent a3fbdd9 commit f9cd1d6
Showing 1 changed file with 99 additions and 0 deletions.
99 changes: 99 additions & 0 deletions pkg/pillar/cmd/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package watcher

import (
"flag"
"math"
"os"
"runtime"
"sync"
Expand Down Expand Up @@ -156,6 +157,101 @@ func handleMemoryPressureEvents() {
}
}

// Function to calculate moving average
func movingAverage(data []int, windowSize int) []float64 {
var ma []float64
for i := 0; i <= len(data)-windowSize; i++ {
sum := 0.0
for j := i; j < i+windowSize; j++ {
sum += float64(data[j])
}
ma = append(ma, sum/float64(windowSize))
}
return ma
}

// Function to perform linear regression
func linearRegression(x, y []float64) (slope, intercept, rValue float64) {
n := float64(len(x))
var sumX, sumY, sumXY, sumX2, sumY2 float64
for i := 0; i < len(x); i++ {
sumX += x[i]
sumY += y[i]
sumXY += x[i] * y[i]
sumX2 += x[i] * x[i]
sumY2 += y[i] * y[i]
}
slope = (n*sumXY - sumX*sumY) / (n*sumX2 - sumX*sumX)
intercept = (sumY - slope*sumX) / n
rNumerator := n*sumXY - sumX*sumY
rDenominator := math.Sqrt((n*sumX2 - sumX*sumX) * (n*sumY2 - sumY*sumY))
rValue = rNumerator / rDenominator
return
}

// Function to calculate t-statistic for the slope
func tStatistic(slope, stdErr float64, df int) float64 {
return slope / stdErr
}

// Detects goroutine leaks by analyzing the number of goroutines created
// in the last 24 hours. It uses linear regression to detect any significant
// upward trend in the number of goroutines.
func detectGoroutineLeaks(stats []int) bool {
// Step 1: Smooth the data
windowSize := 20 // Choose appropriate window size (let it be 20 minutes)
smoothedData := movingAverage(stats, windowSize)

// Step 2: Prepare data for regression
x := make([]float64, len(smoothedData))
for i := range x {
x[i] = float64(i)
}
y := smoothedData

// Step 3: Perform linear regression
slope, _, rValue := linearRegression(x, y)

// Step 4: Statistical significance testing
n := float64(len(x))
degreesOfFreedom := int(n - 2)
stdErr := math.Sqrt((1 - rValue*rValue) / (n - 2))
tStat := tStatistic(slope, stdErr, degreesOfFreedom)

// Critical t-value for 95% confidence
// For large n, t-critical ~1.96
tCritical := 1.96
if math.Abs(tStat) > tCritical {
log.Warnf("Significant upward trend detected in goroutine count.")
return true
}
return false
}

func handleGoroutineLeaks() {
// Create a map to keep track of the number of goroutines at a given time
// Limit it to 100 entries
const maxEntries = 24 * 60 // 24 hours
// Create a slice to keep track of the last 100 entries
stats := make([]int, 0, maxEntries)
for {
time.Sleep(1 * time.Minute)
numGoroutines := runtime.NumGoroutine()
stats = append(stats, numGoroutines)
if len(stats) > maxEntries {
stats = stats[1:]
}
// Check if there are any leaks
if len(stats) > 60 && detectGoroutineLeaks(stats) {
// Count the number of goroutines that were created in the last 1 hour
numGoroutinesHourAgo := stats[len(stats)-60]
leakCount := numGoroutines - numGoroutinesHourAgo
log.Warnf("Goroutine leak detected. Number of goroutines created in the last 1 hour: %d", leakCount)
// TODO, call function similar to dumpStacks() in agentlog.go
}
}
}

// Run :
func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, arguments []string, baseDir string) int {
logger = loggerArg
Expand Down Expand Up @@ -268,6 +364,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
// Handle memory pressure events by calling GC explicitly
go handleMemoryPressureEvents()

// Detect goroutine leaks
go handleGoroutineLeaks()

for {
select {
case change := <-subGlobalConfig.MsgChan():
Expand Down

0 comments on commit f9cd1d6

Please sign in to comment.