From 215cd989b74288fff12b359ef552607226040935 Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Wed, 6 Nov 2024 13:46:25 +0100 Subject: [PATCH 1/9] pillar/agentlog: Ensure directory creation for goroutine stack dumps in DumpAllStacks. Modify DumpAllStacks to create the debug directory if it doesn't already exist, ensuring that stack dumps can always be written to the specified location. Signed-off-by: Nikolay Martyanov --- pkg/pillar/agentlog/agentlog.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/pillar/agentlog/agentlog.go b/pkg/pillar/agentlog/agentlog.go index 9adab1795a..b9f89ee721 100644 --- a/pkg/pillar/agentlog/agentlog.go +++ b/pkg/pillar/agentlog/agentlog.go @@ -196,6 +196,9 @@ func dumpStacks(log *base.LogObject, fileName string) { // DumpAllStacks writes to file but does not log func DumpAllStacks(log *base.LogObject, agentName string) { agentDebugDir := fmt.Sprintf("%s/%s/", types.PersistDebugDir, agentName) + // Create the directory if it does not exist + _ = os.MkdirAll(agentDebugDir, 0755) + sigUsr1FileName := agentDebugDir + "/sigusr1" stacks := getStacks(true) From ce5f46defab2ef67e28f2d74dc2ec36f22ef0b08 Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Thu, 7 Nov 2024 14:57:22 +0100 Subject: [PATCH 2/9] pillar/docs: Add doc about watcher. The doc gives a short overview of the compoent and explains the manual garbage collection functionality. Signed-off-by: Nikolay Martyanov --- pkg/pillar/docs/watcher.md | 43 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 pkg/pillar/docs/watcher.md diff --git a/pkg/pillar/docs/watcher.md b/pkg/pillar/docs/watcher.md new file mode 100644 index 0000000000..353fd24a92 --- /dev/null +++ b/pkg/pillar/docs/watcher.md @@ -0,0 +1,43 @@ +# Watcher + +The **watcher** component is the part of our system responsible for +monitoring and managing system resources to ensure optimal performance and +stability. It oversees various aspects such as memory usage, disk space, and the +number of active goroutines. By keeping track of these metrics, the watcher can +detect anomalies, trigger garbage collection, and alert us to potential issues +like resource exhaustion or leaks. + +## Manual Garbage Collection + +The watcher component includes a mechanism to handle memory pressure events by +explicitly invoking the Go garbage collector. This functionality is crucial for +efficient memory management and helps prevent the system from running out of +memory. + +We monitor memory pressure events using the `handleMemoryPressureEvents` +function. This function listens for memory pressure notifications from the +system's cgroups at a medium pressure level. By doing so, we can respond +promptly when the system experiences memory constraints. + +When a memory pressure event is detected, the `handleMemoryPressureEvents` +function checks if certain conditions are met before triggering garbage +collection: + +- **Time Interval Check**: It ensures that a minimum time interval has passed + since the last garbage collection to avoid frequent invocations. This interval + is obtained from the `getForcedGOGCParams` function. +- **Memory Growth Check**: It verifies that the allocated memory has increased + significantly since the last garbage collection. This is determined by + comparing current memory usage with previous usage and considering + configurable growth parameters also retrieved by `getForcedGOGCParams`. + +If these conditions are satisfied, we explicitly invoke the garbage collector +using `runtime.GC()`. Before and after the garbage collection, we use +`runtime.ReadMemStats()` to record memory statistics. This allows us to +calculate the amount of memory reclaimed and set the threshold for the next +invocation. + +By adaptively triggering garbage collection based on actual memory pressure and +allocation patterns, we ensure efficient memory usage and maintain system +performance. This approach helps prevent potential memory-related issues by +proactively managing resources. From 1e5d2ecb49556a7def0e68d216da65220b6fd83b Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Fri, 1 Nov 2024 18:15:47 +0100 Subject: [PATCH 3/9] pillar/watcher: Add goroutine leak detection. Introduce a detection system to identify potential goroutine leaks by combining a simple threshold check with trend-based monitoring. This approach periodically tracks the number of active goroutines, checking against a predefined maximum to catch immediate issues. Additionally, it applies a smoothing function to analyze longer-term trends within a sliding window. The detection logic includes: * A basic threshold check: If the goroutine count exceeds a set maximum, an alert is triggered. * A moving average to smooth fluctuations in goroutine counts over time. * Statistical analysis of the rate of change in goroutine counts, setting a dynamic threshold that adapts to typical variations. If the rate of increase consistently surpasses this threshold, it indicates a potential leak. Signed-off-by: Nikolay Martyanov --- pkg/pillar/cmd/watcher/watcher.go | 159 ++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) diff --git a/pkg/pillar/cmd/watcher/watcher.go b/pkg/pillar/cmd/watcher/watcher.go index c1b5a64906..e9db68b136 100644 --- a/pkg/pillar/cmd/watcher/watcher.go +++ b/pkg/pillar/cmd/watcher/watcher.go @@ -5,6 +5,7 @@ package watcher import ( "flag" + "math" "os" "runtime" "sync" @@ -156,6 +157,148 @@ func handleMemoryPressureEvents() { } } +func movingAverage(data []int, windowSize int) []float64 { + // Validates the window size + if windowSize <= 0 { + windowSize = 1 // Do not smooth the data + } + + if windowSize > len(data) { + windowSize = len(data) + } + + if len(data) == 0 { + return nil + } + + smoothed := make([]float64, len(data)-windowSize+1) + var windowSum int + + // Calculates the sum of the first window + for i := 0; i < windowSize; i++ { + windowSum += data[i] + } + smoothed[0] = float64(windowSum) / float64(windowSize) + + // Slides the window through the data + for i := 1; i < len(smoothed); i++ { + windowSum = windowSum - data[i-1] + data[i+windowSize-1] + smoothed[i] = float64(windowSum) / float64(windowSize) + } + + return smoothed +} + +// calculateMeanStdDev calculates the mean and standard deviation of a slice of float64 numbers. +func calculateMeanStdDev(data []float64) (mean, stdDev float64) { + n := float64(len(data)) + var sum, sumSq float64 + for _, value := range data { + sum += value + sumSq += value * value + } + mean = sum / n + variance := (sumSq / n) - (mean * mean) + stdDev = math.Sqrt(variance) + return +} + +// detectGoroutineLeaks detects if there's a potential goroutine leak over time. +// Returns true if a leak is detected, false otherwise. +func detectGoroutineLeaks(stats []int) (bool, []float64) { + + if len(stats) < 10 { + // Not enough data to determine trend + return false, nil + } + + // The window size for the moving average + windowSize := len(stats) / 10 + + // Step 1: Smooth the data + smoothedData := movingAverage(stats, windowSize) + + if len(smoothedData) < 2 { + // Not enough data to determine trend + return false, smoothedData + } + + // Step 2: Calculate the rate of change + rateOfChange := make([]float64, len(smoothedData)-1) + for i := 1; i < len(smoothedData); i++ { + rateOfChange[i-1] = smoothedData[i] - smoothedData[i-1] + } + + // Step 3: Calculate mean and standard deviation of the rate of change + mean, stdDev := calculateMeanStdDev(rateOfChange) + + // Step 4: Determine the dynamic threshold + threshold := 0.0 + stdDev + + // Step 5: Check if the latest rate of change exceeds the threshold + latestChange := rateOfChange[len(rateOfChange)-1] + if mean > threshold && latestChange > threshold { + log.Warnf("Potential goroutine leak detected: latest increase of %.2f exceeds dynamic threshold of %.2f.", latestChange, threshold) + return true, smoothedData + } + return false, smoothedData +} + +func handlePotentialGoroutineLeak() { + // Dump the stack traces of all goroutines + agentlog.DumpAllStacks(log, agentName) +} + +func goroutinesMonitor(goroutinesThreshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) { + entriesToKeep := int(keepStatsFor / checkInterval) + entriesToCheck := int(checkStatsFor / checkInterval) + stats := make([]int, 0, entriesToKeep+1) + var lastLeakHandled time.Time + for { + time.Sleep(checkInterval) + numGoroutines := runtime.NumGoroutine() + // First check for the threshold + if numGoroutines > goroutinesThreshold { + log.Warnf("Number of goroutines exceeds threshold: %d", numGoroutines) + if time.Since(lastLeakHandled) < cooldownPeriod { + // Skip if we've handled a leak recently + log.Warnf("Skipping stacks dumping due to cooldown period") + continue + } + handlePotentialGoroutineLeak() + lastLeakHandled = time.Now() + continue + } + stats = append(stats, numGoroutines) + // Keep the stats for the last keepStatsFor duration + if len(stats) > entriesToKeep { + stats = stats[1:] + } + + // If we have enough data, detect goroutine leaks + if len(stats) > entriesToCheck { + // Analyze the data for the last check window + entriesInLastCheckWindow := stats[len(stats)-entriesToCheck:] + leakDetected, _ := detectGoroutineLeaks(entriesInLastCheckWindow) + if leakDetected { + // Count the number of goroutines that were created in the last check window + numGoroutinesCheckWindowAgo := stats[len(stats)-entriesToCheck] + leakCount := numGoroutines - numGoroutinesCheckWindowAgo + minutesInCheckWindow := int(checkStatsFor.Minutes()) + log.Warnf("Potential goroutine leak! Created in the last %d minutes: %d, total: %d", + minutesInCheckWindow, leakCount, numGoroutines) + if time.Since(lastLeakHandled) < cooldownPeriod { + // Skip detailed handling if we've handled a leak recently + log.Warnf("Skipping stacks dumping due to cooldown period") + continue + } + handlePotentialGoroutineLeak() + lastLeakHandled = time.Now() + } + } + } +} + // Run : func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, arguments []string, baseDir string) int { logger = loggerArg @@ -268,6 +411,22 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar // Handle memory pressure events by calling GC explicitly go handleMemoryPressureEvents() + // Detect goroutine leaks + const ( + // Threshold for the number of goroutines + // When reached, we assume there's a potential goroutine leak + goroutinesThreshold = 1000 + // Check interval for the number of goroutines + checkInterval = 1 * time.Minute + // Check window. On each check, we analyze the stats for that period + checkWindow = 1 * time.Hour + // Keep statistic for that long + keepStatsFor = 24 * time.Hour + // Cooldown period for the leak detection + cooldownPeriod = 10 * time.Minute + ) + go goroutinesMonitor(goroutinesThreshold, checkInterval, checkWindow, keepStatsFor, cooldownPeriod) + for { select { case change := <-subGlobalConfig.MsgChan(): From 52b7aec4a948c89d6cc574fb4ee5d1ebc25afd1e Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Tue, 5 Nov 2024 21:58:29 +0100 Subject: [PATCH 4/9] pillar/watcher: Add unit tests for goroutine leak detection. Introduce a suite of tests to verify the functionality of the goroutine leak detection system, simulating various scenarios to ensure robust leak identification. Growth scenarios: Test scenarios for initial goroutine growth with stabilization, sudden spikes, and decreases after spikes to validate that these typical patterns do not trigger false positives. Leak scenario: Include tests where the goroutine count gradually increases, simulating a leak. The test verifies that the detector correctly identifies this pattern as a leak. Monitoring tests: Implement tests for the `goroutinesMonitor` function to check detection behavior at regular intervals. Edge cases for the helper function: Add tests to handle empty data inputs, minimal data length, and cases where the moving average window exceeds data length, ensuring resilience against edge cases. Signed-off-by: Nikolay Martyanov --- pkg/pillar/cmd/watcher/watcher_test.go | 338 +++++++++++++++++++++++++ 1 file changed, 338 insertions(+) create mode 100644 pkg/pillar/cmd/watcher/watcher_test.go diff --git a/pkg/pillar/cmd/watcher/watcher_test.go b/pkg/pillar/cmd/watcher/watcher_test.go new file mode 100644 index 0000000000..4b24865d9b --- /dev/null +++ b/pkg/pillar/cmd/watcher/watcher_test.go @@ -0,0 +1,338 @@ +// Copyright (c) 2024 Zededa, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package watcher + +import ( + "github.com/lf-edge/eve/pkg/pillar/agentlog" + "io" + "math" + "os" + "reflect" + "strings" + "testing" + "time" +) + +// Scenario 1: Initial growth that slows down and then stabilizes +func emulateSystemStart(startDuration, stabilizationDuration int) []int { + totalDuration := startDuration + stabilizationDuration + data := make([]int, startDuration) + baseGoroutines := 0 + for i := 0; i < startDuration; i++ { + // Simulate fast growth that slows down + growth := int(500 * (1 - math.Exp(-float64(i)/10))) // Exponential decay + data[i] = baseGoroutines + growth + } + // Stabilize after growth + stableGoroutines := data[len(data)-1] + for i := startDuration; i < totalDuration; i++ { + data = append(data, stableGoroutines) + } + return data +} + +// Scenario 2: Stabilization, then a process creates a lot of goroutines quickly, which then stabilizes +func emulateSpikeAfterSystemStart(startDuration, stabilizationDuration, spikeDuration int) []int { + data := emulateSystemStart(startDuration, stabilizationDuration) + totalStartDuration := len(data) + baseGoroutines := data[len(data)-1] + for i := totalStartDuration; i < totalStartDuration+spikeDuration; i++ { + growth := int(100 * (1 - math.Exp(-float64(i-startDuration)/5))) // Quick spike + data = append(data, baseGoroutines+growth) + } + // Stabilize after spike + stableGoroutines := data[len(data)-1] + for i := totalStartDuration + spikeDuration; i < totalStartDuration+spikeDuration+stabilizationDuration; i++ { + data = append(data, stableGoroutines) + } + return data +} + +// Scenario 3: After the spike, goroutine count decreases +func emulateDecreaseAfterSpike(decreaseDuration, spikeDuration, stabilizationDuration, startDuration int) []int { + data := emulateSpikeAfterSystemStart(startDuration, stabilizationDuration, spikeDuration) + decreaseStart := len(data) + baseGoroutines := data[decreaseStart-1] + for i := 0; i < decreaseDuration; i++ { + decrease := int(float64(baseGoroutines) * (1 - float64(i)/float64(decreaseDuration))) + data = append(data, decrease) + } + return data +} + +// Scenario 4: After the spike, goroutine count starts to slowly increase over time +func emulateLeakAfterSpike(leakDuration, stabilizationDuration, spikeDuration, startDuration int) []int { + data := emulateSpikeAfterSystemStart(startDuration, stabilizationDuration, spikeDuration) + increaseStart := len(data) + baseGoroutines := data[increaseStart-1] + for i := increaseStart; i < increaseStart+leakDuration; i++ { + // Slow linear increase + growth := baseGoroutines + (i-increaseStart)*5 + data = append(data, growth) + } + return data +} + +func TestMain(m *testing.M) { + logger, log = agentlog.Init("watcher") + os.Exit(m.Run()) +} + +// Computes moving average correctly for valid data and window size +func TestMovingAverageValidData(t *testing.T) { + data := []int{1, 2, 3, 4, 5} + windowSize := 3 + expected := []float64{2.0, 3.0, 4.0} + + result := movingAverage(data, windowSize) + + if !reflect.DeepEqual(result, expected) { + t.Errorf("Expected %v, got %v", expected, result) + } +} + +// Handles window size equal to data length by returning a single average +func TestMovingAverageWindowSizeEqualDataLength(t *testing.T) { + data := []int{1, 2, 3, 4, 5} + windowSize := 5 + expected := []float64{3.0} + + result := movingAverage(data, windowSize) + + if !reflect.DeepEqual(result, expected) { + t.Errorf("Expected %v, got %v", expected, result) + } +} + +// Handles empty data array gracefully +func TestMovingAverageEmptyData(t *testing.T) { + data := []int{} + windowSize := 3 + var expected []float64 + expected = nil + + result := movingAverage(data, windowSize) + + if !reflect.DeepEqual(result, expected) { + t.Errorf("Expected %v (type %T), got %v (type %T)", expected, expected, result, result) + } +} + +// Manages window size of zero by defaulting to 1 +func TestMovingAverageWindowSizeZero(t *testing.T) { + data := []int{1, 2, 3, 4, 5} + windowSize := 0 + expected := []float64{1.0, 2.0, 3.0, 4.0, 5.0} + + result := movingAverage(data, windowSize) + + if !reflect.DeepEqual(result, expected) { + t.Errorf("Expected %v, got %v", expected, result) + } +} + +// Deals with window size larger than data length by defaulting to data length +func TestMovingAverageWindowSizeLargerThanDataLength(t *testing.T) { + data := []int{1, 2, 3} + windowSize := 5 + expected := []float64{2.0} + + result := movingAverage(data, windowSize) + + if !reflect.DeepEqual(result, expected) { + t.Errorf("Expected %v, got %v", expected, result) + } +} + +func TestGoroutineLeakDetectorWithSystemStart(t *testing.T) { + backupOut := logger.Out + logger.SetOutput(io.Discard) + defer logger.SetOutput(backupOut) + stats := emulateSystemStart(5, 5) + detected, _ := detectGoroutineLeaks(stats) + if detected { + t.Errorf("Expected no goroutine leak, but detected one") + } +} + +func TestGoroutineLeakDetectorWithLegitSpike(t *testing.T) { + backupOut := logger.Out + logger.SetOutput(io.Discard) + defer logger.SetOutput(backupOut) + stats := emulateSpikeAfterSystemStart(5, 5, 20) + detected, _ := detectGoroutineLeaks(stats) + if detected { + t.Errorf("Expected no goroutine leak, but detected one") + } +} + +func TestGoroutineLeakDetectorWithDecreaseAfterSpike(t *testing.T) { + backupOut := logger.Out + logger.SetOutput(io.Discard) + defer logger.SetOutput(backupOut) + stats := emulateDecreaseAfterSpike(10, 20, 5, 5) + detected, _ := detectGoroutineLeaks(stats) + if detected { + t.Errorf("Expected no goroutine leak, but detected one") + } +} + +func TestGoroutineLeakDetectorWithLeakAfterSpike(t *testing.T) { + backupOut := logger.Out + logger.SetOutput(io.Discard) + defer logger.SetOutput(backupOut) + stats := emulateLeakAfterSpike(100, 5, 20, 5) + detected, _ := detectGoroutineLeaks(stats) + if !detected { + t.Errorf("Expected goroutine leak to be detected, but it was not") + } +} + +func TestGoroutineLeakDetectorWithLeakEachStep(t *testing.T) { + backupOut := logger.Out + logger.SetOutput(io.Discard) + defer logger.SetOutput(backupOut) + startDuration := 5 + stabilizationDuration := 5 + spikeDuration := 20 + leakDuration := 100 + leakMayBeDetectedAfter := startDuration + stabilizationDuration + spikeDuration + stabilizationDuration + possibleFalsePositives := 60 + leakMustBeDetectedAfter := leakMayBeDetectedAfter + possibleFalsePositives + stats := emulateLeakAfterSpike(leakDuration, stabilizationDuration, spikeDuration, startDuration) + // Now check the behavior of detector on each new data point + for i := 0; i < len(stats); i++ { + detected, _ := detectGoroutineLeaks(stats[:i]) + // Leak should be detected after the slow increase starts + if detected && i < startDuration+stabilizationDuration+spikeDuration+stabilizationDuration { + t.Errorf("Expected no goroutine leak, but detected one at step %d", i) + } + if !detected && i >= leakMayBeDetectedAfter && i < leakMustBeDetectedAfter { + t.Logf("Expected goroutine leak to be detected, but it was not at step %d", i) + } + if !detected && i >= leakMustBeDetectedAfter { + t.Errorf("Expected goroutine leak to be detected, but it was not at step %d", i) + } + } +} + +// Handles empty input data gracefully +func TestEmptyInputData(t *testing.T) { + backupOut := logger.Out + logger.SetOutput(io.Discard) + defer logger.SetOutput(backupOut) + stats := []int{} + detected, smoothedData := detectGoroutineLeaks(stats) + if detected || smoothedData != nil { + t.Errorf("Expected no detection and nil smoothed data for empty input") + } +} + +// Handles input data with fewer than two elements +func TestInputDataWithFewerThanTwoElements(t *testing.T) { + backupOut := logger.Out + logger.SetOutput(io.Discard) + defer logger.SetOutput(backupOut) + stats := []int{5} + detected, smoothedData := detectGoroutineLeaks(stats) + if detected || len(smoothedData) != 0 { + t.Errorf("Expected no detection and empty smoothed data for input with fewer than two elements") + } +} + +// Handles window size larger than data length +func TestWindowSizeLargerThanDataLength(t *testing.T) { + stats := []int{1, 2} + windowSize := len(stats) + 1 + smoothedData := movingAverage(stats, windowSize) + if len(smoothedData) != 1 { + t.Errorf("Expected smoothed data length of 1 when window size is larger than data length") + } +} + +// Monitors goroutine count at regular intervals +func TestGoroutinesMonitorNoLeak(t *testing.T) { + keepStatsFor := 24 * 60 * time.Millisecond + goroutinesThreshold := 100 + checkInterval := 1 * time.Millisecond + checkStatsFor := 10 * time.Millisecond + cooldownPeriod := 5 * time.Millisecond + + backupOut := logger.Out + // Create a pipe to capture log output + r, w, _ := os.Pipe() + logger.SetOutput(w) + defer logger.SetOutput(backupOut) + + go func() { + goroutinesMonitor(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + }() + + timeStart := time.Now() + for { + if time.Since(timeStart) > 2*keepStatsFor { + break + } + // Create a goroutine + go func() { + time.Sleep(checkInterval / 2) + }() + time.Sleep(2 * checkInterval) + } + + // Close the pipe + w.Close() + + // Read the log output + output, _ := io.ReadAll(r) + + // Check if the log output does not contain the detection message + // If it does, it means that the goroutine leak was detected + if strings.Contains(string(output), "leak detected") { + t.Errorf("Expected no goroutine leak to be detected") + } + +} + +func TestGoroutinesMonitorLeak(t *testing.T) { + keepStatsFor := 24 * 60 * time.Millisecond + goroutinesThreshold := 100 + checkInterval := 1 * time.Millisecond + checkStatsFor := 10 * time.Millisecond + cooldownPeriod := 5 * time.Millisecond + + backupOut := logger.Out + // Create a pipe to capture log output + r, w, _ := os.Pipe() + logger.SetOutput(w) + defer logger.SetOutput(backupOut) + + go func() { + goroutinesMonitor(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + }() + + timeStart := time.Now() + for { + if time.Since(timeStart) > 2*keepStatsFor { + break + } + // Create a goroutine + go func() { + time.Sleep(checkInterval * 100) + }() + time.Sleep(checkInterval / 2) + } + + // Close the pipe + _ = w.Close() + + // Read the log output + output, _ := io.ReadAll(r) + + // Check if the log output contains the expected message + if !strings.Contains(string(output), "leak detected") { + t.Errorf("Expected log output to contain 'leak detected'") + } + +} From 61c2d6a28d06530638cb54045d8a1fede2b6f92c Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Thu, 7 Nov 2024 14:58:49 +0100 Subject: [PATCH 5/9] pillar/docs: Add doc about goroutine leak detector. Document the goroutine leak detection approach, including methods to monitor and identify abnormal increases in goroutines to support proactive system maintenance. Signed-off-by: Nikolay Martyanov --- pkg/pillar/docs/watcher.md | 44 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/pkg/pillar/docs/watcher.md b/pkg/pillar/docs/watcher.md index 353fd24a92..f1d3ea89fc 100644 --- a/pkg/pillar/docs/watcher.md +++ b/pkg/pillar/docs/watcher.md @@ -41,3 +41,47 @@ By adaptively triggering garbage collection based on actual memory pressure and allocation patterns, we ensure efficient memory usage and maintain system performance. This approach helps prevent potential memory-related issues by proactively managing resources. + +## Goroutine Leak Detector + +We have implemented a system to detect potential goroutine leaks by monitoring +the number of active goroutines over time. This proactive approach helps us +identify unusual increases that may indicate a leak. + +To achieve this, we collect data on the number of goroutines at regular +intervals within the `goroutinesMonitor` function. However, raw data can be +noisy due to normal fluctuations in goroutine usage. To mitigate this, we apply +a moving average to the collected data using the `movingAverage` function. This +smoothing process reduces short-term variations and highlights longer-term +trends, making it easier to detect significant changes in the goroutine count. + +After smoothing the data, we calculate the rate of change by determining the +difference between consecutive smoothed values. This rate of change reflects how +quickly the number of goroutines is increasing or decreasing over time. To +analyze this effectively, we compute the mean and standard deviation of the rate +of change using the `calculateMeanStdDev` function. These statistical measures +provide insights into the typical behavior and variability within our system. + +Using the standard deviation, we set a dynamic threshold that adapts to the +system's normal operating conditions within the `detectGoroutineLeaks` function. +If both the mean rate of change and the latest observed rate exceed this +threshold, it indicates an abnormal increase in goroutine count, signaling a +potential leak. This method reduces false positives by accounting for natural +fluctuations and focusing on significant deviations from expected patterns. + +When a potential leak is detected, we respond by dumping the stack traces of all +goroutines using the `handlePotentialGoroutineLeak` function. This action +provides detailed information that can help diagnose the source of the leak, as +it reveals where goroutines are being created and potentially not terminated +properly. + +The goroutines stacks are collected and stored in a file for further analysis. +The file is stored in `/persist/agentdebug/watcher/sigusr1`. Also, a warning +message is logged to alert the user about the potential goroutine leak. To +search for relevant log messages, grep for `Potential goroutine leak` or +`Number of goroutines exceeds threshold`. + +To prevent repeated handling of the same issue within a short time frame, we +incorporate a cooldown period in the `goroutinesMonitor` function. This ensures +that resources are not wasted on redundant operations and that the monitoring +system remains efficient. From 567e86521590f0e88119213bb65cd3f5ebc1ecb6 Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Thu, 7 Nov 2024 19:52:27 +0100 Subject: [PATCH 6/9] pillar/watcher: Make goroutine leak detector configurable. Added new configuration options for the goroutine leak detector, enabling adjustment of parameters such as threshold, check interval, analysis window, stats retention, and cooldown period through global settings. This enhancement allows for dynamic tuning of leak detection based on deployment needs without code modification. Introduced `GoroutineLeakDetectionParams` struct to manage these settings, along with validation to ensure parameters meet minimum requirements. Updated the `watcher` component to use this configurable setup, and integrated new global config keys to hold these values. Signed-off-by: Nikolay Martyanov --- docs/CONFIG-PROPERTIES.md | 5 + pkg/pillar/cmd/watcher/watcher.go | 123 +++++++++++++++++++++---- pkg/pillar/cmd/watcher/watcher_test.go | 17 ++-- pkg/pillar/docs/watcher.md | 5 + pkg/pillar/types/global.go | 23 +++++ pkg/pillar/types/global_test.go | 5 + 6 files changed, 154 insertions(+), 24 deletions(-) diff --git a/docs/CONFIG-PROPERTIES.md b/docs/CONFIG-PROPERTIES.md index b7122386ec..ab67b33b33 100644 --- a/docs/CONFIG-PROPERTIES.md +++ b/docs/CONFIG-PROPERTIES.md @@ -64,6 +64,11 @@ | network.switch.enable.arpsnoop | boolean | true | enable ARP Snooping on switch Network Instances | | wwan.query.visible.providers | bool | false | enable to periodically (once per hour) query the set of visible cellular service providers and publish them under WirelessStatus (for every modem) | | network.local.legacy.mac.address | bool | false | enables legacy MAC address generation for local network instances for those EVE nodes where changing MAC addresses in applications will lead to incorrect network configuration | +| goroutine.leak.detection.threshold | integer | 5000 | Amount of goroutines, reaching which will trigger leak detection regardless of growth rate. | +| goroutine.leak.detection.check.interval.minutes | integer (minutes) | 1 | Interval in minutes between the measurements of the goroutine count. | +| goroutine.leak.detection.check.window.minutes | integer (minutes) | 10 | Interval in minutes for which the leak analysis is performed. It should contain at least 10 measurements, so no less than 10 × goroutine.leak.detection.check.interval.minutes. | +| goroutine.leak.detection.keep.stats.hours | integer (hours) | 24 | Amount of hours to keep the stats for leak detection. We keep more stats than the check window to be able to react to settings with a bigger check window via configuration. | +| goroutine.leak.detection.cooldown.minutes | integer (minutes) | 5 | Cooldown period in minutes after the leak detection is triggered. During this period, no stack traces are collected; only warning messages are logged. | In addition, there can be per-agent settings. The Per-agent settings begin with "agent.*agentname*.*setting*" diff --git a/pkg/pillar/cmd/watcher/watcher.go b/pkg/pillar/cmd/watcher/watcher.go index e9db68b136..e2d47c41d1 100644 --- a/pkg/pillar/cmd/watcher/watcher.go +++ b/pkg/pillar/cmd/watcher/watcher.go @@ -28,6 +28,73 @@ const ( usageThreshold = 2 ) +// GoroutineLeakDetectionParams holds the global goroutine leak detection parameters +type GoroutineLeakDetectionParams struct { + mutex sync.Mutex + threshold int + checkInterval time.Duration + checkStatsFor time.Duration + keepStatsFor time.Duration + cooldownPeriod time.Duration +} + +func validateGoroutineLeakDetectionParams(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) bool { + if threshold < 1 { + log.Warnf("Invalid threshold: %d", threshold) + return false + } + if checkInterval < 0 { + log.Warnf("Invalid check interval: %v", checkInterval) + return false + } + if checkStatsFor < checkInterval*10 { + log.Warnf("Invalid check window: %v", checkStatsFor) + log.Warnf("Check window must be at least 10 times the check interval (%v)", checkInterval) + return false + } + if keepStatsFor < checkStatsFor { + log.Warnf("Invalid keep stats duration: %v", keepStatsFor) + log.Warnf("Keep stats duration must be greater than a check window (%v)", checkStatsFor) + return false + } + if cooldownPeriod < checkInterval { + log.Warnf("Invalid cooldown period: %v", cooldownPeriod) + log.Warnf("Cooldown period must be greater than a check interval (%v)", checkInterval) + return false + } + return true +} + +// Set atomically sets the global goroutine leak detection parameters +func (gldp *GoroutineLeakDetectionParams) Set(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) { + if !validateGoroutineLeakDetectionParams(threshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) { + return + } + gldp.mutex.Lock() + gldp.threshold = threshold + gldp.checkInterval = checkInterval + gldp.checkStatsFor = checkStatsFor + gldp.keepStatsFor = keepStatsFor + gldp.cooldownPeriod = cooldownPeriod + gldp.mutex.Unlock() +} + +// Get atomically gets the global goroutine leak detection parameters +func (gldp *GoroutineLeakDetectionParams) Get() (int, time.Duration, time.Duration, time.Duration, time.Duration) { + var threshold int + var checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration + + gldp.mutex.Lock() + threshold = gldp.threshold + checkInterval = gldp.checkInterval + checkStatsFor = gldp.checkStatsFor + keepStatsFor = gldp.keepStatsFor + cooldownPeriod = gldp.cooldownPeriod + gldp.mutex.Unlock() + + return threshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod +} + type watcherContext struct { agentbase.AgentBase ps *pubsub.PubSub @@ -40,6 +107,9 @@ type watcherContext struct { GCInitialized bool // cli options + + // Global goroutine leak detection parameters + GRLDParams GoroutineLeakDetectionParams } // AddAgentSpecificCLIFlags adds CLI options @@ -84,6 +154,22 @@ func setForcedGOGCParams(ctx *watcherContext) { gogcForcedLock.Unlock() } +// Read the global goroutine leak detection parameters to the context +func readGlobalGoroutineLeakDetectionParams(ctx *watcherContext) { + gcp := agentlog.GetGlobalConfig(log, ctx.subGlobalConfig) + if gcp == nil { + return + } + + threshold := int(gcp.GlobalValueInt(types.GoroutineLeakDetectionThreshold)) + checkInterval := time.Duration(gcp.GlobalValueInt(types.GoroutineLeakDetectionCheckIntervalMinutes)) * time.Minute + checkStatsFor := time.Duration(gcp.GlobalValueInt(types.GoroutineLeakDetectionCheckWindowMinutes)) * time.Minute + keepStatsFor := time.Duration(gcp.GlobalValueInt(types.GoroutineLeakDetectionKeepStatsHours)) * time.Hour + cooldownPeriod := time.Duration(gcp.GlobalValueInt(types.GoroutineLeakDetectionCooldownMinutes)) * time.Minute + + ctx.GRLDParams.Set(threshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) +} + // Listens to root cgroup in hierarchy mode (events always propagate // up to the root) and call Go garbage collector with reasonable // interval when certain amount of memory has been allocated (presumably @@ -249,12 +335,27 @@ func handlePotentialGoroutineLeak() { agentlog.DumpAllStacks(log, agentName) } -func goroutinesMonitor(goroutinesThreshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) { +// goroutinesMonitor monitors the number of goroutines and detects potential goroutine leaks. +func goroutinesMonitor(ctx *watcherContext) { + // Get the initial goroutine leak detection parameters to create the stats slice + goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod := ctx.GRLDParams.Get() entriesToKeep := int(keepStatsFor / checkInterval) - entriesToCheck := int(checkStatsFor / checkInterval) stats := make([]int, 0, entriesToKeep+1) var lastLeakHandled time.Time for { + // Check if we have to resize the stats slice + goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod = ctx.GRLDParams.Get() + newEntriesToKeep := int(keepStatsFor / checkInterval) + if newEntriesToKeep != entriesToKeep { + entriesToKeep = newEntriesToKeep + log.Functionf("Resizing stats slice to %d", entriesToKeep) + if len(stats) > entriesToKeep { + log.Functionf("Removing %d oldest entries", len(stats)-entriesToKeep) + stats = stats[len(stats)-entriesToKeep:] + } + } + entriesToCheck := int(checkStatsFor / checkInterval) + // Wait for the next check interval time.Sleep(checkInterval) numGoroutines := runtime.NumGoroutine() // First check for the threshold @@ -411,21 +512,7 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar // Handle memory pressure events by calling GC explicitly go handleMemoryPressureEvents() - // Detect goroutine leaks - const ( - // Threshold for the number of goroutines - // When reached, we assume there's a potential goroutine leak - goroutinesThreshold = 1000 - // Check interval for the number of goroutines - checkInterval = 1 * time.Minute - // Check window. On each check, we analyze the stats for that period - checkWindow = 1 * time.Hour - // Keep statistic for that long - keepStatsFor = 24 * time.Hour - // Cooldown period for the leak detection - cooldownPeriod = 10 * time.Minute - ) - go goroutinesMonitor(goroutinesThreshold, checkInterval, checkWindow, keepStatsFor, cooldownPeriod) + go goroutinesMonitor(&ctx) for { select { @@ -646,6 +733,8 @@ func handleGlobalConfigImpl(ctxArg interface{}, key string, if gcp != nil { ctx.GCInitialized = true } + // Update the global goroutine leak detection parameters + readGlobalGoroutineLeakDetectionParams(ctx) log.Functionf("handleGlobalConfigImpl done for %s", key) } diff --git a/pkg/pillar/cmd/watcher/watcher_test.go b/pkg/pillar/cmd/watcher/watcher_test.go index 4b24865d9b..3c32f8e1c7 100644 --- a/pkg/pillar/cmd/watcher/watcher_test.go +++ b/pkg/pillar/cmd/watcher/watcher_test.go @@ -265,9 +265,11 @@ func TestGoroutinesMonitorNoLeak(t *testing.T) { logger.SetOutput(w) defer logger.SetOutput(backupOut) - go func() { - goroutinesMonitor(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) - }() + // Create context with default parameters + ctx := &watcherContext{} + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + + go goroutinesMonitor(ctx) timeStart := time.Now() for { @@ -308,9 +310,11 @@ func TestGoroutinesMonitorLeak(t *testing.T) { logger.SetOutput(w) defer logger.SetOutput(backupOut) - go func() { - goroutinesMonitor(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) - }() + // Create context with default parameters + ctx := &watcherContext{} + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + + go goroutinesMonitor(ctx) timeStart := time.Now() for { @@ -334,5 +338,4 @@ func TestGoroutinesMonitorLeak(t *testing.T) { if !strings.Contains(string(output), "leak detected") { t.Errorf("Expected log output to contain 'leak detected'") } - } diff --git a/pkg/pillar/docs/watcher.md b/pkg/pillar/docs/watcher.md index f1d3ea89fc..86c2f79891 100644 --- a/pkg/pillar/docs/watcher.md +++ b/pkg/pillar/docs/watcher.md @@ -85,3 +85,8 @@ To prevent repeated handling of the same issue within a short time frame, we incorporate a cooldown period in the `goroutinesMonitor` function. This ensures that resources are not wasted on redundant operations and that the monitoring system remains efficient. + +The goroutine leak detector is dynamically configurable via global configuration +parameters. They are documented in the +[CONFIG-PROPERTIES.md](../../../docs/CONFIG-PROPERTIES.md) and all have +`goroutine.leak.detection` prefix. diff --git a/pkg/pillar/types/global.go b/pkg/pillar/types/global.go index 9b33134f48..533e208dc2 100644 --- a/pkg/pillar/types/global.go +++ b/pkg/pillar/types/global.go @@ -257,6 +257,22 @@ const ( // WwanQueryVisibleProviders : periodically query visible cellular service providers WwanQueryVisibleProviders GlobalSettingKey = "wwan.query.visible.providers" + // GoroutineLeakDetectionThreshold amount of goroutines, reaching which will trigger leak detection + // regardless of growth rate. + GoroutineLeakDetectionThreshold GlobalSettingKey = "goroutine.leak.detection.threshold" + // GoroutineLeakDetectionCheckIntervalMinutes interval in minutes between the measurements of the + // goroutine count. + GoroutineLeakDetectionCheckIntervalMinutes GlobalSettingKey = "goroutine.leak.detection.check.interval.minutes" + // GoroutineLeakDetectionCheckWindowMinutes interval in minutes for which the leak analysis is performed. + // It should contain at least 10 measurements, so no less than 10 * GoroutineLeakDetectionCheckIntervalMinutes. + GoroutineLeakDetectionCheckWindowMinutes GlobalSettingKey = "goroutine.leak.detection.check.window.minutes" + // GoroutineLeakDetectionKeepStatsHours amount of hours to keep the stats for the leak detection. We keep more + // stats than the check window to be able to react to settings a bigger check window via configuration. + GoroutineLeakDetectionKeepStatsHours GlobalSettingKey = "goroutine.leak.detection.keep.stats.hours" + // GoroutineLeakDetectionCooldownMinutes cooldown period in minutes after the leak detection is triggered. During + // this period no stack traces are collected, only warning messages are logged. + GoroutineLeakDetectionCooldownMinutes GlobalSettingKey = "goroutine.leak.detection.cooldown.minutes" + // TriState Items // NetworkFallbackAnyEth global setting key NetworkFallbackAnyEth GlobalSettingKey = "network.fallback.any.eth" @@ -933,6 +949,13 @@ func NewConfigItemSpecMap() ConfigItemSpecMap { configItemSpecMap.AddIntItem(LogRemainToSendMBytes, 2048, 10, 0xFFFFFFFF) configItemSpecMap.AddIntItem(DownloadMaxPortCost, 0, 0, 255) + // Goroutine Leak Detection section + configItemSpecMap.AddIntItem(GoroutineLeakDetectionThreshold, 5000, 1, 0xFFFFFFFF) + configItemSpecMap.AddIntItem(GoroutineLeakDetectionCheckIntervalMinutes, 1, 1, 0xFFFFFFFF) + configItemSpecMap.AddIntItem(GoroutineLeakDetectionCheckWindowMinutes, 10, 10, 0xFFFFFFFF) + configItemSpecMap.AddIntItem(GoroutineLeakDetectionKeepStatsHours, 24, 1, 0xFFFFFFFF) + configItemSpecMap.AddIntItem(GoroutineLeakDetectionCooldownMinutes, 5, 1, 0xFFFFFFFF) + // Add Bool Items configItemSpecMap.AddBoolItem(UsbAccess, true) // Controller likely default to false configItemSpecMap.AddBoolItem(VgaAccess, true) // Controller likely default to false diff --git a/pkg/pillar/types/global_test.go b/pkg/pillar/types/global_test.go index e7325cd898..444b41917c 100644 --- a/pkg/pillar/types/global_test.go +++ b/pkg/pillar/types/global_test.go @@ -215,6 +215,11 @@ func TestNewConfigItemSpecMap(t *testing.T) { NetDumpTopicPostOnboardInterval, NetDumpDownloaderPCAP, NetDumpDownloaderHTTPWithFieldValue, + GoroutineLeakDetectionThreshold, + GoroutineLeakDetectionCheckIntervalMinutes, + GoroutineLeakDetectionCheckWindowMinutes, + GoroutineLeakDetectionKeepStatsHours, + GoroutineLeakDetectionCooldownMinutes, } if len(specMap.GlobalSettings) != len(gsKeys) { t.Errorf("GlobalSettings has more (%d) than expected keys (%d)", From 8ac9d3a293a8f72327051efcdcf17169ffbaaebc Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Thu, 7 Nov 2024 19:56:05 +0100 Subject: [PATCH 7/9] pillar/watcher: Add tests for runtime configuration updates in goroutine leak detection. Extended tests to validate that the goroutine leak detection respects updates to configuration values at runtime. Specifically, the tests confirm that adjustments to the `keepStatsFor` parameter dynamically alter the stats slice size, with changes logged as expected. Signed-off-by: Nikolay Martyanov --- pkg/pillar/cmd/watcher/watcher_test.go | 123 +++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/pkg/pillar/cmd/watcher/watcher_test.go b/pkg/pillar/cmd/watcher/watcher_test.go index 3c32f8e1c7..5faf11cd4a 100644 --- a/pkg/pillar/cmd/watcher/watcher_test.go +++ b/pkg/pillar/cmd/watcher/watcher_test.go @@ -4,7 +4,9 @@ package watcher import ( + "fmt" "github.com/lf-edge/eve/pkg/pillar/agentlog" + "github.com/sirupsen/logrus" "io" "math" "os" @@ -339,3 +341,124 @@ func TestGoroutinesMonitorLeak(t *testing.T) { t.Errorf("Expected log output to contain 'leak detected'") } } + +// Adjust stats slice size dynamically based on updated parameters +func TestGoroutinesMonitorUpdateParamsKeepStatsDecrease(t *testing.T) { + backupOut := logger.Out + backupLevel := logger.Level + // Create a pipe to capture log output + r, w, _ := os.Pipe() + logger.SetOutput(w) + logger.SetLevel(logrus.TraceLevel) + defer func() { + logger.SetOutput(backupOut) + logger.SetLevel(backupLevel) + }() + + // Define a context with default parameters + ctx := &watcherContext{} + + // Define parameters + goroutinesThreshold := 100 + checkInterval := 1 * time.Millisecond + checkStatsFor := 10 * time.Millisecond + keepStatsFor := 24 * 60 * time.Millisecond + cooldownPeriod := 5 * time.Millisecond + + // Set the parameters + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + + go goroutinesMonitor(ctx) + + // Wait until we fill the stats slice + time.Sleep(2 * keepStatsFor) + + // Count the expected size of the stats slice + oldSize := int(keepStatsFor / checkInterval) + + // Change the keepStatsFor parameter to force resizing of the stats slice + keepStatsFor /= 2 + + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + + // Wait for several check intervals to allow the new context to be updated + time.Sleep(checkInterval * 100) + + // Close the pipe + _ = w.Close() + output, _ := io.ReadAll(r) + + expectedNewSize := int(keepStatsFor / checkInterval) + expectedRemovedEntries := oldSize - expectedNewSize + + // Define the expected log output with the new size + msgResize := fmt.Sprintf("Resizing stats slice to %d", expectedNewSize) + msgRemove := fmt.Sprintf("Removing %d oldest entries", expectedRemovedEntries) + + expectedMsgs := []string{msgResize, msgRemove} + + // Check if the log output contains the expected messages + for _, expectedMsg := range expectedMsgs { + if !strings.Contains(string(output), expectedMsg) { + t.Errorf("Expected log output to contain '%s'", expectedMsg) + } + } +} + +// Adjust stats slice size dynamically based on updated parameters +func TestGoroutinesMonitorUpdateParamsKeepStatsIncrease(t *testing.T) { + backupOut := logger.Out + backupLevel := logger.Level + // Create a pipe to capture log output + r, w, _ := os.Pipe() + logger.SetOutput(w) + logger.SetLevel(logrus.TraceLevel) + defer func() { + logger.SetOutput(backupOut) + logger.SetLevel(backupLevel) + }() + + // Define a context with default parameters + ctx := &watcherContext{} + + // Define parameters + goroutinesThreshold := 100 + checkInterval := 1 * time.Millisecond + checkStatsFor := 10 * time.Millisecond + keepStatsFor := 24 * 60 * time.Millisecond + cooldownPeriod := 5 * time.Millisecond + + // Set the parameters + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + + go goroutinesMonitor(ctx) + + // Wait until we fill the stats slice + time.Sleep(2 * keepStatsFor) + + // Change the keepStatsFor parameter to force resizing of the stats slice + keepStatsFor *= 2 + + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + + // Wait for several check intervals to allow the new context to be updated + time.Sleep(checkInterval * 100) + + // Close the pipe + _ = w.Close() + output, _ := io.ReadAll(r) + + expectedNewSize := int(keepStatsFor / checkInterval) + + // Define the expected log output with the new size + msgResize := fmt.Sprintf("Resizing stats slice to %d", expectedNewSize) + + expectedMsgs := []string{msgResize} + + // Check if the log output contains the expected messages + for _, expectedMsg := range expectedMsgs { + if !strings.Contains(string(output), expectedMsg) { + t.Errorf("Expected log output to contain '%s'", expectedMsg) + } + } +} From 9ee63bbafb5227951ed232fde4158ee057aaf1b8 Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Fri, 8 Nov 2024 14:09:42 +0100 Subject: [PATCH 8/9] pillar/watcher: Make goroutinesMonitor stoppable. Enable the `goroutinesMonitor` to be stopped via a cancellable context within `GoroutineLeakDetectionParams`, allowing controlled termination of the monitoring goroutine. This change introduces a `MakeStoppable` method to set up a cancellable context and a `Stop` method to trigger the cancellation, allowing tests to end monitoring cleanly. Additionally, `checkStopCondition` was added to periodically check if the goroutine should stop. Updated tests to utilize this functionality, adding verification for proper start and stop messages in the log output, ensuring that the monitor operates correctly in both stoppable and unstoppable modes. Signed-off-by: Nikolay Martyanov --- pkg/pillar/cmd/watcher/watcher.go | 38 ++++++++++ pkg/pillar/cmd/watcher/watcher_test.go | 99 ++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) diff --git a/pkg/pillar/cmd/watcher/watcher.go b/pkg/pillar/cmd/watcher/watcher.go index e2d47c41d1..cd1a946b48 100644 --- a/pkg/pillar/cmd/watcher/watcher.go +++ b/pkg/pillar/cmd/watcher/watcher.go @@ -4,6 +4,7 @@ package watcher import ( + "context" "flag" "math" "os" @@ -36,6 +37,9 @@ type GoroutineLeakDetectionParams struct { checkStatsFor time.Duration keepStatsFor time.Duration cooldownPeriod time.Duration + // Context to make the monitoring goroutine cancellable + context context.Context + stop context.CancelFunc } func validateGoroutineLeakDetectionParams(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) bool { @@ -79,6 +83,34 @@ func (gldp *GoroutineLeakDetectionParams) Set(threshold int, checkInterval, chec gldp.mutex.Unlock() } +// MakeStoppable creates a cancellable context and a stop function +func (gldp *GoroutineLeakDetectionParams) MakeStoppable() { + gldp.context, gldp.stop = context.WithCancel(context.Background()) +} + +func (gldp *GoroutineLeakDetectionParams) isStoppable() bool { + return gldp.context != nil +} + +func (gldp *GoroutineLeakDetectionParams) checkStopCondition() bool { + if gldp.context != nil { + select { + case <-gldp.context.Done(): + return true + default: + return false + } + } + return false +} + +// Stop cancels the context to stop the monitoring goroutine +func (gldp *GoroutineLeakDetectionParams) Stop() { + if gldp.stop != nil { + gldp.stop() + } +} + // Get atomically gets the global goroutine leak detection parameters func (gldp *GoroutineLeakDetectionParams) Get() (int, time.Duration, time.Duration, time.Duration, time.Duration) { var threshold int @@ -337,12 +369,18 @@ func handlePotentialGoroutineLeak() { // goroutinesMonitor monitors the number of goroutines and detects potential goroutine leaks. func goroutinesMonitor(ctx *watcherContext) { + log.Functionf("Starting goroutines monitor (stoppable: %v)", ctx.GRLDParams.isStoppable()) // Get the initial goroutine leak detection parameters to create the stats slice goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod := ctx.GRLDParams.Get() entriesToKeep := int(keepStatsFor / checkInterval) stats := make([]int, 0, entriesToKeep+1) var lastLeakHandled time.Time for { + // Check if we have to stop + if ctx.GRLDParams.checkStopCondition() { + log.Functionf("Stopping goroutines monitor") + return + } // Check if we have to resize the stats slice goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod = ctx.GRLDParams.Get() newEntriesToKeep := int(keepStatsFor / checkInterval) diff --git a/pkg/pillar/cmd/watcher/watcher_test.go b/pkg/pillar/cmd/watcher/watcher_test.go index 5faf11cd4a..f35383e587 100644 --- a/pkg/pillar/cmd/watcher/watcher_test.go +++ b/pkg/pillar/cmd/watcher/watcher_test.go @@ -270,8 +270,10 @@ func TestGoroutinesMonitorNoLeak(t *testing.T) { // Create context with default parameters ctx := &watcherContext{} ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() go goroutinesMonitor(ctx) + defer ctx.GRLDParams.Stop() timeStart := time.Now() for { @@ -315,8 +317,10 @@ func TestGoroutinesMonitorLeak(t *testing.T) { // Create context with default parameters ctx := &watcherContext{} ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() go goroutinesMonitor(ctx) + defer ctx.GRLDParams.Stop() timeStart := time.Now() for { @@ -367,8 +371,10 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsDecrease(t *testing.T) { // Set the parameters ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() go goroutinesMonitor(ctx) + defer ctx.GRLDParams.Stop() // Wait until we fill the stats slice time.Sleep(2 * keepStatsFor) @@ -430,8 +436,10 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsIncrease(t *testing.T) { // Set the parameters ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() go goroutinesMonitor(ctx) + defer ctx.GRLDParams.Stop() // Wait until we fill the stats slice time.Sleep(2 * keepStatsFor) @@ -462,3 +470,94 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsIncrease(t *testing.T) { } } } + +func TestGoroutineMonitorStops(t *testing.T) { + keepStatsFor := 24 * 60 * time.Millisecond + goroutinesThreshold := 100 + checkInterval := 1 * time.Millisecond + checkStatsFor := 10 * time.Millisecond + cooldownPeriod := 5 * time.Millisecond + + backupOut := logger.Out + backupLevel := logger.Level + // Create a pipe to capture log output + r, w, _ := os.Pipe() + logger.SetOutput(w) + logger.SetLevel(logrus.TraceLevel) + defer func() { + logger.SetOutput(backupOut) + logger.SetLevel(backupLevel) + }() + + // Create context with default parameters + ctx := &watcherContext{} + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() + + go goroutinesMonitor(ctx) + + // Let the monitor run for a while + time.Sleep(keepStatsFor * 2) + + ctx.GRLDParams.Stop() + + // Wait for several check intervals to allow the monitor to stop + time.Sleep(checkInterval * 100) + + // Close the pipe + _ = w.Close() + + // Read the log output + output, _ := io.ReadAll(r) + + msgStart := "Starting goroutines monitor (stoppable: true)" + msgStop := "Stopping goroutines monitor" + expectedMsgs := []string{msgStart, msgStop} + for _, expectedMsg := range expectedMsgs { + if !strings.Contains(string(output), expectedMsg) { + t.Errorf("Expected log output to contain '%s'", expectedMsg) + } + } +} + +func TestGoroutineMonitorRunsFineUnstoppable(t *testing.T) { + keepStatsFor := 24 * 60 * time.Millisecond + goroutinesThreshold := 100 + checkInterval := 1 * time.Millisecond + checkStatsFor := 10 * time.Millisecond + cooldownPeriod := 5 * time.Millisecond + + backupOut := logger.Out + backupLevel := logger.Level + // Create a pipe to capture log output + r, w, _ := os.Pipe() + logger.SetOutput(w) + logger.SetLevel(logrus.TraceLevel) + defer func() { + logger.SetOutput(backupOut) + logger.SetLevel(backupLevel) + }() + + // Create context with default parameters + ctx := &watcherContext{} + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + + go goroutinesMonitor(ctx) + + time.Sleep(keepStatsFor * 2) + + // Close the pipe + _ = w.Close() + + // Read the log output + output, _ := io.ReadAll(r) + + msgStart := "Starting goroutines monitor (stoppable: false)" + expectedMsgs := []string{msgStart} + for _, expectedMsg := range expectedMsgs { + if !strings.Contains(string(output), expectedMsg) { + t.Errorf("Expected log output to contain '%s'", expectedMsg) + } + } + +} From 4c50dc7753f52eb5b934d076225ac2b7a491edaa Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Fri, 8 Nov 2024 21:03:25 +0100 Subject: [PATCH 9/9] debug/collect-info: Include dump by goroutine leak detector. Add a step to copy the goroutine leak detector stack dump (sigusr1) from /persist/agentdebug/watcher/ to the collected output directory as `goroutin-leak-detector-stacks-dump`, if present. Signed-off-by: Nikolay Martyanov --- pkg/debug/scripts/collect-info.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/debug/scripts/collect-info.sh b/pkg/debug/scripts/collect-info.sh index bdf92b5855..5c3056c6a2 100755 --- a/pkg/debug/scripts/collect-info.sh +++ b/pkg/debug/scripts/collect-info.sh @@ -6,7 +6,7 @@ # Script version, don't forget to bump up once something is changed -VERSION=30 +VERSION=31 # Add required packages here, it will be passed to "apk add". # Once something added here don't forget to add the same package # to the Dockerfile ('ENV PKGS' line) of the debug container, @@ -457,6 +457,7 @@ ln -s /persist/log "$DIR/persist-log" ln -s /persist/netdump "$DIR/persist-netdump" ln -s /persist/kcrashes "$DIR/persist-kcrashes" [ -d /persist/memory-monitor/output ] && ln -s /persist/memory-monitor/output "$DIR/persist-memory-monitor-output" +[ -f /persist/agentdebug/watcher/sigusr1 ] && cp /persist/agentdebug/watcher/sigusr1 "$DIR/goroutin-leak-detector-stacks-dump" ln -s /run "$DIR/root-run" cp -r /sys/fs/cgroup/memory "$DIR/sys-fs-cgroup-memory" >/dev/null 2>&1 [ -f /persist/SMART_details.json ] && ln -s /persist/SMART_details* "$DIR/"