cleanup: prevent leaks from time.After #11983

Merged (1 commit) on Feb 3, 2022
3 changes: 3 additions & 0 deletions .changelog/11983.txt
@@ -0,0 +1,3 @@
```release-note:bug
cleanup: prevent leaks from time.After
```
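
For background, here is a minimal sketch of the pattern this PR removes (the watch loop, its arguments, and the events channel are hypothetical, not code from the diff). In the Go releases current at the time, each call to time.After allocates a runtime timer that is not released until it fires, even when another select case wins, so a busy loop can pile up pending timers:

```go
package main

import (
	"context"
	"time"
)

// watch is an illustrative loop in the style of the call sites changed below:
// every iteration that takes the events case still leaves the time.After
// timer pending until its period elapses.
func watch(ctx context.Context, events <-chan string, period time.Duration) {
	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-events:
			_ = ev // handle the event
		case <-time.After(period): // allocates a fresh timer every iteration
			// periodic housekeeping
		}
	}
}
```

The diffs below replace this with a single time.Timer (or the new helper.NewSafeTimer) created once before the loop and Reset on each iteration.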
16 changes: 13 additions & 3 deletions api/allocations_exec.go
@@ -13,6 +13,12 @@ import (
"github.com/gorilla/websocket"
)

const (
// heartbeatInterval is the amount of time to wait between sending heartbeats
// during an exec streaming operation
heartbeatInterval = 10 * time.Second
)

type execSession struct {
client *Client
alloc *Allocation
@@ -177,15 +183,19 @@ func (s *execSession) startTransmit(ctx context.Context, conn *websocket.Conn) <

// send a heartbeat every 10 seconds
go func() {
t := time.NewTimer(heartbeatInterval)
Comment from the PR author: helpers/ is not available to api/, so there is no helper function here; instead we:

  • make sure we call t.Stop
  • make sure the interval can never be non-positive

defer t.Stop()

for {
t.Reset(heartbeatInterval)

select {
case <-ctx.Done():
return
// heartbeat message
case <-time.After(10 * time.Second):
case <-t.C:
// heartbeat message
send(&execStreamingInputHeartbeat)
}

}
}()

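Assembled from the hunks above, a self-contained sketch of the heartbeat loop after this change (the sendHeartbeats wrapper and the send callback are stand-ins for the anonymous goroutine and its closure, not names from the PR):

```go
package main

import (
	"context"
	"time"
)

// heartbeatInterval mirrors the constant added in this file.
const heartbeatInterval = 10 * time.Second

// sendHeartbeats creates one timer up front, stops it on exit, and re-arms it
// on every pass, so cancelling ctx cannot strand a pending time.After timer.
func sendHeartbeats(ctx context.Context, send func()) {
	t := time.NewTimer(heartbeatInterval)
	defer t.Stop()

	for {
		t.Reset(heartbeatInterval)

		select {
		case <-ctx.Done():
			return
		case <-t.C:
			send() // heartbeat message
		}
	}
}
```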
7 changes: 6 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
@@ -527,6 +527,9 @@ func (tr *TaskRunner) Run() {
return
}

timer, stop := helper.NewSafeTimer(0) // timer duration calculated JIT
defer stop()

MAIN:
for !tr.shouldShutdown() {
select {
@@ -612,9 +615,11 @@
break MAIN
}

timer.Reset(restartDelay)

// Actually restart by sleeping and also watching for destroy events
select {
case <-time.After(restartDelay):
case <-timer.C:
case <-tr.killCtx.Done():
tr.logger.Trace("task killed between restarts", "delay", restartDelay)
break MAIN
9 changes: 8 additions & 1 deletion command/agent/consul/version_checker.go
@@ -7,6 +7,7 @@ import (

log "github.com/hashicorp/go-hclog"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper"
)

// checkConsulTLSSkipVerify logs if Consul does not support TLSSkipVerify on
@@ -20,6 +21,10 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age
defer close(done)

i := uint64(0)

timer, stop := helper.NewSafeTimer(limit)
defer stop()

for {
self, err := client.Self()
if err == nil {
@@ -39,10 +44,12 @@
i++
}

timer.Reset(backoff)

select {
case <-ctx.Done():
return
case <-time.After(backoff):
case <-timer.C:
}
}
}
8 changes: 7 additions & 1 deletion command/agent/monitor/monitor.go
@@ -6,6 +6,7 @@ import (
"time"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
)

// Monitor provides a mechanism to stream logs using go-hclog
@@ -107,12 +108,17 @@ func (d *monitor) Start() <-chan []byte {
// dropped messages and makes room on the logCh
// to add a dropped message count warning
go func() {
timer, stop := helper.NewSafeTimer(d.droppedDuration)
defer stop()

// loop and check for dropped messages
for {
timer.Reset(d.droppedDuration)

select {
case <-d.doneCh:
return
case <-time.After(d.droppedDuration):
case <-timer.C:
d.Lock()

// Check if there have been any dropped messages.
13 changes: 11 additions & 2 deletions drivers/docker/stats.go
@@ -10,6 +10,7 @@ import (
docker "github.com/fsouza/go-dockerclient"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/docker/util"
"github.com/hashicorp/nomad/helper"
nstructs "github.com/hashicorp/nomad/nomad/structs"
)

@@ -91,19 +92,27 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
defer destCh.close()

// backoff and retry used if the docker stats API returns an error
var backoff time.Duration
var backoff time.Duration = 0
var retry int

// create an interval timer
timer, stop := helper.NewSafeTimer(backoff)
defer stop()

// loops until doneCh is closed
for {
timer.Reset(backoff)

if backoff > 0 {
select {
case <-time.After(backoff):
case <-timer.C:
case <-ctx.Done():
return
case <-h.doneCh:
return
}
}

// make a channel for docker stats structs and start a collector to
// receive stats from docker and emit nomad stats
// statsCh will always be closed by docker client.
8 changes: 7 additions & 1 deletion drivers/shared/eventer/eventer.go
@@ -6,6 +6,7 @@ import (
"time"

hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/plugins/drivers"
)

@@ -62,14 +63,19 @@ func NewEventer(ctx context.Context, logger hclog.Logger) *Eventer {
// eventLoop is the main logic which pulls events from the channel and broadcasts
// them to all consumers
func (e *Eventer) eventLoop() {
timer, stop := helper.NewSafeTimer(ConsumerGCInterval)
defer stop()

for {
timer.Reset(ConsumerGCInterval)

select {
case <-e.ctx.Done():
e.logger.Trace("task event loop shutdown")
return
case event := <-e.events:
e.iterateConsumers(event)
case <-time.After(ConsumerGCInterval):
case <-timer.C:
e.gcConsumers()
}
}
27 changes: 27 additions & 0 deletions helper/funcs.go
@@ -572,3 +572,30 @@ func PathEscapesSandbox(sandboxDir, path string) bool {
}
return false
}

// StopFunc is used to stop a time.Timer created with NewSafeTimer
type StopFunc func()

// NewSafeTimer creates a time.Timer but does not panic if duration is <= 0.
//
// Using a time.Timer is recommended instead of time.After when it is necessary
// to avoid leaking goroutines (e.g. in a select inside a loop).
//
// Returns the time.Timer and also a StopFunc, forcing the caller to deal
// with stopping the time.Timer to avoid leaking a goroutine.
func NewSafeTimer(duration time.Duration) (*time.Timer, StopFunc) {
Reviewer comment on lines +584 to +586: I really like the API on this. It makes it really obvious if we're accidentally misusing it by, for example, calling NewSafeTimer inside the loop.

if duration <= 0 {
// Avoid panic by using the smallest positive value. This is close enough
// to the behavior of time.After(0), which this helper is intended to
// replace.
// https://go.dev/play/p/EIkm9MsPbHY
duration = 1
}

t := time.NewTimer(duration)
cancel := func() {
t.Stop()
}

return t, cancel
}
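
As a usage sketch (the pollUntilDone caller and the work callback are hypothetical, not part of the diff), the intended pattern is to call NewSafeTimer once before the loop, defer its StopFunc, and Reset inside the loop; this is the misuse-resistance the reviewer comment above refers to:

```go
package main

import (
	"context"
	"time"

	"github.com/hashicorp/nomad/helper"
)

// pollUntilDone is a hypothetical caller showing the intended use of
// helper.NewSafeTimer: construct once, defer the StopFunc, Reset per iteration.
func pollUntilDone(ctx context.Context, interval time.Duration, work func()) {
	timer, stop := helper.NewSafeTimer(interval) // safe even if interval <= 0
	defer stop()                                 // guarantees the timer is stopped

	for {
		timer.Reset(interval) // re-arm before each wait

		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			work()
		}
	}
}
```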
14 changes: 14 additions & 0 deletions helper/funcs_test.go
@@ -431,3 +431,17 @@ func TestPathEscapesSandbox(t *testing.T) {
})
}
}

func Test_NewSafeTimer(t *testing.T) {
t.Run("zero", func(t *testing.T) {
timer, stop := NewSafeTimer(0)
defer stop()
<-timer.C
})

t.Run("positive", func(t *testing.T) {
timer, stop := NewSafeTimer(1)
defer stop()
<-timer.C
})
}
7 changes: 6 additions & 1 deletion nomad/blocked_evals.go
@@ -706,9 +706,14 @@ func (b *BlockedEvals) Stats() *BlockedStats {

// EmitStats is used to export metrics about the blocked eval tracker while enabled
func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()

for {
timer.Reset(period)

select {
case <-time.After(period):
case <-timer.C:
stats := b.Stats()
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_quota_limit"}, float32(stats.TotalQuotaLimit))
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked))
10 changes: 8 additions & 2 deletions nomad/drainer/watch_jobs.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"time"

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
@@ -140,10 +139,17 @@ func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) {

// watch is the long lived watching routine that detects job drain changes.
func (w *drainingJobWatcher) watch() {
timer, stop := helper.NewSafeTimer(stateReadErrorDelay)
defer stop()

waitIndex := uint64(1)

for {
timer.Reset(stateReadErrorDelay)

w.logger.Trace("getting job allocs at index", "index", waitIndex)
jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), waitIndex)

if err != nil {
if err == context.Canceled {
// Determine if it is a cancel or a shutdown
@@ -164,7 +170,7 @@
case <-w.ctx.Done():
w.logger.Trace("shutting down")
return
case <-time.After(stateReadErrorDelay):
case <-timer.C:
continue
}
}
9 changes: 7 additions & 2 deletions nomad/drainer/watch_nodes.go
@@ -2,10 +2,10 @@ package drainer

import (
"context"
"time"

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"

"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -148,8 +148,13 @@ func NewNodeDrainWatcher(ctx context.Context, limiter *rate.Limiter, state *stat

// watch is the long lived watching routine that detects node changes.
func (w *nodeDrainWatcher) watch() {
timer, stop := helper.NewSafeTimer(stateReadErrorDelay)
defer stop()

nindex := uint64(1)

for {
timer.Reset(stateReadErrorDelay)
nodes, index, err := w.getNodes(nindex)
if err != nil {
if err == context.Canceled {
@@ -160,7 +165,7 @@
select {
case <-w.ctx.Done():
return
case <-time.After(stateReadErrorDelay):
case <-timer.C:
continue
}
}
Expand Down
11 changes: 8 additions & 3 deletions nomad/eval_broker.go
@@ -2,15 +2,15 @@ package nomad

import (
"container/heap"
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"

"context"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/delayheap"
"github.com/hashicorp/nomad/nomad/structs"
@@ -835,9 +835,14 @@ func (b *EvalBroker) Stats() *BrokerStats {

// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()

for {
timer.Reset(period)

select {
case <-time.After(period):
case <-timer.C:
stats := b.Stats()
metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady))
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
7 changes: 5 additions & 2 deletions nomad/plan_queue.go
@@ -7,6 +7,7 @@ import (
"time"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

@@ -196,12 +197,14 @@ func (q *PlanQueue) Stats() *QueueStats {

// EmitStats is used to export metrics about the broker while enabled
func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()

for {
select {
case <-time.After(period):
case <-timer.C:
stats := q.Stats()
metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth))

case <-stopCh:
return
}