Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables #48870 #48903

Merged
merged 5 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/lightning/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) GetEtcdClient() *clientv3.Client {
func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

Expand Down
9 changes: 6 additions & 3 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type TableImporter struct {
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

ignoreColumns map[string]struct{}
}
Expand All @@ -92,6 +93,7 @@ func NewTableImporter(
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
}
autoidCli := autoid.NewClientDiscover(etcdCli)

return &TableImporter{
tableName: tableName,
Expand All @@ -102,6 +104,7 @@ func NewTableImporter(
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
autoidCli: autoidCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
Expand Down Expand Up @@ -280,9 +283,9 @@ func (tr *TableImporter) Store() tidbkv.Storage {
return tr.kvStore
}

// GetEtcdClient implements the autoid.Requirement interface.
func (tr *TableImporter) GetEtcdClient() *clientv3.Client {
return tr.etcdCli
// AutoIDClient implements the autoid.Requirement interface.
func (tr *TableImporter) AutoIDClient() *autoid.ClientDiscover {
return tr.autoidCli
}

// RebaseChunkRowIDs rebase the row id of the chunks.
Expand Down
5 changes: 2 additions & 3 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1690,8 +1689,8 @@ func (r *asAutoIDRequirement) Store() kv.Storage {
return r.store
}

func (r *asAutoIDRequirement) GetEtcdClient() *clientv3.Client {
return r.etcdCli
func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
return r.autoidCli
}

// applyNewAutoRandomBits set auto_random bits to TableInfo and
Expand Down
3 changes: 3 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -355,6 +356,7 @@ type ddlCtx struct {
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover
// backfillJobCh gets notification if any backfill jobs coming.
backfillJobCh chan struct{}

Expand Down Expand Up @@ -679,6 +681,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
infoCache: opt.InfoCache,
tableLockCkr: deadLockCkr,
etcdCli: opt.EtcdCli,
autoidCli: opt.AutoIDClient,
schemaVersionManager: newSchemaVersionManager(),
waitSchemaSyncedController: newWaitSchemaSyncedController(),
runningJobIDs: make([]string, 0, jobRecordCapacity),
Expand Down
19 changes: 14 additions & 5 deletions ddl/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -27,11 +28,19 @@ type Option func(*Options)

// Options represents all the options of the DDL module needs
type Options struct {
EtcdCli *clientv3.Client
Store kv.Storage
InfoCache *infoschema.InfoCache
Hook Callback
Lease time.Duration
EtcdCli *clientv3.Client
Store kv.Storage
AutoIDClient *autoid.ClientDiscover
InfoCache *infoschema.InfoCache
Hook Callback
Lease time.Duration
}

// WithAutoIDClient specifies the autoid client used by the autoid service for those AUTO_ID_CACHE=1 tables.
func WithAutoIDClient(cli *autoid.ClientDiscover) Option {
return func(options *Options) {
options.AutoIDClient = cli
}
}

// WithEtcdClient specifies the `clientv3.Client` of DDL used to request the etcd service
Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ go_library(
"//keyspace",
"//kv",
"//meta",
"//meta/autoid",
"//metrics",
"//owner",
"//parser/ast",
Expand Down
10 changes: 10 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -135,6 +136,8 @@ type Domain struct {
exit chan struct{}
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
etcdClient *clientv3.Client
// autoidClient is used when there are tables with AUTO_ID_CACHE=1, it is the client to the autoid service.
autoidClient *autoid.ClientDiscover
// `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace.
// It is only used in storeMinStartTS and RemoveMinStartTS now.
// It must be used when the etcd path isn't needed to separate by keyspace.
Expand Down Expand Up @@ -1091,6 +1094,7 @@ func (do *Domain) Init(
etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))

do.etcdClient = cli
do.autoidClient = autoid.NewClientDiscover(cli)

unprefixedEtcdCli, err := newEtcdCli(addrs, ebd)
if err != nil {
Expand Down Expand Up @@ -1124,6 +1128,7 @@ func (do *Domain) Init(
ctx,
ddl.WithEtcdClient(do.etcdClient),
ddl.WithStore(do.store),
ddl.WithAutoIDClient(do.autoidClient),
ddl.WithInfoCache(do.infoCache),
ddl.WithHook(callback),
ddl.WithLease(ddlLease),
Expand Down Expand Up @@ -1556,6 +1561,11 @@ func (do *Domain) GetEtcdClient() *clientv3.Client {
return do.etcdClient
}

// AutoIDClient returns the autoid client.
func (do *Domain) AutoIDClient() *autoid.ClientDiscover {
return do.autoidClient
}

// GetPDClient returns the PD client.
func (do *Domain) GetPDClient() pd.Client {
if store, ok := do.store.(kv.StorageWithPD); ok {
Expand Down
11 changes: 6 additions & 5 deletions executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,9 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo
ti.logger.Error("close etcd client error", zap.Error(err))
}
}()
autoidCli := autoid.NewClientDiscover(etcdCli)

r := &asAutoIDRequirement{ti.kvStore, etcdCli}
r := &asAutoIDRequirement{ti.kvStore, autoidCli}
// todo: the new base should be the max row id of the last Node if we support distributed import.
if err = common.RebaseGlobalAutoID(ctx, 0, r, ti.dbID, ti.tableInfo.Core); err != nil {
return nil, errors.Trace(err)
Expand All @@ -412,8 +413,8 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo
}

type asAutoIDRequirement struct {
kvStore tidbkv.Storage
etcdCli *clientv3.Client
kvStore tidbkv.Storage
autoidCli *autoid.ClientDiscover
}

var _ autoid.Requirement = &asAutoIDRequirement{}
Expand All @@ -422,8 +423,8 @@ func (r *asAutoIDRequirement) Store() tidbkv.Storage {
return r.kvStore
}

func (r *asAutoIDRequirement) GetEtcdClient() *clientv3.Client {
return r.etcdCli
func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
return r.autoidCli
}

func (ti *TableImporter) rebaseChunkRowID(rowIDBase int64) {
Expand Down
9 changes: 4 additions & 5 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/pingcap/tidb/util/tracing"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
tikvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -573,12 +572,12 @@ func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *sin
tblID: tblID,
isUnsigned: isUnsigned,
}
if r.GetEtcdClient() == nil {
if r.AutoIDClient() == nil {
// Only for test in mockstore
spa.clientDiscover = clientDiscover{}
spa.ClientDiscover = &ClientDiscover{}
spa.mu.AutoIDAllocClient = MockForTest(r.Store())
} else {
spa.clientDiscover = clientDiscover{etcdCli: r.GetEtcdClient()}
spa.ClientDiscover = r.AutoIDClient()
}

// mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one.
Expand All @@ -593,7 +592,7 @@ func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *sin
// Requirement is the parameter required by NewAllocator
type Requirement interface {
Store() kv.Storage
GetEtcdClient() *clientv3.Client
AutoIDClient() *ClientDiscover
}

// NewAllocator returns a new auto increment id generator on the store.
Expand Down
29 changes: 21 additions & 8 deletions meta/autoid/autoid_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type singlePointAlloc struct {
tblID int64
lastAllocated int64
isUnsigned bool
clientDiscover
*ClientDiscover
}

type clientDiscover struct {
// ClientDiscover is used to get the AutoIDAllocClient, it creates the grpc connection with autoid service leader.
type ClientDiscover struct {
// This the etcd client for service discover
etcdCli *clientv3.Client
// This is the real client for the AutoIDAlloc service
Expand All @@ -60,7 +61,15 @@ const (
autoIDLeaderPath = "tidb/autoid/leader"
)

func (d *clientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) {
// NewClientDiscover creates a ClientDiscover object.
func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover {
return &ClientDiscover{
etcdCli: etcdCli,
}
}

// GetClient gets the AutoIDAllocClient.
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) {
d.mu.RLock()
cli := d.mu.AutoIDAllocClient
if cli != nil {
Expand Down Expand Up @@ -138,7 +147,7 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.resetConn(err)
sp.ResetConn(err)
goto retry
}
return 0, 0, errors.Trace(err)
Expand All @@ -155,9 +164,13 @@ retry:

const backoffDuration = 200 * time.Millisecond

func (sp *singlePointAlloc) resetConn(reason error) {
logutil.BgLogger().Info("[autoid client] reset grpc connection",
zap.String("reason", reason.Error()))
// ResetConn reset the AutoIDAllocClient and underlying grpc connection.
// The next GetClient() call will recreate the client connecting to the correct leader by querying etcd.
func (sp *singlePointAlloc) ResetConn(reason error) {
if reason != nil {
logutil.BgLogger().Info("[autoid client] reset grpc connection",
zap.String("reason", reason.Error()))
}
var grpcConn *grpc.ClientConn
sp.mu.Lock()
grpcConn = sp.mu.ClientConn
Expand Down Expand Up @@ -210,7 +223,7 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.resetConn(err)
sp.ResetConn(err)
goto retry
}
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion meta/autoid/autoid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) GetEtcdClient() *clientv3.Client {
func (r mockRequirement) AutoIDClient() *ClientDiscover {
return nil
}

Expand Down