Skip to content

Commit

Permalink
*: add concurrency limit and address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Jul 30, 2019
1 parent cbf4ba7 commit 2c7d2d2
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 78 deletions.
7 changes: 3 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,21 +322,20 @@ func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor {
return e
}

// buildIndexLookUpChecker builds check information to IndexLookUpReader.
func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.PhysicalIndexLookUpReader,
readerExec *IndexLookUpExecutor) {
is := readerPlan.IndexPlans[0].(*plannercore.PhysicalIndexScan)
readerExec.dagPB.OutputOffsets = make([]uint32, 0, len(is.Index.Columns))
for i := 0; i <= len(is.Index.Columns); i++ {
readerExec.dagPB.OutputOffsets = append(readerExec.dagPB.OutputOffsets, uint32(i))
}
// set tps
tps := make([]*types.FieldType, 0, len(is.Columns)+1)
for _, col := range is.Columns {
tps = append(tps, &col.FieldType)
}
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
readerExec.tps = tps
readerExec.tbl = readerExec.table
readerExec.idxColTps = tps
readerExec.idxInfo = readerExec.index

colNames := make([]string, 0, len(is.Columns))
Expand All @@ -345,7 +344,7 @@ func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.Physica
}

var err error
readerExec.cols, err = table.FindCols(readerExec.table.Cols(), colNames, true)
readerExec.idxTblCols, err = table.FindCols(readerExec.table.Cols(), colNames, true)
if err != nil {
b.err = errors.Trace(err)
return
Expand Down
30 changes: 15 additions & 15 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,11 @@ type IndexLookUpExecutor struct {
}

type checkIndexValue struct {
isCheckOp bool
tps []*types.FieldType
tbl table.Table
idxInfo *model.IndexInfo
cols []*table.Column
genExprs map[model.TableColumnID]expression.Expression
isCheckOp bool
idxColTps []*types.FieldType
idxInfo *model.IndexInfo
idxTblCols []*table.Column
genExprs map[model.TableColumnID]expression.Expression
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -448,7 +447,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
}
tps := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
if e.isCheckOp {
tps = e.tps
tps = e.idxColTps
}
// Since the first read only need handle information. So its returned col is only 1.
result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans))
Expand Down Expand Up @@ -652,7 +651,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}()
var chk *chunk.Chunk
if w.isCheckOp {
chk = chunk.NewChunkWithCapacity(w.tps, w.maxChunkSize)
chk = chunk.NewChunkWithCapacity(w.idxColTps, w.maxChunkSize)
} else {
chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize)
}
Expand Down Expand Up @@ -701,7 +700,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk,
}
if w.isCheckOp {
if retChk == nil {
retChk = chunk.NewChunkWithCapacity(w.tps, w.batchSize)
retChk = chunk.NewChunkWithCapacity(w.idxColTps, w.batchSize)
}
retChk.Append(chk, 0, chk.NumRows())
}
Expand Down Expand Up @@ -796,7 +795,8 @@ func (w *tableWorker) pickAndExecTask(ctx context.Context) {

func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, tableReader Executor) error {
chk := newFirstChunk(tableReader)
vals := make([]types.Datum, 0, len(w.cols))
tblInfo := w.idxLookup.table.Meta()
vals := make([]types.Datum, 0, len(w.idxTblCols))
for {
err := tableReader.Next(ctx, chk)
if err != nil {
Expand All @@ -805,7 +805,7 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
if chk.NumRows() == 0 {
for h := range task.indexOrder {
idxRow := task.idxRows.GetRow(task.indexOrder[h])
return errors.Errorf("handle %#v, index:%#v != record:%#v", h, idxRow.GetDatum(0, w.tps[0]), nil)
return errors.Errorf("handle %#v, index:%#v != record:%#v", h, idxRow.GetDatum(0, w.idxColTps[0]), nil)
}
break
}
Expand All @@ -821,9 +821,9 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
delete(task.indexOrder, handle)
idxRow := task.idxRows.GetRow(offset)
vals = vals[:0]
for i, col := range w.cols {
for i, col := range w.idxTblCols {
if col.IsGenerated() && !col.GeneratedStored {
expr := w.genExprs[model.TableColumnID{TableID: w.tbl.Meta().ID, ColumnID: col.ID}]
expr := w.genExprs[model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ID}]
// Eval the column value
val, err := expr.Eval(row)
if err != nil {
Expand All @@ -838,9 +838,9 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
vals = append(vals, row.GetDatum(i, &col.FieldType))
}
}
vals = tables.TruncateIndexValuesIfNeeded(w.tbl.Meta(), w.idxInfo, vals)
vals = tables.TruncateIndexValuesIfNeeded(tblInfo, w.idxInfo, vals)
for i, val := range vals {
col := w.cols[i]
col := w.idxTblCols[i]
tp := &col.FieldType
ret := chunk.Compare(idxRow, i, &val)
if ret != 0 {
Expand Down
26 changes: 22 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -507,6 +508,12 @@ func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *Ind
return errors.Trace(err)
}

func (e *CheckTableExec) handlePanic(r interface{}) {
if r != nil {
e.retCh <- errors.Errorf("%v", r)
}
}

// Next implements the Executor Next interface.
func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.done || len(e.srcs) == 0 {
Expand All @@ -533,13 +540,24 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

// The number of table rows is equal to the number of index rows.
// TODO: Make the value of concurrency adjustable. And we can consider the number of records.
concurrency := 3
wg := sync.WaitGroup{}
for i := range e.srcs {
wg.Add(1)
go func(num int) {
err1 := e.checkIndexHandle(ctx, num, e.srcs[num])
if err1 != nil {
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err))
}
defer wg.Done()
util.WithRecovery(func() {
err1 := e.checkIndexHandle(ctx, num, e.srcs[num])
if err1 != nil {
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err))
}
}, e.handlePanic)
}(i)

if (i+1)%concurrency == 0 {
wg.Wait()
}
}

for i := 0; i < len(e.srcs); i++ {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ require (
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808 // indirect
github.com/myesui/uuid v1.0.0 // indirect
github.com/ngaut/log v0.0.0-20180314031856-b8e36e7ba5ac
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/opentracing/basictracer-go v1.0.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808 h1:pmpDGKLw4n82
github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/myesui/uuid v1.0.0 h1:xCBmH4l5KuvLYc5L7AS7SZg9/jKdIFubM7OVoLqaQUI=
github.com/myesui/uuid v1.0.0/go.mod h1:2CDfNgU0LR8mIdO8vdWd8i9gWWxLlcoIGGpSNgafq84=
github.com/ngaut/log v0.0.0-20180314031856-b8e36e7ba5ac h1:wyheT2lPXRQqYPWY2IVW5BTLrbqCsnhL61zK2R5goLA=
github.com/ngaut/log v0.0.0-20180314031856-b8e36e7ba5ac/go.mod h1:ueVCjKQllPmX7uEvCYnZD5b8qjidGf1TCH61arVe4SU=
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdcNTgsos+vFzULLwyElndwn+5c=
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k=
Expand Down
118 changes: 68 additions & 50 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,83 +657,107 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan,
return ret, nil
}

func (b *PlanBuilder) buildPhysicalIndexLookUpReader(ctx context.Context, dbName model.CIStr, tbl table.Table, idx *model.IndexInfo, id int) (Plan, error) {
// getGenExprs gets generated expressions map.
func (b *PlanBuilder) getGenExprs(ctx context.Context, dbName model.CIStr, tbl table.Table, idx *model.IndexInfo) (
map[model.TableColumnID]expression.Expression, error) {
tblInfo := tbl.Meta()
columns := make([]*model.ColumnInfo, 0, len(idx.Columns))
tblColumns := make([]*model.ColumnInfo, 0, len(tbl.Cols()))

// Get generated expressions.
genExprs := make(map[model.TableColumnID]expression.Expression)
genExprsMap := make(map[model.TableColumnID]expression.Expression)
exprs := make([]expression.Expression, 0, len(tbl.Cols()))
genExprIdxs := make([]model.TableColumnID, len(tbl.Cols()))
mockTablePlan := LogicalTableDual{}.Init(b.ctx)
mockTablePlan.SetSchema(expression.TableInfo2SchemaWithDBName(b.ctx, dbName, tblInfo))
for _, column := range idx.Columns {
col := table.FindCol(tbl.Cols(), column.Name.L)
if !col.IsGenerated() {
continue
for i, colExpr := range mockTablePlan.Schema().Columns {
col := tbl.Cols()[i]
var expr expression.Expression
expr = colExpr
if col.IsGenerated() && !col.GeneratedStored {
var err error
expr, _, err = b.rewrite(ctx, col.GeneratedExpr, mockTablePlan, nil, true)
if err != nil {
return nil, errors.Trace(err)
}
expr = expression.BuildCastFunction(b.ctx, expr, colExpr.GetType())
found := false
for _, column := range idx.Columns {
if strings.EqualFold(col.Name.L, column.Name.L) {
found = true
break
}
}
if found {
genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ColumnInfo.ID}
genExprsMap[genColumnID] = expr
genExprIdxs[i] = genColumnID
}
}
columnName := &ast.ColumnName{Name: column.Name}
columnName.SetText(column.Name.O)

colExpr, _, err := mockTablePlan.findColumn(columnName)
if err != nil {
return nil, errors.Trace(err)
exprs = append(exprs, expr)
}
// Re-iterate expressions to handle those virtual generated columns that refers to the other generated columns.
for i, expr := range exprs {
exprs[i] = expression.ColumnSubstitute(expr, mockTablePlan.Schema(), exprs)
if _, ok := genExprsMap[genExprIdxs[i]]; ok {
genExprsMap[genExprIdxs[i]] = exprs[i]
}
}
return genExprsMap, nil
}

expr, _, err := b.rewrite(ctx, col.GeneratedExpr, mockTablePlan, nil, true)
if err != nil {
return nil, errors.Trace(err)
}
expr = expression.BuildCastFunction(b.ctx, expr, colExpr.GetType())
genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ColumnInfo.ID}
genExprs[genColumnID] = expr
func (b *PlanBuilder) buildPhysicalIndexLookUpReader(ctx context.Context, dbName model.CIStr, tbl table.Table, idx *model.IndexInfo, id int) (Plan, error) {
genExprsMap, err := b.getGenExprs(ctx, dbName, tbl, idx)
if err != nil {
return nil, errors.Trace(err)
}

// Get generated columns.
var genCols []*expression.Column
pkOffset := -1
tblInfo := tbl.Meta()
colsMap := make(map[int64]struct{})
schema := expression.NewSchema(make([]*expression.Column, 0, len(idx.Columns))...)
idxReaderCols := make([]*model.ColumnInfo, 0, len(idx.Columns))
tblReaderCols := make([]*model.ColumnInfo, 0, len(tbl.Cols()))
for _, idxCol := range idx.Columns {
for _, col := range tblInfo.Columns {
if idxCol.Name.L == col.Name.L {
columns = append(columns, col)
tblColumns = append(tblColumns, col)
idxReaderCols = append(idxReaderCols, col)
tblReaderCols = append(tblReaderCols, col)
schema.Append(&expression.Column{
ColName: col.Name,
UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(),
RetType: &col.FieldType})
colsMap[col.ID] = struct{}{}
if mysql.HasPriKeyFlag(col.Flag) {
pkOffset = len(tblColumns) - 1
pkOffset = len(tblReaderCols) - 1
}
}
genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ID}
if expr, ok := genExprs[genColumnID]; ok {
if expr, ok := genExprsMap[genColumnID]; ok {
cols := expression.ExtractColumns(expr)
genCols = append(genCols, cols...)
}
}
}
// Add generated columns to tblSchema and tblReaderCols.
tblSchema := schema.Clone()
for _, col := range genCols {
if _, ok := colsMap[col.ID]; !ok {
c := table.FindCol(tbl.Cols(), col.ColName.O)
if c != nil {
col.Index = len(tblColumns)
tblColumns = append(tblColumns, c.ColumnInfo)
col.Index = len(tblReaderCols)
tblReaderCols = append(tblReaderCols, c.ColumnInfo)
tblSchema.Append(&expression.Column{
ColName: c.Name,
UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(),
RetType: &c.FieldType})
colsMap[c.ID] = struct{}{}
if mysql.HasPriKeyFlag(c.Flag) {
pkOffset = len(tblColumns) - 1
pkOffset = len(tblReaderCols) - 1
}
}
}
}
if !tbl.Meta().PKIsHandle || pkOffset == -1 {
tblColumns = append(tblColumns, model.NewExtraHandleColInfo())
tblReaderCols = append(tblReaderCols, model.NewExtraHandleColInfo())
handleCol := &expression.Column{
DBName: dbName,
TblName: tblInfo.Name,
Expand All @@ -743,30 +767,27 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReader(ctx context.Context, dbName
ID: model.ExtraHandleID,
}
tblSchema.Append(handleCol)
pkOffset = len(tblColumns) - 1
pkOffset = len(tblReaderCols) - 1
}

is := PhysicalIndexScan{
Table: tblInfo,
TableAsName: &tblInfo.Name,
DBName: dbName,
Columns: columns,
Columns: idxReaderCols,
Index: idx,
dataSourceSchema: schema,
Ranges: ranger.FullRange(),
KeepOrder: false,
GenExprs: genExprs,
GenExprs: genExprsMap,
}.Init(b.ctx)
is.stats = property.NewSimpleStats(0)
cop := &copTask{indexPlan: is}
// It's double read case.
ts := PhysicalTableScan{Columns: tblColumns, Table: is.Table}.Init(b.ctx)
ts := PhysicalTableScan{Columns: tblReaderCols, Table: is.Table}.Init(b.ctx)
ts.SetSchema(tblSchema)
cop.tablePlan = ts
cop := &copTask{indexPlan: is, tablePlan: ts}
ts.HandleIdx = pkOffset
is.initSchema(id, idx, true)
t := finishCopTask(b.ctx, cop)
rootT := t.(*rootTask)
rootT := finishCopTask(b.ctx, cop).(*rootTask)
return rootT.p, nil
}

Expand All @@ -780,14 +801,14 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam
if idxInfo.State != model.StatePublic {
logutil.Logger(context.Background()).Info("build physical index lookup reader, the index isn't public",
zap.String("index", idxInfo.Name.O), zap.Stringer("state", idxInfo.State), zap.String("table", tblInfo.Name.O))
} else {
indices = append(indices, idx)
reader, err := b.buildPhysicalIndexLookUpReader(ctx, dbName, tbl, idxInfo, i)
if err != nil {
return nil, nil, err
}
indexLookUpReaders = append(indexLookUpReaders, reader)
continue
}
indices = append(indices, idx)
reader, err := b.buildPhysicalIndexLookUpReader(ctx, dbName, tbl, idxInfo, i)
if err != nil {
return nil, nil, err
}
indexLookUpReaders = append(indexLookUpReaders, reader)
}
if len(indexLookUpReaders) == 0 {
return nil, nil, nil
Expand All @@ -802,10 +823,7 @@ func (b *PlanBuilder) buildAdminCheckTable(ctx context.Context, as *ast.AdminStm
TblInfo: tbl.TableInfo,
}

mockTablePlan := LogicalTableDual{}.Init(b.ctx)
tableInfo := as.Tables[0].TableInfo
schema := expression.TableInfo2SchemaWithDBName(b.ctx, tbl.Schema, tableInfo)
mockTablePlan.SetSchema(schema)
table, ok := b.is.TableByID(tableInfo.ID)
if !ok {
return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tableInfo.Name.O)
Expand Down
Loading

0 comments on commit 2c7d2d2

Please sign in to comment.