diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel
index b54f52a6d192c..b5b3a8c4af80c 100644
--- a/ddl/BUILD.bazel
+++ b/ddl/BUILD.bazel
@@ -27,6 +27,7 @@ go_library(
         "foreign_key.go",
         "generated_column.go",
         "index.go",
+        "index_cop.go",
         "index_merge_tmp.go",
         "job_table.go",
         "mock.go",
@@ -168,6 +169,7 @@ go_test(
         "fail_test.go",
         "foreign_key_test.go",
         "index_change_test.go",
+        "index_cop_test.go",
         "index_merge_tmp_test.go",
         "index_modify_test.go",
         "integration_test.go",
diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 886fcf71841bf..271e46e49d376 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -158,6 +158,13 @@ type reorgBackfillTask struct {
     endInclude bool
 }
 
+func (r *reorgBackfillTask) excludedEndKey() kv.Key {
+    if r.endInclude {
+        return r.endKey.Next()
+    }
+    return r.endKey
+}
+
 func (r *reorgBackfillTask) String() string {
     physicalID := strconv.FormatInt(r.physicalTableID, 10)
     startKey := tryDecodeToHandleString(r.startKey)
@@ -342,6 +349,11 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
         w.batchCnt = int(variable.GetDDLReorgBatchSize())
         result := w.handleBackfillTask(d, task, bf)
         w.resultCh <- result
+        if result.err != nil {
+            logutil.BgLogger().Info("[ddl] backfill worker exit on error",
+                zap.Stringer("type", w.tp), zap.Int("workerID", w.id), zap.Error(result.err))
+            return
+        }
     }
 }
diff --git a/ddl/export_test.go b/ddl/export_test.go
index 708b3474515c5..071652a642d42 100644
--- a/ddl/export_test.go
+++ b/ddl/export_test.go
@@ -14,6 +14,28 @@
 
 package ddl
 
+import (
+    "github.com/pingcap/tidb/kv"
+    "github.com/pingcap/tidb/types"
+)
+
 func SetBatchInsertDeleteRangeSize(i int) {
     batchInsertDeleteRangeSize = i
 }
+
+var (
+    FetchRowsFromCop4Test = fetchRowsFromCop
+    NewCopContext4Test    = newCopContext
+)
+
+type (
+    IndexRecord4Test = *indexRecord
+)
+
+func (i IndexRecord4Test) GetHandle() kv.Handle {
+    return i.handle
+}
+
+func (i IndexRecord4Test) GetIndexValues() []types.Datum {
+    return i.vals
+}
diff --git a/ddl/index.go b/ddl/index.go
index 5d53fae442b23..508ff743a2d46 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1180,6 +1180,7 @@ type baseIndexWorker struct {
 type addIndexWorker struct {
     baseIndexWorker
     index        table.Index
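+    // copCtx is non-nil only when the job's reorg type is ReorgTypeLitMerge;
+    // fetchRowColValsFromCop then uses it to read rows through the coprocessor.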
+    copCtx       *copContext
     writerCtx    *ingest.WriterContext
 
     // The following attributes are used to reduce memory allocation.
@@ -1200,6 +1201,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
     rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
 
     var lwCtx *ingest.WriterContext
+    var copCtx *copContext
     if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
         bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
         if !ok {
@@ -1213,6 +1215,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
         if err != nil {
             return nil, err
         }
+        copCtx = newCopContext(t.Meta(), indexInfo, sessCtx)
     }
 
     return &addIndexWorker{
@@ -1227,6 +1230,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
             jobContext:     jc,
         },
         index:     index,
+        copCtx:    copCtx,
         writerCtx: lwCtx,
     }, nil
 }
@@ -1484,7 +1488,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
     oprStartTime := time.Now()
     ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
-    errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
+    errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) {
         taskCtx.addedCount = 0
         taskCtx.scanCount = 0
         txn.SetOption(kv.Priority, w.priority)
@@ -1492,7 +1496,16 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
             txn.SetOption(kv.ResourceGroupTagger, tagger)
         }
 
-        idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
+        var (
+            idxRecords []*indexRecord
+            nextKey    kv.Key
+            taskDone   bool
+        )
+        if w.copCtx != nil {
+            idxRecords, nextKey, taskDone, err = w.fetchRowColValsFromCop(txn, handleRange)
+        } else {
+            idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange)
+        }
         if err != nil {
             return errors.Trace(err)
         }
diff --git a/ddl/index_cop.go b/ddl/index_cop.go
new file mode 100644
index 0000000000000..dfd261740e7c8
--- /dev/null
+++ b/ddl/index_cop.go
@@ -0,0 +1,221 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl
+
+import (
+    "context"
+
+    "github.com/pingcap/errors"
+    "github.com/pingcap/tidb/distsql"
+    "github.com/pingcap/tidb/kv"
+    "github.com/pingcap/tidb/parser/model"
+    "github.com/pingcap/tidb/parser/mysql"
+    "github.com/pingcap/tidb/sessionctx"
+    "github.com/pingcap/tidb/sessionctx/stmtctx"
+    "github.com/pingcap/tidb/sessionctx/variable"
+    "github.com/pingcap/tidb/statistics"
+    "github.com/pingcap/tidb/table/tables"
+    "github.com/pingcap/tidb/tablecodec"
+    "github.com/pingcap/tidb/types"
+    "github.com/pingcap/tidb/util/chunk"
+    "github.com/pingcap/tidb/util/codec"
+    "github.com/pingcap/tidb/util/timeutil"
+    "github.com/pingcap/tipb/go-tipb"
+)
+
+// copReadBatchFactor is the multiplier for the batch size of a coprocessor read.
+// It scales tidb_ddl_reorg_batch_size up so that we avoid sending too many cop requests for the same handle range.
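+// For example, with the default tidb_ddl_reorg_batch_size of 256, each cop request fetches up to 2560 rows.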
+const copReadBatchFactor = 10
+
+func (w *addIndexWorker) fetchRowColValsFromCop(txn kv.Transaction, handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) {
+    w.idxRecords = w.idxRecords[:0]
+    start, end := handleRange.startKey, handleRange.excludedEndKey()
+    batchCnt := w.batchCnt * copReadBatchFactor
+    return fetchRowsFromCop(w.ctx, w.copCtx, start, end, txn.StartTS(), w.idxRecords, batchCnt)
+}
+
+// fetchRowsFromCop sends a coprocessor request and fetches the first batchCnt rows.
+func fetchRowsFromCop(ctx context.Context, copCtx *copContext, startKey, endKey kv.Key, startTS uint64,
+    buf []*indexRecord, batchCnt int) ([]*indexRecord, kv.Key, bool, error) {
+    srcResult, err := copCtx.buildTableScan(ctx, startTS, startKey, endKey)
+    if err != nil {
+        return nil, nil, false, errors.Trace(err)
+    }
+    var done bool
+    buf, done, err = copCtx.fetchTableScanResult(ctx, srcResult, buf, batchCnt)
+    nextKey := endKey
+    if !done {
+        lastHandle := buf[len(buf)-1].handle
+        prefix := tablecodec.GenTableRecordPrefix(copCtx.tblInfo.ID)
+        nextKey = tablecodec.EncodeRecordKey(prefix, lastHandle).Next()
+    }
+    return buf, nextKey, done, err
+}
+
+type copContext struct {
+    tblInfo  *model.TableInfo
+    idxInfo  *model.IndexInfo
+    pkInfo   *model.IndexInfo
+    colInfos []*model.ColumnInfo
+    fieldTps []*types.FieldType
+    sessCtx  sessionctx.Context
+
+    srcChunk *chunk.Chunk
+}
+
+func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) *copContext {
+    colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
+    fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns))
+    for _, idxCol := range idxInfo.Columns {
+        c := tblInfo.Columns[idxCol.Offset]
+        if c.IsGenerated() && !c.GeneratedStored {
+            // TODO(tangenta): support reading virtual generated columns.
+            return nil
+        }
+        colInfos = append(colInfos, c)
+        fieldTps = append(fieldTps, &c.FieldType)
+    }
+
+    pkColInfos, pkFieldTps, pkInfo := buildHandleColInfoAndFieldTypes(tblInfo)
+    colInfos = append(colInfos, pkColInfos...)
+    fieldTps = append(fieldTps, pkFieldTps...)
+
+    copCtx := &copContext{
+        tblInfo:  tblInfo,
+        idxInfo:  idxInfo,
+        pkInfo:   pkInfo,
+        colInfos: colInfos,
+        fieldTps: fieldTps,
+        sessCtx:  sessCtx,
+        srcChunk: chunk.NewChunkWithCapacity(fieldTps, variable.DefMaxChunkSize),
+    }
+    return copCtx
+}
+
+func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) {
+    dagPB, err := buildDAGPB(c.sessCtx, c.tblInfo, c.colInfos)
+    if err != nil {
+        return nil, err
+    }
+
+    var builder distsql.RequestBuilder
+    kvReq, err := builder.
+        SetDAGRequest(dagPB).
+        SetStartTS(startTS).
+        SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}).
+        SetKeepOrder(true).
+        SetFromSessionVars(c.sessCtx.GetSessionVars()).
+        SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()).
+        SetConcurrency(1).
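+        // Keep-order with a single stream returns rows in handle order, which
+        // lets fetchRowsFromCop resume the scan from the last handle it saw.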
+        Build()
+    if err != nil {
+        return nil, err
+    }
+    return distsql.Select(ctx, c.sessCtx, kvReq, c.fieldTps, statistics.NewQueryFeedback(0, nil, 0, false))
+}
+
+func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.SelectResult,
+    buf []*indexRecord, batchCnt int) ([]*indexRecord, bool, error) {
+    sctx := c.sessCtx.GetSessionVars().StmtCtx
+    for {
+        err := result.Next(ctx, c.srcChunk)
+        if err != nil {
+            return nil, false, errors.Trace(err)
+        }
+        if c.srcChunk.NumRows() == 0 {
+            return buf, true, nil
+        }
+        iter := chunk.NewIterator4Chunk(c.srcChunk)
+        for row := iter.Begin(); row != iter.End(); row = iter.Next() {
+            idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps)
+            handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx)
+            if err != nil {
+                return nil, false, errors.Trace(err)
+            }
+            rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo)
+            buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false})
+            if len(buf) >= batchCnt {
+                return buf, false, nil
+            }
+        }
+    }
+}
+
+func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) {
+    dagReq := &tipb.DAGRequest{}
+    dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location())
+    sc := sCtx.GetSessionVars().StmtCtx
+    dagReq.Flags = sc.PushDownFlags()
+    for i := range colInfos {
+        dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
+    }
+    execPB, err := constructTableScanPB(sCtx, tblInfo, colInfos)
+    if err != nil {
+        return nil, err
+    }
+    dagReq.Executors = append(dagReq.Executors, execPB)
+    distsql.SetEncodeType(sCtx, dagReq)
+    return dagReq, nil
+}
+
+func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) {
+    tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos)
+    tblScan.TableId = tblInfo.ID
+    err := tables.SetPBColumnsDefaultValue(sCtx, tblScan.Columns, colInfos)
+    return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err
+}
+
+func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, *model.IndexInfo) {
+    if tbInfo.PKIsHandle {
+        for i := range tbInfo.Columns {
+            if mysql.HasPriKeyFlag(tbInfo.Columns[i].GetFlag()) {
+                return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType}, nil
+            }
+        }
+    } else if tbInfo.IsCommonHandle {
+        primaryIdx := tables.FindPrimaryIndex(tbInfo)
+        pkCols := make([]*model.ColumnInfo, 0, len(primaryIdx.Columns))
+        pkFts := make([]*types.FieldType, 0, len(primaryIdx.Columns))
+        for _, pkCol := range primaryIdx.Columns {
+            pkCols = append(pkCols, tbInfo.Columns[pkCol.Offset])
+            pkFts = append(pkFts, &tbInfo.Columns[pkCol.Offset].FieldType)
+        }
+        return pkCols, pkFts, primaryIdx
+    }
+    extra := model.NewExtraHandleColInfo()
+    return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType}, nil
+}
+
+func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps []*types.FieldType) ([]types.Datum, []types.Datum) {
+    datumBuf := make([]types.Datum, 0, len(fieldTps))
+    idxColLen := len(idxInfo.Columns)
+    for i, ft := range fieldTps {
+        datumBuf = append(datumBuf, row.GetDatum(i, ft))
+    }
+    return datumBuf[:idxColLen], datumBuf[idxColLen:]
+}
+
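+// buildHandle restores the row's handle from the primary-key datums decoded by
+// extractIdxValsAndHandle: common-handle tables re-encode the (truncated) key
+// columns, while int-handle tables use the single int64 value directly.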
+func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
+    pkInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) {
+    if tblInfo.IsCommonHandle {
+        tablecodec.TruncateIndexValues(tblInfo, pkInfo, pkDts)
+        handleBytes, err := codec.EncodeKey(stmtCtx, nil, pkDts...)
+        if err != nil {
+            return nil, err
+        }
+        return kv.NewCommonHandle(handleBytes)
+    }
+    return kv.IntHandle(pkDts[0].GetInt64()), nil
+}
diff --git a/ddl/index_cop_test.go b/ddl/index_cop_test.go
new file mode 100644
index 0000000000000..7f18d415ef8f2
--- /dev/null
+++ b/ddl/index_cop_test.go
@@ -0,0 +1,102 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl_test
+
+import (
+    "context"
+    "fmt"
+    "strconv"
+    "testing"
+
+    "github.com/pingcap/tidb/ddl"
+    "github.com/pingcap/tidb/kv"
+    "github.com/pingcap/tidb/parser/model"
+    "github.com/pingcap/tidb/testkit"
+    "github.com/pingcap/tidb/types"
+    "github.com/stretchr/testify/require"
+)
+
+func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
+    store, dom := testkit.CreateMockStoreAndDomain(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+
+    testFetchRows := func(db, tb, idx string) ([]kv.Handle, [][]types.Datum) {
+        tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(tb))
+        require.NoError(t, err)
+        tblInfo := tbl.Meta()
+        idxInfo := tblInfo.FindIndexByName(idx)
+        copCtx := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session())
+        startKey := tbl.RecordPrefix()
+        endKey := startKey.PrefixNext()
+        txn, err := store.Begin()
+        require.NoError(t, err)
+        idxRec, _, done, err := ddl.FetchRowsFromCop4Test(context.Background(), copCtx, startKey, endKey,
+            txn.StartTS(), nil, 10)
+        require.NoError(t, err)
+        require.True(t, done)
+        require.NoError(t, txn.Rollback())
+
+        handles := make([]kv.Handle, 0, len(idxRec))
+        values := make([][]types.Datum, 0, len(idxRec))
+        for _, rec := range idxRec {
+            handles = append(handles, rec.GetHandle())
+            values = append(values, rec.GetIndexValues())
+        }
+        return handles, values
+    }
+
+    // Test a nonclustered table (no explicit primary key; rows are addressed by the hidden _tidb_rowid).
+    tk.MustExec("drop table if exists t;")
+    tk.MustExec("create table t (a bigint, b int, index idx (b));")
+    for i := 0; i < 8; i++ {
+        tk.MustExec("insert into t values (?, ?)", i, i)
+    }
+    hds, vals := testFetchRows("test", "t", "idx")
+    require.Len(t, hds, 8)
+    for i := 0; i < 8; i++ {
+        require.Equal(t, hds[i].IntValue(), int64(i+1))
+        require.Len(t, vals[i], 1)
+        require.Equal(t, vals[i][0].GetInt64(), int64(i))
+    }
+
+    // Test a clustered primary key table (pk_is_handle).
+    tk.MustExec("drop table if exists t;")
+    tk.MustExec("create table t (a bigint primary key, b int, index idx (b));")
+    for i := 0; i < 8; i++ {
+        tk.MustExec("insert into t values (?, ?)", i, i)
+    }
+    hds, vals = testFetchRows("test", "t", "idx")
+    require.Len(t, hds, 8)
+    for i := 0; i < 8; i++ {
+        require.Equal(t, hds[i].IntValue(), int64(i))
+        require.Len(t, vals[i], 1)
+        require.Equal(t, vals[i][0].GetInt64(), int64(i))
+    }
+
+    // Test a clustered primary key table (common_handle).
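+    // The handle encodes the clustered key (a, c), so Handle.String() renders it as "{a, c}".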
+    tk.MustExec("drop table if exists t;")
+    tk.MustExec("create table t (a varchar(10), b int, c char(10), primary key (a, c) clustered, index idx (b));")
+    for i := 0; i < 8; i++ {
+        tk.MustExec("insert into t values (?, ?, ?)", strconv.Itoa(i), i, strconv.Itoa(i))
+    }
+    hds, vals = testFetchRows("test", "t", "idx")
+    require.Len(t, hds, 8)
+    for i := 0; i < 8; i++ {
+        require.Equal(t, hds[i].String(), fmt.Sprintf("{%d, %d}", i, i))
+        require.Len(t, vals[i], 1)
+        require.Equal(t, vals[i][0].GetInt64(), int64(i))
+    }
+}
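A minimal sketch (not part of the patch) of how the paginated read loop composes on top of the test hooks exported in ddl/export_test.go. It assumes tblInfo, idxInfo, tbl, store, and tk are obtained exactly as in testFetchRows above, and deliberately uses a batch size of 4 so the 8 test rows take several round trips; each call returns at most batchCnt records, and while done is false, nextKey points just past the last fetched handle, so the next call resumes the scan without rereading rows:

    copCtx := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session())
    startKey := tbl.RecordPrefix()
    endKey := tbl.RecordPrefix().PrefixNext()
    txn, err := store.Begin()
    require.NoError(t, err)
    var all []ddl.IndexRecord4Test
    for {
        // Fetch at most 4 records per coprocessor round trip.
        recs, nextKey, done, err := ddl.FetchRowsFromCop4Test(
            context.Background(), copCtx, startKey, endKey, txn.StartTS(), nil, 4)
        require.NoError(t, err)
        all = append(all, recs...)
        if done {
            break
        }
        // Resume immediately after the last returned handle.
        startKey = nextKey
    }
    require.NoError(t, txn.Rollback())
    require.Len(t, all, 8)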