From ca2235f46131337a6e262d19eddbbf209f9071ac Mon Sep 17 00:00:00 2001 From: ystaticy Date: Thu, 2 Feb 2023 14:29:56 +0800 Subject: [PATCH 1/4] keyspace: gc delete range (#40639) ref pingcap/tidb#40848 --- ddl/attributes_sql_test.go | 12 ++-- ddl/main_test.go | 2 +- domain/db_test.go | 4 +- domain/domain.go | 87 ++++++++++++++---------- domain/infosync/info.go | 37 +++++----- domain/infosync/info_test.go | 6 +- server/stat_test.go | 2 +- store/driver/tikv_driver.go | 4 -- store/gcworker/gc_worker.go | 127 +++++++++++++++++++++++++++++++++++ 9 files changed, 215 insertions(+), 66 deletions(-) diff --git a/ddl/attributes_sql_test.go b/ddl/attributes_sql_test.go index 95f881e6fb3fe..2d2685ffd06b2 100644 --- a/ddl/attributes_sql_test.go +++ b/ddl/attributes_sql_test.go @@ -269,7 +269,7 @@ PARTITION BY RANGE (c) ( func TestFlashbackTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -327,7 +327,7 @@ PARTITION BY RANGE (c) ( func TestDropTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -380,7 +380,7 @@ PARTITION BY RANGE (c) ( func TestCreateWithSameName(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -444,7 +444,7 @@ PARTITION BY RANGE (c) ( func TestPartition(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -504,7 +504,7 @@ PARTITION BY RANGE (c) ( func TestDropSchema(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -530,7 +530,7 @@ PARTITION BY RANGE (c) ( func TestDefaultKeyword(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), 
dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/ddl/main_test.go b/ddl/main_test.go index 6a8642ae34380..a10374b04f0f2 100644 --- a/ddl/main_test.go +++ b/ddl/main_test.go @@ -52,7 +52,7 @@ func TestMain(m *testing.M) { conf.Experimental.AllowsExpressionIndex = true }) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err) os.Exit(1) diff --git a/domain/db_test.go b/domain/db_test.go index ff6e6b625788a..9b122664f8397 100644 --- a/domain/db_test.go +++ b/domain/db_test.go @@ -73,7 +73,7 @@ func TestNormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" @@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" diff --git a/domain/domain.go b/domain/domain.go index 0d06fdc5a7a13..3f1c833b148c0 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -98,20 +98,26 @@ func NewMockDomain() *Domain { // Domain represents a storage space. Different domains can use the same database name. // Multiple domains can be used in parallel without synchronization. type Domain struct { - store kv.Storage - infoCache *infoschema.InfoCache - privHandle *privileges.Handle - bindHandle atomic.Pointer[bindinfo.BindHandle] - statsHandle unsafe.Pointer - statsLease time.Duration - ddl ddl.DDL - info *infosync.InfoSyncer - globalCfgSyncer *globalconfigsync.GlobalConfigSyncer - m sync.Mutex - SchemaValidator SchemaValidator - sysSessionPool *sessionPool - exit chan struct{} - etcdClient *clientv3.Client + store kv.Storage + infoCache *infoschema.InfoCache + privHandle *privileges.Handle + bindHandle atomic.Pointer[bindinfo.BindHandle] + statsHandle unsafe.Pointer + statsLease time.Duration + ddl ddl.DDL + info *infosync.InfoSyncer + globalCfgSyncer *globalconfigsync.GlobalConfigSyncer + m sync.Mutex + SchemaValidator SchemaValidator + sysSessionPool *sessionPool + exit chan struct{} + // `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace. + etcdClient *clientv3.Client + // `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace. + // It is only used in storeMinStartTS and RemoveMinStartTS now. + // It must be used when the etcd path isn't needed to separate by keyspace. 
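+ // For illustration (paths shown are indicative only): with a keyspace configured, a write to
+ // "/tidb/server/info/<id>" through `etcdClient` lands under the keyspace namespace installed by
+ // etcd.SetEtcdCliByNamespace in Init, while the same write through `unprefixedEtcdCli` goes to
+ // "/tidb/server/info/<id>" verbatim.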
+ // See keyspace RFC: https://github.com/pingcap/tidb/pull/39685 + unprefixedEtcdCli *clientv3.Client sysVarCache sysVarCache // replaces GlobalVariableCache slowQuery *topNSlowQueries expensiveQueryHandle *expensivequery.Handle @@ -881,6 +887,10 @@ func (do *Domain) Close() { terror.Log(errors.Trace(do.etcdClient.Close())) } + if do.unprefixedEtcdCli != nil { + terror.Log(errors.Trace(do.unprefixedEtcdCli.Close())) + } + do.slowQuery.Close() if do.cancel != nil { do.cancel() @@ -927,6 +937,27 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio const serverIDForStandalone = 1 // serverID for standalone deployment. +func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) { + cfg := config.GetGlobalConfig() + etcdLogCfg := zap.NewProductionConfig() + etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) + cli, err := clientv3.New(clientv3.Config{ + LogConfig: &etcdLogCfg, + Endpoints: addrs, + AutoSyncInterval: 30 * time.Second, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + grpc.WithBackoffMaxDelay(time.Second * 3), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second, + Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second, + }), + }, + TLS: ebd.TLSConfig(), + }) + return cli, err +} + // Init initializes a domain. func (do *Domain) Init( ddlLease time.Duration, @@ -942,25 +973,7 @@ func (do *Domain) Init( return err } if addrs != nil { - cfg := config.GetGlobalConfig() - // silence etcd warn log, when domain closed, it won't randomly print warn log - // see details at the issue https://github.com/pingcap/tidb/issues/15479 - etcdLogCfg := zap.NewProductionConfig() - etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) - cli, err := clientv3.New(clientv3.Config{ - LogConfig: &etcdLogCfg, - Endpoints: addrs, - AutoSyncInterval: 30 * time.Second, - DialTimeout: 5 * time.Second, - DialOptions: []grpc.DialOption{ - grpc.WithBackoffMaxDelay(time.Second * 3), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second, - Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second, - }), - }, - TLS: ebd.TLSConfig(), - }) + cli, err := newEtcdCli(addrs, ebd) if err != nil { return errors.Trace(err) } @@ -968,6 +981,12 @@ func (do *Domain) Init( etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) do.etcdClient = cli + + unprefixedEtcdCli, err := newEtcdCli(addrs, ebd) + if err != nil { + return errors.Trace(err) + } + do.unprefixedEtcdCli = unprefixedEtcdCli } } @@ -1029,7 +1048,7 @@ func (do *Domain) Init( // step 1: prepare the info/schema syncer which domain reload needed. skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard - do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, skipRegisterToDashboard) + do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, skipRegisterToDashboard) if err != nil { return err } diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 7732a831b057e..6c9e721959cf9 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -95,12 +95,18 @@ var ErrPrometheusAddrIsNotSet = dbterror.ClassDomain.NewStd(errno.ErrPrometheusA // InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down. 
type InfoSyncer struct { - etcdCli *clientv3.Client - info *ServerInfo - serverInfoPath string - minStartTS uint64 - minStartTSPath string - managerMu struct { + // `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace. + etcdCli *clientv3.Client + // `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace. + // It is only used in storeMinStartTS and RemoveMinStartTS now. + // It must be used when the etcd path isn't needed to separate by keyspace. + // See keyspace RFC: https://github.com/pingcap/tidb/pull/39685 + unprefixedEtcdCli *clientv3.Client + info *ServerInfo + serverInfoPath string + minStartTS uint64 + minStartTSPath string + managerMu struct { mu sync.RWMutex util2.SessionManager } @@ -180,12 +186,13 @@ func setGlobalInfoSyncer(is *InfoSyncer) { } // GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing. -func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) { +func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, unprefixedEtcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) { is := &InfoSyncer{ - etcdCli: etcdCli, - info: getServerInfo(id, serverIDGetter), - serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), - minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), + etcdCli: etcdCli, + unprefixedEtcdCli: unprefixedEtcdCli, + info: getServerInfo(id, serverIDGetter), + serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), + minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), } err := is.init(ctx, skipRegisterToDashBoard) if err != nil { @@ -721,20 +728,20 @@ func (is *InfoSyncer) GetMinStartTS() uint64 { // storeMinStartTS stores self server min start timestamp to etcd. func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error { - if is.etcdCli == nil { + if is.unprefixedEtcdCli == nil { return nil } - return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath, + return util.PutKVToEtcd(ctx, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, is.minStartTSPath, strconv.FormatUint(is.minStartTS, 10), clientv3.WithLease(is.session.Lease())) } // RemoveMinStartTS removes self server min start timestamp from etcd. 
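// Like storeMinStartTS above, it goes through unprefixedEtcdCli, so the key being deleted is exactly
// the key that was written, whether or not a keyspace etcd namespace is configured.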
func (is *InfoSyncer) RemoveMinStartTS() { - if is.etcdCli == nil { + if is.unprefixedEtcdCli == nil { return } - err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) + err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) if err != nil { logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err)) } diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go index 90a30d8f1f161..3264c0adca3c6 100644 --- a/domain/infosync/info_test.go +++ b/domain/infosync/info_test.go @@ -67,7 +67,7 @@ func TestTopology(t *testing.T) { require.NoError(t, err) }() - info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, false) + info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, false) require.NoError(t, err) err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) @@ -152,7 +152,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) { } func TestPutBundlesRetry(t *testing.T) { - _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, false) + _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, false) require.NoError(t, err) bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"}) @@ -216,7 +216,7 @@ func TestPutBundlesRetry(t *testing.T) { func TestTiFlashManager(t *testing.T) { ctx := context.Background() - _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, false) + _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, false) tiflash := NewMockTiFlash() SetMockTiFlash(tiflash) diff --git a/server/stat_test.go b/server/stat_test.go index 66c974a3deeea..4484823f6dc83 100644 --- a/server/stat_test.go +++ b/server/stat_test.go @@ -46,7 +46,7 @@ func TestUptime(t *testing.T) { }() require.NoError(t, err) - _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tidbdrv := NewTiDBDriver(store) diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index 8a28b91b6cb2c..21557f09bec2c 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -196,10 +196,6 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage, if err != nil { return nil, errors.Trace(err) } - // If there's setting keyspace-name, then skipped GC worker logic. - // It needs a group of special tidb nodes to execute GC worker logic. - // TODO: remove this restriction while merged keyspace GC worker logic. - disableGC = true } codec := pdClient.GetCodec() diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index f9b53e55988d4..474096fbca7ec 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -301,6 +301,85 @@ func (w *GCWorker) tick(ctx context.Context) { } } +// getGCSafePoint returns the current gc safe point. +func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { + // If there is try to set gc safepoint is 0, the interface will not set gc safepoint to 0, + // it will return current gc safepoint. 
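+ // (PD never moves the GC safe point backward, so an argument of 0 leaves it unchanged and the call
+ // simply returns the current value.)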
+ safePoint, err := pdClient.UpdateGCSafePoint(ctx, 0) + if err != nil { + return 0, errors.Trace(err) + } + return safePoint, nil +} + +func (w *GCWorker) logIsGCSafePointTooEarly(ctx context.Context, safePoint uint64) error { + now, err := w.getOracleTime() + if err != nil { + return errors.Trace(err) + } + + checkTs := oracle.GoTimeToTS(now.Add(-gcDefaultLifeTime * 2)) + if checkTs > safePoint { + logutil.Logger(ctx).Info("[gc worker] gc safepoint is too early. " + + "Maybe there is a bit BR/Lightning/CDC task, " + + "or a long transaction is running" + + "or need a tidb without setting keyspace-name to calculate and update gc safe point.") + } + return nil +} + +func (w *GCWorker) runKeyspaceDeleteRange(ctx context.Context, concurrency int) error { + // Get safe point from PD. + // The GC safe point is updated only after the global GC have done resolveLocks phase globally. + // So, in the following code, resolveLocks must have been done by the global GC on the ranges to be deleted, + // so its safe to delete the ranges. + safePoint, err := getGCSafePoint(ctx, w.pdClient) + if err != nil { + logutil.Logger(ctx).Info("[gc worker] get gc safe point error", zap.Error(errors.Trace(err))) + return nil + } + + if safePoint == 0 { + logutil.Logger(ctx).Info("[gc worker] skip keyspace delete range, because gc safe point is 0") + return nil + } + + err = w.logIsGCSafePointTooEarly(ctx, safePoint) + if err != nil { + logutil.Logger(ctx).Info("[gc worker] log is gc safe point is too early error", zap.Error(errors.Trace(err))) + return nil + } + + keyspaceID := w.store.GetCodec().GetKeyspaceID() + logutil.Logger(ctx).Info("[gc worker] start keyspace delete range", + zap.String("uuid", w.uuid), + zap.Int("concurrency", concurrency), + zap.Uint32("keyspaceID", uint32(keyspaceID)), + zap.Uint64("GCSafepoint", safePoint)) + + // Do deleteRanges. + err = w.deleteRanges(ctx, safePoint, concurrency) + if err != nil { + logutil.Logger(ctx).Error("[gc worker] delete range returns an error", + zap.String("uuid", w.uuid), + zap.Error(err)) + metrics.GCJobFailureCounter.WithLabelValues("delete_range").Inc() + return errors.Trace(err) + } + + // Do redoDeleteRanges. + err = w.redoDeleteRanges(ctx, safePoint, concurrency) + if err != nil { + logutil.Logger(ctx).Error("[gc worker] redo-delete range returns an error", + zap.String("uuid", w.uuid), + zap.Error(err)) + metrics.GCJobFailureCounter.WithLabelValues("redo_delete_range").Inc() + return errors.Trace(err) + } + + return nil +} + // leaderTick of GC worker checks if it should start a GC job every tick. func (w *GCWorker) leaderTick(ctx context.Context) error { if w.gcIsRunning { @@ -317,6 +396,19 @@ func (w *GCWorker) leaderTick(ctx context.Context) error { return errors.Trace(err) } + // Gc safe point is not separated by keyspace now. The whole cluster has only one global gc safe point. + // So at least one TiDB with `keyspace-name` not set is required in the whole cluster to calculate and update gc safe point. + // If `keyspace-name` is set, the TiDB node will only do its own delete range, and will not calculate gc safe point and resolve locks. + // Note that when `keyspace-name` is set, `checkLeader` will be done within the key space. + // Therefore only one TiDB node in each key space will be responsible to do delete range. 
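+ // In short: keyspace-less TiDB instances run the full GC (resolve locks, advance the gc safe point),
+ // while the leader of each keyspace only reads that safe point and runs deleteRanges/redoDeleteRanges
+ // for its own data, as implemented in runKeyspaceDeleteRange above.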
+ if w.store.GetCodec().GetKeyspace() != nil { + err = w.runKeyspaceGCJob(ctx, concurrency) + if err != nil { + return errors.Trace(err) + } + return nil + } + ok, safePoint, err := w.prepare(ctx) if err != nil { metrics.GCJobFailureCounter.WithLabelValues("prepare").Inc() @@ -354,6 +446,36 @@ func (w *GCWorker) leaderTick(ctx context.Context) error { return nil } +func (w *GCWorker) runKeyspaceGCJob(ctx context.Context, concurrency int) error { + // When the worker is just started, or an old GC job has just finished, + // wait a while before starting a new job. + if time.Since(w.lastFinish) < gcWaitTime { + logutil.Logger(ctx).Info("[gc worker] another keyspace gc job has just finished, skipped.", + zap.String("leaderTick on ", w.uuid)) + return nil + } + + now, err := w.getOracleTime() + if err != nil { + return errors.Trace(err) + } + ok, err := w.checkGCInterval(now) + if err != nil || !ok { + return errors.Trace(err) + } + + go func() { + w.done <- w.runKeyspaceDeleteRange(ctx, concurrency) + }() + + err = w.saveTime(gcLastRunTimeKey, now) + if err != nil { + return errors.Trace(err) + } + + return nil +} + // prepare checks preconditions for starting a GC job. It returns a bool // that indicates whether the GC job should start and the new safePoint. func (w *GCWorker) prepare(ctx context.Context) (bool, uint64, error) { @@ -1974,6 +2096,11 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error { // Placement rules cannot be removed immediately after drop table / truncate table, // because the tables can be flashed back or recovered. func (w *GCWorker) doGCPlacementRules(se session.Session, safePoint uint64, dr util.DelRangeTask, gcPlacementRuleCache map[int64]interface{}) (err error) { + if w.store.GetCodec().GetKeyspace() != nil { + logutil.BgLogger().Info("[gc worker] skip doGCPlacementRules when keyspace_name is set.", zap.String("uuid", w.uuid)) + return nil + } + // Get the job from the job history var historyJob *model.Job failpoint.Inject("mockHistoryJobForGC", func(v failpoint.Value) { From d0d321f440e265e6eb87731ba9fbf3611bf0dca2 Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Thu, 2 Feb 2023 15:53:56 +0900 Subject: [PATCH 2/4] ddl: add more tests about multi-valued index (#40973) --- expression/multi_valued_index_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/expression/multi_valued_index_test.go b/expression/multi_valued_index_test.go index 1172d06f78986..b749b4c94d9ba 100644 --- a/expression/multi_valued_index_test.go +++ b/expression/multi_valued_index_test.go @@ -46,6 +46,7 @@ func TestMultiValuedIndexDDL(t *testing.T) { tk.MustExec("drop table t") tk.MustGetErrCode("CREATE TABLE t(x INT, KEY k ((1 AND CAST(JSON_ARRAY(x) AS UNSIGNED ARRAY))));", errno.ErrNotSupportedYet) tk.MustGetErrCode("CREATE TABLE t1 (f1 json, key mvi((cast(cast(f1 as unsigned array) as unsigned array))));", errno.ErrNotSupportedYet) + tk.MustGetErrCode("CREATE TABLE t1 (f1 json, primary key mvi((cast(cast(f1 as unsigned array) as unsigned array))));", errno.ErrNotSupportedYet) tk.MustGetErrCode("CREATE TABLE t1 (f1 json, key mvi((cast(f1->>'$[*]' as unsigned array))));", errno.ErrInvalidTypeForJSON) tk.MustGetErrCode("CREATE TABLE t1 (f1 json, key mvi((cast(f1->'$[*]' as year array))));", errno.ErrNotSupportedYet) tk.MustGetErrCode("CREATE TABLE t1 (f1 json, key mvi((cast(f1->'$[*]' as json array))));", errno.ErrNotSupportedYet) @@ -75,6 +76,9 @@ func TestMultiValuedIndexDDL(t *testing.T) { tk.MustExec("drop table t") tk.MustExec("set names gbk") tk.MustExec("create 
table t(a json, b int, index idx3(b, (cast(a as char(10) array))));") + + tk.MustExec("CREATE TABLE users (id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, doc JSON);") + tk.MustExecToErr("CREATE TABLE t (id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, doc JSON, FOREIGN KEY fk_user_id ((cast(doc->'$[*]' as signed array))) REFERENCES users(id));") } func TestMultiValuedIndexDML(t *testing.T) { From c215108f55b10d19a1baab2c999e66c1b9718113 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 2 Feb 2023 15:47:56 +0800 Subject: [PATCH 3/4] test: stabilize TestAddIndexMergeConflictWithPessimistic (#40975) close pingcap/tidb#40939 --- ddl/indexmergetest/BUILD.bazel | 1 + ddl/indexmergetest/merge_test.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/ddl/indexmergetest/BUILD.bazel b/ddl/indexmergetest/BUILD.bazel index 25dfef99ecb3f..5f6e4215f664e 100644 --- a/ddl/indexmergetest/BUILD.bazel +++ b/ddl/indexmergetest/BUILD.bazel @@ -8,6 +8,7 @@ go_test( "merge_test.go", ], flaky = True, + race = "on", shard_count = 4, deps = [ "//config", diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index a31b3edcc23a4..20bc251c77089 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -478,6 +478,7 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { callback := &callback.TestDDLCallback{Do: dom} runPessimisticTxn := false + afterPessDML := make(chan struct{}, 1) callback.OnJobRunBeforeExported = func(job *model.Job) { if t.Failed() { return @@ -500,6 +501,7 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { assert.NoError(t, err) _, err = tk2.Exec("update t set a = 3 where id = 1;") assert.NoError(t, err) + afterPessDML <- struct{}{} } } dom.DDL().SetHook(callback) @@ -515,6 +517,7 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { case <-afterCommit: require.Fail(t, "should be blocked by the pessimistic txn") } + <-afterPessDML tk2.MustExec("rollback;") <-afterCommit dom.DDL().SetHook(originHook) From 04f9b3c150f226fc1257d8e09356481fbfa94453 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 2 Feb 2023 16:11:56 +0800 Subject: [PATCH 4/4] *: add UseAutoScaler config to disable AutoScaler (#40966) close pingcap/tidb#40971 --- config/config.go | 5 +- config/config.toml.example | 4 ++ executor/tiflashtest/tiflash_test.go | 31 ++++++++ session/session.go | 8 +++ store/copr/batch_coprocessor.go | 101 ++++++++++++++++++++++++++- store/copr/mpp.go | 18 +++++ tidb-server/main.go | 2 +- util/tiflashcompute/topo_fetcher.go | 3 + 8 files changed, 168 insertions(+), 4 deletions(-) diff --git a/config/config.go b/config/config.go index b0a7b4498a05b..a2230f5d73c35 100644 --- a/config/config.go +++ b/config/config.go @@ -295,6 +295,8 @@ type Config struct { TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"` IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"` AutoScalerClusterID string `toml:"autoscaler-cluster-id" json:"autoscaler-cluster-id"` + // todo: remove this after AutoScaler is stable. 
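+ // When UseAutoScaler is false, the tiflash_compute topology is read from PD instead of an AutoScaler,
+ // and the client-go tiflash_compute store cache is invalidated on MPP errors (see the store/copr
+ // changes later in this patch).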
+ UseAutoScaler bool `toml:"use-autoscaler" json:"use-autoscaler"` // TiDBMaxReuseChunk indicates max cached chunk num TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` @@ -1012,6 +1014,7 @@ var defaultConf = Config{ TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr, IsTiFlashComputeFixedPool: false, AutoScalerClusterID: "", + UseAutoScaler: true, TiDBMaxReuseChunk: 64, TiDBMaxReuseColumn: 256, TiDBEnableExitCheck: false, @@ -1348,7 +1351,7 @@ func (c *Config) Valid() error { } // Check tiflash_compute topo fetch is valid. - if c.DisaggregatedTiFlash { + if c.DisaggregatedTiFlash && c.UseAutoScaler { if !tiflashcompute.IsValidAutoScalerConfig(c.TiFlashComputeAutoScalerType) { return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s", tiflashcompute.MockASStr, tiflashcompute.AWSASStr, tiflashcompute.GCPASStr, c.TiFlashComputeAutoScalerType) diff --git a/config/config.toml.example b/config/config.toml.example index d153f56fe7690..74cbc9413a496 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -129,6 +129,10 @@ autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081 # Only meaningful when disaggregated-tiflash is true. autoscaler-cluster-id = "" +# use-autoscaler indicates whether use AutoScaler or PD for tiflash_compute nodes, only meaningful when disaggregated-tiflash is true. +# Will remove this after AutoScaler is stable. +use-autoscaler = true + [log] # Log level: debug, info, warn, error, fatal. level = "info" diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index f2bb7232bbbfa..bc759b5b27452 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1319,6 +1319,37 @@ func TestDisaggregatedTiFlash(t *testing.T) { require.Contains(t, err.Error(), "[util:1815]Internal : get tiflash_compute topology failed") } +// todo: remove this after AutoScaler is stable. +func TestDisaggregatedTiFlashNonAutoScaler(t *testing.T) { + config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = true + conf.UseAutoScaler = false + }) + defer config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = false + conf.UseAutoScaler = true + }) + + // Setting globalTopoFetcher to nil to can make sure cannot fetch topo from AutoScaler. + err := tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.InvalidASStr, "", "", false) + require.Contains(t, err.Error(), "unexpected topo fetch type. expect: mock or aws or gcp, got invalid") + + store := testkit.CreateMockStore(t, withMockTiFlash(2)) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(c1 int)") + tk.MustExec("alter table t set tiflash replica 1") + tb := external.GetTableByName(t, tk, "test", "t") + err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") + + err = tk.ExecToErr("select * from t;") + // This error message means we use PD instead of AutoScaler. 
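+ // "tiflash_compute node is unavailable" is returned by buildBatchCopTasksConsistentHashForPD when PD
+ // reports no alive tiflash_compute store; the AutoScaler path above fails with a topology-fetch error
+ // instead.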
+ require.Contains(t, err.Error(), "tiflash_compute node is unavailable") +} + func TestDisaggregatedTiFlashQuery(t *testing.T) { config.UpdateGlobal(func(conf *config.Config) { conf.DisaggregatedTiFlash = true diff --git a/session/session.go b/session/session.go index 624315fe8fc72..5b7fa5a8875aa 100644 --- a/session/session.go +++ b/session/session.go @@ -3339,6 +3339,14 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { return nil, err } + if config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler { + // Invalid client-go tiflash_compute store cache if necessary. + err = dom.WatchTiFlashComputeNodeChange() + if err != nil { + return nil, err + } + } + if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil { return nil, err } diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 568b4dc940c3c..99b7a807d52e2 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -494,7 +494,10 @@ func buildBatchCopTasksForNonPartitionedTable( balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - return buildBatchCopTasksConsistentHash(ctx, bo, store, []*KeyRanges{ranges}, storeType, ttl) + if config.GetGlobalConfig().UseAutoScaler { + return buildBatchCopTasksConsistentHash(ctx, bo, store, []*KeyRanges{ranges}, storeType, ttl) + } + return buildBatchCopTasksConsistentHashForPD(bo, store, []*KeyRanges{ranges}, storeType, ttl) } return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -511,7 +514,12 @@ func buildBatchCopTasksForPartitionedTable( balanceContinuousRegionCount int64, partitionIDs []int64) (batchTasks []*batchCopTask, err error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - batchTasks, err = buildBatchCopTasksConsistentHash(ctx, bo, store, rangesForEachPhysicalTable, storeType, ttl) + if config.GetGlobalConfig().UseAutoScaler { + batchTasks, err = buildBatchCopTasksConsistentHash(ctx, bo, store, rangesForEachPhysicalTable, storeType, ttl) + } else { + // todo: remove this after AutoScaler is stable. + batchTasks, err = buildBatchCopTasksConsistentHashForPD(bo, store, rangesForEachPhysicalTable, storeType, ttl) + } } else { batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -1169,3 +1177,92 @@ func (b *batchCopIterator) handleCollectExecutionInfo(bo *Backoffer, resp *batch } resp.detail.CalleeAddress = task.storeAddr } + +// Only called when UseAutoScaler is false. 
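+// buildBatchCopTasksConsistentHashForPD splits the key ranges into per-region tasks, fetches the
+// tiflash_compute stores registered in PD, filters out the stores that fail the liveness check, and then
+// maps each region to a store by consistent hash, grouping the results into one batchCopTask per store
+// address. It retries with backoff while no RPC context can be built for the regions.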
+func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer, + kvStore *kvStore, + rangesForEachPhysicalTable []*KeyRanges, + storeType kv.StoreType, + ttl time.Duration) (res []*batchCopTask, err error) { + const cmdType = tikvrpc.CmdBatchCop + var retryNum int + cache := kvStore.GetRegionCache() + + for { + retryNum++ + var rangesLen int + tasks := make([]*copTask, 0) + regionIDs := make([]tikv.RegionVerID, 0) + + for i, ranges := range rangesForEachPhysicalTable { + rangesLen += ranges.Len() + locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) + if err != nil { + return nil, errors.Trace(err) + } + for _, lo := range locations { + tasks = append(tasks, &copTask{ + region: lo.Location.Region, + ranges: lo.Ranges, + cmdType: cmdType, + storeType: storeType, + partitionIndex: int64(i), + }) + regionIDs = append(regionIDs, lo.Location.Region) + } + } + + stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer()) + if err != nil { + return nil, err + } + stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore) + if len(stores) == 0 { + return nil, errors.New("tiflash_compute node is unavailable") + } + + rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores) + if err != nil { + return nil, err + } + if rpcCtxs == nil { + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum)) + err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) + if err != nil { + return nil, errors.Trace(err) + } + continue + } + if len(rpcCtxs) != len(tasks) { + return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks)) + } + taskMap := make(map[string]*batchCopTask) + for i, rpcCtx := range rpcCtxs { + regionInfo := RegionInfo{ + // tasks and rpcCtxs are correspond to each other. + Region: tasks[i].region, + Meta: rpcCtx.Meta, + Ranges: tasks[i].ranges, + AllStores: []uint64{rpcCtx.Store.StoreID()}, + PartitionIndex: tasks[i].partitionIndex, + } + if batchTask, ok := taskMap[rpcCtx.Addr]; ok { + batchTask.regionInfos = append(batchTask.regionInfos, regionInfo) + } else { + batchTask := &batchCopTask{ + storeAddr: rpcCtx.Addr, + cmdType: cmdType, + ctx: rpcCtx, + regionInfos: []RegionInfo{regionInfo}, + } + taskMap[rpcCtx.Addr] = batchTask + res = append(res, batchTask) + } + } + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores))) + break + } + + failpointCheckForConsistentHash(res) + return res, nil +} diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 49b44922aa2ce..03c09b37950a0 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -250,6 +250,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req var rpcResp *tikvrpc.Response var err error var retry bool + invalidPDCache := config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler // If copTasks is not empty, we should send request according to region distribution. // Or else it's the task without region, which always happens in high layer task without table. @@ -262,6 +263,9 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req // That's a hard job but we can try it in the future. 
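	// In disaggregated mode without AutoScaler (invalidPDCache above), a dispatch failure also drops the
	// cached tiflash_compute stores so that the next request re-reads the topology from PD.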
if sender.GetRPCError() != nil { logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion)) + if invalidPDCache { + m.store.GetRegionCache().InvalidateTiFlashComputeStores() + } // if needTriggerFallback is true, we return timeout to trigger tikv's fallback if m.needTriggerFallback { err = derr.ErrTiFlashServerTimeout @@ -274,6 +278,9 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled { retry = false } else if err != nil { + if invalidPDCache { + m.store.GetRegionCache().InvalidateTiFlashComputeStores() + } if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil { retry = true } @@ -351,7 +358,9 @@ func (m *mppIterator) cancelMppTasks() { } // send cancel cmd to all stores where tasks run + invalidPDCache := config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler wg := util.WaitGroupWrapper{} + gotErr := atomic.Bool{} for addr := range usedStoreAddrs { storeAddr := addr wg.Run(func() { @@ -359,10 +368,16 @@ func (m *mppIterator) cancelMppTasks() { logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr), zap.Int64("mpp-version", m.mppVersion.ToInt64())) if err != nil { logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr), zap.Int64("mpp-version", m.mppVersion.ToInt64())) + if invalidPDCache { + gotErr.CompareAndSwap(false, true) + } } }) } wg.Wait() + if invalidPDCache && gotErr.Load() { + m.store.GetRegionCache().InvalidateTiFlashComputeStores() + } } func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) { @@ -389,6 +404,9 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques if err != nil { logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion)) + if config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler { + m.store.GetRegionCache().InvalidateTiFlashComputeStores() + } // if needTriggerFallback is true, we return timeout to trigger tikv's fallback if m.needTriggerFallback { m.sendError(derr.ErrTiFlashServerTimeout) diff --git a/tidb-server/main.go b/tidb-server/main.go index f3ca17cf7c4c6..3d73fa111ab8c 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -204,7 +204,7 @@ func main() { err := cpuprofile.StartCPUProfiler() terror.MustNil(err) - if config.GetGlobalConfig().DisaggregatedTiFlash { + if config.GetGlobalConfig().DisaggregatedTiFlash && config.GetGlobalConfig().UseAutoScaler { clusterID, err := config.GetAutoScalerClusterID() terror.MustNil(err) diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index f8d7e7b123e63..877d3a0519448 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -44,6 +44,8 @@ const ( GCPASStr = "gcp" // TestASStr is string value for test AutoScaler. TestASStr = "test" + // InvalidASStr is string value for invalid AutoScaler. 
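+ // It is used in tests to make InitGlobalTopoFetcher fail on purpose (see
+ // TestDisaggregatedTiFlashNonAutoScaler).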
+ InvalidASStr = "invalid" ) const ( @@ -127,6 +129,7 @@ func InitGlobalTopoFetcher(typ string, addr string, clusterID string, isFixedPoo case TestASType: globalTopoFetcher = NewTestAutoScalerFetcher() default: + globalTopoFetcher = nil err = errors.Errorf("unexpected topo fetch type. expect: %s or %s or %s, got %s", MockASStr, AWSASStr, GCPASStr, typ) }