From ea091117f9cfc758f0cf1cdb94ec9301640d79c7 Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 31 Jul 2019 17:11:30 +0800 Subject: [PATCH 1/2] *: speed up the operation of "admin check table" (#8572) --- executor/admin_test.go | 116 +++++++++++++++- executor/builder.go | 55 +++++++- executor/distsql.go | 189 ++++++++++++++++++++------ executor/executor.go | 140 +++++++++++++------ executor/executor_test.go | 2 +- planner/core/common_plans.go | 7 +- planner/core/physical_plans.go | 5 + planner/core/planbuilder.go | 241 ++++++++++++++++++++++++--------- util/admin/admin.go | 39 ++++-- util/admin/admin_test.go | 8 +- 10 files changed, 630 insertions(+), 172 deletions(-) diff --git a/executor/admin_test.go b/executor/admin_test.go index f2317570e23b7..b5109ebae6bc2 100644 --- a/executor/admin_test.go +++ b/executor/admin_test.go @@ -399,6 +399,112 @@ func (s *testSuite2) TestAdminCleanupIndexMore(c *C) { tk.MustExec("admin check table admin_test") } +func (s *testSuite2) TestAdminCheckTableFailed(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists admin_test") + tk.MustExec("create table admin_test (c1 int, c2 int, c3 varchar(255) default '1', primary key(c1), key(c3), unique key(c2), key(c2, c3))") + tk.MustExec("insert admin_test (c1, c2, c3) values (-10, -20, 'y'), (-1, -10, 'z'), (1, 11, 'a'), (2, 12, 'b'), (5, 15, 'c'), (10, 20, 'd'), (20, 30, 'e')") + + // Make some corrupted index. Build the index information. + s.ctx = mock.NewContext() + s.ctx.Store = s.store + is := s.domain.InfoSchema() + dbName := model.NewCIStr("test") + tblName := model.NewCIStr("admin_test") + tbl, err := is.TableByName(dbName, tblName) + c.Assert(err, IsNil) + tblInfo := tbl.Meta() + idxInfo := tblInfo.Indices[1] + indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo) + sc := s.ctx.GetSessionVars().StmtCtx + tk.Se.GetSessionVars().IndexLookupSize = 3 + tk.Se.GetSessionVars().MaxChunkSize = 3 + + // Reduce one row of index. + // Table count > index count. + // Index c2 is missing 11. + txn, err := s.store.Begin() + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(-10), -1, nil) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + _, err = tk.Exec("admin check table admin_test") + c.Assert(err.Error(), Equals, + "[executor:8003]admin_test err:[admin:1]index: != record:&admin.RecordData{Handle:-1, Values:[]types.Datum{types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:-10, b:[]uint8(nil), x:interface {}(nil)}}}") + c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue) + r := tk.MustQuery("admin recover index admin_test c2") + r.Check(testkit.Rows("1 7")) + tk.MustExec("admin check table admin_test") + + // Add one row of index. + // Table count < index count. + // Index c2 has one more values ​​than table data: 0, and the handle 0 hasn't correlative record. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(0), 0) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + _, err = tk.Exec("admin check table admin_test") + c.Assert(err.Error(), Equals, "handle 0, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:0, b:[]uint8(nil), x:interface {}(nil)} != record:") + + // Add one row of index. + // Table count < index count. + // Index c2 has two more values ​​than table data: 10, 13, and these handles have correlative record. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(0), 0, nil) + c.Assert(err, IsNil) + // Make sure the index value "19" is smaller "21". Then we scan to "19" before "21". + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(19), 10) + c.Assert(err, IsNil) + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(13), 2) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + _, err = tk.Exec("admin check table admin_test") + c.Assert(err.Error(), Equals, "col c2, handle 2, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:13, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:12, b:[]uint8(nil), x:interface {}(nil)}") + + // Table count = index count. + // Two indices have the same handle. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(13), 2, nil) + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(12), 2, nil) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + _, err = tk.Exec("admin check table admin_test") + c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}") + + // Table count = index count. + // Index c2 has one line of data is 19, the corresponding table data is 20. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(12), 2) + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(20), 10, nil) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + _, err = tk.Exec("admin check table admin_test") + c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}") + + // Recover records. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(19), 10, nil) + c.Assert(err, IsNil) + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(20), 10) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + tk.MustExec("admin check table admin_test") +} + func (s *testSuite1) TestAdminCheckTable(c *C) { // test NULL value. tk := testkit.NewTestKit(c, s.store) @@ -455,16 +561,16 @@ func (s *testSuite1) TestAdminCheckTable(c *C) { // Test index in virtual generated column. tk.MustExec(`drop table if exists test`) - tk.MustExec(`create table test ( b json , c int as (JSON_EXTRACT(b,'$.d')) , index idxc(c));`) + tk.MustExec(`create table test ( b json , c int as (JSON_EXTRACT(b,'$.d')), index idxc(c));`) tk.MustExec(`INSERT INTO test set b='{"d": 100}';`) tk.MustExec(`admin check table test;`) // Test prefix index. tk.MustExec(`drop table if exists t`) tk.MustExec(`CREATE TABLE t ( - ID CHAR(32) NOT NULL, - name CHAR(32) NOT NULL, - value CHAR(255), - INDEX indexIDname (ID(8),name(8)));`) + ID CHAR(32) NOT NULL, + name CHAR(32) NOT NULL, + value CHAR(255), + INDEX indexIDname (ID(8),name(8)));`) tk.MustExec(`INSERT INTO t VALUES ('keyword','urlprefix','text/ /text');`) tk.MustExec(`admin check table t;`) diff --git a/executor/builder.go b/executor/builder.go index 88c02a2dbae11..2a6cd62f48023 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -306,8 +306,8 @@ func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor { b.err = err return nil } - readerExec.ranges = ranger.FullRange() - readerExec.isCheckOp = true + + buildIndexLookUpChecker(b, v.IndexLookUpReader, readerExec) e := &CheckIndexExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), @@ -320,12 +320,59 @@ func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor { return e } +// buildIndexLookUpChecker builds check information to IndexLookUpReader. +func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.PhysicalIndexLookUpReader, + readerExec *IndexLookUpExecutor) { + is := readerPlan.IndexPlans[0].(*plannercore.PhysicalIndexScan) + readerExec.dagPB.OutputOffsets = make([]uint32, 0, len(is.Index.Columns)) + for i := 0; i <= len(is.Index.Columns); i++ { + readerExec.dagPB.OutputOffsets = append(readerExec.dagPB.OutputOffsets, uint32(i)) + } + readerExec.ranges = ranger.FullRange() + ts := readerPlan.TablePlans[0].(*plannercore.PhysicalTableScan) + readerExec.handleIdx = ts.HandleIdx + + tps := make([]*types.FieldType, 0, len(is.Columns)+1) + for _, col := range is.Columns { + tps = append(tps, &col.FieldType) + } + tps = append(tps, types.NewFieldType(mysql.TypeLonglong)) + readerExec.checkIndexValue = &checkIndexValue{genExprs: is.GenExprs, idxColTps: tps} + + colNames := make([]string, 0, len(is.Columns)) + for _, col := range is.Columns { + colNames = append(colNames, col.Name.O) + } + var err error + readerExec.idxTblCols, err = table.FindCols(readerExec.table.Cols(), colNames, true) + if err != nil { + b.err = errors.Trace(err) + return + } +} + func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor { + readerExecs := make([]*IndexLookUpExecutor, 0, len(v.IndexLookUpReaders)) + for _, readerPlan := range v.IndexLookUpReaders { + readerExec, err := buildNoRangeIndexLookUpReader(b, readerPlan) + if err != nil { + b.err = errors.Trace(err) + return nil + } + buildIndexLookUpChecker(b, readerPlan, readerExec) + + readerExecs = append(readerExecs, readerExec) + } + e := &CheckTableExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), - tables: v.Tables, + dbName: v.DBName, + tblInfo: v.TblInfo, + indices: v.Indices, is: b.is, - genExprs: v.GenExprs, + srcs: readerExecs, + exitCh: make(chan struct{}), + retCh: make(chan error, len(v.Indices)), } return e } diff --git a/executor/distsql.go b/executor/distsql.go index f24658531d43d..e3c4bfa6d2cdb 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" @@ -63,6 +64,7 @@ type lookupTableTask struct { handles []int64 rowIdx []int // rowIdx represents the handle index for every row. Only used when keep order. rows []chunk.Row + idxRows *chunk.Chunk cursor int doneCh chan error @@ -72,6 +74,9 @@ type lookupTableTask struct { // The handles fetched from index is originally ordered by index, but we need handles to be ordered by itself // to do table request. indexOrder map[int64]int + // duplicatedIndexOrder map likes indexOrder. But it's used when checkIndexValue isn't nil and + // the same handle of index has multiple values. + duplicatedIndexOrder map[int64]int // memUsage records the memory usage of this task calculated by table worker. // memTracker is used to release memUsage after task is done and unused. @@ -376,8 +381,8 @@ type IndexLookUpExecutor struct { // memTracker is used to track the memory usage of this executor. memTracker *memory.Tracker - // isCheckOp is used to determine whether we need to check the consistency of the index data. - isCheckOp bool + // checkIndexValue is used to check the consistency of the index data. + *checkIndexValue corColInIdxSide bool idxPlans []plannercore.PhysicalPlan @@ -388,6 +393,12 @@ type IndexLookUpExecutor struct { colLens []int } +type checkIndexValue struct { + idxColTps []*types.FieldType + idxTblCols []*table.Column + genExprs map[model.TableColumnID]expression.Expression +} + // Open implements the Executor Open interface. func (e *IndexLookUpExecutor) Open(ctx context.Context) error { var err error @@ -472,21 +483,26 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k if err != nil { return err } + tps := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + if e.checkIndexValue != nil { + tps = e.idxColTps + } // Since the first read only need handle information. So its returned col is only 1. - result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, e.feedback, getPhysicalPlanIDs(e.idxPlans)) + result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans)) if err != nil { return err } result.Fetch(ctx) worker := &indexWorker{ - idxLookup: e, - workCh: workCh, - finished: e.finished, - resultCh: e.resultCh, - keepOrder: e.keepOrder, - batchSize: initBatchSize, - maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, - maxChunkSize: e.maxChunkSize, + idxLookup: e, + workCh: workCh, + finished: e.finished, + resultCh: e.resultCh, + keepOrder: e.keepOrder, + batchSize: initBatchSize, + checkIndexValue: e.checkIndexValue, + maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, + maxChunkSize: e.maxChunkSize, } if worker.batchSize > worker.maxBatchSize { worker.batchSize = worker.maxBatchSize @@ -522,13 +538,13 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha e.tblWorkerWg.Add(lookupConcurrencyLimit) for i := 0; i < lookupConcurrencyLimit; i++ { worker := &tableWorker{ - idxLookup: e, - workCh: workCh, - finished: e.finished, - buildTblReader: e.buildTableReader, - keepOrder: e.keepOrder, - handleIdx: e.handleIdx, - isCheckOp: e.isCheckOp, + idxLookup: e, + workCh: workCh, + finished: e.finished, + buildTblReader: e.buildTableReader, + keepOrder: e.keepOrder, + handleIdx: e.handleIdx, + checkIndexValue: e.checkIndexValue, memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(i) }), e.ctx.GetSessionVars().MemQuotaIndexLookupReader), } @@ -645,6 +661,9 @@ type indexWorker struct { batchSize int maxBatchSize int maxChunkSize int + + // checkIndexValue is used to check the consistency of the index data. + *checkIndexValue } // fetchHandles fetches a batch of handles from index data and builds the index lookup tasks. @@ -668,9 +687,14 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes } } }() - chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) + var chk *chunk.Chunk + if w.checkIndexValue != nil { + chk = chunk.NewChunkWithCapacity(w.idxColTps, w.maxChunkSize) + } else { + chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) + } for { - handles, err := w.extractTaskHandles(ctx, chk, result) + handles, retChunk, err := w.extractTaskHandles(ctx, chk, result) if err != nil { doneCh := make(chan error, 1) doneCh <- err @@ -683,7 +707,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes return count, nil } count += int64(len(handles)) - task := w.buildTableTask(handles) + task := w.buildTableTask(handles, retChunk) select { case <-ctx.Done(): return count, nil @@ -695,30 +719,40 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes } } -func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (handles []int64, err error) { +func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) ( + handles []int64, retChk *chunk.Chunk, err error) { + handleOffset := chk.NumCols() - 1 handles = make([]int64, 0, w.batchSize) for len(handles) < w.batchSize { chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) - err = idxResult.Next(ctx, chk) + err = errors.Trace(idxResult.Next(ctx, chk)) if err != nil { - return handles, err + return handles, nil, err } if chk.NumRows() == 0 { - return handles, nil + return handles, retChk, nil } for i := 0; i < chk.NumRows(); i++ { - handles = append(handles, chk.GetRow(i).GetInt64(0)) + h := chk.GetRow(i).GetInt64(handleOffset) + handles = append(handles, h) + } + if w.checkIndexValue != nil { + if retChk == nil { + retChk = chunk.NewChunkWithCapacity(w.idxColTps, w.batchSize) + } + retChk.Append(chk, 0, chk.NumRows()) } } w.batchSize *= 2 if w.batchSize > w.maxBatchSize { w.batchSize = w.maxBatchSize } - return handles, nil + return handles, retChk, nil } -func (w *indexWorker) buildTableTask(handles []int64) *lookupTableTask { +func (w *indexWorker) buildTableTask(handles []int64, retChk *chunk.Chunk) *lookupTableTask { var indexOrder map[int64]int + var duplicatedIndexOrder map[int64]int if w.keepOrder { // Save the index order. indexOrder = make(map[int64]int, len(handles)) @@ -726,10 +760,27 @@ func (w *indexWorker) buildTableTask(handles []int64) *lookupTableTask { indexOrder[h] = i } } + + if w.checkIndexValue != nil { + // Save the index order. + indexOrder = make(map[int64]int, len(handles)) + duplicatedIndexOrder = make(map[int64]int) + for i, h := range handles { + if _, ok := indexOrder[h]; ok { + duplicatedIndexOrder[h] = i + } else { + indexOrder[h] = i + } + } + } + task := &lookupTableTask{ - handles: handles, - indexOrder: indexOrder, + handles: handles, + indexOrder: indexOrder, + duplicatedIndexOrder: duplicatedIndexOrder, + idxRows: retChk, } + task.doneCh = make(chan error, 1) return task } @@ -746,8 +797,8 @@ type tableWorker struct { // memTracker is used to track the memory usage of this executor. memTracker *memory.Tracker - // isCheckOp is used to determine whether we need to check the consistency of the index data. - isCheckOp bool + // checkIndexValue is used to check the consistency of the index data. + *checkIndexValue } // pickAndExecTask picks tasks from workCh, and execute them. @@ -780,6 +831,66 @@ func (w *tableWorker) pickAndExecTask(ctx context.Context) { } } +func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, tableReader Executor) error { + chk := newFirstChunk(tableReader) + tblInfo := w.idxLookup.table.Meta() + vals := make([]types.Datum, 0, len(w.idxTblCols)) + for { + err := tableReader.Next(ctx, chk) + if err != nil { + return errors.Trace(err) + } + if chk.NumRows() == 0 { + for h := range task.indexOrder { + idxRow := task.idxRows.GetRow(task.indexOrder[h]) + return errors.Errorf("handle %#v, index:%#v != record:%#v", h, idxRow.GetDatum(0, w.idxColTps[0]), nil) + } + break + } + + tblReaderExec := tableReader.(*TableReaderExecutor) + iter := chunk.NewIterator4Chunk(chk) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + handle := row.GetInt64(w.handleIdx) + offset, ok := task.indexOrder[handle] + if !ok { + offset = task.duplicatedIndexOrder[handle] + } + delete(task.indexOrder, handle) + idxRow := task.idxRows.GetRow(offset) + vals = vals[:0] + for i, col := range w.idxTblCols { + if col.IsGenerated() && !col.GeneratedStored { + expr := w.genExprs[model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ID}] + // Eval the column value + val, err := expr.Eval(row) + if err != nil { + return errors.Trace(err) + } + val, err = table.CastValue(tblReaderExec.ctx, val, col.ColumnInfo) + if err != nil { + return errors.Trace(err) + } + vals = append(vals, val) + } else { + vals = append(vals, row.GetDatum(i, &col.FieldType)) + } + } + vals = tables.TruncateIndexValuesIfNeeded(tblInfo, w.idxLookup.index, vals) + for i, val := range vals { + col := w.idxTblCols[i] + tp := &col.FieldType + ret := chunk.Compare(idxRow, i, &val) + if ret != 0 { + return errors.Errorf("col %s, handle %#v, index:%#v != record:%#v", col.Name, handle, idxRow.GetDatum(i, tp), val) + } + } + } + } + + return nil +} + // executeTask executes the table look up tasks. We will construct a table reader and send request by handles. // Then we hold the returning rows and finish this task. func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error { @@ -790,6 +901,10 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er } defer terror.Call(tableReader.Close) + if w.checkIndexValue != nil { + return w.compareData(ctx, task, tableReader) + } + task.memTracker = w.memTracker memUsage := int64(cap(task.handles) * 8) task.memUsage = memUsage @@ -831,16 +946,6 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er } if handleCnt != len(task.rows) { - if w.isCheckOp { - obtainedHandlesMap := make(map[int64]struct{}, len(task.rows)) - for _, row := range task.rows { - handle := row.GetInt64(w.handleIdx) - obtainedHandlesMap[handle] = struct{}{} - } - return errors.Errorf("inconsistent index %s handle count %d isn't equal to value count %d, missing handles %v in a batch", - w.idxLookup.index.Name.O, handleCnt, len(task.rows), GetLackHandles(task.handles, obtainedHandlesMap)) - } - if len(w.idxLookup.tblPlans) == 1 { obtainedHandlesMap := make(map[int64]struct{}, len(task.rows)) for _, row := range task.rows { diff --git a/executor/executor.go b/executor/executor.go index bbe9e99d8d0fa..10b0e26ac7e22 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" @@ -451,11 +452,14 @@ func getTableName(is infoschema.InfoSchema, id int64) string { type CheckTableExec struct { baseExecutor - tables []*ast.TableName - done bool - is infoschema.InfoSchema - - genExprs map[model.TableColumnID]expression.Expression + dbName string + tblInfo *model.TableInfo + indices []table.Index + srcs []*IndexLookUpExecutor + done bool + is infoschema.InfoSchema + exitCh chan struct{} + retCh chan error } // Open implements the Executor Open interface. @@ -463,63 +467,121 @@ func (e *CheckTableExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } + for _, src := range e.srcs { + if err := src.Open(ctx); err != nil { + return errors.Trace(err) + } + } e.done = false return nil } +func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *IndexLookUpExecutor) error { + cols := src.schema.Columns + retFieldTypes := make([]*types.FieldType, len(cols)) + for i := range cols { + retFieldTypes[i] = cols[i].RetType + } + chk := chunk.New(retFieldTypes, e.initCap, e.maxChunkSize) + + var err error + for { + err = src.Next(ctx, chk) + if err != nil { + break + } + if chk.NumRows() == 0 { + break + } + + select { + case <-e.exitCh: + return nil + default: + } + } + e.retCh <- errors.Trace(err) + return errors.Trace(err) +} + +func (e *CheckTableExec) handlePanic(r interface{}) { + if r != nil { + e.retCh <- errors.Errorf("%v", r) + } +} + // Next implements the Executor Next interface. func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error { - if e.done { + if e.done || len(e.srcs) == 0 { return nil } defer func() { e.done = true }() - for _, t := range e.tables { - dbName := t.DBInfo.Name - tb, err := e.is.TableByName(dbName, t.Name) - if err != nil { - return err + + idxNames := make([]string, 0, len(e.indices)) + for _, idx := range e.indices { + idxNames = append(idxNames, idx.Meta().Name.O) + } + greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tblInfo.Name.O, idxNames) + if err != nil { + tbl := e.srcs[idxOffset].table + if greater == admin.IdxCntGreater { + err = e.checkIndexHandle(ctx, idxOffset, e.srcs[idxOffset]) + } else if greater == admin.TblCntGreater { + err = e.checkTableRecord(tbl, idxOffset) } - if tb.Meta().GetPartitionInfo() != nil { - err = e.doCheckPartitionedTable(tb.(table.PartitionedTable)) - } else { - err = e.doCheckTable(tb) + if err != nil && admin.ErrDataInConsistent.Equal(err) { + return ErrAdminCheckTable.GenWithStack("%v err:%v", tbl.Meta().Name, err) } - if err != nil { - logutil.Logger(ctx).Warn("check table failed", zap.String("tableName", t.Name.O), zap.Error(err)) - if admin.ErrDataInConsistent.Equal(err) { - return ErrAdminCheckTable.GenWithStack("%v err:%v", t.Name, err) - } + return errors.Trace(err) + } + + // The number of table rows is equal to the number of index rows. + // TODO: Make the value of concurrency adjustable. And we can consider the number of records. + concurrency := 3 + wg := sync.WaitGroup{} + for i := range e.srcs { + wg.Add(1) + go func(num int) { + defer wg.Done() + util.WithRecovery(func() { + err1 := e.checkIndexHandle(ctx, num, e.srcs[num]) + if err1 != nil { + logutil.Logger(ctx).Info("check index handle failed", zap.Error(err)) + } + }, e.handlePanic) + }(i) - return errors.Errorf("%v err:%v", t.Name, err) + if (i+1)%concurrency == 0 { + wg.Wait() } } - return nil -} -func (e *CheckTableExec) doCheckPartitionedTable(tbl table.PartitionedTable) error { - info := tbl.Meta().GetPartitionInfo() - for _, def := range info.Definitions { - pid := def.ID - partition := tbl.GetPartition(pid) - if err := e.doCheckTable(partition); err != nil { - return err + for i := 0; i < len(e.srcs); i++ { + err = <-e.retCh + if err != nil { + return errors.Trace(err) } } return nil } -func (e *CheckTableExec) doCheckTable(tbl table.Table) error { +func (e *CheckTableExec) checkTableRecord(tbl table.Table, idxOffset int) error { + idx := e.indices[idxOffset] + genExprs := e.srcs[idxOffset].genExprs txn, err := e.ctx.Txn(true) if err != nil { return err } - for _, idx := range tbl.Indices() { - if idx.Meta().State != model.StatePublic { - continue - } - err := admin.CompareIndexData(e.ctx, txn, tbl, idx, e.genExprs) - if err != nil { - return err + if tbl.Meta().GetPartitionInfo() == nil { + return admin.CheckRecordAndIndex(e.ctx, txn, tbl, idx, genExprs) + } + + info := tbl.Meta().GetPartitionInfo() + for _, def := range info.Definitions { + pid := def.ID + partition := tbl.(table.PartitionedTable).GetPartition(pid) + if err := admin.CheckRecordAndIndex(e.ctx, txn, partition, idx, genExprs); err != nil { + return errors.Trace(err) } } return nil @@ -563,7 +625,7 @@ func (e *CheckIndexExec) Next(ctx context.Context, req *chunk.Chunk) error { } defer func() { e.done = true }() - err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tableName, []string{e.idxName}) + _, _, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tableName, []string{e.idxName}) if err != nil { return err } diff --git a/executor/executor_test.go b/executor/executor_test.go index ef0dc69f208e5..d22c32d67f78f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -3061,7 +3061,7 @@ func (s *testSuite) TestCheckIndex(c *C) { c.Assert(err, IsNil) _, err = se.Execute(context.Background(), "admin check index t c") c.Assert(err, NotNil) - c.Assert(strings.Contains(err.Error(), "isn't equal to value count"), IsTrue) + c.Assert(err.Error(), Equals, "handle 3, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:30, b:[]uint8(nil), x:interface {}(nil)} != record:") // set data to: // index data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index cb45d2fe126eb..39b1a491f23eb 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -74,9 +74,10 @@ type ShowNextRowID struct { type CheckTable struct { baseSchemaProducer - Tables []*ast.TableName - - GenExprs map[model.TableColumnID]expression.Expression + DBName string + TblInfo *model.TableInfo + Indices []table.Index + IndexLookUpReaders []*PhysicalIndexLookUpReader } // RecoverIndex is used for backfilling corrupted index data. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 67048b70f7e7c..a43f3fa03f340 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -119,6 +119,8 @@ type PhysicalIndexScan struct { // The index scan may be on a partition. isPartition bool physicalTableID int64 + + GenExprs map[model.TableColumnID]expression.Expression } // PhysicalMemTable reads memory table. @@ -160,6 +162,9 @@ type PhysicalTableScan struct { physicalTableID int64 rangeDecidedBy []*expression.Column + + // HandleIdx is the index of handle, which is only used for admin check table. + HandleIdx int } // IsPartition returns true and partition ID if it's actually a partition. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 4c7287d2cc503..b8508ae2c7a7f 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -15,6 +15,7 @@ package core import ( "bytes" + "context" "fmt" "strings" @@ -37,7 +38,9 @@ import ( "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/ranger" + "go.uber.org/zap" ) type visitInfo struct { @@ -564,42 +567,7 @@ func (b *PlanBuilder) buildCheckIndex(dbName model.CIStr, as *ast.AdminStmt) (Pl return nil, errors.Errorf("index %s state %s isn't public", as.Index, idx.State) } - id := 1 - columns := make([]*model.ColumnInfo, 0, len(idx.Columns)) - schema := expression.NewSchema(make([]*expression.Column, 0, len(idx.Columns))...) - for _, idxCol := range idx.Columns { - for _, col := range tblInfo.Columns { - if idxCol.Name.L == col.Name.L { - columns = append(columns, col) - schema.Append(&expression.Column{ - ColName: col.Name, - UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), - RetType: &col.FieldType, - }) - } - } - } - is := PhysicalIndexScan{ - Table: tblInfo, - TableAsName: &tblName.Name, - DBName: dbName, - Columns: columns, - Index: idx, - dataSourceSchema: schema, - Ranges: ranger.FullRange(), - KeepOrder: false, - }.Init(b.ctx) - is.stats = &property.StatsInfo{} - cop := &copTask{indexPlan: is} - // It's double read case. - ts := PhysicalTableScan{Columns: columns, Table: is.Table}.Init(b.ctx) - ts.SetSchema(is.dataSourceSchema) - cop.tablePlan = ts - is.initSchema(id, idx, true) - t := finishCopTask(b.ctx, cop) - - rootT := t.(*rootTask) - return rootT.p, nil + return b.buildPhysicalIndexLookUpReader(dbName, tbl, idx, 1) } func (b *PlanBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) { @@ -683,43 +651,188 @@ func (b *PlanBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) { return ret, nil } -func (b *PlanBuilder) buildAdminCheckTable(as *ast.AdminStmt) (*CheckTable, error) { - p := &CheckTable{Tables: as.Tables} - p.GenExprs = make(map[model.TableColumnID]expression.Expression, len(p.Tables)) - +// getGenExprs gets generated expressions map. +func (b *PlanBuilder) getGenExprs(dbName model.CIStr, tbl table.Table, idx *model.IndexInfo) ( + map[model.TableColumnID]expression.Expression, error) { + tblInfo := tbl.Meta() + genExprsMap := make(map[model.TableColumnID]expression.Expression) + exprs := make([]expression.Expression, 0, len(tbl.Cols())) + genExprIdxs := make([]model.TableColumnID, len(tbl.Cols())) mockTablePlan := LogicalTableDual{}.Init(b.ctx) - for _, tbl := range p.Tables { - tableInfo := tbl.TableInfo - schema := expression.TableInfo2SchemaWithDBName(b.ctx, tbl.Schema, tableInfo) - table, ok := b.is.TableByID(tableInfo.ID) - if !ok { - return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tableInfo.Name.O) + mockTablePlan.SetSchema(expression.TableInfo2SchemaWithDBName(b.ctx, dbName, tblInfo)) + for i, colExpr := range mockTablePlan.Schema().Columns { + col := tbl.Cols()[i] + var expr expression.Expression + expr = colExpr + if col.IsGenerated() && !col.GeneratedStored { + var err error + expr, _, err = b.rewrite(col.GeneratedExpr, mockTablePlan, nil, true) + if err != nil { + return nil, errors.Trace(err) + } + expr = expression.BuildCastFunction(b.ctx, expr, colExpr.GetType()) + found := false + for _, column := range idx.Columns { + if strings.EqualFold(col.Name.L, column.Name.L) { + found = true + break + } + } + if found { + genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ColumnInfo.ID} + genExprsMap[genColumnID] = expr + genExprIdxs[i] = genColumnID + } + } + exprs = append(exprs, expr) + } + // Re-iterate expressions to handle those virtual generated columns that refers to the other generated columns. + for i, expr := range exprs { + exprs[i] = expression.ColumnSubstitute(expr, mockTablePlan.Schema(), exprs) + if _, ok := genExprsMap[genExprIdxs[i]]; ok { + genExprsMap[genExprIdxs[i]] = exprs[i] } + } + return genExprsMap, nil +} - mockTablePlan.SetSchema(schema) +func (b *PlanBuilder) buildPhysicalIndexLookUpReader(dbName model.CIStr, tbl table.Table, idx *model.IndexInfo, id int) (Plan, error) { + genExprsMap, err := b.getGenExprs(dbName, tbl, idx) + if err != nil { + return nil, errors.Trace(err) + } - // Calculate generated columns. - columns := table.Cols() - for _, column := range columns { - if !column.IsGenerated() { - continue + // Get generated columns. + var genCols []*expression.Column + pkOffset := -1 + tblInfo := tbl.Meta() + colsMap := make(map[int64]struct{}) + schema := expression.NewSchema(make([]*expression.Column, 0, len(idx.Columns))...) + idxReaderCols := make([]*model.ColumnInfo, 0, len(idx.Columns)) + tblReaderCols := make([]*model.ColumnInfo, 0, len(tbl.Cols())) + for _, idxCol := range idx.Columns { + for _, col := range tblInfo.Columns { + if idxCol.Name.L == col.Name.L { + idxReaderCols = append(idxReaderCols, col) + tblReaderCols = append(tblReaderCols, col) + schema.Append(&expression.Column{ + ColName: col.Name, + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), + RetType: &col.FieldType}) + colsMap[col.ID] = struct{}{} + if mysql.HasPriKeyFlag(col.Flag) { + pkOffset = len(tblReaderCols) - 1 + } } - columnName := &ast.ColumnName{Name: column.Name} - columnName.SetText(column.Name.O) - - colExpr, _, err := mockTablePlan.findColumn(columnName) - if err != nil { - return nil, err + genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ID} + if expr, ok := genExprsMap[genColumnID]; ok { + cols := expression.ExtractColumns(expr) + genCols = append(genCols, cols...) } - - expr, _, err := b.rewrite(column.GeneratedExpr, mockTablePlan, nil, true) - if err != nil { - return nil, err + } + } + // Add generated columns to tblSchema and tblReaderCols. + tblSchema := schema.Clone() + for _, col := range genCols { + if _, ok := colsMap[col.ID]; !ok { + c := table.FindCol(tbl.Cols(), col.ColName.O) + if c != nil { + col.Index = len(tblReaderCols) + tblReaderCols = append(tblReaderCols, c.ColumnInfo) + tblSchema.Append(&expression.Column{ + ColName: c.Name, + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), + RetType: &c.FieldType}) + colsMap[c.ID] = struct{}{} + if mysql.HasPriKeyFlag(c.Flag) { + pkOffset = len(tblReaderCols) - 1 + } } - expr = expression.BuildCastFunction(b.ctx, expr, colExpr.GetType()) - p.GenExprs[model.TableColumnID{TableID: tableInfo.ID, ColumnID: column.ColumnInfo.ID}] = expr } } + if !tbl.Meta().PKIsHandle || pkOffset == -1 { + tblReaderCols = append(tblReaderCols, model.NewExtraHandleColInfo()) + handleCol := &expression.Column{ + DBName: dbName, + TblName: tblInfo.Name, + ColName: model.ExtraHandleName, + RetType: types.NewFieldType(mysql.TypeLonglong), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), + ID: model.ExtraHandleID, + } + tblSchema.Append(handleCol) + pkOffset = len(tblReaderCols) - 1 + } + + is := PhysicalIndexScan{ + Table: tblInfo, + TableAsName: &tblInfo.Name, + DBName: dbName, + Columns: idxReaderCols, + Index: idx, + dataSourceSchema: schema, + Ranges: ranger.FullRange(), + GenExprs: genExprsMap, + }.Init(b.ctx) + is.stats = property.NewSimpleStats(0) + // It's double read case. + ts := PhysicalTableScan{Columns: tblReaderCols, Table: is.Table}.Init(b.ctx) + ts.SetSchema(tblSchema) + cop := &copTask{indexPlan: is, tablePlan: ts} + ts.HandleIdx = pkOffset + is.initSchema(id, idx, true) + rootT := finishCopTask(b.ctx, cop).(*rootTask) + return rootT.p, nil +} + +func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(dbName model.CIStr, tbl table.Table) ([]Plan, []table.Index, error) { + tblInfo := tbl.Meta() + // get index information + indices := make([]table.Index, 0, len(tblInfo.Indices)) + indexLookUpReaders := make([]Plan, 0, len(tblInfo.Indices)) + for i, idx := range tbl.Indices() { + idxInfo := idx.Meta() + if idxInfo.State != model.StatePublic { + logutil.Logger(context.Background()).Info("build physical index lookup reader, the index isn't public", + zap.String("index", idxInfo.Name.O), zap.Stringer("state", idxInfo.State), zap.String("table", tblInfo.Name.O)) + continue + } + indices = append(indices, idx) + reader, err := b.buildPhysicalIndexLookUpReader(dbName, tbl, idxInfo, i) + if err != nil { + return nil, nil, err + } + indexLookUpReaders = append(indexLookUpReaders, reader) + } + if len(indexLookUpReaders) == 0 { + return nil, nil, nil + } + return indexLookUpReaders, indices, nil +} + +func (b *PlanBuilder) buildAdminCheckTable(as *ast.AdminStmt) (*CheckTable, error) { + tbl := as.Tables[0] + p := &CheckTable{ + DBName: tbl.Schema.O, + TblInfo: tbl.TableInfo, + } + + tableInfo := as.Tables[0].TableInfo + table, ok := b.is.TableByID(tableInfo.ID) + if !ok { + return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tableInfo.Name.O) + } + + readerPlans, indices, err := b.buildPhysicalIndexLookUpReaders(tbl.Schema, table) + if err != nil { + return nil, errors.Trace(err) + } + readers := make([]*PhysicalIndexLookUpReader, 0, len(readerPlans)) + for _, plan := range readerPlans { + readers = append(readers, plan.(*PhysicalIndexLookUpReader)) + } + p.Indices = indices + p.IndexLookUpReaders = readers return p, nil } diff --git a/util/admin/admin.go b/util/admin/admin.go index facf632b6b828..1b00b9c4a4fb8 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io" + "math" "sort" "time" @@ -267,28 +268,46 @@ func getCount(ctx sessionctx.Context, sql string) (int64, error) { return rows[0].GetInt64(0), nil } +// Count greater Types +const ( + // TblCntGreater means that the number of table rows is more than the number of index rows. + TblCntGreater byte = 1 + // IdxCntGreater means that the number of index rows is more than the number of table rows. + IdxCntGreater byte = 2 +) + // CheckIndicesCount compares indices count with table count. +// It returns the count greater type, the index offset and an error. // It returns nil if the count from the index is equal to the count from the table columns, -// otherwise it returns an error with a different information. -func CheckIndicesCount(ctx sessionctx.Context, dbName, tableName string, indices []string) error { +// otherwise it returns an error and the corresponding index's offset. +func CheckIndicesCount(ctx sessionctx.Context, dbName, tableName string, indices []string) (byte, int, error) { // Add `` for some names like `table name`. sql := fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`", dbName, tableName) tblCnt, err := getCount(ctx, sql) if err != nil { - return errors.Trace(err) + return 0, 0, errors.Trace(err) } - for _, idx := range indices { + for i, idx := range indices { sql = fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s` USE INDEX(`%s`)", dbName, tableName, idx) idxCnt, err := getCount(ctx, sql) if err != nil { - return errors.Trace(err) + return 0, i, errors.Trace(err) } - if tblCnt != idxCnt { - return errors.Errorf("table count %d != index(%s) count %d", tblCnt, idx, idxCnt) + logutil.Logger(context.Background()).Info("check indices count, table %s cnt %d, index %s cnt %d", + zap.String("table", tableName), zap.Int64("cnt", tblCnt), zap.Reflect("index", idx), zap.Int64("cnt", idxCnt)) + if tblCnt == idxCnt { + continue } - } - return nil + var ret byte + if tblCnt > idxCnt { + ret = TblCntGreater + } else if idxCnt > tblCnt { + ret = IdxCntGreater + } + return ret, i, errors.Errorf("table count %d != index(%s) count %d", tblCnt, idx, idxCnt) + } + return 0, 0, nil } // ScanIndexData scans the index handles and values in a limited number, according to the index information. @@ -444,7 +463,7 @@ func CheckRecordAndIndex(sessCtx sessionctx.Context, txn kv.Transaction, t table cols[i] = t.Cols()[col.Offset] } - startKey := t.RecordKey(0) + startKey := t.RecordKey(math.MinInt64) filterFunc := func(h1 int64, vals1 []types.Datum, cols []*table.Column) (bool, error) { for i, val := range vals1 { col := cols[i] diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go index 1c30ffa90155b..97fe10063d5f7 100644 --- a/util/admin/admin_test.go +++ b/util/admin/admin_test.go @@ -458,7 +458,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta c.Assert(err, IsNil) idxNames := []string{idx.Meta().Name.L} - err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) + _, _, err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) c.Assert(err, IsNil) mockCtx := mock.NewContext() @@ -480,7 +480,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta diffMsg := newDiffRetError("index", record1, nil) c.Assert(err.Error(), DeepEquals, diffMsg) - err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) + _, _, err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) c.Assert(err, IsNil) // set data to: @@ -539,7 +539,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta diffMsg = newDiffRetError("index", record1, nil) c.Assert(err.Error(), DeepEquals, diffMsg) - err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) + _, _, err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) c.Assert(err.Error(), Equals, "table count 3 != index(c) count 4") // set data to: @@ -559,7 +559,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta diffMsg = newDiffRetError("index", nil, record1) c.Assert(err.Error(), DeepEquals, diffMsg) - err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) + _, _, err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) c.Assert(err.Error(), Equals, "table count 4 != index(c) count 3") } From 7b5233596e80fc4faaf786a50f0028b13b1c82f3 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 1 Aug 2019 18:52:59 +0800 Subject: [PATCH 2/2] executor: fix data race of "admin check table" (#11568) --- executor/admin_test.go | 50 +++++++++++++++++++++--------------------- executor/executor.go | 11 ++++++++++ 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/executor/admin_test.go b/executor/admin_test.go index b5109ebae6bc2..843d39a67c239 100644 --- a/executor/admin_test.go +++ b/executor/admin_test.go @@ -91,10 +91,10 @@ func (s *testSuite2) TestAdminRecoverIndex(c *C) { c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)") @@ -115,7 +115,7 @@ func (s *testSuite2) TestAdminRecoverIndex(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r = tk.MustQuery("admin recover index admin_test c2") r.Check(testkit.Rows("1 5")) @@ -137,9 +137,9 @@ func (s *testSuite2) TestAdminRecoverIndex(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)") @@ -261,9 +261,9 @@ func (s *testSuite2) TestAdminCleanupIndex(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)") r.Check(testkit.Rows("11")) @@ -273,9 +273,9 @@ func (s *testSuite2) TestAdminCleanupIndex(c *C) { r.Check(testkit.Rows("6")) tk.MustExec("admin check index admin_test c2") - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c3") + err = tk.ExecToErr("admin check index admin_test c3") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c3)") r.Check(testkit.Rows("9")) @@ -322,9 +322,9 @@ func (s *testSuite2) TestAdminCleanupIndexPKNotHandle(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test `primary`") + err = tk.ExecToErr("admin check index admin_test `primary`") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(`primary`)") r.Check(testkit.Rows("6")) @@ -374,11 +374,11 @@ func (s *testSuite2) TestAdminCleanupIndexMore(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c1") + err = tk.ExecToErr("admin check index admin_test c1") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r := tk.MustQuery("SELECT COUNT(*) FROM admin_test") r.Check(testkit.Rows("3")) @@ -430,7 +430,7 @@ func (s *testSuite2) TestAdminCheckTableFailed(c *C) { c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err.Error(), Equals, "[executor:8003]admin_test err:[admin:1]index: != record:&admin.RecordData{Handle:-1, Values:[]types.Datum{types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:-10, b:[]uint8(nil), x:interface {}(nil)}}}") c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue) @@ -447,7 +447,7 @@ func (s *testSuite2) TestAdminCheckTableFailed(c *C) { c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err.Error(), Equals, "handle 0, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:0, b:[]uint8(nil), x:interface {}(nil)} != record:") // Add one row of index. @@ -464,7 +464,7 @@ func (s *testSuite2) TestAdminCheckTableFailed(c *C) { c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err.Error(), Equals, "col c2, handle 2, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:13, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:12, b:[]uint8(nil), x:interface {}(nil)}") // Table count = index count. @@ -477,7 +477,7 @@ func (s *testSuite2) TestAdminCheckTableFailed(c *C) { c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}") // Table count = index count. @@ -490,7 +490,7 @@ func (s *testSuite2) TestAdminCheckTableFailed(c *C) { c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}") // Recover records. @@ -576,7 +576,7 @@ func (s *testSuite1) TestAdminCheckTable(c *C) { tk.MustExec("use mysql") tk.MustExec(`admin check table test.t;`) - _, err := tk.Exec("admin check table t") + err := tk.ExecToErr("admin check table t") c.Assert(err, NotNil) // test add index on time type column which have default value @@ -616,7 +616,7 @@ func (s *testSuite1) TestAdminCheckTable(c *C) { tk.MustExec(`drop table if exists t1`) tk.MustExec(`create table t1 (a decimal(2,1), index(a))`) tk.MustExec(`insert into t1 set a='1.9'`) - _, err = tk.Exec(`alter table t1 modify column a decimal(3,2);`) + err = tk.ExecToErr(`alter table t1 modify column a decimal(3,2);`) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "[ddl:203]unsupported modify decimal column precision") tk.MustExec(`delete from t1;`) @@ -659,9 +659,9 @@ func (s *testSuite2) TestAdminCheckWithSnapshot(c *C) { c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_t_s") + err = tk.ExecToErr("admin check table admin_t_s") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_t_s a") + err = tk.ExecToErr("admin check index admin_t_s a") c.Assert(err, NotNil) // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. @@ -678,9 +678,9 @@ func (s *testSuite2) TestAdminCheckWithSnapshot(c *C) { tk.MustExec("admin check index admin_t_s a;") tk.MustExec("set @@tidb_snapshot = ''") - _, err = tk.Exec("admin check table admin_t_s") + err = tk.ExecToErr("admin check table admin_t_s") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_t_s a") + err = tk.ExecToErr("admin check index admin_t_s a") c.Assert(err, NotNil) r := tk.MustQuery("admin cleanup index admin_t_s a") diff --git a/executor/executor.go b/executor/executor.go index 10b0e26ac7e22..c1549e5db674e 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -476,6 +476,17 @@ func (e *CheckTableExec) Open(ctx context.Context) error { return nil } +// Close implements the Executor Close interface. +func (e *CheckTableExec) Close() error { + var firstErr error + for _, src := range e.srcs { + if err := src.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} + func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *IndexLookUpExecutor) error { cols := src.schema.Columns retFieldTypes := make([]*types.FieldType, len(cols))