Skip to content

Commit

Permalink
*: lock row keys during merging back from temp index (#39936)
Browse files Browse the repository at this point in the history
close #39929
  • Loading branch information
tangenta authored Dec 14, 2022
1 parent 62a8001 commit c26a6b5
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 60 deletions.
37 changes: 25 additions & 12 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type temporaryIndexRecord struct {
delete bool
unique bool
distinct bool
rowKey kv.Key
}

type mergeIndexWorker struct {
Expand Down Expand Up @@ -133,6 +134,14 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
if idxRecord.skip {
continue
}

// Lock the corresponding row keys so that it doesn't modify the index KVs
// that are changing by a pessimistic transaction.
err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.rowKey)
if err != nil {
return errors.Trace(err)
}

if idxRecord.delete {
if idxRecord.unique {
err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
Expand All @@ -149,6 +158,7 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
}
return nil
})

logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000)
return
}
Expand All @@ -166,6 +176,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
oprStartTime := startTime
idxPrefix := w.table.IndexPrefix()
var lastKey kv.Key
isCommonHandle := w.table.Meta().IsCommonHandle
err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, idxPrefix, txn.StartTS(),
taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) {
oprEndTime := time.Now()
Expand All @@ -182,35 +193,37 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
return false, nil
}

isDelete := false
unique := false
length := len(rawValue)
keyVer := rawValue[length-1]
originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle)
if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
return true, nil
}
rawValue = rawValue[:length-1]
if bytes.Equal(rawValue, tables.DeleteMarker) {
isDelete = true
} else if bytes.Equal(rawValue, tables.DeleteMarkerUnique) {
isDelete = true
unique = true

if handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns))
if err != nil {
return false, err
}
}
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), handle)

originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)

idxRecord := &temporaryIndexRecord{
rowKey: rowKey,
delete: isDelete,
unique: unique,
skip: false,
}
if !isDelete {
idxRecord.vals = rawValue
idxRecord.distinct = tablecodec.IndexKVIsUnique(rawValue)
idxRecord.vals = originVal
idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal)
}
w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord)
w.originIdxKeys = append(w.originIdxKeys, originIdxKey)
Expand Down
63 changes: 63 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl_test

import (
"testing"
"time"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/ingest"
Expand Down Expand Up @@ -376,3 +377,65 @@ func TestAddIndexMergeIndexUpdateOnDeleteOnly(t *testing.T) {
}
tk.MustExec("admin check table t;")
}

func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec(`CREATE TABLE t (id int primary key, a int);`)
tk.MustExec(`INSERT INTO t VALUES (1, 1);`)

// Force onCreateIndex use the txn-merge process.
ingest.LitInitialized = false
tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = 1;")
tk.MustExec("set @@global.tidb_enable_metadata_lock = 0;")

originHook := dom.DDL().GetHook()
callback := &ddl.TestDDLCallback{Do: dom}

runPessimisticTxn := false
callback.OnJobRunBeforeExported = func(job *model.Job) {
if t.Failed() {
return
}
if job.SchemaState == model.StateWriteOnly {
// Write a record to the temp index.
_, err := tk2.Exec("update t set a = 2 where id = 1;")
assert.NoError(t, err)
}
if !runPessimisticTxn && job.SchemaState == model.StateWriteReorganization {
idx := findIdxInfo(dom, "test", "t", "idx")
if idx == nil {
return
}
if idx.BackfillState != model.BackfillStateReadyToMerge {
return
}
runPessimisticTxn = true
_, err := tk2.Exec("begin pessimistic;")
assert.NoError(t, err)
_, err = tk2.Exec("update t set a = 3 where id = 1;")
assert.NoError(t, err)
}
}
dom.DDL().SetHook(callback)
afterCommit := make(chan struct{}, 1)
go func() {
tk.MustExec("alter table t add index idx(a);")
afterCommit <- struct{}{}
}()
timer := time.NewTimer(300 * time.Millisecond)
select {
case <-timer.C:
break
case <-afterCommit:
require.Fail(t, "should be blocked by the pessimistic txn")
}
tk2.MustExec("rollback;")
<-afterCommit
dom.DDL().SetHook(originHook)
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2"))
}
12 changes: 8 additions & 4 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package executor

import (
"bytes"
"context"
"encoding/hex"
"fmt"
Expand All @@ -32,7 +31,6 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -160,6 +158,12 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t
for _, r := range rows {
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKey)]; found {
if tablecodec.IsTempIndexKey(uk.newKey) {
if tablecodec.CheckTempIndexValueIsDelete(val) {
continue
}
val = tablecodec.DecodeTempIndexOriginValue(val)
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
if err != nil {
return err
Expand Down Expand Up @@ -267,10 +271,10 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// Since the temp index stores deleted key with marked 'deleteu' for unique key at the end
// of value, So if return a key we check and skip deleted key.
if tablecodec.IsTempIndexKey(uk.newKey) {
rowVal := val[:len(val)-1]
if bytes.Equal(rowVal, tables.DeleteMarkerUnique) {
if tablecodec.CheckTempIndexValueIsDelete(val) {
continue
}
val = tablecodec.DecodeTempIndexOriginValue(val)
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r
}
return false, false, err
}
if tablecodec.IsTempIndexKey(uk.newKey) {
if tablecodec.CheckTempIndexValueIsDelete(val) {
continue
}
val = tablecodec.DecodeTempIndexOriginValue(val)
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
if err != nil {
return false, true, err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
cloud.google.com/go/storage v1.21.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0
github.com/BurntSushi/toml v1.2.1
Expand Down Expand Up @@ -129,7 +130,6 @@ require (
cloud.google.com/go v0.100.2 // indirect
cloud.google.com/go/compute v1.5.0 // indirect
cloud.google.com/go/iam v0.1.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect
github.com/DataDog/zstd v1.4.5 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
Expand Down
60 changes: 22 additions & 38 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package tables

import (
"bytes"
"context"
"errors"
"sync"
Expand Down Expand Up @@ -179,15 +178,15 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue

if !distinct || skipCheck || opt.Untouched {
if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage.
idxVal = append(idxVal, keyVer)
idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer)
}
err = txn.GetMemBuffer().Set(key, idxVal)
if err != nil {
return nil, err
}
if len(tempKey) > 0 {
if !opt.Untouched { // Untouched key-values never occur in the storage.
idxVal = append(idxVal, keyVer)
idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer)
}
err = txn.GetMemBuffer().Set(tempKey, idxVal)
if err != nil {
Expand Down Expand Up @@ -226,11 +225,11 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
if err != nil && !kv.IsErrNotFound(err) {
return nil, err
}
if err != nil || len(value) == 0 {
if err != nil || len(value) == 0 || (keyIsTempIdxKey && tablecodec.CheckTempIndexValueIsDelete(value)) {
lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil
var needPresumeKey tempIndexKeyState
var needPresumeKey TempIndexKeyState
if keyIsTempIdxKey {
idxVal = append(idxVal, keyVer)
idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer)
needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle)
if err != nil {
return nil, err
Expand Down Expand Up @@ -260,7 +259,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
return nil, err
}
if len(tempKey) > 0 {
idxVal = append(idxVal, keyVer)
idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer)
if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted {
err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists)
} else {
Expand All @@ -281,20 +280,16 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
return nil, err
}

if keyIsTempIdxKey {
value = tablecodec.DecodeTempIndexOriginValue(value)
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle)
if err != nil {
return nil, err
}
return handle, kv.ErrKeyExists
}

var (
// DeleteMarker is a marker that the key is deleted.
DeleteMarker = []byte("delete")
// DeleteMarkerUnique is a marker that the unique index key is deleted.
DeleteMarkerUnique = []byte("deleteu")
)

// Delete removes the entry for handle h and indexedValues from KV index.
func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error {
key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil)
Expand All @@ -312,10 +307,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
}
}
if len(tempKey) > 0 {
val := make([]byte, 0, len(DeleteMarkerUnique)+1)
val = append(val, DeleteMarkerUnique...)
val = append(val, tempKeyVer)
err = txn.GetMemBuffer().Set(tempKey, val)
tempVal := tablecodec.EncodeTempIndexValueDeletedUnique(h, tempKeyVer)
err = txn.GetMemBuffer().Set(tempKey, tempVal)
if err != nil {
return err
}
Expand All @@ -328,10 +321,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
}
}
if len(tempKey) > 0 {
val := make([]byte, 0, len(DeleteMarker)+1)
val = append(val, DeleteMarker...)
val = append(val, tempKeyVer)
err = txn.GetMemBuffer().Set(tempKey, val)
tempVal := tablecodec.EncodeTempIndexValueDeleted(tempKeyVer)
err = txn.GetMemBuffer().Set(tempKey, tempVal)
if err != nil {
return err
}
Expand Down Expand Up @@ -508,11 +499,12 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo *
return colInfo
}

type tempIndexKeyState byte
// TempIndexKeyState is the state of the temporary index key.
type TempIndexKeyState byte

const (
// KeyInTempIndexUnknown whether the key exists or not in temp index is unknown.
KeyInTempIndexUnknown tempIndexKeyState = iota
KeyInTempIndexUnknown TempIndexKeyState = iota
// KeyInTempIndexNotExist the key is not exist in temp index.
KeyInTempIndexNotExist
// KeyInTempIndexIsDeleted the key is marked deleted in temp index.
Expand All @@ -524,7 +516,7 @@ const (
)

// KeyExistInTempIndex is used to check the unique key exist status in temp index.
func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (tempIndexKeyState, kv.Handle, error) {
func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (TempIndexKeyState, kv.Handle, error) {
// Only check temp index key.
if !tablecodec.IsTempIndexKey(key) {
return KeyInTempIndexUnknown, nil, nil
Expand All @@ -541,24 +533,16 @@ func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, di
if len(value) < 1 {
return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1")
}
length := len(value)
// Firstly, we will remove the last byte of key version.
// It should be TempIndexKeyTypeBackfill or TempIndexKeyTypeMerge.
value = value[:length-1]
if distinct {
if bytes.Equal(value, DeleteMarkerUnique) {
return KeyInTempIndexIsDeleted, nil, nil
}
} else {
if bytes.Equal(value, DeleteMarker) {
return KeyInTempIndexIsDeleted, nil, nil
}

if tablecodec.CheckTempIndexValueIsDelete(value) {
return KeyInTempIndexIsDeleted, nil, nil
}

// Check if handle equal.
var handle kv.Handle
if distinct {
handle, err = tablecodec.DecodeHandleInUniqueIndexValue(value, IsCommonHandle)
originVal := tablecodec.DecodeTempIndexOriginValue(value)
handle, err = tablecodec.DecodeHandleInUniqueIndexValue(originVal, IsCommonHandle)
if err != nil {
return KeyInTempIndexUnknown, nil, err
}
Expand Down
Loading

0 comments on commit c26a6b5

Please sign in to comment.