Skip to content

Commit

Permalink
ddl: set correct startHandle when add indices meets some errors (#6897)
Browse files Browse the repository at this point in the history
  • Loading branch information
winkyao committed Jun 25, 2018
1 parent 2612bba commit 2ef8b3d
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 14 deletions.
131 changes: 130 additions & 1 deletion ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
gofail "github.com/coreos/gofail/runtime"
"github.com/juju/errors"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand All @@ -63,6 +65,8 @@ var _ = Suite(&testDBSuite{})
const defaultBatchSize = 2048

type testDBSuite struct {
cluster *mocktikv.Cluster
mvccStore mocktikv.MVCCStore
store kv.Storage
dom *domain.Domain
schemaName string
Expand All @@ -84,7 +88,16 @@ func (s *testDBSuite) SetUpSuite(c *C) {
s.autoIDStep = autoid.GetStep()
autoid.SetStep(5000)

s.store, err = mockstore.NewMockTikvStore()
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(s.cluster)
s.mvccStore = mocktikv.MustNewMVCCStore()
store, err := mockstore.NewMockTikvStore(
mockstore.WithCluster(s.cluster),
mockstore.WithMVCCStore(s.mvccStore),
)
c.Assert(err, IsNil)

s.store = store
c.Assert(err, IsNil)

s.dom, err = session.BootstrapSession(s.store)
Expand Down Expand Up @@ -1891,3 +1904,119 @@ func (s *testDBSuite) TestUpdateHandleFailed(c *C) {
result.Check(testkit.Rows("1"))
tk.MustExec("admin check index t idx_b")
}

// TestAddIndexFailed enables the mockAddIndexErr gofail point so the first
// backfill worker reports an injected error, then checks that the add-index
// DDL still completes and leaves both the index and the table consistent.
func (s *testDBSuite) TestAddIndexFailed(c *C) {
	const failpoint = "github.com/pingcap/tidb/ddl/mockAddIndexErr"
	gofail.Enable(failpoint, `return(true)`)
	defer gofail.Disable(failpoint)

	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("create database if not exists test_add_index_failed")
	defer tk.MustExec("drop database test_add_index_failed")
	tk.MustExec("use test_add_index_failed")

	tk.MustExec("create table t(a bigint PRIMARY KEY, b int)")
	for row := 0; row < 1000; row++ {
		tk.MustExec(fmt.Sprintf("insert into t values(%v, %v)", row, row))
	}

	// Resolve the table ID so the mock cluster can be split on it.
	dom := domain.GetDomain(tk.Se)
	infoSchema := dom.InfoSchema()
	tbl, err := infoSchema.TableByName(model.NewCIStr("test_add_index_failed"), model.NewCIStr("t"))
	c.Assert(err, IsNil)

	// Split the table into multiple regions so the backfill covers several ranges.
	s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 100)

	tk.MustExec("alter table t add index idx_b(b)")
	tk.MustExec("admin check index t idx_b")
	tk.MustExec("admin check table t")
}

// getHistoryDDLJob reads the history DDL job with the given ID from the
// store, performing the lookup inside a fresh transaction.
func (s *testDBSuite) getHistoryDDLJob(id int64) (*model.Job, error) {
	var result *model.Job
	txnErr := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
		m := meta.NewMeta(txn)
		job, getErr := m.GetHistoryDDLJob(id)
		result = job
		return errors.Trace(getErr)
	})
	return result, errors.Trace(txnErr)
}

// TestBackwardCompatibility enqueues an add-index DDL job the way an old TiDB
// version would (old TiDB does not initialize model.Job.ReorgMeta; setting
// job.SnapshotVer simulates that), then waits for the current version to run
// the job to completion and verifies the resulting index.
func (s *testDBSuite) TestBackwardCompatibility(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("create database if not exists test_backward_compatibility")
	defer tk.MustExec("drop database test_backward_compatibility")
	tk.MustExec("use test_backward_compatibility")
	tk.MustExec("create table t(a int primary key, b int)")
	for i := 0; i < 200; i++ {
		tk.MustExec(fmt.Sprintf("insert into t values(%v, %v)", i, i))
	}

	// alter table t add index idx_b(b);
	is := s.dom.InfoSchema()
	schemaName := model.NewCIStr("test_backward_compatibility")
	tableName := model.NewCIStr("t")
	schema, ok := is.SchemaByName(schemaName)
	c.Assert(ok, IsTrue)
	tbl, err := is.TableByName(schemaName, tableName)
	c.Assert(err, IsNil)

	// Split the table so the reorg has to backfill across several ranges.
	s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 100)

	// Build the add-index job by hand instead of going through the SQL layer,
	// so we control exactly which fields are (not) initialized.
	unique := false
	indexName := model.NewCIStr("idx_b")
	idxColName := &ast.IndexColName{
		Column: &ast.ColumnName{
			Schema: schemaName,
			Table:  tableName,
			Name:   model.NewCIStr("b"),
		},
		Length: types.UnspecifiedLength,
	}
	idxColNames := []*ast.IndexColName{idxColName}
	var indexOption *ast.IndexOption
	job := &model.Job{
		SchemaID:   schema.ID,
		TableID:    tbl.Meta().ID,
		Type:       model.ActionAddIndex,
		BinlogInfo: &model.HistoryInfo{},
		Args:       []interface{}{unique, indexName, idxColNames, indexOption},
	}
	txn, err := s.store.Begin()
	c.Assert(err, IsNil)
	t := meta.NewMeta(txn)
	job.ID, err = t.GenGlobalID()
	c.Assert(err, IsNil)
	job.Version = 1
	job.StartTS = txn.StartTS()

	// Simulate old TiDB init the add index job, old TiDB will not init the model.Job.ReorgMeta field,
	// if we set job.SnapshotVer here, can simulate the behavior.
	job.SnapshotVer = txn.StartTS()
	err = t.EnQueueDDLJob(job)
	c.Assert(err, IsNil)
	err = txn.Commit(context.Background())
	c.Assert(err, IsNil)

	// Poll until the job shows up in the history queue and is fully synced.
	ticker := time.NewTicker(s.lease)
	defer ticker.Stop() // was leaked before; stop it so the ticker is released
	for range ticker.C {
		historyJob, err := s.getHistoryDDLJob(job.ID)
		c.Assert(err, IsNil)
		if historyJob == nil {
			continue
		}
		c.Assert(historyJob.Error, IsNil)

		if historyJob.IsSynced() {
			break
		}
	}

	// finished add index
	tk.MustExec("admin check index t idx_b")
}
30 changes: 17 additions & 13 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,8 @@ func (w *addIndexWorker) handleBackfillTask(task *reorgIndexTask) *addIndexResul
return result
}

var gofailMockAddindexErrOnceGuard bool

func (w *addIndexWorker) run() {
log.Infof("[ddl-reorg] worker[%v] start", w.id)
defer func() {
Expand All @@ -637,8 +639,15 @@ func (w *addIndexWorker) run() {
if !more {
break
}

log.Debug("[ddl-reorg] got backfill index task:#v", task)

// gofail: var mockAddIndexErr bool
//if mockAddIndexErr && !gofailMockAddindexErrOnceGuard && w.id == 0 {
// gofailMockAddindexErrOnceGuard = true
// result := &addIndexResult{addedCount: 0, nextHandle: 0, err: errors.Errorf("mock add index error")}
// w.resultCh <- result
// continue
//}
result := w.handleBackfillTask(task)
w.resultCh <- result
}
Expand Down Expand Up @@ -727,14 +736,16 @@ func (d *ddl) waitTaskResults(workers []*addIndexWorker, taskCnt int, totalAdded
return nextHandle, addedCount, errors.Trace(firstErr)
}

// backfillBatchTasks send tasks to workers, and waits all the running worker return back result,
// handleReorgTasks send tasks to workers, and waits all the running worker return back result,
// there are taskCnt running workers.
func (d *ddl) backfillBatchTasks(startTime time.Time, startHandle int64, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*addIndexWorker, batchTasks []*reorgIndexTask) error {
func (d *ddl) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64, workers []*addIndexWorker, batchTasks []*reorgIndexTask) error {
for i, task := range batchTasks {
workers[i].taskCh <- task
}

startHandle := batchTasks[0].startHandle
taskCnt := len(batchTasks)
startTime := time.Now()
nextHandle, taskAddedCount, err := d.waitTaskResults(workers, taskCnt, totalAddedCount, startHandle)
elapsedTime := time.Since(startTime).Seconds()
if err == nil {
Expand All @@ -760,23 +771,16 @@ func (d *ddl) backfillBatchTasks(startTime time.Time, startHandle int64, reorgIn
}

func (d *ddl) backfillKVRangesIndex(t table.Table, workers []*addIndexWorker, kvRanges []kv.KeyRange, job *model.Job, reorgInfo *reorgInfo) error {
var (
startTime time.Time
startHandle int64
endHandle int64
err error
)
totalAddedCount := job.GetRowCount()
batchTasks := make([]*reorgIndexTask, 0, len(workers))

log.Infof("[ddl-reorg] start to reorg index of %v region ranges.", len(kvRanges))
for i, keyRange := range kvRanges {
startTime = time.Now()

startHandle, endHandle, err = decodeHandleRange(keyRange)
startHandle, endHandle, err := decodeHandleRange(keyRange)
if err != nil {
return errors.Trace(err)
}

endKey := t.RecordKey(endHandle)
endIncluded := false
if endKey.Cmp(keyRange.EndKey) < 0 {
Expand All @@ -787,7 +791,7 @@ func (d *ddl) backfillKVRangesIndex(t table.Table, workers []*addIndexWorker, kv
batchTasks = append(batchTasks, task)
if len(batchTasks) >= len(workers) || i == (len(kvRanges)-1) {
// Wait tasks finish.
err = d.backfillBatchTasks(startTime, startHandle, reorgInfo, &totalAddedCount, workers, batchTasks)
err = d.handleReorgTasks(reorgInfo, &totalAddedCount, workers, batchTasks)
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit 2ef8b3d

Please sign in to comment.