Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/admin: support admin check table on partition table #12796

Merged
merged 9 commits into from
Oct 29, 2019
99 changes: 97 additions & 2 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,101 @@ func (s *testSuite5) TestAdminCleanupIndexMore(c *C) {
tk.MustExec("admin check table admin_test")
}

func (s *testSuite2) TestAdminCheckPartitionTableFailed(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test_p")
tk.MustExec("create table admin_test_p (c1 int key,c2 int,c3 int,index idx(c2)) partition by hash(c1) partitions 4")
tk.MustExec("insert admin_test_p (c1, c2, c3) values (0,0,0), (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustExec("admin check table admin_test_p")

// Make some corrupted index. Build the index information.
s.ctx = mock.NewContext()
s.ctx.Store = s.store
is := s.domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test_p")
tbl, err := is.TableByName(dbName, tblName)
c.Assert(err, IsNil)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
sc := s.ctx.GetSessionVars().StmtCtx
tk.Se.GetSessionVars().IndexLookupSize = 3
tk.Se.GetSessionVars().MaxChunkSize = 3

// Reduce one row of index on partitions.
// Table count > index count.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i), int64(i), nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err.Error(), Equals, fmt.Sprintf("[executor:8003]admin_test_p err:[admin:1]index:<nil> != record:&admin.RecordData{Handle:%d, Values:[]types.Datum{types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)}}}", i, i))
c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue)
// TODO: fix admin recover for partition table.
//r := tk.MustQuery("admin recover index admin_test_p idx")
//r.Check(testkit.Rows("0 0"))
//tk.MustExec("admin check table admin_test_p")
// Manual recover index.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}

// Add one row of index on partitions.
// Table count < index count.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), int64(i+8))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("handle %d, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)} != record:<nil>", i+8, i+8))
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), int64(i+8), nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}

// Table count = index count, but the index value was wrong.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("col c2, handle %d, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)}", i, i+8, i))
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), int64(i), nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}
}

func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down Expand Up @@ -525,8 +620,8 @@ func (s *testSuite2) TestAdminCheckTable(c *C) {
tk.MustExec(`drop table if exists test`)
tk.MustExec(`create table test (
a time,
PRIMARY KEY (a)
);`)
PRIMARY KEY (a)
);`)

tk.MustExec(`insert into test set a='12:10:36';`)
tk.MustExec(`admin check table test`)
Expand Down
9 changes: 5 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,12 @@ func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor {
e := &CheckTableExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dbName: v.DBName,
tblInfo: v.TblInfo,
indices: v.Indices,
table: v.Table,
indexInfos: v.IndexInfos,
is: b.is,
srcs: readerExecs,
exitCh: make(chan struct{}),
retCh: make(chan error, len(v.Indices)),
retCh: make(chan error, len(readerExecs)),
}
return e
}
Expand Down Expand Up @@ -1888,7 +1888,8 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
tbl, _ := b.is.TableByID(ts.Table.ID)
if isPartition, physicalTableID := ts.IsPartition(); isPartition {
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
pt := tbl.(table.PartitionedTable)
tbl = pt.GetPartition(physicalTableID)
}
Expand Down
64 changes: 40 additions & 24 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -479,14 +480,14 @@ func getTableName(is infoschema.InfoSchema, id int64) string {
type CheckTableExec struct {
baseExecutor

dbName string
tblInfo *model.TableInfo
indices []table.Index
srcs []*IndexLookUpExecutor
done bool
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
dbName string
table table.Table
indexInfos []*model.IndexInfo
srcs []*IndexLookUpExecutor
done bool
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -514,7 +515,20 @@ func (e *CheckTableExec) Close() error {
return firstErr
}

func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *IndexLookUpExecutor) error {
func (e *CheckTableExec) checkTableIndexHandle(ctx context.Context, idxInfo *model.IndexInfo) error {
// For partition table, there will be multi same index indexLookUpReaders on different partitions.
for _, src := range e.srcs {
if src.index.Name.L == idxInfo.Name.L {
err := e.checkIndexHandle(ctx, src)
if err != nil {
return err
}
}
}
return nil
}

func (e *CheckTableExec) checkIndexHandle(ctx context.Context, src *IndexLookUpExecutor) error {
cols := src.schema.Columns
retFieldTypes := make([]*types.FieldType, len(cols))
for i := range cols {
Expand Down Expand Up @@ -555,20 +569,19 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
defer func() { e.done = true }()

idxNames := make([]string, 0, len(e.indices))
for _, idx := range e.indices {
idxNames = append(idxNames, idx.Meta().Name.O)
idxNames := make([]string, 0, len(e.indexInfos))
for _, idx := range e.indexInfos {
idxNames = append(idxNames, idx.Name.O)
}
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tblInfo.Name.O, idxNames)
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames)
if err != nil {
tbl := e.srcs[idxOffset].table
if greater == admin.IdxCntGreater {
err = e.checkIndexHandle(ctx, idxOffset, e.srcs[idxOffset])
err = e.checkTableIndexHandle(ctx, e.indexInfos[idxOffset])
} else if greater == admin.TblCntGreater {
err = e.checkTableRecord(tbl, idxOffset)
err = e.checkTableRecord(idxOffset)
}
if err != nil && admin.ErrDataInConsistent.Equal(err) {
return ErrAdminCheckTable.GenWithStack("%v err:%v", tbl.Meta().Name, err)
return ErrAdminCheckTable.GenWithStack("%v err:%v", e.table.Meta().Name, err)
}
return errors.Trace(err)
}
Expand All @@ -582,7 +595,7 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
go func(num int) {
defer wg.Done()
util.WithRecovery(func() {
err1 := e.checkIndexHandle(ctx, num, e.srcs[num])
err1 := e.checkIndexHandle(ctx, e.srcs[num])
if err1 != nil {
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err))
}
Expand All @@ -603,21 +616,24 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *CheckTableExec) checkTableRecord(tbl table.Table, idxOffset int) error {
idx := e.indices[idxOffset]
func (e *CheckTableExec) checkTableRecord(idxOffset int) error {
idxInfo := e.indexInfos[idxOffset]
// TODO: Fix me later, can not use genExprs in indexLookUpReader, because the schema of expression is different.
genExprs := e.srcs[idxOffset].genExprs
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
if tbl.Meta().GetPartitionInfo() == nil {
return admin.CheckRecordAndIndex(e.ctx, txn, tbl, idx, genExprs)
if e.table.Meta().GetPartitionInfo() == nil {
idx := tables.NewIndex(e.table.Meta().ID, e.table.Meta(), idxInfo)
return admin.CheckRecordAndIndex(e.ctx, txn, e.table, idx, genExprs)
}

info := tbl.Meta().GetPartitionInfo()
info := e.table.Meta().GetPartitionInfo()
for _, def := range info.Definitions {
pid := def.ID
partition := tbl.(table.PartitionedTable).GetPartition(pid)
partition := e.table.(table.PartitionedTable).GetPartition(pid)
idx := tables.NewIndex(def.ID, e.table.Meta(), idxInfo)
if err := admin.CheckRecordAndIndex(e.ctx, txn, partition, idx, genExprs); err != nil {
return errors.Trace(err)
}
Expand Down
7 changes: 5 additions & 2 deletions executor/statement_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,12 @@ func (s *testSuite1) TestStatementContext(c *C) {
_, err = tk.Exec("insert t1 values (unhex('F0A48BAE'))")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, table.ErrTruncateWrongValue), IsTrue, Commentf("err %v", err))
config.GetGlobalConfig().CheckMb4ValueInUTF8 = false
conf := config.GetGlobalConfig()
conf.CheckMb4ValueInUTF8 = false
config.StoreGlobalConfig(conf)
tk.MustExec("insert t1 values (unhex('f09f8c80'))")
config.GetGlobalConfig().CheckMb4ValueInUTF8 = true
conf.CheckMb4ValueInUTF8 = true
config.StoreGlobalConfig(conf)
_, err = tk.Exec("insert t1 values (unhex('F0A48BAE'))")
c.Assert(err, NotNil)
}
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ type CheckTable struct {
baseSchemaProducer

DBName string
TblInfo *model.TableInfo
Indices []table.Index
Table table.Table
IndexInfos []*model.IndexInfo
IndexLookUpReaders []*PhysicalIndexLookUpReader
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
}
cop.tablePlan = ts
}
is.initSchema(ds.id, path.index, path.fullIdxCols, cop.tablePlan != nil)
is.initSchema(path.index, path.fullIdxCols, cop.tablePlan != nil)
rowSize := is.indexScanRowSize(path.index, ds, true)
sessVars := ds.ctx.GetSessionVars()
cop.cst = rowCount * rowSize * sessVars.ScanFactor
Expand Down
4 changes: 2 additions & 2 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ func (is *PhysicalIndexScan) indexScanRowSize(idx *model.IndexInfo, ds *DataSour
return ds.TblColHists.GetAvgRowSize(scanCols, true)
}

func (is *PhysicalIndexScan) initSchema(id int, idx *model.IndexInfo, idxExprCols []*expression.Column, isDoubleRead bool) {
func (is *PhysicalIndexScan) initSchema(idx *model.IndexInfo, idxExprCols []*expression.Column, isDoubleRead bool) {
indexCols := make([]*expression.Column, len(is.IdxCols), len(idx.Columns)+1)
copy(indexCols, is.IdxCols)
for i := len(is.IdxCols); i < len(idx.Columns); i++ {
Expand Down Expand Up @@ -1114,7 +1114,7 @@ func (ds *DataSource) getOriginalPhysicalIndexScan(prop *property.PhysicalProper
is.Hist = &statsTbl.Indices[idx.ID].Histogram
}
rowCount := path.countAfterAccess
is.initSchema(ds.id, idx, path.fullIdxCols, !isSingleScan)
is.initSchema(idx, path.fullIdxCols, !isSingleScan)
// Only use expectedCnt when it's smaller than the count we calculated.
// e.g. IndexScan(count1)->After Filter(count2). The `ds.stats.RowCount` is count2. count1 is the one we need to calculate
// If expectedCnt and count2 are both zero and we go into the below `if` block, the count1 will be set to zero though it's shouldn't be.
Expand Down
Loading