From a025a6ead1c6fc24da63639f5909ba2969cdd419 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 28 Sep 2021 21:52:44 +0800 Subject: [PATCH 01/22] check handle consistency Signed-off-by: ekexium --- table/tables/mutation_checker.go | 112 +++++++++++++++++++++----- table/tables/mutation_checker_test.go | 45 ++++++++--- 2 files changed, 127 insertions(+), 30 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index c127945652389..ce8bafce18960 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -36,6 +36,11 @@ type mutation struct { value []byte } +type richIndexMutation struct { + mutation + indexID int64 +} + type columnMaps struct { ColumnIDToInfo map[int64]*model.ColumnInfo ColumnIDToFieldType map[int64]*types.FieldType @@ -47,20 +52,14 @@ type columnMaps struct { // Namely, assume the database is consistent before, applying the mutations shouldn't break the consistency. // It aims at reducing bugs that will corrupt data, and preventing mistakes from spreading if possible. // +// 3 conditions are checked: +// (1) row.value is consistent with input data +// (2) the handle is consistent in row and index insertions +// (3) the keys of the indices are consistent with the values of rows +// // The check doesn't work and just returns nil when: // (1) the table is partitioned // (2) new collation is enabled and restored data is needed -// -// How it works: -// -// Assume the set of row values changes from V1 to V2, we check -// (1) V2 - V1 = {added indices} -// (2) V1 - V2 = {deleted indices} -// -// To check (1), we need -// (a) {added indices} is a subset of {needed indices} => each index mutation is consistent with the input/row key/value -// (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate -// the mutations, thus ignored. func CheckIndexConsistency( txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle, @@ -86,33 +85,104 @@ func CheckIndexConsistency( return errors.Trace(err) } } + + richIndexMutations, err := buildRichIndexMutations(indexMutations) + if err != nil { + return err + } + + if rowInsertion.key != nil { + if err = checkHandleConsistency(rowInsertion, richIndexMutations, columnMaps.IndexIDToInfo); err != nil { + return errors.Trace(err) + } + } + if err := checkIndexKeys( - sessVars, t, rowToInsert, rowToRemove, indexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos, + sessVars, t, rowToInsert, rowToRemove, richIndexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos, ); err != nil { return errors.Trace(err) } return nil } +// buildRichIndexMutations attaches some reusable info to index mutations +func buildRichIndexMutations(indexMutations []mutation) ([]richIndexMutation, error) { + richIndexMutations := make([]richIndexMutation, 0, len(indexMutations)) + for _, m := range indexMutations { + _, indexID, _, err := tablecodec.DecodeIndexKey(m.key) + if err != nil { + return nil, errors.Trace(err) + } + richIndexMutations = append(richIndexMutations, richIndexMutation{m, indexID}) + } + return richIndexMutations, nil +} + +// checkHandleConsistency checks whether the handles, with regard to a single-row change, +// in row insertions and index insertions are consistent. +// A PUT_index implies a PUT_row with the same handle. +// Deletions are not checked since the values of deletions are unknown +func checkHandleConsistency(rowInsertion mutation, indexMutations []richIndexMutation, indexIDToInfo map[int64]*model.IndexInfo) error { + var insertionHandle kv.Handle + var err error + + if rowInsertion.key == nil { + return nil + } + insertionHandle, err = tablecodec.DecodeRowKey(rowInsertion.key) + if err != nil { + return errors.Trace(err) + } + + var indexHandle kv.Handle + for _, m := range indexMutations { + if len(m.value) == 0 { + continue + } + + indexInfo, ok := indexIDToInfo[m.indexID] + if !ok { + return errors.New("index not found") + } + + indexHandle, err = tablecodec.DecodeIndexHandle(m.key, m.value, len(indexInfo.Columns)) + if err != nil { + return errors.Trace(err) + } + if indexHandle.Compare(insertionHandle) != 0 { + return errors.Errorf("inconsistent handles in row and index insertions. index handle = %v, "+ + "row handle = %v, index = %+v, row = %+v", + indexHandle, insertionHandle, m, rowInsertion) + } + } + + return err +} + // checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. +// +// How it works: +// +// Assume the set of row values changes from V1 to V2, we check +// (1) V2 - V1 = {added indices} +// (2) V1 - V2 = {deleted indices} +// +// To check (1), we need +// (a) {added indices} is a subset of {needed indices} => each index mutation is consistent with the input/row key/value +// (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate +// the mutations, thus ignored. func checkIndexKeys( sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, - indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, + indexMutations []richIndexMutation, indexIDToInfo map[int64]*model.IndexInfo, indexIDToRowColInfos map[int64][]rowcodec.ColInfo, ) error { - var indexData []types.Datum for _, m := range indexMutations { - _, indexID, _, err := tablecodec.DecodeIndexKey(m.key) - if err != nil { - return errors.Trace(err) - } - - indexInfo, ok := indexIDToInfo[indexID] + indexInfo, ok := indexIDToInfo[m.indexID] if !ok { return errors.New("index not found") } - rowColInfos, ok := indexIDToRowColInfos[indexID] + rowColInfos, ok := indexIDToRowColInfos[m.indexID] if !ok { return errors.New("index not found") } diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index ae41933f8522c..e1781a6d5ea93 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/rowcodec" "github.com/stretchr/testify/require" @@ -171,7 +172,7 @@ func TestCheckRowInsertionConsistency(t *testing.T) { } } -func TestCheckIndexKeys(t *testing.T) { +func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) { // dimensions of the domain of checkIndexKeys: // 1. location *2 // 2. table structure @@ -179,6 +180,7 @@ func TestCheckIndexKeys(t *testing.T) { // (2) clustered index *2 // (3) string collation *2 // We don't test primary clustered index and int handle, since they should not have index mutations. + // Assume PK is always the first column (string). // cases locations := []*time.Location{time.UTC, time.Local} @@ -222,12 +224,13 @@ func TestCheckIndexKeys(t *testing.T) { {ID: 2, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeDatetime)}, }, { - {ID: 1, Offset: 0, FieldType: *types.NewFieldTypeWithCollation(mysql.TypeString, "big5_chinese_ci", + {ID: 1, Offset: 0, FieldType: *types.NewFieldTypeWithCollation(mysql.TypeString, "utf8_unicode_ci", types.UnspecifiedLength)}, {ID: 2, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeDatetime)}, }, } sessVars := variable.NewSessionVars() + rd := rowcodec.Encoder{Enable: true} now := types.CurrentTime(mysql.TypeDatetime) rowToInsert := []types.Datum{ @@ -262,34 +265,58 @@ func TestCheckIndexKeys(t *testing.T) { IsCommonHandle: isCommonHandle, } table := MockTableFromMeta(&tableInfo).(*TableCommon) + // + var handle kv.Handle + if isCommonHandle { + encoded, err := codec.EncodeKey(sessVars.StmtCtx, nil, rowToInsert[0]) + require.Nil(t, err) + handle, err = kv.NewCommonHandle(encoded) + require.Nil(t, err) + } else { + handle = kv.IntHandle(1) + } for i, indexInfo := range indexInfos { index := table.indices[i] maps := getOrBuildColumnMaps(getter, setter, table) - insertionKey, insertionValue, err := buildKeyValue(index, rowToInsert, sessVars, tableInfo, indexInfo, table) + + // test checkIndexKeys + insertionKey, insertionValue, err := buildIndexKeyValue(index, rowToInsert, sessVars, tableInfo, + indexInfo, table, handle) require.Nil(t, err) - deletionKey, _, err := buildKeyValue(index, rowToRemove, sessVars, tableInfo, indexInfo, table) + deletionKey, _, err := buildIndexKeyValue(index, rowToRemove, sessVars, tableInfo, indexInfo, table, + handle) require.Nil(t, err) indexMutations := []mutation{{key: insertionKey, value: insertionValue}, {key: deletionKey}} + richIndexMutations, err := buildRichIndexMutations(indexMutations) + require.Nil(t, err) err = checkIndexKeys( - sessVars, table, rowToInsert, rowToRemove, indexMutations, maps.IndexIDToInfo, + sessVars, table, rowToInsert, rowToRemove, richIndexMutations, maps.IndexIDToInfo, maps.IndexIDToRowColInfos, ) require.Nil(t, err) + + // test checkHandleConsistency + rowKey := tablecodec.EncodeRowKeyWithHandle(table.tableID, handle) + rowValue, err := tablecodec.EncodeRow(sessVars.StmtCtx, rowToInsert, []int64{1, 2}, nil, nil, &rd) + require.Nil(t, err) + rowMutation := mutation{key: rowKey, value: rowValue} + err = checkHandleConsistency(rowMutation, richIndexMutations, maps.IndexIDToInfo) + require.Nil(t, err) } } } } } -func buildKeyValue(index table.Index, rowToInsert []types.Datum, sessVars *variable.SessionVars, - tableInfo model.TableInfo, indexInfo *model.IndexInfo, table *TableCommon) ([]byte, []byte, error) { +func buildIndexKeyValue(index table.Index, rowToInsert []types.Datum, sessVars *variable.SessionVars, + tableInfo model.TableInfo, indexInfo *model.IndexInfo, table *TableCommon, handle kv.Handle) ([]byte, []byte, error) { indexedValues, err := index.FetchValues(rowToInsert, nil) if err != nil { return nil, nil, err } key, distinct, err := tablecodec.GenIndexKey( - sessVars.StmtCtx, &tableInfo, indexInfo, 1, indexedValues, kv.IntHandle(1), nil, + sessVars.StmtCtx, &tableInfo, indexInfo, 1, indexedValues, handle, nil, ) if err != nil { return nil, nil, err @@ -297,7 +324,7 @@ func buildKeyValue(index table.Index, rowToInsert []types.Datum, sessVars *varia rsData := TryGetHandleRestoredDataWrapper(table, rowToInsert, nil, indexInfo) value, err := tablecodec.GenIndexValuePortal( sessVars.StmtCtx, &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns), - distinct, false, indexedValues, kv.IntHandle(1), 0, rsData, + distinct, false, indexedValues, handle, 0, rsData, ) if err != nil { return nil, nil, err From f8d3c6ac24c42dad0258c81d982a858a16312d30 Mon Sep 17 00:00:00 2001 From: ekexium Date: Fri, 8 Oct 2021 16:18:47 +0800 Subject: [PATCH 02/22] refactor according to comments Signed-off-by: ekexium --- table/tables/mutation_checker.go | 46 +++++++++++---------------- table/tables/mutation_checker_test.go | 22 ++++++++++--- 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index ce8bafce18960..68f0c02fce6cc 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -31,14 +31,10 @@ import ( ) type mutation struct { - key kv.Key - flags kv.KeyFlags - value []byte -} - -type richIndexMutation struct { - mutation - indexID int64 + key kv.Key + flags kv.KeyFlags + value []byte + indexID int64 // only for index mutations } type columnMaps struct { @@ -60,6 +56,12 @@ type columnMaps struct { // The check doesn't work and just returns nil when: // (1) the table is partitioned // (2) new collation is enabled and restored data is needed +// +// The check is performed on almost every write. Its performance matters. +// Let M = the number of mutations, C = the number of columns in the table, +// I = the sum of the number of columns in all indices, +// The time complexity is O(M * C + I) +// The space complexity is O(M + C + I) func CheckIndexConsistency( txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle, @@ -86,43 +88,29 @@ func CheckIndexConsistency( } } - richIndexMutations, err := buildRichIndexMutations(indexMutations) if err != nil { return err } if rowInsertion.key != nil { - if err = checkHandleConsistency(rowInsertion, richIndexMutations, columnMaps.IndexIDToInfo); err != nil { + if err = checkHandleConsistency(rowInsertion, indexMutations, columnMaps.IndexIDToInfo); err != nil { return errors.Trace(err) } } if err := checkIndexKeys( - sessVars, t, rowToInsert, rowToRemove, richIndexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos, + sessVars, t, rowToInsert, rowToRemove, indexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos, ); err != nil { return errors.Trace(err) } return nil } -// buildRichIndexMutations attaches some reusable info to index mutations -func buildRichIndexMutations(indexMutations []mutation) ([]richIndexMutation, error) { - richIndexMutations := make([]richIndexMutation, 0, len(indexMutations)) - for _, m := range indexMutations { - _, indexID, _, err := tablecodec.DecodeIndexKey(m.key) - if err != nil { - return nil, errors.Trace(err) - } - richIndexMutations = append(richIndexMutations, richIndexMutation{m, indexID}) - } - return richIndexMutations, nil -} - // checkHandleConsistency checks whether the handles, with regard to a single-row change, // in row insertions and index insertions are consistent. // A PUT_index implies a PUT_row with the same handle. // Deletions are not checked since the values of deletions are unknown -func checkHandleConsistency(rowInsertion mutation, indexMutations []richIndexMutation, indexIDToInfo map[int64]*model.IndexInfo) error { +func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo) error { var insertionHandle kv.Handle var err error @@ -173,7 +161,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []richIndexMut // the mutations, thus ignored. func checkIndexKeys( sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, - indexMutations []richIndexMutation, indexIDToInfo map[int64]*model.IndexInfo, + indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, indexIDToRowColInfos map[int64][]rowcodec.ColInfo, ) error { var indexData []types.Datum @@ -279,7 +267,7 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { // only check the current table if tablecodec.DecodeTableID(key) == t.physicalTableID { - m := mutation{key, flags, data} + m := mutation{key, flags, data, 0} if rowcodec.IsRowKey(key) { if len(data) > 0 { if rowInsertion.key == nil { @@ -291,6 +279,10 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer } } } else { + _, m.indexID, _, err = tablecodec.DecodeIndexKey(m.key) + if err != nil { + err = errors.Trace(err) + } indexMutations = append(indexMutations, m) } } diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index e1781a6d5ea93..ff41991b5b9e9 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -266,14 +266,20 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) { } table := MockTableFromMeta(&tableInfo).(*TableCommon) // - var handle kv.Handle + var handle, corruptedHandle kv.Handle if isCommonHandle { encoded, err := codec.EncodeKey(sessVars.StmtCtx, nil, rowToInsert[0]) require.Nil(t, err) + corrupted := make([]byte, len(encoded)) + copy(corrupted, encoded) + corrupted[len(corrupted)-1] = corrupted[len(corrupted)-1] ^ 1 handle, err = kv.NewCommonHandle(encoded) require.Nil(t, err) + corruptedHandle, err = kv.NewCommonHandle(corrupted) + require.Nil(t, err) } else { handle = kv.IntHandle(1) + corruptedHandle = kv.IntHandle(2) } for i, indexInfo := range indexInfos { @@ -287,22 +293,28 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) { deletionKey, _, err := buildIndexKeyValue(index, rowToRemove, sessVars, tableInfo, indexInfo, table, handle) require.Nil(t, err) - indexMutations := []mutation{{key: insertionKey, value: insertionValue}, {key: deletionKey}} - richIndexMutations, err := buildRichIndexMutations(indexMutations) + indexMutations := []mutation{ + {key: insertionKey, value: insertionValue, indexID: indexInfo.ID}, + {key: deletionKey, indexID: indexInfo.ID}, + } require.Nil(t, err) err = checkIndexKeys( - sessVars, table, rowToInsert, rowToRemove, richIndexMutations, maps.IndexIDToInfo, + sessVars, table, rowToInsert, rowToRemove, indexMutations, maps.IndexIDToInfo, maps.IndexIDToRowColInfos, ) require.Nil(t, err) // test checkHandleConsistency rowKey := tablecodec.EncodeRowKeyWithHandle(table.tableID, handle) + corruptedRowKey := tablecodec.EncodeRowKeyWithHandle(table.tableID, corruptedHandle) rowValue, err := tablecodec.EncodeRow(sessVars.StmtCtx, rowToInsert, []int64{1, 2}, nil, nil, &rd) require.Nil(t, err) rowMutation := mutation{key: rowKey, value: rowValue} - err = checkHandleConsistency(rowMutation, richIndexMutations, maps.IndexIDToInfo) + corruptedRowMutation := mutation{key: corruptedRowKey, value: rowValue} + err = checkHandleConsistency(rowMutation, indexMutations, maps.IndexIDToInfo) require.Nil(t, err) + err = checkHandleConsistency(corruptedRowMutation, indexMutations, maps.IndexIDToInfo) + require.NotNil(t, err) } } } From ada1f029568bc5a15c75327380e3d222f6af64c2 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 11 Oct 2021 19:27:41 +0800 Subject: [PATCH 03/22] refactor according to comments Signed-off-by: ekexium --- table/tables/mutation_checker.go | 7 +++---- table/tables/mutation_checker_test.go | 4 +--- table/tables/tables.go | 6 +++--- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 68f0c02fce6cc..f1e64ba96c47c 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -44,7 +44,7 @@ type columnMaps struct { IndexIDToRowColInfos map[int64][]rowcodec.ColInfo } -// CheckIndexConsistency checks whether the given set of mutations corresponding to a single row is consistent. +// CheckDataConsistency checks whether the given set of mutations corresponding to a single row is consistent. // Namely, assume the database is consistent before, applying the mutations shouldn't break the consistency. // It aims at reducing bugs that will corrupt data, and preventing mistakes from spreading if possible. // @@ -62,7 +62,7 @@ type columnMaps struct { // I = the sum of the number of columns in all indices, // The time complexity is O(M * C + I) // The space complexity is O(M + C + I) -func CheckIndexConsistency( +func CheckDataConsistency( txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle, ) error { @@ -122,7 +122,6 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in return errors.Trace(err) } - var indexHandle kv.Handle for _, m := range indexMutations { if len(m.value) == 0 { continue @@ -133,7 +132,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in return errors.New("index not found") } - indexHandle, err = tablecodec.DecodeIndexHandle(m.key, m.value, len(indexInfo.Columns)) + indexHandle, err := tablecodec.DecodeIndexHandle(m.key, m.value, len(indexInfo.Columns)) if err != nil { return errors.Trace(err) } diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index ff41991b5b9e9..f4a63fefa0a2e 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -265,14 +265,13 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) { IsCommonHandle: isCommonHandle, } table := MockTableFromMeta(&tableInfo).(*TableCommon) - // var handle, corruptedHandle kv.Handle if isCommonHandle { encoded, err := codec.EncodeKey(sessVars.StmtCtx, nil, rowToInsert[0]) require.Nil(t, err) corrupted := make([]byte, len(encoded)) copy(corrupted, encoded) - corrupted[len(corrupted)-1] = corrupted[len(corrupted)-1] ^ 1 + corrupted[len(corrupted)-1] ^= 1 handle, err = kv.NewCommonHandle(encoded) require.Nil(t, err) corruptedHandle, err = kv.NewCommonHandle(corrupted) @@ -297,7 +296,6 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) { {key: insertionKey, value: insertionValue, indexID: indexInfo.ID}, {key: deletionKey, indexID: indexInfo.ID}, } - require.Nil(t, err) err = checkIndexKeys( sessVars, table, rowToInsert, rowToRemove, indexMutations, maps.IndexIDToInfo, maps.IndexIDToRowColInfos, diff --git a/table/tables/tables.go b/table/tables/tables.go index c3537f5173f7c..611d7aaecb184 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -420,7 +420,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if err = memBuffer.Set(key, value); err != nil { return err } - if err = CheckIndexConsistency(txn, sessVars, t, newData, oldData, memBuffer, sh); err != nil { + if err = CheckDataConsistency(txn, sessVars, t, newData, oldData, memBuffer, sh); err != nil { return errors.Trace(err) } memBuffer.Release(sh) @@ -839,7 +839,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . return h, err } - if err = CheckIndexConsistency(txn, sessVars, t, r, nil, memBuffer, sh); err != nil { + if err = CheckDataConsistency(txn, sessVars, t, r, nil, memBuffer, sh); err != nil { return nil, errors.Trace(err) } @@ -1097,7 +1097,7 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type sessVars := ctx.GetSessionVars() sc := sessVars.StmtCtx - if err = CheckIndexConsistency(txn, sessVars, t, nil, r, memBuffer, sh); err != nil { + if err = CheckDataConsistency(txn, sessVars, t, nil, r, memBuffer, sh); err != nil { return errors.Trace(err) } memBuffer.Release(sh) From 9a23b8fb48a2c75a6bce755d4f0bbcf8208c00d6 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 14 Oct 2021 16:50:33 +0800 Subject: [PATCH 04/22] verify the effectiveness against 24029 Signed-off-by: ekexium --- session/session_test.go | 12 ++++++++++++ tablecodec/tablecodec.go | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/session/session_test.go b/session/session_test.go index 8addccd831002..fcfe8095c2dbc 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -5643,3 +5643,15 @@ func (s *testSessionSuite) TestLocalTemporaryTableUpdate(c *C) { tk.MustQuery("select * from tmp1").Check(testkit.Rows()) } } + +func (s *testSessionSuite2) TestDefend24029(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + collate.SetNewCollationEnabledForTest(true) + defer collate.SetNewCollationEnabledForTest(false) + tk.MustExec("create table t(a int key, b varchar(20) collate utf8mb4_unicode_ci, c varchar(20) collate utf8mb4_general_ci, unique key idx_b_c(b, c));") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/tablecodec/injectNeedRestoredData", "return(false)"), IsNil) + _, err := tk.Exec("insert into t values (4, 'd', 'F');") + c.Assert(err, NotNil) + c.Assert(strings.Contains(err.Error(), "inconsistent index values"),IsTrue) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/tablecodec/injectNeedRestoredData"), IsNil) +} diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 2fca105e8e259..6ca1b7b17976c 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -17,6 +17,7 @@ package tablecodec import ( "bytes" "encoding/binary" + "github.com/pingcap/failpoint" "math" "strings" "time" @@ -1124,6 +1125,9 @@ func GenIndexKey(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo // | Besides, if the collation of b is _bin, then restored data is an integer indicate the spaces are truncated. Then we use sortKey // | and the restored data together to restore original data. func GenIndexValuePortal(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, needRestoredData bool, distinct bool, untouched bool, indexedValues []types.Datum, h kv.Handle, partitionID int64, restoredData []types.Datum) ([]byte, error) { + failpoint.Inject("injectNeedRestoredData", func(val failpoint.Value) { + needRestoredData = val.(bool) + }) if tblInfo.IsCommonHandle && tblInfo.CommonHandleVersion == 1 { return GenIndexValueForClusteredIndexVersion1(sc, tblInfo, idxInfo, needRestoredData, distinct, untouched, indexedValues, h, partitionID, restoredData) } From 5c3095ef5d7ca03d139fa524f6f92f14c82a30a1 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 27 Oct 2021 23:46:33 +0800 Subject: [PATCH 05/22] simplest injections Signed-off-by: ekexium --- go.mod | 1 + kv/kv.go | 3 + session/defend_test.go | 23 +++++ session/session_test.go | 2 +- store/driver/txn/unionstore_driver.go | 4 + table/tables/mutation_checker.go | 128 ++++++++++++++++++++++++-- 6 files changed, 154 insertions(+), 7 deletions(-) create mode 100644 session/defend_test.go diff --git a/go.mod b/go.mod index fc9980de52b83..ae4de6c0e6ce6 100644 --- a/go.mod +++ b/go.mod @@ -99,3 +99,4 @@ require ( // FIXME the official repo has some bug makes br_gcs test failed. https://github.com/googleapis/google-cloud-go/pull/3509 // replace cloud.google.com/go/storage => github.com/3pointer/google-cloud-go/storage v1.6.1-0.20210108125931-b59bfa0720b2 +replace github.com/tikv/client-go/v2 => ../client-go diff --git a/kv/kv.go b/kv/kv.go index 42a76c4676a07..3d191f7b95c85 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -171,6 +171,9 @@ type MemBuffer interface { // Size returns sum of keys and values length. Size() int + + // for test + DeleteKey(k Key) } // LockCtx contains information for LockKeys method. diff --git a/session/defend_test.go b/session/defend_test.go new file mode 100644 index 0000000000000..8604ac7820728 --- /dev/null +++ b/session/defend_test.go @@ -0,0 +1,23 @@ +package session_test + +import ( + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" + "testing" +) + +func TestDefend14631(t *testing.T) { + store, close := testkit.CreateMockStore(t) + defer close() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`drop table if exists t1`) + tk.MustExec(`create table t1(c1 decimal(6,4), primary key(c1))`) + tk.MustExec(`insert into t1 set c1 = 0.1`) + require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/table/tables/printMutation", "return")) + tk.MustExec(`insert into t1 set c1 = 0.1 on duplicate key update c1 = 1`) + require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/table/tables/printMutation")) + tk.MustExec("admin check table t1") + //tk.MustQuery(`select * from t1 use index(primary)`).Check(testkit.Rows(`1.0000`)) +} diff --git a/session/session_test.go b/session/session_test.go index fcfe8095c2dbc..c28aaf170023d 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -5652,6 +5652,6 @@ func (s *testSessionSuite2) TestDefend24029(c *C) { c.Assert(failpoint.Enable("github.com/pingcap/tidb/tablecodec/injectNeedRestoredData", "return(false)"), IsNil) _, err := tk.Exec("insert into t values (4, 'd', 'F');") c.Assert(err, NotNil) - c.Assert(strings.Contains(err.Error(), "inconsistent index values"),IsTrue) + c.Assert(strings.Contains(err.Error(), "inconsistent index values"), IsTrue) c.Assert(failpoint.Disable("github.com/pingcap/tidb/tablecodec/injectNeedRestoredData"), IsNil) } diff --git a/store/driver/txn/unionstore_driver.go b/store/driver/txn/unionstore_driver.go index a60f5ddcf74c5..2ac1ac768d02c 100644 --- a/store/driver/txn/unionstore_driver.go +++ b/store/driver/txn/unionstore_driver.go @@ -43,6 +43,10 @@ func (m *memBuffer) Delete(k kv.Key) error { return m.MemDB.Delete(k) } +func (m *memBuffer) DeleteKey(k kv.Key) { + m.MemDB.DeleteKey(k) +} + func (m *memBuffer) DeleteWithFlags(k kv.Key, ops ...kv.FlagsOp) error { err := m.MemDB.DeleteWithFlags(k, getTiKVFlagsOps(ops)...) return derr.ToTiDBErr(err) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index f1e64ba96c47c..3ecaca8eb510c 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -15,9 +15,14 @@ package tables import ( + "encoding/hex" "fmt" + "math/rand" + "strings" + "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -66,6 +71,10 @@ func CheckDataConsistency( txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle, ) error { + failpoint.Inject("corruptMutations", func(commands failpoint.Value) { + corruptMutations(t, txn, sh, commands.(string)) + }) + if t.Meta().GetPartitionInfo() != nil { return nil } @@ -73,11 +82,21 @@ func CheckDataConsistency( // some implementations of MemBuffer doesn't support staging, e.g. that in br/pkg/lightning/backend/kv return nil } - indexMutations, rowInsertion, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) + indexMutations, rowInsertion, _, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) if err != nil { return errors.Trace(err) } + failpoint.Inject("printMutation", func() { + fmt.Println("------------------------------------------------") + fmt.Printf("row to insert: %+v\nrow to remove: %+v\n", rowToInsert, rowToRemove) + fmt.Printf("row insertion: key: %v, value: %v\n", hex.EncodeToString(rowInsertion.key), + hex.EncodeToString(rowInsertion.value)) + for _, im := range indexMutations { + fmt.Printf("index mutation: key: %v, value: %v\n", hex.EncodeToString(im.key), hex.EncodeToString(im.value)) + } + }) + columnMaps := getColumnMaps(txn, t) if rowToInsert != nil { @@ -203,6 +222,8 @@ func checkIndexKeys( } if len(m.value) == 0 { + logutil.BgLogger().Error("line 224", zap.String("mutation", fmt.Sprintf("%#v", m)), zap.Any("indexData", + indexData)) err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo) } else { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo) @@ -254,14 +275,14 @@ func checkRowInsertionConsistency( } // collectTableMutationsFromBufferStage collects mutations of the current table from the mem buffer stage -// It returns: (1) all index mutations (2) the only row insertion -// If there are no row insertions, the 2nd returned value is nil +// It returns: (1) all index mutations (2) the only row insertion (3) the only row deletion +// If there are no row insertions/deletions, return nil // If there are multiple row insertions, an error is returned func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) ( - []mutation, mutation, error, + []mutation, mutation, mutation, error, ) { indexMutations := make([]mutation, 0) - var rowInsertion mutation + var rowInsertion, rowDeletion mutation var err error inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { // only check the current table @@ -276,7 +297,16 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer "multiple row mutations added/mutated, one = %+v, another = %+v", rowInsertion, m, ) } + } else { + if rowDeletion.key == nil { + rowDeletion = m + } else { + err = errors.Errorf( + "multiple row mutations deleted, one = %+v, another = %+v", rowDeletion, m, + ) + } } + } else { _, m.indexID, _, err = tablecodec.DecodeIndexKey(m.key) if err != nil { @@ -287,7 +317,7 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer } } memBuffer.InspectStage(sh, inspector) - return indexMutations, rowInsertion, err + return indexMutations, rowInsertion, rowDeletion, err } // compareIndexData compares the decoded index data with the input data. @@ -296,6 +326,7 @@ func compareIndexData( sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo, ) error { for i, decodedMutationDatum := range indexData { + logutil.BgLogger().Error("line 326", zap.Int("i", i), zap.Int("Offset", indexInfo.Columns[i].Offset)) expectedDatum := input[indexInfo.Columns[i].Offset] tablecodec.TruncateIndexValue( @@ -373,3 +404,88 @@ func getOrBuildColumnMaps( } return maps } + +// only used in tests +// commands is a comma separated string, each representing a type of corruptions to the mutations +// The injection depends on actual encoding rules. +func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, cmds string) error { + commands := strings.Split(cmds, ",") + memBuffer := txn.GetMemBuffer() + rand := rand.New(rand.NewSource(time.Now().UnixNano())) + + indexMutations, _, _, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) + if err != nil { + return errors.Trace(err) + } + + for _, cmd := range commands { + switch cmd { + case "extraIndex": + // an extra index mutation + // TODO: distinguish which part to corrupt, value or handle + { + if len(indexMutations) == 0 { + continue + } + indexMutation := indexMutations[rand.Intn(len(indexMutations))] + key := make([]byte, len(indexMutation.key)) + copy(key, indexMutation.key) + key[len(key)-1] += 1 + memBuffer.Set(key, indexMutation.value) + } + //case "extraIndexByHandle": + // { + // } + case "missingIndex": + // an index mutation is missing + // "missIndex" should be placed in front of "extraIndex"es, + // in case it removes the mutation that was just added + { + if len(indexMutations) == 0 { + continue + } + indexMutation := indexMutations[rand.Intn(len(indexMutations))] + memBuffer.DeleteKey(indexMutation.key) + } + case "corruptIndexKey": + // a corrupted index mutation. + // TODO: distinguish which part is corrupted, value or handle + { + if len(indexMutations) == 0 { + continue + } + indexMutation := indexMutations[rand.Intn(len(indexMutations))] + key := indexMutation.key + memBuffer.DeleteKey(key) + key[len(key)-1] += 1 + memBuffer.Set(key, indexMutation.value) + } + //case "corruptIndexKeyByHandle": + // { + // } + case "corruptIndexValue": + // TODO: distinguish which part to corrupt, int handle, common handle, or restored data? + // It doesn't make much sense to always corrupt the last byte + { + if len(indexMutations) == 0 { + continue + } + indexMutation := indexMutations[rand.Intn(len(indexMutations))] + value := indexMutation.value + if len(value) > 0 { + value[len(value)-1] += 1 + } + memBuffer.Set(indexMutation.key, value) + } + //case "corruptIndexValueCommonHandle": + // { + // } + //case "missIndexValueRestoredData": + // { + // } + default: + return errors.New(fmt.Sprintf("unknown command to corrupt mutation: %s", cmd)) + } + } + return nil +} From 2874f72cae1d31b3201632b5b077a9b137609eef Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 3 Nov 2021 23:17:03 +0800 Subject: [PATCH 06/22] assert that the number of index insertions must be a multiple of row insertions' Signed-off-by: ekexium --- session/txn.go | 6 ++++++ table/tables/mutation_checker.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/session/txn.go b/session/txn.go index 270a551ec8d66..48704f7d15da8 100644 --- a/session/txn.go +++ b/session/txn.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sli" @@ -334,6 +335,11 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { return errors.Trace(kv.ErrInvalidTxn) } + // add an enablement check + if err := tables.CheckTxnConsistency(txn); err != nil { + return errors.Trace(err) + } + txn.mu.Lock() txn.mu.TxnInfo.State = txninfo.TxnCommitting txn.mu.Unlock() diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index f1e64ba96c47c..031940c1affd2 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -373,3 +373,32 @@ func getOrBuildColumnMaps( } return maps } + +func CheckTxnConsistency(txn kv.Transaction) error { + memBuffer := txn.GetMemBuffer() + if memBuffer == nil { + return nil + } + + indexInsertionCount := 0 + rowInsertionCount := 0 + f := func(k kv.Key, v []byte) error { + if rowcodec.IsRowKey(k) { + if len(v) > 0 { + rowInsertionCount += 1 + } + } else { + if len(v) > 0 { + indexInsertionCount += 1 + } + } + return nil + } + if err := kv.WalkMemBuffer(memBuffer, f); err != nil { + return errors.Trace(err) + } + if rowInsertionCount % indexInsertionCount != 0 { + return errors.Errorf("inconsistent index insertion count %d and row insertion count %d", indexInsertionCount, rowInsertionCount) + } + return nil +} From e9fc0de9cd0caf2a3be8021761f9ed0eb88e12cf Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 3 Nov 2021 23:23:14 +0800 Subject: [PATCH 07/22] assert that the number of index insertions must be a multiple of row insertions' Signed-off-by: ekexium --- table/tables/mutation_checker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 031940c1affd2..fab7ee1401223 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -383,7 +383,7 @@ func CheckTxnConsistency(txn kv.Transaction) error { indexInsertionCount := 0 rowInsertionCount := 0 f := func(k kv.Key, v []byte) error { - if rowcodec.IsRowKey(k) { + if rowcodec.IsRowKey(k) { if len(v) > 0 { rowInsertionCount += 1 } @@ -397,7 +397,7 @@ func CheckTxnConsistency(txn kv.Transaction) error { if err := kv.WalkMemBuffer(memBuffer, f); err != nil { return errors.Trace(err) } - if rowInsertionCount % indexInsertionCount != 0 { + if indexInsertionCount%rowInsertionCount != 0 { return errors.Errorf("inconsistent index insertion count %d and row insertion count %d", indexInsertionCount, rowInsertionCount) } return nil From 4012de4e9ef7c9cca70ad8cb34d2a600282e93f0 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 4 Nov 2021 18:20:21 +0800 Subject: [PATCH 08/22] count for each table Signed-off-by: ekexium --- table/tables/mutation_checker.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index fab7ee1401223..bb58e78eec332 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -380,16 +380,16 @@ func CheckTxnConsistency(txn kv.Transaction) error { return nil } - indexInsertionCount := 0 - rowInsertionCount := 0 + // count for each table + indexInsertionCount := make(map[int64]int) + rowInsertionCount := make(map[int64]int) f := func(k kv.Key, v []byte) error { - if rowcodec.IsRowKey(k) { - if len(v) > 0 { - rowInsertionCount += 1 - } - } else { - if len(v) > 0 { - indexInsertionCount += 1 + if len(v) > 0 { + tableID := tablecodec.DecodeTableID(k) + if rowcodec.IsRowKey(k) { + rowInsertionCount[tableID] += 1 + } else { + indexInsertionCount[tableID] += 1 } } return nil @@ -397,8 +397,13 @@ func CheckTxnConsistency(txn kv.Transaction) error { if err := kv.WalkMemBuffer(memBuffer, f); err != nil { return errors.Trace(err) } - if indexInsertionCount%rowInsertionCount != 0 { - return errors.Errorf("inconsistent index insertion count %d and row insertion count %d", indexInsertionCount, rowInsertionCount) + for tableID, count := range indexInsertionCount { + // FIXME: always? what if like backfilling? + if rowInsertionCount[tableID] > 0 && count%rowInsertionCount[tableID] != 0 { + return errors.Errorf("inconsistent index insertion count %d and row insertion count %d", + count, rowInsertionCount[tableID]) + } + } return nil } From 403ced7f136cb8c67d002de20074529f380bfb96 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 4 Nov 2021 18:23:27 +0800 Subject: [PATCH 09/22] Auto stash before merge of "verify-effectiveness" and "check-in-txn" --- session/defend_test.go | 18 ++++++++++++++++++ table/tables/mutation_checker.go | 3 --- 2 files changed, 18 insertions(+), 3 deletions(-) mode change 100644 => 100755 table/tables/mutation_checker.go diff --git a/session/defend_test.go b/session/defend_test.go index 8604ac7820728..7a6bbe1967d97 100644 --- a/session/defend_test.go +++ b/session/defend_test.go @@ -21,3 +21,21 @@ func TestDefend14631(t *testing.T) { tk.MustExec("admin check table t1") //tk.MustQuery(`select * from t1 use index(primary)`).Check(testkit.Rows(`1.0000`)) } + +func TestCorrupt(t *testing.T) { + store, close := testkit.CreateMockStore(t) + defer close() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`drop table if exists t1`) + tk.MustExec("set global tidb_enable_mutation_checker = true;") + tk.MustExec("set tidb_enable_mutation_checker = true;") + tk.MustQuery("select @@tidb_enable_mutation_checker").Check(testkit.Rows("1")) + tk.MustExec(`CREATE TABLE t1653 (c1 VARCHAR(10), c1377 VARCHAR(10), KEY i1654 (c1, c1377), KEY i1655 (c1377, c1))`) + failpoint.Enable("github.com/pingcap/tidb/table/tables/corruptMutations", "return(\"missingIndex\")") + tk.MustExec("begin") + tk.MustExec(`insert into t1653 set c1 = 'a', c1377 = 'b'`) + tk.MustExec(`insert into t1653 values('aa', 'bb')`) + tk.MustExec("commit") + failpoint.Disable("github.com/pingcap/tidb/table/tables/corruptMutations") +} diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go old mode 100644 new mode 100755 index b3d7f83f7cabf..cf406176aaafc --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -222,8 +222,6 @@ func checkIndexKeys( } if len(m.value) == 0 { - logutil.BgLogger().Error("line 224", zap.String("mutation", fmt.Sprintf("%#v", m)), zap.Any("indexData", - indexData)) err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo) } else { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo) @@ -326,7 +324,6 @@ func compareIndexData( sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo, ) error { for i, decodedMutationDatum := range indexData { - logutil.BgLogger().Error("line 326", zap.Int("i", i), zap.Int("Offset", indexInfo.Columns[i].Offset)) expectedDatum := input[indexInfo.Columns[i].Offset] tablecodec.TruncateIndexValue( From e60abebad6875e94cc7819d178375f0942321cc3 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 4 Nov 2021 18:59:05 +0800 Subject: [PATCH 10/22] comment CheckTxnConsistency Signed-off-by: ekexium --- table/tables/mutation_checker.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index bb58e78eec332..6baa4b36d0c79 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -374,6 +374,8 @@ func getOrBuildColumnMaps( return maps } +// CheckTxnConsistency checks the number of row/index mutations before the txn commits, +// to prevent some inconsistent transactions. func CheckTxnConsistency(txn kv.Transaction) error { memBuffer := txn.GetMemBuffer() if memBuffer == nil { From 691b50c31742187a4151c80285a1e24b2e3f51e5 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 4 Nov 2021 23:40:18 +0800 Subject: [PATCH 11/22] add a test for check-in-txn Signed-off-by: ekexium --- session/defend_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/session/defend_test.go b/session/defend_test.go index 7a6bbe1967d97..7ea98ab23e882 100644 --- a/session/defend_test.go +++ b/session/defend_test.go @@ -39,3 +39,19 @@ func TestCorrupt(t *testing.T) { tk.MustExec("commit") failpoint.Disable("github.com/pingcap/tidb/table/tables/corruptMutations") } + +func TestCheckInTxn(t *testing.T) { + store, close := testkit.CreateMockStore(t) + defer close() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`drop table if exists t`) + tk.MustExec("set global tidb_enable_mutation_checker = true;") + tk.MustExec("create table t(id int, v varchar(20), unique key i1(id))") + tk.MustExec("insert into t values (1, 'a')") + require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/table/tables/printMutation", "return")) + tk.MustExec("begin") + tk.MustExec("insert into t values (1, 'd'), (3, 'f') on duplicate key update v='x'") + tk.MustExec("commit") + require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/table/tables/printMutation")) +} From a83c74773fcad07a3e6af642e1853ced64115c1d Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 4 Nov 2021 23:42:38 +0800 Subject: [PATCH 12/22] only check when InTxn is true Signed-off-by: ekexium --- session/session.go | 16 ++++++++++++---- session/txn.go | 6 ------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/session/session.go b/session/session.go index c6d712366c44a..ebb48f970b020 100644 --- a/session/session.go +++ b/session/session.go @@ -45,10 +45,6 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/util/topsql" - "github.com/pingcap/tipb/go-binlog" - "go.uber.org/zap" - "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -73,6 +69,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/types" @@ -87,9 +84,12 @@ import ( "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" + "github.com/pingcap/tidb/util/topsql" + "github.com/pingcap/tipb/go-binlog" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikv" tikvutil "github.com/tikv/client-go/v2/util" + "go.uber.org/zap" ) var ( @@ -564,6 +564,14 @@ func (s *session) doCommit(ctx context.Context) error { func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transaction) error { sessVars := s.sessionVars + + // TODO: add an enablement check + if sessVars.InTxn() { + if err := tables.CheckTxnConsistency(txn); err != nil { + return errors.Trace(err) + } + } + txnTempTables := sessVars.TxnCtx.TemporaryTables if len(txnTempTables) == 0 { return txn.Commit(ctx) diff --git a/session/txn.go b/session/txn.go index 48704f7d15da8..270a551ec8d66 100644 --- a/session/txn.go +++ b/session/txn.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" - "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sli" @@ -335,11 +334,6 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { return errors.Trace(kv.ErrInvalidTxn) } - // add an enablement check - if err := tables.CheckTxnConsistency(txn); err != nil { - return errors.Trace(err) - } - txn.mu.Lock() txn.mu.TxnInfo.State = txninfo.TxnCommitting txn.mu.Unlock() From d027e893bcf9fbb78ff9a263a55213edf832ac82 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 16 Nov 2021 14:44:44 +0800 Subject: [PATCH 13/22] replace dependency Signed-off-by: ekexium --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ae4de6c0e6ce6..5dacc61e46c49 100644 --- a/go.mod +++ b/go.mod @@ -99,4 +99,4 @@ require ( // FIXME the official repo has some bug makes br_gcs test failed. https://github.com/googleapis/google-cloud-go/pull/3509 // replace cloud.google.com/go/storage => github.com/3pointer/google-cloud-go/storage v1.6.1-0.20210108125931-b59bfa0720b2 -replace github.com/tikv/client-go/v2 => ../client-go +replace github.com/tikv/client-go/v2 => github.com/ekexium/client-go/v2 v2.0.0-verify diff --git a/go.sum b/go.sum index 8ec3ae8ae9147..a80b08cd09f79 100644 --- a/go.sum +++ b/go.sum @@ -172,6 +172,8 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/ekexium/client-go/v2 v2.0.0-verify h1:pco+t8VC2yv3A8JltAnRecJnyZ1KyZDD0772zuN6paE= +github.com/ekexium/client-go/v2 v2.0.0-verify/go.mod h1:KwtZXt0JD+bP9bWW2ka0ir3Wp3oTEfZUTh22bs2sI4o= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -694,8 +696,6 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210902062307-4fc565e203a9 h1:GFy5AZ/uQR5G5Pbd2CXVoVj19tsCFN2wdrjRXnfXD80= -github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210902062307-4fc565e203a9/go.mod h1:KwtZXt0JD+bP9bWW2ka0ir3Wp3oTEfZUTh22bs2sI4o= github.com/tikv/pd v1.1.0-beta.0.20210323121136-78679e5e209d/go.mod h1:Jw9KG11C/23Rr7DW4XWQ7H5xOgGZo6DFL1OKAF4+Igw= github.com/tikv/pd v1.1.0-beta.0.20210818082359-acba1da0018d h1:AFm1Dzw+QRUevWRfrFp45CPPkuK/zdSWcfxI10z+WVE= github.com/tikv/pd v1.1.0-beta.0.20210818082359-acba1da0018d/go.mod h1:rammPjeZgpvfrQRPkijcx8tlxF1XM5+m6kRXrkDzCAA= From dafe8a9555072b608a1c410c2c20f546e8bc864b Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 18 Nov 2021 13:23:06 +0800 Subject: [PATCH 14/22] add TestOncall4058 Signed-off-by: ekexium --- session/defend_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/session/defend_test.go b/session/defend_test.go index 7ea98ab23e882..56308c69242d8 100644 --- a/session/defend_test.go +++ b/session/defend_test.go @@ -55,3 +55,32 @@ func TestCheckInTxn(t *testing.T) { tk.MustExec("commit") require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/table/tables/printMutation")) } + +func TestOncall4058(t *testing.T) { + store, close := testkit.CreateMockStore(t) + defer close() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`drop table if exists t`) + tk.MustExec("set global tidb_enable_mutation_checker = 1") + tk.MustExec("set tidb_enable_mutation_checker = 1") + tk.MustExec("set tidb_txn_mode='optimistic';") + tk.MustExec("set tidb_disable_txn_auto_retry=false;") + + tk.MustExec("create table t(a double key auto_increment, b int);") + tk.MustExec("insert into t values (146576794, 1);") + tk.MustExec("begin;") + tk.MustExec("insert into t(b) select 1; ") + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("begin") + tk2.MustExec("insert into t values (146576795, 1)") + tk2.MustExec("insert into t values (146576796, 1)") + tk2.MustExec("commit") + + // prevent commit + err := tk.ExecToErr("commit") + require.NotNil(t, err) + tk.MustExec("select * from t") +} From a9f1cc2d2812b56172c2104cf2ecec9c2dbe7c9d Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 13 Dec 2021 16:52:56 +0800 Subject: [PATCH 15/22] fix the merge Signed-off-by: ekexium --- go.sum | 2 -- kv/kv.go | 4 ++-- store/driver/txn/unionstore_driver.go | 2 +- table/tables/mutation_checker.go | 4 ++-- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/go.sum b/go.sum index c73611b7c50d3..1ab204f14388d 100644 --- a/go.sum +++ b/go.sum @@ -182,8 +182,6 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/ekexium/client-go/v2 v2.0.0-verify h1:pco+t8VC2yv3A8JltAnRecJnyZ1KyZDD0772zuN6paE= -github.com/ekexium/client-go/v2 v2.0.0-verify/go.mod h1:KwtZXt0JD+bP9bWW2ka0ir3Wp3oTEfZUTh22bs2sI4o= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/kv/kv.go b/kv/kv.go index 046e5bfd00314..f9759e2ff4daf 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -174,8 +174,8 @@ type MemBuffer interface { // Size returns sum of keys and values length. Size() int - // for test - DeleteKey(k Key) + // RemoveFromBuffer remove the entry from the buffer. It's used for testing. + RemoveFromBuffer(Key) } // LockCtx contains information for LockKeys method. diff --git a/store/driver/txn/unionstore_driver.go b/store/driver/txn/unionstore_driver.go index 3a481007f14a1..ccfe92b9a5932 100644 --- a/store/driver/txn/unionstore_driver.go +++ b/store/driver/txn/unionstore_driver.go @@ -43,7 +43,7 @@ func (m *memBuffer) Delete(k kv.Key) error { return m.MemDB.Delete(k) } -func (m *memBuffer) DeleteKey(k kv.Key) { +func (m *memBuffer) RemoveFromBuffer(k kv.Key) { m.MemDB.RemoveFromBuffer(k) } diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index ea5805dabba94..7d4029e675b0e 100755 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -444,7 +444,7 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c continue } indexMutation := indexMutations[rand.Intn(len(indexMutations))] - memBuffer.DeleteKey(indexMutation.key) + memBuffer.RemoveFromBuffer(indexMutation.key) } case "corruptIndexKey": // a corrupted index mutation. @@ -455,7 +455,7 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c } indexMutation := indexMutations[rand.Intn(len(indexMutations))] key := indexMutation.key - memBuffer.DeleteKey(key) + memBuffer.RemoveFromBuffer(key) key[len(key)-1] += 1 memBuffer.Set(key, indexMutation.value) } From 91c8c51ad62729d9eb6e48edce867913fcee1dce Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 13 Dec 2021 22:44:58 +0800 Subject: [PATCH 16/22] remove unnecessary tests and debug code Signed-off-by: ekexium --- kv/kv.go | 2 +- session/defend_test.go | 86 -------------------------------- session/session_test.go | 42 ++++++++++++---- table/tables/mutation_checker.go | 84 ++++++++++++++++--------------- tablecodec/tablecodec.go | 4 -- 5 files changed, 78 insertions(+), 140 deletions(-) delete mode 100644 session/defend_test.go diff --git a/kv/kv.go b/kv/kv.go index f9759e2ff4daf..c3168e283c60c 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -174,7 +174,7 @@ type MemBuffer interface { // Size returns sum of keys and values length. Size() int - // RemoveFromBuffer remove the entry from the buffer. It's used for testing. + // RemoveFromBuffer removes the entry from the buffer. It's used for testing. RemoveFromBuffer(Key) } diff --git a/session/defend_test.go b/session/defend_test.go deleted file mode 100644 index 56308c69242d8..0000000000000 --- a/session/defend_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package session_test - -import ( - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/testkit" - "github.com/stretchr/testify/require" - "testing" -) - -func TestDefend14631(t *testing.T) { - store, close := testkit.CreateMockStore(t) - defer close() - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(`drop table if exists t1`) - tk.MustExec(`create table t1(c1 decimal(6,4), primary key(c1))`) - tk.MustExec(`insert into t1 set c1 = 0.1`) - require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/table/tables/printMutation", "return")) - tk.MustExec(`insert into t1 set c1 = 0.1 on duplicate key update c1 = 1`) - require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/table/tables/printMutation")) - tk.MustExec("admin check table t1") - //tk.MustQuery(`select * from t1 use index(primary)`).Check(testkit.Rows(`1.0000`)) -} - -func TestCorrupt(t *testing.T) { - store, close := testkit.CreateMockStore(t) - defer close() - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(`drop table if exists t1`) - tk.MustExec("set global tidb_enable_mutation_checker = true;") - tk.MustExec("set tidb_enable_mutation_checker = true;") - tk.MustQuery("select @@tidb_enable_mutation_checker").Check(testkit.Rows("1")) - tk.MustExec(`CREATE TABLE t1653 (c1 VARCHAR(10), c1377 VARCHAR(10), KEY i1654 (c1, c1377), KEY i1655 (c1377, c1))`) - failpoint.Enable("github.com/pingcap/tidb/table/tables/corruptMutations", "return(\"missingIndex\")") - tk.MustExec("begin") - tk.MustExec(`insert into t1653 set c1 = 'a', c1377 = 'b'`) - tk.MustExec(`insert into t1653 values('aa', 'bb')`) - tk.MustExec("commit") - failpoint.Disable("github.com/pingcap/tidb/table/tables/corruptMutations") -} - -func TestCheckInTxn(t *testing.T) { - store, close := testkit.CreateMockStore(t) - defer close() - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(`drop table if exists t`) - tk.MustExec("set global tidb_enable_mutation_checker = true;") - tk.MustExec("create table t(id int, v varchar(20), unique key i1(id))") - tk.MustExec("insert into t values (1, 'a')") - require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/table/tables/printMutation", "return")) - tk.MustExec("begin") - tk.MustExec("insert into t values (1, 'd'), (3, 'f') on duplicate key update v='x'") - tk.MustExec("commit") - require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/table/tables/printMutation")) -} - -func TestOncall4058(t *testing.T) { - store, close := testkit.CreateMockStore(t) - defer close() - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(`drop table if exists t`) - tk.MustExec("set global tidb_enable_mutation_checker = 1") - tk.MustExec("set tidb_enable_mutation_checker = 1") - tk.MustExec("set tidb_txn_mode='optimistic';") - tk.MustExec("set tidb_disable_txn_auto_retry=false;") - - tk.MustExec("create table t(a double key auto_increment, b int);") - tk.MustExec("insert into t values (146576794, 1);") - tk.MustExec("begin;") - tk.MustExec("insert into t(b) select 1; ") - - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - tk2.MustExec("begin") - tk2.MustExec("insert into t values (146576795, 1)") - tk2.MustExec("insert into t values (146576796, 1)") - tk2.MustExec("commit") - - // prevent commit - err := tk.ExecToErr("commit") - require.NotNil(t, err) - tk.MustExec("select * from t") -} diff --git a/session/session_test.go b/session/session_test.go index b8d5f1b6a0e9b..8bb15ad4cb6d8 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -27,6 +27,7 @@ import ( "strings" "sync" "sync/atomic" + "testing" "time" "github.com/docker/go-units" @@ -61,6 +62,7 @@ import ( "github.com/pingcap/tidb/store/mockstore/mockcopr" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" + testkit2 "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/sqlexec" @@ -5884,14 +5886,34 @@ func (s *testSessionSuite) TestSameNameObjectWithLocalTemporaryTable(c *C) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) } -func (s *testSessionSuite2) TestDefend24029(c *C) { - tk := testkit.NewTestKitWithInit(c, s.store) - collate.SetNewCollationEnabledForTest(true) - defer collate.SetNewCollationEnabledForTest(false) - tk.MustExec("create table t(a int key, b varchar(20) collate utf8mb4_unicode_ci, c varchar(20) collate utf8mb4_general_ci, unique key idx_b_c(b, c));") - c.Assert(failpoint.Enable("github.com/pingcap/tidb/tablecodec/injectNeedRestoredData", "return(false)"), IsNil) - _, err := tk.Exec("insert into t values (4, 'd', 'F');") - c.Assert(err, NotNil) - c.Assert(strings.Contains(err.Error(), "inconsistent index values"), IsTrue) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/tablecodec/injectNeedRestoredData"), IsNil) +func TestCorrupt(t *testing.T) { + store, close := testkit2.CreateMockStore(t) + defer close() + tk := testkit2.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`drop table if exists t1`) + tk.MustExec("set global tidb_enable_mutation_checker = true;") + tk.MustExec("set tidb_enable_mutation_checker = true;") + tk.MustQuery("select @@tidb_enable_mutation_checker").Check(testkit2.Rows("1")) + tk.MustExec(`CREATE TABLE t1653 (c1 VARCHAR(10), c1377 VARCHAR(10), KEY i1654 (c1, c1377), KEY i1655 (c1377, c1))`) + failpoint.Enable("github.com/pingcap/tidb/table/tables/corruptMutations", "return(\"missingIndex\")") + tk.MustExec("begin") + tk.MustExec(`insert into t1653 set c1 = 'a', c1377 = 'b'`) + tk.MustExec(`insert into t1653 values('aa', 'bb')`) + tk.MustExec("commit") + failpoint.Disable("github.com/pingcap/tidb/table/tables/corruptMutations") +} + +func TestCheckInTxn(t *testing.T) { + store, close := testkit2.CreateMockStore(t) + defer close() + tk := testkit2.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`drop table if exists t`) + tk.MustExec("set global tidb_enable_mutation_checker = true;") + tk.MustExec("create table t(id int, v varchar(20), unique key i1(id))") + tk.MustExec("insert into t values (1, 'a')") + tk.MustExec("begin") + tk.MustExec("insert into t values (1, 'd'), (3, 'f') on duplicate key update v='x'") + tk.MustExec("commit") } diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 7d4029e675b0e..af95632956bc8 100755 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -15,11 +15,10 @@ package tables import ( - "encoding/hex" + "crypto/rand" "fmt" - "math/rand" + "math/big" "strings" - "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -70,10 +69,15 @@ type columnMaps struct { func CheckDataConsistency( txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle, -) error { +) (err error) { failpoint.Inject("corruptMutations", func(commands failpoint.Value) { - corruptMutations(t, txn, sh, commands.(string)) + if e := corruptMutations(t, txn, sh, commands.(string)); e != nil { + err = e + } }) + if err != nil { + return err + } if t.Meta().GetPartitionInfo() != nil { return nil @@ -82,21 +86,11 @@ func CheckDataConsistency( // some implementations of MemBuffer doesn't support staging, e.g. that in br/pkg/lightning/backend/kv return nil } - indexMutations, rowInsertion, _, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) + indexMutations, rowInsertion, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) if err != nil { return errors.Trace(err) } - failpoint.Inject("printMutation", func() { - fmt.Println("------------------------------------------------") - fmt.Printf("row to insert: %+v\nrow to remove: %+v\n", rowToInsert, rowToRemove) - fmt.Printf("row insertion: key: %v, value: %v\n", hex.EncodeToString(rowInsertion.key), - hex.EncodeToString(rowInsertion.value)) - for _, im := range indexMutations { - fmt.Printf("index mutation: key: %v, value: %v\n", hex.EncodeToString(im.key), hex.EncodeToString(im.value)) - } - }) - columnMaps := getColumnMaps(txn, t) if rowToInsert != nil { @@ -274,14 +268,14 @@ func checkRowInsertionConsistency( } // collectTableMutationsFromBufferStage collects mutations of the current table from the mem buffer stage -// It returns: (1) all index mutations (2) the only row insertion (3) the only row deletion -// If there are no row insertions/deletions, return nil +// It returns: (1) all index mutations (2) the only row insertion +// If there are no row insertions, the 2nd returned value is nil // If there are multiple row insertions, an error is returned func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) ( - []mutation, mutation, mutation, error, + []mutation, mutation, error, ) { indexMutations := make([]mutation, 0) - var rowInsertion, rowDeletion mutation + var rowInsertion mutation var err error inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { // only check the current table @@ -296,16 +290,7 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer "multiple row mutations added/mutated, one = %+v, another = %+v", rowInsertion, m, ) } - } else { - if rowDeletion.key == nil { - rowDeletion = m - } else { - err = errors.Errorf( - "multiple row mutations deleted, one = %+v, another = %+v", rowDeletion, m, - ) - } } - } else { _, m.indexID, _, err = tablecodec.DecodeIndexKey(m.key) if err != nil { @@ -316,7 +301,7 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer } } memBuffer.InspectStage(sh, inspector) - return indexMutations, rowInsertion, rowDeletion, err + return indexMutations, rowInsertion, err } // compareIndexData compares the decoded index data with the input data. @@ -410,9 +395,8 @@ func getOrBuildColumnMaps( func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, cmds string) error { commands := strings.Split(cmds, ",") memBuffer := txn.GetMemBuffer() - rand := rand.New(rand.NewSource(time.Now().UnixNano())) - indexMutations, _, _, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) + indexMutations, _, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) if err != nil { return errors.Trace(err) } @@ -426,11 +410,17 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c if len(indexMutations) == 0 { continue } - indexMutation := indexMutations[rand.Intn(len(indexMutations))] + r, err := rand.Int(rand.Reader, big.NewInt(int64(len(indexMutations)))) + if err != nil { + return errors.Trace(err) + } + indexMutation := indexMutations[r.Int64()] key := make([]byte, len(indexMutation.key)) copy(key, indexMutation.key) key[len(key)-1] += 1 - memBuffer.Set(key, indexMutation.value) + if err := memBuffer.Set(key, indexMutation.value); err != nil { + return errors.Trace(err) + } } //case "extraIndexByHandle": // { @@ -443,7 +433,11 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c if len(indexMutations) == 0 { continue } - indexMutation := indexMutations[rand.Intn(len(indexMutations))] + r, err := rand.Int(rand.Reader, big.NewInt(int64(len(indexMutations)))) + if err != nil { + return errors.Trace(err) + } + indexMutation := indexMutations[r.Int64()] memBuffer.RemoveFromBuffer(indexMutation.key) } case "corruptIndexKey": @@ -453,11 +447,17 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c if len(indexMutations) == 0 { continue } - indexMutation := indexMutations[rand.Intn(len(indexMutations))] + r, err := rand.Int(rand.Reader, big.NewInt(int64(len(indexMutations)))) + if err != nil { + return errors.Trace(err) + } + indexMutation := indexMutations[r.Int64()] key := indexMutation.key memBuffer.RemoveFromBuffer(key) key[len(key)-1] += 1 - memBuffer.Set(key, indexMutation.value) + if err := memBuffer.Set(key, indexMutation.value); err != nil { + return errors.Trace(err) + } } //case "corruptIndexKeyByHandle": // { @@ -469,12 +469,18 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c if len(indexMutations) == 0 { continue } - indexMutation := indexMutations[rand.Intn(len(indexMutations))] + r, err := rand.Int(rand.Reader, big.NewInt(int64(len(indexMutations)))) + if err != nil { + return errors.Trace(err) + } + indexMutation := indexMutations[r.Int64()] value := indexMutation.value if len(value) > 0 { value[len(value)-1] += 1 } - memBuffer.Set(indexMutation.key, value) + if err := memBuffer.Set(indexMutation.key, value); err != nil { + return errors.Trace(err) + } } //case "corruptIndexValueCommonHandle": // { diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index c3b50c50b6425..f610e09104572 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -17,7 +17,6 @@ package tablecodec import ( "bytes" "encoding/binary" - "github.com/pingcap/failpoint" "math" "strings" "time" @@ -1130,9 +1129,6 @@ func GenIndexKey(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo // | Besides, if the collation of b is _bin, then restored data is an integer indicate the spaces are truncated. Then we use sortKey // | and the restored data together to restore original data. func GenIndexValuePortal(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, needRestoredData bool, distinct bool, untouched bool, indexedValues []types.Datum, h kv.Handle, partitionID int64, restoredData []types.Datum) ([]byte, error) { - failpoint.Inject("injectNeedRestoredData", func(val failpoint.Value) { - needRestoredData = val.(bool) - }) if tblInfo.IsCommonHandle && tblInfo.CommonHandleVersion == 1 { return GenIndexValueForClusteredIndexVersion1(sc, tblInfo, idxInfo, needRestoredData, distinct, untouched, indexedValues, h, partitionID, restoredData) } From cc7c2840cf2b3346b1ac82b1e52124587c473be4 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 14 Dec 2021 21:54:33 +0800 Subject: [PATCH 17/22] move the injection out of the mutation checker Signed-off-by: ekexium --- session/session_test.go | 2 +- table/tables/mutation_checker.go | 18 ++++++++---------- table/tables/tables.go | 11 +++++++++++ 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/session/session_test.go b/session/session_test.go index 8bb15ad4cb6d8..da3276d1742af 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -5896,7 +5896,7 @@ func TestCorrupt(t *testing.T) { tk.MustExec("set tidb_enable_mutation_checker = true;") tk.MustQuery("select @@tidb_enable_mutation_checker").Check(testkit2.Rows("1")) tk.MustExec(`CREATE TABLE t1653 (c1 VARCHAR(10), c1377 VARCHAR(10), KEY i1654 (c1, c1377), KEY i1655 (c1377, c1))`) - failpoint.Enable("github.com/pingcap/tidb/table/tables/corruptMutations", "return(\"missingIndex\")") + failpoint.Enable("github.com/pingcap/tidb/table/tables/corruptMutations", "return(\"extraIndex\")") tk.MustExec("begin") tk.MustExec(`insert into t1653 set c1 = 'a', c1377 = 'b'`) tk.MustExec(`insert into t1653 values('aa', 'bb')`) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index af95632956bc8..7c5093d347fe6 100755 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -69,16 +69,7 @@ type columnMaps struct { func CheckDataConsistency( txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle, -) (err error) { - failpoint.Inject("corruptMutations", func(commands failpoint.Value) { - if e := corruptMutations(t, txn, sh, commands.(string)); e != nil { - err = e - } - }) - if err != nil { - return err - } - +) error { if t.Meta().GetPartitionInfo() != nil { return nil } @@ -530,3 +521,10 @@ func CheckTxnConsistency(txn kv.Transaction) error { } return nil } + +func injectMutationError(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle) error { + failpoint.Inject("corruptMutations", func(commands failpoint.Value) { + failpoint.Return(corruptMutations(t, txn, sh, commands.(string))) + }) + return nil +} diff --git a/table/tables/tables.go b/table/tables/tables.go index db627e2c69998..dc4f6fc66579f 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -429,11 +429,16 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if err = memBuffer.Set(key, value); err != nil { return err } + + if err = injectMutationError(t, txn, sh); err != nil { + return err + } if sessVars.EnableMutationChecker { if err = CheckDataConsistency(txn, sessVars, t, newData, oldData, memBuffer, sh); err != nil { return errors.Trace(err) } } + memBuffer.Release(sh) if shouldWriteBinlog(sctx, t.meta) { if !t.meta.PKIsHandle && !t.meta.IsCommonHandle { @@ -849,6 +854,9 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . return h, err } + if err = injectMutationError(t, txn, sh); err != nil { + return nil, err + } if sessVars.EnableMutationChecker { if err = CheckDataConsistency(txn, sessVars, t, r, nil, memBuffer, sh); err != nil { return nil, errors.Trace(err) @@ -1109,6 +1117,9 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type sessVars := ctx.GetSessionVars() sc := sessVars.StmtCtx + if err = injectMutationError(t, txn, sh); err != nil { + return err + } if sessVars.EnableMutationChecker { if err = CheckDataConsistency(txn, sessVars, t, nil, r, memBuffer, sh); err != nil { return errors.Trace(err) From b17b8b858b01b497b6f8fa68ceda0151ee9eb58f Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 15 Dec 2021 15:33:44 +0800 Subject: [PATCH 18/22] remove tests Signed-off-by: ekexium --- session/session_test.go | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/session/session_test.go b/session/session_test.go index da3276d1742af..d0abd9d9f160a 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -27,7 +27,6 @@ import ( "strings" "sync" "sync/atomic" - "testing" "time" "github.com/docker/go-units" @@ -62,7 +61,6 @@ import ( "github.com/pingcap/tidb/store/mockstore/mockcopr" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" - testkit2 "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/sqlexec" @@ -5885,35 +5883,3 @@ func (s *testSessionSuite) TestSameNameObjectWithLocalTemporaryTable(c *C) { " `cs1` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) } - -func TestCorrupt(t *testing.T) { - store, close := testkit2.CreateMockStore(t) - defer close() - tk := testkit2.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(`drop table if exists t1`) - tk.MustExec("set global tidb_enable_mutation_checker = true;") - tk.MustExec("set tidb_enable_mutation_checker = true;") - tk.MustQuery("select @@tidb_enable_mutation_checker").Check(testkit2.Rows("1")) - tk.MustExec(`CREATE TABLE t1653 (c1 VARCHAR(10), c1377 VARCHAR(10), KEY i1654 (c1, c1377), KEY i1655 (c1377, c1))`) - failpoint.Enable("github.com/pingcap/tidb/table/tables/corruptMutations", "return(\"extraIndex\")") - tk.MustExec("begin") - tk.MustExec(`insert into t1653 set c1 = 'a', c1377 = 'b'`) - tk.MustExec(`insert into t1653 values('aa', 'bb')`) - tk.MustExec("commit") - failpoint.Disable("github.com/pingcap/tidb/table/tables/corruptMutations") -} - -func TestCheckInTxn(t *testing.T) { - store, close := testkit2.CreateMockStore(t) - defer close() - tk := testkit2.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(`drop table if exists t`) - tk.MustExec("set global tidb_enable_mutation_checker = true;") - tk.MustExec("create table t(id int, v varchar(20), unique key i1(id))") - tk.MustExec("insert into t values (1, 'a')") - tk.MustExec("begin") - tk.MustExec("insert into t values (1, 'd'), (3, 'f') on duplicate key update v='x'") - tk.MustExec("commit") -} From 0ff10f9034b44ee3cae43e5908e73bc29db317b2 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 22 Dec 2021 22:32:00 +0800 Subject: [PATCH 19/22] remove index count check Signed-off-by: ekexium --- session/session.go | 8 ------- table/tables/mutation_checker.go | 38 +------------------------------- 2 files changed, 1 insertion(+), 45 deletions(-) diff --git a/session/session.go b/session/session.go index 55d8267ec3b30..7d639f654e902 100644 --- a/session/session.go +++ b/session/session.go @@ -69,7 +69,6 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" storeerr "github.com/pingcap/tidb/store/driver/error" - "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/telemetry" @@ -565,13 +564,6 @@ func (s *session) doCommit(ctx context.Context) error { func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transaction) error { sessVars := s.sessionVars - // TODO: add an enablement check - if sessVars.InTxn() { - if err := tables.CheckTxnConsistency(txn); err != nil { - return errors.Trace(err) - } - } - txnTempTables := sessVars.TxnCtx.TemporaryTables if len(txnTempTables) == 0 { return txn.Commit(ctx) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index f751a91ee92b9..45bc43e665503 100755 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -21,8 +21,8 @@ import ( "strings" "github.com/pingcap/errors" - "github.com/pingcap/tidb/errno" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -493,42 +493,6 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c return nil } -// CheckTxnConsistency checks the number of row/index mutations before the txn commits, -// to prevent some inconsistent transactions. -func CheckTxnConsistency(txn kv.Transaction) error { - memBuffer := txn.GetMemBuffer() - if memBuffer == nil { - return nil - } - - // count for each table - indexInsertionCount := make(map[int64]int) - rowInsertionCount := make(map[int64]int) - f := func(k kv.Key, v []byte) error { - if len(v) > 0 { - tableID := tablecodec.DecodeTableID(k) - if rowcodec.IsRowKey(k) { - rowInsertionCount[tableID] += 1 - } else { - indexInsertionCount[tableID] += 1 - } - } - return nil - } - if err := kv.WalkMemBuffer(memBuffer, f); err != nil { - return errors.Trace(err) - } - for tableID, count := range indexInsertionCount { - // FIXME: always? what if like backfilling? - if rowInsertionCount[tableID] > 0 && count%rowInsertionCount[tableID] != 0 { - return errors.Errorf("inconsistent index insertion count %d and row insertion count %d", - count, rowInsertionCount[tableID]) - } - - } - return nil -} - func injectMutationError(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle) error { failpoint.Inject("corruptMutations", func(commands failpoint.Value) { failpoint.Return(corruptMutations(t, txn, sh, commands.(string))) From 6049add47faecd07a87fa98f5a30994152d1b3ac Mon Sep 17 00:00:00 2001 From: ekexium Date: Fri, 24 Dec 2021 17:20:03 +0800 Subject: [PATCH 20/22] remove randomness Signed-off-by: ekexium --- table/tables/mutation_checker.go | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 45bc43e665503..411a79513f09b 100755 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -15,9 +15,7 @@ package tables import ( - "crypto/rand" "fmt" - "math/big" "strings" "github.com/pingcap/errors" @@ -408,11 +406,7 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c if len(indexMutations) == 0 { continue } - r, err := rand.Int(rand.Reader, big.NewInt(int64(len(indexMutations)))) - if err != nil { - return errors.Trace(err) - } - indexMutation := indexMutations[r.Int64()] + indexMutation := indexMutations[0] key := make([]byte, len(indexMutation.key)) copy(key, indexMutation.key) key[len(key)-1] += 1 @@ -431,11 +425,7 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c if len(indexMutations) == 0 { continue } - r, err := rand.Int(rand.Reader, big.NewInt(int64(len(indexMutations)))) - if err != nil { - return errors.Trace(err) - } - indexMutation := indexMutations[r.Int64()] + indexMutation := indexMutations[0] memBuffer.RemoveFromBuffer(indexMutation.key) } case "corruptIndexKey": @@ -445,11 +435,7 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c if len(indexMutations) == 0 { continue } - r, err := rand.Int(rand.Reader, big.NewInt(int64(len(indexMutations)))) - if err != nil { - return errors.Trace(err) - } - indexMutation := indexMutations[r.Int64()] + indexMutation := indexMutations[0] key := indexMutation.key memBuffer.RemoveFromBuffer(key) key[len(key)-1] += 1 @@ -467,11 +453,7 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c if len(indexMutations) == 0 { continue } - r, err := rand.Int(rand.Reader, big.NewInt(int64(len(indexMutations)))) - if err != nil { - return errors.Trace(err) - } - indexMutation := indexMutations[r.Int64()] + indexMutation := indexMutations[0] value := indexMutation.value if len(value) > 0 { value[len(value)-1] += 1 From 74a51e77e93708ea80497e4b0985cca5f4a4f5c9 Mon Sep 17 00:00:00 2001 From: ekexium Date: Fri, 14 Jan 2022 14:01:39 +0800 Subject: [PATCH 21/22] fix imports Signed-off-by: ekexium --- session/session.go | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/session/session.go b/session/session.go index e5ff0622a0bdf..46b0e4260e8af 100644 --- a/session/session.go +++ b/session/session.go @@ -37,22 +37,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/parser" - "github.com/pingcap/tidb/parser/ast" - "github.com/pingcap/tidb/parser/auth" - "github.com/pingcap/tidb/parser/charset" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/parser/terror" - "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/store/driver/txn" - "github.com/pingcap/tidb/table/tables" - "github.com/pingcap/tidb/table/temptable" - "github.com/pingcap/tidb/util/topsql" - topsqlstate "github.com/pingcap/tidb/util/topsql/state" - "github.com/pingcap/tipb/go-binlog" - "go.uber.org/zap" - "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -82,9 +66,12 @@ import ( "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" storeerr "github.com/pingcap/tidb/store/driver/error" + "github.com/pingcap/tidb/store/driver/txn" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/telemetry" @@ -101,6 +88,7 @@ import ( "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tidb/util/topsql" + topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/pingcap/tipb/go-binlog" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" From 91d66585693790dbe6acf22b568e3ad9aa1b4d66 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 17 Jan 2022 21:12:25 +0800 Subject: [PATCH 22/22] don't skip deletions if possible Signed-off-by: ekexium --- table/tables/mutation_checker.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index ea226d758a4d7..c9deb8bb381c2 100755 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -411,8 +411,14 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c key := make([]byte, len(indexMutation.key)) copy(key, indexMutation.key) key[len(key)-1] += 1 - if err := memBuffer.Set(key, indexMutation.value); err != nil { - return errors.Trace(err) + if len(indexMutation.value) == 0 { + if err := memBuffer.Delete(key); err != nil { + return errors.Trace(err) + } + } else { + if err := memBuffer.Set(key, indexMutation.value); err != nil { + return errors.Trace(err) + } } } //case "extraIndexByHandle": @@ -423,9 +429,6 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c // "missIndex" should be placed in front of "extraIndex"es, // in case it removes the mutation that was just added { - if len(indexMutations) == 0 { - continue - } indexMutation := indexMutations[0] memBuffer.RemoveFromBuffer(indexMutation.key) } @@ -433,9 +436,6 @@ func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, c // a corrupted index mutation. // TODO: distinguish which part is corrupted, value or handle { - if len(indexMutations) == 0 { - continue - } indexMutation := indexMutations[0] key := indexMutation.key memBuffer.RemoveFromBuffer(key)