Skip to content

Commit

Permalink
util/admin: support admin check table on partition table (#12796) (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and sre-bot committed Nov 6, 2019
1 parent 4815107 commit 80cc260
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 62 deletions.
99 changes: 97 additions & 2 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,101 @@ func (s *testSuite2) TestAdminCleanupIndexMore(c *C) {
tk.MustExec("admin check table admin_test")
}

func (s *testSuite2) TestAdminCheckPartitionTableFailed(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test_p")
tk.MustExec("create table admin_test_p (c1 int key,c2 int,c3 int,index idx(c2)) partition by hash(c1) partitions 4")
tk.MustExec("insert admin_test_p (c1, c2, c3) values (0,0,0), (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustExec("admin check table admin_test_p")

// Make some corrupted index. Build the index information.
s.ctx = mock.NewContext()
s.ctx.Store = s.store
is := s.domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test_p")
tbl, err := is.TableByName(dbName, tblName)
c.Assert(err, IsNil)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
sc := s.ctx.GetSessionVars().StmtCtx
tk.Se.GetSessionVars().IndexLookupSize = 3
tk.Se.GetSessionVars().MaxChunkSize = 3

// Reduce one row of index on partitions.
// Table count > index count.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i), int64(i), nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err.Error(), Equals, fmt.Sprintf("[executor:8003]admin_test_p err:[admin:1]index:<nil> != record:&admin.RecordData{Handle:%d, Values:[]types.Datum{types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)}}}", i, i))
c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue)
// TODO: fix admin recover for partition table.
//r := tk.MustQuery("admin recover index admin_test_p idx")
//r.Check(testkit.Rows("0 0"))
//tk.MustExec("admin check table admin_test_p")
// Manual recover index.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}

// Add one row of index on partitions.
// Table count < index count.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), int64(i+8))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("handle %d, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)} != record:<nil>", i+8, i+8))
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), int64(i+8), nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}

// Table count = index count, but the index value was wrong.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("col c2, handle %d, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)}", i, i+8, i))
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), int64(i), nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}
}

func (s *testSuite2) TestAdminCheckTableFailed(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down Expand Up @@ -525,8 +620,8 @@ func (s *testSuite1) TestAdminCheckTable(c *C) {
tk.MustExec(`drop table if exists test`)
tk.MustExec(`create table test (
a time,
PRIMARY KEY (a)
);`)
PRIMARY KEY (a)
);`)

tk.MustExec(`insert into test set a='12:10:36';`)
tk.MustExec(`admin check table test`)
Expand Down
9 changes: 5 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,12 @@ func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor {
e := &CheckTableExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dbName: v.DBName,
tblInfo: v.TblInfo,
indices: v.Indices,
table: v.Table,
indexInfos: v.IndexInfos,
is: b.is,
srcs: readerExecs,
exitCh: make(chan struct{}),
retCh: make(chan error, len(v.Indices)),
retCh: make(chan error, len(readerExecs)),
}
return e
}
Expand Down Expand Up @@ -1846,7 +1846,8 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
tbl, _ := b.is.TableByID(ts.Table.ID)
if isPartition, physicalTableID := ts.IsPartition(); isPartition {
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
pt := tbl.(table.PartitionedTable)
tbl = pt.GetPartition(physicalTableID)
}
Expand Down
64 changes: 40 additions & 24 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -452,14 +453,14 @@ func getTableName(is infoschema.InfoSchema, id int64) string {
type CheckTableExec struct {
baseExecutor

dbName string
tblInfo *model.TableInfo
indices []table.Index
srcs []*IndexLookUpExecutor
done bool
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
dbName string
table table.Table
indexInfos []*model.IndexInfo
srcs []*IndexLookUpExecutor
done bool
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -487,7 +488,20 @@ func (e *CheckTableExec) Close() error {
return firstErr
}

func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *IndexLookUpExecutor) error {
func (e *CheckTableExec) checkTableIndexHandle(ctx context.Context, idxInfo *model.IndexInfo) error {
// For partition table, there will be multi same index indexLookUpReaders on different partitions.
for _, src := range e.srcs {
if src.index.Name.L == idxInfo.Name.L {
err := e.checkIndexHandle(ctx, src)
if err != nil {
return err
}
}
}
return nil
}

func (e *CheckTableExec) checkIndexHandle(ctx context.Context, src *IndexLookUpExecutor) error {
cols := src.schema.Columns
retFieldTypes := make([]*types.FieldType, len(cols))
for i := range cols {
Expand Down Expand Up @@ -528,20 +542,19 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
defer func() { e.done = true }()

idxNames := make([]string, 0, len(e.indices))
for _, idx := range e.indices {
idxNames = append(idxNames, idx.Meta().Name.O)
idxNames := make([]string, 0, len(e.indexInfos))
for _, idx := range e.indexInfos {
idxNames = append(idxNames, idx.Name.O)
}
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tblInfo.Name.O, idxNames)
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames)
if err != nil {
tbl := e.srcs[idxOffset].table
if greater == admin.IdxCntGreater {
err = e.checkIndexHandle(ctx, idxOffset, e.srcs[idxOffset])
err = e.checkTableIndexHandle(ctx, e.indexInfos[idxOffset])
} else if greater == admin.TblCntGreater {
err = e.checkTableRecord(tbl, idxOffset)
err = e.checkTableRecord(idxOffset)
}
if err != nil && admin.ErrDataInConsistent.Equal(err) {
return ErrAdminCheckTable.GenWithStack("%v err:%v", tbl.Meta().Name, err)
return ErrAdminCheckTable.GenWithStack("%v err:%v", e.table.Meta().Name, err)
}
return errors.Trace(err)
}
Expand All @@ -555,7 +568,7 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
go func(num int) {
defer wg.Done()
util.WithRecovery(func() {
err1 := e.checkIndexHandle(ctx, num, e.srcs[num])
err1 := e.checkIndexHandle(ctx, e.srcs[num])
if err1 != nil {
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err))
}
Expand All @@ -576,21 +589,24 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *CheckTableExec) checkTableRecord(tbl table.Table, idxOffset int) error {
idx := e.indices[idxOffset]
func (e *CheckTableExec) checkTableRecord(idxOffset int) error {
idxInfo := e.indexInfos[idxOffset]
// TODO: Fix me later, can not use genExprs in indexLookUpReader, because the schema of expression is different.
genExprs := e.srcs[idxOffset].genExprs
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
if tbl.Meta().GetPartitionInfo() == nil {
return admin.CheckRecordAndIndex(e.ctx, txn, tbl, idx, genExprs)
if e.table.Meta().GetPartitionInfo() == nil {
idx := tables.NewIndex(e.table.Meta().ID, e.table.Meta(), idxInfo)
return admin.CheckRecordAndIndex(e.ctx, txn, e.table, idx, genExprs)
}

info := tbl.Meta().GetPartitionInfo()
info := e.table.Meta().GetPartitionInfo()
for _, def := range info.Definitions {
pid := def.ID
partition := tbl.(table.PartitionedTable).GetPartition(pid)
partition := e.table.(table.PartitionedTable).GetPartition(pid)
idx := tables.NewIndex(def.ID, e.table.Meta(), idxInfo)
if err := admin.CheckRecordAndIndex(e.ctx, txn, partition, idx, genExprs); err != nil {
return errors.Trace(err)
}
Expand Down
7 changes: 5 additions & 2 deletions executor/statement_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,12 @@ func (s *testSuite1) TestStatementContext(c *C) {
_, err = tk.Exec("insert t1 values (unhex('F0A48BAE'))")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, table.ErrTruncateWrongValue), IsTrue, Commentf("err %v", err))
config.GetGlobalConfig().CheckMb4ValueInUTF8 = false
conf := config.GetGlobalConfig()
conf.CheckMb4ValueInUTF8 = false
config.StoreGlobalConfig(conf)
tk.MustExec("insert t1 values (unhex('f09f8c80'))")
config.GetGlobalConfig().CheckMb4ValueInUTF8 = true
conf.CheckMb4ValueInUTF8 = true
config.StoreGlobalConfig(conf)
_, err = tk.Exec("insert t1 values (unhex('F0A48BAE'))")
c.Assert(err, NotNil)
}
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ type CheckTable struct {
baseSchemaProducer

DBName string
TblInfo *model.TableInfo
Indices []table.Index
Table table.Table
IndexInfos []*model.IndexInfo
IndexLookUpReaders []*PhysicalIndexLookUpReader
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn
cop.tablePlan = ts
}

is.initSchema(ds.id, idx, cop.tablePlan != nil)
is.initSchema(idx, cop.tablePlan != nil)
indexConds, tblConds := splitIndexFilterConditions(filterConds, idx.Columns, ds.tableInfo)
path := &accessPath{
indexFilters: indexConds,
Expand Down
4 changes: 2 additions & 2 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
ts.SetSchema(ds.schema.Clone())
cop.tablePlan = ts
}
is.initSchema(ds.id, idx, cop.tablePlan != nil)
is.initSchema(idx, cop.tablePlan != nil)
// Only use expectedCnt when it's smaller than the count we calculated.
// e.g. IndexScan(count1)->After Filter(count2). The `ds.stats.RowCount` is count2. count1 is the one we need to calculate
// If expectedCnt and count2 are both zero and we go into the below `if` block, the count1 will be set to zero though it's shouldn't be.
Expand Down Expand Up @@ -567,7 +567,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
}

// TODO: refactor this part, we should not call Clone in fact.
func (is *PhysicalIndexScan) initSchema(id int, idx *model.IndexInfo, isDoubleRead bool) {
func (is *PhysicalIndexScan) initSchema(idx *model.IndexInfo, isDoubleRead bool) {
indexCols := make([]*expression.Column, 0, len(idx.Columns))
for _, col := range idx.Columns {
colFound := is.dataSourceSchema.FindColumnByName(col.Name.L)
Expand Down
Loading

0 comments on commit 80cc260

Please sign in to comment.