From da1427a08520b2fdc852a13e8fb822033224be8d Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 25 Dec 2019 19:14:09 +0800 Subject: [PATCH] store/tikv: fix CheckStreamTimeoutLoop goroutine leak (#13812) (#14227) --- store/tikv/client.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index e983cd17b7171..0129241abb8cf 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -23,7 +23,7 @@ import ( "sync/atomic" "time" - "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/debugpb" @@ -77,21 +77,23 @@ type connArray struct { streamTimeout chan *tikvrpc.Lease // batchConn is not null when batch is enabled. *batchConn + done chan struct{} } -func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) (*connArray, error) { +func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) { a := &connArray{ index: 0, v: make([]*grpc.ClientConn, maxSize), streamTimeout: make(chan *tikvrpc.Lease, 1024), + done: make(chan struct{}), } - if err := a.Init(addr, security, idleNotify, done); err != nil { + if err := a.Init(addr, security, idleNotify); err != nil { return nil, err } return a, nil } -func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) error { +func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32) error { a.target = addr opt := grpc.WithInsecure() @@ -159,7 +161,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint a.batchCommandsClients = append(a.batchCommandsClients, batchClient) } } - go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done) + go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, a.done) if allowBatch { go a.batchSendLoop(cfg.TiKVClient) } @@ -184,6 +186,8 @@ func (a *connArray) Close() { a.v[i] = nil } } + + close(a.done) } // rpcClient is RPC client struct. @@ -193,7 +197,6 @@ func (a *connArray) Close() { type rpcClient struct { sync.RWMutex isClosed bool - done chan struct{} conns map[string]*connArray security config.Security @@ -205,7 +208,6 @@ type rpcClient struct { func newRPCClient(security config.Security) *rpcClient { return &rpcClient{ - done: make(chan struct{}, 1), conns: make(map[string]*connArray), security: security, } @@ -241,7 +243,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) { if !ok { var err error connCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount - array, err = newConnArray(connCount, addr, c.security, &c.idleNotify, c.done) + array, err = newConnArray(connCount, addr, c.security, &c.idleNotify) if err != nil { return nil, err } @@ -341,7 +343,6 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R func (c *rpcClient) Close() error { // TODO: add a unit test for SendRequest After Closed - close(c.done) c.closeConns() return nil }