diff --git a/ddl/index.go b/ddl/index.go index 9bed641c75b91..14b89c64aa44e 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -443,7 +444,6 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) { const ( defaultTaskHandleCnt = 128 - defaultIndexWorkers = 16 ) // indexRecord is the record information of an index. @@ -873,10 +873,10 @@ func (w *worker) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgI log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo) colFieldMap := makeupIndexColFieldMap(t, indexInfo) - // TODO: make workerCnt can be modified by system variable. - workerCnt := defaultIndexWorkers + // variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt". + workerCnt := variable.GetDDLReorgWorkerCounter() idxWorkers := make([]*addIndexWorker, workerCnt) - for i := 0; i < workerCnt; i++ { + for i := 0; i < int(workerCnt); i++ { sessCtx := newContext(reorgInfo.d.store) idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, colFieldMap) go idxWorkers[i].run(reorgInfo.d) diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 397b62d3c2872..b25211a8baa1e 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -399,3 +400,31 @@ func (s *testSuite) TestMaxHandleAddIndex(c *C) { tk.MustExec("alter table t1 add index idx_b(b)") tk.MustExec("admin check table t1") } + +func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(variable.DefTiDBDDLReorgWorkerCount)) + tk.MustExec("set tidb_ddl_reorg_worker_cnt = 1") + c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(1)) + tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100") + c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(100)) + tk.MustExec("set tidb_ddl_reorg_worker_cnt = invalid_val") + c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(variable.DefTiDBDDLReorgWorkerCount)) + tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100") + c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(100)) + tk.MustExec("set tidb_ddl_reorg_worker_cnt = -1") + c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(variable.DefTiDBDDLReorgWorkerCount)) + + res := tk.MustQuery("select @@tidb_ddl_reorg_worker_cnt") + res.Check(testkit.Rows("-1")) + tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100") + res = tk.MustQuery("select @@tidb_ddl_reorg_worker_cnt") + res.Check(testkit.Rows("100")) + + res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt") + res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgWorkerCount))) + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100") + res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt") + res.Check(testkit.Rows("100")) +} diff --git a/session/session.go b/session/session.go index ff7e65edd1ecf..9dcf55437a3b2 100644 --- a/session/session.go +++ b/session/session.go @@ -1313,6 +1313,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab variable.TiDBHashAggPartialConcurrency + quoteCommaQuote + variable.TiDBHashAggFinalConcurrency + quoteCommaQuote + variable.TiDBBackoffLockFast + quoteCommaQuote + + variable.TiDBDDLReorgWorkerCount + quoteCommaQuote + variable.TiDBOptInSubqUnFolding + quoteCommaQuote + variable.TiDBDistSQLScanConcurrency + quoteCommaQuote + variable.TiDBMaxChunkSize + quoteCommaQuote + diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 1583bc8d1a1c0..99edc8610bdd2 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -543,6 +543,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.EnableStreaming = TiDBOptOn(val) case TiDBOptimizerSelectivityLevel: s.OptimizerSelectivityLevel = tidbOptPositiveInt32(val, DefTiDBOptimizerSelectivityLevel) + case TiDBDDLReorgWorkerCount: + workerCnt := tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount) + SetDDLReorgWorkerCounter(int32(workerCnt)) } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 3393243eb2285..30f519cac913d 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -647,6 +647,7 @@ var defaultSysVars = []*SysVar{ /* The following variable is defined as session scope but is actually server scope. */ {ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)}, {ScopeSession, TiDBConfig, ""}, + {ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)}, } // SynonymsSysVariables is synonyms of system variables. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 8627efc52511d..0fe5976880923 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -171,6 +171,9 @@ const ( // tidb_backoff_lock_fast is used for tikv backoff base time in milliseconds. TiDBBackoffLockFast = "tidb_backoff_lock_fast" + + // tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers. + TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt" ) // Default TiDB system variable values. @@ -205,11 +208,14 @@ const ( DefTiDBHashJoinConcurrency = 5 DefTiDBProjectionConcurrency = 4 DefTiDBOptimizerSelectivityLevel = 0 + DefTiDBDDLReorgWorkerCount = 16 DefTiDBHashAggPartialConcurrency = 4 DefTiDBHashAggFinalConcurrency = 4 ) // Process global variables. var ( - ProcessGeneralLog uint32 + ProcessGeneralLog uint32 + ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount + maxDDLReorgWorkerCount int32 = 128 ) diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 44da3ab4b4091..c94b09335bc18 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -27,6 +27,20 @@ import ( "github.com/pingcap/tidb/types" ) +// SetDDLReorgWorkerCounter sets ddlReorgWorkerCounter count. +// Max worker count is maxDDLReorgWorkerCount. +func SetDDLReorgWorkerCounter(cnt int32) { + if cnt > maxDDLReorgWorkerCount { + cnt = maxDDLReorgWorkerCount + } + atomic.StoreInt32(&ddlReorgWorkerCounter, cnt) +} + +// GetDDLReorgWorkerCounter gets ddlReorgWorkerCounter. +func GetDDLReorgWorkerCounter() int32 { + return atomic.LoadInt32(&ddlReorgWorkerCounter) +} + // GetSessionSystemVar gets a system variable. // If it is a session only variable, use the default value defined in code. // Returns error if there is no such variable. diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index c76333cb45f58..c6c522fd19596 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -219,6 +219,16 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { SetSessionSystemVar(v, TiDBOptimizerSelectivityLevel, types.NewIntDatum(1)) c.Assert(v.OptimizerSelectivityLevel, Equals, 1) + c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(DefTiDBDDLReorgWorkerCount)) + SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(1)) + c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(1)) + + SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(-1)) + c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(DefTiDBDDLReorgWorkerCount)) + + SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(int64(maxDDLReorgWorkerCount)+1)) + c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(maxDDLReorgWorkerCount)) + err = SetSessionSystemVar(v, TiDBRetryLimit, types.NewStringDatum("3")) c.Assert(err, IsNil) val, err = GetSessionSystemVar(v, TiDBRetryLimit)