Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util+plugins: Fix potential memory leak with explicit timer cancellation. #7089

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,16 @@ func (d *Downloader) loop(ctx context.Context) {

d.logger.Debug("Waiting %v before next download/retry.", delay)

timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(delay):
case <-timer.C:
if err != nil {
retry++
} else {
retry = 0
}
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
return
}
}
Expand Down
4 changes: 3 additions & 1 deletion download/oci_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,16 @@ func (d *OCIDownloader) loop(ctx context.Context) {

d.logger.Debug("OCI - Waiting %v before next download/retry.", delay)

timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(delay):
case <-timer.C:
if err != nil {
retry++
} else {
retry = 0
}
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
return
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal/wasm/sdk/opa/loader/file/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/internal/wasm/sdk/opa/errors"
"github.com/open-policy-agent/opa/util"

"github.com/open-policy-agent/opa/internal/wasm/sdk/opa"
)
Expand Down Expand Up @@ -156,9 +157,11 @@ func (l *Loader) poller() {
l.logError(err)
}

timer, timerCancel := util.TimerWithCancel(l.interval)
select {
case <-time.After(l.interval):
case <-timer.C:
case <-l.closing:
timerCancel() // explicitly cancel the timer.
return
}
}
Expand Down
11 changes: 9 additions & 2 deletions internal/wasm/sdk/opa/loader/http/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/internal/wasm/sdk/opa"
"github.com/open-policy-agent/opa/internal/wasm/sdk/opa/errors"
"github.com/open-policy-agent/opa/util"
)

const (
Expand Down Expand Up @@ -139,9 +140,12 @@ func (l *Loader) poller() {
break
}

delay := time.Duration(float64((l.maxDelay-l.minDelay))*rand.Float64()) + l.minDelay
timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(time.Duration(float64((l.maxDelay-l.minDelay))*rand.Float64()) + l.minDelay):
case <-timer.C:
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
return
}
}
Expand All @@ -160,9 +164,12 @@ func (l *Loader) download(ctx context.Context) error {
break
}

delay := defaultBackoff(float64(MinRetryDelay), float64(l.maxDelay), retry)
timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(defaultBackoff(float64(MinRetryDelay), float64(l.maxDelay), retry)):
case <-timer.C:
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
return context.Canceled
}
}
Expand Down
4 changes: 3 additions & 1 deletion plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,15 +802,17 @@ func (p *Plugin) loop() {

waitC = make(chan struct{})
go func() {
timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(delay):
case <-timer.C:
if err != nil {
retry++
} else {
retry = 0
}
close(waitC)
case <-ctx.Done():
timerCancel() // explicitly cancel the timer.
}
}()
}
Expand Down
7 changes: 5 additions & 2 deletions topdown/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func createHTTPRequest(bctx BuiltinContext, obj ast.Object) (*http.Request, *htt
var tlsConfig tls.Config
var customHeaders map[string]interface{}
var tlsInsecureSkipVerify bool
var timeout = defaultHTTPRequestTimeout
timeout := defaultHTTPRequestTimeout

for _, val := range obj.Keys() {
key, err := ast.JSON(val.Value)
Expand Down Expand Up @@ -736,9 +736,12 @@ func executeHTTPRequest(req *http.Request, client *http.Client, inputReqObj ast.
return nil, err
}

delay := util.DefaultBackoff(float64(minRetryDelay), float64(maxRetryDelay), i)
timer, timerCancel := util.TimerWithCancel(delay)
select {
case <-time.After(util.DefaultBackoff(float64(minRetryDelay), float64(maxRetryDelay), i)):
case <-timer.C:
case <-req.Context().Done():
timerCancel() // explicitly cancel the timer.
return nil, context.Canceled
}
}
Expand Down
48 changes: 48 additions & 0 deletions util/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package util

import "time"

// TimerWithCancel returns a timer that fires after delay, together with a
// cancel function that stops the timer and discards any pending tick.
//
// It exists because of memory leaks when using time.After in select
// statements. Instead, we manually create timers, wait on them, and manually
// free them.
//
// See this for more details:
// https://www.arangodb.com/2020/09/a-story-of-a-memory-leak-in-go-how-to-properly-use-time-after/
//
// Note: This issue is fixed in Go 1.23, but this fix helps us until then.
//
// Warning: cancel is intended to be called from the same goroutine that
// reads from timer.C. The drain performed by cancel never blocks, but if
// another goroutine receives from timer.C concurrently it is unspecified
// which of the two observes the tick.
//
// Example:
//
//	for retries := 0; true; retries++ {
//
//		...main logic...
//
//		timer, cancel := util.TimerWithCancel(delay)
//		select {
//		case <-ctx.Done():
//			cancel()
//			return ctx.Err()
//		case <-timer.C:
//			continue
//		}
//	}
func TimerWithCancel(delay time.Duration) (*time.Timer, func()) {
	timer := time.NewTimer(delay)

	return timer, func() {
		// Stop reports true when the call stopped an active timer; nothing
		// was sent on the channel, so there is nothing to drain.
		if timer.Stop() {
			return
		}

		// The timer was already stopped or has fired/expired. Drain a pending
		// tick if one is present. The receive is non-blocking so that cancel
		// cannot hang when the channel is empty (tick already consumed, or
		// cancel called more than once).
		select {
		case <-timer.C:
		default:
		}
	}
}