Skip to content

Commit

Permalink
*: speed up the operation of "admin check table" (pingcap#8572)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Jul 31, 2019
1 parent 16a81b6 commit ea09111
Show file tree
Hide file tree
Showing 10 changed files with 630 additions and 172 deletions.
116 changes: 111 additions & 5 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,112 @@ func (s *testSuite2) TestAdminCleanupIndexMore(c *C) {
tk.MustExec("admin check table admin_test")
}

func (s *testSuite2) TestAdminCheckTableFailed(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 varchar(255) default '1', primary key(c1), key(c3), unique key(c2), key(c2, c3))")
tk.MustExec("insert admin_test (c1, c2, c3) values (-10, -20, 'y'), (-1, -10, 'z'), (1, 11, 'a'), (2, 12, 'b'), (5, 15, 'c'), (10, 20, 'd'), (20, 30, 'e')")

// Make some corrupted index. Build the index information.
s.ctx = mock.NewContext()
s.ctx.Store = s.store
is := s.domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test")
tbl, err := is.TableByName(dbName, tblName)
c.Assert(err, IsNil)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[1]
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo)
sc := s.ctx.GetSessionVars().StmtCtx
tk.Se.GetSessionVars().IndexLookupSize = 3
tk.Se.GetSessionVars().MaxChunkSize = 3

// Reduce one row of index.
// Table count > index count.
// Index c2 is missing 11.
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(-10), -1, nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
c.Assert(err.Error(), Equals,
"[executor:8003]admin_test err:[admin:1]index:<nil> != record:&admin.RecordData{Handle:-1, Values:[]types.Datum{types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:-10, b:[]uint8(nil), x:interface {}(nil)}}}")
c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue)
r := tk.MustQuery("admin recover index admin_test c2")
r.Check(testkit.Rows("1 7"))
tk.MustExec("admin check table admin_test")

// Add one row of index.
// Table count < index count.
// Index c2 has one more values ​​than table data: 0, and the handle 0 hasn't correlative record.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(0), 0)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
c.Assert(err.Error(), Equals, "handle 0, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:0, b:[]uint8(nil), x:interface {}(nil)} != record:<nil>")

// Add one row of index.
// Table count < index count.
// Index c2 has two more values ​​than table data: 10, 13, and these handles have correlative record.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(0), 0, nil)
c.Assert(err, IsNil)
// Make sure the index value "19" is smaller "21". Then we scan to "19" before "21".
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(19), 10)
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(13), 2)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
c.Assert(err.Error(), Equals, "col c2, handle 2, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:13, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:12, b:[]uint8(nil), x:interface {}(nil)}")

// Table count = index count.
// Two indices have the same handle.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(13), 2, nil)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(12), 2, nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}")

// Table count = index count.
// Index c2 has one line of data is 19, the corresponding table data is 20.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(12), 2)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(20), 10, nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}")

// Recover records.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(19), 10, nil)
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(20), 10)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
tk.MustExec("admin check table admin_test")
}

func (s *testSuite1) TestAdminCheckTable(c *C) {
// test NULL value.
tk := testkit.NewTestKit(c, s.store)
Expand Down Expand Up @@ -455,16 +561,16 @@ func (s *testSuite1) TestAdminCheckTable(c *C) {

// Test index in virtual generated column.
tk.MustExec(`drop table if exists test`)
tk.MustExec(`create table test ( b json , c int as (JSON_EXTRACT(b,'$.d')) , index idxc(c));`)
tk.MustExec(`create table test ( b json , c int as (JSON_EXTRACT(b,'$.d')), index idxc(c));`)
tk.MustExec(`INSERT INTO test set b='{"d": 100}';`)
tk.MustExec(`admin check table test;`)
// Test prefix index.
tk.MustExec(`drop table if exists t`)
tk.MustExec(`CREATE TABLE t (
ID CHAR(32) NOT NULL,
name CHAR(32) NOT NULL,
value CHAR(255),
INDEX indexIDname (ID(8),name(8)));`)
ID CHAR(32) NOT NULL,
name CHAR(32) NOT NULL,
value CHAR(255),
INDEX indexIDname (ID(8),name(8)));`)
tk.MustExec(`INSERT INTO t VALUES ('keyword','urlprefix','text/ /text');`)
tk.MustExec(`admin check table t;`)

Expand Down
55 changes: 51 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor {
b.err = err
return nil
}
readerExec.ranges = ranger.FullRange()
readerExec.isCheckOp = true

buildIndexLookUpChecker(b, v.IndexLookUpReader, readerExec)

e := &CheckIndexExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
Expand All @@ -320,12 +320,59 @@ func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor {
return e
}

// buildIndexLookUpChecker builds check information to IndexLookUpReader.
func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.PhysicalIndexLookUpReader,
readerExec *IndexLookUpExecutor) {
is := readerPlan.IndexPlans[0].(*plannercore.PhysicalIndexScan)
readerExec.dagPB.OutputOffsets = make([]uint32, 0, len(is.Index.Columns))
for i := 0; i <= len(is.Index.Columns); i++ {
readerExec.dagPB.OutputOffsets = append(readerExec.dagPB.OutputOffsets, uint32(i))
}
readerExec.ranges = ranger.FullRange()
ts := readerPlan.TablePlans[0].(*plannercore.PhysicalTableScan)
readerExec.handleIdx = ts.HandleIdx

tps := make([]*types.FieldType, 0, len(is.Columns)+1)
for _, col := range is.Columns {
tps = append(tps, &col.FieldType)
}
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
readerExec.checkIndexValue = &checkIndexValue{genExprs: is.GenExprs, idxColTps: tps}

colNames := make([]string, 0, len(is.Columns))
for _, col := range is.Columns {
colNames = append(colNames, col.Name.O)
}
var err error
readerExec.idxTblCols, err = table.FindCols(readerExec.table.Cols(), colNames, true)
if err != nil {
b.err = errors.Trace(err)
return
}
}

func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor {
readerExecs := make([]*IndexLookUpExecutor, 0, len(v.IndexLookUpReaders))
for _, readerPlan := range v.IndexLookUpReaders {
readerExec, err := buildNoRangeIndexLookUpReader(b, readerPlan)
if err != nil {
b.err = errors.Trace(err)
return nil
}
buildIndexLookUpChecker(b, readerPlan, readerExec)

readerExecs = append(readerExecs, readerExec)
}

e := &CheckTableExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
tables: v.Tables,
dbName: v.DBName,
tblInfo: v.TblInfo,
indices: v.Indices,
is: b.is,
genExprs: v.GenExprs,
srcs: readerExecs,
exitCh: make(chan struct{}),
retCh: make(chan error, len(v.Indices)),
}
return e
}
Expand Down
Loading

0 comments on commit ea09111

Please sign in to comment.