Skip to content

Commit

Permalink
Merge pull request #8184 from hashicorp/bugfix/goroutine-leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeler authored Jun 25, 2020
2 parents df48db0 + e983561 commit 7041f69
Show file tree
Hide file tree
Showing 121 changed files with 22,790 additions and 10,289 deletions.
13 changes: 9 additions & 4 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4424,7 +4424,8 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura
// If we are not blocking we can skip tracking and allocating - nil WatchSet
// is still valid to call Add on and will just be a no op.
var ws memdb.WatchSet
var timeout *time.Timer
var ctx context.Context = &lib.StopChannelContext{StopCh: a.shutdownCh}
shouldBlock := false

if alwaysBlock || hash != "" {
if wait == 0 {
Expand All @@ -4435,7 +4436,11 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura
}
// Apply a small amount of jitter to the request.
wait += lib.RandomStagger(wait / 16)
timeout = time.NewTimer(wait)
var cancel func()
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(wait))
defer cancel()

shouldBlock = true
}

for {
Expand All @@ -4453,7 +4458,7 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura
// WatchSet immediately returns false which would incorrectly cause this to
// loop and repeat again, however we rely on the invariant that ws == nil
// IFF timeout == nil in which case the Watch call is never invoked.
if timeout == nil || hash != curHash || ws.Watch(timeout.C) {
if !shouldBlock || hash != curHash || ws.WatchCtx(ctx) != nil {
return curHash, curResp, err
}
// Watch returned false indicating a change was detected, loop and repeat
Expand All @@ -4465,7 +4470,7 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura
if syncPauseCh := a.SyncPausedCh(); syncPauseCh != nil {
select {
case <-syncPauseCh:
case <-timeout.C:
case <-ctx.Done():
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions agent/consul/gateway_locator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consul

import (
"context"
"errors"
"math/rand"
"sort"
Expand Down Expand Up @@ -261,9 +262,9 @@ func NewGatewayLocator(

var errGatewayLocalStateNotInitialized = errors.New("local state not initialized")

func (g *GatewayLocator) Run(stopCh <-chan struct{}) {
func (g *GatewayLocator) Run(ctx context.Context) {
var lastFetchIndex uint64
retryLoopBackoff(stopCh, func() error {
retryLoopBackoff(ctx, func() error {
idx, err := g.runOnce(lastFetchIndex)
if err != nil {
return err
Expand Down
23 changes: 15 additions & 8 deletions agent/consul/leader_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func (s *Server) secondaryIntermediateCertRenewalWatch(ctx context.Context) erro
case <-ctx.Done():
return nil
case <-time.After(structs.IntermediateCertRenewInterval):
retryLoopBackoff(ctx.Done(), func() error {
retryLoopBackoff(ctx, func() error {
s.caProviderReconfigurationLock.Lock()
defer s.caProviderReconfigurationLock.Unlock()

Expand Down Expand Up @@ -724,7 +724,7 @@ func (s *Server) secondaryCARootWatch(ctx context.Context) error {

connectLogger.Debug("starting Connect CA root replication from primary datacenter", "primary", s.config.PrimaryDatacenter)

retryLoopBackoff(ctx.Done(), func() error {
retryLoopBackoff(ctx, func() error {
var roots structs.IndexedCARoots
if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err)
Expand Down Expand Up @@ -780,7 +780,7 @@ func (s *Server) replicateIntentions(ctx context.Context) error {

connectLogger.Debug("starting Connect intention replication from primary datacenter", "primary", s.config.PrimaryDatacenter)

retryLoopBackoff(ctx.Done(), func() error {
retryLoopBackoff(ctx, func() error {
// Always use the latest replication token value in case it changed while looping.
args.QueryOptions.Token = s.tokens.ReplicationToken()

Expand Down Expand Up @@ -832,14 +832,14 @@ func (s *Server) replicateIntentions(ctx context.Context) error {

// retryLoopBackoff loops a given function indefinitely, backing off exponentially
// upon errors up to a maximum of maxRetryBackoff seconds.
func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(error)) {
func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) {
var failedAttempts uint
limiter := rate.NewLimiter(loopRateLimit, retryBucketSize)
for {
// Rate limit how often we run the loop
limiter.Wait(context.Background())
limiter.Wait(ctx)
select {
case <-stopCh:
case <-ctx.Done():
return
default:
}
Expand All @@ -850,8 +850,15 @@ func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(er

if err := loopFn(); err != nil {
errFn(err)
time.Sleep(retryTime)
continue

timer := time.NewTimer(retryTime)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
continue
}
}

// Reset the failed attempts after a successful run.
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/leader_federation_state_ae.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *Server) stopFederationStateAntiEntropy() {
func (s *Server) federationStateAntiEntropySync(ctx context.Context) error {
var lastFetchIndex uint64

retryLoopBackoff(ctx.Done(), func() error {
retryLoopBackoff(ctx, func() error {
if !s.DatacenterSupportsFederationStates() {
return nil
}
Expand Down
15 changes: 10 additions & 5 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consul

import (
"context"
"crypto/tls"
"encoding/binary"
"errors"
Expand Down Expand Up @@ -752,7 +753,9 @@ type queryFn func(memdb.WatchSet, *state.Store) error

// blockingQuery is used to process a potentially blocking query operation.
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
var timeout *time.Timer
var cancel func()
var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}

var queriesBlocking uint64
var queryTimeout time.Duration

Expand All @@ -776,9 +779,9 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
// Apply a small amount of jitter to the request.
queryTimeout += lib.RandomStagger(queryTimeout / jitterFraction)

// Setup a query timeout.
timeout = time.NewTimer(queryTimeout)
defer timeout.Stop()
// wrap the base context with a deadline
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

// instrument blockingQueries
// atomic inc our server's count of in-flight blockingQueries and store the new value
Expand Down Expand Up @@ -833,7 +836,9 @@ RUN_QUERY:
}
// block up to the timeout if we don't see anything fresh.
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
if expired := ws.Watch(timeout.C); !expired {
if err := ws.WatchCtx(ctx); err == nil {
// a non-nil error only occurs when the context is cancelled

// If a restore may have woken us up then bail out from
// the query immediately. This is slightly race-ey since
// this might have been interrupted for other reasons,
Expand Down
3 changes: 2 additions & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func NewServerWithOptions(config *Config, options ...ConsulOption) (*Server, err
}

if s.gatewayLocator != nil {
go s.gatewayLocator.Run(s.shutdownCh)
go s.gatewayLocator.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
}

// Serf and dynamic bind ports
Expand Down Expand Up @@ -640,6 +640,7 @@ func (s *Server) trackAutoEncryptCARoots() {
ws := memdb.NewWatchSet()
state := s.fsm.State()
ws.Add(state.AbandonCh())
ws.Add(s.shutdownCh)
_, cas, err := state.CARoots(ws)
if err != nil {
s.logger.Error("Failed to watch AutoEncrypt CARoot", "error", err)
Expand Down
135 changes: 135 additions & 0 deletions agent/routine-leak-checker/leak_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package leakcheck

import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func testTLSCertificates(serverName string) (cert string, key string, cacert string, err error) {
// generate CA
serial, err := tlsutil.GenerateSerialNumber()
if err != nil {
return "", "", "", err
}
signer, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return "", "", "", err
}
ca, err := tlsutil.GenerateCA(signer, serial, 365, nil)
if err != nil {
return "", "", "", err
}

// generate leaf
serial, err = tlsutil.GenerateSerialNumber()
if err != nil {
return "", "", "", err
}

cert, privateKey, err := tlsutil.GenerateCert(
signer,
ca,
serial,
"Test Cert Name",
365,
[]string{serverName},
nil,
[]x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
)
if err != nil {
return "", "", "", err
}

return cert, privateKey, ca, nil
}

func setupPrimaryServer(t *testing.T) *agent.TestAgent {
d := testutil.TempDir(t, "leaks-primary-server")
t.Cleanup(func() { os.RemoveAll(d) })

certPEM, keyPEM, caPEM, err := testTLSCertificates("server.primary.consul")
require.NoError(t, err)

certPath := filepath.Join(d, "cert.pem")
keyPath := filepath.Join(d, "key.pem")
caPath := filepath.Join(d, "cacert.pem")

require.NoError(t, ioutil.WriteFile(certPath, []byte(certPEM), 0600))
require.NoError(t, ioutil.WriteFile(keyPath, []byte(keyPEM), 0600))
require.NoError(t, ioutil.WriteFile(caPath, []byte(caPEM), 0600))

aclParams := agent.DefaulTestACLConfigParams()
aclParams.PrimaryDatacenter = "primary"
aclParams.EnableTokenReplication = true

config := `
server = true
datacenter = "primary"
primary_datacenter = "primary"
enable_central_service_config = true
connect {
enabled = true
}
auto_encrypt {
allow_tls = true
}
` + agent.TestACLConfigWithParams(aclParams)

a := agent.NewTestAgent(t, config)
t.Cleanup(func() { a.Shutdown() })

testrpc.WaitForTestAgent(t, a.RPC, "primary", testrpc.WithToken(agent.TestDefaultMasterToken))

return a
}

func TestTestAgentLeaks_Server(t *testing.T) {
/*
Eventually go routine leak checking should be moved into other packages such as the agent
and agent/consul packages. However there are too many leaks for the test to run properly.
Many of the leaks are due to blocking queries from clients to servers being uncancellable.
Until we can move away from net/rpc and fix some of the other issues we don't want a
completely unbounded test which is guaranteed to fail 100% of the time. For now this
test will do. When we do update it we should add this in a *_test.go file in the packages
that we want to enable leak checking within:
import (
"testing"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m,
goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/hashicorp/consul/sdk/freeport.checkFreedPorts"),
)
}
*/

defer goleak.VerifyNone(t,
goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/hashicorp/consul/sdk/freeport.checkFreedPorts"),
)

primaryServer := setupPrimaryServer(t)
primaryServer.Shutdown()
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ require (
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/stretchr/testify v1.5.1
go.opencensus.io v0.22.0 // indirect
go.uber.org/goleak v1.0.0
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200316230553-a7d97aace0b0
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
golang.org/x/tools v0.0.0-20200513154647-78b527d18275 // indirect
google.golang.org/api v0.9.0 // indirect
google.golang.org/appengine v1.6.0 // indirect
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55
Expand Down
Loading

0 comments on commit 7041f69

Please sign in to comment.