diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 44b370a799cde..519e81ed03175 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -23,7 +23,6 @@ go_library( "//br/pkg/lightning/log", "//br/pkg/utils", "//errno", - "//kv", "//meta/autoid", "//parser/model", "//sessionctx/variable", @@ -102,7 +101,7 @@ go_test( ], embed = [":common"], flaky = True, - shard_count = 20, + shard_count = 21, deps = [ "//br/pkg/errors", "//br/pkg/lightning/log", diff --git a/br/pkg/lightning/common/common_test.go b/br/pkg/lightning/common/common_test.go index ef9b74ab7486f..ae6f8f1563a3d 100644 --- a/br/pkg/lightning/common/common_test.go +++ b/br/pkg/lightning/common/common_test.go @@ -159,3 +159,70 @@ func TestAllocGlobalAutoID(t *testing.T) { require.Equal(t, c.expectAllocatorTypes, allocatorTypes, c.tableID) } } + +type mockRequirement struct { + kv.Storage +} + +func (r mockRequirement) Store() kv.Storage { + return r.Storage +} + +func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover { + return nil +} + +func TestRebaseTableAllocators(t *testing.T) { + storePath := t.TempDir() + kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath)) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, kvStore.Close()) + }) + ti := newTableInfo(t, 1, 42, + "create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1", kvStore) + allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti) + require.NoError(t, err) + require.Len(t, allocators, 2) + for _, alloc := range allocators { + id, err := alloc.NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(1), id) + } + ctx := context.Background() + allocatorTypes := make([]autoid.AllocatorType, 0, len(allocators)) + // rebase to 123 + for _, alloc := range allocators { + require.NoError(t, alloc.Rebase(ctx, 123, false)) + allocatorTypes = append(allocatorTypes, alloc.GetType()) + } + require.Equal(t, []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType}, allocatorTypes) + // this call does nothing + require.NoError(t, common.RebaseTableAllocators(ctx, nil, mockRequirement{kvStore}, 1, ti)) + for _, alloc := range allocators { + nextID, err := alloc.NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(124), nextID) + } + // this call rebase AutoIncrementType allocator to 223 + require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{ + autoid.AutoIncrementType: 223, + }, mockRequirement{kvStore}, 1, ti)) + next, err := allocators[0].NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(224), next) + next, err = allocators[1].NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(124), next) + // this call rebase AutoIncrementType allocator to 323, RowIDAllocType allocator to 423 + require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{ + autoid.AutoIncrementType: 323, + autoid.RowIDAllocType: 423, + }, mockRequirement{kvStore}, 1, ti)) + next, err = allocators[0].NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(324), next) + next, err = allocators[1].NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(424), next) +} diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go index c66961665dbd4..0b67e5b12b32f 100644 --- a/br/pkg/lightning/importer/table_import.go +++ b/br/pkg/lightning/importer/table_import.go @@ -69,6 +69,8 @@ type TableImporter struct { alloc autoid.Allocators logger log.Logger kvStore tidbkv.Storage + etcdCli *clientv3.Client + autoidCli *autoid.ClientDiscover ignoreColumns map[string]struct{} } @@ -89,6 +91,7 @@ func NewTableImporter( if err != nil { return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName) } + autoidCli := autoid.NewClientDiscover(etcdCli) return &TableImporter{ tableName: tableName, @@ -98,6 +101,8 @@ func NewTableImporter( encTable: tbl, alloc: idAlloc, kvStore: kvStore, + etcdCli: etcdCli, + autoidCli: autoidCli, logger: logger.With(zap.String("table", tableName)), ignoreColumns: ignoreColumns, }, nil @@ -268,6 +273,19 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp return err } +// AutoIDRequirement implements autoid.Requirement. +var _ autoid.Requirement = &TableImporter{} + +// Store implements the autoid.Requirement interface. +func (tr *TableImporter) Store() tidbkv.Storage { + return tr.kvStore +} + +// AutoIDClient implements the autoid.Requirement interface. +func (tr *TableImporter) AutoIDClient() *autoid.ClientDiscover { + return tr.autoidCli +} + // RebaseChunkRowIDs rebase the row id of the chunks. func (*TableImporter) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) { if rowIDBase == 0 { diff --git a/ddl/column.go b/ddl/column.go index e3f15eb6423ca..f6d77f432951e 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -754,7 +754,7 @@ func (w *worker) doModifyColumnTypeWithData( job.SnapshotVer = 0 job.SchemaState = model.StateWriteReorganization case model.StateWriteReorganization: - tbl, err := getTable(d.store, dbInfo.ID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo) if err != nil { return ver, errors.Trace(err) } @@ -1681,6 +1681,18 @@ func checkNewAutoRandomBits(idAccessors meta.AutoIDAccessors, oldCol *model.Colu return nil } +type asAutoIDRequirement ddlCtx + +var _ autoid.Requirement = &asAutoIDRequirement{} + +func (r *asAutoIDRequirement) Store() kv.Storage { + return r.store +} + +func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover { + return r.autoidCli +} + // applyNewAutoRandomBits set auto_random bits to TableInfo and // migrate auto_increment ID to auto_random ID if possible. func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo, @@ -1690,7 +1702,7 @@ func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo, if !needMigrateFromAutoIncToAutoRand { return nil } - autoRandAlloc := autoid.NewAllocatorsFromTblInfo(d.store, dbInfo.ID, tblInfo).Get(autoid.AutoRandomType) + autoRandAlloc := autoid.NewAllocatorsFromTblInfo((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo).Get(autoid.AutoRandomType) if autoRandAlloc == nil { errMsg := fmt.Sprintf(autoid.AutoRandomAllocatorNotFound, dbInfo.Name.O, tblInfo.Name.O) return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg) diff --git a/ddl/ddl.go b/ddl/ddl.go index ecd7100ece59e..e02d62bc12c8f 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" -<<<<<<< HEAD:ddl/ddl.go "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/ingest" sess "github.com/pingcap/tidb/ddl/internal/session" @@ -46,6 +45,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/ast" @@ -65,41 +65,6 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/syncutil" -======= - "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/ddl/ingest" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" - "github.com/pingcap/tidb/pkg/ddl/syncer" - "github.com/pingcap/tidb/pkg/ddl/util" - "github.com/pingcap/tidb/pkg/disttask/framework/dispatcher" - "github.com/pingcap/tidb/pkg/disttask/framework/proto" - "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" - "github.com/pingcap/tidb/pkg/domain/infosync" - "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/meta" - "github.com/pingcap/tidb/pkg/meta/autoid" - "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/owner" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/binloginfo" - "github.com/pingcap/tidb/pkg/sessionctx/variable" - "github.com/pingcap/tidb/pkg/sessiontxn" - "github.com/pingcap/tidb/pkg/statistics/handle" - statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/table" - pumpcli "github.com/pingcap/tidb/pkg/tidb-binlog/pump_client" - tidbutil "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/dbterror" - "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" - "github.com/pingcap/tidb/pkg/util/gcutil" - "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/syncutil" ->>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/ddl.go "github.com/tikv/client-go/v2/tikvrpc" clientv3 "go.etcd.io/etcd/client/v3" atomicutil "go.uber.org/atomic" @@ -391,12 +356,9 @@ type ddlCtx struct { statsHandle *handle.Handle tableLockCkr util.DeadTableLockChecker etcdCli *clientv3.Client -<<<<<<< HEAD:ddl/ddl.go + autoidCli *autoid.ClientDiscover // backfillJobCh gets notification if any backfill jobs coming. backfillJobCh chan struct{} -======= - autoidCli *autoid.ClientDiscover ->>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/ddl.go *waitSchemaSyncedController *schemaVersionManager diff --git a/ddl/options.go b/ddl/options.go index 690cb29c44455..1355808e831a2 100644 --- a/ddl/options.go +++ b/ddl/options.go @@ -17,14 +17,9 @@ package ddl import ( "time" -<<<<<<< HEAD:ddl/options.go "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" -======= - "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/meta/autoid" ->>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/options.go + "github.com/pingcap/tidb/meta/autoid" clientv3 "go.etcd.io/etcd/client/v3" ) diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index d1001013a1953..af65cb71b4dd1 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "//keyspace", "//kv", "//meta", + "//meta/autoid", "//metrics", "//owner", "//parser/ast", diff --git a/domain/domain.go b/domain/domain.go index d195ca78be2ab..d7ce5b07632d3 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/br/pkg/streamhelper" "github.com/pingcap/tidb/br/pkg/streamhelper/daemon" -<<<<<<< HEAD:domain/domain.go "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/placement" @@ -53,6 +52,7 @@ import ( "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/ast" @@ -84,62 +84,6 @@ import ( "github.com/pingcap/tidb/util/servermemorylimit" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/syncutil" -======= - "github.com/pingcap/tidb/pkg/bindinfo" - "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/ddl" - "github.com/pingcap/tidb/pkg/ddl/placement" - "github.com/pingcap/tidb/pkg/ddl/schematracker" - ddlutil "github.com/pingcap/tidb/pkg/ddl/util" - "github.com/pingcap/tidb/pkg/disttask/framework/dispatcher" - "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" - "github.com/pingcap/tidb/pkg/disttask/framework/storage" - "github.com/pingcap/tidb/pkg/domain/globalconfigsync" - "github.com/pingcap/tidb/pkg/domain/infosync" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" - "github.com/pingcap/tidb/pkg/errno" - "github.com/pingcap/tidb/pkg/infoschema" - infoschema_metrics "github.com/pingcap/tidb/pkg/infoschema/metrics" - "github.com/pingcap/tidb/pkg/infoschema/perfschema" - "github.com/pingcap/tidb/pkg/keyspace" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/meta" - "github.com/pingcap/tidb/pkg/meta/autoid" - "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/owner" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/privilege/privileges" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" - "github.com/pingcap/tidb/pkg/sessionctx/variable" - "github.com/pingcap/tidb/pkg/statistics/handle" - statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" - "github.com/pingcap/tidb/pkg/store/helper" - "github.com/pingcap/tidb/pkg/telemetry" - "github.com/pingcap/tidb/pkg/ttl/ttlworker" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/dbterror" - disttaskutil "github.com/pingcap/tidb/pkg/util/disttask" - "github.com/pingcap/tidb/pkg/util/domainutil" - "github.com/pingcap/tidb/pkg/util/engine" - "github.com/pingcap/tidb/pkg/util/etcd" - "github.com/pingcap/tidb/pkg/util/expensivequery" - "github.com/pingcap/tidb/pkg/util/gctuner" - "github.com/pingcap/tidb/pkg/util/globalconn" - "github.com/pingcap/tidb/pkg/util/intest" - "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/memory" - "github.com/pingcap/tidb/pkg/util/memoryusagealarm" - "github.com/pingcap/tidb/pkg/util/replayer" - "github.com/pingcap/tidb/pkg/util/servermemorylimit" - "github.com/pingcap/tidb/pkg/util/sqlexec" - "github.com/pingcap/tidb/pkg/util/sqlkiller" - "github.com/pingcap/tidb/pkg/util/syncutil" ->>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/domain/domain.go "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/transaction" pd "github.com/tikv/pd/client" @@ -1150,6 +1094,7 @@ func (do *Domain) Init( etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) do.etcdClient = cli + do.autoidClient = autoid.NewClientDiscover(cli) do.autoidClient = autoid.NewClientDiscover(cli) diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index 8811679c29dd6..b750657a4421e 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -393,8 +393,9 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo ti.logger.Error("close etcd client error", zap.Error(err)) } }() + autoidCli := autoid.NewClientDiscover(etcdCli) - r := &asAutoIDRequirement{ti.kvStore, etcdCli} + r := &asAutoIDRequirement{ti.kvStore, autoidCli} // todo: the new base should be the max row id of the last Node if we support distributed import. if err = common.RebaseGlobalAutoID(ctx, 0, r, ti.dbID, ti.tableInfo.Core); err != nil { return nil, errors.Trace(err) @@ -412,8 +413,8 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo } type asAutoIDRequirement struct { - kvStore tidbkv.Storage - etcdCli *clientv3.Client + kvStore tidbkv.Storage + autoidCli *autoid.ClientDiscover } var _ autoid.Requirement = &asAutoIDRequirement{} @@ -422,8 +423,8 @@ func (r *asAutoIDRequirement) Store() tidbkv.Storage { return r.kvStore } -func (r *asAutoIDRequirement) GetEtcdClient() *clientv3.Client { - return r.etcdCli +func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover { + return r.autoidCli } func (ti *TableImporter) rebaseChunkRowID(rowIDBase int64) { diff --git a/infoschema/BUILD.bazel b/infoschema/BUILD.bazel index 11de5e7d34556..5240f2afe4e94 100644 --- a/infoschema/BUILD.bazel +++ b/infoschema/BUILD.bazel @@ -123,7 +123,6 @@ go_test( "@com_github_prometheus_prometheus//promql", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", - "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", "@org_uber_go_goleak//:goleak", ], diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 7cc65567a1ee7..4c14de760043e 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" ) func TestBasic(t *testing.T) { @@ -553,7 +552,7 @@ func (r mockRequirement) Store() kv.Storage { return r.Storage } -func (r mockRequirement) GetEtcdClient() *clientv3.Client { +func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover { return nil } diff --git a/meta/autoid/BUILD.bazel b/meta/autoid/BUILD.bazel index a50e918fb8368..e653a904ce7ca 100644 --- a/meta/autoid/BUILD.bazel +++ b/meta/autoid/BUILD.bazel @@ -63,7 +63,6 @@ go_test( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", - "@io_etcd_go_etcd_client_v3//:client", "@org_uber_go_goleak//:goleak", ], ) diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index c8227f2d6348b..6db418149492b 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -39,7 +39,6 @@ import ( "github.com/pingcap/tidb/util/tracing" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" tikvutil "github.com/tikv/client-go/v2/util" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -567,37 +566,18 @@ func NextStep(curStep int64, consumeDur time.Duration) int64 { // package circle depending issue. var MockForTest func(kv.Storage) autoid.AutoIDAllocClient -func newSinglePointAlloc(store kv.Storage, dbID, tblID int64, isUnsigned bool) *singlePointAlloc { - ebd, ok := store.(kv.EtcdBackend) - if !ok { - // newSinglePointAlloc fail because not etcd background - // This could happen in the server package unit test - return nil - } - - addrs, err := ebd.EtcdAddrs() - if err != nil { - panic(err) - } +func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *singlePointAlloc { spa := &singlePointAlloc{ dbID: dbID, tblID: tblID, isUnsigned: isUnsigned, } - if len(addrs) > 0 { - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: addrs, - TLS: ebd.TLSConfig(), - AutoSyncInterval: 30 * time.Second, - }) - if err != nil { - logutil.BgLogger().Error("[autoid client] fail to connect etcd, fallback to default", zap.Error(err)) - return nil - } - spa.clientDiscover = clientDiscover{etcdCli: etcdCli} + if r.AutoIDClient() == nil { + // Only for test in mockstore + spa.ClientDiscover = &ClientDiscover{} + spa.mu.AutoIDAllocClient = MockForTest(r.Store()) } else { - spa.clientDiscover = clientDiscover{} - spa.mu.AutoIDAllocClient = MockForTest(store) + spa.ClientDiscover = r.AutoIDClient() } // mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one. @@ -609,9 +589,19 @@ func newSinglePointAlloc(store kv.Storage, dbID, tblID int64, isUnsigned bool) * return spa } +// Requirement is the parameter required by NewAllocator +type Requirement interface { + Store() kv.Storage + AutoIDClient() *ClientDiscover +} + // NewAllocator returns a new auto increment id generator on the store. -func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool, +func NewAllocator(r Requirement, dbID, tbID int64, isUnsigned bool, allocType AllocatorType, opts ...AllocOption) Allocator { + var store kv.Storage + if r != nil { + store = r.Store() + } alloc := &allocator{ store: store, dbID: dbID, @@ -628,7 +618,7 @@ func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool, // Use the MySQL compatible AUTO_INCREMENT mode. if alloc.customStep && alloc.step == 1 && alloc.tbVersion >= model.TableInfoVersion5 { if allocType == AutoIncrementType { - alloc1 := newSinglePointAlloc(store, dbID, tbID, isUnsigned) + alloc1 := newSinglePointAlloc(r, dbID, tbID, isUnsigned) if alloc1 != nil { return alloc1 } @@ -658,7 +648,7 @@ func NewSequenceAllocator(store kv.Storage, dbID, tbID int64, info *model.Sequen } // NewAllocatorsFromTblInfo creates an array of allocators of different types with the information of model.TableInfo. -func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) Allocators { +func NewAllocatorsFromTblInfo(r Requirement, schemaID int64, tblInfo *model.TableInfo) Allocators { var allocs []Allocator dbID := tblInfo.GetDBID(schemaID) idCacheOpt := CustomAutoIncCacheOption(tblInfo.AutoIdCache) @@ -667,20 +657,20 @@ func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.T hasRowID := !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil if hasRowID || hasAutoIncID { - alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, idCacheOpt, tblVer) + alloc := NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, idCacheOpt, tblVer) allocs = append(allocs, alloc) } if hasAutoIncID { - alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), AutoIncrementType, idCacheOpt, tblVer) + alloc := NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), AutoIncrementType, idCacheOpt, tblVer) allocs = append(allocs, alloc) } hasAutoRandID := tblInfo.ContainsAutoRandomBits() if hasAutoRandID { - alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType, idCacheOpt, tblVer) + alloc := NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType, idCacheOpt, tblVer) allocs = append(allocs, alloc) } if tblInfo.IsSequence() { - allocs = append(allocs, NewSequenceAllocator(store, dbID, tblInfo.ID, tblInfo.Sequence)) + allocs = append(allocs, NewSequenceAllocator(r.Store(), dbID, tblInfo.ID, tblInfo.Sequence)) } return NewAllocators(tblInfo.SepAutoInc(), allocs...) } diff --git a/meta/autoid/autoid_service.go b/meta/autoid/autoid_service.go index c65de732ff613..682317b32e3de 100644 --- a/meta/autoid/autoid_service.go +++ b/meta/autoid/autoid_service.go @@ -40,12 +40,7 @@ type singlePointAlloc struct { tblID int64 lastAllocated int64 isUnsigned bool -<<<<<<< HEAD:meta/autoid/autoid_service.go - clientDiscover -======= *ClientDiscover - keyspaceID uint32 ->>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/meta/autoid/autoid_service.go } // ClientDiscover is used to get the AutoIDAllocClient, it creates the grpc connection with autoid service leader. @@ -169,11 +164,6 @@ retry: const backoffDuration = 200 * time.Millisecond -<<<<<<< HEAD:meta/autoid/autoid_service.go -func (sp *singlePointAlloc) resetConn(reason error) { - logutil.BgLogger().Info("[autoid client] reset grpc connection", - zap.String("reason", reason.Error())) -======= // ResetConn reset the AutoIDAllocClient and underlying grpc connection. // The next GetClient() call will recreate the client connecting to the correct leader by querying etcd. func (d *ClientDiscover) ResetConn(reason error) { @@ -181,7 +171,6 @@ func (d *ClientDiscover) ResetConn(reason error) { logutil.BgLogger().Info("reset grpc connection", zap.String("category", "autoid client"), zap.String("reason", reason.Error())) } ->>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/meta/autoid/autoid_service.go var grpcConn *grpc.ClientConn d.mu.Lock() grpcConn = d.mu.ClientConn diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go index 912b1e173bc8c..85a4e1a610935 100644 --- a/meta/autoid/autoid_test.go +++ b/meta/autoid/autoid_test.go @@ -35,6 +35,18 @@ import ( "github.com/stretchr/testify/require" ) +type mockRequirement struct { + kv.Storage +} + +func (r mockRequirement) Store() kv.Storage { + return r.Storage +} + +func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover { + return nil +} + func TestSignedAutoid(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDChange", `return(true)`)) defer func() { diff --git a/pkg/disttask/importinto/subtask_executor.go b/pkg/disttask/importinto/subtask_executor.go deleted file mode 100644 index 9eb203b2f9514..0000000000000 --- a/pkg/disttask/importinto/subtask_executor.go +++ /dev/null @@ -1,307 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package importinto - -import ( - "context" - "net" - "strconv" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/br/pkg/lightning/backend" - "github.com/pingcap/tidb/br/pkg/lightning/backend/local" - "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/br/pkg/lightning/config" - "github.com/pingcap/tidb/br/pkg/lightning/log" - verify "github.com/pingcap/tidb/br/pkg/lightning/verification" - tidb "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/disttask/framework/proto" - "github.com/pingcap/tidb/pkg/disttask/framework/storage" - "github.com/pingcap/tidb/pkg/executor/importer" - "github.com/pingcap/tidb/pkg/keyspace" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/meta/autoid" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/variable" - "github.com/pingcap/tidb/pkg/util/etcd" - "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/mathutil" - "github.com/tikv/client-go/v2/util" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" -) - -// TestSyncChan is used to test. -var TestSyncChan = make(chan struct{}) - -// MiniTaskExecutor is the interface for a minimal task executor. -// exported for testing. -type MiniTaskExecutor interface { - Run(ctx context.Context, dataWriter, indexWriter backend.EngineWriter) error -} - -// importMinimalTaskExecutor is a minimal task executor for IMPORT INTO. -type importMinimalTaskExecutor struct { - mTtask *importStepMinimalTask -} - -var newImportMinimalTaskExecutor = newImportMinimalTaskExecutor0 - -func newImportMinimalTaskExecutor0(t *importStepMinimalTask) MiniTaskExecutor { - return &importMinimalTaskExecutor{ - mTtask: t, - } -} - -func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWriter backend.EngineWriter) error { - logger := logutil.BgLogger().With(zap.Stringer("type", proto.ImportInto), zap.Int64("table-id", e.mTtask.Plan.TableInfo.ID)) - logger.Info("execute chunk") - failpoint.Inject("waitBeforeSortChunk", func() { - time.Sleep(3 * time.Second) - }) - failpoint.Inject("errorWhenSortChunk", func() { - failpoint.Return(errors.New("occur an error when sort chunk")) - }) - failpoint.Inject("syncBeforeSortChunk", func() { - TestSyncChan <- struct{}{} - <-TestSyncChan - }) - chunkCheckpoint := toChunkCheckpoint(e.mTtask.Chunk) - sharedVars := e.mTtask.SharedVars - if sharedVars.TableImporter.IsLocalSort() { - if err := importer.ProcessChunk(ctx, &chunkCheckpoint, sharedVars.TableImporter, sharedVars.DataEngine, sharedVars.IndexEngine, sharedVars.Progress, logger); err != nil { - return err - } - } else { - if err := importer.ProcessChunkWith(ctx, &chunkCheckpoint, sharedVars.TableImporter, dataWriter, indexWriter, sharedVars.Progress, logger); err != nil { - return err - } - } - - sharedVars.mu.Lock() - defer sharedVars.mu.Unlock() - sharedVars.Checksum.Add(&chunkCheckpoint.Checksum) - return nil -} - -// postProcess does the post-processing for the task. -func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) { - failpoint.Inject("syncBeforePostProcess", func() { - TestSyncChan <- struct{}{} - <-TestSyncChan - }) - - callLog := log.BeginTask(logger, "post process") - defer func() { - callLog.End(zap.ErrorLevel, err) - }() - - if err = rebaseAllocatorBases(ctx, taskMeta, subtaskMeta, logger); err != nil { - return err - } - - // TODO: create table indexes depends on the option. - // create table indexes even if the post process is failed. - // defer func() { - // err2 := createTableIndexes(ctx, globalTaskManager, taskMeta, logger) - // err = multierr.Append(err, err2) - // }() - - return verifyChecksum(ctx, taskMeta, subtaskMeta, logger) -} - -func verifyChecksum(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) error { - if taskMeta.Plan.Checksum == config.OpLevelOff { - return nil - } - localChecksum := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum) - logger.Info("local checksum", zap.Object("checksum", &localChecksum)) - - failpoint.Inject("waitCtxDone", func() { - <-ctx.Done() - }) - - globalTaskManager, err := storage.GetTaskManager() - ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask) - if err != nil { - return err - } - remoteChecksum, err := checksumTable(ctx, globalTaskManager, taskMeta, logger) - if err != nil { - if taskMeta.Plan.Checksum != config.OpLevelOptional { - return err - } - logger.Warn("checksumTable failed, will skip this error and go on", zap.Error(err)) - } - if remoteChecksum != nil { - if !remoteChecksum.IsEqual(&localChecksum) { - err2 := common.ErrChecksumMismatch.GenWithStackByArgs( - remoteChecksum.Checksum, localChecksum.Sum(), - remoteChecksum.TotalKVs, localChecksum.SumKVS(), - remoteChecksum.TotalBytes, localChecksum.SumSize(), - ) - if taskMeta.Plan.Checksum == config.OpLevelOptional { - logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2)) - err2 = nil - } - return err2 - } - logger.Info("checksum pass", zap.Object("local", &localChecksum)) - } - return nil -} - -func checksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) { - var ( - tableName = common.UniqueTable(taskMeta.Plan.DBName, taskMeta.Plan.TableInfo.Name.L) - sql = "ADMIN CHECKSUM TABLE " + tableName - maxErrorRetryCount = 3 - distSQLScanConcurrencyFactor = 1 - remoteChecksum *local.RemoteChecksum - txnErr error - ) - - ctx = util.WithInternalSourceType(ctx, kv.InternalImportInto) - for i := 0; i < maxErrorRetryCount; i++ { - txnErr = executor.WithNewTxn(ctx, func(se sessionctx.Context) error { - // increase backoff weight - if err := setBackoffWeight(se, taskMeta, logger); err != nil { - logger.Warn("set tidb_backoff_weight failed", zap.Error(err)) - } - - distSQLScanConcurrency := se.GetSessionVars().DistSQLScanConcurrency() - se.GetSessionVars().SetDistSQLScanConcurrency(mathutil.Max(distSQLScanConcurrency/distSQLScanConcurrencyFactor, local.MinDistSQLScanConcurrency)) - defer func() { - se.GetSessionVars().SetDistSQLScanConcurrency(distSQLScanConcurrency) - }() - - // TODO: add resource group name - - rs, err := storage.ExecSQL(ctx, se, sql) - if err != nil { - return err - } - if len(rs) < 1 { - return errors.New("empty checksum result") - } - - failpoint.Inject("errWhenChecksum", func() { - if i == 0 { - failpoint.Return(errors.New("occur an error when checksum, coprocessor task terminated due to exceeding the deadline")) - } - }) - - // ADMIN CHECKSUM TABLE . example. - // mysql> admin checksum table test.t; - // +---------+------------+---------------------+-----------+-------------+ - // | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes | - // +---------+------------+---------------------+-----------+-------------+ - // | test | t | 8520875019404689597 | 7296873 | 357601387 | - // +---------+------------+------------- - remoteChecksum = &local.RemoteChecksum{ - Schema: rs[0].GetString(0), - Table: rs[0].GetString(1), - Checksum: rs[0].GetUint64(2), - TotalKVs: rs[0].GetUint64(3), - TotalBytes: rs[0].GetUint64(4), - } - return nil - }) - if !common.IsRetryableError(txnErr) { - break - } - distSQLScanConcurrencyFactor *= 2 - logger.Warn("retry checksum table", zap.Int("retry count", i+1), zap.Error(txnErr)) - } - return remoteChecksum, txnErr -} - -// TestChecksumTable is used to test checksum table in unit test. -func TestChecksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) { - return checksumTable(ctx, executor, taskMeta, logger) -} - -func setBackoffWeight(se sessionctx.Context, taskMeta *TaskMeta, logger *zap.Logger) error { - backoffWeight := local.DefaultBackoffWeight - if val, ok := taskMeta.Plan.ImportantSysVars[variable.TiDBBackOffWeight]; ok { - if weight, err := strconv.Atoi(val); err == nil && weight > backoffWeight { - backoffWeight = weight - } - } - logger.Info("set backoff weight", zap.Int("weight", backoffWeight)) - return se.GetSessionVars().SetSystemVar(variable.TiDBBackOffWeight, strconv.Itoa(backoffWeight)) -} - -type autoIDRequirement struct { - store kv.Storage - autoidCli *autoid.ClientDiscover -} - -func (r *autoIDRequirement) Store() kv.Storage { - return r.store -} - -func (r *autoIDRequirement) AutoIDClient() *autoid.ClientDiscover { - return r.autoidCli -} - -func rebaseAllocatorBases(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) { - callLog := log.BeginTask(logger, "rebase allocators") - defer func() { - callLog.End(zap.ErrorLevel, err) - }() - - if !common.TableHasAutoID(taskMeta.Plan.DesiredTableInfo) { - return nil - } - - tidbCfg := tidb.GetGlobalConfig() - hostPort := net.JoinHostPort("127.0.0.1", strconv.Itoa(int(tidbCfg.Status.StatusPort))) - tls, err2 := common.NewTLS( - tidbCfg.Security.ClusterSSLCA, - tidbCfg.Security.ClusterSSLCert, - tidbCfg.Security.ClusterSSLKey, - hostPort, - nil, nil, nil, - ) - if err2 != nil { - return err2 - } - - // no need to close kvStore, since it's a cached store. - kvStore, err2 := importer.GetCachedKVStoreFrom(tidbCfg.Path, tls) - if err2 != nil { - return errors.Trace(err2) - } - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{tidbCfg.Path}, - AutoSyncInterval: 30 * time.Second, - TLS: tls.TLSConfig(), - }) - if err != nil { - return errors.Trace(err) - } - etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(kvStore.GetCodec())) - autoidCli := autoid.NewClientDiscover(etcdCli) - r := autoIDRequirement{store: kvStore, autoidCli: autoidCli} - err = common.RebaseTableAllocators(ctx, subtaskMeta.MaxIDs, &r, taskMeta.Plan.DBID, taskMeta.Plan.DesiredTableInfo) - if err1 := etcdCli.Close(); err1 != nil { - logger.Info("close etcd client error", zap.Error(err1)) - } - autoidCli.ResetConn(nil) - return errors.Trace(err) -} diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel deleted file mode 100644 index 4f588f1209827..0000000000000 --- a/pkg/domain/BUILD.bazel +++ /dev/null @@ -1,169 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "domain", - srcs = [ - "domain.go", - "domain_sysvars.go", - "domainctx.go", - "extract.go", - "historical_stats.go", - "optimize_trace.go", - "plan_replayer.go", - "plan_replayer_dump.go", - "runaway.go", - "schema_checker.go", - "schema_validator.go", - "sysvar_cache.go", - "test_helper.go", - "topn_slow_query.go", - ], - importpath = "github.com/pingcap/tidb/pkg/domain", - visibility = ["//visibility:public"], - deps = [ - "//br/pkg/streamhelper", - "//br/pkg/streamhelper/daemon", - "//pkg/bindinfo", - "//pkg/config", - "//pkg/ddl", - "//pkg/ddl/placement", - "//pkg/ddl/schematracker", - "//pkg/ddl/util", - "//pkg/disttask/framework/dispatcher", - "//pkg/disttask/framework/scheduler", - "//pkg/disttask/framework/storage", - "//pkg/domain/globalconfigsync", - "//pkg/domain/infosync", - "//pkg/domain/metrics", - "//pkg/domain/resourcegroup", - "//pkg/errno", - "//pkg/infoschema", - "//pkg/infoschema/metrics", - "//pkg/infoschema/perfschema", - "//pkg/keyspace", - "//pkg/kv", - "//pkg/meta", - "//pkg/meta/autoid", - "//pkg/metrics", - "//pkg/owner", - "//pkg/parser/ast", - "//pkg/parser/model", - "//pkg/parser/mysql", - "//pkg/parser/terror", - "//pkg/privilege/privileges", - "//pkg/sessionctx", - "//pkg/sessionctx/sessionstates", - "//pkg/sessionctx/variable", - "//pkg/statistics/handle", - "//pkg/statistics/handle/logutil", - "//pkg/statistics/handle/util", - "//pkg/store/helper", - "//pkg/telemetry", - "//pkg/ttl/cache", - "//pkg/ttl/sqlbuilder", - "//pkg/ttl/ttlworker", - "//pkg/types", - "//pkg/util", - "//pkg/util/chunk", - "//pkg/util/dbterror", - "//pkg/util/disttask", - "//pkg/util/domainutil", - "//pkg/util/engine", - "//pkg/util/etcd", - "//pkg/util/execdetails", - "//pkg/util/expensivequery", - "//pkg/util/gctuner", - "//pkg/util/globalconn", - "//pkg/util/intest", - "//pkg/util/logutil", - "//pkg/util/memory", - "//pkg/util/memoryusagealarm", - "//pkg/util/printer", - "//pkg/util/replayer", - "//pkg/util/servermemorylimit", - "//pkg/util/sqlexec", - "//pkg/util/sqlkiller", - "//pkg/util/syncutil", - "@com_github_burntsushi_toml//:toml", - "@com_github_ngaut_pools//:pools", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_pingcap_kvproto//pkg/pdpb", - "@com_github_pingcap_kvproto//pkg/resource_manager", - "@com_github_pingcap_log//:log", - "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//txnkv/transaction", - "@com_github_tikv_pd_client//:client", - "@com_github_tikv_pd_client//http", - "@com_github_tikv_pd_client//resource_group/controller", - "@io_etcd_go_etcd_client_v3//:client", - "@io_etcd_go_etcd_client_v3//concurrency", - "@org_golang_google_grpc//:grpc", - "@org_golang_google_grpc//backoff", - "@org_golang_google_grpc//keepalive", - "@org_golang_x_exp//maps", - "@org_uber_go_atomic//:atomic", - "@org_uber_go_zap//:zap", - ], -) - -go_test( - name = "domain_test", - timeout = "short", - srcs = [ - "db_test.go", - "domain_test.go", - "domain_utils_test.go", - "domainctx_test.go", - "extract_test.go", - "main_test.go", - "plan_replayer_handle_test.go", - "plan_replayer_test.go", - "schema_checker_test.go", - "schema_validator_test.go", - "session_pool_test.go", - "topn_slow_query_test.go", - ], - embed = [":domain"], - flaky = True, - shard_count = 23, - deps = [ - "//pkg/config", - "//pkg/ddl", - "//pkg/domain/infosync", - "//pkg/errno", - "//pkg/keyspace", - "//pkg/kv", - "//pkg/metrics", - "//pkg/parser/ast", - "//pkg/parser/auth", - "//pkg/parser/model", - "//pkg/parser/mysql", - "//pkg/parser/terror", - "//pkg/server", - "//pkg/session", - "//pkg/sessionctx/variable", - "//pkg/store/mockstore", - "//pkg/testkit", - "//pkg/testkit/testsetup", - "//pkg/types", - "//pkg/util", - "//pkg/util/mock", - "//pkg/util/replayer", - "//pkg/util/stmtsummary/v2:stmtsummary", - "@com_github_ngaut_pools//:pools", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_prometheus_client_model//go", - "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//txnkv/transaction", - "@com_github_tikv_pd_client//:client", - "@io_etcd_go_etcd_tests_v3//integration", - "@org_uber_go_goleak//:goleak", - ], -)