diff --git a/Makefile b/Makefile index 66b3ba0686917..ba8c742782900 100644 --- a/Makefile +++ b/Makefile @@ -408,10 +408,6 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare --build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover \ -- //... -//cmd/... -//tests/graceshutdown/... \ -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/... - bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \ - --build_event_json_file=bazel_2.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=featuretag \ - -- //... -//cmd/... -//tests/graceshutdown/... \ - -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/... bazel_build: bazel_ci_prepare mkdir -p bin diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index fff775bf1c1d7..157b9cdf794c9 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -191,7 +191,6 @@ func NewMgr( return nil, errors.Trace(err) } // we must check tidb(tikv version) any time after concurrent ddl feature implemented in v6.2. - // when tidb < 6.2 we need set EnableConcurrentDDL false to make ddl works. // we will keep this check until 7.0, which allow the breaking changes. // NOTE: must call it after domain created! // FIXME: remove this check in v7.0 diff --git a/br/pkg/version/BUILD.bazel b/br/pkg/version/BUILD.bazel index 8171081ae5df1..7a15014e378e6 100644 --- a/br/pkg/version/BUILD.bazel +++ b/br/pkg/version/BUILD.bazel @@ -10,7 +10,6 @@ go_library( "//br/pkg/logutil", "//br/pkg/utils", "//br/pkg/version/build", - "//sessionctx/variable", "//util/engine", "@com_github_coreos_go_semver//semver", "@com_github_pingcap_errors//:errors", @@ -29,7 +28,6 @@ go_test( flaky = True, deps = [ "//br/pkg/version/build", - "//sessionctx/variable", "@com_github_coreos_go_semver//semver", "@com_github_data_dog_go_sqlmock//:go-sqlmock", "@com_github_pingcap_kvproto//pkg/metapb", diff --git a/br/pkg/version/version.go b/br/pkg/version/version.go index c49e3d1ada923..9cb974d48e13f 100644 --- a/br/pkg/version/version.go +++ b/br/pkg/version/version.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version/build" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/engine" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -166,9 +165,7 @@ func CheckVersionForDDL(s *metapb.Store, tikvVersion *semver.Version) error { // use tikvVersion instead of tidbVersion since br doesn't have mysql client to connect tidb. requireVersion := semver.New("6.2.0-alpha") if tikvVersion.Compare(*requireVersion) < 0 { - log.Info("detected the old version of tidb cluster. 
set enable concurrent ddl to false") - variable.EnableConcurrentDDL.Store(false) - return nil + return errors.Errorf("detected the old version of tidb cluster, require: >= 6.2.0, but got %s", tikvVersion.String()) } return nil } diff --git a/br/pkg/version/version_test.go b/br/pkg/version/version_test.go index 1fc654b3990b6..927eeee119d5b 100644 --- a/br/pkg/version/version_test.go +++ b/br/pkg/version/version_test.go @@ -13,7 +13,6 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/version/build" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" ) @@ -321,50 +320,40 @@ func TestCheckClusterVersion(t *testing.T) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: "v6.4.0"}} } - originVal := variable.EnableConcurrentDDL.Load() err := CheckClusterVersion(context.Background(), &mock, CheckVersionForDDL) require.NoError(t, err) - require.Equal(t, originVal, variable.EnableConcurrentDDL.Load()) } { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: "v6.2.0"}} } - originVal := variable.EnableConcurrentDDL.Load() err := CheckClusterVersion(context.Background(), &mock, CheckVersionForDDL) require.NoError(t, err) - require.Equal(t, originVal, variable.EnableConcurrentDDL.Load()) } { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: "v6.2.0-alpha"}} } - originVal := variable.EnableConcurrentDDL.Load() err := CheckClusterVersion(context.Background(), &mock, CheckVersionForDDL) require.NoError(t, err) - require.Equal(t, originVal, variable.EnableConcurrentDDL.Load()) } { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: "v6.1.0"}} } - variable.EnableConcurrentDDL.Store(true) err := CheckClusterVersion(context.Background(), &mock, CheckVersionForDDL) - require.NoError(t, err) - require.False(t, variable.EnableConcurrentDDL.Load()) + require.Error(t, err) } { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: "v5.4.0"}} } - variable.EnableConcurrentDDL.Store(true) err := CheckClusterVersion(context.Background(), &mock, CheckVersionForDDL) - require.NoError(t, err) - require.False(t, variable.EnableConcurrentDDL.Load()) + require.Error(t, err) } } diff --git a/ddl/column.go b/ddl/column.go index 8105e52fa438e..04af3f1714a1d 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -806,7 +806,7 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) { job.ReorgMeta.ReorgTp = model.ReorgTypeTxn - rh := newReorgHandler(t, w.sess, w.concurrentDDL) + rh := newReorgHandler(t, w.sess) dbInfo, err := t.GetDatabase(job.SchemaID) if err != nil { return false, ver, errors.Trace(err) diff --git a/ddl/concurrentddltest/BUILD.bazel b/ddl/concurrentddltest/BUILD.bazel deleted file mode 100644 index 82e2adf1fe9c2..0000000000000 --- a/ddl/concurrentddltest/BUILD.bazel +++ /dev/null @@ -1,26 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_test") - -go_test( - name = "concurrentddltest_test", - timeout = "moderate", - srcs = [ - "main_test.go", - "switch_test.go", - ], - flaky = True, - race = "on", - shard_count = 2, - deps = [ - "//config", - "//ddl", - "//kv", - "//meta", - "//sessionctx/variable", - "//testkit", - 
"//testkit/testsetup", - "//util", - "@com_github_stretchr_testify//require", - "@org_uber_go_atomic//:atomic", - "@org_uber_go_goleak//:goleak", - ], -) diff --git a/ddl/concurrentddltest/main_test.go b/ddl/concurrentddltest/main_test.go deleted file mode 100644 index 4ab7e96eab2ae..0000000000000 --- a/ddl/concurrentddltest/main_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package concurrentddltest - -import ( - "testing" - "time" - - "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/ddl" - "github.com/pingcap/tidb/testkit/testsetup" - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - testsetup.SetupForCommonTest() - - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.AsyncCommit.SafeWindow = 0 - conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 - }) - - ddl.SetWaitTimeWhenErrorOccurred(time.Microsecond) - - opts := []goleak.Option{ - goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), - goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), - goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), - goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - } - - goleak.VerifyTestMain(m, opts...) -} diff --git a/ddl/concurrentddltest/switch_test.go b/ddl/concurrentddltest/switch_test.go deleted file mode 100644 index 6cd26811008e6..0000000000000 --- a/ddl/concurrentddltest/switch_test.go +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package concurrentddltest - -import ( - "context" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/testkit" - "github.com/pingcap/tidb/util" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" -) - -func TestConcurrentDDLSwitch(t *testing.T) { - store := testkit.CreateMockStore(t) - - type table struct { - columnIdx int - indexIdx int - } - - var tables []*table - tblCount := 20 - for i := 0; i < tblCount; i++ { - tables = append(tables, &table{1, 0}) - } - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt=1") - tk.MustExec("set @@global.tidb_ddl_reorg_batch_size=32") - - for i := range tables { - tk.MustExec(fmt.Sprintf("create table t%d (col0 int)", i)) - for j := 0; j < 1000; j++ { - tk.MustExec(fmt.Sprintf("insert into t%d values (%d)", i, j)) - } - } - - ddls := make([]string, 0, tblCount) - ddlCount := 100 - for i := 0; i < ddlCount; i++ { - tblIdx := rand.Intn(tblCount) - if rand.Intn(2) == 0 { - ddls = append(ddls, fmt.Sprintf("alter table t%d add index idx%d (col0)", tblIdx, tables[tblIdx].indexIdx)) - tables[tblIdx].indexIdx++ - } else { - ddls = append(ddls, fmt.Sprintf("alter table t%d add column col%d int", tblIdx, tables[tblIdx].columnIdx)) - tables[tblIdx].columnIdx++ - } - } - - c := atomic.NewInt32(0) - ch := make(chan struct{}) - go func() { - var wg util.WaitGroupWrapper - for i := range ddls { - wg.Add(1) - go func(idx int) { - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(ddls[idx]) - c.Add(1) - wg.Done() - }(i) - } - wg.Wait() - ch <- struct{}{} - }() - - // sleep 2s to make sure the ddl jobs is into table. 
- time.Sleep(2 * time.Second) - ticker := time.NewTicker(time.Second) - count := 0 - done := false - for !done { - select { - case <-ch: - done = true - case <-ticker.C: - var b bool - var err error - err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, false, func(ctx context.Context, txn kv.Transaction) error { - b, err = meta.NewMeta(txn).IsConcurrentDDL() - return err - }) - require.NoError(t, err) - rs, err := testkit.NewTestKit(t, store).Exec(fmt.Sprintf("set @@global.tidb_enable_concurrent_ddl=%t", !b)) - if rs != nil { - require.NoError(t, rs.Close()) - } - if err == nil { - count++ - if b { - tk := testkit.NewTestKit(t, store) - tk.Session().GetSessionVars().MemQuotaQuery = -1 - tk.MustQuery("select count(*) from mysql.tidb_ddl_job").Check(testkit.Rows("0")) - tk.MustQuery("select count(*) from mysql.tidb_ddl_reorg").Check(testkit.Rows("0")) - } - } - } - } - - require.Equal(t, int32(ddlCount), c.Load()) - require.Greater(t, count, 0) - - tk = testkit.NewTestKit(t, store) - tk.Session().GetSessionVars().MemQuotaQuery = -1 - tk.MustExec("use test") - for i, tbl := range tables { - tk.MustQuery(fmt.Sprintf("select count(*) from information_schema.columns where TABLE_SCHEMA = 'test' and TABLE_NAME = 't%d'", i)).Check(testkit.Rows(fmt.Sprintf("%d", tbl.columnIdx))) - tk.MustExec(fmt.Sprintf("admin check table t%d", i)) - for j := 0; j < tbl.indexIdx; j++ { - tk.MustExec(fmt.Sprintf("admin check index t%d idx%d", i, j)) - } - } -} - -func TestConcurrentDDLSwitchWithMDL(t *testing.T) { - if !variable.EnableConcurrentDDL.Load() { - t.Skip("skip test if concurrent DDL is disabled") - } - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustGetErrMsg("set global tidb_enable_concurrent_ddl=off", "can not disable concurrent ddl when metadata lock is enabled") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set global tidb_enable_concurrent_ddl=off") - tk.MustExec("create table test.t(a int)") -} diff --git a/ddl/ddl.go b/ddl/ddl.go index 1e1b38eeb77bb..4cbdcfde9eeef 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -252,10 +252,6 @@ type DDL interface { GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema // DoDDLJob does the DDL job, it's exported for test. DoDDLJob(ctx sessionctx.Context, job *model.Job) error - // MoveJobFromQueue2Table move existing DDLs from queue to table. - MoveJobFromQueue2Table(bool) error - // MoveJobFromTable2Queue move existing DDLs from table to queue. - MoveJobFromTable2Queue() error } type limitJobTask struct { @@ -270,7 +266,6 @@ type ddl struct { limitJobCh chan *limitJobTask *ddlCtx - workers map[workerType]*worker sessPool *sessionPool delRangeMgr delRangeManager enableTiFlashPoll *atomicutil.Bool @@ -624,7 +619,6 @@ func newDDL(ctx context.Context, options ...Option) *ddl { // Register functions for enable/disable ddl when changing system variable `tidb_enable_ddl`. 
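The registration just below wires the ddl instance's methods into package-level hooks in the variable package, and this patch drops the SwitchConcurrentDDL hook while keeping EnableDDL, DisableDDL and SwitchMDL. A minimal sketch of that callback-registration pattern, with all names hypothetical rather than TiDB APIs:

package main

import "fmt"

// Settings side: hooks are plain function variables, nil until a subsystem
// registers itself, which avoids an import cycle between the two packages.
var (
	enableFeature  func() error
	disableFeature func() error
)

// Subsystem side: registers its own methods when it is constructed.
type subsystem struct{ enabled bool }

func (s *subsystem) enable() error  { s.enabled = true; return nil }
func (s *subsystem) disable() error { s.enabled = false; return nil }

func newSubsystem() *subsystem {
	s := &subsystem{}
	enableFeature = s.enable
	disableFeature = s.disable
	return s
}

func main() {
	s := newSubsystem()
	if enableFeature != nil {
		_ = enableFeature() // the settings layer only ever sees the hook
	}
	fmt.Println("enabled:", s.enabled)
}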
variable.EnableDDL = d.EnableDDL variable.DisableDDL = d.DisableDDL - variable.SwitchConcurrentDDL = d.SwitchConcurrentDDL variable.SwitchMDL = d.SwitchMDL return d @@ -656,7 +650,7 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager { func (d *ddl) prepareWorkers4ConcurrencyDDL() { workerFactory := func(tp workerType) func() (pools.Resource, error) { return func() (pools.Resource, error) { - wk := newWorker(d.ctx, tp, d.sessPool, d.delRangeMgr, d.ddlCtx, true) + wk := newWorker(d.ctx, tp, d.sessPool, d.delRangeMgr, d.ddlCtx) sessForJob, err := d.sessPool.get() if err != nil { return nil, err @@ -679,23 +673,6 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() { d.wg.Run(d.startDispatchLoop) } -func (d *ddl) prepareWorkers4legacyDDL() { - d.workers = make(map[workerType]*worker, 2) - d.workers[generalWorker] = newWorker(d.ctx, generalWorker, d.sessPool, d.delRangeMgr, d.ddlCtx, false) - d.workers[addIdxWorker] = newWorker(d.ctx, addIdxWorker, d.sessPool, d.delRangeMgr, d.ddlCtx, false) - for _, worker := range d.workers { - worker.wg.Add(1) - w := worker - go w.start(d.ddlCtx) - - metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, worker.String())).Inc() - - // When the start function is called, we will send a fake job to let worker - // checks owner firstly and try to find whether a job exists and run. - asyncNotify(worker.ddlJobCh) - } -} - // Start implements DDL.Start interface. func (d *ddl) Start(ctxPool *pools.ResourcePool) error { logutil.BgLogger().Info("[ddl] start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load())) @@ -713,7 +690,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error { d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil) d.prepareWorkers4ConcurrencyDDL() - d.prepareWorkers4legacyDDL() if config.TableLockEnabled() { d.wg.Add(1) @@ -799,9 +775,6 @@ func (d *ddl) close() { d.generalDDLWorkerPool.close() } - for _, worker := range d.workers { - worker.Close() - } // d.delRangeMgr using sessions from d.sessPool. // Put it before d.sessPool.close to reduce the time spent by d.sessPool.close. if d.delRangeMgr != nil { @@ -921,24 +894,10 @@ func (d *ddl) asyncNotifyWorker(job *model.Job) { if !config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() { return } - if variable.EnableConcurrentDDL.Load() { - if d.isOwner() { - asyncNotify(d.ddlJobCh) - } else { - d.asyncNotifyByEtcd(addingDDLJobConcurrent, job) - } + if d.isOwner() { + asyncNotify(d.ddlJobCh) } else { - var worker *worker - if job.MayNeedReorg() { - worker = d.workers[addIdxWorker] - } else { - worker = d.workers[generalWorker] - } - if d.ownerManager.IsOwner() { - asyncNotify(worker.ddlJobCh) - } else { - d.asyncNotifyByEtcd(worker.addingDDLJobKey, job) - } + d.asyncNotifyByEtcd(addingDDLJobConcurrent, job) } } @@ -1055,7 +1014,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { continue } sessVars.StmtCtx.DDLJobID = 0 // Avoid repeat. - errs, err := CancelJobs(se, d.store, []int64{jobID}) + errs, err := CancelJobs(se, []int64{jobID}) d.sessPool.put(se) if len(errs) > 0 { logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0])) @@ -1182,55 +1141,12 @@ func (d *ddl) startCleanDeadTableLock() { } } -// SwitchConcurrentDDL changes the DDL to concurrent DDL if toConcurrentDDL is true, otherwise, queue based DDL. 
-func (d *ddl) SwitchConcurrentDDL(toConcurrentDDL bool) error { - if !d.isOwner() { - return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), d.store, true, func(ctx context.Context, txn kv.Transaction) error { - isConcurrentDDL, err := meta.NewMeta(txn).IsConcurrentDDL() - if err != nil { - return err - } - if isConcurrentDDL != toConcurrentDDL { - return errors.New("please set it on the DDL owner node") - } - return nil - }) - } - - if variable.EnableMDL.Load() && !toConcurrentDDL { - return errors.New("can not disable concurrent ddl when metadata lock is enabled") - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - d.waiting.Store(true) - defer d.waiting.Store(false) - if err := d.wait4Switch(ctx); err != nil { - return err - } - - var err error - if toConcurrentDDL { - err = d.MoveJobFromQueue2Table(false) - } else { - err = d.MoveJobFromTable2Queue() - } - if err == nil { - variable.EnableConcurrentDDL.Store(toConcurrentDDL) - logutil.BgLogger().Info("[ddl] SwitchConcurrentDDL", zap.Bool("toConcurrentDDL", toConcurrentDDL)) - } else { - logutil.BgLogger().Warn("[ddl] SwitchConcurrentDDL", zap.Bool("toConcurrentDDL", toConcurrentDDL), zap.Error(err)) - } - return err -} - // SwitchMDL enables MDL or disable DDL. func (d *ddl) SwitchMDL(enable bool) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() - // Disable MDL for test. - if enable && !variable.DefTiDBEnableConcurrentDDL { + if enable { sql := fmt.Sprintf("UPDATE HIGH_PRIORITY %[1]s.%[2]s SET VARIABLE_VALUE = %[4]d WHERE VARIABLE_NAME = '%[3]s'", mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBEnableMDL, 0) sess, err := d.sessPool.get() @@ -1288,23 +1204,6 @@ func (d *ddl) SwitchMDL(enable bool) error { return nil } -func (d *ddl) wait4Switch(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - d.runningJobs.RLock() - if len(d.runningJobs.ids) == 0 { - d.runningJobs.RUnlock() - return nil - } - d.runningJobs.RUnlock() - time.Sleep(time.Second * 1) - } -} - // RecoverInfo contains information needed by DDL.RecoverTable. 
type RecoverInfo struct { SchemaID int64 @@ -1419,13 +1318,8 @@ func GetDDLInfo(s sessionctx.Context) (*Info, error) { } t := meta.NewMeta(txn) info.Jobs = make([]*model.Job, 0, 2) - enable := variable.EnableConcurrentDDL.Load() var generalJob, reorgJob *model.Job - if enable { - generalJob, reorgJob, err = get2JobsFromTable(sess) - } else { - generalJob, reorgJob, err = get2JobsFromQueue(t) - } + generalJob, reorgJob, err = get2JobsFromTable(sess) if err != nil { return nil, errors.Trace(err) } @@ -1446,7 +1340,7 @@ func GetDDLInfo(s sessionctx.Context) (*Info, error) { return info, nil } - _, info.ReorgHandle, _, _, err = newReorgHandler(t, sess, enable).GetDDLReorgHandle(reorgJob) + _, info.ReorgHandle, _, _, err = newReorgHandler(t, sess).GetDDLReorgHandle(reorgJob) if err != nil { if meta.ErrDDLReorgElementNotExist.Equal(err) { return info, nil @@ -1457,19 +1351,6 @@ func GetDDLInfo(s sessionctx.Context) (*Info, error) { return info, nil } -func get2JobsFromQueue(t *meta.Meta) (*model.Job, *model.Job, error) { - generalJob, err := t.GetDDLJobByIdx(0) - if err != nil { - return nil, nil, errors.Trace(err) - } - reorgJob, err := t.GetDDLJobByIdx(0, meta.AddIndexJobListKey) - if err != nil { - return nil, nil, errors.Trace(err) - } - - return generalJob, reorgJob, nil -} - func get2JobsFromTable(sess *session) (*model.Job, *model.Job, error) { var generalJob, reorgJob *model.Job jobs, err := getJobsBySQL(sess, JobTable, "not reorg order by job_id limit 1") @@ -1491,82 +1372,8 @@ func get2JobsFromTable(sess *session) (*model.Job, *model.Job, error) { } // CancelJobs cancels the DDL jobs. -func CancelJobs(se sessionctx.Context, store kv.Storage, ids []int64) (errs []error, err error) { - if variable.EnableConcurrentDDL.Load() { - return cancelConcurrencyJobs(se, ids) - } - - err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error { - errs, err = cancelLegacyJobs(txn, ids) - return err - }) - return -} - -func cancelLegacyJobs(txn kv.Transaction, ids []int64) ([]error, error) { - if len(ids) == 0 { - return nil, nil - } - - errs := make([]error, len(ids)) - t := meta.NewMeta(txn) - generalJobs, err := getDDLJobsInQueue(t, meta.DefaultJobListKey) - if err != nil { - return nil, errors.Trace(err) - } - addIdxJobs, err := getDDLJobsInQueue(t, meta.AddIndexJobListKey) - if err != nil { - return nil, errors.Trace(err) - } - jobs := append(generalJobs, addIdxJobs...) - jobsMap := make(map[int64]int) - for i, id := range ids { - jobsMap[id] = i - } - for j, job := range jobs { - i, ok := jobsMap[job.ID] - if !ok { - logutil.BgLogger().Debug("the job that needs to be canceled isn't equal to current job", - zap.Int64("need to canceled job ID", job.ID), - zap.Int64("current job ID", job.ID)) - continue - } - delete(jobsMap, job.ID) - // These states can't be cancelled. - if job.IsDone() || job.IsSynced() { - errs[i] = dbterror.ErrCancelFinishedDDLJob.GenWithStackByArgs(job.ID) - continue - } - // If the state is rolling back, it means the work is cleaning the data after cancelling the job. - if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() { - continue - } - if !job.IsRollbackable() { - errs[i] = dbterror.ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID) - continue - } - - job.State = model.JobStateCancelling - // Make sure RawArgs isn't overwritten. 
- err := json.Unmarshal(job.RawArgs, &job.Args) - if err != nil { - errs[i] = errors.Trace(err) - continue - } - if j >= len(generalJobs) { - offset := int64(j - len(generalJobs)) - err = t.UpdateDDLJob(offset, job, true, meta.AddIndexJobListKey) - } else { - err = t.UpdateDDLJob(int64(j), job, true) - } - if err != nil { - errs[i] = errors.Trace(err) - } - } - for id, i := range jobsMap { - errs[i] = dbterror.ErrDDLJobNotFound.GenWithStackByArgs(id) - } - return errs, nil +func CancelJobs(se sessionctx.Context, ids []int64) (errs []error, err error) { + return cancelConcurrencyJobs(se, ids) } // cancelConcurrencyJobs cancels the DDL jobs that are in the concurrent state. @@ -1645,45 +1452,9 @@ func cancelConcurrencyJobs(se sessionctx.Context, ids []int64) ([]error, error) return errs, nil } -func getDDLJobsInQueue(t *meta.Meta, jobListKey meta.JobListKeyType) ([]*model.Job, error) { - cnt, err := t.DDLJobQueueLen(jobListKey) - if err != nil { - return nil, errors.Trace(err) - } - jobs := make([]*model.Job, cnt) - for i := range jobs { - jobs[i], err = t.GetDDLJobByIdx(int64(i), jobListKey) - if err != nil { - return nil, errors.Trace(err) - } - } - return jobs, nil -} - // GetAllDDLJobs get all DDL jobs and sorts jobs by job.ID. func GetAllDDLJobs(sess sessionctx.Context, t *meta.Meta) ([]*model.Job, error) { - if variable.EnableConcurrentDDL.Load() { - return getJobsBySQL(newSession(sess), JobTable, "1 order by job_id") - } - - return getDDLJobs(t) -} - -// getDDLJobs get all DDL jobs and sorts jobs by job.ID. -func getDDLJobs(t *meta.Meta) ([]*model.Job, error) { - generalJobs, err := getDDLJobsInQueue(t, meta.DefaultJobListKey) - if err != nil { - return nil, errors.Trace(err) - } - addIdxJobs, err := getDDLJobsInQueue(t, meta.AddIndexJobListKey) - if err != nil { - return nil, errors.Trace(err) - } - jobs := append(generalJobs, addIdxJobs...) - slices.SortFunc(jobs, func(i, j *model.Job) bool { - return i.ID < j.ID - }) - return jobs, nil + return getJobsBySQL(newSession(sess), JobTable, "1 order by job_id") } // MaxHistoryJobs is exported for testing. @@ -1873,19 +1644,11 @@ func GetHistoryJobByID(sess sessionctx.Context, id int64) (*model.Job, error) { return job, errors.Trace(err) } -// AddHistoryDDLJobForTest used for test. -func AddHistoryDDLJobForTest(sess sessionctx.Context, t *meta.Meta, job *model.Job, updateRawArgs bool) error { - return AddHistoryDDLJob(newSession(sess), t, job, updateRawArgs, variable.EnableConcurrentDDL.Load()) -} - // AddHistoryDDLJob record the history job. -func AddHistoryDDLJob(sess *session, t *meta.Meta, job *model.Job, updateRawArgs bool, concurrentDDL bool) error { - if concurrentDDL { - // only add history job into table if it is concurrent DDL. - err := addHistoryDDLJob2Table(sess, job, updateRawArgs) - if err != nil { - logutil.BgLogger().Info("[ddl] failed to add DDL job to history table", zap.Error(err)) - } +func AddHistoryDDLJob(sess *session, t *meta.Meta, job *model.Job, updateRawArgs bool) error { + err := addHistoryDDLJob2Table(sess, job, updateRawArgs) + if err != nil { + logutil.BgLogger().Info("[ddl] failed to add DDL job to history table", zap.Error(err)) } // we always add history DDL job to job list at this moment. 
return t.AddHistoryDDLJob(job, updateRawArgs) diff --git a/ddl/ddl_api_test.go b/ddl/ddl_api_test.go index f4010015f5456..9f36cc95f806c 100644 --- a/ddl/ddl_api_test.go +++ b/ddl/ddl_api_test.go @@ -115,73 +115,6 @@ func TestGetDDLJobsIsSort(t *testing.T) { require.NoError(t, err) } -func TestGetHistoryDDLJobs(t *testing.T) { - store := testkit.CreateMockStore(t) - - // delete the internal DDL record. - err := kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, false, func(ctx context.Context, txn kv.Transaction) error { - return meta.NewMeta(txn).ClearAllHistoryJob() - }) - require.NoError(t, err) - testkit.NewTestKit(t, store).MustExec("delete from mysql.tidb_ddl_history") - - tk := testkit.NewTestKit(t, store) - sess := tk.Session() - tk.MustExec("begin") - - txn, err := sess.Txn(true) - require.NoError(t, err) - - m := meta.NewMeta(txn) - cnt := 11 - jobs := make([]*model.Job, cnt) - for i := 0; i < cnt; i++ { - jobs[i] = &model.Job{ - ID: int64(i), - SchemaID: 1, - Type: model.ActionCreateTable, - } - err = ddl.AddHistoryDDLJobForTest(sess, m, jobs[i], true) - require.NoError(t, err) - - historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, ddl.DefNumHistoryJobs) - require.NoError(t, err) - - if i+1 > ddl.MaxHistoryJobs { - require.Len(t, historyJobs, ddl.MaxHistoryJobs) - } else { - require.Len(t, historyJobs, i+1) - } - } - - delta := cnt - ddl.MaxHistoryJobs - historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, ddl.DefNumHistoryJobs) - require.NoError(t, err) - require.Len(t, historyJobs, ddl.MaxHistoryJobs) - - l := len(historyJobs) - 1 - for i, job := range historyJobs { - require.Equal(t, jobs[delta+l-i].ID, job.ID) - require.Equal(t, int64(1), job.SchemaID) - require.Equal(t, model.ActionCreateTable, job.Type) - } - - var historyJobs2 []*model.Job - err = ddl.IterHistoryDDLJobs(txn, func(jobs []*model.Job) (b bool, e error) { - for _, job := range jobs { - historyJobs2 = append(historyJobs2, job) - if len(historyJobs2) == ddl.DefNumHistoryJobs { - return true, nil - } - } - return false, nil - }) - require.NoError(t, err) - require.Equal(t, historyJobs, historyJobs2) - - tk.MustExec("rollback") -} - func TestIsJobRollbackable(t *testing.T) { cases := []struct { tp model.ActionType diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index a2e75119e4d12..6b210d2445c26 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -19,7 +19,6 @@ import ( "testing" "time" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" @@ -271,109 +270,6 @@ func TestBuildJobDependence(t *testing.T) { require.NoError(t, err) } -func TestNotifyDDLJob(t *testing.T) { - store := createMockStore(t) - defer func() { - require.NoError(t, store.Close()) - }() - - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NoDDLDispatchLoop", `return(true)`)) - defer require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/NoDDLDispatchLoop")) - - getFirstNotificationAfterStartDDL := func(d *ddl) { - select { - case <-d.workers[addIdxWorker].ddlJobCh: - default: - // The notification may be received by the worker. - } - select { - case <-d.workers[generalWorker].ddlJobCh: - default: - // The notification may be received by the worker. 
- } - - select { - case <-d.ddlJobCh: - default: - } - } - - d, err := testNewDDLAndStart( - context.Background(), - WithStore(store), - WithLease(testLease), - ) - require.NoError(t, err) - defer func() { - require.NoError(t, d.Stop()) - }() - getFirstNotificationAfterStartDDL(d) - // Ensure that the notification is not handled in workers `start` function. - d.cancel() - for _, worker := range d.workers { - worker.Close() - } - - job := &model.Job{ - SchemaID: 1, - TableID: 2, - Type: model.ActionCreateTable, - BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{}, - } - // Test the notification mechanism of the owner and the server receiving the DDL request on the same TiDB. - // This DDL request is a general DDL job. - d.asyncNotifyWorker(job) - select { - case <-d.workers[generalWorker].ddlJobCh: - case <-d.ddlJobCh: - default: - require.FailNow(t, "do not get the general job notification") - } - // Test the notification mechanism of the owner and the server receiving the DDL request on the same TiDB. - // This DDL request is a add index DDL job. - job.Type = model.ActionAddIndex - d.asyncNotifyWorker(job) - select { - case <-d.workers[addIdxWorker].ddlJobCh: - case <-d.ddlJobCh: - default: - require.FailNow(t, "do not get the add index job notification") - } - - // Test the notification mechanism that the owner and the server receiving the DDL request are not on the same TiDB. - // And the etcd client is nil. - d1, err := testNewDDLAndStart( - context.Background(), - WithStore(store), - WithLease(testLease), - ) - require.NoError(t, err) - defer func() { - require.NoError(t, d1.Stop()) - }() - getFirstNotificationAfterStartDDL(d1) - // Ensure that the notification is not handled by worker's "start". - d1.cancel() - for _, worker := range d1.workers { - worker.Close() - } - d1.ownerManager.RetireOwner() - d1.asyncNotifyWorker(job) - job.Type = model.ActionCreateTable - d1.asyncNotifyWorker(job) - require.False(t, d1.OwnerManager().IsOwner()) - select { - case <-d1.workers[addIdxWorker].ddlJobCh: - require.FailNow(t, "should not get the add index job notification") - case <-d1.workers[generalWorker].ddlJobCh: - require.FailNow(t, "should not get the general job notification") - case <-d1.ddlJobCh: - require.FailNow(t, "should not get the job notification") - default: - } -} - func TestError(t *testing.T) { kvErrs := []*terror.Error{ dbterror.ErrDDLJobNotFound, diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index a760cb598129b..7843fac34a69e 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -97,8 +97,6 @@ type worker struct { logCtx context.Context lockSeqNum bool - concurrentDDL bool - *ddlCtx } @@ -123,7 +121,7 @@ func NewJobContext() *JobContext { } } -func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRangeMgr delRangeManager, dCtx *ddlCtx, concurrentDDL bool) *worker { +func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRangeMgr delRangeManager, dCtx *ddlCtx) *worker { worker := &worker{ id: ddlWorkerID.Add(1), tp: tp, @@ -132,7 +130,6 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan ddlCtx: dCtx, sessPool: sessPool, delRangeManager: delRangeMgr, - concurrentDDL: concurrentDDL, } worker.addingDDLJobKey = addingDDLJobPrefix + worker.typeStr() worker.logCtx = logutil.WithKeyValue(context.Background(), "worker", worker.String()) @@ -165,59 +162,6 @@ func (w *worker) Close() { logutil.Logger(w.logCtx).Info("[ddl] DDL worker closed", zap.Duration("take time", time.Since(startTime))) } 
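The legacy start loop removed just below polled the DDL job queue on a ticker and also woke up on channel and etcd notifications; the table-based dispatch loop that remains in job_table.go keeps the same shape. A rough, self-contained sketch of that loop shape, using only generic names and no TiDB types:

package main

import (
	"context"
	"log"
	"time"
)

// dispatchLoop wakes up on an explicit notification, on a periodic tick (in
// case a notification was missed), or returns when the context is cancelled.
func dispatchLoop(ctx context.Context, wake <-chan struct{}, handle func() error) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		case <-wake:
		}
		if err := handle(); err != nil {
			log.Printf("handle job failed: %v", err)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	wake := make(chan struct{}, 1)
	go dispatchLoop(ctx, wake, func() error {
		log.Println("checked the job table")
		return nil
	})
	wake <- struct{}{} // simulate a new-job notification
	<-ctx.Done()
}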
-// start is used for async online schema changing, it will try to become the owner firstly, -// then wait or pull the job queue to handle a schema change job. -func (w *worker) start(d *ddlCtx) { - logutil.Logger(w.logCtx).Info("[ddl] start DDL worker") - defer w.wg.Done() - defer tidbutil.Recover( - metrics.LabelDDLWorker, - fmt.Sprintf("DDL ID %s, %s start", d.uuid, w), - nil, true, - ) - - // We use 4 * lease time to check owner's timeout, so here, we will update owner's status - // every 2 * lease time. If lease is 0, we will use default 1s. - // But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value. - checkTime := chooseLeaseTime(2*d.lease, 1*time.Second) - - ticker := time.NewTicker(checkTime) - defer ticker.Stop() - var notifyDDLJobByEtcdCh clientv3.WatchChan - if d.etcdCli != nil { - notifyDDLJobByEtcdCh = d.etcdCli.Watch(context.Background(), w.addingDDLJobKey) - } - - rewatchCnt := 0 - for { - ok := true - select { - case <-ticker.C: - logutil.Logger(w.logCtx).Debug("[ddl] wait to check DDL status again", zap.Duration("interval", checkTime)) - case <-w.ddlJobCh: - case _, ok = <-notifyDDLJobByEtcdCh: - case <-w.ctx.Done(): - return - } - - if !ok { - logutil.Logger(w.logCtx).Warn("[ddl] start worker watch channel closed", zap.String("watch key", w.addingDDLJobKey)) - notifyDDLJobByEtcdCh = d.etcdCli.Watch(context.Background(), w.addingDDLJobKey) - rewatchCnt++ - if rewatchCnt > 10 { - time.Sleep(time.Duration(rewatchCnt) * time.Second) - } - continue - } - - rewatchCnt = 0 - err := w.handleDDLJobQueue(d) - if err != nil { - logutil.Logger(w.logCtx).Warn("[ddl] handle DDL job failed", zap.Error(err)) - } - } -} - func (d *ddl) asyncNotifyByEtcd(addingDDLJobKey string, job *model.Job) { if d.etcdCli == nil { return @@ -239,37 +183,6 @@ func asyncNotify(ch chan struct{}) { } } -// buildJobDependence sets the curjob's dependency-ID. -// The dependency-job's ID must less than the current job's ID, and we need the largest one in the list. -func buildJobDependence(t *meta.Meta, curJob *model.Job) error { - // Jobs in the same queue are ordered. If we want to find a job's dependency-job, we need to look for - // it from the other queue. So if the job is "ActionAddIndex" job, we need find its dependency-job from DefaultJobList. - jobListKey := meta.DefaultJobListKey - if !curJob.MayNeedReorg() { - jobListKey = meta.AddIndexJobListKey - } - jobs, err := t.GetAllDDLJobsInQueue(jobListKey) - if err != nil { - return errors.Trace(err) - } - - for _, job := range jobs { - if curJob.ID < job.ID { - continue - } - isDependent, err := curJob.IsDependentOn(job) - if err != nil { - return errors.Trace(err) - } - if isDependent { - logutil.BgLogger().Info("[ddl] current DDL job depends on other job", zap.String("currentJob", curJob.String()), zap.String("dependentJob", job.String())) - curJob.DependencyID = job.ID - break - } - } - return nil -} - func (d *ddl) limitDDLJobs() { defer tidbutil.Recover(metrics.LabelDDL, "limitDDLJobs", nil, true) @@ -295,7 +208,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { startTime := time.Now() var err error // DDLForce2Queue is a flag to tell DDL worker to always push the job to the DDL queue. 
- toTable := variable.EnableConcurrentDDL.Load() && !variable.DDLForce2Queue.Load() + toTable := !variable.DDLForce2Queue.Load() if toTable { err = d.addBatchDDLJobs2Table(tasks) } else { @@ -315,6 +228,37 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { } } +// buildJobDependence sets the curjob's dependency-ID. +// The dependency-job's ID must less than the current job's ID, and we need the largest one in the list. +func buildJobDependence(t *meta.Meta, curJob *model.Job) error { + // Jobs in the same queue are ordered. If we want to find a job's dependency-job, we need to look for + // it from the other queue. So if the job is "ActionAddIndex" job, we need find its dependency-job from DefaultJobList. + jobListKey := meta.DefaultJobListKey + if !curJob.MayNeedReorg() { + jobListKey = meta.AddIndexJobListKey + } + jobs, err := t.GetAllDDLJobsInQueue(jobListKey) + if err != nil { + return errors.Trace(err) + } + + for _, job := range jobs { + if curJob.ID < job.ID { + continue + } + isDependent, err := curJob.IsDependentOn(job) + if err != nil { + return errors.Trace(err) + } + if isDependent { + logutil.BgLogger().Info("[ddl] current DDL job depends on other job", zap.String("currentJob", curJob.String()), zap.String("dependentJob", job.String())) + curJob.DependencyID = job.ID + break + } + } + return nil +} + func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) return kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error { @@ -444,13 +388,6 @@ func injectFailPointForGetJob(job *model.Job) { }) } -// getFirstDDLJob gets the first DDL job form DDL queue. -func (w *worker) getFirstDDLJob(t *meta.Meta) (*model.Job, error) { - job, err := t.GetDDLJobByIdx(0) - injectFailPointForGetJob(job) - return job, errors.Trace(err) -} - // handleUpdateJobError handles the too large DDL job. func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) error { if err == nil { @@ -471,7 +408,7 @@ func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) e // updateDDLJob updates the DDL job information. // Every time we enter another state except final state, we must call this function. -func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error { +func (w *worker) updateDDLJob(job *model.Job, meetErr bool) error { failpoint.Inject("mockErrEntrySizeTooLarge", func(val failpoint.Value) { if val.(bool) { failpoint.Return(kv.ErrEntryTooLarge) @@ -482,13 +419,7 @@ func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error logutil.Logger(w.logCtx).Info("[ddl] meet something wrong before update DDL job, shouldn't update raw args", zap.String("job", job.String())) } - var err error - if w.concurrentDDL { - err = updateDDLJob2Table(w.sess, job, updateRawArgs) - } else { - err = t.UpdateDDLJob(0, job, updateRawArgs) - } - return errors.Trace(err) + return errors.Trace(updateDDLJob2Table(w.sess, job, updateRawArgs)) } // registerMDLInfo registers metadata lock info. 
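With the queue path gone, updateDDLJob above always goes through updateDDLJob2Table, so a job update is simply a row update keyed by job_id in mysql.tidb_ddl_job. A hedged sketch of such an update, using database/sql for illustration instead of TiDB's internal session (the column names follow the insert statement visible later in this patch):

package ddljobs

import (
	"context"
	"database/sql"
)

// updateJobRow persists the re-serialized job metadata for one DDL job.
// Illustrative only; TiDB issues the equivalent statement through its
// internal SQL executor rather than database/sql.
func updateJobRow(ctx context.Context, db *sql.DB, jobID int64, jobMeta []byte, processing bool) error {
	_, err := db.ExecContext(ctx,
		"UPDATE mysql.tidb_ddl_job SET job_meta = ?, processing = ? WHERE job_id = ?",
		jobMeta, processing, jobID)
	return err
}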
@@ -631,11 +562,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { if err != nil { return errors.Trace(err) } - if w.concurrentDDL { - err = w.deleteDDLJob(job) - } else { - _, err = t.DeQueueDDLJob() - } + err = w.deleteDDLJob(job) if err != nil { return errors.Trace(err) } @@ -650,7 +577,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { } w.writeDDLSeqNum(job) w.removeJobCtx(job) - err = AddHistoryDDLJob(w.sess, t, job, updateRawArgs, w.concurrentDDL) + err = AddHistoryDDLJob(w.sess, t, job, updateRawArgs) return errors.Trace(err) } @@ -714,13 +641,6 @@ func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) { return true, nil } -func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta { - if tp == addIdxWorker { - return meta.NewMeta(txn, meta.AddIndexJobListKey) - } - return meta.NewMeta(txn) -} - func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) { if !topsqlstate.TopSQLEnabled() || job == nil { return @@ -797,7 +717,7 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { if err != nil { return 0, err } - if !variable.EnableConcurrentDDL.Load() || d.waiting.Load() { + if d.waiting.Load() { w.sess.rollback() return 0, nil } @@ -867,7 +787,7 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { d.unlockSchemaVersion(job.ID) return 0, err } - err = w.updateDDLJob(t, job, runJobErr != nil) + err = w.updateDDLJob(job, runJobErr != nil) if err = w.handleUpdateJobError(t, job, err); err != nil { w.sess.rollback() d.unlockSchemaVersion(job.ID) @@ -911,152 +831,6 @@ func (w *JobContext) ddlJobSourceType() string { return w.tp } -// handleDDLJobQueue handles DDL jobs in DDL Job queue. -func (w *worker) handleDDLJobQueue(d *ddlCtx) error { - once := true - waitDependencyJobCnt := 0 - for { - if isChanClosed(w.ctx.Done()) { - return nil - } - - var ( - job *model.Job - schemaVer int64 - runJobErr error - ) - waitTime := 2 * d.lease - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) - err := kv.RunInNewTxn(ctx, d.store, false, func(ctx context.Context, txn kv.Transaction) error { - d.runningJobs.Lock() - // We are not owner, return and retry checking later. - if !d.isOwner() || variable.EnableConcurrentDDL.Load() || d.waiting.Load() { - d.runningJobs.Unlock() - return nil - } - - var err error - t := newMetaWithQueueTp(txn, w.tp) - - // We become the owner. Get the first job and run it. - job, err = w.getFirstDDLJob(t) - if job == nil || err != nil { - d.runningJobs.Unlock() - return errors.Trace(err) - } - d.runningJobs.ids[job.ID] = struct{}{} - d.runningJobs.Unlock() - - defer d.deleteRunningDDLJobMap(job.ID) - - // only general ddls allowed to be executed when TiKV is disk full. 
- if w.tp == addIdxWorker && job.IsRunning() { - txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull) - } - - w.setDDLLabelForTopSQL(job) - w.setDDLSourceForDiagnosis(job) - jobContext := w.jobContext(job) - if tagger := w.getResourceGroupTaggerForTopSQL(job); tagger != nil { - txn.SetOption(kv.ResourceGroupTagger, tagger) - } - if isDone, err1 := isDependencyJobDone(t, job); err1 != nil || !isDone { - return errors.Trace(err1) - } - - if once { - err = waitSchemaSynced(d, job, waitTime) - if err == nil { - once = false - } - return err - } - - if job.IsDone() || job.IsRollbackDone() { - if !job.IsRollbackDone() { - job.State = model.JobStateSynced - } - err = w.finishDDLJob(t, job) - return errors.Trace(err) - } - - d.mu.RLock() - d.mu.hook.OnJobRunBefore(job) - d.mu.RUnlock() - - // set request source type to DDL type - txn.SetOption(kv.RequestSourceType, jobContext.ddlJobSourceType()) - // If running job meets error, we will save this error in job Error - // and retry later if the job is not cancelled. - schemaVer, runJobErr = w.runDDLJob(d, t, job) - if job.IsCancelled() { - txn.Reset() - err = w.finishDDLJob(t, job) - return errors.Trace(err) - } - if runJobErr != nil && !job.IsRollingback() && !job.IsRollbackDone() { - // If the running job meets an error - // and the job state is rolling back, it means that we have already handled this error. - // Some DDL jobs (such as adding indexes) may need to update the table info and the schema version, - // then shouldn't discard the KV modification. - // And the job state is rollback done, it means the job was already finished, also shouldn't discard too. - // Otherwise, we should discard the KV modification when running job. - txn.Reset() - // If error happens after updateSchemaVersion(), then the schemaVer is updated. - // Result in the retry duration is up to 2 * lease. - schemaVer = 0 - } - err = w.updateDDLJob(t, job, runJobErr != nil) - if err = w.handleUpdateJobError(t, job, err); err != nil { - return errors.Trace(err) - } - writeBinlog(d.binlogCli, txn, job) - return nil - }) - - if runJobErr != nil { - // wait a while to retry again. If we don't wait here, DDL will retry this job immediately, - // which may act like a deadlock. - logutil.Logger(w.logCtx).Info("[ddl] run DDL job failed, sleeps a while then retries it.", - zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr)) - time.Sleep(GetWaitTimeWhenErrorOccurred()) - } - if job != nil { - d.unlockSchemaVersion(job.ID) - } - - if err != nil { - w.unlockSeqNum(err) - return errors.Trace(err) - } else if job == nil { - // No job now, return and retry getting later. - return nil - } - w.unlockSeqNum(err) - w.waitDependencyJobFinished(job, &waitDependencyJobCnt) - - // Here means the job enters another state (delete only, write only, public, etc...) or is cancelled. - // If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update - // the newest schema. - waitSchemaChanged(context.Background(), d, waitTime, schemaVer, job) - - if RunInGoTest { - // d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously. 
- d.mu.RLock() - d.mu.hook.OnSchemaStateChanged(schemaVer) - d.mu.RUnlock() - } - - d.mu.RLock() - d.mu.hook.OnJobUpdated(job) - d.mu.RUnlock() - - if job.IsSynced() || job.IsCancelled() || job.IsRollbackDone() { - asyncNotify(d.ddlJobDoneCh) - } - } -} - func skipWriteBinlog(job *model.Job) bool { switch job.Type { // ActionUpdateTiFlashReplicaStatus is a TiDB internal DDL, diff --git a/ddl/ddl_workerpool_test.go b/ddl/ddl_workerpool_test.go index e9f324ce9dff8..d8768507b8102 100644 --- a/ddl/ddl_workerpool_test.go +++ b/ddl/ddl_workerpool_test.go @@ -26,7 +26,7 @@ import ( func TestDDLWorkerPool(t *testing.T) { f := func() func() (pools.Resource, error) { return func() (pools.Resource, error) { - wk := newWorker(nil, addIdxWorker, nil, nil, nil, true) + wk := newWorker(nil, addIdxWorker, nil, nil, nil) return wk, nil } } diff --git a/ddl/index.go b/ddl/index.go index 929a86b08321b..306723964a693 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -880,7 +880,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} - rh := newReorgHandler(t, w.sess, w.concurrentDDL) + rh := newReorgHandler(t, w.sess) dbInfo, err := t.GetDatabase(job.SchemaID) if err != nil { return false, ver, errors.Trace(err) diff --git a/ddl/job_table.go b/ddl/job_table.go index 117b3722eccde..06f745506145a 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -174,7 +174,7 @@ func (d *ddl) startDispatchLoop() { if isChanClosed(d.ctx.Done()) { return } - if !variable.EnableConcurrentDDL.Load() || !d.isOwner() || d.waiting.Load() { + if !d.isOwner() || d.waiting.Load() { d.once.Store(true) time.Sleep(time.Second) continue @@ -547,6 +547,19 @@ func addBackfillJobs(sess *session, tableName string, backfillJobs []*BackfillJo }) } +func runInTxn(se *session, f func(*session) error) (err error) { + err = se.begin() + if err != nil { + return err + } + err = f(se) + if err != nil { + se.rollback() + return + } + return errors.Trace(se.commit()) +} + // GetBackfillJobsForOneEle batch gets the backfill jobs in the tblName table that contains only one element. func GetBackfillJobsForOneEle(sess *session, batch int, excludedJobIDs []int64, lease time.Duration) ([]*BackfillJob, error) { eJobIDsBuilder := strings.Builder{} @@ -718,135 +731,3 @@ func updateBackfillJob(sess *session, tableName string, backfillJob *BackfillJob _, err = sess.execute(context.Background(), sql, label) return err } - -// MoveJobFromQueue2Table move existing DDLs in queue to table. -func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error { - sess, err := d.sessPool.get() - if err != nil { - return err - } - defer d.sessPool.put(sess) - return runInTxn(newSession(sess), func(se *session) error { - txn, err := se.txn() - if err != nil { - return errors.Trace(err) - } - t := meta.NewMeta(txn) - isConcurrentDDL, err := t.IsConcurrentDDL() - if !inBootstrap && (isConcurrentDDL || err != nil) { - return errors.Trace(err) - } - systemDBID, err := t.GetSystemDBID() - if err != nil { - return errors.Trace(err) - } - for _, tp := range []workerType{addIdxWorker, generalWorker} { - t := newMetaWithQueueTp(txn, tp) - jobs, err := t.GetAllDDLJobsInQueue() - if err != nil { - return errors.Trace(err) - } - for _, job := range jobs { - // In bootstrap, we can ignore the internal DDL. 
- if inBootstrap && job.SchemaID == systemDBID { - continue - } - err = insertDDLJobs2Table(se, false, job) - if err != nil { - return errors.Trace(err) - } - if tp == generalWorker { - // General job do not have reorg info. - continue - } - element, start, end, pid, err := t.GetDDLReorgHandle(job) - if meta.ErrDDLReorgElementNotExist.Equal(err) { - continue - } - if err != nil { - return errors.Trace(err) - } - err = initDDLReorgHandle(se, job.ID, start, end, pid, element) - if err != nil { - return errors.Trace(err) - } - } - } - - if err = t.ClearALLDDLJob(); err != nil { - return errors.Trace(err) - } - if err = t.ClearAllDDLReorgHandle(); err != nil { - return errors.Trace(err) - } - return t.SetConcurrentDDL(true) - }) -} - -// MoveJobFromTable2Queue move existing DDLs in table to queue. -func (d *ddl) MoveJobFromTable2Queue() error { - sess, err := d.sessPool.get() - if err != nil { - return err - } - defer d.sessPool.put(sess) - return runInTxn(newSession(sess), func(se *session) error { - txn, err := se.txn() - if err != nil { - return errors.Trace(err) - } - t := meta.NewMeta(txn) - isConcurrentDDL, err := t.IsConcurrentDDL() - if !isConcurrentDDL || err != nil { - return errors.Trace(err) - } - jobs, err := getJobsBySQL(se, "tidb_ddl_job", "1 order by job_id") - if err != nil { - return errors.Trace(err) - } - - for _, job := range jobs { - jobListKey := meta.DefaultJobListKey - if job.MayNeedReorg() { - jobListKey = meta.AddIndexJobListKey - } - if err := t.EnQueueDDLJobNoUpdate(job, jobListKey); err != nil { - return errors.Trace(err) - } - } - - reorgHandle, err := se.execute(context.Background(), "select job_id, start_key, end_key, physical_id, ele_id, ele_type from mysql.tidb_ddl_reorg", "get_handle") - if err != nil { - return errors.Trace(err) - } - for _, row := range reorgHandle { - if err := t.UpdateDDLReorgHandle(row.GetInt64(0), row.GetBytes(1), row.GetBytes(2), row.GetInt64(3), &meta.Element{ID: row.GetInt64(4), TypeKey: row.GetBytes(5)}); err != nil { - return errors.Trace(err) - } - } - - // clean up these 2 tables. - _, err = se.execute(context.Background(), "delete from mysql.tidb_ddl_job", "delete_old_ddl") - if err != nil { - return errors.Trace(err) - } - _, err = se.execute(context.Background(), "delete from mysql.tidb_ddl_reorg", "delete_old_reorg") - if err != nil { - return errors.Trace(err) - } - return t.SetConcurrentDDL(false) - }) -} - -func runInTxn(se *session, f func(*session) error) (err error) { - err = se.begin() - if err != nil { - return err - } - err = f(se) - if err != nil { - se.rollback() - return - } - return errors.Trace(se.commit()) -} diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index ca30cf903107d..8948796e73243 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" @@ -40,9 +39,6 @@ import ( // This test checks the chosen job records to see if there are wrong scheduling, if job A and job B cannot run concurrently, // then the all the record of job A must before or after job B, no cross record between these 2 jobs should be in between. 
func TestDDLScheduling(t *testing.T) { - if !variable.EnableConcurrentDDL.Load() { - t.Skipf("test requires concurrent ddl") - } store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index e211e9d51ca77..9f84ca6aeeed8 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -1263,7 +1263,7 @@ func (c *cancelOnceHook) OnJobUpdated(job *model.Job) { return } c.triggered = true - errs, err := ddl.CancelJobs(c.s, c.store, []int64{job.ID}) + errs, err := ddl.CancelJobs(c.s, []int64{job.ID}) if errs[0] != nil { c.cancelErr = errs[0] return diff --git a/ddl/partition.go b/ddl/partition.go index 2c95f389707f9..37b9f8bb2f5c7 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1756,7 +1756,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey}) } } - rh := newReorgHandler(t, w.sess, w.concurrentDDL) + rh := newReorgHandler(t, w.sess) reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements) if err != nil || reorgInfo.first { diff --git a/ddl/reorg.go b/ddl/reorg.go index d7671031f64d1..a394d1682db82 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -275,8 +274,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo // Update a reorgInfo's handle. // Since daemon-worker is triggered by timer to store the info half-way. // you should keep these infos is read-only (like job) / atomic (like doneKey & element) / concurrent safe. - err := rh.UpdateDDLReorgStartHandle(job, currentElement, doneKey) - + err := updateDDLReorgStartHandle(rh.s, job, currentElement, doneKey) logutil.BgLogger().Info("[ddl] run reorg job wait timeout", zap.Duration("wait time", waitTimeout), zap.ByteString("element type", currentElement.TypeKey), @@ -673,7 +671,7 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, // We'll try to remove it in the next major TiDB version. 
if meta.ErrDDLReorgElementNotExist.Equal(err) { job.SnapshotVer = 0 - logutil.BgLogger().Warn("[ddl] get reorg info, the element does not exist", zap.String("job", job.String()), zap.Bool("enableConcurrentDDL", rh.enableConcurrentDDL)) + logutil.BgLogger().Warn("[ddl] get reorg info, the element does not exist", zap.String("job", job.String())) } return &info, errors.Trace(err) } @@ -772,8 +770,8 @@ func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key, pool *sessionPool) (err err sess.rollback() return err } - rh := newReorgHandler(meta.NewMeta(txn), sess, variable.EnableConcurrentDDL.Load()) - err = rh.UpdateDDLReorgHandle(r.Job, startKey, r.EndKey, r.PhysicalTableID, r.currElement) + rh := newReorgHandler(meta.NewMeta(txn), sess) + err = updateDDLReorgHandle(rh.s, r.Job.ID, startKey, r.EndKey, r.PhysicalTableID, r.currElement) err1 := sess.commit() if err == nil { err = err1 @@ -785,63 +783,33 @@ func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key, pool *sessionPool) (err err type reorgHandler struct { m *meta.Meta s *session - - enableConcurrentDDL bool } // NewReorgHandlerForTest creates a new reorgHandler, only used in test. func NewReorgHandlerForTest(t *meta.Meta, sess sessionctx.Context) *reorgHandler { - return newReorgHandler(t, newSession(sess), variable.EnableConcurrentDDL.Load()) + return newReorgHandler(t, newSession(sess)) } -func newReorgHandler(t *meta.Meta, sess *session, enableConcurrentDDL bool) *reorgHandler { - return &reorgHandler{m: t, s: sess, enableConcurrentDDL: enableConcurrentDDL} -} - -// UpdateDDLReorgStartHandle saves the job reorganization latest processed element and start handle for later resuming. -func (r *reorgHandler) UpdateDDLReorgStartHandle(job *model.Job, element *meta.Element, startKey kv.Key) error { - if r.enableConcurrentDDL { - return updateDDLReorgStartHandle(r.s, job, element, startKey) - } - return r.m.UpdateDDLReorgStartHandle(job, element, startKey) -} - -// UpdateDDLReorgHandle saves the job reorganization latest processed information for later resuming. -func (r *reorgHandler) UpdateDDLReorgHandle(job *model.Job, startKey, endKey kv.Key, physicalTableID int64, element *meta.Element) error { - if r.enableConcurrentDDL { - return updateDDLReorgHandle(r.s, job.ID, startKey, endKey, physicalTableID, element) - } - return r.m.UpdateDDLReorgHandle(job.ID, startKey, endKey, physicalTableID, element) +func newReorgHandler(t *meta.Meta, sess *session) *reorgHandler { + return &reorgHandler{m: t, s: sess} } // InitDDLReorgHandle initializes the job reorganization information. func (r *reorgHandler) InitDDLReorgHandle(job *model.Job, startKey, endKey kv.Key, physicalTableID int64, element *meta.Element) error { - if r.enableConcurrentDDL { - return initDDLReorgHandle(r.s, job.ID, startKey, endKey, physicalTableID, element) - } - return r.m.UpdateDDLReorgHandle(job.ID, startKey, endKey, physicalTableID, element) + return initDDLReorgHandle(r.s, job.ID, startKey, endKey, physicalTableID, element) } // RemoveReorgElementFailPoint removes the element of the reorganization information. func (r *reorgHandler) RemoveReorgElementFailPoint(job *model.Job) error { - if r.enableConcurrentDDL { - return removeReorgElement(r.s, job) - } - return r.m.RemoveReorgElement(job) + return removeReorgElement(r.s, job) } // RemoveDDLReorgHandle removes the job reorganization related handles. 
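After this change the reorg checkpoint always lives in mysql.tidb_ddl_reorg; the column list can be seen in the SELECT removed from MoveJobFromTable2Queue above. A rough sketch of loading such a checkpoint row, again with database/sql standing in for the internal session:

package ddljobs

import (
	"context"
	"database/sql"
)

// reorgCheckpoint mirrors the columns kept in mysql.tidb_ddl_reorg.
type reorgCheckpoint struct {
	JobID      int64
	StartKey   []byte
	EndKey     []byte
	PhysicalID int64
	EleID      int64
	EleType    []byte
}

// getReorgCheckpoint loads the saved progress for one job; Scan returns
// sql.ErrNoRows if the reorg has not started yet. Illustrative only.
func getReorgCheckpoint(ctx context.Context, db *sql.DB, jobID int64) (*reorgCheckpoint, error) {
	row := db.QueryRowContext(ctx,
		"SELECT job_id, start_key, end_key, physical_id, ele_id, ele_type FROM mysql.tidb_ddl_reorg WHERE job_id = ?",
		jobID)
	cp := &reorgCheckpoint{}
	if err := row.Scan(&cp.JobID, &cp.StartKey, &cp.EndKey, &cp.PhysicalID, &cp.EleID, &cp.EleType); err != nil {
		return nil, err
	}
	return cp, nil
}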
func (r *reorgHandler) RemoveDDLReorgHandle(job *model.Job, elements []*meta.Element) error { - if r.enableConcurrentDDL { - return removeDDLReorgHandle(r.s, job, elements) - } - return r.m.RemoveDDLReorgHandle(job, elements) + return removeDDLReorgHandle(r.s, job, elements) } // GetDDLReorgHandle gets the latest processed DDL reorganize position. func (r *reorgHandler) GetDDLReorgHandle(job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) { - if r.enableConcurrentDDL { - return getDDLReorgHandle(r.s, job) - } - return r.m.GetDDLReorgHandle(job) + return getDDLReorgHandle(r.s, job) } diff --git a/ddl/schema_test.go b/ddl/schema_test.go index 70206ed2f179f..3be4fb4e4d278 100644 --- a/ddl/schema_test.go +++ b/ddl/schema_test.go @@ -163,13 +163,13 @@ func testDropSchema(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbInfo *mod return job, ver } -func isDDLJobDone(test *testing.T, t *meta.Meta) bool { - job, err := t.GetDDLJobByIdx(0) - require.NoError(test, err) - if job == nil { +func isDDLJobDone(test *testing.T, t *meta.Meta, store kv.Storage) bool { + tk := testkit.NewTestKit(test, store) + rows := tk.MustQuery("select * from mysql.tidb_ddl_job").Rows() + + if len(rows) == 0 { return true } - time.Sleep(testLease) return false } @@ -185,7 +185,7 @@ func testCheckSchemaState(test *testing.T, store kv.Storage, dbInfo *model.DBInf require.NoError(test, err) if state == model.StateNone { - isDropped = isDDLJobDone(test, t) + isDropped = isDDLJobDone(test, t, store) if !isDropped { return nil } diff --git a/ddl/schematracker/checker.go b/ddl/schematracker/checker.go index b1533d0246fb1..6e38f6c8bdb79 100644 --- a/ddl/schematracker/checker.go +++ b/ddl/schematracker/checker.go @@ -541,16 +541,6 @@ func (d Checker) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { return d.realDDL.DoDDLJob(ctx, job) } -// MoveJobFromQueue2Table implements the DDL interface. -func (d Checker) MoveJobFromQueue2Table(bool) error { - panic("implement me") -} - -// MoveJobFromTable2Queue implements the DDL interface. -func (d Checker) MoveJobFromTable2Queue() error { - panic("implement me") -} - // StorageDDLInjector wraps kv.Storage to inject checker to domain's DDL in bootstrap time. type StorageDDLInjector struct { kv.Storage diff --git a/ddl/schematracker/dm_tracker.go b/ddl/schematracker/dm_tracker.go index 75f8fa35b429d..5d3f693deaa0f 100644 --- a/ddl/schematracker/dm_tracker.go +++ b/ddl/schematracker/dm_tracker.go @@ -1256,13 +1256,3 @@ func (SchemaTracker) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infosc func (SchemaTracker) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { return nil } - -// MoveJobFromQueue2Table implements the DDL interface, it's no-op in DM's case. -func (SchemaTracker) MoveJobFromQueue2Table(b bool) error { - panic("implement me") -} - -// MoveJobFromTable2Queue implements the DDL interface, it's no-op in DM's case. 
-func (SchemaTracker) MoveJobFromTable2Queue() error { - panic("implement me") -} diff --git a/ddl/stat_test.go b/ddl/stat_test.go index 556b9eb5dadc7..db8abc45be30c 100644 --- a/ddl/stat_test.go +++ b/ddl/stat_test.go @@ -25,14 +25,12 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" @@ -152,20 +150,13 @@ func TestGetDDLInfo(t *testing.T) { } func addDDLJobs(sess session.Session, txn kv.Transaction, job *model.Job) error { - if variable.EnableConcurrentDDL.Load() { - b, err := job.Encode(true) - if err != nil { - return err - } - _, err = sess.Execute(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), fmt.Sprintf("insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values (%d, %t, %s, %s, %s, %d, %t)", - job.ID, job.MayNeedReorg(), strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), strconv.Quote(strconv.FormatInt(job.TableID, 10)), wrapKey2String(b), job.Type, false)) + b, err := job.Encode(true) + if err != nil { return err } - m := meta.NewMeta(txn) - if job.MayNeedReorg() { - return m.EnQueueDDLJob(job, meta.AddIndexJobListKey) - } - return m.EnQueueDDLJob(job) + _, err = sess.Execute(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), fmt.Sprintf("insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values (%d, %t, %s, %s, %s, %d, %t)", + job.ID, job.MayNeedReorg(), strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), strconv.Quote(strconv.FormatInt(job.TableID, 10)), wrapKey2String(b), job.Type, false)) + return err } func wrapKey2String(key []byte) string { diff --git a/executor/executor.go b/executor/executor.go index 9f95e63aaed20..603996ad7764f 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -353,7 +353,7 @@ func (e *CancelDDLJobsExec) Open(ctx context.Context) error { if err != nil { return err } - e.errs, err = ddl.CancelJobs(newSess, e.ctx.GetStore(), e.jobIDs) + e.errs, err = ddl.CancelJobs(newSess, e.jobIDs) e.releaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), newSess) return err } diff --git a/expression/integration_serial_test.go b/expression/integration_serial_test.go index 77574b4e309a2..b70b7be4a5070 100644 --- a/expression/integration_serial_test.go +++ b/expression/integration_serial_test.go @@ -3762,16 +3762,6 @@ func TestSetVariables(t *testing.T) { _, err = tk.Exec("set @@global.max_prepared_stmt_count='';") require.Error(t, err) require.Error(t, err, variable.ErrWrongTypeForVar.GenWithStackByArgs("max_prepared_stmt_count").Error()) - - tk.MustExec("set @@global.tidb_enable_concurrent_ddl=1") - tk.MustQuery("select @@global.tidb_enable_concurrent_ddl").Check(testkit.Rows("1")) - require.True(t, variable.EnableConcurrentDDL.Load()) - tk.MustExec("set @@global.tidb_enable_metadata_lock=0") - tk.MustExec("set @@global.tidb_enable_concurrent_ddl=0") - tk.MustQuery("select @@global.tidb_enable_concurrent_ddl").Check(testkit.Rows("0")) - require.False(t, variable.EnableConcurrentDDL.Load()) - testkit.NewTestKit(t, store).MustQuery("select 
@@global.tidb_enable_concurrent_ddl").Check(testkit.Rows("0")) - tk.MustExec("set @@global.tidb_enable_concurrent_ddl=1") } func TestPreparePlanCache(t *testing.T) { diff --git a/infoschema/cluster_tables_test.go b/infoschema/cluster_tables_test.go index 73488028a26ec..89244bf984631 100644 --- a/infoschema/cluster_tables_test.go +++ b/infoschema/cluster_tables_test.go @@ -41,7 +41,6 @@ import ( "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/server" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/mockstorage" @@ -817,10 +816,6 @@ func (s *clusterTablesSuite) newTestKitWithRoot(t *testing.T) *testkit.TestKit { } func TestMDLView(t *testing.T) { - if !variable.EnableConcurrentDDL.Load() { - t.Skipf("test requires concurrent ddl") - } - // setup suite s := new(clusterTablesSuite) s.store, s.dom = testkit.CreateMockStoreAndDomain(t) diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index d4d4c4fa588f7..574529a822b13 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -1670,10 +1670,6 @@ func TestVariablesInfo(t *testing.T) { tk := testkit.NewTestKit(t, store) - if !variable.EnableConcurrentDDL.Load() { - t.Skip("skip test when concurrent DDL is disabled") - } - tk.MustExec("use information_schema") tk.MustExec("SET GLOBAL innodb_compression_level = 8;") diff --git a/meta/BUILD.bazel b/meta/BUILD.bazel index 791662be8c215..c6c796a9771c1 100644 --- a/meta/BUILD.bazel +++ b/meta/BUILD.bazel @@ -16,10 +16,8 @@ go_library( "//parser/mysql", "//structure", "//util/dbterror", - "//util/logutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/kvrpcpb", - "@org_uber_go_zap//:zap", ], ) @@ -33,12 +31,10 @@ go_test( embed = [":meta"], flaky = True, deps = [ - "//ddl", "//kv", "//parser/model", "//store/mockstore", "//testkit/testsetup", - "//testkit/testutil", "//util", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/meta/meta.go b/meta/meta.go index 9f262be8b464d..97f6756a582b2 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -19,7 +19,6 @@ import ( "encoding/binary" "encoding/json" "fmt" - "math" "strconv" "strings" "sync" @@ -34,8 +33,6 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/structure" "github.com/pingcap/tidb/util/dbterror" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" ) var ( @@ -77,7 +74,6 @@ var ( mPolicyGlobalID = []byte("PolicyGlobalID") mPolicyMagicByte = CurrentMagicByteVer mDDLTableVersion = []byte("DDLTableVersion") - mConcurrentDDL = []byte("concurrentDDL") mMetaDataLock = []byte("metadataLock") ) @@ -129,17 +125,13 @@ type Meta struct { // NewMeta creates a Meta in transaction txn. // If the current Meta needs to handle a job, jobListKey is the type of the job's list. 
-func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta { +func NewMeta(txn kv.Transaction) *Meta { txn.SetOption(kv.Priority, kv.PriorityHigh) txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) t := structure.NewStructure(txn, txn, mMetaPrefix) - listKey := DefaultJobListKey - if len(jobListKeys) != 0 { - listKey = jobListKeys[0] - } return &Meta{txn: t, StartTS: txn.StartTS(), - jobListKey: listKey, + jobListKey: DefaultJobListKey, } } @@ -622,27 +614,6 @@ func (m *Meta) CheckMDLTableExists() (bool, error) { return bytes.Equal(v, []byte("2")), nil } -// SetConcurrentDDL set the concurrent DDL flag. -func (m *Meta) SetConcurrentDDL(b bool) error { - var data []byte - if b { - data = []byte("1") - } else { - data = []byte("0") - } - return errors.Trace(m.txn.Set(mConcurrentDDL, data)) -} - -// IsConcurrentDDL returns true if the concurrent DDL flag is set. -func (m *Meta) IsConcurrentDDL() (bool, error) { - val, err := m.txn.Get(mConcurrentDDL) - if err != nil { - return false, errors.Trace(err) - } - - return len(val) == 0 || bytes.Equal(val, []byte("1")), nil -} - // SetMetadataLock sets the metadata lock. func (m *Meta) SetMetadataLock(b bool) error { var data []byte @@ -987,12 +958,8 @@ var ( mDDLJobListKey = []byte("DDLJobList") mDDLJobAddIdxList = []byte("DDLJobAddIdxList") mDDLJobHistoryKey = []byte("DDLJobHistory") - mDDLJobReorgKey = []byte("DDLJobReorg") ) -// JobListKeyType is a key type of the DDL job queue. -type JobListKeyType []byte - var ( // DefaultJobListKey keeps all actions of DDL jobs except "add index". DefaultJobListKey JobListKeyType = mDDLJobListKey @@ -1018,31 +985,8 @@ func (m *Meta) EnQueueDDLJob(job *model.Job, jobListKeys ...JobListKeyType) erro return m.enQueueDDLJob(listKey, job, true) } -// EnQueueDDLJobNoUpdate adds a DDL job to the list without update raw args. -func (m *Meta) EnQueueDDLJobNoUpdate(job *model.Job, jobListKeys ...JobListKeyType) error { - listKey := m.jobListKey - if len(jobListKeys) != 0 { - listKey = jobListKeys[0] - } - - return m.enQueueDDLJob(listKey, job, false) -} - -func (m *Meta) deQueueDDLJob(key []byte) (*model.Job, error) { - value, err := m.txn.LPop(key) - if err != nil || value == nil { - return nil, errors.Trace(err) - } - - job := &model.Job{} - err = job.Decode(value) - return job, errors.Trace(err) -} - -// DeQueueDDLJob pops a DDL job from the list. -func (m *Meta) DeQueueDDLJob() (*model.Job, error) { - return m.deQueueDDLJob(m.jobListKey) -} +// JobListKeyType is a key type of the DDL job queue. +type JobListKeyType []byte func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) { value, err := m.txn.LIndex(key, index) @@ -1063,61 +1007,6 @@ func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) { return job, errors.Trace(err) } -// GetDDLJobByIdx returns the corresponding DDL job by the index. -// The length of jobListKeys can only be 1 or 0. -// If its length is 1, we need to replace m.jobListKey with jobListKeys[0]. -// Otherwise, we use m.jobListKey directly. -func (m *Meta) GetDDLJobByIdx(index int64, jobListKeys ...JobListKeyType) (*model.Job, error) { - listKey := m.jobListKey - if len(jobListKeys) != 0 { - listKey = jobListKeys[0] - } - - startTime := time.Now() - job, err := m.getDDLJob(listKey, index) - metrics.MetaHistogram.WithLabelValues(metrics.GetDDLJobByIdx, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) - return job, errors.Trace(err) -} - -// updateDDLJob updates the DDL job with index and key. 
-// updateRawArgs is used to determine whether to update the raw args when encode the job. -func (m *Meta) updateDDLJob(index int64, job *model.Job, key []byte, updateRawArgs bool) error { - b, err := job.Encode(updateRawArgs) - if err == nil { - err = m.txn.LSet(key, index, b) - } - return errors.Trace(err) -} - -// UpdateDDLJob updates the DDL job with index. -// updateRawArgs is used to determine whether to update the raw args when encode the job. -// The length of jobListKeys can only be 1 or 0. -// If its length is 1, we need to replace m.jobListKey with jobListKeys[0]. -// Otherwise, we use m.jobListKey directly. -func (m *Meta) UpdateDDLJob(index int64, job *model.Job, updateRawArgs bool, jobListKeys ...JobListKeyType) error { - listKey := m.jobListKey - if len(jobListKeys) != 0 { - listKey = jobListKeys[0] - } - - startTime := time.Now() - err := m.updateDDLJob(index, job, listKey, updateRawArgs) - metrics.MetaHistogram.WithLabelValues(metrics.UpdateDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) - return errors.Trace(err) -} - -// DDLJobQueueLen returns the DDL job queue length. -// The length of jobListKeys can only be 1 or 0. -// If its length is 1, we need to replace m.jobListKey with jobListKeys[0]. -// Otherwise, we use m.jobListKey directly. -func (m *Meta) DDLJobQueueLen(jobListKeys ...JobListKeyType) (int64, error) { - listKey := m.jobListKey - if len(jobListKeys) != 0 { - listKey = jobListKeys[0] - } - return m.txn.LLen(listKey) -} - // GetAllDDLJobsInQueue gets all DDL Jobs in the current queue. // The length of jobListKeys can only be 1 or 0. // If its length is 1, we need to replace m.jobListKey with jobListKeys[0]. @@ -1152,45 +1041,6 @@ func (*Meta) jobIDKey(id int64) []byte { return b } -func (m *Meta) reorgJobCurrentElement(id int64) []byte { - b := make([]byte, 0, 12) - b = append(b, m.jobIDKey(id)...) - b = append(b, "_ele"...) - return b -} - -func (m *Meta) reorgJobStartHandle(id int64, element *Element) []byte { - b := make([]byte, 0, 16+len(element.TypeKey)) - b = append(b, m.jobIDKey(id)...) - b = append(b, element.TypeKey...) - eID := make([]byte, 8) - binary.BigEndian.PutUint64(eID, uint64(element.ID)) - b = append(b, eID...) - return b -} - -func (*Meta) reorgJobEndHandle(id int64, element *Element) []byte { - b := make([]byte, 8, 25) - binary.BigEndian.PutUint64(b, uint64(id)) - b = append(b, element.TypeKey...) - eID := make([]byte, 8) - binary.BigEndian.PutUint64(eID, uint64(element.ID)) - b = append(b, eID...) - b = append(b, "_end"...) - return b -} - -func (*Meta) reorgJobPhysicalTableID(id int64, element *Element) []byte { - b := make([]byte, 8, 25) - binary.BigEndian.PutUint64(b, uint64(id)) - b = append(b, element.TypeKey...) - eID := make([]byte, 8) - binary.BigEndian.PutUint64(eID, uint64(element.ID)) - b = append(b, eID...) - b = append(b, "_pid"...) - return b -} - func (m *Meta) addHistoryDDLJob(key []byte, job *model.Job, updateRawArgs bool) error { b, err := job.Encode(updateRawArgs) if err == nil { @@ -1352,160 +1202,6 @@ func DecodeElement(b []byte) (*Element, error) { return &Element{ID: int64(id), TypeKey: tp}, nil } -// UpdateDDLReorgStartHandle saves the job reorganization latest processed element and start handle for later resuming. 
-func (m *Meta) UpdateDDLReorgStartHandle(job *model.Job, element *Element, startKey kv.Key) error { - err := m.txn.HSet(mDDLJobReorgKey, m.reorgJobCurrentElement(job.ID), element.EncodeElement()) - if err != nil { - return errors.Trace(err) - } - if startKey != nil { - err = m.txn.HSet(mDDLJobReorgKey, m.reorgJobStartHandle(job.ID, element), startKey) - if err != nil { - return errors.Trace(err) - } - } - return nil -} - -// UpdateDDLReorgHandle saves the job reorganization latest processed information for later resuming. -func (m *Meta) UpdateDDLReorgHandle(jobID int64, startKey, endKey kv.Key, physicalTableID int64, element *Element) error { - err := m.txn.HSet(mDDLJobReorgKey, m.reorgJobCurrentElement(jobID), element.EncodeElement()) - if err != nil { - return errors.Trace(err) - } - if startKey != nil { - err = m.txn.HSet(mDDLJobReorgKey, m.reorgJobStartHandle(jobID, element), startKey) - if err != nil { - return errors.Trace(err) - } - } - if endKey != nil { - err = m.txn.HSet(mDDLJobReorgKey, m.reorgJobEndHandle(jobID, element), endKey) - if err != nil { - return errors.Trace(err) - } - } - err = m.txn.HSet(mDDLJobReorgKey, m.reorgJobPhysicalTableID(jobID, element), []byte(strconv.FormatInt(physicalTableID, 10))) - return errors.Trace(err) -} - -// ClearAllDDLReorgHandle clears all reorganization related handles. -func (m *Meta) ClearAllDDLReorgHandle() error { - return m.txn.HClear(mDDLJobReorgKey) -} - -// ClearALLDDLJob clears all DDL jobs. -func (m *Meta) ClearALLDDLJob() error { - if err := m.txn.LClear(mDDLJobAddIdxList); err != nil { - return errors.Trace(err) - } - if err := m.txn.LClear(mDDLJobListKey); err != nil { - return errors.Trace(err) - } - return nil -} - -// ClearAllHistoryJob clears all history jobs. **IT IS VERY DANGEROUS** -func (m *Meta) ClearAllHistoryJob() error { - if err := m.txn.HClear(mDDLJobHistoryKey); err != nil { - return errors.Trace(err) - } - return nil -} - -// RemoveReorgElement removes the element of the reorganization information. -func (m *Meta) RemoveReorgElement(job *model.Job) error { - err := m.txn.HDel(mDDLJobReorgKey, m.reorgJobCurrentElement(job.ID)) - if err != nil { - return errors.Trace(err) - } - return nil -} - -// RemoveDDLReorgHandle removes the job reorganization related handles. -func (m *Meta) RemoveDDLReorgHandle(job *model.Job, elements []*Element) error { - if len(elements) == 0 { - return nil - } - - err := m.txn.HDel(mDDLJobReorgKey, m.reorgJobCurrentElement(job.ID)) - if err != nil { - return errors.Trace(err) - } - - for _, element := range elements { - err = m.txn.HDel(mDDLJobReorgKey, m.reorgJobStartHandle(job.ID, element)) - if err != nil { - return errors.Trace(err) - } - if err = m.txn.HDel(mDDLJobReorgKey, m.reorgJobEndHandle(job.ID, element)); err != nil { - logutil.BgLogger().Warn("remove DDL reorg end handle", zap.Error(err)) - } - if err = m.txn.HDel(mDDLJobReorgKey, m.reorgJobPhysicalTableID(job.ID, element)); err != nil { - logutil.BgLogger().Warn("remove DDL reorg physical ID", zap.Error(err)) - } - } - return nil -} - -// GetDDLReorgHandle gets the latest processed DDL reorganize position. 
-func (m *Meta) GetDDLReorgHandle(job *model.Job) (element *Element, startKey, endKey kv.Key, physicalTableID int64, err error) { - elementBytes, err := m.txn.HGet(mDDLJobReorgKey, m.reorgJobCurrentElement(job.ID)) - if err != nil { - return nil, nil, nil, 0, errors.Trace(err) - } - if elementBytes == nil { - return nil, nil, nil, 0, ErrDDLReorgElementNotExist - } - element, err = DecodeElement(elementBytes) - if err != nil { - return nil, nil, nil, 0, errors.Trace(err) - } - - startKey, err = getReorgJobFieldHandle(m.txn, m.reorgJobStartHandle(job.ID, element)) - if err != nil { - return nil, nil, nil, 0, errors.Trace(err) - } - endKey, err = getReorgJobFieldHandle(m.txn, m.reorgJobEndHandle(job.ID, element)) - if err != nil { - return nil, nil, nil, 0, errors.Trace(err) - } - - physicalTableID, err = m.txn.HGetInt64(mDDLJobReorgKey, m.reorgJobPhysicalTableID(job.ID, element)) - if err != nil { - err = errors.Trace(err) - return - } - - // physicalTableID may be 0, because older version TiDB (without table partition) doesn't store them. - // update them to table's in this case. - if physicalTableID == 0 { - if job.ReorgMeta != nil { - endKey = kv.IntHandle(job.ReorgMeta.EndHandle).Encoded() - } else { - endKey = kv.IntHandle(math.MaxInt64).Encoded() - } - physicalTableID = job.TableID - logutil.BgLogger().Warn("new TiDB binary running on old TiDB DDL reorg data", - zap.Int64("partition ID", physicalTableID), - zap.Stringer("startHandle", startKey), - zap.Stringer("endHandle", endKey)) - } - return -} - -func getReorgJobFieldHandle(t *structure.TxStructure, reorgJobField []byte) (kv.Key, error) { - bs, err := t.HGet(mDDLJobReorgKey, reorgJobField) - if err != nil { - return nil, errors.Trace(err) - } - keyNotFound := bs == nil - if keyNotFound { - return nil, nil - } - return bs, nil -} - func (*Meta) schemaDiffKey(schemaVersion int64) []byte { return []byte(fmt.Sprintf("%s:%d", mSchemaDiffPrefix, schemaVersion)) } diff --git a/meta/meta_test.go b/meta/meta_test.go index 28603f76189ad..d1d932821bce3 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -17,18 +17,15 @@ package meta_test import ( "context" "fmt" - "math" "strconv" "testing" "time" "github.com/pingcap/errors" - "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/testkit/testutil" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" ) @@ -448,202 +445,6 @@ func TestElement(t *testing.T) { require.EqualError(t, err, `invalid encoded element "_col_" length 5`) } -func TestDDL(t *testing.T) { - testCases := []struct { - desc string - startHandle kv.Handle - endHandle kv.Handle - }{ - { - "kv.IntHandle", - kv.IntHandle(1), - kv.IntHandle(2), - }, - { - "kv.CommonHandle", - testutil.MustNewCommonHandle(t, "abc", 1222, "string"), - testutil.MustNewCommonHandle(t, "dddd", 1222, "string"), - }, - } - - for _, tc := range testCases { - // copy iterator variable into a new variable, see issue #27779 - tc := tc - t.Run(tc.desc, func(t *testing.T) { - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { - err := store.Close() - require.NoError(t, err) - }() - - txn, err := store.Begin() - require.NoError(t, err) - - m := meta.NewMeta(txn) - - job := &model.Job{ID: 1} - err = m.EnQueueDDLJob(job) - require.NoError(t, err) - n, err := m.DDLJobQueueLen() - require.NoError(t, err) - require.Equal(t, int64(1), n) - - v, err := m.GetDDLJobByIdx(0) - 
require.NoError(t, err) - require.Equal(t, job, v) - v, err = m.GetDDLJobByIdx(1) - require.NoError(t, err) - require.Nil(t, v) - - job.ID = 2 - err = m.UpdateDDLJob(0, job, true) - require.NoError(t, err) - - element := &meta.Element{ID: 123, TypeKey: meta.IndexElementKey} - // There are 3 meta key relate to index reorganization: - // start_handle, end_handle and physical_table_id. - // Only start_handle is initialized. - err = m.UpdateDDLReorgStartHandle(job, element, kv.IntHandle(1).Encoded()) - require.NoError(t, err) - - // Since physical_table_id is uninitialized, we simulate older TiDB version that doesn't store them. - // In this case GetDDLReorgHandle always return maxInt64 as end_handle. - e, i, j, k, err := m.GetDDLReorgHandle(job) - require.NoError(t, err) - require.Equal(t, element, e) - require.Equal(t, kv.Key(kv.IntHandle(1).Encoded()), i) - require.Equal(t, kv.Key(kv.IntHandle(math.MaxInt64).Encoded()), j) - require.Equal(t, int64(0), k) - - element = &meta.Element{ID: 222, TypeKey: meta.ColumnElementKey} - err = m.UpdateDDLReorgHandle(job.ID, tc.startHandle.Encoded(), tc.endHandle.Encoded(), 3, element) - require.NoError(t, err) - element1 := &meta.Element{ID: 223, TypeKey: meta.IndexElementKey} - err = m.UpdateDDLReorgHandle(job.ID, tc.startHandle.Encoded(), tc.endHandle.Encoded(), 3, element1) - require.NoError(t, err) - - e, i, j, k, err = m.GetDDLReorgHandle(job) - require.NoError(t, err) - require.Equal(t, element1, e) - require.Equal(t, kv.Key(tc.startHandle.Encoded()), i) - require.Equal(t, kv.Key(tc.endHandle.Encoded()), j) - require.Equal(t, int64(3), k) - - err = m.RemoveDDLReorgHandle(job, []*meta.Element{element, element1}) - require.NoError(t, err) - e, i, j, k, err = m.GetDDLReorgHandle(job) - require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err)) - require.Nil(t, e) - require.Nil(t, i) - require.Nil(t, j) - require.Equal(t, k, int64(0)) - - // new TiDB binary running on old TiDB DDL reorg data. - e, i, j, k, err = m.GetDDLReorgHandle(job) - require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err)) - require.Nil(t, e) - require.Nil(t, i) - require.Nil(t, j) - require.Equal(t, k, int64(0)) - - // Test GetDDLReorgHandle failed. - _, _, _, _, err = m.GetDDLReorgHandle(job) - require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err)) - - v, err = m.DeQueueDDLJob() - require.NoError(t, err) - require.Equal(t, job, v) - - err = m.AddHistoryDDLJob(job, true) - require.NoError(t, err) - v, err = m.GetHistoryDDLJob(2) - require.NoError(t, err) - require.Equal(t, job, v) - - // Add multiple history jobs. - arg := "test arg" - historyJob1 := &model.Job{ID: 1234} - historyJob1.Args = append(job.Args, arg) - err = m.AddHistoryDDLJob(historyJob1, true) - require.NoError(t, err) - historyJob2 := &model.Job{ID: 123} - historyJob2.Args = append(job.Args, arg) - err = m.AddHistoryDDLJob(historyJob2, false) - require.NoError(t, err) - all, err := ddl.GetAllHistoryDDLJobs(m) - require.NoError(t, err) - var lastID int64 - for _, job := range all { - require.Greater(t, job.ID, lastID) - lastID = job.ID - arg1 := "" - err := job.DecodeArgs(&arg1) - require.NoError(t, err) - if job.ID == historyJob1.ID { - require.Equal(t, historyJob1.Args[0], *(job.Args[0].(*string))) - } else { - require.Len(t, job.Args, 0) - } - } - - // Test for get last N history ddl jobs. 
- historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, 2) - require.NoError(t, err) - require.Len(t, historyJobs, 2) - require.Equal(t, int64(1234), historyJobs[0].ID) - require.Equal(t, int64(123), historyJobs[1].ID) - - // Test GetAllDDLJobsInQueue. - err = m.EnQueueDDLJob(job) - require.NoError(t, err) - job1 := &model.Job{ID: 2} - err = m.EnQueueDDLJob(job1) - require.NoError(t, err) - jobs, err := m.GetAllDDLJobsInQueue() - require.NoError(t, err) - expectJobs := []*model.Job{job, job1} - require.Equal(t, expectJobs, jobs) - - err = txn.Commit(context.Background()) - require.NoError(t, err) - }) - } -} - -func TestAddIndexJob(t *testing.T) { - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { - err := store.Close() - require.NoError(t, err) - }() - - txn1, err := store.Begin() - require.NoError(t, err) - - m := meta.NewMeta(txn1, meta.AddIndexJobListKey) - job := &model.Job{ID: 1} - err = m.EnQueueDDLJob(job) - require.NoError(t, err) - job.ID = 123 - err = m.UpdateDDLJob(0, job, true, meta.AddIndexJobListKey) - require.NoError(t, err) - v, err := m.GetDDLJobByIdx(0, meta.AddIndexJobListKey) - require.NoError(t, err) - require.Equal(t, job, v) - l, err := m.DDLJobQueueLen(meta.AddIndexJobListKey) - require.NoError(t, err) - require.Equal(t, int64(1), l) - jobs, err := m.GetAllDDLJobsInQueue(meta.AddIndexJobListKey) - require.NoError(t, err) - expectJobs := []*model.Job{job} - require.Equal(t, expectJobs, jobs) - - err = txn1.Commit(context.Background()) - require.NoError(t, err) -} - func BenchmarkGenGlobalIDs(b *testing.B) { store, err := mockstore.NewMockStore() require.NoError(b, err) @@ -773,45 +574,6 @@ func TestSequenceKey(b *testing.T) { require.Equal(b, tableID, id) } -func TestClearJob(t *testing.T) { - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { - require.NoError(t, store.Close()) - }() - - txn, err := store.Begin() - require.NoError(t, err) - - job1 := &model.Job{ID: 1, TableID: 1, Type: model.ActionAddColumn} - job2 := &model.Job{ID: 2, TableID: 1, Type: model.ActionCreateTable} - job3 := &model.Job{ID: 3, TableID: 2, Type: model.ActionDropColumn} - - m := meta.NewMeta(txn) - - require.NoError(t, m.EnQueueDDLJob(job1)) - require.NoError(t, m.EnQueueDDLJob(job2)) - require.NoError(t, m.EnQueueDDLJob(job3)) - - require.NoError(t, m.AddHistoryDDLJob(job1, false)) - require.NoError(t, m.AddHistoryDDLJob(job2, false)) - - jobs, err := m.GetAllDDLJobsInQueue() - require.NoError(t, err) - require.Len(t, jobs, 3) - require.NoError(t, m.ClearALLDDLJob()) - jobs, err = m.GetAllDDLJobsInQueue() - require.NoError(t, err) - require.Len(t, jobs, 0) - - count, err := m.GetHistoryDDLCount() - require.NoError(t, err) - require.Equal(t, count, uint64(2)) - - err = txn.Rollback() - require.NoError(t, err) -} - func TestCreateMySQLDatabase(t *testing.T) { store, err := mockstore.NewMockStore() require.NoError(t, err) @@ -835,41 +597,3 @@ func TestCreateMySQLDatabase(t *testing.T) { err = txn.Rollback() require.NoError(t, err) } - -func TestDDLTable(t *testing.T) { - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { - require.NoError(t, store.Close()) - }() - - txn, err := store.Begin() - require.NoError(t, err) - - m := meta.NewMeta(txn) - - exists, err := m.CheckDDLTableExists() - require.NoError(t, err) - require.False(t, exists) - - err = m.SetDDLTables() - require.NoError(t, err) - - exists, err = m.CheckDDLTableExists() - require.NoError(t, err) - require.True(t, exists) - - err = 
m.SetConcurrentDDL(true) - require.NoError(t, err) - b, err := m.IsConcurrentDDL() - require.NoError(t, err) - require.True(t, b) - err = m.SetConcurrentDDL(false) - require.NoError(t, err) - b, err = m.IsConcurrentDDL() - require.NoError(t, err) - require.False(t, b) - - err = txn.Rollback() - require.NoError(t, err) -} diff --git a/metrics/meta.go b/metrics/meta.go index 519ba6a0924a1..af967fe48a3bb 100644 --- a/metrics/meta.go +++ b/metrics/meta.go @@ -34,8 +34,6 @@ var ( GetSchemaDiff = "get_schema_diff" SetSchemaDiff = "set_schema_diff" - GetDDLJobByIdx = "get_ddl_job" - UpdateDDLJob = "update_ddl_job" GetHistoryDDLJob = "get_history_ddl_job" MetaHistogram = prometheus.NewHistogramVec( diff --git a/session/bootstrap.go b/session/bootstrap.go index 94484eefd9438..56509ffafebfd 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -961,11 +961,6 @@ func upgrade(s Session) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) _, err = s.ExecuteInternal(ctx, "COMMIT") - if err == nil && ver <= version92 { - logutil.BgLogger().Info("start migrate DDLs") - err = domain.GetDomain(s).DDL().MoveJobFromQueue2Table(true) - } - if err != nil { sleepTime := 1 * time.Second logutil.BgLogger().Info("update bootstrap ver failed", diff --git a/session/session.go b/session/session.go index 9f707e19a1fda..2d95bf3bc73d6 100644 --- a/session/session.go +++ b/session/session.go @@ -3200,15 +3200,14 @@ func InitMDLTable(store kv.Storage) error { // InitMDLVariableForBootstrap initializes the metadata lock variable. func InitMDLVariableForBootstrap(store kv.Storage) error { - initValue := variable.DefTiDBEnableConcurrentDDL err := kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) - return t.SetMetadataLock(initValue) + return t.SetMetadataLock(true) }) if err != nil { return err } - variable.EnableMDL.Store(initValue) + variable.EnableMDL.Store(true) return nil } diff --git a/sessionctx/variable/BUILD.bazel b/sessionctx/variable/BUILD.bazel index e964aaa1fcdcb..b178ccf0a95da 100644 --- a/sessionctx/variable/BUILD.bazel +++ b/sessionctx/variable/BUILD.bazel @@ -32,7 +32,6 @@ go_library( "//parser/types", "//sessionctx/sessionstates", "//sessionctx/stmtctx", - "//sessionctx/variable/featuretag/concurrencyddl", "//sessionctx/variable/featuretag/distributereorg", "//tidb-binlog/pump_client", "//types", diff --git a/sessionctx/variable/featuretag/concurrencyddl/BUILD.bazel b/sessionctx/variable/featuretag/concurrencyddl/BUILD.bazel deleted file mode 100644 index 44c1cede3c2b7..0000000000000 --- a/sessionctx/variable/featuretag/concurrencyddl/BUILD.bazel +++ /dev/null @@ -1,11 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") - -go_library( - name = "concurrencyddl", - srcs = [ - "default.go", - "non_default.go", - ], - importpath = "github.com/pingcap/tidb/sessionctx/variable/featuretag/concurrencyddl", - visibility = ["//visibility:public"], -) diff --git a/sessionctx/variable/featuretag/concurrencyddl/default.go b/sessionctx/variable/featuretag/concurrencyddl/default.go deleted file mode 100644 index 8aca4924268f0..0000000000000 --- a/sessionctx/variable/featuretag/concurrencyddl/default.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !featuretag - -package concurrencyddl - -// TiDBEnableConcurrentDDL is a feature tag -const TiDBEnableConcurrentDDL bool = true diff --git a/sessionctx/variable/featuretag/concurrencyddl/non_default.go b/sessionctx/variable/featuretag/concurrencyddl/non_default.go deleted file mode 100644 index 72218abe958a3..0000000000000 --- a/sessionctx/variable/featuretag/concurrencyddl/non_default.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build featuretag - -package concurrencyddl - -// TiDBEnableConcurrentDDL is a feature tag -const TiDBEnableConcurrentDDL bool = false diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 777b6996b49be..d56bcab9fb3b2 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1101,18 +1101,6 @@ var defaultSysVars = []*SysVar{ return err }, }, - {Scope: ScopeGlobal, Name: TiDBEnableConcurrentDDL, Value: BoolToOnOff(DefTiDBEnableConcurrentDDL), Type: TypeBool, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { - if EnableConcurrentDDL.Load() != TiDBOptOn(val) { - err := SwitchConcurrentDDL(TiDBOptOn(val)) - if err != nil { - return err - } - EnableConcurrentDDL.Store(TiDBOptOn(val)) - } - return nil - }, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { - return BoolToOnOff(EnableConcurrentDDL.Load()), nil - }}, {Scope: ScopeGlobal, Name: TiDBEnableMDL, Value: BoolToOnOff(DefTiDBEnableMDL), Type: TypeBool, SetGlobal: func(_ context.Context, vars *SessionVars, val string) error { if EnableMDL.Load() != TiDBOptOn(val) { err := SwitchMDL(TiDBOptOn(val)) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index c8da3ed5c10e6..5ae1adfac403a 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/sessionctx/variable/featuretag/concurrencyddl" "github.com/pingcap/tidb/sessionctx/variable/featuretag/distributereorg" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/paging" @@ -845,8 +844,6 @@ const ( // TiDBMaxAutoAnalyzeTime is the max time that auto analyze can run. If auto analyze runs longer than the value, it // will be killed. 0 indicates that there is no time limit. TiDBMaxAutoAnalyzeTime = "tidb_max_auto_analyze_time" - // TiDBEnableConcurrentDDL indicates whether to enable the new DDL framework. 
- TiDBEnableConcurrentDDL = "tidb_enable_concurrent_ddl" // TiDBDDLEnableDistributeReorg indicates whether to enable the new Reorg framework. TiDBDDLEnableDistributeReorg = "tidb_ddl_distribute_reorg" // TiDBGenerateBinaryPlan indicates whether binary plan should be generated in slow log and statements summary. @@ -1097,7 +1094,6 @@ const ( DefTiDBPrepPlanCacheSize = 100 DefTiDBEnablePrepPlanCacheMemoryMonitor = true DefTiDBPrepPlanCacheMemoryGuardRatio = 0.1 - DefTiDBEnableConcurrentDDL = concurrencyddl.TiDBEnableConcurrentDDL DefTiDBDDLEnableDistributeReorg = distributereorg.TiDBEnableDistributeReorg DefTiDBSimplifiedMetrics = false DefTiDBEnablePaging = true @@ -1202,7 +1198,6 @@ var ( MaxAutoAnalyzeTime = atomic.NewInt64(DefTiDBMaxAutoAnalyzeTime) // variables for plan cache PreparedPlanCacheMemoryGuardRatio = atomic.NewFloat64(DefTiDBPrepPlanCacheMemoryGuardRatio) - EnableConcurrentDDL = atomic.NewBool(DefTiDBEnableConcurrentDDL) DDLEnableDistributeReorg = atomic.NewBool(DefTiDBDDLEnableDistributeReorg) DDLForce2Queue = atomic.NewBool(false) EnableNoopVariables = atomic.NewBool(DefTiDBEnableNoopVariables) @@ -1249,8 +1244,6 @@ var ( SetStatsCacheCapacity atomic.Value // SetPDClientDynamicOption is the func registered by domain SetPDClientDynamicOption atomic.Pointer[func(string, string)] - // SwitchConcurrentDDL is the func registered by DDL to switch concurrent DDL. - SwitchConcurrentDDL func(bool) error = nil // SwitchMDL is the func registered by DDL to switch MDL. SwitchMDL func(bool2 bool) error = nil // EnableDDL is the func registered by ddl to enable running ddl in this instance. diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index 369073009c0a4..cb3272d110b29 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -499,10 +499,6 @@ func TestFlashbackCluster(t *testing.T) { } func TestAddIndexAccelerationAndMDL(t *testing.T) { - if !variable.EnableConcurrentDDL.Load() { - t.Skipf("test requires concurrent ddl") - } - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) usage, err := telemetry.GetFeatureUsage(tk.Session())