Skip to content

Commit

Permalink
delta: fix goroutine leak when watches close and a node is removed (#570
Browse files Browse the repository at this point in the history
)

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche authored Jul 11, 2022
1 parent 26e2ad7 commit 6efcb69
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
10 changes: 6 additions & 4 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ func newWatches() watches {
// Cancel all watches
func (w *watches) Cancel() {
for _, watch := range w.deltaWatches {
if watch.cancel != nil {
watch.cancel()
}
watch.Cancel()
}
}

Expand All @@ -46,5 +44,9 @@ func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
close(w.responses)
if w.responses != nil {
// w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here
// This is needed to release resources taken by goroutines watching this channel
close(w.responses)
}
}
42 changes: 42 additions & 0 deletions pkg/server/delta/v3/watches_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package delta

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

func TestDeltaWatches(t *testing.T) {
t.Run("watches response channels are properly closed when the watches are cancelled", func(t *testing.T) {
watches := newWatches()

cancelCount := 0
var channels []chan cache.DeltaResponse
// create a few watches, and ensure that the cancel function are called and the channels are closed
for i := 0; i < 5; i++ {
newWatch := watch{}
if i%2 == 0 {
newWatch.cancel = func() { cancelCount++ }
newWatch.responses = make(chan cache.DeltaResponse)
channels = append(channels, newWatch.responses)
}

watches.deltaWatches[strconv.Itoa(i)] = newWatch
}

watches.Cancel()

assert.Equal(t, 3, cancelCount)
for _, channel := range channels {
select {
case _, ok := <-channel:
assert.False(t, ok, "a channel was not closed")
default:
assert.Fail(t, "a channel was not closed")
}
}
})
}

0 comments on commit 6efcb69

Please sign in to comment.