Skip to content

Commit

Permalink
store/tikv: fix CheckStreamTimeoutLoop goroutine leak (#13812) (#14227)
Browse files Browse the repository at this point in the history
  • Loading branch information
SunRunAway authored and sre-bot committed Dec 25, 2019
1 parent 1a32e51 commit da1427a
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"sync/atomic"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/debugpb"
Expand Down Expand Up @@ -77,21 +77,23 @@ type connArray struct {
streamTimeout chan *tikvrpc.Lease
// batchConn is not null when batch is enabled.
*batchConn
done chan struct{}
}

func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) (*connArray, error) {
func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
streamTimeout: make(chan *tikvrpc.Lease, 1024),
done: make(chan struct{}),
}
if err := a.Init(addr, security, idleNotify, done); err != nil {
if err := a.Init(addr, security, idleNotify); err != nil {
return nil, err
}
return a, nil
}

func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) error {
func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32) error {
a.target = addr

opt := grpc.WithInsecure()
Expand Down Expand Up @@ -159,7 +161,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done)
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, a.done)
if allowBatch {
go a.batchSendLoop(cfg.TiKVClient)
}
Expand All @@ -184,6 +186,8 @@ func (a *connArray) Close() {
a.v[i] = nil
}
}

close(a.done)
}

// rpcClient is RPC client struct.
Expand All @@ -193,7 +197,6 @@ func (a *connArray) Close() {
type rpcClient struct {
sync.RWMutex
isClosed bool
done chan struct{}

conns map[string]*connArray
security config.Security
Expand All @@ -205,7 +208,6 @@ type rpcClient struct {

func newRPCClient(security config.Security) *rpcClient {
return &rpcClient{
done: make(chan struct{}, 1),
conns: make(map[string]*connArray),
security: security,
}
Expand Down Expand Up @@ -241,7 +243,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
if !ok {
var err error
connCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
array, err = newConnArray(connCount, addr, c.security, &c.idleNotify, c.done)
array, err = newConnArray(connCount, addr, c.security, &c.idleNotify)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -341,7 +343,6 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R

func (c *rpcClient) Close() error {
// TODO: add a unit test for SendRequest After Closed
close(c.done)
c.closeConns()
return nil
}

0 comments on commit da1427a

Please sign in to comment.