From 745c0bbeeea32af4c61598d94c6ce09bd84db4da Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Tue, 19 Nov 2024 19:47:41 +0800 Subject: [PATCH 01/38] build execute max_write_speed --- pkg/ddl/db_test.go | 9 +++++++++ pkg/executor/operate_ddl_jobs.go | 11 +++++++++++ pkg/meta/model/reorg.go | 5 +++-- pkg/planner/core/planbuilder.go | 11 +++++++++++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index 2cd989ce1107f..27290e2a82f0a 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go @@ -1195,6 +1195,15 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2PiB for max_write_speed is out of range [0, 1125899906842624]") + // valid config value + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 16;", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 64;", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '0';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '64';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2KB';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '3MiB';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '4 gb';", "ddl job 1 is not running") // invalid job id tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running") diff --git a/pkg/executor/operate_ddl_jobs.go b/pkg/executor/operate_ddl_jobs.go index 4aa14405aa665..8ab9414392213 100644 --- a/pkg/executor/operate_ddl_jobs.go +++ b/pkg/executor/operate_ddl_jobs.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl" @@ -210,6 +211,16 @@ func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminComma job.ReorgMeta.BatchSize = int(cons.Value.GetInt64()) } job.AdminOperator = byWho + case core.AlterDDLJobMaxWriteSpeed: + if opt.Value != nil { + cons := opt.Value.(*expression.Constant) + speed, err := units.RAMInBytes(cons.Value.GetString()) + if err != nil { + return errors.Trace(err) + } + job.ReorgMeta.MaxWriteSpeed = speed + } + job.AdminOperator = byWho default: return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name) } diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index 01be1d69fcbc1..ad88b88e90329 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -73,8 +73,9 @@ type DDLReorgMeta struct { TargetScope string `json:"target_scope"` // These two variables are set when corresponding session variables are set explicitly. When they are set, // user cannot change it by setting the global one. Otherwise, they can be adjusted dynamically through global var. - Concurrency int `json:"concurrency"` - BatchSize int `json:"batch_size"` + Concurrency int `json:"concurrency"` + BatchSize int `json:"batch_size"` + MaxWriteSpeed int64 `json:"max_write_speed"` } // GetConcurrencyOrDefault gets the concurrency from DDLReorgMeta or returns the default value. diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index ad4c1455a5a3a..8a3b2792ea06d 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/bindinfo" @@ -5939,6 +5940,16 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { return fmt.Errorf("the value %v for %s is out of range [%v, %v]", bs, opt.Name, variable.MinDDLReorgBatchSize, variable.MaxDDLReorgBatchSize) } + case AlterDDLJobMaxWriteSpeed: + speedStr := opt.Value.(*expression.Constant).Value.GetString() + speed, err := units.RAMInBytes(speedStr) + if err != nil { + return errors.Trace(err) + } + if speed < 0 || speed > units.PiB { + return fmt.Errorf("the value %s for %s is out of range [%v, %v]", + speedStr, opt.Name, 0, units.PiB) + } } return nil } From f286233507ae87fed5b0fb2c4acad1f48d6373e9 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Thu, 21 Nov 2024 22:35:54 +0800 Subject: [PATCH 02/38] use get and set interface handle the max write speed in reorg meta --- pkg/ddl/backfilling.go | 3 ++- pkg/ddl/backfilling_dist_executor.go | 1 + pkg/ddl/executor.go | 1 + pkg/ddl/index.go | 2 +- pkg/ddl/ingest/backend_mgr.go | 4 +++- pkg/ddl/ingest/config.go | 3 ++- pkg/ddl/ingest/mock.go | 2 +- pkg/executor/operate_ddl_jobs.go | 2 +- pkg/meta/model/reorg.go | 15 +++++++++++++++ .../realtikvtest/addindextest3/functional_test.go | 2 +- 10 files changed, 28 insertions(+), 7 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index e3d52c22481db..e4754d386cd8c 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -708,8 +708,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( //nolint: forcetypeassert discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())) bcCtx, err := ingest.LitBackCtxMgr.Register( - ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS) + ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index d9b5e6c062273..ccc50c0c99a61 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -154,6 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { discovery, job.ReorgMeta.ResourceGroupName, job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), + job.ReorgMeta.GetConcurrencyOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())), job.RealStartTS, ) } diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index cba9955689c25..1c1b7a384c823 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4928,6 +4928,7 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok { m.SetBatchSize(variable.TidbOptInt(sv, 0)) } + m.SetMaxWriteSpeed(int(variable.DDLReorgMaxWriteSpeed.Load())) } setDistTaskParam := func() error { m.IsDistReorg = variable.EnableDistTask.Load() diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 7bdf73313b708..824cedfc2a56f 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -2429,7 +2429,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo ctx := tidblogutil.WithCategory(ctx, "ddl-ingest") if bc == nil { bc, err = ingest.LitBackCtxMgr.Register( - ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, reorgInfo.RealStartTS) + ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, 0, reorgInfo.RealStartTS) if err != nil { return err } diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go index 81c29617ae0c3..048f883e4ceef 100644 --- a/pkg/ddl/ingest/backend_mgr.go +++ b/pkg/ddl/ingest/backend_mgr.go @@ -51,6 +51,7 @@ type BackendCtxMgr interface { pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, + maxWriteSpeed int, initTS uint64, ) (BackendCtx, error) Unregister(jobID int64) @@ -118,6 +119,7 @@ func (m *litBackendCtxMgr) Register( pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, concurrency int, + maxWriteSpeed int, initTS uint64, ) (BackendCtx, error) { bc, exist := m.Load(jobID) @@ -136,7 +138,7 @@ func (m *litBackendCtxMgr) Register( logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err)) return nil, err } - cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency) + cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency, maxWriteSpeed) if err != nil { logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err)) return nil, err diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go index cc6419c7271ce..a97cb81d9cd03 100644 --- a/pkg/ddl/ingest/config.go +++ b/pkg/ddl/ingest/config.go @@ -44,6 +44,7 @@ func genConfig( unique bool, resourceGroup string, concurrency int, + maxWriteSpeed int, ) (*local.BackendConfig, error) { cfg := &local.BackendConfig{ LocalStoreDir: jobSortPath, @@ -68,7 +69,7 @@ func genConfig( PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable, TaskType: kvutil.ExplicitTypeDDL, DisableAutomaticCompactions: true, - StoreWriteBWLimit: int(variable.DDLReorgMaxWriteSpeed.Load()), + StoreWriteBWLimit: maxWriteSpeed, } // Each backend will build a single dir in lightning dir. if ImporterRangeConcurrencyForTest != nil { diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index 59dcb167fdfec..88b50ad099255 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) { // Register implements BackendCtxMgr.Register interface. func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, - pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, initTS uint64) (BackendCtx, error) { + pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error) { logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID)) if mockCtx, ok := m.runningJobs[jobID]; ok { return mockCtx, nil diff --git a/pkg/executor/operate_ddl_jobs.go b/pkg/executor/operate_ddl_jobs.go index 27b241f4953c7..4dded20fa6577 100644 --- a/pkg/executor/operate_ddl_jobs.go +++ b/pkg/executor/operate_ddl_jobs.go @@ -218,7 +218,7 @@ func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminComma if err != nil { return errors.Trace(err) } - job.ReorgMeta.MaxWriteSpeed = speed + job.ReorgMeta.SetMaxWriteSpeed(int(speed)) } job.AdminOperator = byWho default: diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index c82dd3f69c9f2..d0022cfa96374 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -109,6 +109,21 @@ func (dm *DDLReorgMeta) SetBatchSize(batchSize int) { atomic.StoreInt64(&dm.BatchSize, int64(batchSize)) } +// GetMaxWriteSpeedOrDefault gets the max write speed from DDLReorgMeta. +// 0 means no limit. +func (dm *DDLReorgMeta) GetMaxWriteSpeedOrDefault(defaultVal int) int { + maxWriteSpeed := atomic.LoadInt64(&dm.MaxWriteSpeed) + if dm == nil { + return defaultVal + } + return int(maxWriteSpeed) +} + +// SetMaxWriteSpeed sets the max write speed in DDLReorgMeta. +func (dm *DDLReorgMeta) SetMaxWriteSpeed(maxWriteSpeed int) { + atomic.StoreInt64(&dm.MaxWriteSpeed, int64(maxWriteSpeed)) +} + const ( // ReorgMetaVersion0 is the minimum version of DDLReorgMeta. ReorgMetaVersion0 = int64(0) diff --git a/tests/realtikvtest/addindextest3/functional_test.go b/tests/realtikvtest/addindextest3/functional_test.go index aaa6eb4e29764..6e37fb5e04bcd 100644 --- a/tests/realtikvtest/addindextest3/functional_test.go +++ b/tests/realtikvtest/addindextest3/functional_test.go @@ -85,7 +85,7 @@ func TestDDLTestEstimateTableRowSize(t *testing.T) { func TestBackendCtxConcurrentUnregister(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1, 0) + bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1, 0, 0) require.NoError(t, err) idxIDs := []int64{1, 2, 3, 4, 5, 6, 7} uniques := make([]bool, 0, len(idxIDs)) From cbbd6aba5ddea36e1e2a0d97207df1060aee9b10 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Thu, 21 Nov 2024 23:20:23 +0800 Subject: [PATCH 03/38] make the limiter can be adjusted --- pkg/lightning/backend/local/local.go | 7 +-- pkg/lightning/backend/local/localhelper.go | 58 ++++++++++++++++--- .../backend/local/localhelper_test.go | 37 ++++++++++++ pkg/lightning/backend/local/region_job.go | 5 ++ 4 files changed, 92 insertions(+), 15 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index e2e757bb48563..eaeda026560d6 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -637,12 +637,7 @@ func NewBackend( return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs() } - var writeLimiter StoreWriteLimiter - if config.StoreWriteBWLimit > 0 { - writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit) - } else { - writeLimiter = noopStoreWriteLimiter{} - } + writeLimiter := newStoreWriteLimiter(config.StoreWriteBWLimit) local := &Backend{ pdCli: pdCli, pdHTTPCli: pdHTTPCli, diff --git a/pkg/lightning/backend/local/localhelper.go b/pkg/lightning/backend/local/localhelper.go index e042a78c5cdf5..0f15b8d083c63 100644 --- a/pkg/lightning/backend/local/localhelper.go +++ b/pkg/lightning/backend/local/localhelper.go @@ -19,6 +19,7 @@ import ( "context" "math" "sync" + "sync/atomic" "time" "github.com/docker/go-units" @@ -133,16 +134,30 @@ func largerStartKey(a, b []byte) []byte { type StoreWriteLimiter interface { WaitN(ctx context.Context, storeID uint64, n int) error Limit() int + UpdateLimiter(limit int) } type storeWriteLimiter struct { rwm sync.RWMutex limiters map[uint64]*rate.Limiter - limit int - burst int + // limit and burst can only be non-negative, 0 means no rate limiting. + limit int64 + burst int64 } func newStoreWriteLimiter(limit int) *storeWriteLimiter { + l, b := calculateLimitAndBurst(limit) + return &storeWriteLimiter{ + limiters: make(map[uint64]*rate.Limiter), + limit: l, + burst: b, + } +} + +func calculateLimitAndBurst(limit int) (int64, int64) { + if limit <= 0 { + return 0, 0 + } var burst int // Allow burst of at most 20% of the limit. if limit <= math.MaxInt-limit/5 { @@ -151,15 +166,14 @@ func newStoreWriteLimiter(limit int) *storeWriteLimiter { // If overflowed, set burst to math.MaxInt. burst = math.MaxInt } - return &storeWriteLimiter{ - limiters: make(map[uint64]*rate.Limiter), - limit: limit, - burst: burst, - } + return int64(limit), int64(burst) } func (s *storeWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error { limiter := s.getLimiter(storeID) + if limiter == nil { + return nil + } // The original WaitN doesn't allow n > burst, // so we call WaitN with burst multiple times. for n > limiter.Burst() { @@ -172,10 +186,13 @@ func (s *storeWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) er } func (s *storeWriteLimiter) Limit() int { - return s.limit + return int(atomic.LoadInt64(&s.limit)) } func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter { + if atomic.LoadInt64(&s.limit) == 0 { + return nil + } s.rwm.RLock() limiter, ok := s.limiters[storeID] s.rwm.RUnlock() @@ -186,12 +203,33 @@ func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter { defer s.rwm.Unlock() limiter, ok = s.limiters[storeID] if !ok { - limiter = rate.NewLimiter(rate.Limit(s.limit), s.burst) + limiter = rate.NewLimiter(rate.Limit(atomic.LoadInt64(&s.limit)), int(atomic.LoadInt64(&s.burst))) s.limiters[storeID] = limiter } return limiter } +func (s *storeWriteLimiter) UpdateLimiter(newLimit int) { + limit, burst := calculateLimitAndBurst(newLimit) + if atomic.LoadInt64(&s.limit) == limit { + return + } + + atomic.StoreInt64(&s.limit, limit) + atomic.StoreInt64(&s.burst, burst) + // Update all existing limiters with the new limit and burst values. + s.rwm.Lock() + defer s.rwm.Unlock() + if atomic.LoadInt64(&s.limit) == 0 { + s.limiters = make(map[uint64]*rate.Limiter) + return + } + for _, limiter := range s.limiters { + limiter.SetLimit(rate.Limit(atomic.LoadInt64(&s.limit))) + limiter.SetBurst(int(atomic.LoadInt64(&s.burst))) + } +} + type noopStoreWriteLimiter struct{} func (noopStoreWriteLimiter) WaitN(_ context.Context, _ uint64, _ int) error { @@ -202,6 +240,8 @@ func (noopStoreWriteLimiter) Limit() int { return math.MaxInt } +func (noopStoreWriteLimiter) UpdateLimiter(_ int) {} + // compaction threshold const ( CompactionLowerThreshold = 512 * units.MiB diff --git a/pkg/lightning/backend/local/localhelper_test.go b/pkg/lightning/backend/local/localhelper_test.go index 4baa322c81e49..5cc453e435147 100644 --- a/pkg/lightning/backend/local/localhelper_test.go +++ b/pkg/lightning/backend/local/localhelper_test.go @@ -325,3 +325,40 @@ func TestStoreWriteLimiter(t *testing.T) { } wg.Wait() } + +func TestTuneStoreWriteLimiter(t *testing.T) { + limiter := newStoreWriteLimiter(100) + testLimiter := func(ctx context.Context, maxT int) { + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(storeID uint64) { + defer wg.Done() + start := time.Now() + var gotTokens int + for { + n := rand.Intn(50) + if limiter.WaitN(ctx, storeID, n) != nil { + break + } + gotTokens += n + } + elapsed := time.Since(start) + maxTokens := 120 + int(float64(elapsed)/float64(time.Second)*float64(maxT)) + // In theory, gotTokens should be less than or equal to maxT. + // But we allow a little of error to avoid the test being flaky. + require.LessOrEqual(t, gotTokens, maxTokens+1) + }(uint64(i)) + } + wg.Wait() + } + + ctx0, cancel0 := context.WithTimeout(context.Background(), time.Second*2) + defer cancel0() + testLimiter(ctx0, 100) + + limiter.UpdateLimiter(200) + ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second*2) + defer cancel1() + testLimiter(ctx1, 200) +} diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go index 14f1c9a8191e5..b9b42f459f802 100644 --- a/pkg/lightning/backend/local/region_job.go +++ b/pkg/lightning/backend/local/region_job.go @@ -780,6 +780,11 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe return resp, nil } +// UpdateLimiter updates the write limiter of the backend. +func (local *Backend) UpdateLimiter(limit int) { + local.writeLimiter.UpdateLimiter(limit) +} + // convertStageOnIngestError will try to fix the error contained in ingest response. // Return (_, error) when another error occurred. // Return (true, nil) when the job can retry ingesting immediately. From 829c67fd58c249c0c963f6f6fa2d615b27920718 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 00:08:22 +0800 Subject: [PATCH 04/38] adjust max write speed --- pkg/ddl/backfilling.go | 35 ++++++++++++++--------- pkg/ddl/backfilling_read_index.go | 4 +-- pkg/ddl/job_worker.go | 1 + pkg/lightning/backend/local/region_job.go | 5 ++++ 4 files changed, 29 insertions(+), 16 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index e4754d386cd8c..d86808acc81ed 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -777,7 +777,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( if err != nil { return err } - err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize) + err = executeAndClosePipeline(opCtx, pipe, job, bcCtx, avgRowSize) if err != nil { err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines) if err1 != nil { @@ -794,7 +794,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup) } -func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) { +func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) { opR, opW := pipe.GetLocalIngestModeReaderAndWriter() if opR == nil || opW == nil { logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID)) @@ -818,23 +818,30 @@ func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job case <-ctx.Done(): return case <-ticker.C: - targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt( - job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), avgRowSize) + maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())) + if maxWriteSpeed != bcCtx.GetLocalBackend().GetLimiterSpeed() { + bcCtx.GetLocalBackend().UpdateLimiter(maxWriteSpeed) + logutil.DDLIngestLogger().Info("adjust ddl job config success", + zap.Int64("jobID", job.ID), + zap.Int("max write speed", maxWriteSpeed)) + } + + concurrency := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize() - if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt { - continue + if int32(targetReaderCnt) != currentReaderCnt || int32(targetWriterCnt) != currentWriterCnt { + reader.TuneWorkerPoolSize(int32(targetReaderCnt)) + writer.TuneWorkerPoolSize(int32(targetWriterCnt)) + logutil.DDLIngestLogger().Info("adjust ddl job config success", + zap.Int64("jobID", job.ID), + zap.Int32("table scan operator count", reader.GetWorkerPoolSize()), + zap.Int32("index ingest operator count", writer.GetWorkerPoolSize())) } - reader.TuneWorkerPoolSize(int32(targetReaderCnt)) - writer.TuneWorkerPoolSize(int32(targetWriterCnt)) - logutil.DDLIngestLogger().Info("adjust ddl job config success", - zap.Int64("jobID", job.ID), - zap.Int32("table scan operator count", reader.GetWorkerPoolSize()), - zap.Int32("index ingest operator count", writer.GetWorkerPoolSize())) } } } -func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error { +func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) error { err := pipe.Execute() if err != nil { return err @@ -843,7 +850,7 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job // Adjust worker pool size dynamically. if job != nil { go func() { - adjustWorkerPoolSize(ctx, pipe, job, avgRowSize) + adjustWorkerPoolSize(ctx, pipe, job, bcCtx, avgRowSize) }() } diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index de0b3c8409db6..14e7fb501514d 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta if err != nil { return err } - return executeAndClosePipeline(opCtx, pipe, nil, 0) + return executeAndClosePipeline(opCtx, pipe, nil, nil, 0) } pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency) if err != nil { return err } - err = executeAndClosePipeline(opCtx, pipe, nil, 0) + err = executeAndClosePipeline(opCtx, pipe, nil, nil, 0) if err != nil { // For dist task local based ingest, checkpoint is unsupported. // If there is an error we should keep local sort dir clean. diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index 01c0dc5547cfc..1ab333562ccf1 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -867,6 +867,7 @@ func (w *worker) runOneJobStep( if latestJob.IsAlterable() { job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))) job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) + job.ReorgMeta.SetMaxWriteSpeed(latestJob.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load()))) } } } diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go index b9b42f459f802..69abd6c19425b 100644 --- a/pkg/lightning/backend/local/region_job.go +++ b/pkg/lightning/backend/local/region_job.go @@ -785,6 +785,11 @@ func (local *Backend) UpdateLimiter(limit int) { local.writeLimiter.UpdateLimiter(limit) } +// GetLimiterSpeed returns the speed of the write limiter. +func (local *Backend) GetLimiterSpeed() int { + return local.writeLimiter.Limit() +} + // convertStageOnIngestError will try to fix the error contained in ingest response. // Return (_, error) when another error occurred. // Return (true, nil) when the job can retry ingesting immediately. From 07869e388d8d3f69386eee264c2fd9f15b138b86 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 00:26:39 +0800 Subject: [PATCH 05/38] Update BUILD.bazel --- pkg/planner/core/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/planner/core/BUILD.bazel b/pkg/planner/core/BUILD.bazel index fa4bf2170eadc..3de5328f78ec7 100644 --- a/pkg/planner/core/BUILD.bazel +++ b/pkg/planner/core/BUILD.bazel @@ -196,6 +196,7 @@ go_library( "//pkg/util/tiflashcompute", "//pkg/util/tracing", "@com_github_bits_and_blooms_bitset//:bitset", + "@com_github_docker_go_units//:go-units", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/coprocessor", From b7eed57b2a5a4bf443975e2c43a52afd5f7dd473 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 09:29:08 +0800 Subject: [PATCH 06/38] fix nogo --- pkg/lightning/backend/local/localhelper.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/lightning/backend/local/localhelper.go b/pkg/lightning/backend/local/localhelper.go index 0f15b8d083c63..3c3fc178f22ec 100644 --- a/pkg/lightning/backend/local/localhelper.go +++ b/pkg/lightning/backend/local/localhelper.go @@ -154,19 +154,18 @@ func newStoreWriteLimiter(limit int) *storeWriteLimiter { } } -func calculateLimitAndBurst(limit int) (int64, int64) { - if limit <= 0 { +func calculateLimitAndBurst(writeLimit int) (limit int64, burst int64) { + if writeLimit <= 0 { return 0, 0 } - var burst int - // Allow burst of at most 20% of the limit. - if limit <= math.MaxInt-limit/5 { - burst = limit + limit/5 + // Allow burst of at most 20% of the writeLimit. + if writeLimit <= math.MaxInt-writeLimit/5 { + burst = int64(writeLimit) + int64(writeLimit)/5 } else { // If overflowed, set burst to math.MaxInt. burst = math.MaxInt } - return int64(limit), int64(burst) + return int64(writeLimit), burst } func (s *storeWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error { From 3e3955fc5df8be438f89ad32d8adc30d07fcb0ca Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 11:13:01 +0800 Subject: [PATCH 07/38] fix absurd --- pkg/ddl/backfilling_dist_executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index ccc50c0c99a61..956d5b547f430 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -154,7 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { discovery, job.ReorgMeta.ResourceGroupName, job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), - job.ReorgMeta.GetConcurrencyOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())), + job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())), job.RealStartTS, ) } From fb8422b270d2035adf0efcebc303f56461120343 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 12:41:03 +0800 Subject: [PATCH 08/38] show ddl jobs print max write speed --- pkg/executor/show_ddl_jobs.go | 4 ++++ pkg/executor/show_ddl_jobs_test.go | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/executor/show_ddl_jobs.go b/pkg/executor/show_ddl_jobs.go index 440e0bc995ffb..68a14cc404775 100644 --- a/pkg/executor/show_ddl_jobs.go +++ b/pkg/executor/show_ddl_jobs.go @@ -317,12 +317,16 @@ func showCommentsFromJob(job *model.Job) string { if job.MayNeedReorg() { concurrency := m.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) batchSize := m.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) + maxWriteSpeed := m.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())) if concurrency != variable.DefTiDBDDLReorgWorkerCount { labels = append(labels, fmt.Sprintf("thread=%d", concurrency)) } if batchSize != variable.DefTiDBDDLReorgBatchSize { labels = append(labels, fmt.Sprintf("batch_size=%d", batchSize)) } + if maxWriteSpeed != variable.DefTiDBDDLReorgMaxWriteSpeed { + labels = append(labels, fmt.Sprintf("max_write_speed=%d", maxWriteSpeed)) + } if m.TargetScope != "" { labels = append(labels, fmt.Sprintf("service_scope=%s", m.TargetScope)) } diff --git a/pkg/executor/show_ddl_jobs_test.go b/pkg/executor/show_ddl_jobs_test.go index 5b9aabd3324fb..c2e4f3f601afb 100644 --- a/pkg/executor/show_ddl_jobs_test.go +++ b/pkg/executor/show_ddl_jobs_test.go @@ -70,9 +70,10 @@ func TestShowCommentsFromJob(t *testing.T) { UseCloudStorage: true, Concurrency: 8, BatchSize: 1024, + MaxWriteSpeed: 1024 * 1024, } res = showCommentsFromJob(job) - require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024", res) + require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024, max_write_speed=1048576", res) job.ReorgMeta = &model.DDLReorgMeta{ ReorgTp: model.ReorgTypeLitMerge, @@ -80,6 +81,7 @@ func TestShowCommentsFromJob(t *testing.T) { UseCloudStorage: true, Concurrency: variable.DefTiDBDDLReorgWorkerCount, BatchSize: variable.DefTiDBDDLReorgBatchSize, + MaxWriteSpeed: variable.DefTiDBDDLReorgMaxWriteSpeed, } res = showCommentsFromJob(job) require.Equal(t, "ingest, DXF, cloud", res) @@ -90,6 +92,7 @@ func TestShowCommentsFromJob(t *testing.T) { UseCloudStorage: true, Concurrency: variable.DefTiDBDDLReorgWorkerCount, BatchSize: variable.DefTiDBDDLReorgBatchSize, + MaxWriteSpeed: variable.DefTiDBDDLReorgMaxWriteSpeed, TargetScope: "background", } res = showCommentsFromJob(job) From cc985976addceca67eb699591190a77941d84ea9 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 14:17:27 +0800 Subject: [PATCH 09/38] rename --- pkg/ddl/backfilling.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index d86808acc81ed..c1a0186399e8b 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -794,7 +794,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup) } -func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) { +func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) { opR, opW := pipe.GetLocalIngestModeReaderAndWriter() if opR == nil || opW == nil { logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID)) @@ -847,10 +847,10 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job return err } - // Adjust worker pool size dynamically. + // Adjust worker pool size and max write speed dynamically. if job != nil { go func() { - adjustWorkerPoolSize(ctx, pipe, job, bcCtx, avgRowSize) + adjustWorkerCntAndMaxWriteSpeed(ctx, pipe, job, bcCtx, avgRowSize) }() } From bb381f971f48b354121875c3e177e2cb56b23d78 Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Fri, 22 Nov 2024 14:38:10 +0800 Subject: [PATCH 10/38] Update pkg/meta/model/reorg.go Co-authored-by: CbcWestwolf <1004626265@qq.com> --- pkg/meta/model/reorg.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index d0022cfa96374..5cde2ad3f020e 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -112,11 +112,10 @@ func (dm *DDLReorgMeta) SetBatchSize(batchSize int) { // GetMaxWriteSpeedOrDefault gets the max write speed from DDLReorgMeta. // 0 means no limit. func (dm *DDLReorgMeta) GetMaxWriteSpeedOrDefault(defaultVal int) int { - maxWriteSpeed := atomic.LoadInt64(&dm.MaxWriteSpeed) if dm == nil { return defaultVal } - return int(maxWriteSpeed) + return int(atomic.LoadInt64(&dm.MaxWriteSpeed)) } // SetMaxWriteSpeed sets the max write speed in DDLReorgMeta. From 50c43347ba8d7361dcf9b1ec9f10abfa398cb135 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 17:13:17 +0800 Subject: [PATCH 11/38] wait goroutine exit --- pkg/ddl/backfilling.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index c1a0186399e8b..407fca9c1b876 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -848,13 +848,20 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job } // Adjust worker pool size and max write speed dynamically. + var wg sync.WaitGroup + adjustCtx, cancel := context.WithCancel(ctx) if job != nil { + wg.Add(1) go func() { - adjustWorkerCntAndMaxWriteSpeed(ctx, pipe, job, bcCtx, avgRowSize) + defer wg.Done() + adjustWorkerCntAndMaxWriteSpeed(adjustCtx, pipe, job, bcCtx, avgRowSize) }() } err = pipe.Close() + + cancel() + wg.Wait() // wait for adjustWorkerCntAndMaxWriteSpeed to exit if opErr := ctx.OperatorErr(); opErr != nil { return opErr } From 7d46d9e63510985e2bbe3a92e0e7cb5056d2d5a1 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 17:47:14 +0800 Subject: [PATCH 12/38] support int type receive value --- pkg/ddl/db_test.go | 2 ++ pkg/executor/operate_ddl_jobs.go | 34 ++++++++++++++++++---------- pkg/planner/core/planbuilder.go | 23 +++++++++++++++---- pkg/planner/core/planbuilder_test.go | 15 +++++++++++- 4 files changed, 57 insertions(+), 17 deletions(-) diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index 51059e8e3cdb1..07e01c4a6572f 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go @@ -1196,6 +1196,7 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]") tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2PiB for max_write_speed is out of range [0, 1125899906842624]") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = -1;", "the value -1 for max_write_speed is out of range [0, 1125899906842624]") // valid config value tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 16;", "ddl job 1 is not running") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 64;", "ddl job 1 is not running") @@ -1204,6 +1205,7 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2KB';", "ddl job 1 is not running") tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '3MiB';", "ddl job 1 is not running") tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '4 gb';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1;", "ddl job 1 is not running") // invalid job id tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running") diff --git a/pkg/executor/operate_ddl_jobs.go b/pkg/executor/operate_ddl_jobs.go index 4dded20fa6577..d5fb507c06b69 100644 --- a/pkg/executor/operate_ddl_jobs.go +++ b/pkg/executor/operate_ddl_jobs.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/types" "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util/chunk" @@ -198,28 +199,37 @@ func (e *AlterDDLJobExec) processAlterDDLJobConfig( func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminCommandOperator) error { for _, opt := range e.AlterOpts { + if opt.Value == nil { + continue + } switch opt.Name { case core.AlterDDLJobThread: - if opt.Value != nil { - cons := opt.Value.(*expression.Constant) - job.ReorgMeta.SetConcurrency(int(cons.Value.GetInt64())) - } + cons := opt.Value.(*expression.Constant) + job.ReorgMeta.SetConcurrency(int(cons.Value.GetInt64())) job.AdminOperator = byWho case core.AlterDDLJobBatchSize: - if opt.Value != nil { - cons := opt.Value.(*expression.Constant) - job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64())) - } + cons := opt.Value.(*expression.Constant) + job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64())) job.AdminOperator = byWho case core.AlterDDLJobMaxWriteSpeed: - if opt.Value != nil { - cons := opt.Value.(*expression.Constant) - speed, err := units.RAMInBytes(cons.Value.GetString()) + var ( + speed int64 + err error + ) + v := opt.Value.(*expression.Constant) + switch v.RetType.EvalType() { + case types.ETString: + speedStr := opt.Value.(*expression.Constant).Value.GetString() + speed, err = units.RAMInBytes(speedStr) if err != nil { return errors.Trace(err) } - job.ReorgMeta.SetMaxWriteSpeed(int(speed)) + case types.ETInt: + speed = opt.Value.(*expression.Constant).Value.GetInt64() + default: + return fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) } + job.ReorgMeta.SetMaxWriteSpeed(int(speed)) job.AdminOperator = byWho default: return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name) diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 52d717cf08fc7..084a10209530c 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5942,11 +5942,26 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { bs, opt.Name, variable.MinDDLReorgBatchSize, variable.MaxDDLReorgBatchSize) } case AlterDDLJobMaxWriteSpeed: - speedStr := opt.Value.(*expression.Constant).Value.GetString() - speed, err := units.RAMInBytes(speedStr) - if err != nil { - return errors.Trace(err) + var ( + speed int64 + speedStr string + err error + ) + v := opt.Value.(*expression.Constant) + switch v.RetType.EvalType() { + case types.ETString: + speedStr = opt.Value.(*expression.Constant).Value.GetString() + speed, err = units.RAMInBytes(speedStr) + if err != nil { + return errors.Trace(err) + } + case types.ETInt: + speed = opt.Value.(*expression.Constant).Value.GetInt64() + speedStr = strconv.FormatInt(speed, 10) + default: + return fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) } + if speed < 0 || speed > units.PiB { return fmt.Errorf("the value %s for %s is out of range [%v, %v]", speedStr, opt.Name, 0, units.PiB) diff --git a/pkg/planner/core/planbuilder_test.go b/pkg/planner/core/planbuilder_test.go index d2c416960b7da..c9171bcf24fe0 100644 --- a/pkg/planner/core/planbuilder_test.go +++ b/pkg/planner/core/planbuilder_test.go @@ -924,13 +924,26 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { require.True(t, ok) require.Equal(t, cons.Value.GetString(), "10MiB") - stmt, err = parser.ParseOneStmt("admin alter ddl jobs 4 thread = 16, batch_size = 512, max_write_speed = '10MiB' ", "", "") + stmt, err = parser.ParseOneStmt("admin alter ddl jobs 4 max_write_speed = 1024", "", "") require.NoError(t, err) p, err = builder.Build(ctx, resolve.NewNodeW(stmt)) require.NoError(t, err) plan, ok = p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(4)) + require.Equal(t, len(plan.Options), 1) + require.Equal(t, plan.Options[0].Name, AlterDDLJobMaxWriteSpeed) + cons, ok = plan.Options[0].Value.(*expression.Constant) + require.True(t, ok) + require.Equal(t, cons.Value.GetInt64(), int64(1024)) + + stmt, err = parser.ParseOneStmt("admin alter ddl jobs 5 thread = 16, batch_size = 512, max_write_speed = '10MiB' ", "", "") + require.NoError(t, err) + p, err = builder.Build(ctx, resolve.NewNodeW(stmt)) + require.NoError(t, err) + plan, ok = p.(*AlterDDLJob) + require.True(t, ok) + require.Equal(t, plan.JobID, int64(5)) require.Equal(t, len(plan.Options), 3) sort.Slice(plan.Options, func(i, j int) bool { return plan.Options[i].Name < plan.Options[j].Name From 73453298c224b5385596d36f1a6972dfb4bc0831 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 17:49:52 +0800 Subject: [PATCH 13/38] handle the nil case in Set --- pkg/meta/model/reorg.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index 5cde2ad3f020e..c0b2b39499845 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -92,6 +92,9 @@ func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int { // SetConcurrency sets the concurrency in DDLReorgMeta. func (dm *DDLReorgMeta) SetConcurrency(concurrency int) { + if dm == nil { + dm = &DDLReorgMeta{} + } atomic.StoreInt64(&dm.Concurrency, int64(concurrency)) } @@ -106,6 +109,9 @@ func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int { // SetBatchSize sets the batch size in DDLReorgMeta. func (dm *DDLReorgMeta) SetBatchSize(batchSize int) { + if dm == nil { + dm = &DDLReorgMeta{} + } atomic.StoreInt64(&dm.BatchSize, int64(batchSize)) } @@ -120,6 +126,9 @@ func (dm *DDLReorgMeta) GetMaxWriteSpeedOrDefault(defaultVal int) int { // SetMaxWriteSpeed sets the max write speed in DDLReorgMeta. func (dm *DDLReorgMeta) SetMaxWriteSpeed(maxWriteSpeed int) { + if dm == nil { + dm = &DDLReorgMeta{} + } atomic.StoreInt64(&dm.MaxWriteSpeed, int64(maxWriteSpeed)) } From 236a6881e97c65327fae635c9f3fe70b40426a96 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 17:54:36 +0800 Subject: [PATCH 14/38] Update planbuilder.go --- pkg/planner/core/planbuilder.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 084a10209530c..953dc48f78598 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5943,28 +5943,26 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { } case AlterDDLJobMaxWriteSpeed: var ( - speed int64 - speedStr string - err error + speed int64 + err error ) v := opt.Value.(*expression.Constant) switch v.RetType.EvalType() { case types.ETString: - speedStr = opt.Value.(*expression.Constant).Value.GetString() + speedStr := opt.Value.(*expression.Constant).Value.GetString() speed, err = units.RAMInBytes(speedStr) if err != nil { return errors.Trace(err) } case types.ETInt: speed = opt.Value.(*expression.Constant).Value.GetInt64() - speedStr = strconv.FormatInt(speed, 10) default: return fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) } if speed < 0 || speed > units.PiB { return fmt.Errorf("the value %s for %s is out of range [%v, %v]", - speedStr, opt.Name, 0, units.PiB) + opt.Value, opt.Name, 0, units.PiB) } } return nil From 15e2726a123de5d15bb1d83feee24f3d8648a07d Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 17:55:11 +0800 Subject: [PATCH 15/38] Update planbuilder.go --- pkg/planner/core/planbuilder.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 953dc48f78598..ca81a89de3f62 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5959,7 +5959,6 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { default: return fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) } - if speed < 0 || speed > units.PiB { return fmt.Errorf("the value %s for %s is out of range [%v, %v]", opt.Value, opt.Name, 0, units.PiB) From 8c07af02cd89bdf8311245955f3ac8759bca4eda Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 18:09:37 +0800 Subject: [PATCH 16/38] add test --- .../backend/local/region_job_test.go | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pkg/lightning/backend/local/region_job_test.go b/pkg/lightning/backend/local/region_job_test.go index 3a34c4f6e0f0d..0e39bf31ad085 100644 --- a/pkg/lightning/backend/local/region_job_test.go +++ b/pkg/lightning/backend/local/region_job_test.go @@ -579,3 +579,25 @@ func TestStoreBalancerNoRace(t *testing.T) { <-done2 require.Len(t, jobFromWorkerCh, 0) } + +func TestUpdateAndGetLimiterConcurrencySafety(t *testing.T) { + backend := &Backend{ + writeLimiter: newStoreWriteLimiter(0), + } + + var wg sync.WaitGroup + concurrentRoutines := 100 + for i := 0; i < concurrentRoutines; i++ { + wg.Add(2) + go func(limit int) { + defer wg.Done() + backend.UpdateLimiter(limit) + }(i) + + go func() { + defer wg.Done() + _ = backend.GetLimiterSpeed() + }() + } + wg.Wait() +} From 1f9a46c62cfa4c51ff6038600e290c820882da0e Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Fri, 22 Nov 2024 18:49:54 +0800 Subject: [PATCH 17/38] fix test --- pkg/planner/core/planbuilder.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index ca81a89de3f62..baf008679af4a 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5943,25 +5943,27 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { } case AlterDDLJobMaxWriteSpeed: var ( - speed int64 - err error + speed int64 + err error + speedStr string ) v := opt.Value.(*expression.Constant) switch v.RetType.EvalType() { case types.ETString: - speedStr := opt.Value.(*expression.Constant).Value.GetString() + speedStr = opt.Value.(*expression.Constant).Value.GetString() speed, err = units.RAMInBytes(speedStr) if err != nil { return errors.Trace(err) } case types.ETInt: speed = opt.Value.(*expression.Constant).Value.GetInt64() + speedStr = strconv.FormatInt(speed, 10) default: return fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) } if speed < 0 || speed > units.PiB { return fmt.Errorf("the value %s for %s is out of range [%v, %v]", - opt.Value, opt.Name, 0, units.PiB) + speedStr, opt.Name, 0, units.PiB) } } return nil From 543a85ef8699a4d45839cb28451d02c15a0766ea Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Sun, 24 Nov 2024 18:59:41 +0800 Subject: [PATCH 18/38] get the speed after update --- pkg/ddl/backfilling.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 407fca9c1b876..f0fb872a46934 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -823,7 +823,7 @@ func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPi bcCtx.GetLocalBackend().UpdateLimiter(maxWriteSpeed) logutil.DDLIngestLogger().Info("adjust ddl job config success", zap.Int64("jobID", job.ID), - zap.Int("max write speed", maxWriteSpeed)) + zap.Int("max write speed", bcCtx.GetLocalBackend().GetLimiterSpeed())) } concurrency := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) From c1ea4f0b13678e0de62f4cfa343b7a1a110dcbe4 Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Mon, 25 Nov 2024 11:14:27 +0800 Subject: [PATCH 19/38] Update pkg/lightning/backend/local/localhelper_test.go Co-authored-by: D3Hunter --- pkg/lightning/backend/local/localhelper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lightning/backend/local/localhelper_test.go b/pkg/lightning/backend/local/localhelper_test.go index 5cc453e435147..ad772e9fa7b25 100644 --- a/pkg/lightning/backend/local/localhelper_test.go +++ b/pkg/lightning/backend/local/localhelper_test.go @@ -344,7 +344,7 @@ func TestTuneStoreWriteLimiter(t *testing.T) { gotTokens += n } elapsed := time.Since(start) - maxTokens := 120 + int(float64(elapsed)/float64(time.Second)*float64(maxT)) + maxTokens := 120 + int(float64(elapsed.Seconds())*float64(maxT)) // In theory, gotTokens should be less than or equal to maxT. // But we allow a little of error to avoid the test being flaky. require.LessOrEqual(t, gotTokens, maxTokens+1) From fc28a05821bd83f791fbbb27b40b30d1df083aed Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 11:20:18 +0800 Subject: [PATCH 20/38] fix some coment use util.WaitGroupWrapper rename limit related func use atomic.int64 in storeWriteLimiter remove noopStoreWriteLimit --- pkg/ddl/backfilling.go | 8 ++-- pkg/lightning/backend/local/local_test.go | 8 ++-- pkg/lightning/backend/local/localhelper.go | 45 +++++++------------ .../backend/local/localhelper_test.go | 2 +- pkg/lightning/backend/local/region_job.go | 10 ++--- .../backend/local/region_job_test.go | 4 +- 6 files changed, 33 insertions(+), 44 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index f0fb872a46934..fc5a624bafcb7 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -819,11 +819,11 @@ func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPi return case <-ticker.C: maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())) - if maxWriteSpeed != bcCtx.GetLocalBackend().GetLimiterSpeed() { - bcCtx.GetLocalBackend().UpdateLimiter(maxWriteSpeed) + if maxWriteSpeed != bcCtx.GetLocalBackend().GetWriteSpeedLimit() { + bcCtx.GetLocalBackend().UpdateWriteSpeedLimit(maxWriteSpeed) logutil.DDLIngestLogger().Info("adjust ddl job config success", zap.Int64("jobID", job.ID), - zap.Int("max write speed", bcCtx.GetLocalBackend().GetLimiterSpeed())) + zap.Int("max write speed", bcCtx.GetLocalBackend().GetWriteSpeedLimit())) } concurrency := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) @@ -848,7 +848,7 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job } // Adjust worker pool size and max write speed dynamically. - var wg sync.WaitGroup + var wg util.WaitGroupWrapper adjustCtx, cancel := context.WithCancel(ctx) if job != nil { wg.Add(1) diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index 35dd73dd36363..bac7821f33122 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -1223,7 +1223,7 @@ func TestCheckPeersBusy(t *testing.T) { }, }, logger: log.L(), - writeLimiter: noopStoreWriteLimiter{}, + writeLimiter: newStoreWriteLimiter(0), supportMultiIngest: true, BackendConfig: BackendConfig{ ShouldCheckWriteStall: true, @@ -1347,7 +1347,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { }, }, logger: log.L(), - writeLimiter: noopStoreWriteLimiter{}, + writeLimiter: newStoreWriteLimiter(0), supportMultiIngest: true, BackendConfig: BackendConfig{ ShouldCheckWriteStall: true, @@ -1446,7 +1446,7 @@ func TestPartialWriteIngestErrorWontPanic(t *testing.T) { }, }, logger: log.L(), - writeLimiter: noopStoreWriteLimiter{}, + writeLimiter: newStoreWriteLimiter(0), supportMultiIngest: true, tikvCodec: keyspace.CodecV1, BackendConfig: BackendConfig{ @@ -1542,7 +1542,7 @@ func TestPartialWriteIngestBusy(t *testing.T) { }, }, logger: log.L(), - writeLimiter: noopStoreWriteLimiter{}, + writeLimiter: newStoreWriteLimiter(0), supportMultiIngest: true, tikvCodec: keyspace.CodecV1, BackendConfig: BackendConfig{ diff --git a/pkg/lightning/backend/local/localhelper.go b/pkg/lightning/backend/local/localhelper.go index 3c3fc178f22ec..402fbe3ab99cf 100644 --- a/pkg/lightning/backend/local/localhelper.go +++ b/pkg/lightning/backend/local/localhelper.go @@ -134,24 +134,25 @@ func largerStartKey(a, b []byte) []byte { type StoreWriteLimiter interface { WaitN(ctx context.Context, storeID uint64, n int) error Limit() int - UpdateLimiter(limit int) + UpdateLimit(limit int) } type storeWriteLimiter struct { rwm sync.RWMutex limiters map[uint64]*rate.Limiter // limit and burst can only be non-negative, 0 means no rate limiting. - limit int64 - burst int64 + limit atomic.Int64 + burst atomic.Int64 } func newStoreWriteLimiter(limit int) *storeWriteLimiter { l, b := calculateLimitAndBurst(limit) - return &storeWriteLimiter{ + s := &storeWriteLimiter{ limiters: make(map[uint64]*rate.Limiter), - limit: l, - burst: b, } + s.limit.Store(l) + s.burst.Store(b) + return s } func calculateLimitAndBurst(writeLimit int) (limit int64, burst int64) { @@ -185,11 +186,11 @@ func (s *storeWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) er } func (s *storeWriteLimiter) Limit() int { - return int(atomic.LoadInt64(&s.limit)) + return int(s.limit.Load()) } func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter { - if atomic.LoadInt64(&s.limit) == 0 { + if s.limit.Load() == 0 { return nil } s.rwm.RLock() @@ -202,45 +203,33 @@ func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter { defer s.rwm.Unlock() limiter, ok = s.limiters[storeID] if !ok { - limiter = rate.NewLimiter(rate.Limit(atomic.LoadInt64(&s.limit)), int(atomic.LoadInt64(&s.burst))) + limiter = rate.NewLimiter(rate.Limit(s.limit.Load()), int(s.burst.Load())) s.limiters[storeID] = limiter } return limiter } -func (s *storeWriteLimiter) UpdateLimiter(newLimit int) { +func (s *storeWriteLimiter) UpdateLimit(newLimit int) { limit, burst := calculateLimitAndBurst(newLimit) - if atomic.LoadInt64(&s.limit) == limit { + if s.limit.Load() == limit { return } - atomic.StoreInt64(&s.limit, limit) - atomic.StoreInt64(&s.burst, burst) + s.limit.Store(limit) + s.burst.Store(burst) // Update all existing limiters with the new limit and burst values. s.rwm.Lock() defer s.rwm.Unlock() - if atomic.LoadInt64(&s.limit) == 0 { + if s.limit.Load() == 0 { s.limiters = make(map[uint64]*rate.Limiter) return } for _, limiter := range s.limiters { - limiter.SetLimit(rate.Limit(atomic.LoadInt64(&s.limit))) - limiter.SetBurst(int(atomic.LoadInt64(&s.burst))) + limiter.SetLimit(rate.Limit(s.limit.Load())) + limiter.SetBurst(int(s.burst.Load())) } } -type noopStoreWriteLimiter struct{} - -func (noopStoreWriteLimiter) WaitN(_ context.Context, _ uint64, _ int) error { - return nil -} - -func (noopStoreWriteLimiter) Limit() int { - return math.MaxInt -} - -func (noopStoreWriteLimiter) UpdateLimiter(_ int) {} - // compaction threshold const ( CompactionLowerThreshold = 512 * units.MiB diff --git a/pkg/lightning/backend/local/localhelper_test.go b/pkg/lightning/backend/local/localhelper_test.go index 5cc453e435147..1cccf1dfe0fd5 100644 --- a/pkg/lightning/backend/local/localhelper_test.go +++ b/pkg/lightning/backend/local/localhelper_test.go @@ -357,7 +357,7 @@ func TestTuneStoreWriteLimiter(t *testing.T) { defer cancel0() testLimiter(ctx0, 100) - limiter.UpdateLimiter(200) + limiter.UpdateLimit(200) ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second*2) defer cancel1() testLimiter(ctx1, 200) diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go index 69abd6c19425b..891a8cdffb261 100644 --- a/pkg/lightning/backend/local/region_job.go +++ b/pkg/lightning/backend/local/region_job.go @@ -780,13 +780,13 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe return resp, nil } -// UpdateLimiter updates the write limiter of the backend. -func (local *Backend) UpdateLimiter(limit int) { - local.writeLimiter.UpdateLimiter(limit) +// UpdateWriteSpeedLimit updates the write limiter of the backend. +func (local *Backend) UpdateWriteSpeedLimit(limit int) { + local.writeLimiter.UpdateLimit(limit) } -// GetLimiterSpeed returns the speed of the write limiter. -func (local *Backend) GetLimiterSpeed() int { +// GetWriteSpeedLimit returns the speed of the write limiter. +func (local *Backend) GetWriteSpeedLimit() int { return local.writeLimiter.Limit() } diff --git a/pkg/lightning/backend/local/region_job_test.go b/pkg/lightning/backend/local/region_job_test.go index 0e39bf31ad085..c22e223554630 100644 --- a/pkg/lightning/backend/local/region_job_test.go +++ b/pkg/lightning/backend/local/region_job_test.go @@ -591,12 +591,12 @@ func TestUpdateAndGetLimiterConcurrencySafety(t *testing.T) { wg.Add(2) go func(limit int) { defer wg.Done() - backend.UpdateLimiter(limit) + backend.UpdateWriteSpeedLimit(limit) }(i) go func() { defer wg.Done() - _ = backend.GetLimiterSpeed() + _ = backend.GetWriteSpeedLimit() }() } wg.Wait() From 62cb8f35cc01220f931084a4caf0d414e049db1e Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 12:21:33 +0800 Subject: [PATCH 21/38] merge duplicate code --- pkg/executor/operate_ddl_jobs.go | 21 +++------------ pkg/planner/core/planbuilder.go | 45 ++++++++++++++++++-------------- 2 files changed, 29 insertions(+), 37 deletions(-) diff --git a/pkg/executor/operate_ddl_jobs.go b/pkg/executor/operate_ddl_jobs.go index d5fb507c06b69..556186a28a61e 100644 --- a/pkg/executor/operate_ddl_jobs.go +++ b/pkg/executor/operate_ddl_jobs.go @@ -19,7 +19,6 @@ import ( "fmt" "strconv" - "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl" @@ -29,7 +28,6 @@ import ( "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser/types" "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util/chunk" @@ -212,22 +210,9 @@ func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminComma job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64())) job.AdminOperator = byWho case core.AlterDDLJobMaxWriteSpeed: - var ( - speed int64 - err error - ) - v := opt.Value.(*expression.Constant) - switch v.RetType.EvalType() { - case types.ETString: - speedStr := opt.Value.(*expression.Constant).Value.GetString() - speed, err = units.RAMInBytes(speedStr) - if err != nil { - return errors.Trace(err) - } - case types.ETInt: - speed = opt.Value.(*expression.Constant).Value.GetInt64() - default: - return fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) + speed, err := core.GetMaxWriteSpeedNumericValFromExpression(opt) + if err != nil { + return err } job.ReorgMeta.SetMaxWriteSpeed(int(speed)) job.AdminOperator = byWho diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 271c2bb827f5c..2af06adf03f0c 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5944,29 +5944,36 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { bs, opt.Name, variable.MinDDLReorgBatchSize, variable.MaxDDLReorgBatchSize) } case AlterDDLJobMaxWriteSpeed: - var ( - speed int64 - err error - speedStr string - ) - v := opt.Value.(*expression.Constant) - switch v.RetType.EvalType() { - case types.ETString: - speedStr = opt.Value.(*expression.Constant).Value.GetString() - speed, err = units.RAMInBytes(speedStr) - if err != nil { - return errors.Trace(err) - } - case types.ETInt: - speed = opt.Value.(*expression.Constant).Value.GetInt64() - speedStr = strconv.FormatInt(speed, 10) - default: - return fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) + speed, err := GetMaxWriteSpeedNumericValFromExpression(opt) + if err != nil { + return err } if speed < 0 || speed > units.PiB { return fmt.Errorf("the value %s for %s is out of range [%v, %v]", - speedStr, opt.Name, 0, units.PiB) + strconv.FormatInt(speed, 10), opt.Name, 0, units.PiB) } } return nil } + +// GetMaxWriteSpeedNumericValFromExpression gets the numeric value of the max write speed from the expression. +func GetMaxWriteSpeedNumericValFromExpression(opt *AlterDDLJobOpt) (int64, error) { + var ( + speed int64 + err error + ) + v := opt.Value.(*expression.Constant) + switch v.RetType.EvalType() { + case types.ETString: + speedStr := v.Value.GetString() + speed, err = units.RAMInBytes(speedStr) + if err != nil { + return 0, errors.Trace(err) + } + case types.ETInt: + speed = v.Value.GetInt64() + default: + return 0, fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) + } + return speed, nil +} From 1b25f9a3420f56783edf406d69c89e1639fe89a4 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 12:34:10 +0800 Subject: [PATCH 22/38] handle the reorg meta nil case when unmarshalling --- pkg/meta/model/job.go | 3 +++ pkg/meta/model/reorg.go | 34 +++++++++++----------------------- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 3994272b1369f..4b58ec0a610cc 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -546,6 +546,9 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { // decode special args for this job. func (job *Job) Decode(b []byte) error { err := json.Unmarshal(b, job) + if job.MayNeedReorg() && job.ReorgMeta == nil { + job.ReorgMeta = &DDLReorgMeta{} + } return errors.Trace(err) } diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index c0b2b39499845..cce552e1f3266 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -75,16 +75,16 @@ type DDLReorgMeta struct { // These two variables are used to control the concurrency and batch size of the reorganization process. // They can be adjusted dynamically through `admin alter ddl jobs` command. // Note: Don't get or set these two variables directly, use the functions instead. - Concurrency int64 `json:"concurrency"` - BatchSize int64 `json:"batch_size"` - MaxWriteSpeed int64 `json:"max_write_speed"` + Concurrency atomic.Int64 `json:"concurrency"` + BatchSize atomic.Int64 `json:"batch_size"` + MaxWriteSpeed atomic.Int64 `json:"max_write_speed"` } // GetConcurrencyOrDefault gets the concurrency from DDLReorgMeta, // pass the default value in case of the reorg meta coming from old cluster and Concurrency is 0. func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int { - concurrency := atomic.LoadInt64(&dm.Concurrency) - if dm == nil || concurrency == 0 { + concurrency := dm.Concurrency.Load() + if concurrency == 0 { return defaultVal } return int(concurrency) @@ -92,16 +92,13 @@ func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int { // SetConcurrency sets the concurrency in DDLReorgMeta. func (dm *DDLReorgMeta) SetConcurrency(concurrency int) { - if dm == nil { - dm = &DDLReorgMeta{} - } - atomic.StoreInt64(&dm.Concurrency, int64(concurrency)) + dm.Concurrency.Store(int64(concurrency)) } // GetBatchSizeOrDefault gets the batch size from DDLReorgMeta. func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int { - batchSize := atomic.LoadInt64(&dm.BatchSize) - if dm == nil || batchSize == 0 { + batchSize := dm.BatchSize.Load() + if batchSize == 0 { return defaultVal } return int(batchSize) @@ -109,27 +106,18 @@ func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int { // SetBatchSize sets the batch size in DDLReorgMeta. func (dm *DDLReorgMeta) SetBatchSize(batchSize int) { - if dm == nil { - dm = &DDLReorgMeta{} - } - atomic.StoreInt64(&dm.BatchSize, int64(batchSize)) + dm.BatchSize.Store(int64(batchSize)) } // GetMaxWriteSpeedOrDefault gets the max write speed from DDLReorgMeta. // 0 means no limit. func (dm *DDLReorgMeta) GetMaxWriteSpeedOrDefault(defaultVal int) int { - if dm == nil { - return defaultVal - } - return int(atomic.LoadInt64(&dm.MaxWriteSpeed)) + return int(dm.MaxWriteSpeed.Load()) } // SetMaxWriteSpeed sets the max write speed in DDLReorgMeta. func (dm *DDLReorgMeta) SetMaxWriteSpeed(maxWriteSpeed int) { - if dm == nil { - dm = &DDLReorgMeta{} - } - atomic.StoreInt64(&dm.MaxWriteSpeed, int64(maxWriteSpeed)) + dm.MaxWriteSpeed.Store(int64(maxWriteSpeed)) } const ( From 317331712c49ddd8f2b641bca6f7d4449ebae886 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 12:43:44 +0800 Subject: [PATCH 23/38] use uber's atomic --- pkg/meta/model/reorg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index cce552e1f3266..bc2df87f9d7a2 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -16,7 +16,7 @@ package model import ( "encoding/json" - "sync/atomic" + "go.uber.org/atomic" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser/mysql" From 115882321a8b6012cae0a483d456b36ed1ffd708 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 13:04:23 +0800 Subject: [PATCH 24/38] update bazel --- pkg/meta/model/BUILD.bazel | 1 + pkg/meta/model/reorg.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/meta/model/BUILD.bazel b/pkg/meta/model/BUILD.bazel index 65516797fc58e..0dd924c3903ae 100644 --- a/pkg/meta/model/BUILD.bazel +++ b/pkg/meta/model/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//pkg/util/intest", "@com_github_pingcap_errors//:errors", "@com_github_tikv_pd_client//http", + "@org_uber_go_atomic//:atomic", ], ) diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index bc2df87f9d7a2..86cd867c0e29f 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -16,11 +16,11 @@ package model import ( "encoding/json" - "go.uber.org/atomic" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + "go.uber.org/atomic" ) // BackfillState is the state used by the backfill-merge process. From a128426b9b223d1199c87d1875d43b341dee02e7 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 14:44:36 +0800 Subject: [PATCH 25/38] handle the reorgmeta nil --- pkg/ddl/job_worker.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index 1ab333562ccf1..bcb9a1df1e28e 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -875,6 +875,11 @@ func (w *worker) runOneJobStep( }) } } + // When upgrading from a version where the ReorgMeta fields did not exist in the DDL job information, + // the unmarshalled job will have a nil value for the ReorgMeta field. + if w.tp == addIdxWorker && job.ReorgMeta == nil { + job.ReorgMeta = &model.DDLReorgMeta{} + } prevState := job.State From 2a29bd5ee56b41e2c19d46720ba453c48cb9016f Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 14:47:04 +0800 Subject: [PATCH 26/38] remove useless param --- pkg/ddl/backfilling.go | 4 ++-- pkg/ddl/backfilling_dist_executor.go | 2 +- pkg/ddl/job_worker.go | 2 +- pkg/executor/show_ddl_jobs.go | 2 +- pkg/meta/model/reorg.go | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index fc5a624bafcb7..394b1e77436ed 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -708,7 +708,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( //nolint: forcetypeassert discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) - maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())) + maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault() bcCtx, err := ingest.LitBackCtxMgr.Register( ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS) if err != nil { @@ -818,7 +818,7 @@ func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPi case <-ctx.Done(): return case <-ticker.C: - maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())) + maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault() if maxWriteSpeed != bcCtx.GetLocalBackend().GetWriteSpeedLimit() { bcCtx.GetLocalBackend().UpdateWriteSpeedLimit(maxWriteSpeed) logutil.DDLIngestLogger().Info("adjust ddl job config success", diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index 956d5b547f430..799be7baa1f8c 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -154,7 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { discovery, job.ReorgMeta.ResourceGroupName, job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), - job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())), + job.ReorgMeta.GetMaxWriteSpeedOrDefault(), job.RealStartTS, ) } diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index bcb9a1df1e28e..5a5c45f127d36 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -867,7 +867,7 @@ func (w *worker) runOneJobStep( if latestJob.IsAlterable() { job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))) job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) - job.ReorgMeta.SetMaxWriteSpeed(latestJob.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load()))) + job.ReorgMeta.SetMaxWriteSpeed(latestJob.ReorgMeta.GetMaxWriteSpeedOrDefault()) } } } diff --git a/pkg/executor/show_ddl_jobs.go b/pkg/executor/show_ddl_jobs.go index 68a14cc404775..a885d5d5e18c9 100644 --- a/pkg/executor/show_ddl_jobs.go +++ b/pkg/executor/show_ddl_jobs.go @@ -317,7 +317,7 @@ func showCommentsFromJob(job *model.Job) string { if job.MayNeedReorg() { concurrency := m.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) batchSize := m.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) - maxWriteSpeed := m.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())) + maxWriteSpeed := m.GetMaxWriteSpeedOrDefault() if concurrency != variable.DefTiDBDDLReorgWorkerCount { labels = append(labels, fmt.Sprintf("thread=%d", concurrency)) } diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index 86cd867c0e29f..723a3fda3a9b7 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -111,7 +111,7 @@ func (dm *DDLReorgMeta) SetBatchSize(batchSize int) { // GetMaxWriteSpeedOrDefault gets the max write speed from DDLReorgMeta. // 0 means no limit. -func (dm *DDLReorgMeta) GetMaxWriteSpeedOrDefault(defaultVal int) int { +func (dm *DDLReorgMeta) GetMaxWriteSpeedOrDefault() int { return int(dm.MaxWriteSpeed.Load()) } From 025282259a002e39585ec27622e7bf76b212721b Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Mon, 25 Nov 2024 14:48:31 +0800 Subject: [PATCH 27/38] Update pkg/planner/core/planbuilder_test.go Co-authored-by: lance6716 --- pkg/planner/core/planbuilder_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/planner/core/planbuilder_test.go b/pkg/planner/core/planbuilder_test.go index c9171bcf24fe0..16142c990a931 100644 --- a/pkg/planner/core/planbuilder_test.go +++ b/pkg/planner/core/planbuilder_test.go @@ -932,7 +932,7 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { require.True(t, ok) require.Equal(t, plan.JobID, int64(4)) require.Equal(t, len(plan.Options), 1) - require.Equal(t, plan.Options[0].Name, AlterDDLJobMaxWriteSpeed) + require.Equal(t,AlterDDLJobMaxWriteSpeed, plan.Options[0].Name) cons, ok = plan.Options[0].Value.(*expression.Constant) require.True(t, ok) require.Equal(t, cons.Value.GetInt64(), int64(1024)) From 4237d3c2533b4e852439f276244b4875b375ff08 Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Mon, 25 Nov 2024 14:48:48 +0800 Subject: [PATCH 28/38] Update pkg/planner/core/planbuilder_test.go Co-authored-by: lance6716 --- pkg/planner/core/planbuilder_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/planner/core/planbuilder_test.go b/pkg/planner/core/planbuilder_test.go index 16142c990a931..a558b819a9eda 100644 --- a/pkg/planner/core/planbuilder_test.go +++ b/pkg/planner/core/planbuilder_test.go @@ -935,7 +935,7 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { require.Equal(t,AlterDDLJobMaxWriteSpeed, plan.Options[0].Name) cons, ok = plan.Options[0].Value.(*expression.Constant) require.True(t, ok) - require.Equal(t, cons.Value.GetInt64(), int64(1024)) + require.EqualValues(t, 1024, cons.Value.GetInt64()) stmt, err = parser.ParseOneStmt("admin alter ddl jobs 5 thread = 16, batch_size = 512, max_write_speed = '10MiB' ", "", "") require.NoError(t, err) From 3900fe9d736e27dd4537850869cd262b33314eac Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Mon, 25 Nov 2024 14:49:42 +0800 Subject: [PATCH 29/38] Update pkg/planner/core/planbuilder.go Co-authored-by: lance6716 --- pkg/planner/core/planbuilder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 2af06adf03f0c..2e6e89ef02578 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5957,7 +5957,7 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { } // GetMaxWriteSpeedNumericValFromExpression gets the numeric value of the max write speed from the expression. -func GetMaxWriteSpeedNumericValFromExpression(opt *AlterDDLJobOpt) (int64, error) { +func GetMaxWriteSpeedFromExpression(opt *AlterDDLJobOpt) (int64, error) { var ( speed int64 err error From 941af0880de527efa31d019ba85487ca804a0d32 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 14:53:38 +0800 Subject: [PATCH 30/38] use require.Len() --- pkg/planner/core/planbuilder_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/planner/core/planbuilder_test.go b/pkg/planner/core/planbuilder_test.go index a558b819a9eda..66e6b9432db5d 100644 --- a/pkg/planner/core/planbuilder_test.go +++ b/pkg/planner/core/planbuilder_test.go @@ -892,7 +892,7 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { plan, ok := p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(1)) - require.Equal(t, len(plan.Options), 1) + require.Len(t, plan.Options, 1) require.Equal(t, plan.Options[0].Name, AlterDDLJobThread) cons, ok := plan.Options[0].Value.(*expression.Constant) require.True(t, ok) @@ -905,7 +905,7 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { plan, ok = p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(2)) - require.Equal(t, len(plan.Options), 1) + require.Len(t, plan.Options, 1) require.Equal(t, plan.Options[0].Name, AlterDDLJobBatchSize) cons, ok = plan.Options[0].Value.(*expression.Constant) require.True(t, ok) @@ -918,7 +918,7 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { plan, ok = p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(3)) - require.Equal(t, len(plan.Options), 1) + require.Len(t, plan.Options, 1) require.Equal(t, plan.Options[0].Name, AlterDDLJobMaxWriteSpeed) cons, ok = plan.Options[0].Value.(*expression.Constant) require.True(t, ok) @@ -931,8 +931,8 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { plan, ok = p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(4)) - require.Equal(t, len(plan.Options), 1) - require.Equal(t,AlterDDLJobMaxWriteSpeed, plan.Options[0].Name) + require.Len(t, plan.Options, 1) + require.Equal(t, AlterDDLJobMaxWriteSpeed, plan.Options[0].Name) cons, ok = plan.Options[0].Value.(*expression.Constant) require.True(t, ok) require.EqualValues(t, 1024, cons.Value.GetInt64()) @@ -944,7 +944,7 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { plan, ok = p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(5)) - require.Equal(t, len(plan.Options), 3) + require.Len(t, plan.Options, 3) sort.Slice(plan.Options, func(i, j int) bool { return plan.Options[i].Name < plan.Options[j].Name }) From f7cce41ca7b8d727a8fee0356b0f19770afc92f1 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 14:56:23 +0800 Subject: [PATCH 31/38] use named return value --- pkg/executor/operate_ddl_jobs.go | 2 +- pkg/planner/core/planbuilder.go | 16 ++++++---------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/pkg/executor/operate_ddl_jobs.go b/pkg/executor/operate_ddl_jobs.go index 556186a28a61e..2553f71ac5c0a 100644 --- a/pkg/executor/operate_ddl_jobs.go +++ b/pkg/executor/operate_ddl_jobs.go @@ -210,7 +210,7 @@ func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminComma job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64())) job.AdminOperator = byWho case core.AlterDDLJobMaxWriteSpeed: - speed, err := core.GetMaxWriteSpeedNumericValFromExpression(opt) + speed, err := core.GetMaxWriteSpeedFromExpression(opt) if err != nil { return err } diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 2e6e89ef02578..f019d1b6a3103 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5944,7 +5944,7 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { bs, opt.Name, variable.MinDDLReorgBatchSize, variable.MaxDDLReorgBatchSize) } case AlterDDLJobMaxWriteSpeed: - speed, err := GetMaxWriteSpeedNumericValFromExpression(opt) + speed, err := GetMaxWriteSpeedFromExpression(opt) if err != nil { return err } @@ -5956,24 +5956,20 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { return nil } -// GetMaxWriteSpeedNumericValFromExpression gets the numeric value of the max write speed from the expression. -func GetMaxWriteSpeedFromExpression(opt *AlterDDLJobOpt) (int64, error) { - var ( - speed int64 - err error - ) +// GetMaxWriteSpeedFromExpression gets the numeric value of the max write speed from the expression. +func GetMaxWriteSpeedFromExpression(opt *AlterDDLJobOpt) (maxWriteSpeed int64, err error) { v := opt.Value.(*expression.Constant) switch v.RetType.EvalType() { case types.ETString: speedStr := v.Value.GetString() - speed, err = units.RAMInBytes(speedStr) + maxWriteSpeed, err = units.RAMInBytes(speedStr) if err != nil { return 0, errors.Trace(err) } case types.ETInt: - speed = v.Value.GetInt64() + maxWriteSpeed = v.Value.GetInt64() default: return 0, fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) } - return speed, nil + return maxWriteSpeed, nil } From d71a82654c90cea79b1a36f3fc1565afcf4a99ac Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 16:23:18 +0800 Subject: [PATCH 32/38] add test and fix comment --- pkg/ddl/backfilling_test.go | 7 ++-- pkg/ddl/db_test.go | 33 +++++++++++-------- pkg/executor/show_ddl_jobs_test.go | 18 +++++----- .../backend/local/localhelper_test.go | 2 +- pkg/planner/core/planbuilder.go | 4 +-- pkg/planner/core/planbuilder_test.go | 32 ++++++++++++++++++ 6 files changed, 67 insertions(+), 29 deletions(-) diff --git a/pkg/ddl/backfilling_test.go b/pkg/ddl/backfilling_test.go index ed0802a45f5c8..d67fac8443331 100644 --- a/pkg/ddl/backfilling_test.go +++ b/pkg/ddl/backfilling_test.go @@ -489,10 +489,9 @@ func TestValidateAndFillRanges(t *testing.T) { } func TestTuneTableScanWorkerBatchSize(t *testing.T) { - reorgMeta := &model.DDLReorgMeta{ - Concurrency: 4, - BatchSize: 32, - } + reorgMeta := &model.DDLReorgMeta{} + reorgMeta.Concurrency.Store(4) + reorgMeta.BatchSize.Store(32) copCtx := &copr.CopContextSingleIndex{ CopContextBase: &copr.CopContextBase{ FieldTypes: []*types.FieldType{}, diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index 07e01c4a6572f..a7c746b7b08d4 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go @@ -1161,13 +1161,12 @@ func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) { tk.MustExec("create table t (a int);") job := model.Job{ - ID: 1, - Type: model.ActionAddIndex, - ReorgMeta: &model.DDLReorgMeta{ - Concurrency: 4, - BatchSize: 128, - }, + ID: 1, + Type: model.ActionAddIndex, + ReorgMeta: &model.DDLReorgMeta{}, } + job.ReorgMeta.Concurrency.Store(4) + job.ReorgMeta.BatchSize.Store(128) insertMockJob2Table(tk, &job) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID)) j := getJobMetaByID(t, tk, job.ID) @@ -1195,8 +1194,16 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]") - tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2PiB for max_write_speed is out of range [0, 1125899906842624]") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2251799813685248 for max_write_speed is out of range [0, 1125899906842624]") tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = -1;", "the value -1 for max_write_speed is out of range [0, 1125899906842624]") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1.23;", "the value 1.23 for max_write_speed is invalid") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 'MiB';", "parse max_write_speed value error: invalid size: 'MiB'") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 'asd';", "parse max_write_speed value error: invalid size: 'asd'") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '';", "parse max_write_speed value error: invalid size: ''") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '20xl';", "parse max_write_speed value error: invalid suffix: 'xl'") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1.2.3;", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 46 near \".3;\" ") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 20+30;", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 44 near \"+30;\" ") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = rand();", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 45 near \"rand();\" ") // valid config value tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 16;", "ddl job 1 is not running") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 64;", "ddl job 1 is not running") @@ -1206,6 +1213,7 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '3MiB';", "ddl job 1 is not running") tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '4 gb';", "ddl job 1 is not running") tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1;", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '1.23';", "ddl job 1 is not running") // invalid job id tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running") @@ -1243,13 +1251,12 @@ func TestAdminAlterDDLJobCommitFailed(t *testing.T) { defer testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed") job := model.Job{ - ID: 1, - Type: model.ActionAddIndex, - ReorgMeta: &model.DDLReorgMeta{ - Concurrency: 4, - BatchSize: 128, - }, + ID: 1, + Type: model.ActionAddIndex, + ReorgMeta: &model.DDLReorgMeta{}, } + job.ReorgMeta.Concurrency.Store(4) + job.ReorgMeta.BatchSize.Store(128) insertMockJob2Table(tk, &job) tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8, batch_size = 256;", job.ID), "mock commit failed on admin alter ddl jobs") diff --git a/pkg/executor/show_ddl_jobs_test.go b/pkg/executor/show_ddl_jobs_test.go index c2e4f3f601afb..a5105f0588841 100644 --- a/pkg/executor/show_ddl_jobs_test.go +++ b/pkg/executor/show_ddl_jobs_test.go @@ -68,10 +68,10 @@ func TestShowCommentsFromJob(t *testing.T) { ReorgTp: model.ReorgTypeLitMerge, IsDistReorg: true, UseCloudStorage: true, - Concurrency: 8, - BatchSize: 1024, - MaxWriteSpeed: 1024 * 1024, } + job.ReorgMeta.Concurrency.Store(8) + job.ReorgMeta.BatchSize.Store(1024) + job.ReorgMeta.MaxWriteSpeed.Store(1024 * 1024) res = showCommentsFromJob(job) require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024, max_write_speed=1048576", res) @@ -79,10 +79,10 @@ func TestShowCommentsFromJob(t *testing.T) { ReorgTp: model.ReorgTypeLitMerge, IsDistReorg: true, UseCloudStorage: true, - Concurrency: variable.DefTiDBDDLReorgWorkerCount, - BatchSize: variable.DefTiDBDDLReorgBatchSize, - MaxWriteSpeed: variable.DefTiDBDDLReorgMaxWriteSpeed, } + job.ReorgMeta.Concurrency.Store(variable.DefTiDBDDLReorgWorkerCount) + job.ReorgMeta.BatchSize.Store(variable.DefTiDBDDLReorgBatchSize) + job.ReorgMeta.MaxWriteSpeed.Store(variable.DefTiDBDDLReorgMaxWriteSpeed) res = showCommentsFromJob(job) require.Equal(t, "ingest, DXF, cloud", res) @@ -90,11 +90,11 @@ func TestShowCommentsFromJob(t *testing.T) { ReorgTp: model.ReorgTypeLitMerge, IsDistReorg: true, UseCloudStorage: true, - Concurrency: variable.DefTiDBDDLReorgWorkerCount, - BatchSize: variable.DefTiDBDDLReorgBatchSize, - MaxWriteSpeed: variable.DefTiDBDDLReorgMaxWriteSpeed, TargetScope: "background", } + job.ReorgMeta.Concurrency.Store(variable.DefTiDBDDLReorgWorkerCount) + job.ReorgMeta.BatchSize.Store(variable.DefTiDBDDLReorgBatchSize) + job.ReorgMeta.MaxWriteSpeed.Store(variable.DefTiDBDDLReorgMaxWriteSpeed) res = showCommentsFromJob(job) require.Equal(t, "ingest, DXF, cloud, service_scope=background", res) } diff --git a/pkg/lightning/backend/local/localhelper_test.go b/pkg/lightning/backend/local/localhelper_test.go index 0b77a93924625..aa151576d5fd1 100644 --- a/pkg/lightning/backend/local/localhelper_test.go +++ b/pkg/lightning/backend/local/localhelper_test.go @@ -344,7 +344,7 @@ func TestTuneStoreWriteLimiter(t *testing.T) { gotTokens += n } elapsed := time.Since(start) - maxTokens := 120 + int(float64(elapsed.Seconds())*float64(maxT)) + maxTokens := 120 + int(elapsed.Seconds()*float64(maxT)) // In theory, gotTokens should be less than or equal to maxT. // But we allow a little of error to avoid the test being flaky. require.LessOrEqual(t, gotTokens, maxTokens+1) diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index f019d1b6a3103..6c9f62ee5814b 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5964,12 +5964,12 @@ func GetMaxWriteSpeedFromExpression(opt *AlterDDLJobOpt) (maxWriteSpeed int64, e speedStr := v.Value.GetString() maxWriteSpeed, err = units.RAMInBytes(speedStr) if err != nil { - return 0, errors.Trace(err) + return 0, errors.Annotate(err, "parse max_write_speed value error") } case types.ETInt: maxWriteSpeed = v.Value.GetInt64() default: - return 0, fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) + return 0, fmt.Errorf("the value %s for %s is invalid", fmt.Sprintf("%v", v.Value.GetValue()), opt.Name) } return maxWriteSpeed, nil } diff --git a/pkg/planner/core/planbuilder_test.go b/pkg/planner/core/planbuilder_test.go index 66e6b9432db5d..5f5e3fc41183e 100644 --- a/pkg/planner/core/planbuilder_test.go +++ b/pkg/planner/core/planbuilder_test.go @@ -17,12 +17,14 @@ package core import ( "context" "fmt" + "math/rand" "reflect" "sort" "strings" "testing" "unsafe" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/expression" @@ -966,3 +968,33 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { _, err = builder.Build(ctx, resolve.NewNodeW(stmt)) require.Equal(t, err.Error(), "unsupported admin alter ddl jobs config: aaa") } + +func TestGetMaxWriteSpeedFromExpression(t *testing.T) { + parser := parser.New() + sctx := MockContext() + ctx := context.TODO() + builder, _ := NewPlanBuilder().Init(sctx, nil, hint.NewQBHintHandler(nil)) + // random speed value + n := rand.Intn(units.PiB + 1) + stmt, err := parser.ParseOneStmt(fmt.Sprintf("admin alter ddl jobs 1 max_write_speed = %d ", n), "", "") + require.NoError(t, err) + p, err := builder.Build(ctx, resolve.NewNodeW(stmt)) + require.NoError(t, err) + plan, ok := p.(*AlterDDLJob) + require.True(t, ok) + require.Equal(t, plan.JobID, int64(1)) + require.Len(t, plan.Options, 1) + require.Equal(t, plan.Options[0].Name, AlterDDLJobMaxWriteSpeed) + _, ok = plan.Options[0].Value.(*expression.Constant) + require.True(t, ok) + maxWriteSpeed, err := GetMaxWriteSpeedFromExpression(plan.Options[0]) + require.NoError(t, err) + require.Equal(t, int64(n), maxWriteSpeed) + // parse speed string error + opt := &AlterDDLJobOpt{ + Name: "test", + Value: expression.NewStrConst("MiB"), + } + _, err = GetMaxWriteSpeedFromExpression(opt) + require.Equal(t, "invalid size: 'MiB'", err.Error()) +} From 5ba427802a3717b4544118340772ed8fe0017002 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 16:28:16 +0800 Subject: [PATCH 33/38] Update planbuilder_test.go --- pkg/planner/core/planbuilder_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/planner/core/planbuilder_test.go b/pkg/planner/core/planbuilder_test.go index 5f5e3fc41183e..eda77c5f33981 100644 --- a/pkg/planner/core/planbuilder_test.go +++ b/pkg/planner/core/planbuilder_test.go @@ -996,5 +996,5 @@ func TestGetMaxWriteSpeedFromExpression(t *testing.T) { Value: expression.NewStrConst("MiB"), } _, err = GetMaxWriteSpeedFromExpression(opt) - require.Equal(t, "invalid size: 'MiB'", err.Error()) + require.Equal(t, "parse max_write_speed value error: invalid size: 'MiB'", err.Error()) } From a9cda92729183f00f8728e12837cb8d53d143238 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 16:47:13 +0800 Subject: [PATCH 34/38] check thread and batch_size eval type --- pkg/ddl/db_test.go | 6 ++++++ pkg/planner/core/planbuilder.go | 21 +++++++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index a7c746b7b08d4..2960fd1e188e6 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go @@ -1192,8 +1192,14 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { // invalid config value tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 0;", "the value 0 for thread is out of range [1, 256]") tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]") + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 10.5;", "the value for thread is invalid, only integer is allowed") + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = '16';", "the value for thread is invalid, only integer is allowed") + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = '';", "the value for thread is invalid, only integer is allowed") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 321.3;", "the value for batch_size is invalid, only integer is allowed") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = '512';", "the value for batch_size is invalid, only integer is allowed") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = '';", "the value for batch_size is invalid, only integer is allowed") tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2251799813685248 for max_write_speed is out of range [0, 1125899906842624]") tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = -1;", "the value -1 for max_write_speed is out of range [0, 1125899906842624]") tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1.23;", "the value 1.23 for max_write_speed is invalid") diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 6c9f62ee5814b..0103eadb7a503 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5931,13 +5931,19 @@ func (b *PlanBuilder) buildAdminAlterDDLJob(ctx context.Context, as *ast.AdminSt func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { switch opt.Name { case AlterDDLJobThread: - thread := opt.Value.(*expression.Constant).Value.GetInt64() + thread, err := GetThreadOrBatchSizeFromExpression(opt) + if err != nil { + return err + } if thread < 1 || thread > variable.MaxConfigurableConcurrency { return fmt.Errorf("the value %v for %s is out of range [1, %v]", thread, opt.Name, variable.MaxConfigurableConcurrency) } case AlterDDLJobBatchSize: - batchSize := opt.Value.(*expression.Constant).Value.GetInt64() + batchSize, err := GetThreadOrBatchSizeFromExpression(opt) + if err != nil { + return err + } bs := int32(batchSize) if bs < variable.MinDDLReorgBatchSize || bs > variable.MaxDDLReorgBatchSize { return fmt.Errorf("the value %v for %s is out of range [%v, %v]", @@ -5956,6 +5962,17 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { return nil } +// GetThreadOrBatchSizeFromExpression gets the numeric value of the thread or batch size from the expression. +func GetThreadOrBatchSizeFromExpression(opt *AlterDDLJobOpt) (int64, error) { + v := opt.Value.(*expression.Constant) + switch v.RetType.EvalType() { + case types.ETInt: + return v.Value.GetInt64(), nil + default: + return 0, fmt.Errorf("the value for %s is invalid, only integer is allowed", opt.Name) + } +} + // GetMaxWriteSpeedFromExpression gets the numeric value of the max write speed from the expression. func GetMaxWriteSpeedFromExpression(opt *AlterDDLJobOpt) (maxWriteSpeed int64, err error) { v := opt.Value.(*expression.Constant) From 00d00275961dd4a9fa229a337183e329b76d7fb8 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 16:52:03 +0800 Subject: [PATCH 35/38] Update BUILD.bazel --- pkg/planner/core/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/planner/core/BUILD.bazel b/pkg/planner/core/BUILD.bazel index 3de5328f78ec7..cee3079583f42 100644 --- a/pkg/planner/core/BUILD.bazel +++ b/pkg/planner/core/BUILD.bazel @@ -326,6 +326,7 @@ go_test( "//pkg/util/ranger", "//pkg/util/stmtsummary", "//pkg/util/tracing", + "@com_github_docker_go_units//:go-units", "@com_github_golang_snappy//:snappy", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", From a1cc392ed269ddd58dba10f498b7f1cf12c209b4 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 16:58:07 +0800 Subject: [PATCH 36/38] correct maxTokens formula --- pkg/lightning/backend/local/localhelper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lightning/backend/local/localhelper_test.go b/pkg/lightning/backend/local/localhelper_test.go index aa151576d5fd1..f107318422e0b 100644 --- a/pkg/lightning/backend/local/localhelper_test.go +++ b/pkg/lightning/backend/local/localhelper_test.go @@ -344,7 +344,7 @@ func TestTuneStoreWriteLimiter(t *testing.T) { gotTokens += n } elapsed := time.Since(start) - maxTokens := 120 + int(elapsed.Seconds()*float64(maxT)) + maxTokens := int(1.2*float64(maxT)) + int(elapsed.Seconds()*float64(maxT)) // In theory, gotTokens should be less than or equal to maxT. // But we allow a little of error to avoid the test being flaky. require.LessOrEqual(t, gotTokens, maxTokens+1) From becf32ebecbf24b0340c5dae129bc134eb8992dd Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 25 Nov 2024 17:50:27 +0800 Subject: [PATCH 37/38] remove handle reorg nil case when decode --- pkg/meta/model/job.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 4b58ec0a610cc..3994272b1369f 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -546,9 +546,6 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { // decode special args for this job. func (job *Job) Decode(b []byte) error { err := json.Unmarshal(b, job) - if job.MayNeedReorg() && job.ReorgMeta == nil { - job.ReorgMeta = &DDLReorgMeta{} - } return errors.Trace(err) } From edcd1e924ec7126ff0d817d475ce79a96b15cc98 Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Mon, 25 Nov 2024 17:50:59 +0800 Subject: [PATCH 38/38] Update pkg/planner/core/planbuilder.go Co-authored-by: lance6716 --- pkg/planner/core/planbuilder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 0103eadb7a503..9558a0417fd03 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -5986,7 +5986,7 @@ func GetMaxWriteSpeedFromExpression(opt *AlterDDLJobOpt) (maxWriteSpeed int64, e case types.ETInt: maxWriteSpeed = v.Value.GetInt64() default: - return 0, fmt.Errorf("the value %s for %s is invalid", fmt.Sprintf("%v", v.Value.GetValue()), opt.Name) + return 0, fmt.Errorf("the value %v for %s is invalid", v.Value.GetValue(), opt.Name) } return maxWriteSpeed, nil }