From e5ba11518863050612f6be11d217c11222e06ac9 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sun, 26 Nov 2023 20:13:26 +0800 Subject: [PATCH 1/5] *: fix grpc client leak bug for AUTO_ID_CACHE=1 tables #48870 --- br/pkg/lightning/common/common_test.go | 2 +- br/pkg/lightning/importer/table_import.go | 9 +++++--- ddl/column.go | 5 ++-- ddl/ddl.go | 3 +++ ddl/options.go | 19 +++++++++++---- domain/BUILD.bazel | 1 + domain/domain.go | 10 ++++++++ executor/importer/table_import.go | 11 +++++---- meta/autoid/autoid.go | 9 ++++---- meta/autoid/autoid_service.go | 28 ++++++++++++++++------- meta/autoid/autoid_test.go | 2 +- 11 files changed, 68 insertions(+), 31 deletions(-) diff --git a/br/pkg/lightning/common/common_test.go b/br/pkg/lightning/common/common_test.go index f1e3cdb4a6e0d..12ccc21180650 100644 --- a/br/pkg/lightning/common/common_test.go +++ b/br/pkg/lightning/common/common_test.go @@ -169,7 +169,7 @@ func (r mockRequirement) Store() kv.Storage { return r.Storage } -func (r mockRequirement) GetEtcdClient() *clientv3.Client { +func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover { return nil } diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go index 3574b44612467..c1da1972b8c1f 100644 --- a/br/pkg/lightning/importer/table_import.go +++ b/br/pkg/lightning/importer/table_import.go @@ -71,6 +71,7 @@ type TableImporter struct { logger log.Logger kvStore tidbkv.Storage etcdCli *clientv3.Client + autoidCli *autoid.ClientDiscover ignoreColumns map[string]struct{} } @@ -92,6 +93,7 @@ func NewTableImporter( if err != nil { return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName) } + autoidCli := autoid.NewClientDiscover(etcdCli) return &TableImporter{ tableName: tableName, @@ -102,6 +104,7 @@ func NewTableImporter( alloc: idAlloc, kvStore: kvStore, etcdCli: etcdCli, + autoidCli: autoidCli, logger: logger.With(zap.String("table", tableName)), ignoreColumns: ignoreColumns, }, nil @@ -280,9 +283,9 @@ func (tr *TableImporter) Store() tidbkv.Storage { return tr.kvStore } -// GetEtcdClient implements the autoid.Requirement interface. -func (tr *TableImporter) GetEtcdClient() *clientv3.Client { - return tr.etcdCli +// AutoIDClient implements the autoid.Requirement interface. +func (tr *TableImporter) AutoIDClient() *autoid.ClientDiscover { + return tr.autoidCli } // RebaseChunkRowIDs rebase the row id of the chunks. diff --git a/ddl/column.go b/ddl/column.go index a35fa10fb5792..f6d77f432951e 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -52,7 +52,6 @@ import ( decoder "github.com/pingcap/tidb/util/rowDecoder" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/sqlexec" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -1690,8 +1689,8 @@ func (r *asAutoIDRequirement) Store() kv.Storage { return r.store } -func (r *asAutoIDRequirement) GetEtcdClient() *clientv3.Client { - return r.etcdCli +func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover { + return r.autoidCli } // applyNewAutoRandomBits set auto_random bits to TableInfo and diff --git a/ddl/ddl.go b/ddl/ddl.go index cf9919893a4ea..e02d62bc12c8f 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/ast" @@ -355,6 +356,7 @@ type ddlCtx struct { statsHandle *handle.Handle tableLockCkr util.DeadTableLockChecker etcdCli *clientv3.Client + autoidCli *autoid.ClientDiscover // backfillJobCh gets notification if any backfill jobs coming. backfillJobCh chan struct{} @@ -679,6 +681,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl { infoCache: opt.InfoCache, tableLockCkr: deadLockCkr, etcdCli: opt.EtcdCli, + autoidCli: opt.AutoIDClient, schemaVersionManager: newSchemaVersionManager(), waitSchemaSyncedController: newWaitSchemaSyncedController(), runningJobIDs: make([]string, 0, jobRecordCapacity), diff --git a/ddl/options.go b/ddl/options.go index e4c59e5d3b5b7..46a251884d1ea 100644 --- a/ddl/options.go +++ b/ddl/options.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -27,11 +28,19 @@ type Option func(*Options) // Options represents all the options of the DDL module needs type Options struct { - EtcdCli *clientv3.Client - Store kv.Storage - InfoCache *infoschema.InfoCache - Hook Callback - Lease time.Duration + EtcdCli *clientv3.Client + Store kv.Storage + AutoIDClient *autoid.ClientDiscover + InfoCache *infoschema.InfoCache + Hook Callback + Lease time.Duration +} + +// WithAutoIDClient specifies the autoid client used by the autoid service for those AUTO_ID_CACHE=1 tables. +func WithAutoIDClient(cli *autoid.ClientDiscover) Option { + return func(options *Options) { + options.AutoIDClient = cli + } } // WithEtcdClient specifies the `clientv3.Client` of DDL used to request the etcd service diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index d1001013a1953..af65cb71b4dd1 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "//keyspace", "//kv", "//meta", + "//meta/autoid", "//metrics", "//owner", "//parser/ast", diff --git a/domain/domain.go b/domain/domain.go index f7d59a2210aaa..78a264cc1f285 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -52,6 +52,7 @@ import ( "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/ast" @@ -135,6 +136,8 @@ type Domain struct { exit chan struct{} // `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace. etcdClient *clientv3.Client + // autoidClient is used when there are tables with AUTO_ID_CACHE=1, it is the client to the autoid service. + autoidClient *autoid.ClientDiscover // `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace. // It is only used in storeMinStartTS and RemoveMinStartTS now. // It must be used when the etcd path isn't needed to separate by keyspace. @@ -1091,6 +1094,7 @@ func (do *Domain) Init( etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) do.etcdClient = cli + do.autoidClient = autoid.NewClientDiscover(cli) unprefixedEtcdCli, err := newEtcdCli(addrs, ebd) if err != nil { @@ -1124,6 +1128,7 @@ func (do *Domain) Init( ctx, ddl.WithEtcdClient(do.etcdClient), ddl.WithStore(do.store), + ddl.WithAutoIDClient(do.autoidClient), ddl.WithInfoCache(do.infoCache), ddl.WithHook(callback), ddl.WithLease(ddlLease), @@ -1556,6 +1561,11 @@ func (do *Domain) GetEtcdClient() *clientv3.Client { return do.etcdClient } +// AutoIDClient returns the autoid client. +func (do *Domain) AutoIDClient() *autoid.ClientDiscover { + return do.autoidClient +} + // GetPDClient returns the PD client. func (do *Domain) GetPDClient() pd.Client { if store, ok := do.store.(kv.StorageWithPD); ok { diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index 8811679c29dd6..b750657a4421e 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -393,8 +393,9 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo ti.logger.Error("close etcd client error", zap.Error(err)) } }() + autoidCli := autoid.NewClientDiscover(etcdCli) - r := &asAutoIDRequirement{ti.kvStore, etcdCli} + r := &asAutoIDRequirement{ti.kvStore, autoidCli} // todo: the new base should be the max row id of the last Node if we support distributed import. if err = common.RebaseGlobalAutoID(ctx, 0, r, ti.dbID, ti.tableInfo.Core); err != nil { return nil, errors.Trace(err) @@ -412,8 +413,8 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo } type asAutoIDRequirement struct { - kvStore tidbkv.Storage - etcdCli *clientv3.Client + kvStore tidbkv.Storage + autoidCli *autoid.ClientDiscover } var _ autoid.Requirement = &asAutoIDRequirement{} @@ -422,8 +423,8 @@ func (r *asAutoIDRequirement) Store() tidbkv.Storage { return r.kvStore } -func (r *asAutoIDRequirement) GetEtcdClient() *clientv3.Client { - return r.etcdCli +func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover { + return r.autoidCli } func (ti *TableImporter) rebaseChunkRowID(rowIDBase int64) { diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index c7153f31ad0dc..6db418149492b 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -39,7 +39,6 @@ import ( "github.com/pingcap/tidb/util/tracing" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" tikvutil "github.com/tikv/client-go/v2/util" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -573,12 +572,12 @@ func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *sin tblID: tblID, isUnsigned: isUnsigned, } - if r.GetEtcdClient() == nil { + if r.AutoIDClient() == nil { // Only for test in mockstore - spa.clientDiscover = clientDiscover{} + spa.ClientDiscover = &ClientDiscover{} spa.mu.AutoIDAllocClient = MockForTest(r.Store()) } else { - spa.clientDiscover = clientDiscover{etcdCli: r.GetEtcdClient()} + spa.ClientDiscover = r.AutoIDClient() } // mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one. @@ -593,7 +592,7 @@ func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *sin // Requirement is the parameter required by NewAllocator type Requirement interface { Store() kv.Storage - GetEtcdClient() *clientv3.Client + AutoIDClient() *ClientDiscover } // NewAllocator returns a new auto increment id generator on the store. diff --git a/meta/autoid/autoid_service.go b/meta/autoid/autoid_service.go index bdb7725be9451..58be6b562c56d 100644 --- a/meta/autoid/autoid_service.go +++ b/meta/autoid/autoid_service.go @@ -40,10 +40,10 @@ type singlePointAlloc struct { tblID int64 lastAllocated int64 isUnsigned bool - clientDiscover + *ClientDiscover } -type clientDiscover struct { +type ClientDiscover struct { // This the etcd client for service discover etcdCli *clientv3.Client // This is the real client for the AutoIDAlloc service @@ -60,7 +60,15 @@ const ( autoIDLeaderPath = "tidb/autoid/leader" ) -func (d *clientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) { +// NewClientDiscover creates a ClientDiscover object. +func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover { + return &ClientDiscover{ + etcdCli: etcdCli, + } +} + +// GetClient gets the AutoIDAllocClient. +func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) { d.mu.RLock() cli := d.mu.AutoIDAllocClient if cli != nil { @@ -138,7 +146,7 @@ retry: if err != nil { if strings.Contains(err.Error(), "rpc error") { time.Sleep(backoffDuration) - sp.resetConn(err) + sp.ResetConn(err) goto retry } return 0, 0, errors.Trace(err) @@ -155,9 +163,13 @@ retry: const backoffDuration = 200 * time.Millisecond -func (sp *singlePointAlloc) resetConn(reason error) { - logutil.BgLogger().Info("[autoid client] reset grpc connection", - zap.String("reason", reason.Error())) +// ResetConn reset the AutoIDAllocClient and underlying grpc connection. +// The next GetClient() call will recreate the client connecting to the correct leader by querying etcd. +func (sp *singlePointAlloc) ResetConn(reason error) { + if reason != nil { + logutil.BgLogger().Info("[autoid client] reset grpc connection", + zap.String("reason", reason.Error())) + } var grpcConn *grpc.ClientConn sp.mu.Lock() grpcConn = sp.mu.ClientConn @@ -210,7 +222,7 @@ retry: if err != nil { if strings.Contains(err.Error(), "rpc error") { time.Sleep(backoffDuration) - sp.resetConn(err) + sp.ResetConn(err) goto retry } return errors.Trace(err) diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go index f209db2cd6b6c..24792f897676e 100644 --- a/meta/autoid/autoid_test.go +++ b/meta/autoid/autoid_test.go @@ -44,7 +44,7 @@ func (r mockRequirement) Store() kv.Storage { return r.Storage } -func (r mockRequirement) GetEtcdClient() *clientv3.Client { +func (r mockRequirement) AutoIDClient() *ClientDiscover { return nil } From e143d80fda4db85b9427890709e73b78c7166368 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 27 Nov 2023 10:51:38 +0800 Subject: [PATCH 2/5] make lint happy --- meta/autoid/autoid_service.go | 1 + 1 file changed, 1 insertion(+) diff --git a/meta/autoid/autoid_service.go b/meta/autoid/autoid_service.go index 58be6b562c56d..8b51b60aa8c26 100644 --- a/meta/autoid/autoid_service.go +++ b/meta/autoid/autoid_service.go @@ -43,6 +43,7 @@ type singlePointAlloc struct { *ClientDiscover } +// ClientDiscover is used to get the AutoIDAllocClient, it creates the grpc connection with autoid service leader. type ClientDiscover struct { // This the etcd client for service discover etcdCli *clientv3.Client From 600bb49a22c13aaef20587deabb64eb6b047b515 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 27 Nov 2023 11:51:40 +0800 Subject: [PATCH 3/5] fix build --- meta/autoid/BUILD.bazel | 1 - meta/autoid/autoid_test.go | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/meta/autoid/BUILD.bazel b/meta/autoid/BUILD.bazel index a50e918fb8368..e653a904ce7ca 100644 --- a/meta/autoid/BUILD.bazel +++ b/meta/autoid/BUILD.bazel @@ -63,7 +63,6 @@ go_test( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", - "@io_etcd_go_etcd_client_v3//:client", "@org_uber_go_goleak//:goleak", ], ) diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go index 24792f897676e..dc69a997fda96 100644 --- a/meta/autoid/autoid_test.go +++ b/meta/autoid/autoid_test.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" ) type mockRequirement struct { @@ -44,7 +43,7 @@ func (r mockRequirement) Store() kv.Storage { return r.Storage } -func (r mockRequirement) AutoIDClient() *ClientDiscover { +func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover { return nil } From 26234a44e94564e04a3cb4979415573b7d099432 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 27 Nov 2023 12:03:09 +0800 Subject: [PATCH 4/5] fix build --- infoschema/BUILD.bazel | 1 - infoschema/infoschema_test.go | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/infoschema/BUILD.bazel b/infoschema/BUILD.bazel index 11de5e7d34556..5240f2afe4e94 100644 --- a/infoschema/BUILD.bazel +++ b/infoschema/BUILD.bazel @@ -123,7 +123,6 @@ go_test( "@com_github_prometheus_prometheus//promql", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", - "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", "@org_uber_go_goleak//:goleak", ], diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 7cc65567a1ee7..4c14de760043e 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" ) func TestBasic(t *testing.T) { @@ -553,7 +552,7 @@ func (r mockRequirement) Store() kv.Storage { return r.Storage } -func (r mockRequirement) GetEtcdClient() *clientv3.Client { +func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover { return nil } From 224b44bd4e1365324208bd1e4495ad55e83cfe1f Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 27 Nov 2023 12:25:34 +0800 Subject: [PATCH 5/5] fix build --- br/pkg/lightning/common/BUILD.bazel | 1 - br/pkg/lightning/common/common_test.go | 1 - 2 files changed, 2 deletions(-) diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 25081317cbae6..519e81ed03175 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -124,7 +124,6 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", - "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", "@org_uber_go_goleak//:goleak", diff --git a/br/pkg/lightning/common/common_test.go b/br/pkg/lightning/common/common_test.go index 12ccc21180650..0fb69f58b203a 100644 --- a/br/pkg/lightning/common/common_test.go +++ b/br/pkg/lightning/common/common_test.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/tidb/store/mockstore" tmock "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" ) func newTableInfo(t *testing.T,