Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support cleanup dangling index data command #6102

Merged
merged 19 commits into from
Apr 2, 2018
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ const (
AdminCancelDDLJobs
AdminCheckIndex
AdminRecoverIndex
AdminCleanupIndex
AdminCheckIndexRange
AdminShowDDLJobQueries
AdminChecksumTable
Expand Down
247 changes: 247 additions & 0 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand All @@ -38,6 +39,7 @@ import (
var (
_ Executor = &CheckIndexRangeExec{}
_ Executor = &RecoverIndexExec{}
_ Executor = &CleanupIndexExec{}
)

// CheckIndexRangeExec outputs the index values which has handle between begin and end.
Expand Down Expand Up @@ -457,3 +459,248 @@ func (e *RecoverIndexExec) NextChunk(ctx context.Context, chk *chunk.Chunk) erro
e.done = true
return nil
}

// CleanupIndexExec represents a cleanup index executor.
// It is built from "admin cleanup index" statement, is used to delete
// dangling index data.
type CleanupIndexExec struct {
baseExecutor

done bool
removeCnt uint64

index table.Index
table table.Table

idxCols []*model.ColumnInfo
idxColFieldTypes []*types.FieldType
idxChunk *chunk.Chunk

matchedIndex map[int64]idxData
batchSize uint64
batchKeys []kv.Key
idxValsBufs [][]types.Datum
lastIdxKey []byte
scanRowCnt uint64
}

type idxData struct {
matched bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we don't use this field.

idxVals []types.Datum
}

func (e *CleanupIndexExec) getIdxColTypes() []*types.FieldType {
if e.idxColFieldTypes != nil {
return e.idxColFieldTypes
}
e.idxColFieldTypes = make([]*types.FieldType, 0, len(e.idxCols))
for _, col := range e.idxCols {
e.idxColFieldTypes = append(e.idxColFieldTypes, &col.FieldType)
}
return e.idxColFieldTypes
}

func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte, error) {
for handle := range e.matchedIndex {
e.batchKeys = append(e.batchKeys, e.table.RecordKey(handle))
}
values, err := txn.GetSnapshot().BatchGet(e.batchKeys)
if err != nil {
return nil, errors.Trace(err)
}
return values, nil
}

func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[string][]byte) error {
for _, k := range e.batchKeys {
if _, found := values[string(k)]; !found {
_, handle, err := tablecodec.DecodeRecordKey(k)
if err != nil {
return errors.Trace(err)
}
if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, e.matchedIndex[handle].idxVals,
handle); err != nil {
return errors.Trace(err)
}
e.removeCnt++
if e.removeCnt%e.batchSize == 0 {
log.Infof("[cleaning up dangling index] table: %v, index: %v, count: %v.",
e.table.Meta().Name.String(), e.index.Meta().Name.String(), e.removeCnt)
}
}
}
return nil
}

func (e *CleanupIndexExec) extractIdxVals(row chunk.Row, idxVals []types.Datum) []types.Datum {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function logic is the same as RecoverIndexExec's extractIdxVals. Could we use the same one?

if idxVals == nil {
idxVals = make([]types.Datum, 0, row.Len()-1)
} else {
idxVals = idxVals[:0]
}

for i := 0; i < row.Len()-1; i++ {
colVal := row.GetDatum(i, e.idxColFieldTypes[i])
idxVals = append(idxVals, *colVal.Copy())
}
return idxVals
}

func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) error {
result, err := e.buildIndexScan(ctx, txn)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(result.Close)

sc := e.ctx.GetSessionVars().StmtCtx
for {
err := result.NextChunk(ctx, e.idxChunk)
if err != nil {
return errors.Trace(err)
}
if e.idxChunk.NumRows() == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check the e.scanRowCnt to return.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

friendly ping @jackysp Please resolve reviewer's comment

return nil
}
iter := chunk.NewIterator4Chunk(e.idxChunk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
handle := row.GetInt64(len(e.idxCols) - 1)
idxVals := e.extractIdxVals(row, e.idxValsBufs[e.scanRowCnt])
e.idxValsBufs[e.scanRowCnt] = idxVals
e.matchedIndex[handle] = idxData{false, idxVals}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove the field ofmatched ?

idxKey, _, err := e.index.GenIndexKey(sc, idxVals, handle, nil)
if err != nil {
return errors.Trace(err)
}
e.scanRowCnt++
e.lastIdxKey = idxKey
if e.scanRowCnt >= e.batchSize {
return nil
}
}
}
}

// NextChunk implements the Executor NextChunk interface.
func (e *CleanupIndexExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
for {
errInTxn := kv.RunInNewTxn(e.ctx.GetStore(), true, func(txn kv.Transaction) error {
err := e.fetchIndex(ctx, txn)
if err != nil {
return errors.Trace(err)
}
values, err := e.batchGetRecord(txn)
if err != nil {
return errors.Trace(err)
}
err = e.deleteDanglingIdx(txn, values)
if err != nil {
return errors.Trace(err)
}
return nil
})
if errInTxn != nil {
return errors.Trace(errInTxn)
}
if e.scanRowCnt == 0 {
break
}
e.scanRowCnt = 0
e.batchKeys = e.batchKeys[:0]
for k := range e.matchedIndex {
delete(e.matchedIndex, k)
}
}
e.done = true
chk.AppendUint64(0, e.removeCnt)
return nil
}

func (e *CleanupIndexExec) buildIndexScan(ctx context.Context, txn kv.Transaction) (distsql.SelectResult, error) {
dagPB, err := e.buildIdxDAGPB(txn)
if err != nil {
return nil, errors.Trace(err)
}
sc := e.ctx.GetSessionVars().StmtCtx
var builder distsql.RequestBuilder
ranges := ranger.FullNewRange()
kvReq, err := builder.SetIndexRanges(sc, e.table.Meta().ID, e.index.Meta().ID, ranges).
SetDAGRequest(dagPB).
SetKeepOrder(true).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
kvReq.KeyRanges[0].StartKey = kv.Key(e.lastIdxKey).PrefixNext()
kvReq.Concurrency = 1
result, err := distsql.Select(ctx, e.ctx, kvReq, e.getIdxColTypes(), statistics.NewQueryFeedback(0, nil, 0, false))
if err != nil {
return nil, errors.Trace(err)
}
result.Fetch(ctx)
return result, nil
}

// Open implements the Executor Open interface.
func (e *CleanupIndexExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.idxChunk = chunk.NewChunk(e.getIdxColTypes())
e.matchedIndex = make(map[int64]idxData, e.batchSize)
e.batchKeys = make([]kv.Key, 0, e.batchSize)
e.idxValsBufs = make([][]types.Datum, e.batchSize)
sc := e.ctx.GetSessionVars().StmtCtx
idxKey, _, err := e.index.GenIndexKey(sc, []types.Datum{{}}, math.MinInt64, nil)
if err != nil {
return errors.Trace(err)
}
e.lastIdxKey = idxKey
return nil
}

func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = txn.StartTS()
dagReq.TimeZoneOffset = timeZoneOffset(e.ctx)
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.idxCols {
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}

execPB := e.constructIndexScanPB()
dagReq.Executors = append(dagReq.Executors, execPB)
err := plan.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.idxCols)
if err != nil {
return nil, errors.Trace(err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these codes be extracted into a function?
The logic is used in many places.

Copy link
Member Author

@jackysp jackysp Mar 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are looked similar, but not exactly the same. It needs a refactor. Maybe in the next pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same as CheckIndexRangeExec's buildDAGPB.
But if you want to refactor some of the exec in admin, you can handle it in the next PR.


limitExec := e.constructLimitPB()
dagReq.Executors = append(dagReq.Executors, limitExec)

return dagReq, nil
}

func (e *CleanupIndexExec) constructIndexScanPB() *tipb.Executor {
idxExec := &tipb.IndexScan{
TableId: e.table.Meta().ID,
IndexId: e.index.Meta().ID,
Columns: plan.ColumnsToProto(e.idxCols, e.table.Meta().PKIsHandle),
}
return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec}
}

func (e *CleanupIndexExec) constructLimitPB() *tipb.Executor {
limitExec := &tipb.Limit{
Limit: e.batchSize,
}
return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}
}

// Close implements the Executor Close interface.
func (e *CleanupIndexExec) Close() error {
return nil
}
Loading