diff --git a/DEPS.bzl b/DEPS.bzl index f1e62f4b48de3..0bbb1ceb3335a 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2645,8 +2645,8 @@ def go_deps(): name = "com_github_opentracing_basictracer_go", build_file_proto_mode = "disable_global", importpath = "github.com/opentracing/basictracer-go", - sum = "h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=", - version = "v1.0.0", + sum = "h1:Oa1fTSBvAl8pa3U+IJYqrKm0NALwH9OsgwOqDv4xJW0=", + version = "v1.1.0", ) go_repository( name = "com_github_opentracing_contrib_go_stdlib", @@ -2750,6 +2750,14 @@ def go_deps(): sum = "h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc=", version = "v0.0.0-20180830031419-95f893ade6f2", ) + go_repository( + name = "com_github_philhofer_fwd", + build_file_proto_mode = "disable", + importpath = "github.com/philhofer/fwd", + sum = "h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ=", + version = "v1.1.1", + ) + go_repository( name = "com_github_pierrec_lz4", build_file_proto_mode = "disable_global", @@ -3193,6 +3201,14 @@ def go_deps(): sum = "h1:y0cMJ0qjii33BnD6tMGcF/+gHYsoKQ6tbwQpy233OII=", version = "v0.0.0-20180711163814-62bca832be04", ) + go_repository( + name = "com_github_silentred_gid", + build_file_proto_mode = "disable", + importpath = "github.com/silentred/gid", + sum = "h1:JdsH8McqPUeY8IN4C0gxENnJG2zysvh+/xDJWhPvGVQ=", + version = "v1.0.0", + ) + go_repository( name = "com_github_sirupsen_logrus", build_file_proto_mode = "disable_global", @@ -3410,6 +3426,14 @@ def go_deps(): sum = "h1:p8XInTnkUlLabBT7bDS3aZCeemO6tJ/7b5mHN8WbSIE=", version = "v2.0.1-0.20220913051514-ffaaf7131a8d", ) + go_repository( + name = "com_github_tikv_minitrace_go", + build_file_proto_mode = "disable", + importpath = "github.com/tikv/minitrace-go", + sum = "h1:nvIrUVo5YJZMsCn6yTxrpgrokIo/wug5N/nL5mc7v50=", + version = "v0.0.0-20220923091513-8e6316bb4097", + ) + go_repository( name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", @@ -3431,6 +3455,13 @@ def go_deps(): sum = "h1:phZCcypL/vtx6cGxObJgWZ5wexZF5SXFPLOM+ru0e/M=", version = "v0.1.0", ) + go_repository( + name = "com_github_tinylib_msgp", + build_file_proto_mode = "disable", + importpath = "github.com/tinylib/msgp", + sum = "h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0=", + version = "v1.1.5", + ) go_repository( name = "com_github_tklauser_go_sysconf", @@ -3467,6 +3498,13 @@ def go_deps(): sum = "h1:iAj0a8e6+dXSL7Liq0aXPox36FiN1dBbjA6lt9fl65s=", version = "v2.5.0", ) + go_repository( + name = "com_github_ttacon_chalk", + build_file_proto_mode = "disable", + importpath = "github.com/ttacon/chalk", + sum = "h1:OXcKh35JaYsGMRzpvFkLv/MEyPuL49CThT1pZ8aSml4=", + version = "v0.0.0-20160626202418-22c06c80ed31", + ) go_repository( name = "com_github_twmb_murmur3", diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 094c7ceb7ac95..2f39cc1a6f2fe 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "//br/pkg/utils", "//br/pkg/version", "//config", + "//ddl", "//kv", "//parser/model", "//parser/mysql", diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index d691eb00e52f3..2d3a29a82b6f0 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -21,12 +21,14 @@ go_library( "ddl_api.go", "ddl_tiflash_api.go", "ddl_worker.go", + "ddl_worker_util.go", "ddl_workerpool.go", "delete_range.go", "delete_range_util.go", "foreign_key.go", "generated_column.go", "index.go", + "index_distsql.go", "index_merge_tmp.go", "job_table.go", "mock.go", @@ -99,6 +101,7 @@ go_library( "//util/domainutil", "//util/filter", "//util/gcutil", + 
"//util/generic", "//util/hack", "//util/logutil", "//util/mathutil", @@ -128,6 +131,8 @@ go_library( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//txnkv/rangetask", + "@com_github_tikv_minitrace_go//:minitrace-go", + "@com_github_tikv_minitrace_go//jaeger", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_x_exp//slices", "@org_uber_go_atomic//:atomic", diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 0736cbb58215f..6199e7ed40ba2 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" decoder "github.com/pingcap/tidb/util/rowDecoder" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tidb/util/topsql" @@ -155,6 +156,13 @@ type reorgBackfillTask struct { endInclude bool } +func (r *reorgBackfillTask) excludedEndKey() kv.Key { + if r.endInclude { + return r.endKey.Next() + } + return r.endKey +} + func (r *reorgBackfillTask) String() string { physicalID := strconv.FormatInt(r.physicalTableID, 10) startKey := tryDecodeToHandleString(r.startKey) @@ -184,6 +192,8 @@ type backfillWorker struct { closed bool priority int tp backfillWorkerType + + copReqReaders *copReqReaders } func newBackfillWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, @@ -194,25 +204,12 @@ func newBackfillWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable reorgInfo: reorgInfo, batchCnt: int(variable.GetDDLReorgBatchSize()), sessCtx: sessCtx, - taskCh: make(chan *reorgBackfillTask, 1), - resultCh: make(chan *backfillResult, 1), priority: reorgInfo.Job.Priority, tp: tp, } } -func (w *backfillWorker) Close() { - if !w.closed { - w.closed = true - close(w.taskCh) - } -} - -func closeBackfillWorkers(workers []*backfillWorker) { - for _, worker := range workers { - worker.Close() - } -} +func (w *backfillWorker) Close() {} // handleBackfillTask backfills range [task.startHandle, task.endHandle) handle's index to table. 
func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, bf backfiller) *backfillResult { @@ -285,43 +282,51 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, return result } -func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { +func (w *backfillWorker) run(ctx context.Context, d *ddlCtx, bf backfiller, job *model.Job) { logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("type", w.tp), zap.Int("workerID", w.id)) - defer func() { - w.resultCh <- &backfillResult{err: dbterror.ErrReorgPanic} - }() defer util.Recover(metrics.LabelDDL, "backfillWorker.run", nil, false) for { - task, more := <-w.taskCh - if !more { + exit := false + select { + case task, more := <-w.taskCh: + if !more { + exit = true + break + } + d.setDDLLabelForTopSQL(job) + + logutil.BgLogger().Debug("[ddl] backfill worker got task", zap.Int("workerID", w.id), zap.String("task", task.String())) + failpoint.Inject("mockBackfillRunErr", func() { + if w.id == 0 { + result := &backfillResult{addedCount: 0, nextKey: nil, err: errors.Errorf("mock backfill error")} + w.resultCh <- result + failpoint.Continue() + } + }) + + failpoint.Inject("mockHighLoadForAddIndex", func() { + sqlPrefixes := []string{"alter"} + topsql.MockHighCPULoad(job.Query, sqlPrefixes, 5) + }) + + failpoint.Inject("mockBackfillSlow", func() { + time.Sleep(100 * time.Millisecond) + }) + + // Dynamic change batch size. + w.batchCnt = int(variable.GetDDLReorgBatchSize()) + finish := injectSpan(job.ID, fmt.Sprintf("%s-%d", "handle-backfill-task", w.id)) + result := w.handleBackfillTask(d, task, bf) + finish() + w.resultCh <- result + case <-ctx.Done(): + exit = true + } + if exit { break } - d.setDDLLabelForTopSQL(job) - - logutil.BgLogger().Debug("[ddl] backfill worker got task", zap.Int("workerID", w.id), zap.String("task", task.String())) - failpoint.Inject("mockBackfillRunErr", func() { - if w.id == 0 { - result := &backfillResult{addedCount: 0, nextKey: nil, err: errors.Errorf("mock backfill error")} - w.resultCh <- result - failpoint.Continue() - } - }) - - failpoint.Inject("mockHighLoadForAddIndex", func() { - sqlPrefixes := []string{"alter"} - topsql.MockHighCPULoad(job.Query, sqlPrefixes, 5) - }) - - failpoint.Inject("mockBackfillSlow", func() { - time.Sleep(100 * time.Millisecond) - }) - - // Dynamic change batch size. - w.batchCnt = int(variable.GetDDLReorgBatchSize()) - result := w.handleBackfillTask(d, task, bf) - w.resultCh <- result } logutil.BgLogger().Info("[ddl] backfill worker exit", zap.Stringer("type", w.tp), @@ -357,7 +362,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey return ranges, nil } -func waitTaskResults(workers []*backfillWorker, taskCnt int, +func waitTaskResults(resultsCh chan *backfillResult, taskCnt int, totalAddedCount *int64, startKey kv.Key) (kv.Key, int64, error) { var ( addedCount int64 @@ -365,8 +370,7 @@ func waitTaskResults(workers []*backfillWorker, taskCnt int, firstErr error ) for i := 0; i < taskCnt; i++ { - worker := workers[i] - result := <-worker.resultCh + result := <-resultsCh if firstErr == nil && result.err != nil { firstErr = result.err // We should wait all working workers exits, any way. 
@@ -374,7 +378,8 @@ func waitTaskResults(workers []*backfillWorker, taskCnt int, } if result.err != nil { - logutil.BgLogger().Warn("[ddl] backfill worker failed", zap.Int("workerID", worker.id), + logutil.BgLogger().Warn("[ddl] backfill worker failed", + zap.String("result next key", hex.EncodeToString(result.nextKey)), zap.Error(result.err)) } @@ -390,15 +395,19 @@ func waitTaskResults(workers []*backfillWorker, taskCnt int, // sendTasksAndWait sends tasks to workers, and waits for all the running workers to return results, // there are taskCnt running workers. -func (dc *ddlCtx) sendTasksAndWait(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error { - for i, task := range batchTasks { - workers[i].taskCh <- task +func (dc *ddlCtx) sendTasksAndWait(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, + batchTasks []*reorgBackfillTask, rTaskCh, wTaskCh chan *reorgBackfillTask, resCh chan *backfillResult) error { + for _, task := range batchTasks { + if rTaskCh != nil { + rTaskCh <- task + } + wTaskCh <- task } startKey := batchTasks[0].startKey taskCnt := len(batchTasks) startTime := time.Now() - nextKey, taskAddedCount, err := waitTaskResults(workers, taskCnt, totalAddedCount, startKey) + nextKey, taskAddedCount, err := waitTaskResults(resCh, taskCnt, totalAddedCount, startKey) elapsedTime := time.Since(startTime) if err == nil { err = dc.isReorgRunnable(reorgInfo.Job) @@ -409,7 +418,6 @@ func (dc *ddlCtx) sendTasksAndWait(sessPool *sessionPool, reorgInfo *reorgInfo, err1 := reorgInfo.UpdateReorgMeta(nextKey, sessPool) metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds()) logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed", - zap.Stringer("type", workers[0].tp), zap.ByteString("elementType", reorgInfo.currElement.TypeKey), zap.Int64("elementID", reorgInfo.currElement.ID), zap.Int64("totalAddedCount", *totalAddedCount), @@ -426,7 +434,6 @@ func (dc *ddlCtx) sendTasksAndWait(sessPool *sessionPool, reorgInfo *reorgInfo, dc.getReorgCtx(reorgInfo.Job).setNextKey(nextKey) metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds()) logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch", - zap.Stringer("type", workers[0].tp), zap.ByteString("elementType", reorgInfo.currElement.TypeKey), zap.Int64("elementID", reorgInfo.currElement.ID), zap.Int64("totalAddedCount", *totalAddedCount), @@ -465,9 +472,10 @@ func tryDecodeToHandleString(key kv.Key) string { } // handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled. 
-func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo, - totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) { - batchTasks := make([]*reorgBackfillTask, 0, len(workers)) +func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, batchSize int, reorgInfo *reorgInfo, + totalAddedCount *int64, kvRanges []kv.KeyRange, rTaskCh, wTaskCh chan *reorgBackfillTask, retCh chan *backfillResult) ([]kv.KeyRange, error) { + defer injectSpan(reorgInfo.ID, "send-wait-tasks")() + batchTasks := make([]*reorgBackfillTask, 0, batchSize) physicalTableID := reorgInfo.PhysicalTableID var prefix kv.Key @@ -476,10 +484,11 @@ func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers } else { prefix = t.RecordPrefix() } + job := reorgInfo.Job // Build reorg tasks. for i, keyRange := range kvRanges { endKey := keyRange.EndKey - endK, err := getRangeEndKey(reorgInfo.d.jobContext(reorgInfo.Job), workers[0].sessCtx.GetStore(), workers[0].priority, prefix, keyRange.StartKey, endKey) + endK, err := getRangeEndKey(reorgInfo.d.jobContext(job), dc.store, job.Priority, prefix, keyRange.StartKey, endKey) if err != nil { logutil.BgLogger().Info("[ddl] send range task to workers, get reverse key failed", zap.Error(err)) } else { @@ -496,7 +505,7 @@ func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1} batchTasks = append(batchTasks, task) - if len(batchTasks) >= len(workers) { + if len(batchTasks) >= batchSize { break } } @@ -506,7 +515,7 @@ func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers } // Wait tasks finish. - err := dc.sendTasksAndWait(sessPool, reorgInfo, totalAddedCount, workers, batchTasks) + err := dc.sendTasksAndWait(sessPool, reorgInfo, totalAddedCount, batchTasks, rTaskCh, wTaskCh, retCh) if err != nil { return nil, errors.Trace(err) } @@ -569,6 +578,8 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error { return nil } +const backfillTaskBatchSize = 1024 + // writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition. // For a partitioned table, it should be handled partition by partition. // @@ -612,16 +623,20 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic // variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt". workerCnt := variable.GetDDLReorgWorkerCounter() backfillWorkers := make([]*backfillWorker, 0, workerCnt) - defer func() { - closeBackfillWorkers(backfillWorkers) - }() + backfillCtxCancels := make([]func(), 0, workerCnt) jc := dc.jobContext(job) + wTaskCh := make(chan *reorgBackfillTask, backfillTaskBatchSize) + resultCh := make(chan *backfillResult, backfillTaskBatchSize) + var rTaskCh chan *reorgBackfillTask for { + finish := injectSpan(job.ID, "split-table-ranges") kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey) if err != nil { + finish() return errors.Trace(err) } + finish() // For dynamic adjust backfill worker number. 
if err := loadDDLReorgVars(dc.ctx, sessPool); err != nil { @@ -633,6 +648,11 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic if len(kvRanges) < int(workerCnt) { workerCnt = int32(len(kvRanges)) } + var copReqReaders *copReqReaders + useCopRead := variable.EnableCoprRead.Load() != "0" + if useCopRead { + workerCnt = mathutil.Max(workerCnt/2, 1) + } // Enlarge the worker size. for i := len(backfillWorkers); i < int(workerCnt); i++ { sessCtx := newContext(reorgInfo.d.store) @@ -653,6 +673,8 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic sessCtx.GetSessionVars().StmtCtx.DividedByZeroAsWarning = !sqlMode.HasStrictMode() sessCtx.GetSessionVars().StmtCtx.IgnoreZeroInDate = !sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode() sessCtx.GetSessionVars().StmtCtx.NoZeroDate = sqlMode.HasStrictMode() + bfCtx, cancel := context.WithCancel(dc.ctx) + backfillCtxCancels = append(backfillCtxCancels, cancel) switch bfWorkerType { case typeAddIndexWorker: @@ -660,31 +682,45 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic if err != nil { return errors.Trace(err) } + if useCopRead && copReqReaders == nil { + indexInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) + copCtx := newCopContext(t.Meta(), indexInfo, sessCtx) + rTaskCh = make(chan *reorgBackfillTask, backfillTaskBatchSize) + copReqReaders = newCopReqReaders(dc.ctx, copCtx, job.ID, int(workerCnt), rTaskCh) + logutil.BgLogger().Info("[ddl] fetch index values with coprocessor", + zap.String("table", t.Meta().Name.O), + zap.String("index", indexInfo.Name.O)) + } + idxWorker.backfillWorker.copReqReaders = copReqReaders backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker) - go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job) + go idxWorker.backfillWorker.run(bfCtx, reorgInfo.d, idxWorker, job) case typeAddIndexMergeTmpWorker: tmpIdxWorker := newMergeTempIndexWorker(sessCtx, i, t, reorgInfo, jc) backfillWorkers = append(backfillWorkers, tmpIdxWorker.backfillWorker) - go tmpIdxWorker.backfillWorker.run(reorgInfo.d, tmpIdxWorker, job) + go tmpIdxWorker.backfillWorker.run(bfCtx, reorgInfo.d, tmpIdxWorker, job) case typeUpdateColumnWorker: // Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting. sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true updateWorker := newUpdateColumnWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc) backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker) - go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker, job) + go updateWorker.backfillWorker.run(bfCtx, reorgInfo.d, updateWorker, job) case typeCleanUpIndexWorker: idxWorker := newCleanUpIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc) backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker) - go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job) + go idxWorker.backfillWorker.run(bfCtx, reorgInfo.d, idxWorker, job) default: return errors.New("unknow backfill type") } + lastBfWorker := backfillWorkers[i] + lastBfWorker.taskCh = wTaskCh + lastBfWorker.resultCh = resultCh } // Shrink the worker size. 
 	if len(backfillWorkers) > int(workerCnt) {
-		workers := backfillWorkers[workerCnt:]
+		for i := int(workerCnt); i < len(backfillWorkers); i++ {
+			backfillCtxCancels[i]()
+		}
+		backfillCtxCancels = backfillCtxCancels[:workerCnt]
 		backfillWorkers = backfillWorkers[:workerCnt]
-		closeBackfillWorkers(workers)
 	}
 
 	failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) {
@@ -723,12 +759,17 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
 				return errors.New(ingest.LitErrGetBackendFail)
 			}
 		}
-		remains, err := dc.handleRangeTasks(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
+		remains, err := dc.handleRangeTasks(sessPool, t, backfillTaskBatchSize, reorgInfo, &totalAddedCount, kvRanges, rTaskCh, wTaskCh, resultCh)
 		if err != nil {
 			return errors.Trace(err)
 		}
 
 		if len(remains) == 0 {
+			if rTaskCh != nil {
+				close(rTaskCh)
+			}
+			close(wTaskCh)
+			close(resultCh)
 			break
 		}
 		startKey = remains[0].StartKey
diff --git a/ddl/ddl_worker_util.go b/ddl/ddl_worker_util.go
new file mode 100644
index 0000000000000..40dec3447de99
--- /dev/null
+++ b/ddl/ddl_worker_util.go
@@ -0,0 +1,155 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"math"
+	"strings"
+	"time"
+
+	"github.com/pingcap/tidb/util/generic"
+	"github.com/pingcap/tidb/util/logutil"
+	minitrace "github.com/tikv/minitrace-go"
+	"github.com/tikv/minitrace-go/jaeger"
+	"go.uber.org/zap"
+	"golang.org/x/exp/slices"
+)
+
+var timeDetails = generic.NewSyncMap[int64, *spanCtx](10)
+
+type spanCtx struct {
+	ctx  context.Context
+	root minitrace.TraceHandle
+}
+
+func injectSpan(jobID int64, event string) func() {
+	if sctx, ok := timeDetails.Load(jobID); ok {
+		hd := minitrace.StartSpan(sctx.ctx, event)
+		return func() {
+			hd.Finish()
+		}
+	}
+	return func() {}
+}
+
+func initializeTrace(jobID int64) {
+	ctx, root := minitrace.StartRootSpan(context.Background(),
+		"add-index-worker", uint64(jobID), 0, nil)
+	timeDetails.Store(jobID, &spanCtx{
+		ctx:  ctx,
+		root: root,
+	})
+}
+
+func collectTrace(jobID int64) string {
+	if sctx, ok := timeDetails.Load(jobID); ok {
+		rootTrace, _ := sctx.root.Collect()
+		analyzed := analyzeTrace(rootTrace)
+		if len(rootTrace.Spans) < 1000 {
+			reportTrace(rootTrace)
+		}
+		timeDetails.Delete(jobID)
+		return analyzed
+	}
+	return ""
+}
+
+const batchSize = 512
+
+func reportTrace(rootTrace minitrace.Trace) {
+	buf := bytes.NewBuffer(make([]uint8, 0, 4096))
+	for _, subTrace := range splitTraces(rootTrace) {
+		buf.Reset()
+		trace := jaeger.MiniSpansToJaegerTrace("add-index", subTrace)
+		err := jaeger.ThriftCompactEncode(buf, trace)
+		if err != nil {
+			logutil.BgLogger().Warn("cannot collectTrace", zap.Error(err))
+			return
+		}
+		err = jaeger.Send(buf.Bytes(), "127.0.0.1:6831")
+		if err != nil {
+			logutil.BgLogger().Warn("cannot collectTrace", zap.Error(err))
+			return
+		}
+	}
+}
+
+func splitTraces(trace minitrace.Trace) []minitrace.Trace {
+	var traces []minitrace.Trace
+	for len(trace.Spans) > batchSize {
+		traces = 
append(traces, minitrace.Trace{ + TraceID: trace.TraceID, + Spans: trace.Spans[:batchSize], + }) + trace.Spans = trace.Spans[batchSize:] + } + traces = append(traces, minitrace.Trace{ + TraceID: trace.TraceID, + Spans: trace.Spans, + }) + return traces +} + +func analyzeTrace(trace minitrace.Trace) string { + groupByEvent := make(map[string][]*minitrace.Span, 16) + for i, span := range trace.Spans { + spans := groupByEvent[span.Event] + if len(spans) == 0 { + groupByEvent[span.Event] = []*minitrace.Span{&trace.Spans[i]} + } else { + groupByEvent[span.Event] = append(spans, &trace.Spans[i]) + } + } + orderedEvents := make([]string, 0, len(groupByEvent)) + for event := range groupByEvent { + orderedEvents = append(orderedEvents, event) + } + slices.Sort(orderedEvents) + var sb strings.Builder + sb.WriteString("{") + for i := 0; i < len(orderedEvents); i++ { + spans := groupByEvent[orderedEvents[i]] + sum := uint64(0) + min := uint64(math.MaxUint64) + max := uint64(0) + for _, span := range spans { + dur := span.DurationNs + sum += dur + if dur < min { + min = dur + } + if dur > max { + max = dur + } + } + sb.WriteString(orderedEvents[i]) + sb.WriteString(":") + if len(spans) < 20 { + sb.WriteString(fmt.Sprintf("%f", time.Duration(sum).Seconds())) + } else { + sb.WriteString(fmt.Sprintf(`{sum: %f, min: %f, max: %f, cnt: %d}`, + time.Duration(sum).Seconds(), time.Duration(min).Seconds(), + time.Duration(max).Seconds(), len(spans))) + } + if i != len(orderedEvents)-1 { + sb.WriteString(", ") + } + } + sb.WriteString("}") + return sb.String() +} diff --git a/ddl/index.go b/ddl/index.go index 914e5e8ff48a3..f7e00195e163b 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -600,11 +600,13 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo return ver, err } logutil.BgLogger().Info("[ddl] run add index job", zap.String("job", job.String()), zap.Reflect("indexInfo", indexInfo)) + initializeTrace(job.ID) } originalState := indexInfo.State switch indexInfo.State { case model.StateNone: // none -> delete only + defer injectSpan(job.ID, "none")() reorgTp := pickBackfillType(w, job) if reorgTp.NeedMergeProcess() { // Increase telemetryAddIndexIngestUsage @@ -620,6 +622,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo job.SchemaState = model.StateDeleteOnly case model.StateDeleteOnly: // delete only -> write only + defer injectSpan(job.ID, "delete-only")() indexInfo.State = model.StateWriteOnly _, err = checkPrimaryKeyNotNull(d, w, t, job, tblInfo, indexInfo) if err != nil { @@ -632,6 +635,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo job.SchemaState = model.StateWriteOnly case model.StateWriteOnly: // write only -> reorganization + defer injectSpan(job.ID, "write-only")() indexInfo.State = model.StateWriteReorganization _, err = checkPrimaryKeyNotNull(d, w, t, job, tblInfo, indexInfo) if err != nil { @@ -646,6 +650,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo job.SchemaState = model.StateWriteReorganization case model.StateWriteReorganization: // reorganization -> public + defer injectSpan(job.ID, "write-reorg")() tbl, err := getTable(d.store, schemaID, tblInfo) if err != nil { return ver, errors.Trace(err) @@ -676,10 +681,12 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo job.Args = []interface{}{indexInfo.ID, false /*if exists*/, getPartitionIDs(tbl.Meta())} // Finish this job. 
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + details := collectTrace(job.ID) + logutil.BgLogger().Info("[ddl] finish add index job", zap.String("job", job.String()), + zap.String("time details", details)) default: err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", tblInfo.State) } - return ver, errors.Trace(err) } @@ -778,6 +785,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } switch indexInfo.BackfillState { case model.BackfillStateRunning: + defer injectSpan(job.ID, "write-reorg-backfill")() logutil.BgLogger().Info("[ddl] index backfill state running", zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O)) switch bfProcess { @@ -805,6 +813,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo if !done { return false, ver, nil } + defer injectSpan(job.ID, "write-reorg-lit-import")() err = bc.FinishImport(indexInfo.ID, indexInfo.Unique, tbl) if err != nil { if kv.ErrKeyExists.Equal(err) { @@ -828,6 +837,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return false, ver, errors.Trace(err) case model.BackfillStateReadyToMerge: + defer injectSpan(job.ID, "write-reorg-ready-merge")() logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O)) indexInfo.BackfillState = model.BackfillStateMerging @@ -838,6 +848,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return false, ver, errors.Trace(err) case model.BackfillStateMerging: + defer injectSpan(job.ID, "write-reorg-merging")() done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, true) if !done { return false, ver, err @@ -1161,10 +1172,11 @@ type baseIndexWorker struct { metricCounter prometheus.Counter // The following attributes are used to reduce memory allocation. 
- defaultVals []types.Datum - idxRecords []*indexRecord - rowMap map[int64]types.Datum - rowDecoder *decoder.RowDecoder + defaultVals []types.Datum + idxRecords []*indexRecord + idxKVRecords []idxKV + rowMap map[int64]types.Datum + rowDecoder *decoder.RowDecoder sqlMode mysql.SQLMode jobContext *JobContext @@ -1270,7 +1282,7 @@ func (w *baseIndexWorker) getIndexRecord(idxInfo *model.IndexInfo, handle kv.Han idxVal[j] = idxColumnVal } - rsData := tables.TryGetHandleRestoredDataWrapper(w.table, nil, w.rowMap, idxInfo) + rsData := tables.TryGetHandleRestoredDataWrapper(w.table.Meta(), nil, w.rowMap, idxInfo) idxRecord := &indexRecord{handle: handle, key: recordKey, vals: idxVal, rsData: rsData} return idxRecord, nil } @@ -1474,7 +1486,8 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC oprStartTime := time.Now() ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType()) - errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { + defer injectSpan(w.reorgInfo.Job.ID, fmt.Sprintf("%s-%d", "fetch-create-txn", w.id))() + errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) { taskCtx.addedCount = 0 taskCtx.scanCount = 0 txn.SetOption(kv.Priority, w.priority) @@ -1482,73 +1495,124 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC txn.SetOption(kv.ResourceGroupTagger, tagger) } - idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) + var ( + idxRecords []*indexRecord + idxKVs []idxKV + nextKey kv.Key + taskDone bool + ) + finish := injectSpan(w.reorgInfo.Job.ID, fmt.Sprintf("%s-%d", "fetch-rows", w.id)) + var pushDownKVEncoding bool + if w.copReqReaders != nil { + pushDownKVEncoding = w.copReqReaders.kvResultsCh != nil + r := w.copReqReaders + if !pushDownKVEncoding { + w.idxRecords = w.idxRecords[:0] + w.idxRecords, nextKey, taskDone, err = fetchRowColValsFromCop(w, r.resultsCh, w.idxRecords, handleRange) + idxRecords = w.idxRecords + } else { + w.idxKVRecords = w.idxKVRecords[:0] + w.idxKVRecords, nextKey, taskDone, err = fetchRowColValsFromCop(w, r.kvResultsCh, w.idxKVRecords, handleRange) + idxKVs = w.idxKVRecords + } + } else { + idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange) + } + finish() if err != nil { return errors.Trace(err) } taskCtx.nextKey = nextKey taskCtx.done = taskDone + if !pushDownKVEncoding { + err = w.batchCheckUniqueKey(txn, idxRecords) + if err != nil { + return errors.Trace(err) + } + return w.createIndexRecords(idxRecords, &taskCtx, needMergeTmpIdx, txn) + } else { + return w.createIndexKVs(idxKVs, &taskCtx, txn) + } + }) + logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000) - err = w.batchCheckUniqueKey(txn, idxRecords) - if err != nil { - return errors.Trace(err) + return +} + +func (w *addIndexWorker) createIndexRecords(idxRecords []*indexRecord, taskCtx *backfillTaskContext, needMergeTmpIdx bool, txn kv.Transaction) error { + defer injectSpan(w.reorgInfo.Job.ID, fmt.Sprintf("%s-%d", "create-records", w.id))() + for _, rec := range idxRecords { + taskCtx.scanCount++ + // The index is already exists, we skip it, no needs to backfill it. + // The following update, delete, insert on these rows, TiDB can handle it correctly. + if rec.skip { + continue } - for _, idxRecord := range idxRecords { - taskCtx.scanCount++ - // The index is already exists, we skip it, no needs to backfill it. 
- // The following update, delete, insert on these rows, TiDB can handle it correctly. - if idxRecord.skip { - continue + // When the backfill-merge process is used, the writes from DML are redirected to a temp index. + // The write-conflict will be handled by the merge worker. Thus, the locks are unnecessary. + if !needMergeTmpIdx && rec.key != nil { + // We need to add this lock to make sure pessimistic transaction can realize this operation. + // For the normal pessimistic transaction, it's ok. But if async commit is used, it may lead to inconsistent data and index. + err := txn.LockKeys(context.Background(), new(kv.LockCtx), rec.key) + if err != nil { + return errors.Trace(err) } + } - // When the backfill-merge process is used, the writes from DML are redirected to a temp index. - // The write-conflict will be handled by the merge worker. Thus, the locks are unnecessary. - if !needMergeTmpIdx { - // We need to add this lock to make sure pessimistic transaction can realize this operation. - // For the normal pessimistic transaction, it's ok. But if async commit is used, it may lead to inconsistent data and index. - err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.key) - if err != nil { - return errors.Trace(err) + // Create the index. + if w.writerCtx == nil { + handle, err := w.index.Create(w.sessCtx, txn, rec.vals, rec.handle, rec.rsData, table.WithIgnoreAssertion, table.FromBackfill) + if err != nil { + if kv.ErrKeyExists.Equal(err) && rec.handle.Equal(handle) { + // Index already exists, skip it. + continue } + return errors.Trace(err) } + } else { // The lightning environment is ready. + vars := w.sessCtx.GetSessionVars() + sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs() + key, distinct, err := w.index.GenIndexKey(sCtx, rec.vals, rec.handle, writeBufs.IndexKeyBuf) + if err != nil { + return errors.Trace(err) + } + idxVal, err := w.index.GenIndexValue(sCtx, distinct, rec.vals, rec.handle, rec.rsData) + if err != nil { + return errors.Trace(err) + } + err = w.writerCtx.WriteRow(key, idxVal) + if err != nil { + return errors.Trace(err) + } + writeBufs.IndexKeyBuf = key + } + taskCtx.addedCount++ + } + return nil +} - // Create the index. - if w.writerCtx == nil { - handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData, table.WithIgnoreAssertion, table.FromBackfill) - if err != nil { - if kv.ErrKeyExists.Equal(err) && idxRecord.handle.Equal(handle) { - // Index already exists, skip it. - continue - } - - return errors.Trace(err) - } - } else { // The lightning environment is ready. - vars := w.sessCtx.GetSessionVars() - sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs() - key, distinct, err := w.index.GenIndexKey(sCtx, idxRecord.vals, idxRecord.handle, writeBufs.IndexKeyBuf) - if err != nil { - return errors.Trace(err) - } - idxVal, err := w.index.GenIndexValue(sCtx, distinct, idxRecord.vals, idxRecord.handle, idxRecord.rsData) - if err != nil { - return errors.Trace(err) - } - err = w.writerCtx.WriteRow(key, idxVal) +func (w *addIndexWorker) createIndexKVs(idxKVs []idxKV, taskCtx *backfillTaskContext, txn kv.Transaction) error { + defer injectSpan(w.reorgInfo.Job.ID, fmt.Sprintf("%s-%d", "create-records", w.id))() + for _, idxKV := range idxKVs { + taskCtx.scanCount++ + // Create the index. + if w.writerCtx == nil { + err := txn.GetMemBuffer().Set(idxKV.key, idxKV.val) + if err != nil { + return errors.Trace(err) + } + } else { // The lightning environment is ready. 
+ if len(idxKVs) != 0 { + err := w.writerCtx.WriteRow(idxKV.key, idxKV.val) if err != nil { return errors.Trace(err) } - writeBufs.IndexKeyBuf = key } - taskCtx.addedCount++ } - - return nil - }) - logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000) - - return + taskCtx.addedCount++ + } + return nil } func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { diff --git a/ddl/index_distsql.go b/ddl/index_distsql.go new file mode 100644 index 0000000000000..f29f9d597b1a5 --- /dev/null +++ b/ddl/index_distsql.go @@ -0,0 +1,421 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/distsql" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/generic" + "github.com/pingcap/tidb/util/timeutil" + "github.com/pingcap/tipb/go-tipb" +) + +type copContext struct { + tblInfo *model.TableInfo + idxInfo *model.IndexInfo + colInfos []*model.ColumnInfo + fieldTps []*types.FieldType + sessCtx sessionctx.Context + pushDownEncoding bool +} + +type copReqReaders struct { + tasksCh chan *reorgBackfillTask + resultsCh chan *indexRecord + kvResultsCh chan idxKV + results generic.SyncMap[string, error] +} + +type copReqReader struct { + id int + traceID int64 + copCtx *copContext + srcChunk *chunk.Chunk + + idxRecordChan chan *indexRecord + idxKVChan chan idxKV + results *generic.SyncMap[string, error] +} + +func (c *copReqReader) run(ctx context.Context, wg *sync.WaitGroup, tasks chan *reorgBackfillTask) { + for { + select { + case task, ok := <-tasks: + if !ok { + wg.Done() + return + } + finish := injectSpan(c.traceID, fmt.Sprintf("cop-read-%d", c.id)) + err := kv.RunInNewTxn(ctx, c.copCtx.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { + if c.copCtx.pushDownEncoding { + return c.copCtx.sendEncodedIdxRecords(ctx, c.idxKVChan, txn.StartTS(), + task.startKey, task.excludedEndKey(), c.traceID, c.id) + } else { + return c.copCtx.sendIdxRecords(ctx, c.idxRecordChan, c.srcChunk, txn.StartTS(), + task.startKey, task.excludedEndKey(), c.traceID, c.id) + } + }) + finish() + c.results.Store(string(task.endKey), err) + } + } +} + +func newCopReqReaders(ctx context.Context, copCtx *copContext, jobID int64, readerCnt int, tasks chan *reorgBackfillTask) *copReqReaders { + p := &copReqReaders{ + tasksCh: tasks, + results: generic.NewSyncMap[string, 
error](readerCnt), + } + if copCtx.pushDownEncoding { + p.kvResultsCh = make(chan idxKV, int(variable.MaxDDLReorgBatchSize)) + } else { + p.resultsCh = make(chan *indexRecord, int(variable.MaxDDLReorgBatchSize)) + } + wg := &sync.WaitGroup{} + for i := 0; i < readerCnt; i++ { + wg.Add(1) + r := &copReqReader{ + id: i, + traceID: jobID, + copCtx: copCtx, + idxRecordChan: p.resultsCh, + idxKVChan: p.kvResultsCh, + srcChunk: chunk.NewChunkWithCapacity(copCtx.fieldTps, 1024), + results: &p.results, + } + go r.run(ctx, wg, tasks) + } + go func() { + wg.Wait() + if copCtx.pushDownEncoding { + close(p.kvResultsCh) + } else { + close(p.resultsCh) + } + }() + return p +} + +func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) *copContext { + colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns)) + fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns)) + for _, idxCol := range idxInfo.Columns { + c := tblInfo.Columns[idxCol.Offset] + colInfos = append(colInfos, c) + fieldTps = append(fieldTps, &c.FieldType) + } + + pkColInfos, pkFieldTps := buildHandleColInfoAndFieldTypes(tblInfo) + colInfos = append(colInfos, pkColInfos...) + fieldTps = append(fieldTps, pkFieldTps...) + + copCtx := &copContext{ + tblInfo: tblInfo, + idxInfo: idxInfo, + colInfos: colInfos, + fieldTps: fieldTps, + sessCtx: sessCtx, + pushDownEncoding: variable.EnableCoprRead.Load() == "2", + } + return copCtx +} + +func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) { + dagPB, err := buildDAGPB(c.sessCtx, c.tblInfo, c.colInfos) + if err != nil { + return nil, err + } + + var builder distsql.RequestBuilder + kvReq, err := builder. + SetDAGRequest(dagPB). + SetStartTS(startTS). + SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}). + SetKeepOrder(true). + SetFromSessionVars(c.sessCtx.GetSessionVars()). + SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()). 
+ Build() + if err != nil { + return nil, err + } + + kvReq.Concurrency = 1 + return distsql.Select(ctx, c.sessCtx, kvReq, c.fieldTps, statistics.NewQueryFeedback(0, nil, 0, false)) +} + +func buildDDLPB(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, colInfos []*model.ColumnInfo) (*tipb.DDLRequest, error) { + ddlReq := &tipb.DDLRequest{} + ddlReq.TableInfo = new(tipb.TableInfo) + ddlReq.IndexInfo = new(tipb.IndexInfo) + ddlReq.TableInfo.TableId = tblInfo.ID + ddlReq.TableInfo.Columns = util.ColumnsToProto(colInfos, tblInfo.PKIsHandle) + ddlReq.IndexInfo.TableId = tblInfo.ID + ddlReq.IndexInfo.IndexId = idxInfo.ID + indexColInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns)) + for _, idxCol := range idxInfo.Columns { + indexColInfos = append(indexColInfos, tblInfo.Cols()[idxCol.Offset]) + } + ddlReq.IndexInfo.Columns = util.ColumnsToProto(indexColInfos, tblInfo.PKIsHandle) + ddlReq.Columns = ddlReq.TableInfo.Columns + ddlReq.IndexInfo.Unique = idxInfo.Unique + + return ddlReq, nil +} + +func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) { + dagReq := &tipb.DAGRequest{} + dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location()) + sc := sCtx.GetSessionVars().StmtCtx + dagReq.Flags = sc.PushDownFlags() + for i := range colInfos { + dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i)) + } + execPB, err := constructTableScanPB(sCtx, tblInfo, colInfos) + if err != nil { + return nil, err + } + dagReq.Executors = append(dagReq.Executors, execPB) + distsql.SetEncodeType(sCtx, dagReq) + return dagReq, nil +} + +func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) { + tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos) + tblScan.TableId = tblInfo.ID + err := setPBColumnsDefaultValue(sCtx, tblScan.Columns, colInfos) + return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err +} + +func (c *copContext) buildScanIndexKV(ctx context.Context, startTS uint64, start, end kv.Key) (kv.Response, error) { + ddlPB, err := buildDDLPB(c.tblInfo, c.idxInfo, c.colInfos) + if err != nil { + return nil, err + } + + ddlPB.Ranges = append(ddlPB.Ranges, tipb.KeyRange{Low: start, High: end}) + + var builder distsql.RequestBuilder + kvReq, err := builder. + SetDDLRequest(ddlPB). + SetStartTS(startTS). + SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}). + SetKeepOrder(true). + SetFromSessionVars(c.sessCtx.GetSessionVars()). + SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()). 
+ Build() + if err != nil { + return nil, err + } + + kvReq.Concurrency = 1 + option := &kv.ClientSendOption{ + SessionMemTracker: c.sessCtx.GetSessionVars().StmtCtx.MemTracker, + } + + resp := c.sessCtx.GetClient().Send(ctx, kvReq, c.sessCtx.GetSessionVars().KVVars, option) + if resp == nil { + return nil, errors.New("client returns nil response") + } + return resp, nil +} + +func (c *copContext) sendIdxRecords(ctx context.Context, ch chan *indexRecord, srcChk *chunk.Chunk, + startTS uint64, start, end kv.Key, traceID int64, wid int) error { + sctx := c.sessCtx.GetSessionVars().StmtCtx + srcResult, err := c.buildTableScan(ctx, startTS, start, end) + if err != nil { + return errors.Trace(err) + } + for { + finish := injectSpan(traceID, fmt.Sprintf("cop-req-%d", wid)) + err := srcResult.Next(ctx, srcChk) + if err != nil { + finish() + return errors.Trace(err) + } + if srcChk.NumRows() == 0 { + finish() + return nil + } + iter := chunk.NewIterator4Chunk(srcChk) + finish() + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps) + handle, err := buildHandle(hdDt, c.tblInfo, c.idxInfo, sctx) + if err != nil { + return errors.Trace(err) + } + rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo) + ch <- &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false} + } + } +} + +type idxKV struct { + key kv.Key + val []byte +} + +func (c *copContext) sendEncodedIdxRecords(ctx context.Context, ch chan idxKV, startTS uint64, + start, end kv.Key, traceID int64, wid int) error { + resp, err := c.buildScanIndexKV(ctx, startTS, start, end) + if err != nil { + return errors.Trace(err) + } + colResp := &tipb.DDLResponse{} + for { + finish := injectSpan(traceID, fmt.Sprintf("cop-req-%d", wid)) + data, err := resp.Next(ctx) + if err != nil { + finish() + return errors.Trace(err) + } + if data == nil { + finish() + return nil + } + colResp.Reset() + colResp.Keys = make([][]byte, 0, 2*1024*1024) + colResp.Values = make([][]byte, 0, 2*1024*1024) + if err = colResp.Unmarshal(data.GetData()); err != nil { + finish() + return errors.Trace(err) + } + finish() + for i := 0; i < len(colResp.Keys); i++ { + ch <- idxKV{key: colResp.Keys[i], val: colResp.Values[i]} + } + } +} + +func fetchRowColValsFromCop[V *indexRecord | idxKV](w *addIndexWorker, ch chan V, buf []V, + handleRange reorgBackfillTask) ([]V, kv.Key, bool, error) { + taskDone := false + var err error + curRangeKey := string(handleRange.endKey) + timer := time.NewTimer(200 * time.Millisecond) + for { + select { + case record, more := <-ch: + if !more { + taskDone = true + break + } + buf = append(buf, record) + case <-timer.C: + err, taskDone = w.copReqReaders.results.Load(curRangeKey) + } + if taskDone { + w.copReqReaders.results.Delete(curRangeKey) + break + } + if len(buf) >= w.batchCnt { + break + } + } + timer.Stop() + return buf, handleRange.startKey, taskDone, err +} + +func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType) { + if tbInfo.PKIsHandle { + for i := range tbInfo.Columns { + if mysql.HasPriKeyFlag(tbInfo.Columns[i].GetFlag()) { + return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType} + } + } + } else if tbInfo.IsCommonHandle { + primaryIdx := tables.FindPrimaryIndex(tbInfo) + pkCols := make([]*model.ColumnInfo, 0, len(primaryIdx.Columns)) + pkFts := make([]*types.FieldType, 0, len(primaryIdx.Columns)) + for i := range 
primaryIdx.Columns {
+			pkCols = append(pkCols, tbInfo.Columns[primaryIdx.Columns[i].Offset])
+			pkFts = append(pkFts, &tbInfo.Columns[primaryIdx.Columns[i].Offset].FieldType)
+		}
+		return pkCols, pkFts
+	}
+	extra := model.NewExtraHandleColInfo()
+	return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType}
+}
+
+func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps []*types.FieldType) ([]types.Datum, []types.Datum) {
+	datumBuf := make([]types.Datum, 0, len(fieldTps))
+	idxColLen := len(idxInfo.Columns)
+	for i, ft := range fieldTps {
+		datumBuf = append(datumBuf, row.GetDatum(i, ft))
+	}
+	return datumBuf[:idxColLen], datumBuf[idxColLen:]
+}
+
+func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
+	idxInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) {
+	if tblInfo.IsCommonHandle {
+		tablecodec.TruncateIndexValues(tblInfo, idxInfo, pkDts)
+		handleBytes, err := codec.EncodeKey(stmtCtx, nil, pkDts...)
+		if err != nil {
+			return nil, err
+		}
+		return kv.NewCommonHandle(handleBytes)
+	}
+	return kv.IntHandle(pkDts[0].GetInt64()), nil
+}
+
+// setPBColumnsDefaultValue sets the default values of tipb.ColumnInfos.
+func setPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnInfo, columns []*model.ColumnInfo) error {
+	for i, c := range columns {
+		// For virtual columns, we set their default values to NULL so that TiKV will return NULL properly,
+		// Their real values will be computed later.
+		if c.IsGenerated() && !c.GeneratedStored {
+			pbColumns[i].DefaultVal = []byte{codec.NilFlag}
+		}
+		if c.GetOriginDefaultValue() == nil {
+			continue
+		}
+
+		sessVars := ctx.GetSessionVars()
+		originStrict := sessVars.StrictSQLMode
+		sessVars.StrictSQLMode = false
+		d, err := table.GetColOriginDefaultValue(ctx, c)
+		sessVars.StrictSQLMode = originStrict
+		if err != nil {
+			return err
+		}
+
+		pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(sessVars.StmtCtx, nil, d)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go
index 24522a9548608..4089db162ac4b 100644
--- a/ddl/index_merge_tmp_test.go
+++ b/ddl/index_merge_tmp_test.go
@@ -41,6 +41,7 @@ func TestAddIndexMergeProcess(t *testing.T) {
 	// Force onCreateIndex use the txn-merge process.
 	ingest.LitInitialized = false
 	tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = 1;")
+	tk.MustExec("set global tidb_ddl_enable_copr_read = 1;")
 
 	var checkErr error
 	var runDML, backfillDone bool
diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go
index d875d78e346d0..f4a028bad7698 100644
--- a/ddl/ingest/engine.go
+++ b/ddl/ingest/engine.go
@@ -184,9 +184,9 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) {
 
 // WriteRow Write one row into local writer buffer.
 func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error {
-	kvs := make([]common.KvPair, 1)
+	var kvs [1]common.KvPair
 	kvs[0].Key = key
 	kvs[0].Val = idxVal
-	row := kv.MakeRowsFromKvPairs(kvs)
+	row := kv.MakeRowsFromKvPairs(kvs[:])
 	return wCtx.lWrite.WriteRows(wCtx.ctx, nil, row)
 }
diff --git a/distsql/request_builder.go b/distsql/request_builder.go
index aae83a0dd0053..fa5bb73768731 100644
--- a/distsql/request_builder.go
+++ b/distsql/request_builder.go
@@ -181,6 +181,17 @@ func (builder *RequestBuilder) SetChecksumRequest(checksum *tipb.ChecksumRequest
 	return builder
 }
 
+// SetDDLRequest sets the request type to "ReqTypeDDL" and construct request data.
+func (builder *RequestBuilder) SetDDLRequest(ddl *tipb.DDLRequest) *RequestBuilder { + if builder.err == nil { + builder.Request.Tp = kv.ReqTypeDDL + builder.Request.Data, builder.err = ddl.Marshal() + builder.Request.NotFillCache = true + } + + return builder +} + // SetKeyRanges sets "KeyRanges" for "kv.Request". func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBuilder { builder.Request.KeyRanges = keyRanges diff --git a/executor/admin.go b/executor/admin.go index ba219b70b6db3..b69ef74b97f2a 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -380,7 +380,7 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists } idxVals := extractIdxVals(row, e.idxValsBufs[result.scanRowCount], e.colFieldTypes, idxValLen) e.idxValsBufs[result.scanRowCount] = idxVals - rsData := tables.TryGetHandleRestoredDataWrapper(e.table, plannercore.GetCommonHandleDatum(e.handleCols, row), nil, e.index.Meta()) + rsData := tables.TryGetHandleRestoredDataWrapper(e.table.Meta(), plannercore.GetCommonHandleDatum(e.handleCols, row), nil, e.index.Meta()) e.recoverRows = append(e.recoverRows, recoverRows{handle: handle, idxVals: idxVals, rsData: rsData, skip: false}) result.scanRowCount++ result.currentHandle = handle diff --git a/executor/kvtest/BUILD.bazel b/executor/kvtest/BUILD.bazel index 457d5e0d3dcf1..c746c6013029f 100644 --- a/executor/kvtest/BUILD.bazel +++ b/executor/kvtest/BUILD.bazel @@ -7,8 +7,8 @@ go_test( "kv_test.go", "main_test.go", ], - race = "on", flaky = True, + race = "on", deps = [ "//config", "//meta/autoid", diff --git a/go.mod b/go.mod index 88b19ed5e8771..43f0cbfd41a9b 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/charithe/durationcheck v0.0.9 github.com/cheggaaa/pb/v3 v3.0.8 github.com/cheynewallace/tabby v1.1.1 + github.com/cockroachdb/errors v1.8.1 github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5 github.com/coocood/freecache v1.2.1 github.com/coreos/go-semver v0.3.0 @@ -58,7 +59,7 @@ require ( github.com/mgechev/revive v1.2.4-0.20220827111817-553604eaced5 github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 github.com/nishanths/predeclared v0.2.2 - github.com/opentracing/basictracer-go v1.0.0 + github.com/opentracing/basictracer-go v1.1.0 github.com/opentracing/opentracing-go v1.2.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pingcap/badger v1.5.1-0.20220314162537-ab58fbf40580 @@ -84,6 +85,7 @@ require ( github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tikv/client-go/v2 v2.0.1-0.20220913051514-ffaaf7131a8d + github.com/tikv/minitrace-go v0.0.0-20220923091513-8e6316bb4097 github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 @@ -135,7 +137,6 @@ require ( github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/chavacava/garif v0.0.0-20220630083739-93517212f375 // indirect - github.com/cockroachdb/errors v1.8.1 // indirect github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect github.com/cockroachdb/redact v1.0.8 // indirect github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect @@ -243,3 +244,5 @@ replace ( github.com/pingcap/tidb/parser => ./parser go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac ) + +replace github.com/pingcap/tipb => github.com/wjhuang2016/tipb 
v0.0.0-20221008063631-6d62ac9c19a2
diff --git a/go.sum b/go.sum
index 7cbe99ed6235e..76e55d8631960 100644
--- a/go.sum
+++ b/go.sum
@@ -711,8 +711,9 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
 github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
 github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
 github.com/opentracing-contrib/go-stdlib v0.0.0-20170113013457-1de4cc2120e7/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w=
-github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
 github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
+github.com/opentracing/basictracer-go v1.1.0 h1:Oa1fTSBvAl8pa3U+IJYqrKm0NALwH9OsgwOqDv4xJW0=
+github.com/opentracing/basictracer-go v1.1.0/go.mod h1:V2HZueSJEp879yv285Aap1BS69fQMD+MNP1mRs6mBQc=
 github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
@@ -730,6 +731,7 @@ github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3
 github.com/petermattis/goid v0.0.0-20170504144140-0ded85884ba5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc=
 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
+github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
 github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
 github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
 github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
@@ -762,8 +764,6 @@ github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8=
 github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM=
 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops=
-github.com/pingcap/tipb v0.0.0-20220824081009-0714a57aff1d h1:kWYridgsn8xSKYJ2EkXp7uj5HwJnG5snpY3XP8oYmPU=
-github.com/pingcap/tipb v0.0.0-20220824081009-0714a57aff1d/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
 github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=
 github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -850,6 +850,7 @@ github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0/go.mod h1:919Lwc
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04 h1:y0cMJ0qjii33BnD6tMGcF/+gHYsoKQ6tbwQpy233OII=
 github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
+github.com/silentred/gid v1.0.0/go.mod h1:DMQPn66uY+3ed7rWfzOVET7VbDBAhjz+6AmmlixUK08=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@@ -903,10 +904,13 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
 github.com/tikv/client-go/v2 v2.0.1-0.20220913051514-ffaaf7131a8d h1:p8XInTnkUlLabBT7bDS3aZCeemO6tJ/7b5mHN8WbSIE=
 github.com/tikv/client-go/v2 v2.0.1-0.20220913051514-ffaaf7131a8d/go.mod h1:6pedLz7wiINLHXwCT1+yMZmzuG42+ubtBkkfcwoukIo=
+github.com/tikv/minitrace-go v0.0.0-20220923091513-8e6316bb4097 h1:nvIrUVo5YJZMsCn6yTxrpgrokIo/wug5N/nL5mc7v50=
+github.com/tikv/minitrace-go v0.0.0-20220923091513-8e6316bb4097/go.mod h1:ukJr0BfYeYbO3n15LAV2Dp4jvFpIPF2g14NU227ZTLY=
 github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww=
 github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db/go.mod h1:ew8kS0yIcEaSetuuywkTLIUBR+sz3J5XvAYRae11qwc=
 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=
+github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
 github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
 github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
 github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
@@ -916,6 +920,7 @@ github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hM
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
 github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
 github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
 github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
@@ -931,6 +936,8 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU
 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
 github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f h1:9DDCDwOyEy/gId+IEMrFHLuQ5R/WV0KNxWLler8X2OY=
 github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f/go.mod h1:8sdOQnirw1PrcnTJYkmW1iOHtUmblMmGdUOHyWYycLI=
+github.com/wjhuang2016/tipb v0.0.0-20221008063631-6d62ac9c19a2 h1:13LEN/7sdwcoTRDlBxqPUixvUqPrzHzn3J0EaasrpXg=
+github.com/wjhuang2016/tipb v0.0.0-20221008063631-6d62ac9c19a2/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
 github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
 github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
@@ -1126,6 +1133,7 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200421231249-e086a090c8fd/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
@@ -1358,6 +1366,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc
 golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
 golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
 golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
+golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
diff --git a/kv/kv.go b/kv/kv.go
index d39dc1ee5c862..0d312605aaf61 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -295,6 +295,7 @@ const (
 	ReqTypeDAG      = 103
 	ReqTypeAnalyze  = 104
 	ReqTypeChecksum = 105
+	ReqTypeDDL      = 106
 
 	ReqSubTypeBasic = 0
 	ReqSubTypeDesc  = 10000
diff --git a/session/schema_amender.go b/session/schema_amender.go
index 955d30cc42ada..caaec7994ae3a 100644
--- a/session/schema_amender.go
+++ b/session/schema_amender.go
@@ -447,7 +447,7 @@ func (a *amendOperationAddIndexInfo) genIndexKeyValue(ctx context.Context, sctx
 		idxVals = append(idxVals, chk.GetRow(0).GetDatum(oldCol.Offset, &oldCol.FieldType))
 	}
 
-	rsData := tables.TryGetHandleRestoredDataWrapper(a.tblInfoAtCommit, getCommonHandleDatum(a.tblInfoAtCommit, chk.GetRow(0)), nil, a.indexInfoAtCommit.Meta())
+	rsData := tables.TryGetHandleRestoredDataWrapper(a.tblInfoAtCommit.Meta(), getCommonHandleDatum(a.tblInfoAtCommit, chk.GetRow(0)), nil, a.indexInfoAtCommit.Meta())
 
 	// Generate index key buf.
 	newIdxKey, distinct, err := tablecodec.GenIndexKey(sctx.GetSessionVars().StmtCtx,
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 37fd015fce28f..d844bf6f5d9a6 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -1826,6 +1826,12 @@ var defaultSysVars = []*SysVar{
 		EnableFastReorg.Store(TiDBOptOn(val))
 		return nil
 	}},
+	{Scope: ScopeGlobal, Name: TiDBDDLEnableCoprRead, Value: DefTiDBDDLEnableCoprRead, PossibleValues: []string{"0", "1", "2"}, Type: TypeEnum, GetGlobal: func(sv *SessionVars) (string, error) {
+		return EnableCoprRead.Load(), nil
+	}, SetGlobal: func(s *SessionVars, val string) error {
+		EnableCoprRead.Store(val)
+		return nil
+	}},
 	// This system var is set disk quota for lightning sort dir, from 100 GB to 1PB.
 	{Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: DefTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(sv *SessionVars) (string, error) {
 		return strconv.FormatUint(DDLDiskQuota.Load(), 10), nil
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 06da8d8c53ce8..a11e60b727609 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -837,6 +837,8 @@ const (
 	TiDBDDLEnableFastReorg = "tidb_ddl_enable_fast_reorg"
 	// TiDBDDLDiskQuota used to set disk quota for lightning add index.
 	TiDBDDLDiskQuota = "tidb_ddl_disk_quota"
+	// TiDBDDLEnableCoprRead is used to control whether to read with coprocessor for adding index.
+	TiDBDDLEnableCoprRead = "tidb_ddl_enable_copr_read"
 )
 
 // TiDB intentional limits
@@ -1058,6 +1060,7 @@ const (
 	DefTiDBEnableMDL = false
 	DefTiFlashFastScan = false
 	DefTiDBEnableFastReorg = false
+	DefTiDBDDLEnableCoprRead = "0"
 	DefTiDBDDLDiskQuota = 100 * 1024 * 1024 * 1024 // 100GB
 	DefExecutorConcurrency = 5
 	DefTiDBEnableGeneralPlanCache = false
@@ -1118,6 +1121,8 @@ var (
 	EnableMDL = atomic.NewBool(DefTiDBEnableMDL)
 	// EnableFastReorg indicates whether to use lightning to enhance DDL reorg performance.
 	EnableFastReorg = atomic.NewBool(DefTiDBEnableFastReorg)
+	// EnableCoprRead indicates whether to read with coprocessor for adding index.
+	EnableCoprRead = atomic.NewString(DefTiDBDDLEnableCoprRead)
 	// DDLDiskQuota is the temporary variable for set disk quota for lightning
 	DDLDiskQuota = atomic.NewUint64(DefTiDBDDLDiskQuota)
 	// EnableForeignKey indicates whether to enable foreign key feature.
diff --git a/table/tables/tables.go b/table/tables/tables.go
index 05724a67fa311..e9b66bf445436 100644
--- a/table/tables/tables.go
+++ b/table/tables/tables.go
@@ -977,7 +977,7 @@ func (t *TableCommon) addIndices(sctx sessionctx.Context, recordID kv.Handle, r
 			idxMeta := v.Meta()
 			dupErr = kv.ErrKeyExists.FastGenByArgs(entryKey, idxMeta.Name.String())
 		}
-		rsData := TryGetHandleRestoredDataWrapper(t, r, nil, v.Meta())
+		rsData := TryGetHandleRestoredDataWrapper(t.Meta(), r, nil, v.Meta())
 		if dupHandle, err := v.Create(sctx, txn, indexVals, recordID, rsData, opts...); err != nil {
 			if kv.ErrKeyExists.Equal(err) {
 				return dupHandle, dupErr
@@ -1345,7 +1345,7 @@ func (t *TableCommon) buildIndexForRow(ctx sessionctx.Context, h kv.Handle, vals
 	if untouched {
 		opts = append(opts, table.IndexIsUntouched)
 	}
-	rsData := TryGetHandleRestoredDataWrapper(t, newData, nil, idx.Meta())
+	rsData := TryGetHandleRestoredDataWrapper(t.Meta(), newData, nil, idx.Meta())
 	if _, err := idx.Create(ctx, txn, vals, h, rsData, opts...); err != nil {
 		if kv.ErrKeyExists.Equal(err) {
 			// Make error message consistent with MySQL.
@@ -1866,14 +1866,14 @@ func (t *TableCommon) GetSequenceCommon() *sequenceCommon {
 }
 
 // TryGetHandleRestoredDataWrapper tries to get the restored data for handle if needed. The argument can be a slice or a map.
-func TryGetHandleRestoredDataWrapper(t table.Table, row []types.Datum, rowMap map[int64]types.Datum, idx *model.IndexInfo) []types.Datum {
-	if !collate.NewCollationEnabled() || !t.Meta().IsCommonHandle || t.Meta().CommonHandleVersion == 0 {
+func TryGetHandleRestoredDataWrapper(tblInfo *model.TableInfo, row []types.Datum, rowMap map[int64]types.Datum, idx *model.IndexInfo) []types.Datum {
+	if !collate.NewCollationEnabled() || !tblInfo.IsCommonHandle || tblInfo.CommonHandleVersion == 0 {
 		return nil
 	}
 	rsData := make([]types.Datum, 0, 4)
-	pkIdx := FindPrimaryIndex(t.Meta())
+	pkIdx := FindPrimaryIndex(tblInfo)
 	for _, pkIdxCol := range pkIdx.Columns {
-		pkCol := t.Meta().Columns[pkIdxCol.Offset]
+		pkCol := tblInfo.Columns[pkIdxCol.Offset]
 		if !types.NeedRestoredData(&pkCol.FieldType) {
 			continue
 		}