Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: fix CheckStreamTimeoutLoop goroutine leak (#13812) #14227

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"sync/atomic"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/debugpb"
Expand Down Expand Up @@ -77,21 +77,23 @@ type connArray struct {
streamTimeout chan *tikvrpc.Lease
// batchConn is not null when batch is enabled.
*batchConn
done chan struct{}
}

func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) (*connArray, error) {
func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
streamTimeout: make(chan *tikvrpc.Lease, 1024),
done: make(chan struct{}),
}
if err := a.Init(addr, security, idleNotify, done); err != nil {
if err := a.Init(addr, security, idleNotify); err != nil {
return nil, err
}
return a, nil
}

func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) error {
func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32) error {
a.target = addr

opt := grpc.WithInsecure()
Expand Down Expand Up @@ -159,7 +161,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done)
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, a.done)
if allowBatch {
go a.batchSendLoop(cfg.TiKVClient)
}
Expand All @@ -184,6 +186,8 @@ func (a *connArray) Close() {
a.v[i] = nil
}
}

close(a.done)
}

// rpcClient is RPC client struct.
Expand All @@ -193,7 +197,6 @@ func (a *connArray) Close() {
type rpcClient struct {
sync.RWMutex
isClosed bool
done chan struct{}

conns map[string]*connArray
security config.Security
Expand All @@ -205,7 +208,6 @@ type rpcClient struct {

func newRPCClient(security config.Security) *rpcClient {
return &rpcClient{
done: make(chan struct{}, 1),
conns: make(map[string]*connArray),
security: security,
}
Expand Down Expand Up @@ -241,7 +243,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
if !ok {
var err error
connCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
array, err = newConnArray(connCount, addr, c.security, &c.idleNotify, c.done)
array, err = newConnArray(connCount, addr, c.security, &c.idleNotify)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -341,7 +343,6 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R

func (c *rpcClient) Close() error {
// TODO: add a unit test for SendRequest After Closed
close(c.done)
c.closeConns()
return nil
}