diff --git a/executor/builder.go b/executor/builder.go index 058d4b905c918..4295eaa80e334 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -322,6 +322,7 @@ func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor { return e } +// buildIndexLookUpChecker builds check information to IndexLookUpReader. func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.PhysicalIndexLookUpReader, readerExec *IndexLookUpExecutor) { is := readerPlan.IndexPlans[0].(*plannercore.PhysicalIndexScan) @@ -329,14 +330,12 @@ func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.Physica for i := 0; i <= len(is.Index.Columns); i++ { readerExec.dagPB.OutputOffsets = append(readerExec.dagPB.OutputOffsets, uint32(i)) } - // set tps tps := make([]*types.FieldType, 0, len(is.Columns)+1) for _, col := range is.Columns { tps = append(tps, &col.FieldType) } tps = append(tps, types.NewFieldType(mysql.TypeLonglong)) - readerExec.tps = tps - readerExec.tbl = readerExec.table + readerExec.idxColTps = tps readerExec.idxInfo = readerExec.index colNames := make([]string, 0, len(is.Columns)) @@ -345,7 +344,7 @@ func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.Physica } var err error - readerExec.cols, err = table.FindCols(readerExec.table.Cols(), colNames, true) + readerExec.idxTblCols, err = table.FindCols(readerExec.table.Cols(), colNames, true) if err != nil { b.err = errors.Trace(err) return diff --git a/executor/distsql.go b/executor/distsql.go index 2e79e12deff38..198ca216423d5 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -354,12 +354,11 @@ type IndexLookUpExecutor struct { } type checkIndexValue struct { - isCheckOp bool - tps []*types.FieldType - tbl table.Table - idxInfo *model.IndexInfo - cols []*table.Column - genExprs map[model.TableColumnID]expression.Expression + isCheckOp bool + idxColTps []*types.FieldType + idxInfo *model.IndexInfo + idxTblCols []*table.Column + genExprs map[model.TableColumnID]expression.Expression } // Open implements the Executor Open interface. @@ -448,7 +447,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k } tps := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} if e.isCheckOp { - tps = e.tps + tps = e.idxColTps } // Since the first read only need handle information. So its returned col is only 1. result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans)) @@ -652,7 +651,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes }() var chk *chunk.Chunk if w.isCheckOp { - chk = chunk.NewChunkWithCapacity(w.tps, w.maxChunkSize) + chk = chunk.NewChunkWithCapacity(w.idxColTps, w.maxChunkSize) } else { chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) } @@ -701,7 +700,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, } if w.isCheckOp { if retChk == nil { - retChk = chunk.NewChunkWithCapacity(w.tps, w.batchSize) + retChk = chunk.NewChunkWithCapacity(w.idxColTps, w.batchSize) } retChk.Append(chk, 0, chk.NumRows()) } @@ -796,7 +795,8 @@ func (w *tableWorker) pickAndExecTask(ctx context.Context) { func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, tableReader Executor) error { chk := newFirstChunk(tableReader) - vals := make([]types.Datum, 0, len(w.cols)) + tblInfo := w.idxLookup.table.Meta() + vals := make([]types.Datum, 0, len(w.idxTblCols)) for { err := tableReader.Next(ctx, chk) if err != nil { @@ -805,7 +805,7 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta if chk.NumRows() == 0 { for h := range task.indexOrder { idxRow := task.idxRows.GetRow(task.indexOrder[h]) - return errors.Errorf("handle %#v, index:%#v != record:%#v", h, idxRow.GetDatum(0, w.tps[0]), nil) + return errors.Errorf("handle %#v, index:%#v != record:%#v", h, idxRow.GetDatum(0, w.idxColTps[0]), nil) } break } @@ -821,9 +821,9 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta delete(task.indexOrder, handle) idxRow := task.idxRows.GetRow(offset) vals = vals[:0] - for i, col := range w.cols { + for i, col := range w.idxTblCols { if col.IsGenerated() && !col.GeneratedStored { - expr := w.genExprs[model.TableColumnID{TableID: w.tbl.Meta().ID, ColumnID: col.ID}] + expr := w.genExprs[model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ID}] // Eval the column value val, err := expr.Eval(row) if err != nil { @@ -838,9 +838,9 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta vals = append(vals, row.GetDatum(i, &col.FieldType)) } } - vals = tables.TruncateIndexValuesIfNeeded(w.tbl.Meta(), w.idxInfo, vals) + vals = tables.TruncateIndexValuesIfNeeded(tblInfo, w.idxInfo, vals) for i, val := range vals { - col := w.cols[i] + col := w.idxTblCols[i] tp := &col.FieldType ret := chunk.Compare(idxRow, i, &val) if ret != 0 { diff --git a/executor/executor.go b/executor/executor.go index 810f82422504e..81361ca25581b 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" @@ -507,6 +508,12 @@ func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *Ind return errors.Trace(err) } +func (e *CheckTableExec) handlePanic(r interface{}) { + if r != nil { + e.retCh <- errors.Errorf("%v", r) + } +} + // Next implements the Executor Next interface. func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error { if e.done || len(e.srcs) == 0 { @@ -533,13 +540,24 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error { } // The number of table rows is equal to the number of index rows. + // TODO: Make the value of concurrency adjustable. And we can consider the number of records. + concurrency := 3 + wg := sync.WaitGroup{} for i := range e.srcs { + wg.Add(1) go func(num int) { - err1 := e.checkIndexHandle(ctx, num, e.srcs[num]) - if err1 != nil { - logutil.Logger(ctx).Info("check index handle failed", zap.Error(err)) - } + defer wg.Done() + util.WithRecovery(func() { + err1 := e.checkIndexHandle(ctx, num, e.srcs[num]) + if err1 != nil { + logutil.Logger(ctx).Info("check index handle failed", zap.Error(err)) + } + }, e.handlePanic) }(i) + + if (i+1)%concurrency == 0 { + wg.Wait() + } } for i := 0; i < len(e.srcs); i++ { diff --git a/go.mod b/go.mod index 7664320524d5b..6f5060725a50f 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,6 @@ require ( github.com/modern-go/reflect2 v1.0.1 // indirect github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808 // indirect github.com/myesui/uuid v1.0.0 // indirect - github.com/ngaut/log v0.0.0-20180314031856-b8e36e7ba5ac github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef github.com/opentracing/basictracer-go v1.0.0 diff --git a/go.sum b/go.sum index 2e7a072af9c73..368f4dd6e0d9a 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,6 @@ github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808 h1:pmpDGKLw4n82 github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/myesui/uuid v1.0.0 h1:xCBmH4l5KuvLYc5L7AS7SZg9/jKdIFubM7OVoLqaQUI= github.com/myesui/uuid v1.0.0/go.mod h1:2CDfNgU0LR8mIdO8vdWd8i9gWWxLlcoIGGpSNgafq84= -github.com/ngaut/log v0.0.0-20180314031856-b8e36e7ba5ac h1:wyheT2lPXRQqYPWY2IVW5BTLrbqCsnhL61zK2R5goLA= -github.com/ngaut/log v0.0.0-20180314031856-b8e36e7ba5ac/go.mod h1:ueVCjKQllPmX7uEvCYnZD5b8qjidGf1TCH61arVe4SU= github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdcNTgsos+vFzULLwyElndwn+5c= github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k= diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index ff3740ed0f0a9..80af88e989aa7 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -657,83 +657,107 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan, return ret, nil } -func (b *PlanBuilder) buildPhysicalIndexLookUpReader(ctx context.Context, dbName model.CIStr, tbl table.Table, idx *model.IndexInfo, id int) (Plan, error) { +// getGenExprs gets generated expressions map. +func (b *PlanBuilder) getGenExprs(ctx context.Context, dbName model.CIStr, tbl table.Table, idx *model.IndexInfo) ( + map[model.TableColumnID]expression.Expression, error) { tblInfo := tbl.Meta() - columns := make([]*model.ColumnInfo, 0, len(idx.Columns)) - tblColumns := make([]*model.ColumnInfo, 0, len(tbl.Cols())) - - // Get generated expressions. - genExprs := make(map[model.TableColumnID]expression.Expression) + genExprsMap := make(map[model.TableColumnID]expression.Expression) + exprs := make([]expression.Expression, 0, len(tbl.Cols())) + genExprIdxs := make([]model.TableColumnID, len(tbl.Cols())) mockTablePlan := LogicalTableDual{}.Init(b.ctx) mockTablePlan.SetSchema(expression.TableInfo2SchemaWithDBName(b.ctx, dbName, tblInfo)) - for _, column := range idx.Columns { - col := table.FindCol(tbl.Cols(), column.Name.L) - if !col.IsGenerated() { - continue + for i, colExpr := range mockTablePlan.Schema().Columns { + col := tbl.Cols()[i] + var expr expression.Expression + expr = colExpr + if col.IsGenerated() && !col.GeneratedStored { + var err error + expr, _, err = b.rewrite(ctx, col.GeneratedExpr, mockTablePlan, nil, true) + if err != nil { + return nil, errors.Trace(err) + } + expr = expression.BuildCastFunction(b.ctx, expr, colExpr.GetType()) + found := false + for _, column := range idx.Columns { + if strings.EqualFold(col.Name.L, column.Name.L) { + found = true + break + } + } + if found { + genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ColumnInfo.ID} + genExprsMap[genColumnID] = expr + genExprIdxs[i] = genColumnID + } } - columnName := &ast.ColumnName{Name: column.Name} - columnName.SetText(column.Name.O) - - colExpr, _, err := mockTablePlan.findColumn(columnName) - if err != nil { - return nil, errors.Trace(err) + exprs = append(exprs, expr) + } + // Re-iterate expressions to handle those virtual generated columns that refers to the other generated columns. + for i, expr := range exprs { + exprs[i] = expression.ColumnSubstitute(expr, mockTablePlan.Schema(), exprs) + if _, ok := genExprsMap[genExprIdxs[i]]; ok { + genExprsMap[genExprIdxs[i]] = exprs[i] } + } + return genExprsMap, nil +} - expr, _, err := b.rewrite(ctx, col.GeneratedExpr, mockTablePlan, nil, true) - if err != nil { - return nil, errors.Trace(err) - } - expr = expression.BuildCastFunction(b.ctx, expr, colExpr.GetType()) - genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ColumnInfo.ID} - genExprs[genColumnID] = expr +func (b *PlanBuilder) buildPhysicalIndexLookUpReader(ctx context.Context, dbName model.CIStr, tbl table.Table, idx *model.IndexInfo, id int) (Plan, error) { + genExprsMap, err := b.getGenExprs(ctx, dbName, tbl, idx) + if err != nil { + return nil, errors.Trace(err) } // Get generated columns. var genCols []*expression.Column pkOffset := -1 + tblInfo := tbl.Meta() colsMap := make(map[int64]struct{}) schema := expression.NewSchema(make([]*expression.Column, 0, len(idx.Columns))...) + idxReaderCols := make([]*model.ColumnInfo, 0, len(idx.Columns)) + tblReaderCols := make([]*model.ColumnInfo, 0, len(tbl.Cols())) for _, idxCol := range idx.Columns { for _, col := range tblInfo.Columns { if idxCol.Name.L == col.Name.L { - columns = append(columns, col) - tblColumns = append(tblColumns, col) + idxReaderCols = append(idxReaderCols, col) + tblReaderCols = append(tblReaderCols, col) schema.Append(&expression.Column{ ColName: col.Name, UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), RetType: &col.FieldType}) colsMap[col.ID] = struct{}{} if mysql.HasPriKeyFlag(col.Flag) { - pkOffset = len(tblColumns) - 1 + pkOffset = len(tblReaderCols) - 1 } } genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ID} - if expr, ok := genExprs[genColumnID]; ok { + if expr, ok := genExprsMap[genColumnID]; ok { cols := expression.ExtractColumns(expr) genCols = append(genCols, cols...) } } } + // Add generated columns to tblSchema and tblReaderCols. tblSchema := schema.Clone() for _, col := range genCols { if _, ok := colsMap[col.ID]; !ok { c := table.FindCol(tbl.Cols(), col.ColName.O) if c != nil { - col.Index = len(tblColumns) - tblColumns = append(tblColumns, c.ColumnInfo) + col.Index = len(tblReaderCols) + tblReaderCols = append(tblReaderCols, c.ColumnInfo) tblSchema.Append(&expression.Column{ ColName: c.Name, UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), RetType: &c.FieldType}) colsMap[c.ID] = struct{}{} if mysql.HasPriKeyFlag(c.Flag) { - pkOffset = len(tblColumns) - 1 + pkOffset = len(tblReaderCols) - 1 } } } } if !tbl.Meta().PKIsHandle || pkOffset == -1 { - tblColumns = append(tblColumns, model.NewExtraHandleColInfo()) + tblReaderCols = append(tblReaderCols, model.NewExtraHandleColInfo()) handleCol := &expression.Column{ DBName: dbName, TblName: tblInfo.Name, @@ -743,30 +767,27 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReader(ctx context.Context, dbName ID: model.ExtraHandleID, } tblSchema.Append(handleCol) - pkOffset = len(tblColumns) - 1 + pkOffset = len(tblReaderCols) - 1 } is := PhysicalIndexScan{ Table: tblInfo, TableAsName: &tblInfo.Name, DBName: dbName, - Columns: columns, + Columns: idxReaderCols, Index: idx, dataSourceSchema: schema, Ranges: ranger.FullRange(), - KeepOrder: false, - GenExprs: genExprs, + GenExprs: genExprsMap, }.Init(b.ctx) is.stats = property.NewSimpleStats(0) - cop := &copTask{indexPlan: is} // It's double read case. - ts := PhysicalTableScan{Columns: tblColumns, Table: is.Table}.Init(b.ctx) + ts := PhysicalTableScan{Columns: tblReaderCols, Table: is.Table}.Init(b.ctx) ts.SetSchema(tblSchema) - cop.tablePlan = ts + cop := &copTask{indexPlan: is, tablePlan: ts} ts.HandleIdx = pkOffset is.initSchema(id, idx, true) - t := finishCopTask(b.ctx, cop) - rootT := t.(*rootTask) + rootT := finishCopTask(b.ctx, cop).(*rootTask) return rootT.p, nil } @@ -780,14 +801,14 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam if idxInfo.State != model.StatePublic { logutil.Logger(context.Background()).Info("build physical index lookup reader, the index isn't public", zap.String("index", idxInfo.Name.O), zap.Stringer("state", idxInfo.State), zap.String("table", tblInfo.Name.O)) - } else { - indices = append(indices, idx) - reader, err := b.buildPhysicalIndexLookUpReader(ctx, dbName, tbl, idxInfo, i) - if err != nil { - return nil, nil, err - } - indexLookUpReaders = append(indexLookUpReaders, reader) + continue + } + indices = append(indices, idx) + reader, err := b.buildPhysicalIndexLookUpReader(ctx, dbName, tbl, idxInfo, i) + if err != nil { + return nil, nil, err } + indexLookUpReaders = append(indexLookUpReaders, reader) } if len(indexLookUpReaders) == 0 { return nil, nil, nil @@ -802,10 +823,7 @@ func (b *PlanBuilder) buildAdminCheckTable(ctx context.Context, as *ast.AdminStm TblInfo: tbl.TableInfo, } - mockTablePlan := LogicalTableDual{}.Init(b.ctx) tableInfo := as.Tables[0].TableInfo - schema := expression.TableInfo2SchemaWithDBName(b.ctx, tbl.Schema, tableInfo) - mockTablePlan.SetSchema(schema) table, ok := b.is.TableByID(tableInfo.ID) if !ok { return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tableInfo.Name.O) diff --git a/util/admin/admin.go b/util/admin/admin.go index 1b102e46d2e7f..fce94e497bc93 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -14,13 +14,13 @@ package admin import ( + "context" "fmt" "io" "math" "sort" "time" - "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -293,7 +293,8 @@ func CheckIndicesCount(ctx sessionctx.Context, dbName, tableName string, indices if err != nil { return 0, i, errors.Trace(err) } - log.Infof("check indices count, table %s cnt %d, index %s cnt %d", tableName, tblCnt, idx, idxCnt) + logutil.Logger(context.Background()).Info("check indices count, table %s cnt %d, index %s cnt %d", + zap.String("table", tableName), zap.Int64("cnt", tblCnt), zap.Reflect("index", idx), zap.Int64("cnt", idxCnt)) if tblCnt == idxCnt { continue }