Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table: refactor options for `Table.AddRecord/Table.UpdateRecord/IndexCreate #55125

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t
if err != nil {
return errors.Trace(err)
}
err = writeOnlyTable.UpdateRecord(context.Background(), ctx.GetTableCtx(), h, types.MakeDatums(1, 2, 3), types.MakeDatums(2, 2, 3), touchedSlice(writeOnlyTable))
err = writeOnlyTable.UpdateRecord(ctx.GetTableCtx(), h, types.MakeDatums(1, 2, 3), types.MakeDatums(2, 2, 3), touchedSlice(writeOnlyTable))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -317,7 +317,7 @@ func checkAddPublic(sctx sessionctx.Context, writeOnlyTable, publicTable table.T
return errors.Errorf("%v", oldRow)
}
newRow := types.MakeDatums(3, 4, oldRow[2].GetValue())
err = writeOnlyTable.UpdateRecord(context.Background(), sctx.GetTableCtx(), h, oldRow, newRow, touchedSlice(writeOnlyTable))
err = writeOnlyTable.UpdateRecord(sctx.GetTableCtx(), h, oldRow, newRow, touchedSlice(writeOnlyTable))
if err != nil {
return errors.Trace(err)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT
}

// WriteOnlyTable: update t set c2 = 1 where c1 = 4 and c2 = 4
err = writeOnlyTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 4), types.MakeDatums(4, 1), touchedSlice(writeOnlyTbl))
err = writeOnlyTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 4), types.MakeDatums(4, 1), touchedSlice(writeOnlyTbl))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -183,7 +183,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT
}

// DeleteOnlyTable: update t set c2 = 3 where c1 = 4 and c2 = 1
err = delOnlyTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 1), types.MakeDatums(4, 3), touchedSlice(writeOnlyTbl))
err = delOnlyTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 1), types.MakeDatums(4, 3), touchedSlice(writeOnlyTbl))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
}

// WriteOnlyTable: update t set c2 = 5 where c1 = 7 and c2 = 7
err = writeTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(7), types.MakeDatums(7, 7), types.MakeDatums(7, 5), touchedSlice(writeTbl))
err = writeTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(7), types.MakeDatums(7, 7), types.MakeDatums(7, 5), touchedSlice(writeTbl))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -328,7 +328,7 @@ func checkDropWriteOnly(ctx sessionctx.Context, publicTbl, writeTbl table.Table)
}

// WriteOnlyTable update t set c2 = 7 where c1 = 8 and c2 = 8
err = writeTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(8), types.MakeDatums(8, 8), types.MakeDatums(8, 7), touchedSlice(writeTbl))
err = writeTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(8), types.MakeDatums(8, 8), types.MakeDatums(8, 7), touchedSlice(writeTbl))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -383,7 +383,7 @@ func checkDropDeleteOnly(ctx sessionctx.Context, writeTbl, delTbl table.Table) e
}

// DeleteOnlyTable update t set c2 = 10 where c1 = 9
err = delTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(9), types.MakeDatums(9, 9), types.MakeDatums(9, 10), touchedSlice(delTbl))
err = delTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(9), types.MakeDatums(9, 9), types.MakeDatums(9, 10), touchedSlice(delTbl))
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func updateRecord(
}
} else {
// Update record to new value and update index.
if err := t.UpdateRecord(ctx, sctx.GetTableCtx(), h, oldData, newData, modified); err != nil {
if err := t.UpdateRecord(sctx.GetTableCtx(), h, oldData, newData, modified, table.WithCtx(ctx)); err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); ok && (terr.Code() == errno.ErrNoPartitionForGivenValue || terr.Code() == errno.ErrRowDoesNotMatchGivenPartitionSet) {
ec := sctx.GetSessionVars().StmtCtx.ErrCtx()
return false, ec.HandleError(err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -2436,7 +2436,7 @@ func (it *infoschemaTable) RemoveRecord(ctx table.MutateContext, h kv.Handle, r
}

// UpdateRecord implements table.Table UpdateRecord interface.
func (it *infoschemaTable) UpdateRecord(gctx context.Context, ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
func (it *infoschemaTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error {
return table.ErrUnsupportedOp
}

Expand Down Expand Up @@ -2529,7 +2529,7 @@ func (vt *VirtualTable) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []t
}

// UpdateRecord implements table.Table UpdateRecord interface.
func (vt *VirtualTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
func (vt *VirtualTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error {
return table.ErrUnsupportedOp
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/table/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ go_test(
embed = [":table"],
flaky = True,
race = "on",
shard_count = 9,
shard_count = 10,
deps = [
"//pkg/errctx",
"//pkg/errno",
Expand Down
38 changes: 21 additions & 17 deletions pkg/table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package table

import (
"context"
"time"

"github.com/pingcap/tidb/pkg/errctx"
Expand All @@ -32,51 +31,56 @@ type IndexIterator interface {

// CreateIdxOpt contains the options will be used when creating an index.
type CreateIdxOpt struct {
Ctx context.Context
Untouched bool // If true, the index key/value is no need to commit.
commonMutateOpt
IgnoreAssertion bool
FromBackFill bool
}

// NewCreateIdxOpt creates a new CreateIdxOpt.
func NewCreateIdxOpt(opts ...CreateIdxOption) *CreateIdxOpt {
opt := &CreateIdxOpt{}
for _, o := range opts {
o.ApplyCreateIdxOpt(opt)
}
return opt
}

// CreateIdxOption is defined for the Create() method of the Index interface.
type CreateIdxOption interface {
ApplyCreateIdxOpt(*CreateIdxOpt)
}

// CreateIdxOptFunc is defined for the Create() method of Index interface.
// Here is a blog post about how to use this pattern:
// https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
type CreateIdxOptFunc func(*CreateIdxOpt)

// IndexIsUntouched uses to indicate the index kv is untouched.
var IndexIsUntouched CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.Untouched = true
// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (f CreateIdxOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
f(opt)
}

// WithIgnoreAssertion uses to indicate the process can ignore assertion.
var WithIgnoreAssertion = func(opt *CreateIdxOpt) {
var WithIgnoreAssertion CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.IgnoreAssertion = true
}

// FromBackfill indicates that the index is created by DDL backfill worker.
// In the backfill-merge process, the index KVs from DML will be redirected to
// the temp index. On the other hand, the index KVs from DDL backfill worker should
// never be redirected to the temp index.
var FromBackfill = func(opt *CreateIdxOpt) {
var FromBackfill CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.FromBackFill = true
}

// WithCtx returns a CreateIdxFunc.
// This option is used to pass context.Context.
func WithCtx(ctx context.Context) CreateIdxOptFunc {
return func(opt *CreateIdxOpt) {
opt.Ctx = ctx
}
}

// Index is the interface for index data on KV store.
type Index interface {
// Meta returns IndexInfo.
Meta() *model.IndexInfo
// TableMeta returns TableInfo
TableMeta() *model.TableInfo
// Create supports insert into statement.
Create(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error)
Create(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOption) (kv.Handle, error)
// Delete supports delete from statement.
Delete(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error
// GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation.
Expand Down
92 changes: 80 additions & 12 deletions pkg/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,38 +119,106 @@ var (
// RecordIterFunc is used for low-level record iteration.
type RecordIterFunc func(h kv.Handle, rec []types.Datum, cols []*Column) (more bool, err error)

// commonMutateOpt is the common options for mutating a table.
type commonMutateOpt struct {
Ctx context.Context
}

// AddRecordOpt contains the options will be used when adding a record.
type AddRecordOpt struct {
CreateIdxOpt
commonMutateOpt
IsUpdate bool
ReserveAutoID int
}

// NewAddRecordOpt creates a new AddRecordOpt with options.
func NewAddRecordOpt(opts ...AddRecordOption) *AddRecordOpt {
opt := &AddRecordOpt{}
for _, o := range opts {
o.ApplyAddRecordOpt(opt)
}
return opt
}

// GetCreateIdxOpt creates a CreateIdxOpt.
func (opt *AddRecordOpt) GetCreateIdxOpt() *CreateIdxOpt {
return &CreateIdxOpt{commonMutateOpt: opt.commonMutateOpt}
}

// AddRecordOption is defined for the AddRecord() method of the Table interface.
type AddRecordOption interface {
ApplyOn(*AddRecordOpt)
ApplyAddRecordOpt(*AddRecordOpt)
}

// UpdateRecordOpt contains the options will be used when updating a record.
type UpdateRecordOpt struct {
commonMutateOpt
}

// NewUpdateRecordOpt creates a new UpdateRecordOpt with options.
func NewUpdateRecordOpt(opts ...UpdateRecordOption) *UpdateRecordOpt {
opt := &UpdateRecordOpt{}
for _, o := range opts {
o.ApplyUpdateRecordOpt(opt)
}
return opt
}

// GetAddRecordOpt creates a AddRecordOpt.
func (opt *UpdateRecordOpt) GetAddRecordOpt() *AddRecordOpt {
return &AddRecordOpt{commonMutateOpt: opt.commonMutateOpt}
}

// GetCreateIdxOpt creates a CreateIdxOpt.
func (opt *UpdateRecordOpt) GetCreateIdxOpt() *CreateIdxOpt {
return &CreateIdxOpt{commonMutateOpt: opt.commonMutateOpt}
}

// UpdateRecordOption is defined for the UpdateRecord() method of the Table interface.
type UpdateRecordOption interface {
ApplyUpdateRecordOpt(*UpdateRecordOpt)
}

// CommonMutateOptFunc is a function to provide common options for mutating a table.
type CommonMutateOptFunc func(*commonMutateOpt)

// ApplyAddRecordOpt implements the AddRecordOption interface.
func (f CommonMutateOptFunc) ApplyAddRecordOpt(opt *AddRecordOpt) {
f(&opt.commonMutateOpt)
}

// ApplyUpdateRecordOpt implements the UpdateRecordOption interface.
func (f CommonMutateOptFunc) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
f(&opt.commonMutateOpt)
}

// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (f CommonMutateOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
f(&opt.commonMutateOpt)
}

// WithCtx returns a CommonMutateOptFunc.
// This option is used to pass context.Context.
func WithCtx(ctx context.Context) CommonMutateOptFunc {
return func(opt *commonMutateOpt) {
opt.Ctx = ctx
}
}

// WithReserveAutoIDHint tells the AddRecord operation to reserve a batch of auto ID in the stmtctx.
type WithReserveAutoIDHint int

// ApplyOn implements the AddRecordOption interface.
func (n WithReserveAutoIDHint) ApplyOn(opt *AddRecordOpt) {
// ApplyAddRecordOpt implements the AddRecordOption interface.
func (n WithReserveAutoIDHint) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.ReserveAutoID = int(n)
}

// ApplyOn implements the AddRecordOption interface, so any CreateIdxOptFunc
// can be passed as the optional argument to the table.AddRecord method.
func (f CreateIdxOptFunc) ApplyOn(opt *AddRecordOpt) {
f(&opt.CreateIdxOpt)
}

// IsUpdate is a defined value for AddRecordOptFunc.
var IsUpdate AddRecordOption = isUpdate{}

type isUpdate struct{}

func (i isUpdate) ApplyOn(opt *AddRecordOpt) {
func (i isUpdate) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.IsUpdate = true
}

Expand Down Expand Up @@ -202,7 +270,7 @@ type Table interface {
AddRecord(ctx MutateContext, r []types.Datum, opts ...AddRecordOption) (recordID kv.Handle, err error)

// UpdateRecord updates a row which should contain only writable columns.
UpdateRecord(gctx context.Context, ctx MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool) error
UpdateRecord(ctx MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool, opts ...UpdateRecordOption) error

// RemoveRecord removes a row in the table.
RemoveRecord(ctx MutateContext, h kv.Handle, r []types.Datum) error
Expand Down
37 changes: 37 additions & 0 deletions pkg/table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package table

import (
"context"
"testing"

mysql "github.com/pingcap/tidb/pkg/errno"
Expand All @@ -41,3 +42,39 @@ func TestErrorCode(t *testing.T) {
require.Equal(t, mysql.ErrNoPartitionForGivenValue, int(terror.ToSQLError(ErrNoPartitionForGivenValue).Code))
require.Equal(t, mysql.ErrLockOrActiveTransaction, int(terror.ToSQLError(ErrLockOrActiveTransaction).Code))
}

func TestOptions(t *testing.T) {
ctx := context.WithValue(context.Background(), "test", "test")
// NewAddRecordOpt without option
addOpt := NewAddRecordOpt()
require.Equal(t, AddRecordOpt{}, *addOpt)
require.Equal(t, CreateIdxOpt{}, *(addOpt.GetCreateIdxOpt()))
// NewAddRecordOpt with options
addOpt = NewAddRecordOpt(WithCtx(ctx), IsUpdate, WithReserveAutoIDHint(12))
require.Equal(t, AddRecordOpt{
commonMutateOpt: commonMutateOpt{Ctx: ctx},
IsUpdate: true,
ReserveAutoID: 12,
}, *addOpt)
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(addOpt.GetCreateIdxOpt()))
// NewUpdateRecordOpt without option
updateOpt := NewUpdateRecordOpt()
require.Equal(t, UpdateRecordOpt{}, *updateOpt)
require.Equal(t, AddRecordOpt{}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{}, *(updateOpt.GetCreateIdxOpt()))
// NewUpdateRecordOpt with options
updateOpt = NewUpdateRecordOpt(WithCtx(ctx))
require.Equal(t, UpdateRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *updateOpt)
require.Equal(t, AddRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetCreateIdxOpt()))
// NewCreateIdxOpt without option
createIdxOpt := NewCreateIdxOpt()
require.Equal(t, CreateIdxOpt{}, *createIdxOpt)
// NewCreateIdxOpt with options
createIdxOpt = NewCreateIdxOpt(WithCtx(ctx), WithIgnoreAssertion, FromBackfill)
require.Equal(t, CreateIdxOpt{
commonMutateOpt: commonMutateOpt{Ctx: ctx},
IgnoreAssertion: true,
FromBackFill: true,
}, *createIdxOpt)
}
1 change: 1 addition & 0 deletions pkg/table/tables/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//pkg/util/dbterror",
"//pkg/util/generatedexpr",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/ranger",
"//pkg/util/rowcodec",
Expand Down
3 changes: 2 additions & 1 deletion pkg/table/tables/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
_ "github.com/pingcap/tidb/pkg/util/context"
Expand Down Expand Up @@ -198,7 +199,7 @@ func BenchmarkUpdateRecordInPipelinedDML(b *testing.B) {
for j := 0; j < batchSize; j++ {
// Update record
handle := kv.IntHandle(j)
err := tb.UpdateRecord(context.TODO(), se.GetTableCtx(), handle, records[j], newData[j], touched)
err := tb.UpdateRecord(se.GetTableCtx(), handle, records[j], newData[j], touched, table.WithCtx(context.TODO()))
if err != nil {
b.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,13 @@ func txnCtxAddCachedTable(sctx table.MutateContext, tid int64, handle *cachedTab
}

// UpdateRecord implements table.Table
func (c *cachedTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
func (c *cachedTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error {
// Prevent furthur writing when the table is already too large.
if atomic.LoadInt64(&c.totalSize) > cachedTableSizeLimit {
return table.ErrOptOnCacheTable.GenWithStackByArgs("table too large")
}
txnCtxAddCachedTable(sctx, c.Meta().ID, c)
return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched)
txnCtxAddCachedTable(ctx, c.Meta().ID, c)
return c.TableCommon.UpdateRecord(ctx, h, oldData, newData, touched, opts...)
}

// RemoveRecord implements table.Table RemoveRecord interface.
Expand Down
Loading