Skip to content

Commit

Permalink
pillar/watcher: Make goroutinesMonitor stoppable.
Browse files Browse the repository at this point in the history
Enable the `goroutinesMonitor` to be stopped via a cancellable context
within `GoroutineLeakDetectionParams`, allowing controlled termination
of the monitoring goroutine. This change introduces a `MakeStoppable`
method to set up a cancellable context and a `Stop` method to trigger
the cancellation, allowing tests to end monitoring cleanly.

Additionally, `checkStopCondition` was added to periodically check if
the goroutine should stop.

Updated tests to utilize this functionality, adding verification for
proper start and stop messages in the log output, ensuring that the
monitor operates correctly in both stoppable and unstoppable modes.

Signed-off-by: Nikolay Martyanov <nikolay@zededa.com>
  • Loading branch information
OhmSpectator committed Nov 8, 2024
1 parent 66ecf50 commit ed74628
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 0 deletions.
38 changes: 38 additions & 0 deletions pkg/pillar/cmd/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package watcher

import (
"context"
"flag"
"math"
"os"
Expand Down Expand Up @@ -36,6 +37,9 @@ type GoroutineLeakDetectionParams struct {
checkStatsFor time.Duration
keepStatsFor time.Duration
cooldownPeriod time.Duration
// Context to make the monitoring goroutine cancellable
context context.Context
stop context.CancelFunc
}

func validateGoroutineLeakDetectionParams(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) bool {
Expand Down Expand Up @@ -79,6 +83,34 @@ func (gldp *GoroutineLeakDetectionParams) Set(threshold int, checkInterval, chec
gldp.mutex.Unlock()
}

// MakeStoppable creates a cancellable context and a stop function
func (gldp *GoroutineLeakDetectionParams) MakeStoppable() {
gldp.context, gldp.stop = context.WithCancel(context.Background())
}

func (gldp *GoroutineLeakDetectionParams) isStoppable() bool {
return gldp.context != nil
}

func (gldp *GoroutineLeakDetectionParams) checkStopCondition() bool {
if gldp.context != nil {
select {
case <-gldp.context.Done():
return true
default:
return false
}
}
return false
}

// Stop cancels the context to stop the monitoring goroutine
func (gldp *GoroutineLeakDetectionParams) Stop() {
if gldp.stop != nil {
gldp.stop()
}
}

// Get atomically gets the global goroutine leak detection parameters
func (gldp *GoroutineLeakDetectionParams) Get() (int, time.Duration, time.Duration, time.Duration, time.Duration) {
var threshold int
Expand Down Expand Up @@ -337,12 +369,18 @@ func handlePotentialGoroutineLeak() {

// goroutinesMonitor monitors the number of goroutines and detects potential goroutine leaks.
func goroutinesMonitor(ctx *watcherContext) {
log.Functionf("Starting goroutines monitor (stoppable: %v)", ctx.GRLDParams.isStoppable())
// Get the initial goroutine leak detection parameters to create the stats slice
goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod := ctx.GRLDParams.Get()
entriesToKeep := int(keepStatsFor / checkInterval)
stats := make([]int, 0, entriesToKeep+1)
var lastLeakHandled time.Time
for {
// Check if we have to stop
if ctx.GRLDParams.checkStopCondition() {
log.Functionf("Stopping goroutines monitor")
return
}
// Check if we have to resize the stats slice
goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod = ctx.GRLDParams.Get()
newEntriesToKeep := int(keepStatsFor / checkInterval)
Expand Down
97 changes: 97 additions & 0 deletions pkg/pillar/cmd/watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,10 @@ func TestGoroutinesMonitorNoLeak(t *testing.T) {
// Create context with default parameters
ctx := &watcherContext{}
ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
ctx.GRLDParams.MakeStoppable()

go goroutinesMonitor(ctx)
defer ctx.GRLDParams.Stop()

timeStart := time.Now()
for {
Expand Down Expand Up @@ -294,8 +296,10 @@ func TestGoroutinesMonitorLeak(t *testing.T) {
// Create context with default parameters
ctx := &watcherContext{}
ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
ctx.GRLDParams.MakeStoppable()

go goroutinesMonitor(ctx)
defer ctx.GRLDParams.Stop()

timeStart := time.Now()
for {
Expand Down Expand Up @@ -345,8 +349,10 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsDecrease(t *testing.T) {

// Set the parameters
ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
ctx.GRLDParams.MakeStoppable()

go goroutinesMonitor(ctx)
defer ctx.GRLDParams.Stop()

// Wait until we fill the stats slice
time.Sleep(2 * keepStatsFor)
Expand Down Expand Up @@ -407,8 +413,10 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsIncrease(t *testing.T) {

// Set the parameters
ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
ctx.GRLDParams.MakeStoppable()

go goroutinesMonitor(ctx)
defer ctx.GRLDParams.Stop()

// Wait until we fill the stats slice
time.Sleep(2 * keepStatsFor)
Expand Down Expand Up @@ -441,3 +449,92 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsIncrease(t *testing.T) {
}
}
}

func TestGoroutineMonitorStops(t *testing.T) {
keepStatsFor := 24 * 60 * time.Millisecond
goroutinesThreshold := 100
checkInterval := 1 * time.Millisecond
checkStatsFor := 10 * time.Millisecond
cooldownPeriod := 5 * time.Millisecond

backupOut := logger.Out
backupLevel := logger.Level
// Create a pipe to capture log output
r, w, _ := os.Pipe()
logger.Out = w
logger.Level = logrus.TraceLevel
defer func() {
logger.Out = backupOut
logger.Level = backupLevel
}()

// Create context with default parameters
ctx := &watcherContext{}
ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
ctx.GRLDParams.MakeStoppable()

go goroutinesMonitor(ctx)

time.Sleep(keepStatsFor / 2)

ctx.GRLDParams.Stop()

// Close the pipe
_ = w.Close()

logger.Out = backupOut

// Read the log output
output, _ := io.ReadAll(r)

msgStart := "Starting goroutines monitor (stoppable: true)"
msgStop := "Stopping goroutines monitor"
expectedMsgs := []string{msgStart, msgStop}
for _, expectedMsg := range expectedMsgs {
if !strings.Contains(string(output), expectedMsg) {
t.Errorf("Expected log output to contain '%s'", expectedMsg)
}
}
}

func TestGoroutineMonitorRunsFineUnstoppable(t *testing.T) {
keepStatsFor := 24 * 60 * time.Millisecond
goroutinesThreshold := 100
checkInterval := 1 * time.Millisecond
checkStatsFor := 10 * time.Millisecond
cooldownPeriod := 5 * time.Millisecond

backupOut := logger.Out
backupLevel := logger.Level
// Create a pipe to capture log output
r, w, _ := os.Pipe()
logger.Out = w
logger.Level = logrus.TraceLevel
defer func() {
logger.Out = backupOut
logger.Level = backupLevel
}()

// Create context with default parameters
ctx := &watcherContext{}
ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)

go goroutinesMonitor(ctx)

time.Sleep(keepStatsFor / 2)

// Close the pipe
_ = w.Close()

// Read the log output
output, _ := io.ReadAll(r)

msgStart := "Starting goroutines monitor (stoppable: false)"
expectedMsgs := []string{msgStart}
for _, expectedMsg := range expectedMsgs {
if !strings.Contains(string(output), expectedMsg) {
t.Errorf("Expected log output to contain '%s'", expectedMsg)
}
}

}

0 comments on commit ed74628

Please sign in to comment.