From 3b95add1bb2fb72efc02bb43223244943f69c97e Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 20 Jan 2023 14:18:19 +0800 Subject: [PATCH 01/24] ddl: check the key existence on original index --- ddl/index.go | 6 +++++ ddl/index_merge_tmp_test.go | 45 +++++++++++++++++++++++++++++++++++++ executor/batch_checker.go | 2 +- table/tables/index.go | 7 +++--- 4 files changed, 56 insertions(+), 4 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 221005f58c211..ed1bf0f9e62bd 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -872,6 +872,12 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return false, ver, errors.Trace(err) case model.BackfillStateMerging: + failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && MockDMLExecution != nil { + MockDMLExecution() + } + }) done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, true) if !done { return false, ver, err diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index b637a55d2925f..632f652696067 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -531,3 +531,48 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("1 2")) } + +func TestAddIndexMergeInsertOnMerging(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &ddl.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateDeleteOnly: + _, err = tk1.Exec("insert into t values (5, 5)") + assert.NoError(t, err) + case model.StateWriteOnly: + _, err = tk1.Exec("insert into t values (5, 7)") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where b = 7") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + var mockDMLExecErr error + ddl.MockDMLExecution = func() { + _, mockDMLExecErr = tk1.Exec("insert into t values (5, 8);") + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(a);") + require.Error(t, mockDMLExecErr) // [kv:1062]Duplicate entry '5' for key 't.idx' + tk.MustQuery("select count(1) from t;").Check(testkit.Rows("1")) + tk.MustExec("admin check table t;") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) +} diff --git a/executor/batch_checker.go b/executor/batch_checker.go index 838c6af7bace0..7819e1e1e9b83 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -201,7 +201,7 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D continue } // If index is used ingest ways, then we should check key from temp index. - if v.Meta().State != model.StatePublic && v.Meta().BackfillState != model.BackfillStateInapplicable { + if v.Meta().State != model.StatePublic && v.Meta().BackfillState == model.BackfillStateRunning { _, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key) } colValStr, err1 := formatDataForDupError(colVals) diff --git a/table/tables/index.go b/table/tables/index.go index 29e3964959aa9..021a0ecf773a8 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -299,9 +299,10 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } } if lazyCheck { - var flags []kv.FlagsOp - if needPresumeKey != KeyInTempIndexIsDeleted { - flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} + flags := []kv.FlagsOp{kv.SetPresumeKeyNotExists} + if keyIsTempIdxKey && needPresumeKey == KeyInTempIndexIsDeleted { + // The writes temp index should check if the key is deleted. + flags = flags[:0] } if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && !vars.InRestrictedSQL && vars.ConnectionID > 0 { From df2fb49621fb68fc89d882c290a52fa9aed9242d Mon Sep 17 00:00:00 2001 From: tangenta Date: Sat, 28 Jan 2023 15:15:49 +0800 Subject: [PATCH 02/24] add a test for insert on duplicate update --- ddl/index_merge_tmp_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 632f652696067..355a2132a36cc 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -565,14 +565,15 @@ func TestAddIndexMergeInsertOnMerging(t *testing.T) { callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) d.SetHook(callback) - var mockDMLExecErr error ddl.MockDMLExecution = func() { - _, mockDMLExecErr = tk1.Exec("insert into t values (5, 8);") + _, err := tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) // [kv:1062]Duplicate entry '5' for key 't.idx' + _, err = tk1.Exec("insert into t values (5, 8) on duplicate key update a = 6;") + assert.NoError(t, err) // The row should be normally updated to (6, 5). } require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) tk.MustExec("alter table t add unique index idx(a);") - require.Error(t, mockDMLExecErr) // [kv:1062]Duplicate entry '5' for key 't.idx' - tk.MustQuery("select count(1) from t;").Check(testkit.Rows("1")) tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("6 5")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) } From 576c3740f7ebd95bb3eebbfd20b786d7cb2df68a Mon Sep 17 00:00:00 2001 From: tangenta Date: Sat, 28 Jan 2023 15:17:14 +0800 Subject: [PATCH 03/24] update comment --- table/tables/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/tables/index.go b/table/tables/index.go index 021a0ecf773a8..57871dcedac10 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -301,7 +301,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if lazyCheck { flags := []kv.FlagsOp{kv.SetPresumeKeyNotExists} if keyIsTempIdxKey && needPresumeKey == KeyInTempIndexIsDeleted { - // The writes temp index should check if the key is deleted. + // Only writing to temp index should check if the key is deleted. flags = flags[:0] } if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && From 6a9b48e69e6f9f24feb64778300d5a9f77ac922c Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 30 Jan 2023 20:06:40 +0800 Subject: [PATCH 04/24] fix replace and insert on duplicate update issues --- ddl/index.go | 7 ++- ddl/indexmergetest/merge_test.go | 77 ++++++++++++++++++++++++++- executor/batch_checker.go | 2 +- executor/insert.go | 46 ++++++++++------ executor/replace.go | 11 ++-- table/tables/index.go | 91 +++++++++++++++++++++++--------- tablecodec/tablecodec.go | 4 +- 7 files changed, 184 insertions(+), 54 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index ed1bf0f9e62bd..809c15bcd2a24 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -874,8 +874,8 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo case model.BackfillStateMerging: failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) { //nolint:forcetypeassert - if val.(bool) && MockDMLExecution != nil { - MockDMLExecution() + if val.(bool) && MockDMLExecutionMerging != nil { + MockDMLExecutionMerging() } }) done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, true) @@ -1717,6 +1717,9 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC // MockDMLExecution is only used for test. var MockDMLExecution func() +// MockDMLExecutionMerging is only used for test. +var MockDMLExecutionMerging func() + func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index f04a3a5cfa2c2..c2f448feede77 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -535,7 +535,7 @@ func TestAddIndexMergeInsertOnMerging(t *testing.T) { d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) - callback := &ddl.TestDDLCallback{} + callback := &callback.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { if t.Failed() { return @@ -555,7 +555,7 @@ func TestAddIndexMergeInsertOnMerging(t *testing.T) { callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) d.SetHook(callback) - ddl.MockDMLExecution = func() { + ddl.MockDMLExecutionMerging = func() { _, err := tk1.Exec("insert into t values (5, 8);") assert.Error(t, err) // [kv:1062]Duplicate entry '5' for key 't.idx' _, err = tk1.Exec("insert into t values (5, 8) on duplicate key update a = 6;") @@ -567,3 +567,76 @@ func TestAddIndexMergeInsertOnMerging(t *testing.T) { tk.MustQuery("select * from t;").Check(testkit.Rows("6 5")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) } + +func TestAddIndexMergeReplaceOnMerging(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0);") + tk.MustExec("insert into t values (5, 5);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where b = 5;") + assert.NoError(t, err) + } + + ddl.MockDMLExecutionMerging = func() { + _, err := tk1.Exec("replace into t values (5, 8);") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) +} + +func TestAddIndexMergeInsertToDeletedTempIndex(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + tk.MustExec("insert into t values (5, 5);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateWriteOnly: + _, err = tk1.Exec("delete from t where b = 5") + assert.NoError(t, err) + _, err := tk1.Exec("set @@tidb_constraint_check_in_place = true;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) + _, err = tk1.Exec("set @@tidb_constraint_check_in_place = false;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) +} diff --git a/executor/batch_checker.go b/executor/batch_checker.go index 7819e1e1e9b83..838c6af7bace0 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -201,7 +201,7 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D continue } // If index is used ingest ways, then we should check key from temp index. - if v.Meta().State != model.StatePublic && v.Meta().BackfillState == model.BackfillStateRunning { + if v.Meta().State != model.StatePublic && v.Meta().BackfillState != model.BackfillStateInapplicable { _, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key) } colValStr, err1 := formatDataForDupError(colVals) diff --git a/executor/insert.go b/executor/insert.go index 1a4eb27d3c6f6..df64430a2e37c 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -158,11 +158,8 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t for _, r := range rows { for _, uk := range r.uniqueKeys { if val, found := values[string(uk.newKey)]; found { - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) + if isTemp, _ := tablecodec.IsTempIndexKey(uk.newKey); isTemp { + continue } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { @@ -214,6 +211,31 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr return err } +func getDuplicatedHandle(uniqueKey kv.Key, val []byte, ctx context.Context, txn kv.Transaction, isCommon bool) (dupHandle kv.Handle, err error) { + if isTemp, idxID := tablecodec.IsTempIndexKey(uniqueKey); isTemp { + _, h, deleted, _, _ := tablecodec.DecodeTempIndexValue(val, isCommon) + if deleted { + originKey := uniqueKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := txn.Get(ctx, originKey) + if err != nil { + return nil, err + } + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) + if err != nil { + return nil, err + } + if originHandle.Equal(h) { + // The key has been deleted. This is not a duplicated key. + return nil, nil + } + return originHandle, nil + } + return h, nil + } + return tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) +} + // batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table. func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error { // Get keys need to be checked. @@ -268,19 +290,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } return err } - // Since the temp index stores deleted key with marked 'deleteu' for unique key at the end - // of value, So if return a key we check and skip deleted key. - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) - } - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) + handle, err := getDuplicatedHandle(uk.newKey, val, ctx, txn, uk.commonHandle) if err != nil { return err } - + if handle == nil { + continue + } err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) if err != nil { if kv.IsErrNotFound(err) { diff --git a/executor/replace.go b/executor/replace.go index bfc70ebc4451c..d7cd9e3a50a91 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -183,16 +183,13 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r } return false, false, err } - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) - } - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) + handle, err := getDuplicatedHandle(uk.newKey, val, ctx, txn, uk.commonHandle) if err != nil { return false, true, err } + if handle == nil { + continue + } rowUnchanged, err := e.removeRow(ctx, txn, handle, r) if err != nil { return false, true, err diff --git a/table/tables/index.go b/table/tables/index.go index 57871dcedac10..581d64eb7bac5 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -281,28 +281,20 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if err != nil && !kv.IsErrNotFound(err) { return nil, err } + // The index key value is not found or deleted. if err != nil || len(value) == 0 || (keyIsTempIdxKey && tablecodec.CheckTempIndexValueIsDelete(value)) { lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - var needPresumeKey TempIndexKeyState if keyIsTempIdxKey { idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err - } - } else { - if len(tempKey) > 0 { - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err - } - } + } + needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, distinct, h, c.tblInfo.IsCommonHandle, keyIsTempIdxKey, c.idxInfo.ID) + if err != nil { + return nil, err } if lazyCheck { - flags := []kv.FlagsOp{kv.SetPresumeKeyNotExists} - if keyIsTempIdxKey && needPresumeKey == KeyInTempIndexIsDeleted { - // Only writing to temp index should check if the key is deleted. - flags = flags[:0] + var flags []kv.FlagsOp + if needPresumeNotExists { + flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} } if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && !vars.InRestrictedSQL && vars.ConnectionID > 0 { @@ -317,7 +309,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if len(tempKey) > 0 { idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted { + if lazyCheck && needPresumeNotExists { err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) } else { err = txn.GetMemBuffer().Set(tempKey, idxVal) @@ -352,6 +344,54 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue return nil, nil } +func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, tempKey kv.Key, distinct bool, + h kv.Handle, isCommon bool, keyIsTempIdxKey bool, idxID int64) (needFlag bool, err error) { + var tmpKeyState TempIndexKeyState + var tmpIdxHandle kv.Handle + // Check the value in temp/origin index key to determine if the key exists. + if keyIsTempIdxKey { + tmpKeyState, tmpIdxHandle, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, isCommon) + if err != nil { + return false, err + } + } else if len(tempKey) > 0 { + tmpKeyState, tmpIdxHandle, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, isCommon) + if err != nil { + return false, err + } + } else { + return true, nil + } + if tmpKeyState == KeyInTempIndexIsDeleted { + // The key is deleted in temp index. However, it may still exist in the original index. + // We need to get the handle from original index value to determine + // whether the deleted marker matches. + // - If they match, we need "put"(overwrite) semantic, the "presumeKeyNotExists" flag is not needed. + // - If they don't match, we need "insert" semantic, the "presumeKeyNotExists" flag is required. + if distinct { + originKey := key.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := txn.Get(ctx, originKey) + if err != nil { + if kv.IsErrNotFound(err) { + err = nil // Suppress the error. + } + return false, err + } + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) + if err != nil { + return false, err + } + if originHandle.Equal(tmpIdxHandle) { + // The handle on deleted marker matches the handle in original index. + return false, nil + } + return true, nil + } + } + return true, nil +} + // Delete removes the entry for handle h and indexedValues from KV index. func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) error { indexedValues := c.getIndexedValue(indexedValue) @@ -627,7 +667,7 @@ const ( // KeyExistInTempIndex is used to check the unique key exist status in temp index. func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (TempIndexKeyState, kv.Handle, error) { // Only check temp index key. - if !tablecodec.IsTempIndexKey(key) { + if isTemp, _ := tablecodec.IsTempIndexKey(key); !isTemp { return KeyInTempIndexUnknown, nil, nil } value, err := txn.Get(ctx, key) @@ -643,17 +683,18 @@ func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, di return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1") } - if tablecodec.CheckTempIndexValueIsDelete(value) { - return KeyInTempIndexIsDeleted, nil, nil + originVal, handle, isDelete, _, _ := tablecodec.DecodeTempIndexValue(value, IsCommonHandle) + if isDelete { + return KeyInTempIndexIsDeleted, handle, nil } // Check if handle equal. - var handle kv.Handle if distinct { - originVal := tablecodec.DecodeTempIndexOriginValue(value) - handle, err = tablecodec.DecodeHandleInUniqueIndexValue(originVal, IsCommonHandle) - if err != nil { - return KeyInTempIndexUnknown, nil, err + if handle == nil { + handle, err = tablecodec.DecodeHandleInUniqueIndexValue(originVal, IsCommonHandle) + if err != nil { + return KeyInTempIndexUnknown, nil, err + } } if !handle.Equal(h) { return KeyInTempIndexConflict, handle, kv.ErrKeyExists diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index b3726a414fa26..75a20e99fbf65 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1153,7 +1153,7 @@ func TempIndexKey2IndexKey(originIdxID int64, tempIdxKey []byte) { } // IsTempIndexKey check whether the input key is for a temp index. -func IsTempIndexKey(indexKey []byte) bool { +func IsTempIndexKey(indexKey []byte) (bool, int64) { var ( indexIDKey []byte indexID int64 @@ -1163,7 +1163,7 @@ func IsTempIndexKey(indexKey []byte) bool { indexIDKey = indexKey[prefixLen : prefixLen+8] indexID = codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexIDKey)) tempIndexID = int64(TempIndexPrefix) | indexID - return tempIndexID == indexID + return tempIndexID == indexID, indexID & IndexIDMask } // TempIndexValueFlag is the flag of temporary index value. From a2240376868767a7746093192a145e8ec7e5d4a9 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 31 Jan 2023 15:41:53 +0800 Subject: [PATCH 05/24] fix replace and insert on duplicate update issues --- executor/insert.go | 35 +------- executor/replace.go | 10 +-- table/tables/index.go | 200 ++++++++++++++++-------------------------- 3 files changed, 80 insertions(+), 165 deletions(-) diff --git a/executor/insert.go b/executor/insert.go index df64430a2e37c..080e24441dc25 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -211,31 +212,6 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr return err } -func getDuplicatedHandle(uniqueKey kv.Key, val []byte, ctx context.Context, txn kv.Transaction, isCommon bool) (dupHandle kv.Handle, err error) { - if isTemp, idxID := tablecodec.IsTempIndexKey(uniqueKey); isTemp { - _, h, deleted, _, _ := tablecodec.DecodeTempIndexValue(val, isCommon) - if deleted { - originKey := uniqueKey.Clone() - tablecodec.TempIndexKey2IndexKey(idxID, originKey) - originVal, err := txn.Get(ctx, originKey) - if err != nil { - return nil, err - } - originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) - if err != nil { - return nil, err - } - if originHandle.Equal(h) { - // The key has been deleted. This is not a duplicated key. - return nil, nil - } - return originHandle, nil - } - return h, nil - } - return tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) -} - // batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table. func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error { // Get keys need to be checked. @@ -283,14 +259,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) - if err != nil { - if kv.IsErrNotFound(err) { - continue - } - return err - } - handle, err := getDuplicatedHandle(uk.newKey, val, ctx, txn, uk.commonHandle) + handle, err := tables.FetchDuplicatedHandle(uk.newKey, ctx, txn, e.Table.Meta().ID, uk.commonHandle, false) if err != nil { return err } diff --git a/executor/replace.go b/executor/replace.go index d7cd9e3a50a91..5d0c43d122adf 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -176,17 +177,10 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // 3. error: the error. func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) { for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) + handle, err := tables.FetchDuplicatedHandle(uk.newKey, ctx, txn, e.Table.Meta().ID, uk.commonHandle, false) if err != nil { - if kv.IsErrNotFound(err) { - continue - } return false, false, err } - handle, err := getDuplicatedHandle(uk.newKey, val, ctx, txn, uk.commonHandle) - if err != nil { - return false, true, err - } if handle == nil { continue } diff --git a/table/tables/index.go b/table/tables/index.go index 581d64eb7bac5..56a990c3b9cf2 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -16,7 +16,6 @@ package tables import ( "context" - "errors" "sync" "github.com/opentracing/opentracing-go" @@ -287,7 +286,8 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if keyIsTempIdxKey { idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) } - needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, distinct, h, c.tblInfo.IsCommonHandle, keyIsTempIdxKey, c.idxInfo.ID) + needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h, + keyIsTempIdxKey, c.tblInfo.IsCommonHandle, c.tblInfo.ID) if err != nil { return nil, err } @@ -344,52 +344,26 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue return nil, nil } -func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, tempKey kv.Key, distinct bool, - h kv.Handle, isCommon bool, keyIsTempIdxKey bool, idxID int64) (needFlag bool, err error) { - var tmpKeyState TempIndexKeyState - var tmpIdxHandle kv.Handle - // Check the value in temp/origin index key to determine if the key exists. +func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, tempKey kv.Key, + h kv.Handle, keyIsTempIdxKey bool, isCommon bool, tblID int64) (needFlag bool, err error) { + var dupHandle kv.Handle if keyIsTempIdxKey { - tmpKeyState, tmpIdxHandle, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, isCommon) + dupHandle, err = FetchDuplicatedHandle(key, ctx, txn, tblID, isCommon, true) if err != nil { return false, err } } else if len(tempKey) > 0 { - tmpKeyState, tmpIdxHandle, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, isCommon) + dupHandle, err = FetchDuplicatedHandle(tempKey, ctx, txn, tblID, isCommon, true) if err != nil { return false, err } } else { return true, nil } - if tmpKeyState == KeyInTempIndexIsDeleted { - // The key is deleted in temp index. However, it may still exist in the original index. - // We need to get the handle from original index value to determine - // whether the deleted marker matches. - // - If they match, we need "put"(overwrite) semantic, the "presumeKeyNotExists" flag is not needed. - // - If they don't match, we need "insert" semantic, the "presumeKeyNotExists" flag is required. - if distinct { - originKey := key.Clone() - tablecodec.TempIndexKey2IndexKey(idxID, originKey) - originVal, err := txn.Get(ctx, originKey) - if err != nil { - if kv.IsErrNotFound(err) { - err = nil // Suppress the error. - } - return false, err - } - originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) - if err != nil { - return false, err - } - if originHandle.Equal(tmpIdxHandle) { - // The handle on deleted marker matches the handle in original index. - return false, nil - } - return true, nil - } + if dupHandle != nil && !dupHandle.Equal(h) { + return false, kv.ErrKeyExists } - return true, nil + return false, nil } // Delete removes the entry for handle h and indexedValues from KV index. @@ -522,54 +496,87 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) (bool, kv.Handle, error) { indexedValues := c.getIndexedValue(indexedValue) for _, val := range indexedValues { - key, distinct, err := c.GenIndexKey(sc, val, h, nil) + key, _, err := c.GenIndexKey(sc, val, h, nil) if err != nil { return false, nil, err } - - var ( - tempKey []byte - keyVer byte - ) // If index current is in creating status and using ingest mode, we need first // check key exist status in temp index. - key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) - if keyVer != TempIndexKeyTypeNone { - KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err1 != nil { - return false, nil, err - } - switch KeyExistInfo { - case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: - return false, nil, nil - case KeyInTempIndexConflict: - return true, h1, kv.ErrKeyExists - case KeyInTempIndexIsItself: - continue - } - } - - value, err := txn.Get(context.TODO(), key) - if kv.IsErrNotFound(err) { - return false, nil, nil + key, tempKey, _ := GenTempIdxKeyByState(c.idxInfo, key) + if len(tempKey) > 0 { + key = tempKey } + dupHandle, err := FetchDuplicatedHandle(key, context.TODO(), txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle, true) if err != nil { return false, nil, err } + if dupHandle == nil { + return false, nil, nil + } + } + return true, h, nil +} - // For distinct index, the value of key is handle. - if distinct { - var handle kv.Handle - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) +// FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key. +func FetchDuplicatedHandle(uniqueKey kv.Key, ctx context.Context, + txn kv.Transaction, tableID int64, isCommon bool, checkDistinct bool) (dupHandle kv.Handle, err error) { + val, err := txn.Get(ctx, uniqueKey) + if err != nil { + if kv.IsErrNotFound(err) { + return nil, nil + } + return nil, err + } + if isTemp, idxID := tablecodec.IsTempIndexKey(uniqueKey); isTemp { + originValInTmp, deletedHandle, deleted, _, _ := tablecodec.DecodeTempIndexValue(val, isCommon) + if deleted { + originKey := uniqueKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := txn.Get(ctx, originKey) if err != nil { - return false, nil, err + if kv.IsErrNotFound(err) { + // The key has been deleted. This is not a duplicated key. + return nil, nil + } + // Unexpected errors. + return nil, err + } + if checkDistinct && !tablecodec.IndexKVIsUnique(originVal) { + return nil, nil } - if !handle.Equal(h) { - return true, handle, kv.ErrKeyExists + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) + if err != nil { + return nil, err + } + if originHandle.Equal(deletedHandle) { + // The key has been deleted. This is not a duplicated key. + return nil, nil + } + // The inequality means multiple modifications happened in the same key. + // We use the handle in origin index value to check if the row exists. + recPrefix := tablecodec.GenTableRecordPrefix(tableID) + rowKey := tablecodec.EncodeRecordKey(recPrefix, originHandle) + _, err = txn.Get(context.Background(), rowKey) + if err != nil { + if kv.IsErrNotFound(err) { + // The key has been deleted. This is not a duplicated key. + return nil, nil + } + // Unexpected errors. + return nil, err } + // The row exists. This is the duplicated key. + return originHandle, nil } + if checkDistinct && !tablecodec.IndexKVIsUnique(originValInTmp) { + return nil, nil + } + return tablecodec.DecodeHandleInUniqueIndexValue(originValInTmp, isCommon) } - return true, h, nil + if checkDistinct && !tablecodec.IndexKVIsUnique(val) { + return nil, nil + } + return tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) } func (c *index) FetchValues(r []types.Datum, vals []types.Datum) ([]types.Datum, error) { @@ -647,58 +654,3 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * } return colInfo } - -// TempIndexKeyState is the state of the temporary index key. -type TempIndexKeyState byte - -const ( - // KeyInTempIndexUnknown whether the key exists or not in temp index is unknown. - KeyInTempIndexUnknown TempIndexKeyState = iota - // KeyInTempIndexNotExist the key is not exist in temp index. - KeyInTempIndexNotExist - // KeyInTempIndexIsDeleted the key is marked deleted in temp index. - KeyInTempIndexIsDeleted - // KeyInTempIndexIsItself the key is correlated to itself in temp index. - KeyInTempIndexIsItself - // KeyInTempIndexConflict the key is conflict in temp index. - KeyInTempIndexConflict -) - -// KeyExistInTempIndex is used to check the unique key exist status in temp index. -func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (TempIndexKeyState, kv.Handle, error) { - // Only check temp index key. - if isTemp, _ := tablecodec.IsTempIndexKey(key); !isTemp { - return KeyInTempIndexUnknown, nil, nil - } - value, err := txn.Get(ctx, key) - if kv.IsErrNotFound(err) { - return KeyInTempIndexNotExist, nil, nil - } - if err != nil { - return KeyInTempIndexUnknown, nil, err - } - - // Since KeyExistInTempIndex only accept temp index key, so the value length should great than 1 for key version. - if len(value) < 1 { - return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1") - } - - originVal, handle, isDelete, _, _ := tablecodec.DecodeTempIndexValue(value, IsCommonHandle) - if isDelete { - return KeyInTempIndexIsDeleted, handle, nil - } - - // Check if handle equal. - if distinct { - if handle == nil { - handle, err = tablecodec.DecodeHandleInUniqueIndexValue(originVal, IsCommonHandle) - if err != nil { - return KeyInTempIndexUnknown, nil, err - } - } - if !handle.Equal(h) { - return KeyInTempIndexConflict, handle, kv.ErrKeyExists - } - } - return KeyInTempIndexIsItself, handle, nil -} From c0548a4cd625f0db1fc40f80964597df19a641ea Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 31 Jan 2023 15:49:35 +0800 Subject: [PATCH 06/24] make context.Context as the first arg of a function --- executor/insert.go | 2 +- executor/replace.go | 2 +- table/tables/index.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/executor/insert.go b/executor/insert.go index 080e24441dc25..22d421d114b56 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -259,7 +259,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } for _, uk := range r.uniqueKeys { - handle, err := tables.FetchDuplicatedHandle(uk.newKey, ctx, txn, e.Table.Meta().ID, uk.commonHandle, false) + handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, txn, e.Table.Meta().ID, uk.commonHandle, false) if err != nil { return err } diff --git a/executor/replace.go b/executor/replace.go index 5d0c43d122adf..72cf73512458c 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -177,7 +177,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // 3. error: the error. func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) { for _, uk := range r.uniqueKeys { - handle, err := tables.FetchDuplicatedHandle(uk.newKey, ctx, txn, e.Table.Meta().ID, uk.commonHandle, false) + handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, txn, e.Table.Meta().ID, uk.commonHandle, false) if err != nil { return false, false, err } diff --git a/table/tables/index.go b/table/tables/index.go index 56a990c3b9cf2..e84d9cee25a7c 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -348,12 +348,12 @@ func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, t h kv.Handle, keyIsTempIdxKey bool, isCommon bool, tblID int64) (needFlag bool, err error) { var dupHandle kv.Handle if keyIsTempIdxKey { - dupHandle, err = FetchDuplicatedHandle(key, ctx, txn, tblID, isCommon, true) + dupHandle, err = FetchDuplicatedHandle(ctx, key, txn, tblID, isCommon, true) if err != nil { return false, err } } else if len(tempKey) > 0 { - dupHandle, err = FetchDuplicatedHandle(tempKey, ctx, txn, tblID, isCommon, true) + dupHandle, err = FetchDuplicatedHandle(ctx, tempKey, txn, tblID, isCommon, true) if err != nil { return false, err } @@ -506,7 +506,7 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV if len(tempKey) > 0 { key = tempKey } - dupHandle, err := FetchDuplicatedHandle(key, context.TODO(), txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle, true) + dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle, true) if err != nil { return false, nil, err } @@ -518,7 +518,7 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV } // FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key. -func FetchDuplicatedHandle(uniqueKey kv.Key, ctx context.Context, +func FetchDuplicatedHandle(ctx context.Context, uniqueKey kv.Key, txn kv.Transaction, tableID int64, isCommon bool, checkDistinct bool) (dupHandle kv.Handle, err error) { val, err := txn.Get(ctx, uniqueKey) if err != nil { From 549e14b8ddb94c56bdba1c962499098b61c0c866 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 31 Jan 2023 17:49:41 +0800 Subject: [PATCH 07/24] fix admin check --- executor/insert.go | 9 +++++- executor/replace.go | 9 +++++- table/tables/index.go | 67 +++++++++++++++++++++---------------------- 3 files changed, 49 insertions(+), 36 deletions(-) diff --git a/executor/insert.go b/executor/insert.go index 22d421d114b56..57bef07628f10 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -259,7 +259,14 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } for _, uk := range r.uniqueKeys { - handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, txn, e.Table.Meta().ID, uk.commonHandle, false) + val, err := txn.Get(ctx, uk.newKey) + if err != nil { + if kv.IsErrNotFound(err) { + continue + } + return err + } + handle, err := tables.FetchDuplicatedHandleForDistinctKey(ctx, uk.newKey, val, txn, e.Table.Meta().ID, uk.commonHandle) if err != nil { return err } diff --git a/executor/replace.go b/executor/replace.go index 72cf73512458c..a437d4bf72299 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -177,10 +177,17 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // 3. error: the error. func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) { for _, uk := range r.uniqueKeys { - handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, txn, e.Table.Meta().ID, uk.commonHandle, false) + val, err := txn.Get(ctx, uk.newKey) if err != nil { + if kv.IsErrNotFound(err) { + continue + } return false, false, err } + handle, err := tables.FetchDuplicatedHandleForDistinctKey(ctx, uk.newKey, val, txn, e.Table.Meta().ID, uk.commonHandle) + if err != nil { + return false, true, err + } if handle == nil { continue } diff --git a/table/tables/index.go b/table/tables/index.go index e84d9cee25a7c..36712f2172d27 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -346,20 +346,25 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, tempKey kv.Key, h kv.Handle, keyIsTempIdxKey bool, isCommon bool, tblID int64) (needFlag bool, err error) { - var dupHandle kv.Handle + var uniqueTempKey kv.Key if keyIsTempIdxKey { - dupHandle, err = FetchDuplicatedHandle(ctx, key, txn, tblID, isCommon, true) - if err != nil { - return false, err - } + uniqueTempKey = key } else if len(tempKey) > 0 { - dupHandle, err = FetchDuplicatedHandle(ctx, tempKey, txn, tblID, isCommon, true) - if err != nil { - return false, err - } + uniqueTempKey = tempKey } else { return true, nil } + val, err := txn.Get(ctx, uniqueTempKey) + if err != nil { + if kv.IsErrNotFound(err) { + return false, nil + } + return false, err + } + dupHandle, err := FetchDuplicatedHandleForDistinctKey(ctx, uniqueTempKey, val, txn, tblID, isCommon) + if err != nil { + return false, err + } if dupHandle != nil && !dupHandle.Equal(h) { return false, kv.ErrKeyExists } @@ -496,7 +501,7 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) (bool, kv.Handle, error) { indexedValues := c.getIndexedValue(indexedValue) for _, val := range indexedValues { - key, _, err := c.GenIndexKey(sc, val, h, nil) + key, distinct, err := c.GenIndexKey(sc, val, h, nil) if err != nil { return false, nil, err } @@ -506,31 +511,34 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV if len(tempKey) > 0 { key = tempKey } - dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle, true) + val, err := txn.Get(context.TODO(), key) if err != nil { + if kv.IsErrNotFound(err) { + return false, nil, nil + } return false, nil, err } - if dupHandle == nil { - return false, nil, nil + if distinct { + dupHandle, err := FetchDuplicatedHandleForDistinctKey(context.TODO(), key, val, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle) + if err != nil { + return false, nil, err + } + if dupHandle == nil || !dupHandle.Equal(h) { + return false, nil, nil + } + continue } } return true, h, nil } -// FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key. -func FetchDuplicatedHandle(ctx context.Context, uniqueKey kv.Key, - txn kv.Transaction, tableID int64, isCommon bool, checkDistinct bool) (dupHandle kv.Handle, err error) { - val, err := txn.Get(ctx, uniqueKey) - if err != nil { - if kv.IsErrNotFound(err) { - return nil, nil - } - return nil, err - } - if isTemp, idxID := tablecodec.IsTempIndexKey(uniqueKey); isTemp { +// FetchDuplicatedHandleForDistinctKey is used to find the duplicated row's handle for a given unique index key. +func FetchDuplicatedHandleForDistinctKey(ctx context.Context, distinctKey kv.Key, val []byte, + txn kv.Transaction, tableID int64, isCommon bool) (dupHandle kv.Handle, err error) { + if isTemp, idxID := tablecodec.IsTempIndexKey(distinctKey); isTemp { originValInTmp, deletedHandle, deleted, _, _ := tablecodec.DecodeTempIndexValue(val, isCommon) if deleted { - originKey := uniqueKey.Clone() + originKey := distinctKey.Clone() tablecodec.TempIndexKey2IndexKey(idxID, originKey) originVal, err := txn.Get(ctx, originKey) if err != nil { @@ -541,9 +549,6 @@ func FetchDuplicatedHandle(ctx context.Context, uniqueKey kv.Key, // Unexpected errors. return nil, err } - if checkDistinct && !tablecodec.IndexKVIsUnique(originVal) { - return nil, nil - } originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) if err != nil { return nil, err @@ -568,14 +573,8 @@ func FetchDuplicatedHandle(ctx context.Context, uniqueKey kv.Key, // The row exists. This is the duplicated key. return originHandle, nil } - if checkDistinct && !tablecodec.IndexKVIsUnique(originValInTmp) { - return nil, nil - } return tablecodec.DecodeHandleInUniqueIndexValue(originValInTmp, isCommon) } - if checkDistinct && !tablecodec.IndexKVIsUnique(val) { - return nil, nil - } return tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) } From fc126100f3257d40341dbd47c03b373bbbb69ff1 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 1 Feb 2023 11:36:30 +0800 Subject: [PATCH 08/24] fix integration test --- executor/insert.go | 9 +---- executor/replace.go | 9 +---- table/tables/index.go | 81 +++++++++++++++++++++++-------------------- 3 files changed, 45 insertions(+), 54 deletions(-) diff --git a/executor/insert.go b/executor/insert.go index 57bef07628f10..5aca13c1ba1aa 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -259,14 +259,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) - if err != nil { - if kv.IsErrNotFound(err) { - continue - } - return err - } - handle, err := tables.FetchDuplicatedHandleForDistinctKey(ctx, uk.newKey, val, txn, e.Table.Meta().ID, uk.commonHandle) + _, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle) if err != nil { return err } diff --git a/executor/replace.go b/executor/replace.go index a437d4bf72299..801ff40ebd252 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -177,17 +177,10 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // 3. error: the error. func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) { for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) + _, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle) if err != nil { - if kv.IsErrNotFound(err) { - continue - } return false, false, err } - handle, err := tables.FetchDuplicatedHandleForDistinctKey(ctx, uk.newKey, val, txn, e.Table.Meta().ID, uk.commonHandle) - if err != nil { - return false, true, err - } if handle == nil { continue } diff --git a/table/tables/index.go b/table/tables/index.go index 36712f2172d27..184eff71e3a76 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -354,18 +354,11 @@ func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, t } else { return true, nil } - val, err := txn.Get(ctx, uniqueTempKey) - if err != nil { - if kv.IsErrNotFound(err) { - return false, nil - } - return false, err - } - dupHandle, err := FetchDuplicatedHandleForDistinctKey(ctx, uniqueTempKey, val, txn, tblID, isCommon) + foundKey, dupHandle, err := FetchDuplicatedHandle(ctx, uniqueTempKey, true, txn, tblID, isCommon) if err != nil { return false, err } - if dupHandle != nil && !dupHandle.Equal(h) { + if foundKey && dupHandle != nil && !dupHandle.Equal(h) { return false, kv.ErrKeyExists } return false, nil @@ -511,51 +504,53 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV if len(tempKey) > 0 { key = tempKey } - val, err := txn.Get(context.TODO(), key) - if err != nil { - if kv.IsErrNotFound(err) { - return false, nil, nil - } + foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle) + if err != nil || !foundKey { return false, nil, err } - if distinct { - dupHandle, err := FetchDuplicatedHandleForDistinctKey(context.TODO(), key, val, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle) - if err != nil { - return false, nil, err - } - if dupHandle == nil || !dupHandle.Equal(h) { - return false, nil, nil - } - continue + if dupHandle != nil && !dupHandle.Equal(h) { + return false, nil, err } + continue } return true, h, nil } -// FetchDuplicatedHandleForDistinctKey is used to find the duplicated row's handle for a given unique index key. -func FetchDuplicatedHandleForDistinctKey(ctx context.Context, distinctKey kv.Key, val []byte, - txn kv.Transaction, tableID int64, isCommon bool) (dupHandle kv.Handle, err error) { - if isTemp, idxID := tablecodec.IsTempIndexKey(distinctKey); isTemp { +// FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key. +func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, + txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { + val, err := txn.Get(ctx, key) + if err != nil { + if kv.IsErrNotFound(err) { + return false, nil, nil + } + // Unexpected errors. + return false, nil, err // Unexpected errors. + } + if isTemp, idxID := tablecodec.IsTempIndexKey(key); isTemp { originValInTmp, deletedHandle, deleted, _, _ := tablecodec.DecodeTempIndexValue(val, isCommon) if deleted { - originKey := distinctKey.Clone() + originKey := key.Clone() tablecodec.TempIndexKey2IndexKey(idxID, originKey) originVal, err := txn.Get(ctx, originKey) if err != nil { if kv.IsErrNotFound(err) { // The key has been deleted. This is not a duplicated key. - return nil, nil + return false, nil, nil } // Unexpected errors. - return nil, err + return false, nil, err + } + if !distinct { + return false, nil, nil } originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) if err != nil { - return nil, err + return false, nil, err } if originHandle.Equal(deletedHandle) { // The key has been deleted. This is not a duplicated key. - return nil, nil + return false, nil, nil } // The inequality means multiple modifications happened in the same key. // We use the handle in origin index value to check if the row exists. @@ -564,18 +559,28 @@ func FetchDuplicatedHandleForDistinctKey(ctx context.Context, distinctKey kv.Key _, err = txn.Get(context.Background(), rowKey) if err != nil { if kv.IsErrNotFound(err) { - // The key has been deleted. This is not a duplicated key. - return nil, nil + // The row has been deleted. This is not a duplicated key. + return false, nil, nil } // Unexpected errors. - return nil, err + return false, nil, err } // The row exists. This is the duplicated key. - return originHandle, nil + return true, originHandle, nil } - return tablecodec.DecodeHandleInUniqueIndexValue(originValInTmp, isCommon) + // The value in temp index is not the delete marker. + if !distinct { + return true, nil, nil + } + h, err := tablecodec.DecodeHandleInUniqueIndexValue(originValInTmp, isCommon) + return true, h, err + } + // The index key is not from temp index. + if !distinct { + return true, nil, nil } - return tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) + h, err := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) + return true, h, err } func (c *index) FetchValues(r []types.Datum, vals []types.Datum) ([]types.Datum, error) { From 1874a4e34e49a09de0cbb80710c0be5d74f04a8f Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 1 Feb 2023 20:43:01 +0800 Subject: [PATCH 09/24] add some comments --- executor/insert.go | 5 ++++- table/tables/index.go | 6 +++--- tablecodec/tablecodec.go | 4 ++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/executor/insert.go b/executor/insert.go index 5aca13c1ba1aa..83b486d5d1020 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -159,7 +159,10 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t for _, r := range rows { for _, uk := range r.uniqueKeys { if val, found := values[string(uk.newKey)]; found { - if isTemp, _ := tablecodec.IsTempIndexKey(uk.newKey); isTemp { + if isTemp, _ := tablecodec.CheckTempIndexKey(uk.newKey); isTemp { + // If it is a temp index, the value cannot be decoded by DecodeHandleInUniqueIndexValue. + // Since this function is an optimization, we can skip prefetching the rows referenced by + // temp indexes. continue } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) diff --git a/table/tables/index.go b/table/tables/index.go index 184eff71e3a76..6283e733e9fc7 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -525,13 +525,13 @@ func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, return false, nil, nil } // Unexpected errors. - return false, nil, err // Unexpected errors. + return false, nil, err } - if isTemp, idxID := tablecodec.IsTempIndexKey(key); isTemp { + if isTemp, originIdxID := tablecodec.CheckTempIndexKey(key); isTemp { originValInTmp, deletedHandle, deleted, _, _ := tablecodec.DecodeTempIndexValue(val, isCommon) if deleted { originKey := key.Clone() - tablecodec.TempIndexKey2IndexKey(idxID, originKey) + tablecodec.TempIndexKey2IndexKey(originIdxID, originKey) originVal, err := txn.Get(ctx, originKey) if err != nil { if kv.IsErrNotFound(err) { diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 75a20e99fbf65..9fa75104bff91 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1152,8 +1152,8 @@ func TempIndexKey2IndexKey(originIdxID int64, tempIdxKey []byte) { binary.BigEndian.PutUint64(tempIdxKey[prefixLen:], eid) } -// IsTempIndexKey check whether the input key is for a temp index. -func IsTempIndexKey(indexKey []byte) (bool, int64) { +// CheckTempIndexKey checks whether the input key is for a temp index. +func CheckTempIndexKey(indexKey []byte) (isTemp bool, originIdxID int64) { var ( indexIDKey []byte indexID int64 From dda1225291f082488597732d84725df59335fe76 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 2 Feb 2023 15:46:55 +0800 Subject: [PATCH 10/24] address comment --- table/tables/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/tables/index.go b/table/tables/index.go index 6283e733e9fc7..824fb7aaea85a 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -556,7 +556,7 @@ func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, // We use the handle in origin index value to check if the row exists. recPrefix := tablecodec.GenTableRecordPrefix(tableID) rowKey := tablecodec.EncodeRecordKey(recPrefix, originHandle) - _, err = txn.Get(context.Background(), rowKey) + _, err = txn.Get(ctx, rowKey) if err != nil { if kv.IsErrNotFound(err) { // The row has been deleted. This is not a duplicated key. From 0e36c800294f89e0dc74455572482c32ee832379 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 2 Feb 2023 21:18:22 +0800 Subject: [PATCH 11/24] fix merging case --- ddl/index.go | 9 ++- ddl/index_merge_tmp.go | 7 +++ ddl/indexmergetest/merge_test.go | 52 +++++++++++++++-- table/tables/index.go | 96 ++++++++++++++++++++------------ 4 files changed, 120 insertions(+), 44 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 87e9b585e2937..652438d3d10af 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -875,10 +875,10 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return false, ver, errors.Trace(err) case model.BackfillStateMerging: - failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) { + failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { //nolint:forcetypeassert - if val.(bool) && MockDMLExecutionMerging != nil { - MockDMLExecutionMerging() + if val.(bool) && MockDMLExecutionStateMerging != nil { + MockDMLExecutionStateMerging() } }) done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, true) @@ -1722,6 +1722,9 @@ var MockDMLExecution func() // MockDMLExecutionMerging is only used for test. var MockDMLExecutionMerging func() +// MockDMLExecutionStateMerging is only used for test. +var MockDMLExecutionStateMerging func() + func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 302bb6a50a620..397c5ff29855a 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" @@ -198,6 +199,12 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC return nil }) + failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && MockDMLExecutionMerging != nil { + MockDMLExecutionMerging() + } + }) logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000) return } diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index c2f448feede77..ab321e7e8fdf8 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -555,17 +555,17 @@ func TestAddIndexMergeInsertOnMerging(t *testing.T) { callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) d.SetHook(callback) - ddl.MockDMLExecutionMerging = func() { + ddl.MockDMLExecutionStateMerging = func() { _, err := tk1.Exec("insert into t values (5, 8);") assert.Error(t, err) // [kv:1062]Duplicate entry '5' for key 't.idx' _, err = tk1.Exec("insert into t values (5, 8) on duplicate key update a = 6;") assert.NoError(t, err) // The row should be normally updated to (6, 5). } - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "1*return(true)->return(false)")) tk.MustExec("alter table t add unique index idx(a);") tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("6 5")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging")) } func TestAddIndexMergeReplaceOnMerging(t *testing.T) { @@ -584,17 +584,17 @@ func TestAddIndexMergeReplaceOnMerging(t *testing.T) { assert.NoError(t, err) } - ddl.MockDMLExecutionMerging = func() { + ddl.MockDMLExecutionStateMerging = func() { _, err := tk1.Exec("replace into t values (5, 8);") assert.NoError(t, err) } require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "1*return(true)->return(false)")) tk.MustExec("alter table t add unique index idx(a);") tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging")) } func TestAddIndexMergeInsertToDeletedTempIndex(t *testing.T) { @@ -640,3 +640,43 @@ func TestAddIndexMergeInsertToDeletedTempIndex(t *testing.T) { tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) } + +func TestAddIndexMergeDoubleDelete2(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, a int default 0);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + switch job.SchemaState { + case model.StateDeleteOnly: + _, err := tk1.Exec("insert into t values (1, 1);") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecutionMerging = func() { + _, err := tk1.Exec("replace into t values (2, 1);") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where id = 2;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) +} diff --git a/table/tables/index.go b/table/tables/index.go index 824fb7aaea85a..cc86e20e667c1 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -519,31 +519,54 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV // FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key. func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { - val, err := txn.Get(ctx, key) + if isTemp, originIdxID := tablecodec.CheckTempIndexKey(key); isTemp { + return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, originIdxID, isCommon) + } + // The index key is not from temp index. + val, err := getKeyInTxn(ctx, txn, key) + if err != nil || len(val) == 0 { + return false, nil, err + } + if distinct { + h, err := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) + return true, h, err + } else { + return true, nil, nil + } +} + +func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool, + txn kv.Transaction, tableID, idxID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { + tempVal, err := getKeyInTxn(ctx, txn, tempKey) if err != nil { - if kv.IsErrNotFound(err) { - return false, nil, nil - } - // Unexpected errors. return false, nil, err } - if isTemp, originIdxID := tablecodec.CheckTempIndexKey(key); isTemp { - originValInTmp, deletedHandle, deleted, _, _ := tablecodec.DecodeTempIndexValue(val, isCommon) - if deleted { - originKey := key.Clone() - tablecodec.TempIndexKey2IndexKey(originIdxID, originKey) - originVal, err := txn.Get(ctx, originKey) + if tempVal == nil { + originKey := tempKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := getKeyInTxn(ctx, txn, originKey) + if err != nil || originVal == nil { + return false, nil, err + } + if distinct { + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) if err != nil { - if kv.IsErrNotFound(err) { - // The key has been deleted. This is not a duplicated key. - return false, nil, nil - } - // Unexpected errors. return false, nil, err } - if !distinct { - return false, nil, nil - } + return true, originHandle, err + } else { + return false, nil, nil + } + } + originValInTmp, deletedHandle, deleted, _, _ := tablecodec.DecodeTempIndexValue(tempVal, isCommon) + if deleted { + originKey := tempKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := getKeyInTxn(ctx, txn, originKey) + if err != nil || originVal == nil { + return false, nil, err + } + if distinct { originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) if err != nil { return false, nil, err @@ -556,31 +579,34 @@ func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, // We use the handle in origin index value to check if the row exists. recPrefix := tablecodec.GenTableRecordPrefix(tableID) rowKey := tablecodec.EncodeRecordKey(recPrefix, originHandle) - _, err = txn.Get(ctx, rowKey) - if err != nil { - if kv.IsErrNotFound(err) { - // The row has been deleted. This is not a duplicated key. - return false, nil, nil - } - // Unexpected errors. + rowVal, err := getKeyInTxn(ctx, txn, rowKey) + if err != nil || rowVal == nil { return false, nil, err } // The row exists. This is the duplicated key. return true, originHandle, nil + } else { + return false, nil, nil } - // The value in temp index is not the delete marker. - if !distinct { - return true, nil, nil - } + } + // The value in temp index is not the delete marker. + if distinct { h, err := tablecodec.DecodeHandleInUniqueIndexValue(originValInTmp, isCommon) return true, h, err - } - // The index key is not from temp index. - if !distinct { + } else { return true, nil, nil } - h, err := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) - return true, h, err +} + +func getKeyInTxn(ctx context.Context, txn kv.Transaction, key kv.Key) ([]byte, error) { + val, err := txn.Get(ctx, key) + if err != nil { + if kv.IsErrNotFound(err) { + return nil, nil + } + return nil, err + } + return val, nil } func (c *index) FetchValues(r []types.Datum, vals []types.Datum) ([]types.Datum, error) { From ee5cbd7a10f0f37d588e9cf3ea74b37a93d85e2c Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 2 Feb 2023 23:12:35 +0800 Subject: [PATCH 12/24] fix linter --- table/tables/index.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/table/tables/index.go b/table/tables/index.go index cc86e20e667c1..78c966c140684 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -530,9 +530,8 @@ func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, if distinct { h, err := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) return true, h, err - } else { - return true, nil, nil } + return true, nil, nil } func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool, @@ -554,9 +553,8 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d return false, nil, err } return true, originHandle, err - } else { - return false, nil, nil } + return false, nil, nil } originValInTmp, deletedHandle, deleted, _, _ := tablecodec.DecodeTempIndexValue(tempVal, isCommon) if deleted { @@ -585,17 +583,15 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d } // The row exists. This is the duplicated key. return true, originHandle, nil - } else { - return false, nil, nil } + return false, nil, nil } // The value in temp index is not the delete marker. if distinct { h, err := tablecodec.DecodeHandleInUniqueIndexValue(originValInTmp, isCommon) return true, h, err - } else { - return true, nil, nil } + return true, nil, nil } func getKeyInTxn(ctx context.Context, txn kv.Transaction, key kv.Key) ([]byte, error) { From b9c995b8546c8fd2d9ab30cb859365f416a1a822 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 7 Feb 2023 21:04:25 +0800 Subject: [PATCH 13/24] record all the operations to the temp index value --- ddl/index_merge_tmp.go | 65 +++++---- ddl/indexmergetest/merge_test.go | 50 ++++++- table/tables/index.go | 71 ++++++--- table/tables/mutation_checker.go | 27 +++- tablecodec/tablecodec.go | 239 ++++++++++++++++++++++--------- tablecodec/tablecodec_test.go | 59 +++++--- 6 files changed, 369 insertions(+), 142 deletions(-) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 7d7529a319f7a..e568bb84e89f8 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -259,40 +259,49 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor return false, nil } - originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) - if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete { - // For 'm' version kvs, they are double-written. - // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. - return true, nil + tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) + if err != nil { + return false, err } + tempIdxVal = tempIdxVal.FilterOverwritten() + + // Extract the operations on the original index and replay them later. + for _, elem := range tempIdxVal { + if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete { + // For 'm' version kvs, they are double-written. + // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. + return true, nil + } - if handle == nil { - // If the handle is not found in the value of the temp index, it means - // 1) This is not a deletion marker, the handle is in the key or the origin value. - // 2) This is a deletion marker, but the handle is in the key of temp index. - handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns)) - if err != nil { - return false, err + if elem.Handle == nil { + // If the handle is not found in the value of the temp index, it means + // 1) This is not a deletion marker, the handle is in the key or the origin value. + // 2) This is a deletion marker, but the handle is in the key of temp index. + elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, len(w.index.Meta().Columns)) + if err != nil { + return false, err + } } - } - originIdxKey := make([]byte, len(indexKey)) - copy(originIdxKey, indexKey) - tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) + originIdxKey := make([]byte, len(indexKey)) + copy(originIdxKey, indexKey) + tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) - idxRecord := &temporaryIndexRecord{ - handle: handle, - delete: isDelete, - unique: unique, - skip: false, - } - if !isDelete { - idxRecord.vals = originVal - idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal) + idxRecord := &temporaryIndexRecord{ + handle: elem.Handle, + delete: elem.IsDelete, + unique: elem.Distinct, + skip: false, + } + if !elem.IsDelete { + idxRecord.vals = elem.Value + idxRecord.distinct = tablecodec.IndexKVIsUnique(elem.Value) + } + w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord) + w.originIdxKeys = append(w.originIdxKeys, originIdxKey) + w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey) } - w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord) - w.originIdxKeys = append(w.originIdxKeys, originIdxKey) - w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey) + lastKey = indexKey return true, nil }) diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index 7b4c64a32a610..a8b2380912977 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/ddl/internal/callback" "github.com/pingcap/tidb/ddl/testutil" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/tablecodec" @@ -649,7 +650,7 @@ func TestAddIndexMergeInsertToDeletedTempIndex(t *testing.T) { tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) } -func TestAddIndexMergeDoubleDelete2(t *testing.T) { +func TestAddIndexMergeReplaceDelete(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) @@ -688,3 +689,50 @@ func TestAddIndexMergeDoubleDelete2(t *testing.T) { tk.MustQuery("select * from t;").Check(testkit.Rows()) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) } + +func TestAddIndexMergeDeleteDifferentHandle(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, c char(10));") + tk.MustExec("insert into t values (1, 'a');") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runInDeleteOnly := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runInDeleteOnly { + return + } + if job.SnapshotVer == 0 { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert into t values (2, 'a');") + assert.NoError(t, err) + _, err = tk1.Exec("replace into t values (3, 'a');") + assert.NoError(t, err) + runInDeleteOnly = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecution = func() { + // It is too late to remove the duplicated index value. + _, err := tk1.Exec("delete from t where id = 1;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustGetErrCode("alter table t add unique index idx(c);", errno.ErrDupEntry) + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("3 a")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} diff --git a/table/tables/index.go b/table/tables/index.go index 5a6255e83e773..e682d160e37b2 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -238,18 +238,21 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic if !distinct || skipCheck || opt.Untouched { + val := idxVal if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer} + val = tempVal.Encode(nil) } - err = txn.GetMemBuffer().Set(key, idxVal) + err = txn.GetMemBuffer().Set(key, val) if err != nil { return nil, err } if len(tempKey) > 0 { if !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer} + val = tempVal.Encode(nil) } - err = txn.GetMemBuffer().Set(tempKey, idxVal) + err = txn.GetMemBuffer().Set(tempKey, val) if err != nil { return nil, err } @@ -279,11 +282,20 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if err != nil && !kv.IsErrNotFound(err) { return nil, err } + var tempIdxVal tablecodec.TempIndexValue + if len(value) > 0 && keyIsTempIdxKey { + tempIdxVal, err = tablecodec.DecodeTempIndexValue(value, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } // The index key value is not found or deleted. - if err != nil || len(value) == 0 || (keyIsTempIdxKey && tablecodec.CheckTempIndexValueIsDelete(value)) { + if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().IsDelete) { + val := idxVal lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil if keyIsTempIdxKey { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) } needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h, keyIsTempIdxKey, c.tblInfo.IsCommonHandle, c.tblInfo.ID) @@ -299,19 +311,20 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue !vars.InRestrictedSQL && vars.ConnectionID > 0 { flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) } - err = txn.GetMemBuffer().SetWithFlags(key, idxVal, flags...) + err = txn.GetMemBuffer().SetWithFlags(key, val, flags...) } else { - err = txn.GetMemBuffer().Set(key, idxVal) + err = txn.GetMemBuffer().Set(key, val) } if err != nil { return nil, err } if len(tempKey) > 0 { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) if lazyCheck && needPresumeNotExists { - err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) + err = txn.GetMemBuffer().SetWithFlags(tempKey, val, kv.SetPresumeKeyNotExists) } else { - err = txn.GetMemBuffer().Set(tempKey, idxVal) + err = txn.GetMemBuffer().Set(tempKey, val) } if err != nil { return nil, err @@ -331,8 +344,8 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue continue } - if keyIsTempIdxKey { - value = tablecodec.DecodeTempIndexOriginValue(value) + if keyIsTempIdxKey && !tempIdxVal.IsEmpty() { + value = tempIdxVal.Current().Value } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) if err != nil { @@ -373,6 +386,16 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) + var originTempVal []byte + if len(tempKey) > 0 { + // Get the origin value of the temporary index key. + // Append the new delete operations to the end of the origin value. + originTempVal, err = getKeyInTxn(context.TODO(), txn, tempKey) + if err != nil { + return err + } + } + tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, IsDelete: true, Distinct: distinct} if distinct { if len(key) > 0 { @@ -382,7 +405,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } } if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeletedUnique(h, tempKeyVer) + // Append to the end of the origin value for distinct value. + tempVal := tempValElem.Encode(originTempVal) err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err @@ -396,7 +420,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } } if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeleted(tempKeyVer) + tempVal := tempValElem.Encode(nil) err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err @@ -535,11 +559,11 @@ func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool, txn kv.Transaction, tableID, idxID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { - tempVal, err := getKeyInTxn(ctx, txn, tempKey) + tempRawVal, err := getKeyInTxn(ctx, txn, tempKey) if err != nil { return false, nil, err } - if tempVal == nil { + if tempRawVal == nil { originKey := tempKey.Clone() tablecodec.TempIndexKey2IndexKey(idxID, originKey) originVal, err := getKeyInTxn(ctx, txn, originKey) @@ -555,8 +579,12 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d } return false, nil, nil } - originValInTmp, deletedHandle, deleted, _, _ := tablecodec.DecodeTempIndexValue(tempVal, isCommon) - if deleted { + tempVal, err := tablecodec.DecodeTempIndexValue(tempRawVal, isCommon) + if err != nil { + return false, nil, err + } + curElem := tempVal.Current() + if curElem.IsDelete { originKey := tempKey.Clone() tablecodec.TempIndexKey2IndexKey(idxID, originKey) originVal, err := getKeyInTxn(ctx, txn, originKey) @@ -568,7 +596,7 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d if err != nil { return false, nil, err } - if originHandle.Equal(deletedHandle) { + if originHandle.Equal(curElem.Handle) { // The key has been deleted. This is not a duplicated key. return false, nil, nil } @@ -587,12 +615,13 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d } // The value in temp index is not the delete marker. if distinct { - h, err := tablecodec.DecodeHandleInUniqueIndexValue(originValInTmp, isCommon) + h, err := tablecodec.DecodeHandleInUniqueIndexValue(curElem.Value, isCommon) return true, h, err } return true, nil, nil } +// getKeyInTxn gets the value of the key in the transaction, and ignore the ErrNotExist error. func getKeyInTxn(ctx context.Context, txn kv.Transaction, key kv.Key) ([]byte, error) { val, err := txn.Get(ctx, key) if err != nil { diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 328989d88ad3f..137d2907825b8 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -154,7 +154,17 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in indexHandle kv.Handle ) if idxID != m.indexID { - value = tablecodec.DecodeTempIndexOriginValue(m.value) + if tablecodec.TempIndexValueIsUntouched(m.value) { + // We never commit the untouched key values to the storage. Skip this check. + continue + } + tempIdxVal, err := tablecodec.DecodeTempIndexValue(m.value, false) + if err != nil { + return err + } + if !tempIdxVal.IsEmpty() { + value = tempIdxVal.Current().Value + } if len(value) == 0 { // Skip the deleted operation values. continue @@ -209,9 +219,20 @@ func checkIndexKeys( return errors.New("index not found") } + var isTmpIdxValAndDeleted bool // If this is temp index data, need remove last byte of index data. if idxID != m.indexID { - value = append(value, m.value[:len(m.value)-1]...) + if tablecodec.TempIndexValueIsUntouched(m.value) { + // We never commit the untouched key values to the storage. Skip this check. + continue + } + tmpVal, err := tablecodec.DecodeTempIndexValue(m.value, t.Meta().IsCommonHandle) + if err != nil { + return err + } + curElem := tmpVal.Current() + isTmpIdxValAndDeleted = curElem.IsDelete + value = append(value, curElem.Value...) } else { value = append(value, m.value...) } @@ -245,7 +266,7 @@ func checkIndexKeys( } // When it is in add index new backfill state. - if len(value) == 0 || (idxID != m.indexID && (tablecodec.CheckTempIndexValueIsDelete(value))) { + if len(value) == 0 || isTmpIdxValAndDeleted { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta()) } else { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta()) diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 7043e69a12e4f..a5c90d5690d7a 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1177,94 +1177,199 @@ func CheckTempIndexKey(indexKey []byte) (isTemp bool, originIdxID int64) { type TempIndexValueFlag byte const ( - // TempIndexValueFlagNormal means the following value is the normal index value. + // TempIndexValueFlagNormal means the following value is a distinct the normal index value. TempIndexValueFlagNormal TempIndexValueFlag = iota - // TempIndexValueFlagDeleted means this is a representation of a "delete" operation. + // TempIndexValueFlagNonDistinctNormal means the following value is the non-distinct normal index value. + TempIndexValueFlagNonDistinctNormal + // TempIndexValueFlagDeleted means the following value is the distinct and deleted index value. TempIndexValueFlagDeleted + // TempIndexValueFlagNonDistinctDeleted means the following value is the non-distinct deleted index value. + TempIndexValueFlagNonDistinctDeleted ) -// EncodeTempIndexValue encodes the value of temporary index. -// Note: this function changes the input value. -func EncodeTempIndexValue(value []byte, keyVer byte) []byte { - value = append(value, 0) - copy(value[1:], value[:len(value)-1]) - value[0] = byte(TempIndexValueFlagNormal) // normal flag + value + tempKeyVer - value = append(value, keyVer) - return value -} +// TempIndexValue is the value of temporary index. +// It contains one or more element, each element represents a history index operations on the original index. +// A temp index value element is encoded as one of: +// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal} +// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal} +// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted} +// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted} +// +// The temp index value is encoded as: +// - [element 1][element 2]...[element n] {for distinct values} +// - [element 1] {for non-distinct values} +type TempIndexValue []*TempIndexValueElem -// EncodeTempIndexValueDeletedUnique encodes the value of temporary index for unique index. -func EncodeTempIndexValueDeletedUnique(handle kv.Handle, keyVer byte) []byte { - var hEncoded []byte - var hLen int - if handle.IsInt() { - var data [8]byte - binary.BigEndian.PutUint64(data[:], uint64(handle.IntValue())) - hEncoded = data[:] - hLen = 8 - } else { - hEncoded = handle.Encoded() - hLen = len(hEncoded) - } - val := make([]byte, 0, 1+hLen+1) // deleted flag + handle + tempKeyVer - val = append(val, byte(TempIndexValueFlagDeleted)) - val = append(val, hEncoded...) - val = append(val, keyVer) - return val +// IsEmpty checks whether the value is empty. +func (v TempIndexValue) IsEmpty() bool { + return len(v) == 0 } -// EncodeTempIndexValueDeleted encodes the delete operation on origin index to a value for temporary index. -func EncodeTempIndexValueDeleted(keyVer byte) []byte { - // Handle is not needed because it is already in the key. - val := make([]byte, 0, 2) // deleted flag + tempKeyVer - val = append(val, byte(TempIndexValueFlagDeleted)) - val = append(val, keyVer) - return val +// Current returns the current latest temp index value. +func (v TempIndexValue) Current() *TempIndexValueElem { + return v[len(v)-1] } -// DecodeTempIndexValue decodes the value of temporary index. -func DecodeTempIndexValue(value []byte, isCommonHandle bool) (originVal []byte, handle kv.Handle, isDelete bool, isUnique bool, keyVer byte) { - if len(value) == 0 { - return nil, nil, false, false, 0 +// FilterOverwritten is used by the temp index merge process to remove the overwritten index operations. +// For example, the value {temp_idx_key -> [h2, h2d, h3, h1d]} recorded four operations on the original index. +// 'h2d' overwrite 'h2', we can remove 'h2' from the value. +func (v TempIndexValue) FilterOverwritten() TempIndexValue { + if len(v) <= 1 || !v[0].Distinct { + return v } - switch TempIndexValueFlag(value[0]) { - case TempIndexValueFlagNormal: - originVal = value[1 : len(value)-1] - keyVer = value[len(value)-1] - case TempIndexValueFlagDeleted: - isDelete = true - if len(value) == 2 { - keyVer = value[1] + occurred := kv.NewHandleMap() + for i := len(v) - 1; i >= 0; i-- { + if _, ok := occurred.Get(v[i].Handle); !ok { + occurred.Set(v[i].Handle, struct{}{}) } else { - isUnique = true - if isCommonHandle { - handle, _ = kv.NewCommonHandle(value[1 : len(value)-1]) + v[i] = nil + } + } + ret := v[:0] + for _, elem := range v { + if elem != nil { + ret = append(ret, elem) + } + } + return ret +} + +// TempIndexValueElem represents a history index operations on the original index. +// A temp index value element is encoded as one of: +// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal} +// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal} +// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted} +// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted} +type TempIndexValueElem struct { + Value []byte + Handle kv.Handle + KeyVer byte + IsDelete bool + Distinct bool +} + +// Encode encodes the temp index value. +func (v *TempIndexValueElem) Encode(buf []byte) []byte { + if v.IsDelete { + if v.Distinct { + handle := v.Handle + var hEncoded []byte + var hLen uint16 + if handle.IsInt() { + hEncoded = codec.EncodeUint(hEncoded, uint64(handle.IntValue())) + hLen = 8 } else { - handle = decodeIntHandleInIndexValue(value[1 : len(value)-1]) + hEncoded = handle.Encoded() + hLen = uint16(len(hEncoded)) + } + // flag + handle length + handle + temp key version + if buf == nil { + buf = make([]byte, 0, hLen+4) } - keyVer = value[len(value)-1] + buf = append(buf, byte(TempIndexValueFlagDeleted)) + buf = append(buf, byte(hLen>>8), byte(hLen)) + buf = append(buf, hEncoded...) + buf = append(buf, v.KeyVer) + return buf } - } - return + // flag + temp key version + if buf == nil { + buf = make([]byte, 0, 2) + } + buf = append(buf, byte(TempIndexValueFlagNonDistinctDeleted)) + buf = append(buf, v.KeyVer) + return buf + } + if v.Distinct { + // flag + value length + value + temp key version + if buf == nil { + buf = make([]byte, 0, len(v.Value)+4) + } + buf = append(buf, byte(TempIndexValueFlagNormal)) + vLen := uint16(len(v.Value)) + buf = append(buf, byte(vLen>>8), byte(vLen)) + buf = append(buf, v.Value...) + buf = append(buf, v.KeyVer) + return buf + } + // flag + value + temp key version + if buf == nil { + buf = make([]byte, 0, len(v.Value)+2) + } + buf = append(buf, byte(TempIndexValueFlagNonDistinctNormal)) + buf = append(buf, v.Value...) + buf = append(buf, v.KeyVer) + return buf } -// CheckTempIndexValueIsDelete checks whether the value is a delete operation. -func CheckTempIndexValueIsDelete(value []byte) bool { - if len(value) == 0 { - return false +// DecodeTempIndexValue decodes the temp index value. +func DecodeTempIndexValue(value []byte, isCommonHandle bool) (TempIndexValue, error) { + var ( + values []*TempIndexValueElem + err error + ) + for len(value) > 0 { + v := &TempIndexValueElem{} + value, err = v.DecodeOne(value, isCommonHandle) + if err != nil { + return nil, err + } + values = append(values, v) } - return TempIndexValueFlag(value[0]) == TempIndexValueFlagDeleted + return values, nil } -// DecodeTempIndexOriginValue decodes the value of origin index from a temp index value. -func DecodeTempIndexOriginValue(value []byte) []byte { - if len(value) == 0 { - return nil +// DecodeOne decodes one temp index value element. +func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain []byte, err error) { + flag := TempIndexValueFlag(b[0]) + b = b[1:] + switch flag { + case TempIndexValueFlagNormal: + vLen := (uint16(b[0]) << 8) + uint16(b[1]) + b = b[2:] + v.Value = b[:vLen] + b = b[vLen:] + v.KeyVer = b[0] + b = b[1:] + v.Distinct = true + v.Handle, err = DecodeHandleInUniqueIndexValue(v.Value, isCommonHandle) + return b, err + case TempIndexValueFlagNonDistinctNormal: + v.Value = b[:len(b)-1] + v.KeyVer = b[len(b)-1] + return nil, nil + case TempIndexValueFlagDeleted: + hLen := (uint16(b[0]) << 8) + uint16(b[1]) + b = b[2:] + if isCommonHandle { + v.Handle, _ = kv.NewCommonHandle(b[:hLen]) + } else { + v.Handle = decodeIntHandleInIndexValue(b[:hLen]) + } + b = b[hLen:] + v.KeyVer = b[0] + b = b[1:] + v.Distinct = true + v.IsDelete = true + return b, nil + case TempIndexValueFlagNonDistinctDeleted: + v.KeyVer = b[0] + b = b[1:] + v.IsDelete = true + return b, nil + default: + return nil, errors.New("invalid temp index value") } - if TempIndexValueFlag(value[0]) == TempIndexValueFlagNormal { - return value[1 : len(value)-1] +} + +// TempIndexValueIsUntouched returns true if the value is untouched. +// All the temp index value has the suffix of temp key version. +// All the temp key versions differ from the uncommitted KV flag. +func TempIndexValueIsUntouched(b []byte) bool { + if len(b) > 0 && b[len(b)-1] == kv.UnCommitIndexKVFlag { + return true } - return nil + return false } // GenIndexValuePortal is the portal for generating index value. diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index 231d58cf18bd3..01050b10fd88d 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -617,26 +617,41 @@ func TestTempIndexValueCodec(t *testing.T) { require.NoError(t, err) encodedValueCopy := make([]byte, len(encodedValue)) copy(encodedValueCopy, encodedValue) - tempIdxVal := EncodeTempIndexValue(encodedValue, 'b') - originVal, handle, isDelete, unique, keyVer := DecodeTempIndexValue(tempIdxVal, false) - require.Nil(t, handle) - require.False(t, isDelete || unique) - require.Equal(t, keyVer, byte('b')) - require.EqualValues(t, encodedValueCopy, originVal) - - tempIdxVal = EncodeTempIndexValueDeletedUnique(kv.IntHandle(100), 'm') - originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) - require.Equal(t, handle.IntValue(), int64(100)) - require.True(t, isDelete) - require.True(t, unique) - require.Equal(t, keyVer, byte('m')) - require.Empty(t, originVal) - - tempIdxVal = EncodeTempIndexValueDeleted('b') - originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) - require.Nil(t, handle) - require.True(t, isDelete) - require.False(t, unique) - require.Equal(t, keyVer, byte('b')) - require.Empty(t, originVal) + + tempIdxVal := TempIndexValueElem{ + Value: encodedValue, + KeyVer: 'b', + } + val := tempIdxVal.Encode(nil) + var newTempIdxVal TempIndexValueElem + remain, err := newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + idxVal := EncodeHandleInUniqueIndexValue(kv.IntHandle(100), false) + tempIdxVal = TempIndexValueElem{ + Value: idxVal, + KeyVer: 'm', + Distinct: true, + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.Equal(t, newTempIdxVal.Handle.IntValue(), int64(100)) + newTempIdxVal.Handle = nil + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + tempIdxVal = TempIndexValueElem{ + IsDelete: true, + KeyVer: 'b', + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) } From c2040b1d34babf6b1c56dd5f7668c750e023e0d0 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 7 Feb 2023 21:12:44 +0800 Subject: [PATCH 14/24] fix linter --- table/tables/mutation_checker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 137d2907825b8..b9472a0b7f5b5 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -158,7 +158,8 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in // We never commit the untouched key values to the storage. Skip this check. continue } - tempIdxVal, err := tablecodec.DecodeTempIndexValue(m.value, false) + var tempIdxVal tablecodec.TempIndexValue + tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value, false) if err != nil { return err } From c3984c48366f0b7203cf6ac820a064772722741e Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 7 Feb 2023 21:23:33 +0800 Subject: [PATCH 15/24] update bazel --- ddl/indexmergetest/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/indexmergetest/BUILD.bazel b/ddl/indexmergetest/BUILD.bazel index 5f6e4215f664e..b70146ae8d461 100644 --- a/ddl/indexmergetest/BUILD.bazel +++ b/ddl/indexmergetest/BUILD.bazel @@ -17,6 +17,7 @@ go_test( "//ddl/internal/callback", "//ddl/testutil", "//domain", + "//errno", "//kv", "//meta/autoid", "//parser/model", From 1c6e9d69632a5c4d0cc253a89d3733c3083cc8e2 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 8 Feb 2023 10:37:00 +0800 Subject: [PATCH 16/24] fix panic on clustered index table --- ddl/indexmergetest/merge_test.go | 46 +++++++++++++++++++++++++-- table/tables/mutation_checker.go | 8 ++--- table/tables/mutation_checker_test.go | 4 +-- 3 files changed, 49 insertions(+), 9 deletions(-) diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index a8b2380912977..9673a42238a4c 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -705,9 +705,9 @@ func TestAddIndexMergeDeleteDifferentHandle(t *testing.T) { originalCallback := d.GetHook() defer d.SetHook(originalCallback) callback := &callback.TestDDLCallback{} - runInDeleteOnly := false + runDML := false onJobUpdatedExportedFunc := func(job *model.Job) { - if t.Failed() || runInDeleteOnly { + if t.Failed() || runDML { return } if job.SnapshotVer == 0 { @@ -719,7 +719,7 @@ func TestAddIndexMergeDeleteDifferentHandle(t *testing.T) { assert.NoError(t, err) _, err = tk1.Exec("replace into t values (3, 'a');") assert.NoError(t, err) - runInDeleteOnly = true + runDML = true } } callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) @@ -736,3 +736,43 @@ func TestAddIndexMergeDeleteDifferentHandle(t *testing.T) { tk.MustQuery("select * from t;").Check(testkit.Rows("3 a")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) } + +func TestAddIndexDecodeTempIndexCommonHandle(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id_a bigint, id_b char(20), c char(20), primary key (id_a, id_b));") + tk.MustExec("insert into t values (1, 'id_1', 'char_1');") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + if job.SnapshotVer == 0 { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert into t values (2, 'id_2', 'char_2');") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (3, 'id_3', 'char_3');") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(c);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 id_1 char_1", "2 id_2 char_2", "3 id_3 char_3")) +} diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index b9472a0b7f5b5..9c32c8be35771 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -106,7 +106,7 @@ func CheckDataConsistency( // } if rowInsertion.key != nil { - if err = checkHandleConsistency(rowInsertion, indexMutations, columnMaps.IndexIDToInfo, t.Meta().Name.O); err != nil { + if err = checkHandleConsistency(rowInsertion, indexMutations, columnMaps.IndexIDToInfo, t.Meta()); err != nil { return errors.Trace(err) } } @@ -123,7 +123,7 @@ func CheckDataConsistency( // in row insertions and index insertions are consistent. // A PUT_index implies a PUT_row with the same handle. // Deletions are not checked since the values of deletions are unknown -func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, tableName string) error { +func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, tblInfo *model.TableInfo) error { var insertionHandle kv.Handle var err error @@ -159,7 +159,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in continue } var tempIdxVal tablecodec.TempIndexValue - tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value, false) + tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value, tblInfo.IsCommonHandle) if err != nil { return err } @@ -181,7 +181,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in } // NOTE: handle type can be different, see issue 29520 if indexHandle.IsInt() == insertionHandle.IsInt() && indexHandle.Compare(insertionHandle) != 0 { - err = ErrInconsistentHandle.GenWithStackByArgs(tableName, indexInfo.Name.O, indexHandle, insertionHandle, m, rowInsertion) + err = ErrInconsistentHandle.GenWithStackByArgs(tblInfo.Name, indexInfo.Name.O, indexHandle, insertionHandle, m, rowInsertion) logutil.BgLogger().Error("inconsistent handle in index and record insertions", zap.Error(err)) return err } diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 43fb35c21a5b6..4c44e90a7d244 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -310,9 +310,9 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) { require.Nil(t, err) rowMutation := mutation{key: rowKey, value: rowValue} corruptedRowMutation := mutation{key: corruptedRowKey, value: rowValue} - err = checkHandleConsistency(rowMutation, indexMutations, maps.IndexIDToInfo, "t") + err = checkHandleConsistency(rowMutation, indexMutations, maps.IndexIDToInfo, &tableInfo) require.Nil(t, err) - err = checkHandleConsistency(corruptedRowMutation, indexMutations, maps.IndexIDToInfo, "t") + err = checkHandleConsistency(corruptedRowMutation, indexMutations, maps.IndexIDToInfo, &tableInfo) require.NotNil(t, err) } } From bd7d17a3d06116f1746293c181fe53f99430e29e Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 8 Feb 2023 10:38:11 +0800 Subject: [PATCH 17/24] address comment --- table/tables/index.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/tables/index.go b/table/tables/index.go index e682d160e37b2..fdb3b7d33b4c3 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -387,8 +387,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) var originTempVal []byte - if len(tempKey) > 0 { - // Get the origin value of the temporary index key. + if len(tempKey) > 0 && c.idxInfo.Unique { + // Get the origin value of the unique temporary index key. // Append the new delete operations to the end of the origin value. originTempVal, err = getKeyInTxn(context.TODO(), txn, tempKey) if err != nil { From 7f4a3c4f5942ced494641196c6974eed2607d67d Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 8 Feb 2023 14:27:53 +0800 Subject: [PATCH 18/24] print value to log when inconsistency detected --- table/tables/mutation_checker.go | 11 +++++++---- table/tables/mutation_checker_test.go | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 9c32c8be35771..83f53cba5de0a 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -15,6 +15,7 @@ package tables import ( + "encoding/hex" "fmt" "strings" @@ -268,9 +269,9 @@ func checkIndexKeys( // When it is in add index new backfill state. if len(value) == 0 || isTmpIdxValAndDeleted { - err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta()) + err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta(), m.key, value) } else { - err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta()) + err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta(), m.key, value) } if err != nil { return errors.Trace(err) @@ -354,7 +355,7 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer // Returns error if the index data is not a subset of the input data. func compareIndexData( sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo, - tableInfo *model.TableInfo, + tableInfo *model.TableInfo, key, value []byte, ) error { for i := range indexData { decodedMutationDatum := indexData[i] @@ -381,7 +382,9 @@ func compareIndexData( tableInfo.Name.O, indexInfo.Name.O, cols[indexInfo.Columns[i].Offset].ColumnInfo.Name.O, decodedMutationDatum.String(), expectedDatum.String(), ) - logutil.BgLogger().Error("inconsistent indexed value in index insertion", zap.Error(err)) + logutil.BgLogger().Error("inconsistent indexed value in index insertion", zap.Error(err), + zap.String("key", hex.EncodeToString(key)), + zap.String("value", hex.EncodeToString(value))) return err } } diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 4c44e90a7d244..23652a15c6cbc 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -81,7 +81,7 @@ func TestCompareIndexData(t *testing.T) { } indexInfo := &model.IndexInfo{Name: model.NewCIStr("i0"), Columns: indexCols} - err := compareIndexData(sc, cols, data.indexData, data.inputData, indexInfo, &model.TableInfo{Name: model.NewCIStr("t")}) + err := compareIndexData(sc, cols, data.indexData, data.inputData, indexInfo, &model.TableInfo{Name: model.NewCIStr("t")}, nil, nil) require.Equal(t, data.correct, err == nil, "case id = %v", caseID) } } From 9dfe8afb850a8962fd669c4daef597b006a84824 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 8 Feb 2023 16:42:05 +0800 Subject: [PATCH 19/24] fix insert ignore may introduce invalid temp index value --- ddl/indexmergetest/merge_test.go | 38 ++++++++++++++++++++++++++++++++ table/tables/index.go | 4 ++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index 9673a42238a4c..c840bf42bb6ea 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -776,3 +776,41 @@ func TestAddIndexDecodeTempIndexCommonHandle(t *testing.T) { tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("1 id_1 char_1", "2 id_2 char_2", "3 id_3 char_3")) } + +func TestAddIndexInsertIgnoreOnBackfill(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert ignore into t values (1, 1);") + assert.NoError(t, err) + _, err = tk1.Exec("insert ignore into t values (2, 2);") + assert.NoError(t, err) + _, err = tk1.Exec("update t set b = null where id = 1;") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(b);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 ", "2 2")) +} diff --git a/table/tables/index.go b/table/tables/index.go index fdb3b7d33b4c3..73ae9dd171a18 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -240,7 +240,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if !distinct || skipCheck || opt.Untouched { val := idxVal if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. - tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer} + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} val = tempVal.Encode(nil) } err = txn.GetMemBuffer().Set(key, val) @@ -249,7 +249,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if len(tempKey) > 0 { if !opt.Untouched { // Untouched key-values never occur in the storage. - tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer} + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} val = tempVal.Encode(nil) } err = txn.GetMemBuffer().Set(tempKey, val) From 86d5a9c2e64f0e6e09f44d265a6558af166d0f10 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 8 Feb 2023 17:19:54 +0800 Subject: [PATCH 20/24] move failpoint to a common place for distributed reorg --- ddl/index.go | 15 +++++++++------ ddl/indexmergetest/merge_test.go | 6 ++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 8a10568573dba..25a0a64bed9a6 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -884,12 +884,6 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return false, ver, errors.Trace(err) case model.BackfillStateMerging: - failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { - //nolint:forcetypeassert - if val.(bool) && MockDMLExecutionStateMerging != nil { - MockDMLExecutionStateMerging() - } - }) done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, true) if !done { return false, ver, err @@ -970,6 +964,15 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} + + failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && indexInfo.BackfillState == model.BackfillStateMerging && + MockDMLExecutionStateMerging != nil { + MockDMLExecutionStateMerging() + } + }) + sctx, err1 := w.sessPool.get() if err1 != nil { err = err1 diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index c840bf42bb6ea..d95350c15ab5a 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -569,8 +569,9 @@ func TestAddIndexMergeInsertOnMerging(t *testing.T) { assert.Error(t, err) // [kv:1062]Duplicate entry '5' for key 't.idx' _, err = tk1.Exec("insert into t values (5, 8) on duplicate key update a = 6;") assert.NoError(t, err) // The row should be normally updated to (6, 5). + ddl.MockDMLExecutionStateMerging = nil } - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "1*return(true)->return(false)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "return(true)")) tk.MustExec("alter table t add unique index idx(a);") tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("6 5")) @@ -596,9 +597,10 @@ func TestAddIndexMergeReplaceOnMerging(t *testing.T) { ddl.MockDMLExecutionStateMerging = func() { _, err := tk1.Exec("replace into t values (5, 8);") assert.NoError(t, err) + ddl.MockDMLExecutionStateMerging = nil } require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "1*return(true)->return(false)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "return(true)")) tk.MustExec("alter table t add unique index idx(a);") tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) From c365c3ea48e72aad880ca5a861374e3278ec74f2 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 8 Feb 2023 19:17:04 +0800 Subject: [PATCH 21/24] remove debug info and refine --- ddl/index_merge_tmp.go | 4 ++-- table/tables/index.go | 6 +++--- table/tables/mutation_checker.go | 13 +++++-------- tablecodec/tablecodec.go | 10 +++++----- tablecodec/tablecodec_test.go | 4 ++-- 5 files changed, 17 insertions(+), 20 deletions(-) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index e568bb84e89f8..6c8b6a3d019d8 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -289,11 +289,11 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor idxRecord := &temporaryIndexRecord{ handle: elem.Handle, - delete: elem.IsDelete, + delete: elem.Delete, unique: elem.Distinct, skip: false, } - if !elem.IsDelete { + if !elem.Delete { idxRecord.vals = elem.Value idxRecord.distinct = tablecodec.IndexKVIsUnique(elem.Value) } diff --git a/table/tables/index.go b/table/tables/index.go index 73ae9dd171a18..8b8ce6660ca1c 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -290,7 +290,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } } // The index key value is not found or deleted. - if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().IsDelete) { + if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) { val := idxVal lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil if keyIsTempIdxKey { @@ -395,7 +395,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed return err } } - tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, IsDelete: true, Distinct: distinct} + tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, Delete: true, Distinct: distinct} if distinct { if len(key) > 0 { @@ -584,7 +584,7 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d return false, nil, err } curElem := tempVal.Current() - if curElem.IsDelete { + if curElem.Delete { originKey := tempKey.Clone() tablecodec.TempIndexKey2IndexKey(idxID, originKey) originVal, err := getKeyInTxn(ctx, txn, originKey) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 83f53cba5de0a..52b517eb93050 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -15,7 +15,6 @@ package tables import ( - "encoding/hex" "fmt" "strings" @@ -233,7 +232,7 @@ func checkIndexKeys( return err } curElem := tmpVal.Current() - isTmpIdxValAndDeleted = curElem.IsDelete + isTmpIdxValAndDeleted = curElem.Delete value = append(value, curElem.Value...) } else { value = append(value, m.value...) @@ -269,9 +268,9 @@ func checkIndexKeys( // When it is in add index new backfill state. if len(value) == 0 || isTmpIdxValAndDeleted { - err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta(), m.key, value) + err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta()) } else { - err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta(), m.key, value) + err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta()) } if err != nil { return errors.Trace(err) @@ -355,7 +354,7 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer // Returns error if the index data is not a subset of the input data. func compareIndexData( sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo, - tableInfo *model.TableInfo, key, value []byte, + tableInfo *model.TableInfo, ) error { for i := range indexData { decodedMutationDatum := indexData[i] @@ -382,9 +381,7 @@ func compareIndexData( tableInfo.Name.O, indexInfo.Name.O, cols[indexInfo.Columns[i].Offset].ColumnInfo.Name.O, decodedMutationDatum.String(), expectedDatum.String(), ) - logutil.BgLogger().Error("inconsistent indexed value in index insertion", zap.Error(err), - zap.String("key", hex.EncodeToString(key)), - zap.String("value", hex.EncodeToString(value))) + logutil.BgLogger().Error("inconsistent indexed value in index insertion", zap.Error(err)) return err } } diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index a5c90d5690d7a..394277ba2768c 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1212,7 +1212,7 @@ func (v TempIndexValue) Current() *TempIndexValueElem { // FilterOverwritten is used by the temp index merge process to remove the overwritten index operations. // For example, the value {temp_idx_key -> [h2, h2d, h3, h1d]} recorded four operations on the original index. -// 'h2d' overwrite 'h2', we can remove 'h2' from the value. +// Since 'h2d' overwrites 'h2', we can remove 'h2' from the value. func (v TempIndexValue) FilterOverwritten() TempIndexValue { if len(v) <= 1 || !v[0].Distinct { return v @@ -1244,13 +1244,13 @@ type TempIndexValueElem struct { Value []byte Handle kv.Handle KeyVer byte - IsDelete bool + Delete bool Distinct bool } // Encode encodes the temp index value. func (v *TempIndexValueElem) Encode(buf []byte) []byte { - if v.IsDelete { + if v.Delete { if v.Distinct { handle := v.Handle var hEncoded []byte @@ -1350,12 +1350,12 @@ func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain [] v.KeyVer = b[0] b = b[1:] v.Distinct = true - v.IsDelete = true + v.Delete = true return b, nil case TempIndexValueFlagNonDistinctDeleted: v.KeyVer = b[0] b = b[1:] - v.IsDelete = true + v.Delete = true return b, nil default: return nil, errors.New("invalid temp index value") diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index 01050b10fd88d..a23e4393fc1cd 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -645,8 +645,8 @@ func TestTempIndexValueCodec(t *testing.T) { require.EqualValues(t, tempIdxVal, newTempIdxVal) tempIdxVal = TempIndexValueElem{ - IsDelete: true, - KeyVer: 'b', + Delete: true, + KeyVer: 'b', } newTempIdxVal = TempIndexValueElem{} val = tempIdxVal.Encode(nil) From f51ce7f8eab6fc44e2de6ce0fee476dba6ed341f Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 8 Feb 2023 19:29:26 +0800 Subject: [PATCH 22/24] fix build --- table/tables/mutation_checker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 23652a15c6cbc..4c44e90a7d244 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -81,7 +81,7 @@ func TestCompareIndexData(t *testing.T) { } indexInfo := &model.IndexInfo{Name: model.NewCIStr("i0"), Columns: indexCols} - err := compareIndexData(sc, cols, data.indexData, data.inputData, indexInfo, &model.TableInfo{Name: model.NewCIStr("t")}, nil, nil) + err := compareIndexData(sc, cols, data.indexData, data.inputData, indexInfo, &model.TableInfo{Name: model.NewCIStr("t")}) require.Equal(t, data.correct, err == nil, "case id = %v", caseID) } } From 0e75f01bd42688f664dea6a3bd0931ff9a6257d6 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 8 Feb 2023 20:40:09 +0800 Subject: [PATCH 23/24] should not return during iterating temp index value elem --- ddl/index_merge_tmp.go | 2 +- ddl/indexmergetest/merge_test.go | 42 ++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 6c8b6a3d019d8..029c87542de11 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -270,7 +270,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete { // For 'm' version kvs, they are double-written. // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. - return true, nil + continue } if elem.Handle == nil { diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index d95350c15ab5a..f74db4e0b9eb9 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -816,3 +816,45 @@ func TestAddIndexInsertIgnoreOnBackfill(t *testing.T) { tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("1 ", "2 2")) } + +func TestAddIndexMultipleDelete(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int);") + tk.MustExec("insert into t values (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + switch job.SchemaState { + case model.StateDeleteOnly: + _, err := tk1.Exec("delete from t where id in (4, 5, 6);") + assert.NoError(t, err) + case model.StateWriteOnly: + _, err := tk1.Exec("delete from t where id in (2, 3);") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where id = 1;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(b);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} From 8d1ed649c0afce666ae0e529cedca7e174aa99ed Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 8 Feb 2023 21:05:16 +0800 Subject: [PATCH 24/24] add more test for TempIndexValue encoding --- tablecodec/tablecodec_test.go | 43 +++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index a23e4393fc1cd..adc4ccc78c13b 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -654,4 +654,47 @@ func TestTempIndexValueCodec(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(remain)) require.EqualValues(t, tempIdxVal, newTempIdxVal) + + tempIdxVal = TempIndexValueElem{ + Delete: true, + KeyVer: 'b', + Distinct: true, + Handle: kv.IntHandle(100), + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + // Test multiple temp index value elements. + idxVal = EncodeHandleInUniqueIndexValue(kv.IntHandle(100), false) + tempIdxVal = TempIndexValueElem{ + Value: idxVal, + KeyVer: 'm', + Distinct: true, + } + tempIdxVal2 := TempIndexValueElem{ + Handle: kv.IntHandle(100), + KeyVer: 'm', + Distinct: true, + Delete: true, + } + idxVal3 := EncodeHandleInUniqueIndexValue(kv.IntHandle(101), false) + tempIdxVal3 := TempIndexValueElem{ + Value: idxVal3, + KeyVer: 'm', + Distinct: true, + } + val = tempIdxVal.Encode(nil) + val = tempIdxVal2.Encode(val) + val = tempIdxVal3.Encode(val) + var result TempIndexValue + result, err = DecodeTempIndexValue(val, false) + require.NoError(t, err) + require.Equal(t, 3, len(result)) + require.Equal(t, result[0].Handle.IntValue(), int64(100)) + require.Equal(t, result[1].Handle.IntValue(), int64(100)) + require.Equal(t, result[2].Handle.IntValue(), int64(101)) }