diff --git a/ddl/index.go b/ddl/index.go index 4785b12e41090..833a825e24e30 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -962,6 +962,15 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} + + failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && indexInfo.BackfillState == model.BackfillStateMerging && + MockDMLExecutionStateMerging != nil { + MockDMLExecutionStateMerging() + } + }) + sctx, err1 := w.sessPool.get() if err1 != nil { err = err1 @@ -1789,6 +1798,12 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC // MockDMLExecution is only used for test. var MockDMLExecution func() +// MockDMLExecutionMerging is only used for test. +var MockDMLExecutionMerging func() + +// MockDMLExecutionStateMerging is only used for test. +var MockDMLExecutionStateMerging func() + func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 6e3e98b04380b..029c87542de11 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" @@ -198,6 +199,12 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC return nil }) + failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && MockDMLExecutionMerging != nil { + MockDMLExecutionMerging() + } + }) logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000) return } @@ -252,40 +259,49 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor return false, nil } - originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) - if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete { - // For 'm' version kvs, they are double-written. - // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. - return true, nil + tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) + if err != nil { + return false, err } + tempIdxVal = tempIdxVal.FilterOverwritten() + + // Extract the operations on the original index and replay them later. + for _, elem := range tempIdxVal { + if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete { + // For 'm' version kvs, they are double-written. + // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. + continue + } - if handle == nil { - // If the handle is not found in the value of the temp index, it means - // 1) This is not a deletion marker, the handle is in the key or the origin value. - // 2) This is a deletion marker, but the handle is in the key of temp index. - handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns)) - if err != nil { - return false, err + if elem.Handle == nil { + // If the handle is not found in the value of the temp index, it means + // 1) This is not a deletion marker, the handle is in the key or the origin value. + // 2) This is a deletion marker, but the handle is in the key of temp index. + elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, len(w.index.Meta().Columns)) + if err != nil { + return false, err + } } - } - originIdxKey := make([]byte, len(indexKey)) - copy(originIdxKey, indexKey) - tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) + originIdxKey := make([]byte, len(indexKey)) + copy(originIdxKey, indexKey) + tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) - idxRecord := &temporaryIndexRecord{ - handle: handle, - delete: isDelete, - unique: unique, - skip: false, - } - if !isDelete { - idxRecord.vals = originVal - idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal) + idxRecord := &temporaryIndexRecord{ + handle: elem.Handle, + delete: elem.Delete, + unique: elem.Distinct, + skip: false, + } + if !elem.Delete { + idxRecord.vals = elem.Value + idxRecord.distinct = tablecodec.IndexKVIsUnique(elem.Value) + } + w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord) + w.originIdxKeys = append(w.originIdxKeys, originIdxKey) + w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey) } - w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord) - w.originIdxKeys = append(w.originIdxKeys, originIdxKey) - w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey) + lastKey = indexKey return true, nil }) diff --git a/ddl/indexmergetest/BUILD.bazel b/ddl/indexmergetest/BUILD.bazel index 5f6e4215f664e..b70146ae8d461 100644 --- a/ddl/indexmergetest/BUILD.bazel +++ b/ddl/indexmergetest/BUILD.bazel @@ -17,6 +17,7 @@ go_test( "//ddl/internal/callback", "//ddl/testutil", "//domain", + "//errno", "//kv", "//meta/autoid", "//parser/model", diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index f63164bafe871..f74db4e0b9eb9 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/ddl/internal/callback" "github.com/pingcap/tidb/ddl/testutil" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/tablecodec" @@ -529,3 +530,331 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("1 2")) } + +func TestAddIndexMergeInsertOnMerging(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateDeleteOnly: + _, err = tk1.Exec("insert into t values (5, 5)") + assert.NoError(t, err) + case model.StateWriteOnly: + _, err = tk1.Exec("insert into t values (5, 7)") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where b = 7") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecutionStateMerging = func() { + _, err := tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) // [kv:1062]Duplicate entry '5' for key 't.idx' + _, err = tk1.Exec("insert into t values (5, 8) on duplicate key update a = 6;") + assert.NoError(t, err) // The row should be normally updated to (6, 5). + ddl.MockDMLExecutionStateMerging = nil + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "return(true)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("6 5")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging")) +} + +func TestAddIndexMergeReplaceOnMerging(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0);") + tk.MustExec("insert into t values (5, 5);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where b = 5;") + assert.NoError(t, err) + } + + ddl.MockDMLExecutionStateMerging = func() { + _, err := tk1.Exec("replace into t values (5, 8);") + assert.NoError(t, err) + ddl.MockDMLExecutionStateMerging = nil + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "return(true)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging")) +} + +func TestAddIndexMergeInsertToDeletedTempIndex(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + tk.MustExec("insert into t values (5, 5);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateWriteOnly: + _, err = tk1.Exec("delete from t where b = 5") + assert.NoError(t, err) + _, err := tk1.Exec("set @@tidb_constraint_check_in_place = true;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) + _, err = tk1.Exec("set @@tidb_constraint_check_in_place = false;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) +} + +func TestAddIndexMergeReplaceDelete(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, a int default 0);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + switch job.SchemaState { + case model.StateDeleteOnly: + _, err := tk1.Exec("insert into t values (1, 1);") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecutionMerging = func() { + _, err := tk1.Exec("replace into t values (2, 1);") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where id = 2;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) +} + +func TestAddIndexMergeDeleteDifferentHandle(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, c char(10));") + tk.MustExec("insert into t values (1, 'a');") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + if job.SnapshotVer == 0 { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert into t values (2, 'a');") + assert.NoError(t, err) + _, err = tk1.Exec("replace into t values (3, 'a');") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecution = func() { + // It is too late to remove the duplicated index value. + _, err := tk1.Exec("delete from t where id = 1;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustGetErrCode("alter table t add unique index idx(c);", errno.ErrDupEntry) + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("3 a")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} + +func TestAddIndexDecodeTempIndexCommonHandle(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id_a bigint, id_b char(20), c char(20), primary key (id_a, id_b));") + tk.MustExec("insert into t values (1, 'id_1', 'char_1');") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + if job.SnapshotVer == 0 { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert into t values (2, 'id_2', 'char_2');") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (3, 'id_3', 'char_3');") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(c);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 id_1 char_1", "2 id_2 char_2", "3 id_3 char_3")) +} + +func TestAddIndexInsertIgnoreOnBackfill(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert ignore into t values (1, 1);") + assert.NoError(t, err) + _, err = tk1.Exec("insert ignore into t values (2, 2);") + assert.NoError(t, err) + _, err = tk1.Exec("update t set b = null where id = 1;") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(b);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 ", "2 2")) +} + +func TestAddIndexMultipleDelete(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int);") + tk.MustExec("insert into t values (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + switch job.SchemaState { + case model.StateDeleteOnly: + _, err := tk1.Exec("delete from t where id in (4, 5, 6);") + assert.NoError(t, err) + case model.StateWriteOnly: + _, err := tk1.Exec("delete from t where id in (2, 3);") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where id = 1;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(b);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} diff --git a/executor/insert.go b/executor/insert.go index 9d958d5d8bd2f..2b6e45fb018d7 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -152,11 +153,11 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t for _, r := range rows { for _, uk := range r.uniqueKeys { if val, found := values[string(uk.newKey)]; found { - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) + if isTemp, _ := tablecodec.CheckTempIndexKey(uk.newKey); isTemp { + // If it is a temp index, the value cannot be decoded by DecodeHandleInUniqueIndexValue. + // Since this function is an optimization, we can skip prefetching the rows referenced by + // temp indexes. + continue } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { @@ -251,26 +252,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) + _, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle) if err != nil { - if kv.IsErrNotFound(err) { - continue - } return err } - // Since the temp index stores deleted key with marked 'deleteu' for unique key at the end - // of value, So if return a key we check and skip deleted key. - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) - } - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) - if err != nil { - return err + if handle == nil { + continue } - err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) if err != nil { if kv.IsErrNotFound(err) { diff --git a/executor/replace.go b/executor/replace.go index bfc70ebc4451c..801ff40ebd252 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -176,22 +177,12 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // 3. error: the error. func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) { for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) + _, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle) if err != nil { - if kv.IsErrNotFound(err) { - continue - } return false, false, err } - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) - } - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) - if err != nil { - return false, true, err + if handle == nil { + continue } rowUnchanged, err := e.removeRow(ctx, txn, handle, r) if err != nil { diff --git a/table/tables/index.go b/table/tables/index.go index 57c6498e698fd..8b8ce6660ca1c 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -16,7 +16,6 @@ package tables import ( "context" - "errors" "sync" "github.com/pingcap/tidb/kv" @@ -239,18 +238,21 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic if !distinct || skipCheck || opt.Untouched { + val := idxVal if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) } - err = txn.GetMemBuffer().Set(key, idxVal) + err = txn.GetMemBuffer().Set(key, val) if err != nil { return nil, err } if len(tempKey) > 0 { if !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) } - err = txn.GetMemBuffer().Set(tempKey, idxVal) + err = txn.GetMemBuffer().Set(tempKey, val) if err != nil { return nil, err } @@ -280,45 +282,49 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if err != nil && !kv.IsErrNotFound(err) { return nil, err } - if err != nil || len(value) == 0 || (keyIsTempIdxKey && tablecodec.CheckTempIndexValueIsDelete(value)) { + var tempIdxVal tablecodec.TempIndexValue + if len(value) > 0 && keyIsTempIdxKey { + tempIdxVal, err = tablecodec.DecodeTempIndexValue(value, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } + // The index key value is not found or deleted. + if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) { + val := idxVal lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - var needPresumeKey TempIndexKeyState if keyIsTempIdxKey { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err - } - } else { - if len(tempKey) > 0 { - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err - } - } + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) + } + needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h, + keyIsTempIdxKey, c.tblInfo.IsCommonHandle, c.tblInfo.ID) + if err != nil { + return nil, err } if lazyCheck { var flags []kv.FlagsOp - if needPresumeKey != KeyInTempIndexIsDeleted { + if needPresumeNotExists { flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} } if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && !vars.InRestrictedSQL && vars.ConnectionID > 0 { flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) } - err = txn.GetMemBuffer().SetWithFlags(key, idxVal, flags...) + err = txn.GetMemBuffer().SetWithFlags(key, val, flags...) } else { - err = txn.GetMemBuffer().Set(key, idxVal) + err = txn.GetMemBuffer().Set(key, val) } if err != nil { return nil, err } if len(tempKey) > 0 { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted { - err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) + if lazyCheck && needPresumeNotExists { + err = txn.GetMemBuffer().SetWithFlags(tempKey, val, kv.SetPresumeKeyNotExists) } else { - err = txn.GetMemBuffer().Set(tempKey, idxVal) + err = txn.GetMemBuffer().Set(tempKey, val) } if err != nil { return nil, err @@ -338,8 +344,8 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue continue } - if keyIsTempIdxKey { - value = tablecodec.DecodeTempIndexOriginValue(value) + if keyIsTempIdxKey && !tempIdxVal.IsEmpty() { + value = tempIdxVal.Current().Value } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) if err != nil { @@ -350,6 +356,26 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue return nil, nil } +func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, tempKey kv.Key, + h kv.Handle, keyIsTempIdxKey bool, isCommon bool, tblID int64) (needFlag bool, err error) { + var uniqueTempKey kv.Key + if keyIsTempIdxKey { + uniqueTempKey = key + } else if len(tempKey) > 0 { + uniqueTempKey = tempKey + } else { + return true, nil + } + foundKey, dupHandle, err := FetchDuplicatedHandle(ctx, uniqueTempKey, true, txn, tblID, isCommon) + if err != nil { + return false, err + } + if foundKey && dupHandle != nil && !dupHandle.Equal(h) { + return false, kv.ErrKeyExists + } + return false, nil +} + // Delete removes the entry for handle h and indexedValues from KV index. func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) error { indexedValues := c.getIndexedValue(indexedValue) @@ -360,6 +386,16 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) + var originTempVal []byte + if len(tempKey) > 0 && c.idxInfo.Unique { + // Get the origin value of the unique temporary index key. + // Append the new delete operations to the end of the origin value. + originTempVal, err = getKeyInTxn(context.TODO(), txn, tempKey) + if err != nil { + return err + } + } + tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, Delete: true, Distinct: distinct} if distinct { if len(key) > 0 { @@ -369,7 +405,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } } if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeletedUnique(h, tempKeyVer) + // Append to the end of the origin value for distinct value. + tempVal := tempValElem.Encode(originTempVal) err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err @@ -383,7 +420,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } } if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeleted(tempKeyVer) + tempVal := tempValElem.Encode(nil) err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err @@ -484,50 +521,116 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV if err != nil { return false, nil, err } - - var ( - tempKey []byte - keyVer byte - ) // If index current is in creating status and using ingest mode, we need first // check key exist status in temp index. - key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) - if keyVer != TempIndexKeyTypeNone { - KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err1 != nil { - return false, nil, err - } - switch KeyExistInfo { - case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: - return false, nil, nil - case KeyInTempIndexConflict: - return true, h1, kv.ErrKeyExists - case KeyInTempIndexIsItself: - continue - } + key, tempKey, _ := GenTempIdxKeyByState(c.idxInfo, key) + if len(tempKey) > 0 { + key = tempKey } - - value, err := txn.Get(context.TODO(), key) - if kv.IsErrNotFound(err) { - return false, nil, nil + foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle) + if err != nil || !foundKey { + return false, nil, err } - if err != nil { + if dupHandle != nil && !dupHandle.Equal(h) { return false, nil, err } + continue + } + return true, h, nil +} + +// FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key. +func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, + txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { + if isTemp, originIdxID := tablecodec.CheckTempIndexKey(key); isTemp { + return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, originIdxID, isCommon) + } + // The index key is not from temp index. + val, err := getKeyInTxn(ctx, txn, key) + if err != nil || len(val) == 0 { + return false, nil, err + } + if distinct { + h, err := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) + return true, h, err + } + return true, nil, nil +} - // For distinct index, the value of key is handle. +func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool, + txn kv.Transaction, tableID, idxID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { + tempRawVal, err := getKeyInTxn(ctx, txn, tempKey) + if err != nil { + return false, nil, err + } + if tempRawVal == nil { + originKey := tempKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := getKeyInTxn(ctx, txn, originKey) + if err != nil || originVal == nil { + return false, nil, err + } if distinct { - var handle kv.Handle - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) if err != nil { return false, nil, err } - if !handle.Equal(h) { - return true, handle, kv.ErrKeyExists + return true, originHandle, err + } + return false, nil, nil + } + tempVal, err := tablecodec.DecodeTempIndexValue(tempRawVal, isCommon) + if err != nil { + return false, nil, err + } + curElem := tempVal.Current() + if curElem.Delete { + originKey := tempKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := getKeyInTxn(ctx, txn, originKey) + if err != nil || originVal == nil { + return false, nil, err + } + if distinct { + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) + if err != nil { + return false, nil, err + } + if originHandle.Equal(curElem.Handle) { + // The key has been deleted. This is not a duplicated key. + return false, nil, nil } + // The inequality means multiple modifications happened in the same key. + // We use the handle in origin index value to check if the row exists. + recPrefix := tablecodec.GenTableRecordPrefix(tableID) + rowKey := tablecodec.EncodeRecordKey(recPrefix, originHandle) + rowVal, err := getKeyInTxn(ctx, txn, rowKey) + if err != nil || rowVal == nil { + return false, nil, err + } + // The row exists. This is the duplicated key. + return true, originHandle, nil } + return false, nil, nil } - return true, h, nil + // The value in temp index is not the delete marker. + if distinct { + h, err := tablecodec.DecodeHandleInUniqueIndexValue(curElem.Value, isCommon) + return true, h, err + } + return true, nil, nil +} + +// getKeyInTxn gets the value of the key in the transaction, and ignore the ErrNotExist error. +func getKeyInTxn(ctx context.Context, txn kv.Transaction, key kv.Key) ([]byte, error) { + val, err := txn.Get(ctx, key) + if err != nil { + if kv.IsErrNotFound(err) { + return nil, nil + } + return nil, err + } + return val, nil } func (c *index) FetchValues(r []types.Datum, vals []types.Datum) ([]types.Datum, error) { @@ -605,57 +708,3 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * } return colInfo } - -// TempIndexKeyState is the state of the temporary index key. -type TempIndexKeyState byte - -const ( - // KeyInTempIndexUnknown whether the key exists or not in temp index is unknown. - KeyInTempIndexUnknown TempIndexKeyState = iota - // KeyInTempIndexNotExist the key is not exist in temp index. - KeyInTempIndexNotExist - // KeyInTempIndexIsDeleted the key is marked deleted in temp index. - KeyInTempIndexIsDeleted - // KeyInTempIndexIsItself the key is correlated to itself in temp index. - KeyInTempIndexIsItself - // KeyInTempIndexConflict the key is conflict in temp index. - KeyInTempIndexConflict -) - -// KeyExistInTempIndex is used to check the unique key exist status in temp index. -func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (TempIndexKeyState, kv.Handle, error) { - // Only check temp index key. - if !tablecodec.IsTempIndexKey(key) { - return KeyInTempIndexUnknown, nil, nil - } - value, err := txn.Get(ctx, key) - if kv.IsErrNotFound(err) { - return KeyInTempIndexNotExist, nil, nil - } - if err != nil { - return KeyInTempIndexUnknown, nil, err - } - - // Since KeyExistInTempIndex only accept temp index key, so the value length should great than 1 for key version. - if len(value) < 1 { - return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1") - } - - if tablecodec.CheckTempIndexValueIsDelete(value) { - return KeyInTempIndexIsDeleted, nil, nil - } - - // Check if handle equal. - var handle kv.Handle - if distinct { - originVal := tablecodec.DecodeTempIndexOriginValue(value) - handle, err = tablecodec.DecodeHandleInUniqueIndexValue(originVal, IsCommonHandle) - if err != nil { - return KeyInTempIndexUnknown, nil, err - } - if !handle.Equal(h) { - return KeyInTempIndexConflict, handle, kv.ErrKeyExists - } - } - return KeyInTempIndexIsItself, handle, nil -} diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 328989d88ad3f..52b517eb93050 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -106,7 +106,7 @@ func CheckDataConsistency( // } if rowInsertion.key != nil { - if err = checkHandleConsistency(rowInsertion, indexMutations, columnMaps.IndexIDToInfo, t.Meta().Name.O); err != nil { + if err = checkHandleConsistency(rowInsertion, indexMutations, columnMaps.IndexIDToInfo, t.Meta()); err != nil { return errors.Trace(err) } } @@ -123,7 +123,7 @@ func CheckDataConsistency( // in row insertions and index insertions are consistent. // A PUT_index implies a PUT_row with the same handle. // Deletions are not checked since the values of deletions are unknown -func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, tableName string) error { +func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, tblInfo *model.TableInfo) error { var insertionHandle kv.Handle var err error @@ -154,7 +154,18 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in indexHandle kv.Handle ) if idxID != m.indexID { - value = tablecodec.DecodeTempIndexOriginValue(m.value) + if tablecodec.TempIndexValueIsUntouched(m.value) { + // We never commit the untouched key values to the storage. Skip this check. + continue + } + var tempIdxVal tablecodec.TempIndexValue + tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value, tblInfo.IsCommonHandle) + if err != nil { + return err + } + if !tempIdxVal.IsEmpty() { + value = tempIdxVal.Current().Value + } if len(value) == 0 { // Skip the deleted operation values. continue @@ -170,7 +181,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in } // NOTE: handle type can be different, see issue 29520 if indexHandle.IsInt() == insertionHandle.IsInt() && indexHandle.Compare(insertionHandle) != 0 { - err = ErrInconsistentHandle.GenWithStackByArgs(tableName, indexInfo.Name.O, indexHandle, insertionHandle, m, rowInsertion) + err = ErrInconsistentHandle.GenWithStackByArgs(tblInfo.Name, indexInfo.Name.O, indexHandle, insertionHandle, m, rowInsertion) logutil.BgLogger().Error("inconsistent handle in index and record insertions", zap.Error(err)) return err } @@ -209,9 +220,20 @@ func checkIndexKeys( return errors.New("index not found") } + var isTmpIdxValAndDeleted bool // If this is temp index data, need remove last byte of index data. if idxID != m.indexID { - value = append(value, m.value[:len(m.value)-1]...) + if tablecodec.TempIndexValueIsUntouched(m.value) { + // We never commit the untouched key values to the storage. Skip this check. + continue + } + tmpVal, err := tablecodec.DecodeTempIndexValue(m.value, t.Meta().IsCommonHandle) + if err != nil { + return err + } + curElem := tmpVal.Current() + isTmpIdxValAndDeleted = curElem.Delete + value = append(value, curElem.Value...) } else { value = append(value, m.value...) } @@ -245,7 +267,7 @@ func checkIndexKeys( } // When it is in add index new backfill state. - if len(value) == 0 || (idxID != m.indexID && (tablecodec.CheckTempIndexValueIsDelete(value))) { + if len(value) == 0 || isTmpIdxValAndDeleted { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta()) } else { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta()) diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 43fb35c21a5b6..4c44e90a7d244 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -310,9 +310,9 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) { require.Nil(t, err) rowMutation := mutation{key: rowKey, value: rowValue} corruptedRowMutation := mutation{key: corruptedRowKey, value: rowValue} - err = checkHandleConsistency(rowMutation, indexMutations, maps.IndexIDToInfo, "t") + err = checkHandleConsistency(rowMutation, indexMutations, maps.IndexIDToInfo, &tableInfo) require.Nil(t, err) - err = checkHandleConsistency(corruptedRowMutation, indexMutations, maps.IndexIDToInfo, "t") + err = checkHandleConsistency(corruptedRowMutation, indexMutations, maps.IndexIDToInfo, &tableInfo) require.NotNil(t, err) } } diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 249a4f35495a0..394277ba2768c 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1159,8 +1159,8 @@ func TempIndexKey2IndexKey(originIdxID int64, tempIdxKey []byte) { binary.BigEndian.PutUint64(tempIdxKey[prefixLen:], eid) } -// IsTempIndexKey check whether the input key is for a temp index. -func IsTempIndexKey(indexKey []byte) bool { +// CheckTempIndexKey checks whether the input key is for a temp index. +func CheckTempIndexKey(indexKey []byte) (isTemp bool, originIdxID int64) { var ( indexIDKey []byte indexID int64 @@ -1170,101 +1170,206 @@ func IsTempIndexKey(indexKey []byte) bool { indexIDKey = indexKey[prefixLen : prefixLen+8] indexID = codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexIDKey)) tempIndexID = int64(TempIndexPrefix) | indexID - return tempIndexID == indexID + return tempIndexID == indexID, indexID & IndexIDMask } // TempIndexValueFlag is the flag of temporary index value. type TempIndexValueFlag byte const ( - // TempIndexValueFlagNormal means the following value is the normal index value. + // TempIndexValueFlagNormal means the following value is a distinct the normal index value. TempIndexValueFlagNormal TempIndexValueFlag = iota - // TempIndexValueFlagDeleted means this is a representation of a "delete" operation. + // TempIndexValueFlagNonDistinctNormal means the following value is the non-distinct normal index value. + TempIndexValueFlagNonDistinctNormal + // TempIndexValueFlagDeleted means the following value is the distinct and deleted index value. TempIndexValueFlagDeleted + // TempIndexValueFlagNonDistinctDeleted means the following value is the non-distinct deleted index value. + TempIndexValueFlagNonDistinctDeleted ) -// EncodeTempIndexValue encodes the value of temporary index. -// Note: this function changes the input value. -func EncodeTempIndexValue(value []byte, keyVer byte) []byte { - value = append(value, 0) - copy(value[1:], value[:len(value)-1]) - value[0] = byte(TempIndexValueFlagNormal) // normal flag + value + tempKeyVer - value = append(value, keyVer) - return value -} +// TempIndexValue is the value of temporary index. +// It contains one or more element, each element represents a history index operations on the original index. +// A temp index value element is encoded as one of: +// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal} +// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal} +// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted} +// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted} +// +// The temp index value is encoded as: +// - [element 1][element 2]...[element n] {for distinct values} +// - [element 1] {for non-distinct values} +type TempIndexValue []*TempIndexValueElem -// EncodeTempIndexValueDeletedUnique encodes the value of temporary index for unique index. -func EncodeTempIndexValueDeletedUnique(handle kv.Handle, keyVer byte) []byte { - var hEncoded []byte - var hLen int - if handle.IsInt() { - var data [8]byte - binary.BigEndian.PutUint64(data[:], uint64(handle.IntValue())) - hEncoded = data[:] - hLen = 8 - } else { - hEncoded = handle.Encoded() - hLen = len(hEncoded) - } - val := make([]byte, 0, 1+hLen+1) // deleted flag + handle + tempKeyVer - val = append(val, byte(TempIndexValueFlagDeleted)) - val = append(val, hEncoded...) - val = append(val, keyVer) - return val +// IsEmpty checks whether the value is empty. +func (v TempIndexValue) IsEmpty() bool { + return len(v) == 0 } -// EncodeTempIndexValueDeleted encodes the delete operation on origin index to a value for temporary index. -func EncodeTempIndexValueDeleted(keyVer byte) []byte { - // Handle is not needed because it is already in the key. - val := make([]byte, 0, 2) // deleted flag + tempKeyVer - val = append(val, byte(TempIndexValueFlagDeleted)) - val = append(val, keyVer) - return val +// Current returns the current latest temp index value. +func (v TempIndexValue) Current() *TempIndexValueElem { + return v[len(v)-1] } -// DecodeTempIndexValue decodes the value of temporary index. -func DecodeTempIndexValue(value []byte, isCommonHandle bool) (originVal []byte, handle kv.Handle, isDelete bool, isUnique bool, keyVer byte) { - if len(value) == 0 { - return nil, nil, false, false, 0 +// FilterOverwritten is used by the temp index merge process to remove the overwritten index operations. +// For example, the value {temp_idx_key -> [h2, h2d, h3, h1d]} recorded four operations on the original index. +// Since 'h2d' overwrites 'h2', we can remove 'h2' from the value. +func (v TempIndexValue) FilterOverwritten() TempIndexValue { + if len(v) <= 1 || !v[0].Distinct { + return v } - switch TempIndexValueFlag(value[0]) { - case TempIndexValueFlagNormal: - originVal = value[1 : len(value)-1] - keyVer = value[len(value)-1] - case TempIndexValueFlagDeleted: - isDelete = true - if len(value) == 2 { - keyVer = value[1] + occurred := kv.NewHandleMap() + for i := len(v) - 1; i >= 0; i-- { + if _, ok := occurred.Get(v[i].Handle); !ok { + occurred.Set(v[i].Handle, struct{}{}) } else { - isUnique = true - if isCommonHandle { - handle, _ = kv.NewCommonHandle(value[1 : len(value)-1]) + v[i] = nil + } + } + ret := v[:0] + for _, elem := range v { + if elem != nil { + ret = append(ret, elem) + } + } + return ret +} + +// TempIndexValueElem represents a history index operations on the original index. +// A temp index value element is encoded as one of: +// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal} +// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal} +// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted} +// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted} +type TempIndexValueElem struct { + Value []byte + Handle kv.Handle + KeyVer byte + Delete bool + Distinct bool +} + +// Encode encodes the temp index value. +func (v *TempIndexValueElem) Encode(buf []byte) []byte { + if v.Delete { + if v.Distinct { + handle := v.Handle + var hEncoded []byte + var hLen uint16 + if handle.IsInt() { + hEncoded = codec.EncodeUint(hEncoded, uint64(handle.IntValue())) + hLen = 8 } else { - handle = decodeIntHandleInIndexValue(value[1 : len(value)-1]) + hEncoded = handle.Encoded() + hLen = uint16(len(hEncoded)) + } + // flag + handle length + handle + temp key version + if buf == nil { + buf = make([]byte, 0, hLen+4) } - keyVer = value[len(value)-1] + buf = append(buf, byte(TempIndexValueFlagDeleted)) + buf = append(buf, byte(hLen>>8), byte(hLen)) + buf = append(buf, hEncoded...) + buf = append(buf, v.KeyVer) + return buf } - } - return + // flag + temp key version + if buf == nil { + buf = make([]byte, 0, 2) + } + buf = append(buf, byte(TempIndexValueFlagNonDistinctDeleted)) + buf = append(buf, v.KeyVer) + return buf + } + if v.Distinct { + // flag + value length + value + temp key version + if buf == nil { + buf = make([]byte, 0, len(v.Value)+4) + } + buf = append(buf, byte(TempIndexValueFlagNormal)) + vLen := uint16(len(v.Value)) + buf = append(buf, byte(vLen>>8), byte(vLen)) + buf = append(buf, v.Value...) + buf = append(buf, v.KeyVer) + return buf + } + // flag + value + temp key version + if buf == nil { + buf = make([]byte, 0, len(v.Value)+2) + } + buf = append(buf, byte(TempIndexValueFlagNonDistinctNormal)) + buf = append(buf, v.Value...) + buf = append(buf, v.KeyVer) + return buf } -// CheckTempIndexValueIsDelete checks whether the value is a delete operation. -func CheckTempIndexValueIsDelete(value []byte) bool { - if len(value) == 0 { - return false +// DecodeTempIndexValue decodes the temp index value. +func DecodeTempIndexValue(value []byte, isCommonHandle bool) (TempIndexValue, error) { + var ( + values []*TempIndexValueElem + err error + ) + for len(value) > 0 { + v := &TempIndexValueElem{} + value, err = v.DecodeOne(value, isCommonHandle) + if err != nil { + return nil, err + } + values = append(values, v) } - return TempIndexValueFlag(value[0]) == TempIndexValueFlagDeleted + return values, nil } -// DecodeTempIndexOriginValue decodes the value of origin index from a temp index value. -func DecodeTempIndexOriginValue(value []byte) []byte { - if len(value) == 0 { - return nil +// DecodeOne decodes one temp index value element. +func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain []byte, err error) { + flag := TempIndexValueFlag(b[0]) + b = b[1:] + switch flag { + case TempIndexValueFlagNormal: + vLen := (uint16(b[0]) << 8) + uint16(b[1]) + b = b[2:] + v.Value = b[:vLen] + b = b[vLen:] + v.KeyVer = b[0] + b = b[1:] + v.Distinct = true + v.Handle, err = DecodeHandleInUniqueIndexValue(v.Value, isCommonHandle) + return b, err + case TempIndexValueFlagNonDistinctNormal: + v.Value = b[:len(b)-1] + v.KeyVer = b[len(b)-1] + return nil, nil + case TempIndexValueFlagDeleted: + hLen := (uint16(b[0]) << 8) + uint16(b[1]) + b = b[2:] + if isCommonHandle { + v.Handle, _ = kv.NewCommonHandle(b[:hLen]) + } else { + v.Handle = decodeIntHandleInIndexValue(b[:hLen]) + } + b = b[hLen:] + v.KeyVer = b[0] + b = b[1:] + v.Distinct = true + v.Delete = true + return b, nil + case TempIndexValueFlagNonDistinctDeleted: + v.KeyVer = b[0] + b = b[1:] + v.Delete = true + return b, nil + default: + return nil, errors.New("invalid temp index value") } - if TempIndexValueFlag(value[0]) == TempIndexValueFlagNormal { - return value[1 : len(value)-1] +} + +// TempIndexValueIsUntouched returns true if the value is untouched. +// All the temp index value has the suffix of temp key version. +// All the temp key versions differ from the uncommitted KV flag. +func TempIndexValueIsUntouched(b []byte) bool { + if len(b) > 0 && b[len(b)-1] == kv.UnCommitIndexKVFlag { + return true } - return nil + return false } // GenIndexValuePortal is the portal for generating index value. diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index 231d58cf18bd3..adc4ccc78c13b 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -617,26 +617,84 @@ func TestTempIndexValueCodec(t *testing.T) { require.NoError(t, err) encodedValueCopy := make([]byte, len(encodedValue)) copy(encodedValueCopy, encodedValue) - tempIdxVal := EncodeTempIndexValue(encodedValue, 'b') - originVal, handle, isDelete, unique, keyVer := DecodeTempIndexValue(tempIdxVal, false) - require.Nil(t, handle) - require.False(t, isDelete || unique) - require.Equal(t, keyVer, byte('b')) - require.EqualValues(t, encodedValueCopy, originVal) - - tempIdxVal = EncodeTempIndexValueDeletedUnique(kv.IntHandle(100), 'm') - originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) - require.Equal(t, handle.IntValue(), int64(100)) - require.True(t, isDelete) - require.True(t, unique) - require.Equal(t, keyVer, byte('m')) - require.Empty(t, originVal) - - tempIdxVal = EncodeTempIndexValueDeleted('b') - originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) - require.Nil(t, handle) - require.True(t, isDelete) - require.False(t, unique) - require.Equal(t, keyVer, byte('b')) - require.Empty(t, originVal) + + tempIdxVal := TempIndexValueElem{ + Value: encodedValue, + KeyVer: 'b', + } + val := tempIdxVal.Encode(nil) + var newTempIdxVal TempIndexValueElem + remain, err := newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + idxVal := EncodeHandleInUniqueIndexValue(kv.IntHandle(100), false) + tempIdxVal = TempIndexValueElem{ + Value: idxVal, + KeyVer: 'm', + Distinct: true, + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.Equal(t, newTempIdxVal.Handle.IntValue(), int64(100)) + newTempIdxVal.Handle = nil + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + tempIdxVal = TempIndexValueElem{ + Delete: true, + KeyVer: 'b', + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + tempIdxVal = TempIndexValueElem{ + Delete: true, + KeyVer: 'b', + Distinct: true, + Handle: kv.IntHandle(100), + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + // Test multiple temp index value elements. + idxVal = EncodeHandleInUniqueIndexValue(kv.IntHandle(100), false) + tempIdxVal = TempIndexValueElem{ + Value: idxVal, + KeyVer: 'm', + Distinct: true, + } + tempIdxVal2 := TempIndexValueElem{ + Handle: kv.IntHandle(100), + KeyVer: 'm', + Distinct: true, + Delete: true, + } + idxVal3 := EncodeHandleInUniqueIndexValue(kv.IntHandle(101), false) + tempIdxVal3 := TempIndexValueElem{ + Value: idxVal3, + KeyVer: 'm', + Distinct: true, + } + val = tempIdxVal.Encode(nil) + val = tempIdxVal2.Encode(val) + val = tempIdxVal3.Encode(val) + var result TempIndexValue + result, err = DecodeTempIndexValue(val, false) + require.NoError(t, err) + require.Equal(t, 3, len(result)) + require.Equal(t, result[0].Handle.IntValue(), int64(100)) + require.Equal(t, result[1].Handle.IntValue(), int64(100)) + require.Equal(t, result[2].Handle.IntValue(), int64(101)) }