Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table: Add option SkipWriteUntouchedIndices for Table.UpdateRecord #55192

Merged
merged 3 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion pkg/executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,19 @@ func updateRecord(
return updated, err
}
} else {
var opts []table.UpdateRecordOption
if sctx.GetSessionVars().InTxn() || sc.InHandleForeignKeyTrigger || sc.ForeignKeyTriggerCtx.HasFKCascades {
// If txn is auto commit and index is untouched, no need to write index value.
// If InHandleForeignKeyTrigger or ForeignKeyTriggerCtx.HasFKCascades is true indicate we may have
// foreign key cascade need to handle later, then we still need to write index value,
// otherwise, the later foreign cascade executor may see data-index inconsistency in txn-mem-buffer.
opts = []table.UpdateRecordOption{table.WithCtx(ctx)}
} else {
opts = []table.UpdateRecordOption{table.WithCtx(ctx), table.SkipWriteUntouchedIndices}
}

// Update record to new value and update index.
if err := t.UpdateRecord(sctx.GetTableCtx(), h, oldData, newData, modified, table.WithCtx(ctx)); err != nil {
if err := t.UpdateRecord(sctx.GetTableCtx(), h, oldData, newData, modified, opts...); err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); ok && (terr.Code() == errno.ErrNoPartitionForGivenValue || terr.Code() == errno.ErrRowDoesNotMatchGivenPartitionSet) {
ec := sctx.GetSessionVars().StmtCtx.ErrCtx()
return false, ec.HandleError(err)
Expand Down
19 changes: 19 additions & 0 deletions pkg/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ type AddRecordOption interface {
// UpdateRecordOpt contains the options will be used when updating a record.
type UpdateRecordOpt struct {
commonMutateOpt
// SkipWriteUntouchedIndices is an option to skip write untouched indices when updating a record.
SkipWriteUntouchedIndices bool
}

// NewUpdateRecordOpt creates a new UpdateRecordOpt with options.
Expand Down Expand Up @@ -223,6 +225,23 @@ func (i isUpdate) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.IsUpdate = true
}

// skipWriteUntouchedIndices implements UpdateRecordOption.
type skipWriteUntouchedIndices struct{}

func (skipWriteUntouchedIndices) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.SkipWriteUntouchedIndices = true
}

// SkipWriteUntouchedIndices is an option to skip write untouched options when updating a record.
// If there are no later queries in the transaction that need to read the untouched indices,
// you can use this option to improve performance.
// However, it is not safe to use it in an explicit txn or the updated table has some foreign key constraints.
// Because the following read operations in the same txn may not get the correct data with the current implementation.
// See:
// - https://github.com/pingcap/tidb/pull/12609
// - https://github.com/pingcap/tidb/issues/39419
var SkipWriteUntouchedIndices UpdateRecordOption = skipWriteUntouchedIndices{}

// DupKeyCheckMode indicates how to check the duplicated key when adding/updating a record/index.
type DupKeyCheckMode uint8

Expand Down
2 changes: 1 addition & 1 deletion pkg/table/tables/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ go_test(
],
embed = [":tables"],
flaky = True,
shard_count = 32,
shard_count = 33,
deps = [
"//pkg/ddl",
"//pkg/domain",
Expand Down
19 changes: 9 additions & 10 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (t *TableCommon) updateRecord(sctx table.MutateContext, h kv.Handle, oldDat
}
}
// rebuild index
err = t.rebuildIndices(sctx, txn, h, touched, oldData, newData, opt.GetCreateIdxOpt())
err = t.rebuildUpdateRecordIndices(sctx, txn, h, touched, oldData, newData, opt)
if err != nil {
return err
}
Expand Down Expand Up @@ -607,7 +607,11 @@ func (t *TableCommon) updateRecord(sctx table.MutateContext, h kv.Handle, oldDat
return nil
}

func (t *TableCommon) rebuildIndices(ctx table.MutateContext, txn kv.Transaction, h kv.Handle, touched []bool, oldData []types.Datum, newData []types.Datum, opt *table.CreateIdxOpt) error {
func (t *TableCommon) rebuildUpdateRecordIndices(
ctx table.MutateContext, txn kv.Transaction,
h kv.Handle, touched []bool, oldData []types.Datum, newData []types.Datum,
opt *table.UpdateRecordOpt,
) error {
for _, idx := range t.deletableIndices() {
if t.meta.IsCommonHandle && idx.Meta().Primary {
continue
Expand All @@ -626,6 +630,7 @@ func (t *TableCommon) rebuildIndices(ctx table.MutateContext, txn kv.Transaction
break
}
}
createIdxOpt := opt.GetCreateIdxOpt()
for _, idx := range t.Indices() {
if !IsIndexWritable(idx) {
continue
Expand All @@ -641,20 +646,14 @@ func (t *TableCommon) rebuildIndices(ctx table.MutateContext, txn kv.Transaction
untouched = false
break
}
// If txn is auto commit and index is untouched, no need to write index value.
// If InHandleForeignKeyTrigger or ForeignKeyTriggerCtx.HasFKCascades is true indicate we may have
// foreign key cascade need to handle later, then we still need to write index value,
// otherwise, the later foreign cascade executor may see data-index inconsistency in txn-mem-buffer.
sessVars := ctx.GetSessionVars()
if untouched && !sessVars.InTxn() &&
!sessVars.StmtCtx.InHandleForeignKeyTrigger && !sessVars.StmtCtx.ForeignKeyTriggerCtx.HasFKCascades {
if untouched && opt.SkipWriteUntouchedIndices {
continue
}
newVs, err := idx.FetchValues(newData, nil)
if err != nil {
return err
}
if err := t.buildIndexForRow(ctx, h, newVs, newData, asIndex(idx), txn, untouched, opt); err != nil {
if err := t.buildIndexForRow(ctx, h, newVs, newData, asIndex(idx), txn, untouched, createIdxOpt); err != nil {
return err
}
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,71 @@ func TestTxnAssertion(t *testing.T) {
testUntouchedIndexImpl("OFF", true)
}

func TestSkipWriteUntouchedIndices(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (a int primary key, b int, c int, key idx_b(b), key idx_c(c))")
tk.MustExec("insert into t values(1, 2, 3)")
tk.MustExec("insert into t values(4, 5, 6)")
defer tk.MustExec("rollback")

tbl, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
ctx := tk.Session().GetTableCtx()

for _, c := range []struct {
opts []table.UpdateRecordOption
isSkip bool
}{
{
opts: nil, // by default, should not skip untouched indices
isSkip: false,
},
{
opts: []table.UpdateRecordOption{table.SkipWriteUntouchedIndices},
isSkip: true,
},
} {
tk.MustExec("rollback")
tk.MustExec("begin")
txn, err := tk.Session().Txn(true)
require.NoError(t, err)
memBuffer := txn.GetMemBuffer()
oldLen := memBuffer.Len()
h := kv.IntHandle(1)
require.NoError(t, tbl.UpdateRecord(ctx, h, types.MakeDatums(1, 2, 3), types.MakeDatums(1, 12, 3), []bool{false, true, false}, c.opts...))
newLen := memBuffer.Len()
if c.isSkip {
// 1 row overridden. 1 index deleted and re-added.
require.Equal(t, oldLen+3, newLen)
} else {
// 1 row overridden. 1 index deleted and re-added, 1 index rewritten even if unchanged.
require.Equal(t, oldLen+4, newLen)
}

checkIndexWrittenInMemBuf := func(idx int, val types.Datum, exists bool, isDel bool) {
ec := errctx.StrictNoWarningContext
key, distinct, err := tbl.Indices()[idx].GenIndexKey(ec, time.UTC, []types.Datum{val}, h, nil)
require.NoError(t, err)
require.False(t, distinct)
indexVal, err := memBuffer.Get(context.TODO(), key)
if !exists {
require.True(t, kv.ErrNotExist.Equal(err))
return
}
require.NoError(t, err)
if isDel {
require.Equal(t, []byte{}, indexVal)
}
}

checkIndexWrittenInMemBuf(0, types.NewIntDatum(2), true, true)
checkIndexWrittenInMemBuf(0, types.NewIntDatum(12), true, false)
checkIndexWrittenInMemBuf(1, types.NewIntDatum(3), !c.isSkip, false)
}
}

func TestDupKeyCheckMode(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down