Skip to content

Commit

Permalink
ddl: batch check the constrains when we add a unique-index. (#7132) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
winkyao authored Sep 6, 2018
1 parent 389131a commit d0a8faa
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 7 deletions.
1 change: 0 additions & 1 deletion ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ LOOP:
s.mustExec(c, "delete from t1 where c1 = ?", i+10)
}
sessionExec(c, s.store, "create index c3_index on t1 (c3)")

s.mustExec(c, "drop table t1")
}

Expand Down
93 changes: 87 additions & 6 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ type indexRecord struct {
handle int64
key []byte // It's used to lock a record. Record it to reduce the encoding time.
vals []types.Datum // It's the index values.
skip bool // skip indicates that the index key is already exists, we should not add it.
}

type addIndexWorker struct {
Expand All @@ -420,9 +421,13 @@ type addIndexWorker struct {
colFieldMap map[int64]*types.FieldType
closed bool

defaultVals []types.Datum // It's used to reduce the number of new slice.
idxRecords []*indexRecord // It's used to reduce the number of new slice.
rowMap map[int64]types.Datum // It's the index column values map. It is used to reduce the number of making map.
// The following attributes are used to reduce memory allocation.
defaultVals []types.Datum
idxRecords []*indexRecord
rowMap map[int64]types.Datum
idxKeyBufs [][]byte
batchCheckKeys []kv.Key
distinctCheckFlags []bool
}

type reorgIndexTask struct {
Expand Down Expand Up @@ -452,7 +457,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, d *ddl, id int, t table.Table
index: index,
table: t,
colFieldMap: colFieldMap,

defaultVals: make([]types.Datum, len(t.Cols())),
rowMap: make(map[int64]types.Datum, len(colFieldMap)),
}
Expand Down Expand Up @@ -582,6 +586,71 @@ func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string
}
}

func (w *addIndexWorker) initBatchCheckBufs(batchCount int) {
if len(w.idxKeyBufs) < batchCount {
w.idxKeyBufs = make([][]byte, batchCount)
}

w.batchCheckKeys = w.batchCheckKeys[:0]
w.distinctCheckFlags = w.distinctCheckFlags[:0]
}

func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*indexRecord) error {
idxInfo := w.index.Meta()
if !idxInfo.Unique {
// non-unique key need not to check, just overwrite it,
// because in most case, backfilling indices is not exists.
return nil
}

w.initBatchCheckBufs(len(idxRecords))
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
for i, record := range idxRecords {
idxKey, distinct, err := w.index.GenIndexKey(stmtCtx, record.vals, record.handle, w.idxKeyBufs[i])
if err != nil {
return errors.Trace(err)
}
// save the buffer to reduce memory allocations.
w.idxKeyBufs[i] = idxKey

w.batchCheckKeys = append(w.batchCheckKeys, idxKey)
w.distinctCheckFlags = append(w.distinctCheckFlags, distinct)
}

batchVals, err := kv.BatchGetValues(txn, w.batchCheckKeys)
if err != nil {
return errors.Trace(err)
}

// 1. unique-key is duplicate and the handle is equal, skip it.
// 2. unique-key is duplicate and the handle is not equal, return duplicate error.
// 3. non-unique-key is duplicate, skip it.
for i, key := range w.batchCheckKeys {
if val, found := batchVals[string(key)]; found {
if w.distinctCheckFlags[i] {
handle, err1 := tables.DecodeHandle(val)
if err1 != nil {
return errors.Trace(err1)
}

if handle != idxRecords[i].handle {
return errors.Trace(kv.ErrKeyExists)
}
}
idxRecords[i].skip = true
} else {
// The keys in w.batchCheckKeys also maybe duplicate,
// so we need to backfill the not found key into `batchVals` map.
if w.distinctCheckFlags[i] {
batchVals[string(key)] = tables.EncodeHandle(idxRecords[i].handle)
}
}
}
// Constrains is already checked.
stmtCtx.BatchCheck = true
return nil
}

// backfillIndexInTxn will backfill table index in a transaction, lock corresponding rowKey, if the value of rowKey is changed,
// indicate that index columns values may changed, index is not allowed to be added, so the txn will rollback and retry.
// backfillIndexInTxn will add w.batchCnt indices once, default value of w.batchCnt is 128.
Expand All @@ -601,15 +670,27 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHan
return errors.Trace(err)
}

err = w.batchCheckUniqueKey(txn, idxRecords)
if err != nil {
return errors.Trace(err)
}

for _, idxRecord := range idxRecords {
scanCount++
// The index is already exists, we skip it, no needs to backfill it.
// The following update, delete, insert on these rows, TiDB can handle it correctly.
if idxRecord.skip {
continue
}

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(idxRecord.key)
if err != nil {
return errors.Trace(err)
}
scanCount++

// Create the index.
// TODO: backfill unique-key will check constraint every row, we can speed up this case by using batch check.
handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle)
if err != nil {
if kv.ErrKeyExists.Equal(err) && idxRecord.handle == handle {
Expand Down
16 changes: 16 additions & 0 deletions ddl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,22 @@ func (s *testIntegrationSuite) TestCreateTableIfNotExists(c *C) {
c.Assert(terror.ErrorEqual(infoschema.ErrTableExists, lastWarn.Err), IsTrue)
}

func (s *testIntegrationSuite) TestUniquekeyNullValue(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("USE test")

tk.MustExec("create table t(a int primary key, b varchar(255))")

tk.MustExec("insert into t values(1, NULL)")
tk.MustExec("insert into t values(2, NULL)")
tk.MustExec("alter table t add unique index b(b);")
res := tk.MustQuery("select count(*) from t use index(b);")
res.Check(testkit.Rows("2"))
tk.MustExec("admin check table t")
tk.MustExec("admin check index t b")
}

func (s *testIntegrationSuite) TestEndIncluded(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down

0 comments on commit d0a8faa

Please sign in to comment.