From 13929b6249524dec9c8abf7ccc2b2f2d0df6c246 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 16 Nov 2022 16:33:17 +0800 Subject: [PATCH 1/9] ddl: support read records with copr for adding index --- ddl/backfilling.go | 7 ++ ddl/index.go | 19 +++- ddl/index_cop.go | 235 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 258 insertions(+), 3 deletions(-) create mode 100644 ddl/index_cop.go diff --git a/ddl/backfilling.go b/ddl/backfilling.go index dfd213652ab2e..4e6d9f87190c9 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -158,6 +158,13 @@ type reorgBackfillTask struct { endInclude bool } +func (r *reorgBackfillTask) excludedEndKey() kv.Key { + if r.endInclude { + return r.endKey.Next() + } + return r.endKey +} + func (r *reorgBackfillTask) String() string { physicalID := strconv.FormatInt(r.physicalTableID, 10) startKey := tryDecodeToHandleString(r.startKey) diff --git a/ddl/index.go b/ddl/index.go index 69622ec3b5bef..8329430b6e915 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1177,6 +1177,7 @@ type baseIndexWorker struct { type addIndexWorker struct { baseIndexWorker index table.Index + coprCtx *copContext writerCtx *ingest.WriterContext // The following attributes are used to reduce memory allocation. @@ -1197,6 +1198,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) var lwCtx *ingest.WriterContext + var copCtx *copContext if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { bc, ok := ingest.LitBackCtxMgr.Load(job.ID) if !ok { @@ -1204,12 +1206,13 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable } ei, err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID) if err != nil { - return nil, errors.Trace(errors.New(ingest.LitErrCreateEngineFail)) + return nil, errors.Trace(err) } lwCtx, err = ei.NewWriterCtx(id) if err != nil { return nil, err } + copCtx = newCopContext(t.Meta(), indexInfo, sessCtx) } return &addIndexWorker{ @@ -1224,6 +1227,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable jobContext: jc, }, index: index, + coprCtx: copCtx, writerCtx: lwCtx, }, nil } @@ -1481,7 +1485,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC oprStartTime := time.Now() ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType()) - errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { + errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) { taskCtx.addedCount = 0 taskCtx.scanCount = 0 txn.SetOption(kv.Priority, w.priority) @@ -1489,7 +1493,16 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC txn.SetOption(kv.ResourceGroupTagger, tagger) } - idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) + var ( + idxRecords []*indexRecord + nextKey kv.Key + taskDone bool + ) + if w.coprCtx != nil { + idxRecords, nextKey, taskDone, err = w.fetchRowColValsFromCop(txn, handleRange) + } else { + idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange) + } if err != nil { return errors.Trace(err) } diff --git a/ddl/index_cop.go b/ddl/index_cop.go new file mode 100644 index 0000000000000..86dd7a299bbf7 --- /dev/null +++ b/ddl/index_cop.go @@ -0,0 +1,235 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/distsql" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/timeutil" + "github.com/pingcap/tipb/go-tipb" +) + +// copReadBatchFactor is the factor of batch size of coprocessor read. +// It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range. +const copReadBatchFactor = 10 + +func (w *addIndexWorker) fetchRowColValsFromCop(txn kv.Transaction, handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) { + w.idxRecords = w.idxRecords[:0] + srcResult, err := w.coprCtx.buildTableScan(w.ctx, txn.StartTS(), handleRange.startKey, handleRange.excludedEndKey()) + if err != nil { + return nil, nil, false, errors.Trace(err) + } + var done bool + w.idxRecords, done, err = w.coprCtx.fetchTableScanResult(w.ctx, srcResult, w.idxRecords, w.batchCnt*copReadBatchFactor) + nextKey := handleRange.endKey + if !done { + lastHandle := w.idxRecords[len(w.idxRecords)-1].handle + prefix := tablecodec.GenTableRecordPrefix(w.coprCtx.tblInfo.ID) + nextKey = tablecodec.EncodeRecordKey(prefix, lastHandle).Next() + } + return w.idxRecords, nextKey, done, err +} + +type copContext struct { + tblInfo *model.TableInfo + idxInfo *model.IndexInfo + colInfos []*model.ColumnInfo + fieldTps []*types.FieldType + sessCtx sessionctx.Context + + srcChunk *chunk.Chunk +} + +func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) *copContext { + colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns)) + fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns)) + for _, idxCol := range idxInfo.Columns { + c := tblInfo.Columns[idxCol.Offset] + colInfos = append(colInfos, c) + fieldTps = append(fieldTps, &c.FieldType) + } + + pkColInfos, pkFieldTps := buildHandleColInfoAndFieldTypes(tblInfo) + colInfos = append(colInfos, pkColInfos...) + fieldTps = append(fieldTps, pkFieldTps...) + + copCtx := &copContext{ + tblInfo: tblInfo, + idxInfo: idxInfo, + colInfos: colInfos, + fieldTps: fieldTps, + sessCtx: sessCtx, + srcChunk: chunk.NewChunkWithCapacity(fieldTps, variable.DefMaxChunkSize), + } + return copCtx +} + +func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) { + dagPB, err := buildDAGPB(c.sessCtx, c.tblInfo, c.colInfos) + if err != nil { + return nil, err + } + + var builder distsql.RequestBuilder + kvReq, err := builder. + SetDAGRequest(dagPB). + SetStartTS(startTS). + SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}). + SetKeepOrder(true). + SetFromSessionVars(c.sessCtx.GetSessionVars()). + SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()). + SetConcurrency(1). + Build() + if err != nil { + return nil, err + } + return distsql.Select(ctx, c.sessCtx, kvReq, c.fieldTps, statistics.NewQueryFeedback(0, nil, 0, false)) +} + +func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.SelectResult, + buf []*indexRecord, batchCnt int) ([]*indexRecord, bool, error) { + sctx := c.sessCtx.GetSessionVars().StmtCtx + for { + err := result.Next(ctx, c.srcChunk) + if err != nil { + return nil, false, errors.Trace(err) + } + if c.srcChunk.NumRows() == 0 { + return buf, true, nil + } + iter := chunk.NewIterator4Chunk(c.srcChunk) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps) + handle, err := buildHandle(hdDt, c.tblInfo, c.idxInfo, sctx) + if err != nil { + return nil, false, errors.Trace(err) + } + rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo) + buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false}) + if len(buf) >= batchCnt { + return buf, false, nil + } + } + } +} + +func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) { + dagReq := &tipb.DAGRequest{} + dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location()) + sc := sCtx.GetSessionVars().StmtCtx + dagReq.Flags = sc.PushDownFlags() + for i := range colInfos { + dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i)) + } + execPB, err := constructTableScanPB(sCtx, tblInfo, colInfos) + if err != nil { + return nil, err + } + dagReq.Executors = append(dagReq.Executors, execPB) + distsql.SetEncodeType(sCtx, dagReq) + return dagReq, nil +} + +func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) { + tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos) + tblScan.TableId = tblInfo.ID + err := tables.SetPBColumnsDefaultValue(sCtx, tblScan.Columns, colInfos) + return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err +} + +func (c *copContext) sendIdxRecords(ctx context.Context, ch chan *indexRecord, srcChk *chunk.Chunk, + startTS uint64, start, end kv.Key) error { + sctx := c.sessCtx.GetSessionVars().StmtCtx + srcResult, err := c.buildTableScan(ctx, startTS, start, end) + if err != nil { + return errors.Trace(err) + } + for { + err := srcResult.Next(ctx, srcChk) + if err != nil { + return errors.Trace(err) + } + if srcChk.NumRows() == 0 { + return nil + } + iter := chunk.NewIterator4Chunk(srcChk) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps) + handle, err := buildHandle(hdDt, c.tblInfo, c.idxInfo, sctx) + if err != nil { + return errors.Trace(err) + } + rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo) + ch <- &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false} + } + } +} + +func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType) { + if tbInfo.PKIsHandle { + for i := range tbInfo.Columns { + if mysql.HasPriKeyFlag(tbInfo.Columns[i].GetFlag()) { + return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType} + } + } + } else if tbInfo.IsCommonHandle { + primaryIdx := tables.FindPrimaryIndex(tbInfo) + pkCols := make([]*model.ColumnInfo, 0, len(primaryIdx.Columns)) + pkFts := make([]*types.FieldType, 0, len(primaryIdx.Columns)) + for i := range tbInfo.Columns { + pkCols = append(pkCols, tbInfo.Columns[i]) + pkFts = append(pkFts, &tbInfo.Columns[i].FieldType) + } + return pkCols, pkFts + } + extra := model.NewExtraHandleColInfo() + return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType} +} + +func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps []*types.FieldType) ([]types.Datum, []types.Datum) { + datumBuf := make([]types.Datum, 0, len(fieldTps)) + idxColLen := len(idxInfo.Columns) + for i, ft := range fieldTps { + datumBuf = append(datumBuf, row.GetDatum(i, ft)) + } + return datumBuf[:idxColLen], datumBuf[idxColLen:] +} + +func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo, + idxInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) { + if tblInfo.IsCommonHandle { + tablecodec.TruncateIndexValues(tblInfo, idxInfo, pkDts) + handleBytes, err := codec.EncodeKey(stmtCtx, nil, pkDts...) + if err != nil { + return nil, err + } + return kv.NewCommonHandle(handleBytes) + } + return kv.IntHandle(pkDts[0].GetInt64()), nil +} From 4bf61ba528d012ad982177d24c0d4d3ffa84b3af Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 17 Nov 2022 10:49:38 +0800 Subject: [PATCH 2/9] update bazel --- ddl/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index b54f52a6d192c..f0d6a7a632738 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "foreign_key.go", "generated_column.go", "index.go", + "index_cop.go", "index_merge_tmp.go", "job_table.go", "mock.go", From c9dd6320d11155110fed691fa6b2fa65e568dcb1 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 17 Nov 2022 11:06:45 +0800 Subject: [PATCH 3/9] remove unused function --- ddl/index_cop.go | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 86dd7a299bbf7..4be31733bef4f 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -163,34 +163,6 @@ func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, col return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err } -func (c *copContext) sendIdxRecords(ctx context.Context, ch chan *indexRecord, srcChk *chunk.Chunk, - startTS uint64, start, end kv.Key) error { - sctx := c.sessCtx.GetSessionVars().StmtCtx - srcResult, err := c.buildTableScan(ctx, startTS, start, end) - if err != nil { - return errors.Trace(err) - } - for { - err := srcResult.Next(ctx, srcChk) - if err != nil { - return errors.Trace(err) - } - if srcChk.NumRows() == 0 { - return nil - } - iter := chunk.NewIterator4Chunk(srcChk) - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps) - handle, err := buildHandle(hdDt, c.tblInfo, c.idxInfo, sctx) - if err != nil { - return errors.Trace(err) - } - rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo) - ch <- &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false} - } - } -} - func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType) { if tbInfo.PKIsHandle { for i := range tbInfo.Columns { From 21e0f777571945911bd665d2adc3f2a2e3aa1b5a Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 17 Nov 2022 11:26:12 +0800 Subject: [PATCH 4/9] use pkInfo to truncate common handle datum --- ddl/index_cop.go | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 4be31733bef4f..86d698b02d8c9 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -41,24 +41,33 @@ const copReadBatchFactor = 10 func (w *addIndexWorker) fetchRowColValsFromCop(txn kv.Transaction, handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) { w.idxRecords = w.idxRecords[:0] - srcResult, err := w.coprCtx.buildTableScan(w.ctx, txn.StartTS(), handleRange.startKey, handleRange.excludedEndKey()) + start, end := handleRange.startKey, handleRange.excludedEndKey() + batchCnt := w.batchCnt * copReadBatchFactor + return FetchRowsFromCop(w.ctx, w.coprCtx, start, end, txn.StartTS(), w.idxRecords, batchCnt) +} + +// FetchRowsFromCop sends a coprocessor request and fetches the first batchCnt rows. +func FetchRowsFromCop(ctx context.Context, copCtx *copContext, startKey, endKey kv.Key, startTS uint64, + buf []*indexRecord, batchCnt int) ([]*indexRecord, kv.Key, bool, error) { + srcResult, err := copCtx.buildTableScan(ctx, startTS, startKey, endKey) if err != nil { return nil, nil, false, errors.Trace(err) } var done bool - w.idxRecords, done, err = w.coprCtx.fetchTableScanResult(w.ctx, srcResult, w.idxRecords, w.batchCnt*copReadBatchFactor) - nextKey := handleRange.endKey + buf, done, err = copCtx.fetchTableScanResult(ctx, srcResult, buf, batchCnt) + nextKey := endKey if !done { - lastHandle := w.idxRecords[len(w.idxRecords)-1].handle - prefix := tablecodec.GenTableRecordPrefix(w.coprCtx.tblInfo.ID) + lastHandle := buf[len(buf)-1].handle + prefix := tablecodec.GenTableRecordPrefix(copCtx.tblInfo.ID) nextKey = tablecodec.EncodeRecordKey(prefix, lastHandle).Next() } - return w.idxRecords, nextKey, done, err + return buf, nextKey, done, err } type copContext struct { tblInfo *model.TableInfo idxInfo *model.IndexInfo + pkInfo *model.IndexInfo colInfos []*model.ColumnInfo fieldTps []*types.FieldType sessCtx sessionctx.Context @@ -75,13 +84,14 @@ func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx s fieldTps = append(fieldTps, &c.FieldType) } - pkColInfos, pkFieldTps := buildHandleColInfoAndFieldTypes(tblInfo) + pkColInfos, pkFieldTps, pkInfo := buildHandleColInfoAndFieldTypes(tblInfo) colInfos = append(colInfos, pkColInfos...) fieldTps = append(fieldTps, pkFieldTps...) copCtx := &copContext{ tblInfo: tblInfo, idxInfo: idxInfo, + pkInfo: pkInfo, colInfos: colInfos, fieldTps: fieldTps, sessCtx: sessCtx, @@ -126,7 +136,7 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se iter := chunk.NewIterator4Chunk(c.srcChunk) for row := iter.Begin(); row != iter.End(); row = iter.Next() { idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps) - handle, err := buildHandle(hdDt, c.tblInfo, c.idxInfo, sctx) + handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx) if err != nil { return nil, false, errors.Trace(err) } @@ -163,11 +173,11 @@ func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, col return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err } -func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType) { +func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, *model.IndexInfo) { if tbInfo.PKIsHandle { for i := range tbInfo.Columns { if mysql.HasPriKeyFlag(tbInfo.Columns[i].GetFlag()) { - return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType} + return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType}, nil } } } else if tbInfo.IsCommonHandle { @@ -178,10 +188,10 @@ func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnIn pkCols = append(pkCols, tbInfo.Columns[i]) pkFts = append(pkFts, &tbInfo.Columns[i].FieldType) } - return pkCols, pkFts + return pkCols, pkFts, primaryIdx } extra := model.NewExtraHandleColInfo() - return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType} + return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType}, nil } func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps []*types.FieldType) ([]types.Datum, []types.Datum) { @@ -194,9 +204,9 @@ func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps [ } func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo, - idxInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) { + pkInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) { if tblInfo.IsCommonHandle { - tablecodec.TruncateIndexValues(tblInfo, idxInfo, pkDts) + tablecodec.TruncateIndexValues(tblInfo, pkInfo, pkDts) handleBytes, err := codec.EncodeKey(stmtCtx, nil, pkDts...) if err != nil { return nil, err From 7b281e1ed981af8ac2c0151edbdccfaf9bf3a09a Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 17 Nov 2022 16:11:39 +0800 Subject: [PATCH 5/9] fix common handle primary key columns --- ddl/index_cop.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 86d698b02d8c9..8163ee9257621 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -43,11 +43,11 @@ func (w *addIndexWorker) fetchRowColValsFromCop(txn kv.Transaction, handleRange w.idxRecords = w.idxRecords[:0] start, end := handleRange.startKey, handleRange.excludedEndKey() batchCnt := w.batchCnt * copReadBatchFactor - return FetchRowsFromCop(w.ctx, w.coprCtx, start, end, txn.StartTS(), w.idxRecords, batchCnt) + return fetchRowsFromCop(w.ctx, w.coprCtx, start, end, txn.StartTS(), w.idxRecords, batchCnt) } -// FetchRowsFromCop sends a coprocessor request and fetches the first batchCnt rows. -func FetchRowsFromCop(ctx context.Context, copCtx *copContext, startKey, endKey kv.Key, startTS uint64, +// fetchRowsFromCop sends a coprocessor request and fetches the first batchCnt rows. +func fetchRowsFromCop(ctx context.Context, copCtx *copContext, startKey, endKey kv.Key, startTS uint64, buf []*indexRecord, batchCnt int) ([]*indexRecord, kv.Key, bool, error) { srcResult, err := copCtx.buildTableScan(ctx, startTS, startKey, endKey) if err != nil { @@ -184,9 +184,9 @@ func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnIn primaryIdx := tables.FindPrimaryIndex(tbInfo) pkCols := make([]*model.ColumnInfo, 0, len(primaryIdx.Columns)) pkFts := make([]*types.FieldType, 0, len(primaryIdx.Columns)) - for i := range tbInfo.Columns { - pkCols = append(pkCols, tbInfo.Columns[i]) - pkFts = append(pkFts, &tbInfo.Columns[i].FieldType) + for _, pkCol := range primaryIdx.Columns { + pkCols = append(pkCols, tbInfo.Columns[pkCol.Offset]) + pkFts = append(pkFts, &tbInfo.Columns[pkCol.Offset].FieldType) } return pkCols, pkFts, primaryIdx } From 7ca5ca36c026bb4a6e272a81182344ffed098140 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 17 Nov 2022 16:14:37 +0800 Subject: [PATCH 6/9] rename coprCtx to copCtx for consistency --- ddl/index.go | 6 +++--- ddl/index_cop.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 8329430b6e915..39315cd28900f 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1177,7 +1177,7 @@ type baseIndexWorker struct { type addIndexWorker struct { baseIndexWorker index table.Index - coprCtx *copContext + copCtx *copContext writerCtx *ingest.WriterContext // The following attributes are used to reduce memory allocation. @@ -1227,7 +1227,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable jobContext: jc, }, index: index, - coprCtx: copCtx, + copCtx: copCtx, writerCtx: lwCtx, }, nil } @@ -1498,7 +1498,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC nextKey kv.Key taskDone bool ) - if w.coprCtx != nil { + if w.copCtx != nil { idxRecords, nextKey, taskDone, err = w.fetchRowColValsFromCop(txn, handleRange) } else { idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 8163ee9257621..84488daf53fbb 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -43,7 +43,7 @@ func (w *addIndexWorker) fetchRowColValsFromCop(txn kv.Transaction, handleRange w.idxRecords = w.idxRecords[:0] start, end := handleRange.startKey, handleRange.excludedEndKey() batchCnt := w.batchCnt * copReadBatchFactor - return fetchRowsFromCop(w.ctx, w.coprCtx, start, end, txn.StartTS(), w.idxRecords, batchCnt) + return fetchRowsFromCop(w.ctx, w.copCtx, start, end, txn.StartTS(), w.idxRecords, batchCnt) } // fetchRowsFromCop sends a coprocessor request and fetches the first batchCnt rows. From e562b0e4148a1e1b9d7b68da1743607cc86331bb Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 18 Nov 2022 14:09:12 +0800 Subject: [PATCH 7/9] add test --- ddl/export_test.go | 22 +++++++++ ddl/index_cop_test.go | 102 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 ddl/index_cop_test.go diff --git a/ddl/export_test.go b/ddl/export_test.go index 708b3474515c5..071652a642d42 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -14,6 +14,28 @@ package ddl +import ( + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/types" +) + func SetBatchInsertDeleteRangeSize(i int) { batchInsertDeleteRangeSize = i } + +var ( + FetchRowsFromCop4Test = fetchRowsFromCop + NewCopContext4Test = newCopContext +) + +type ( + IndexRecord4Test = *indexRecord +) + +func (i IndexRecord4Test) GetHandle() kv.Handle { + return i.handle +} + +func (i IndexRecord4Test) GetIndexValues() []types.Datum { + return i.vals +} diff --git a/ddl/index_cop_test.go b/ddl/index_cop_test.go new file mode 100644 index 0000000000000..7f18d415ef8f2 --- /dev/null +++ b/ddl/index_cop_test.go @@ -0,0 +1,102 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/types" + "github.com/stretchr/testify/require" +) + +func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + testFetchRows := func(db, tb, idx string) ([]kv.Handle, [][]types.Datum) { + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(tb)) + require.NoError(t, err) + tblInfo := tbl.Meta() + idxInfo := tblInfo.FindIndexByName(idx) + copCtx := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session()) + startKey := tbl.RecordPrefix() + endKey := startKey.PrefixNext() + txn, err := store.Begin() + require.NoError(t, err) + idxRec, _, done, err := ddl.FetchRowsFromCop4Test(context.Background(), copCtx, startKey, endKey, + txn.StartTS(), nil, 10) + require.NoError(t, err) + require.True(t, done) + require.NoError(t, txn.Rollback()) + + handles := make([]kv.Handle, 0, len(idxRec)) + values := make([][]types.Datum, 0, len(idxRec)) + for _, rec := range idxRec { + handles = append(handles, rec.GetHandle()) + values = append(values, rec.GetIndexValues()) + } + return handles, values + } + + // Test nonclustered primary key table. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a bigint, b int, index idx (b));") + for i := 0; i < 8; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + hds, vals := testFetchRows("test", "t", "idx") + require.Len(t, hds, 8) + for i := 0; i < 8; i++ { + require.Equal(t, hds[i].IntValue(), int64(i+1)) + require.Len(t, vals[i], 1) + require.Equal(t, vals[i][0].GetInt64(), int64(i)) + } + + // Test clustered primary key table(pk_is_handle). + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a bigint primary key, b int, index idx (b));") + for i := 0; i < 8; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + hds, vals = testFetchRows("test", "t", "idx") + require.Len(t, hds, 8) + for i := 0; i < 8; i++ { + require.Equal(t, hds[i].IntValue(), int64(i)) + require.Len(t, vals[i], 1) + require.Equal(t, vals[i][0].GetInt64(), int64(i)) + } + + // Test clustered primary key table(common_handle). + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a varchar(10), b int, c char(10), primary key (a, c) clustered, index idx (b));") + for i := 0; i < 8; i++ { + tk.MustExec("insert into t values (?, ?, ?)", strconv.Itoa(i), i, strconv.Itoa(i)) + } + hds, vals = testFetchRows("test", "t", "idx") + require.Len(t, hds, 8) + for i := 0; i < 8; i++ { + require.Equal(t, hds[i].String(), fmt.Sprintf("{%d, %d}", i, i)) + require.Len(t, vals[i], 1) + require.Equal(t, vals[i][0].GetInt64(), int64(i)) + } +} From 28d34f0eeea6a7cabcfe192638193417ce3a0d48 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 18 Nov 2022 14:30:36 +0800 Subject: [PATCH 8/9] update bazel --- ddl/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index f0d6a7a632738..b5b3a8c4af80c 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -169,6 +169,7 @@ go_test( "fail_test.go", "foreign_key_test.go", "index_change_test.go", + "index_cop_test.go", "index_merge_tmp_test.go", "index_modify_test.go", "integration_test.go", From abdf6264cc4fb10614e04619dee1125b24c592e5 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 18 Nov 2022 19:02:53 +0800 Subject: [PATCH 9/9] fix integration test --- ddl/backfilling.go | 5 +++++ ddl/index_cop.go | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 14b311127e286..271e46e49d376 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -349,6 +349,11 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { w.batchCnt = int(variable.GetDDLReorgBatchSize()) result := w.handleBackfillTask(d, task, bf) w.resultCh <- result + if result.err != nil { + logutil.BgLogger().Info("[ddl] backfill worker exit on error", + zap.Stringer("type", w.tp), zap.Int("workerID", w.id), zap.Error(result.err)) + return + } } } diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 84488daf53fbb..dfd261740e7c8 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -80,6 +80,10 @@ func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx s fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns)) for _, idxCol := range idxInfo.Columns { c := tblInfo.Columns[idxCol.Offset] + if c.IsGenerated() && !c.GeneratedStored { + // TODO(tangenta): support reading virtual generated columns. + return nil + } colInfos = append(colInfos, c) fieldTps = append(fieldTps, &c.FieldType) }