diff --git a/pkg/ddl/column_change_test.go b/pkg/ddl/column_change_test.go index b147cbbf9dd34..9a4779058a52f 100644 --- a/pkg/ddl/column_change_test.go +++ b/pkg/ddl/column_change_test.go @@ -253,7 +253,7 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t if err != nil { return errors.Trace(err) } - err = writeOnlyTable.UpdateRecord(context.Background(), ctx.GetTableCtx(), h, types.MakeDatums(1, 2, 3), types.MakeDatums(2, 2, 3), touchedSlice(writeOnlyTable)) + err = writeOnlyTable.UpdateRecord(ctx.GetTableCtx(), h, types.MakeDatums(1, 2, 3), types.MakeDatums(2, 2, 3), touchedSlice(writeOnlyTable)) if err != nil { return errors.Trace(err) } @@ -317,7 +317,7 @@ func checkAddPublic(sctx sessionctx.Context, writeOnlyTable, publicTable table.T return errors.Errorf("%v", oldRow) } newRow := types.MakeDatums(3, 4, oldRow[2].GetValue()) - err = writeOnlyTable.UpdateRecord(context.Background(), sctx.GetTableCtx(), h, oldRow, newRow, touchedSlice(writeOnlyTable)) + err = writeOnlyTable.UpdateRecord(sctx.GetTableCtx(), h, oldRow, newRow, touchedSlice(writeOnlyTable)) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/index_change_test.go b/pkg/ddl/index_change_test.go index 2d8f262462ce4..34ad661cecccf 100644 --- a/pkg/ddl/index_change_test.go +++ b/pkg/ddl/index_change_test.go @@ -173,7 +173,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT } // WriteOnlyTable: update t set c2 = 1 where c1 = 4 and c2 = 4 - err = writeOnlyTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 4), types.MakeDatums(4, 1), touchedSlice(writeOnlyTbl)) + err = writeOnlyTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 4), types.MakeDatums(4, 1), touchedSlice(writeOnlyTbl)) if err != nil { return errors.Trace(err) } @@ -183,7 +183,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT } // DeleteOnlyTable: update t set c2 = 3 where c1 = 4 and c2 = 1 - err = delOnlyTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 1), types.MakeDatums(4, 3), touchedSlice(writeOnlyTbl)) + err = delOnlyTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 1), types.MakeDatums(4, 3), touchedSlice(writeOnlyTbl)) if err != nil { return errors.Trace(err) } @@ -250,7 +250,7 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table } // WriteOnlyTable: update t set c2 = 5 where c1 = 7 and c2 = 7 - err = writeTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(7), types.MakeDatums(7, 7), types.MakeDatums(7, 5), touchedSlice(writeTbl)) + err = writeTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(7), types.MakeDatums(7, 7), types.MakeDatums(7, 5), touchedSlice(writeTbl)) if err != nil { return errors.Trace(err) } @@ -328,7 +328,7 @@ func checkDropWriteOnly(ctx sessionctx.Context, publicTbl, writeTbl table.Table) } // WriteOnlyTable update t set c2 = 7 where c1 = 8 and c2 = 8 - err = writeTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(8), types.MakeDatums(8, 8), types.MakeDatums(8, 7), touchedSlice(writeTbl)) + err = writeTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(8), types.MakeDatums(8, 8), types.MakeDatums(8, 7), touchedSlice(writeTbl)) if err != nil { return errors.Trace(err) } @@ -383,7 +383,7 @@ func checkDropDeleteOnly(ctx sessionctx.Context, writeTbl, delTbl table.Table) e } // DeleteOnlyTable update t set c2 = 10 where c1 = 9 - err = delTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(9), types.MakeDatums(9, 9), types.MakeDatums(9, 10), touchedSlice(delTbl)) + err = delTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(9), types.MakeDatums(9, 9), types.MakeDatums(9, 10), touchedSlice(delTbl)) if err != nil { return errors.Trace(err) } diff --git a/pkg/executor/write.go b/pkg/executor/write.go index 98d8518bd4215..aa71e48bdab51 100644 --- a/pkg/executor/write.go +++ b/pkg/executor/write.go @@ -194,7 +194,7 @@ func updateRecord( } } else { // Update record to new value and update index. - if err := t.UpdateRecord(ctx, sctx.GetTableCtx(), h, oldData, newData, modified); err != nil { + if err := t.UpdateRecord(sctx.GetTableCtx(), h, oldData, newData, modified, table.WithCtx(ctx)); err != nil { if terr, ok := errors.Cause(err).(*terror.Error); ok && (terr.Code() == errno.ErrNoPartitionForGivenValue || terr.Code() == errno.ErrRowDoesNotMatchGivenPartitionSet) { ec := sctx.GetSessionVars().StmtCtx.ErrCtx() return false, ec.HandleError(err) diff --git a/pkg/infoschema/tables.go b/pkg/infoschema/tables.go index 89e5f088fac40..144c0ae9cc2a4 100644 --- a/pkg/infoschema/tables.go +++ b/pkg/infoschema/tables.go @@ -2436,7 +2436,7 @@ func (it *infoschemaTable) RemoveRecord(ctx table.MutateContext, h kv.Handle, r } // UpdateRecord implements table.Table UpdateRecord interface. -func (it *infoschemaTable) UpdateRecord(gctx context.Context, ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { +func (it *infoschemaTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error { return table.ErrUnsupportedOp } @@ -2529,7 +2529,7 @@ func (vt *VirtualTable) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []t } // UpdateRecord implements table.Table UpdateRecord interface. -func (vt *VirtualTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { +func (vt *VirtualTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error { return table.ErrUnsupportedOp } diff --git a/pkg/table/BUILD.bazel b/pkg/table/BUILD.bazel index 33b766c024e52..90d494dd7ad75 100644 --- a/pkg/table/BUILD.bazel +++ b/pkg/table/BUILD.bazel @@ -53,7 +53,7 @@ go_test( embed = [":table"], flaky = True, race = "on", - shard_count = 9, + shard_count = 10, deps = [ "//pkg/errctx", "//pkg/errno", diff --git a/pkg/table/index.go b/pkg/table/index.go index d451a22db1eca..9c523f7bf9606 100644 --- a/pkg/table/index.go +++ b/pkg/table/index.go @@ -15,7 +15,6 @@ package table import ( - "context" "time" "github.com/pingcap/tidb/pkg/errctx" @@ -32,24 +31,37 @@ type IndexIterator interface { // CreateIdxOpt contains the options will be used when creating an index. type CreateIdxOpt struct { - Ctx context.Context - Untouched bool // If true, the index key/value is no need to commit. + commonMutateOpt IgnoreAssertion bool FromBackFill bool } +// NewCreateIdxOpt creates a new CreateIdxOpt. +func NewCreateIdxOpt(opts ...CreateIdxOption) *CreateIdxOpt { + opt := &CreateIdxOpt{} + for _, o := range opts { + o.ApplyCreateIdxOpt(opt) + } + return opt +} + +// CreateIdxOption is defined for the Create() method of the Index interface. +type CreateIdxOption interface { + ApplyCreateIdxOpt(*CreateIdxOpt) +} + // CreateIdxOptFunc is defined for the Create() method of Index interface. // Here is a blog post about how to use this pattern: // https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis type CreateIdxOptFunc func(*CreateIdxOpt) -// IndexIsUntouched uses to indicate the index kv is untouched. -var IndexIsUntouched CreateIdxOptFunc = func(opt *CreateIdxOpt) { - opt.Untouched = true +// ApplyCreateIdxOpt implements the CreateIdxOption interface. +func (f CreateIdxOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) { + f(opt) } // WithIgnoreAssertion uses to indicate the process can ignore assertion. -var WithIgnoreAssertion = func(opt *CreateIdxOpt) { +var WithIgnoreAssertion CreateIdxOptFunc = func(opt *CreateIdxOpt) { opt.IgnoreAssertion = true } @@ -57,18 +69,10 @@ var WithIgnoreAssertion = func(opt *CreateIdxOpt) { // In the backfill-merge process, the index KVs from DML will be redirected to // the temp index. On the other hand, the index KVs from DDL backfill worker should // never be redirected to the temp index. -var FromBackfill = func(opt *CreateIdxOpt) { +var FromBackfill CreateIdxOptFunc = func(opt *CreateIdxOpt) { opt.FromBackFill = true } -// WithCtx returns a CreateIdxFunc. -// This option is used to pass context.Context. -func WithCtx(ctx context.Context) CreateIdxOptFunc { - return func(opt *CreateIdxOpt) { - opt.Ctx = ctx - } -} - // Index is the interface for index data on KV store. type Index interface { // Meta returns IndexInfo. @@ -76,7 +80,7 @@ type Index interface { // TableMeta returns TableInfo TableMeta() *model.TableInfo // Create supports insert into statement. - Create(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error) + Create(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOption) (kv.Handle, error) // Delete supports delete from statement. Delete(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error // GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation. diff --git a/pkg/table/table.go b/pkg/table/table.go index 6e4eb24a26c92..c971d751014c9 100644 --- a/pkg/table/table.go +++ b/pkg/table/table.go @@ -119,38 +119,106 @@ var ( // RecordIterFunc is used for low-level record iteration. type RecordIterFunc func(h kv.Handle, rec []types.Datum, cols []*Column) (more bool, err error) +// commonMutateOpt is the common options for mutating a table. +type commonMutateOpt struct { + Ctx context.Context +} + // AddRecordOpt contains the options will be used when adding a record. type AddRecordOpt struct { - CreateIdxOpt + commonMutateOpt IsUpdate bool ReserveAutoID int } +// NewAddRecordOpt creates a new AddRecordOpt with options. +func NewAddRecordOpt(opts ...AddRecordOption) *AddRecordOpt { + opt := &AddRecordOpt{} + for _, o := range opts { + o.ApplyAddRecordOpt(opt) + } + return opt +} + +// GetCreateIdxOpt creates a CreateIdxOpt. +func (opt *AddRecordOpt) GetCreateIdxOpt() *CreateIdxOpt { + return &CreateIdxOpt{commonMutateOpt: opt.commonMutateOpt} +} + // AddRecordOption is defined for the AddRecord() method of the Table interface. type AddRecordOption interface { - ApplyOn(*AddRecordOpt) + ApplyAddRecordOpt(*AddRecordOpt) +} + +// UpdateRecordOpt contains the options will be used when updating a record. +type UpdateRecordOpt struct { + commonMutateOpt +} + +// NewUpdateRecordOpt creates a new UpdateRecordOpt with options. +func NewUpdateRecordOpt(opts ...UpdateRecordOption) *UpdateRecordOpt { + opt := &UpdateRecordOpt{} + for _, o := range opts { + o.ApplyUpdateRecordOpt(opt) + } + return opt +} + +// GetAddRecordOpt creates a AddRecordOpt. +func (opt *UpdateRecordOpt) GetAddRecordOpt() *AddRecordOpt { + return &AddRecordOpt{commonMutateOpt: opt.commonMutateOpt} +} + +// GetCreateIdxOpt creates a CreateIdxOpt. +func (opt *UpdateRecordOpt) GetCreateIdxOpt() *CreateIdxOpt { + return &CreateIdxOpt{commonMutateOpt: opt.commonMutateOpt} +} + +// UpdateRecordOption is defined for the UpdateRecord() method of the Table interface. +type UpdateRecordOption interface { + ApplyUpdateRecordOpt(*UpdateRecordOpt) +} + +// CommonMutateOptFunc is a function to provide common options for mutating a table. +type CommonMutateOptFunc func(*commonMutateOpt) + +// ApplyAddRecordOpt implements the AddRecordOption interface. +func (f CommonMutateOptFunc) ApplyAddRecordOpt(opt *AddRecordOpt) { + f(&opt.commonMutateOpt) +} + +// ApplyUpdateRecordOpt implements the UpdateRecordOption interface. +func (f CommonMutateOptFunc) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) { + f(&opt.commonMutateOpt) +} + +// ApplyCreateIdxOpt implements the CreateIdxOption interface. +func (f CommonMutateOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) { + f(&opt.commonMutateOpt) +} + +// WithCtx returns a CommonMutateOptFunc. +// This option is used to pass context.Context. +func WithCtx(ctx context.Context) CommonMutateOptFunc { + return func(opt *commonMutateOpt) { + opt.Ctx = ctx + } } // WithReserveAutoIDHint tells the AddRecord operation to reserve a batch of auto ID in the stmtctx. type WithReserveAutoIDHint int -// ApplyOn implements the AddRecordOption interface. -func (n WithReserveAutoIDHint) ApplyOn(opt *AddRecordOpt) { +// ApplyAddRecordOpt implements the AddRecordOption interface. +func (n WithReserveAutoIDHint) ApplyAddRecordOpt(opt *AddRecordOpt) { opt.ReserveAutoID = int(n) } -// ApplyOn implements the AddRecordOption interface, so any CreateIdxOptFunc -// can be passed as the optional argument to the table.AddRecord method. -func (f CreateIdxOptFunc) ApplyOn(opt *AddRecordOpt) { - f(&opt.CreateIdxOpt) -} - // IsUpdate is a defined value for AddRecordOptFunc. var IsUpdate AddRecordOption = isUpdate{} type isUpdate struct{} -func (i isUpdate) ApplyOn(opt *AddRecordOpt) { +func (i isUpdate) ApplyAddRecordOpt(opt *AddRecordOpt) { opt.IsUpdate = true } @@ -202,7 +270,7 @@ type Table interface { AddRecord(ctx MutateContext, r []types.Datum, opts ...AddRecordOption) (recordID kv.Handle, err error) // UpdateRecord updates a row which should contain only writable columns. - UpdateRecord(gctx context.Context, ctx MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool) error + UpdateRecord(ctx MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool, opts ...UpdateRecordOption) error // RemoveRecord removes a row in the table. RemoveRecord(ctx MutateContext, h kv.Handle, r []types.Datum) error diff --git a/pkg/table/table_test.go b/pkg/table/table_test.go index cfdcdd8e3dafc..a0dc976eb2e2f 100644 --- a/pkg/table/table_test.go +++ b/pkg/table/table_test.go @@ -15,6 +15,7 @@ package table import ( + "context" "testing" mysql "github.com/pingcap/tidb/pkg/errno" @@ -41,3 +42,39 @@ func TestErrorCode(t *testing.T) { require.Equal(t, mysql.ErrNoPartitionForGivenValue, int(terror.ToSQLError(ErrNoPartitionForGivenValue).Code)) require.Equal(t, mysql.ErrLockOrActiveTransaction, int(terror.ToSQLError(ErrLockOrActiveTransaction).Code)) } + +func TestOptions(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", "test") + // NewAddRecordOpt without option + addOpt := NewAddRecordOpt() + require.Equal(t, AddRecordOpt{}, *addOpt) + require.Equal(t, CreateIdxOpt{}, *(addOpt.GetCreateIdxOpt())) + // NewAddRecordOpt with options + addOpt = NewAddRecordOpt(WithCtx(ctx), IsUpdate, WithReserveAutoIDHint(12)) + require.Equal(t, AddRecordOpt{ + commonMutateOpt: commonMutateOpt{Ctx: ctx}, + IsUpdate: true, + ReserveAutoID: 12, + }, *addOpt) + require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(addOpt.GetCreateIdxOpt())) + // NewUpdateRecordOpt without option + updateOpt := NewUpdateRecordOpt() + require.Equal(t, UpdateRecordOpt{}, *updateOpt) + require.Equal(t, AddRecordOpt{}, *(updateOpt.GetAddRecordOpt())) + require.Equal(t, CreateIdxOpt{}, *(updateOpt.GetCreateIdxOpt())) + // NewUpdateRecordOpt with options + updateOpt = NewUpdateRecordOpt(WithCtx(ctx)) + require.Equal(t, UpdateRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *updateOpt) + require.Equal(t, AddRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetAddRecordOpt())) + require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetCreateIdxOpt())) + // NewCreateIdxOpt without option + createIdxOpt := NewCreateIdxOpt() + require.Equal(t, CreateIdxOpt{}, *createIdxOpt) + // NewCreateIdxOpt with options + createIdxOpt = NewCreateIdxOpt(WithCtx(ctx), WithIgnoreAssertion, FromBackfill) + require.Equal(t, CreateIdxOpt{ + commonMutateOpt: commonMutateOpt{Ctx: ctx}, + IgnoreAssertion: true, + FromBackFill: true, + }, *createIdxOpt) +} diff --git a/pkg/table/tables/BUILD.bazel b/pkg/table/tables/BUILD.bazel index 2e653027c6c09..86c641ae56ba7 100644 --- a/pkg/table/tables/BUILD.bazel +++ b/pkg/table/tables/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "//pkg/util/dbterror", "//pkg/util/generatedexpr", "//pkg/util/hack", + "//pkg/util/intest", "//pkg/util/logutil", "//pkg/util/ranger", "//pkg/util/rowcodec", diff --git a/pkg/table/tables/bench_test.go b/pkg/table/tables/bench_test.go index 3131a066fcb6f..67bb98d2a888f 100644 --- a/pkg/table/tables/bench_test.go +++ b/pkg/table/tables/bench_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn" + "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/types" _ "github.com/pingcap/tidb/pkg/util/context" @@ -198,7 +199,7 @@ func BenchmarkUpdateRecordInPipelinedDML(b *testing.B) { for j := 0; j < batchSize; j++ { // Update record handle := kv.IntHandle(j) - err := tb.UpdateRecord(context.TODO(), se.GetTableCtx(), handle, records[j], newData[j], touched) + err := tb.UpdateRecord(se.GetTableCtx(), handle, records[j], newData[j], touched, table.WithCtx(context.TODO())) if err != nil { b.Fatal(err) } diff --git a/pkg/table/tables/cache.go b/pkg/table/tables/cache.go index 515a2a781f5bc..1051ee8a995f6 100644 --- a/pkg/table/tables/cache.go +++ b/pkg/table/tables/cache.go @@ -254,13 +254,13 @@ func txnCtxAddCachedTable(sctx table.MutateContext, tid int64, handle *cachedTab } // UpdateRecord implements table.Table -func (c *cachedTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { +func (c *cachedTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error { // Prevent furthur writing when the table is already too large. if atomic.LoadInt64(&c.totalSize) > cachedTableSizeLimit { return table.ErrOptOnCacheTable.GenWithStackByArgs("table too large") } - txnCtxAddCachedTable(sctx, c.Meta().ID, c) - return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched) + txnCtxAddCachedTable(ctx, c.Meta().ID, c) + return c.TableCommon.UpdateRecord(ctx, h, oldData, newData, touched, opts...) } // RemoveRecord implements table.Table RemoveRecord interface. diff --git a/pkg/table/tables/index.go b/pkg/table/tables/index.go index 0f9eaf6ebddc3..f2eacd72838a5 100644 --- a/pkg/table/tables/index.go +++ b/pkg/table/tables/index.go @@ -158,15 +158,15 @@ out: // Create creates a new entry in the kvIndex data. // If the index is unique and there is an existing entry with the same key, // Create will return the existing entry's handle as the first return value, ErrKeyExists as the second return value. -func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...table.CreateIdxOptFunc) (kv.Handle, error) { +func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...table.CreateIdxOption) (kv.Handle, error) { + opt := table.NewCreateIdxOpt(opts...) + return c.create(sctx, txn, indexedValue, h, handleRestoreData, false, opt) +} + +func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, untouched bool, opt *table.CreateIdxOpt) (kv.Handle, error) { if c.Meta().Unique { txn.CacheTableInfo(c.phyTblID, c.tblInfo) } - var opt table.CreateIdxOpt - for _, fn := range opts { - fn(&opt) - } - indexedValues := c.getIndexedValue(indexedValue) ctx := opt.Ctx if ctx != nil { @@ -203,10 +203,10 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu if txn.IsPipelined() { // For pipelined DML, disable the untouched optimization to avoid extra RPCs for MemBuffer.Get(). // TODO: optimize this. - opt.Untouched = false + untouched = false } - if opt.Untouched { + if untouched { txn, err1 := sctx.Txn(true) if err1 != nil { return nil, err1 @@ -228,7 +228,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu return nil, err } if keyFlags.HasPresumeKeyNotExists() { - opt.Untouched = false + untouched = false } } } @@ -240,7 +240,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns) }) idxVal, err := tablecodec.GenIndexValuePortal(loc, c.tblInfo, c.idxInfo, - c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData, nil) + c.needRestoredData, distinct, untouched, value, h, c.phyTblID, handleRestoreData, nil) err = ec.HandleError(err) if err != nil { return nil, err @@ -248,9 +248,9 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic - if !distinct || skipCheck || opt.Untouched { + if !distinct || skipCheck || untouched { val := idxVal - if opt.Untouched && (keyIsTempIdxKey || len(tempKey) > 0) { + if untouched && (keyIsTempIdxKey || len(tempKey) > 0) { // Untouched key-values never occur in the storage and the temp index is not public. // It is unnecessary to write the untouched temp index key-values. continue @@ -271,7 +271,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu return nil, err } } - if !opt.IgnoreAssertion && (!opt.Untouched) { + if !opt.IgnoreAssertion && (!untouched) { if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { err = txn.SetAssertion(key, kv.SetAssertUnknown) } else { diff --git a/pkg/table/tables/partition.go b/pkg/table/tables/partition.go index 3f94e2b9379ee..e2a5e002928d7 100644 --- a/pkg/table/tables/partition.go +++ b/pkg/table/tables/partition.go @@ -16,7 +16,6 @@ package tables import ( "bytes" - "context" stderr "errors" "fmt" "hash/crc32" @@ -1491,6 +1490,11 @@ func (t *partitionedTable) locateHashPartition(ctx expression.EvalContext, partE // GetPartition returns a Table, which is actually a partition. func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable { + return t.getPartition(pid) +} + +// getPartition returns a Table, which is actually a partition. +func (t *partitionedTable) getPartition(pid int64) *partition { // Attention, can't simply use `return t.partitions[pid]` here. // Because A nil of type *partition is a kind of `table.PhysicalTable` part, ok := t.partitions[pid] @@ -1594,6 +1598,7 @@ func (t *partitionedTable) AddRecord(ctx table.MutateContext, r []types.Datum, o } func partitionedTableAddRecord(ctx table.MutateContext, t *partitionedTable, r []types.Datum, partitionSelection map[int64]struct{}, opts []table.AddRecordOption) (recordID kv.Handle, err error) { + opt := table.NewAddRecordOpt(opts...) pid, err := t.locatePartition(ctx.GetExprCtx().GetEvalCtx(), r) if err != nil { return nil, errors.Trace(err) @@ -1615,8 +1620,8 @@ func partitionedTableAddRecord(ctx table.MutateContext, t *partitionedTable, r [ return nil, errors.WithStack(err) } } - tbl := t.GetPartition(pid) - recordID, err = tbl.AddRecord(ctx, r, opts...) + tbl := t.getPartition(pid) + recordID, err = tbl.addRecord(ctx, r, opt) if err != nil { return } @@ -1629,8 +1634,8 @@ func partitionedTableAddRecord(ctx table.MutateContext, t *partitionedTable, r [ if err != nil { return nil, errors.Trace(err) } - tbl = t.GetPartition(pid) - recordID, err = tbl.AddRecord(ctx, r, opts...) + tbl = t.getPartition(pid) + recordID, err = tbl.addRecord(ctx, r, opt) if err != nil { return } @@ -1712,15 +1717,16 @@ func (t *partitionedTable) GetAllPartitionIDs() []int64 { // UpdateRecord implements table.Table UpdateRecord interface. // `touched` means which columns are really modified, used for secondary indices. // Length of `oldData` and `newData` equals to length of `t.WritableCols()`. -func (t *partitionedTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool) error { - return partitionedTableUpdateRecord(ctx, sctx, t, h, currData, newData, touched, nil) +func (t *partitionedTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error { + return partitionedTableUpdateRecord(ctx, t, h, currData, newData, touched, nil, opts...) } -func (t *partitionTableWithGivenSets) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool) error { - return partitionedTableUpdateRecord(ctx, sctx, t.partitionedTable, h, currData, newData, touched, t.givenSetPartitions) +func (t *partitionTableWithGivenSets) UpdateRecord(ctx table.MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error { + return partitionedTableUpdateRecord(ctx, t.partitionedTable, h, currData, newData, touched, t.givenSetPartitions, opts...) } -func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext, t *partitionedTable, h kv.Handle, currData, newData []types.Datum, touched []bool, partitionSelection map[int64]struct{}) error { +func partitionedTableUpdateRecord(ctx table.MutateContext, t *partitionedTable, h kv.Handle, currData, newData []types.Datum, touched []bool, partitionSelection map[int64]struct{}, opts ...table.UpdateRecordOption) error { + opt := table.NewUpdateRecordOpt(opts...) ectx := ctx.GetExprCtx() from, err := t.locatePartition(ectx.GetEvalCtx(), currData) if err != nil { @@ -1767,7 +1773,7 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext, return errors.Trace(err) } - _, err = t.GetPartition(to).AddRecord(ctx, newData) + _, err = t.getPartition(to).addRecord(ctx, newData, opt.GetAddRecordOpt()) if err != nil { return errors.Trace(err) } @@ -1789,7 +1795,7 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext, } if newTo == newFrom && newTo != 0 { // Update needs to be done in StateDeleteOnly as well - err = t.GetPartition(newTo).UpdateRecord(gctx, ctx, h, currData, newData, touched) + err = t.getPartition(newTo).updateRecord(ctx, h, currData, newData, touched, opt) if err != nil { return errors.Trace(err) } @@ -1798,14 +1804,14 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext, } if newFrom != 0 { - err = t.GetPartition(newFrom).RemoveRecord(ctx, h, currData) + err = t.getPartition(newFrom).RemoveRecord(ctx, h, currData) // TODO: Can this happen? When the data is not yet backfilled? if err != nil { return errors.Trace(err) } } if newTo != 0 && t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly { - _, err = t.GetPartition(newTo).AddRecord(ctx, newData) + _, err = t.getPartition(newTo).addRecord(ctx, newData, opt.GetAddRecordOpt()) if err != nil { return errors.Trace(err) } @@ -1813,8 +1819,8 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext, memBuffer.Release(sh) return nil } - tbl := t.GetPartition(to) - err = tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + tbl := t.getPartition(to) + err = tbl.updateRecord(ctx, h, currData, newData, touched, opt) if err != nil { return errors.Trace(err) } @@ -1830,11 +1836,11 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext, return errors.Trace(err) } if newTo == newFrom { - tbl = t.GetPartition(newTo) + tbl = t.getPartition(newTo) if t.Meta().Partition.DDLState == model.StateDeleteOnly { err = tbl.RemoveRecord(ctx, h, currData) } else { - err = tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + err = tbl.updateRecord(ctx, h, currData, newData, touched, opt) } if err != nil { return errors.Trace(err) @@ -1842,14 +1848,14 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext, memBuffer.Release(sh) return nil } - tbl = t.GetPartition(newFrom) + tbl = t.getPartition(newFrom) err = tbl.RemoveRecord(ctx, h, currData) if err != nil { return errors.Trace(err) } if t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly { - tbl = t.GetPartition(newTo) - _, err = tbl.AddRecord(ctx, newData) + tbl = t.getPartition(newTo) + _, err = tbl.addRecord(ctx, newData, opt.GetAddRecordOpt()) if err != nil { return errors.Trace(err) } diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index 83f58f727c844..b9c5d83fe4c28 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -49,6 +49,7 @@ import ( "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/generatedexpr" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/stringutil" "github.com/pingcap/tidb/pkg/util/tableutil" @@ -272,11 +273,24 @@ func initTableIndices(t *TableCommon) error { // Use partition ID for index, because TableCommon may be table or partition. idx := NewIndex(t.physicalTableID, tblInfo, idxInfo) + intest.AssertFunc(func() bool { + // `TableCommon.indices` is type of `[]table.Index` to implement `table.Table` interface. + // However, the actual type of each element in it should be `*index`. + // This assumption is used in implementation details that using `adIndex` to cast `type.Index` to `*index`. + _, ok := idx.(*index) + intest.Assert(ok, "index should be type of `*index`") + return true + }) t.indices = append(t.indices, idx) } return nil } +// asIndex casts a table.Index to *index which is the actual type of index in TableCommon. +func asIndex(idx table.Index) *index { + return idx.(*index) +} + func initTableCommonWithIndices(t *TableCommon, tblInfo *model.TableInfo, physicalTableID int64, cols []*table.Column, allocs autoid.Allocators, constraints []*table.Constraint) error { initTableCommon(t, tblInfo, physicalTableID, cols, allocs, constraints) return initTableIndices(t) @@ -425,7 +439,12 @@ func (t *TableCommon) shouldAssert(level variable.AssertionLevel) bool { // UpdateRecord implements table.Table UpdateRecord interface. // `touched` means which columns are really modified, used for secondary indices. // Length of `oldData` and `newData` equals to length of `t.WritableCols()`. -func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { +func (t *TableCommon) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error { + opt := table.NewUpdateRecordOpt(opts...) + return t.updateRecord(ctx, h, oldData, newData, touched, opt) +} + +func (t *TableCommon) updateRecord(sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opt *table.UpdateRecordOpt) error { txn, err := sctx.Txn(true) if err != nil { return err @@ -512,7 +531,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext } } // rebuild index - err = t.rebuildIndices(sctx, txn, h, touched, oldData, newData, table.WithCtx(ctx)) + err = t.rebuildIndices(sctx, txn, h, touched, oldData, newData, opt.GetCreateIdxOpt()) if err != nil { return err } @@ -587,7 +606,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext return nil } -func (t *TableCommon) rebuildIndices(ctx table.MutateContext, txn kv.Transaction, h kv.Handle, touched []bool, oldData []types.Datum, newData []types.Datum, opts ...table.CreateIdxOptFunc) error { +func (t *TableCommon) rebuildIndices(ctx table.MutateContext, txn kv.Transaction, h kv.Handle, touched []bool, oldData []types.Datum, newData []types.Datum, opt *table.CreateIdxOpt) error { for _, idx := range t.deletableIndices() { if t.meta.IsCommonHandle && idx.Meta().Primary { continue @@ -634,7 +653,7 @@ func (t *TableCommon) rebuildIndices(ctx table.MutateContext, txn kv.Transaction if err != nil { return err } - if err := t.buildIndexForRow(ctx, h, newVs, newData, idx, txn, untouched, opts...); err != nil { + if err := t.buildIndexForRow(ctx, h, newVs, newData, asIndex(idx), txn, untouched, opt); err != nil { return err } } @@ -653,22 +672,6 @@ func FindPrimaryIndex(tblInfo *model.TableInfo) *model.IndexInfo { return pkIdx } -// CommonAddRecordCtx is used in `AddRecord` to avoid memory malloc for some temp slices. -// This is useful in lightning parse row data to key-values pairs. This can gain upto 5% performance -// improvement in lightning's local mode. -type CommonAddRecordCtx struct { - colIDs []int64 - row []types.Datum -} - -// commonAddRecordKey is used as key in `sessionctx.Context.Value(key)` -type commonAddRecordKey struct{} - -// String implement `stringer.String` for CommonAddRecordKey -func (c commonAddRecordKey) String() string { - return "_common_add_record_context_key" -} - // TryGetCommonPkColumnIds get the IDs of primary key column if the table has common handle. func TryGetCommonPkColumnIds(tbl *model.TableInfo) []int64 { if !tbl.IsCommonHandle { @@ -734,17 +737,16 @@ func checkTempTableSize(tmpTable tbctx.TemporaryTableHandler, sizeLimit int64) e // AddRecord implements table.Table AddRecord interface. func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { + // TODO: optimize the allocation (and calculation) of opt. + opt := table.NewAddRecordOpt(opts...) + return t.addRecord(sctx, r, opt) +} + +func (t *TableCommon) addRecord(sctx table.MutateContext, r []types.Datum, opt *table.AddRecordOpt) (recordID kv.Handle, err error) { txn, err := sctx.Txn(true) if err != nil { return nil, err } - - // TODO: optimize the allocation (and calculation) of opt. - var opt table.AddRecordOpt - for _, fn := range opts { - fn.ApplyOn(&opt) - } - if m := t.Meta(); m.TempTableType != model.TempTableNone { if tmpTable, sizeLimit, ok := addTemporaryTable(sctx, m); ok { if err = checkTempTableSize(tmpTable, sizeLimit); err != nil { @@ -948,17 +950,8 @@ func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts return nil, err } - var createIdxOpts []table.CreateIdxOptFunc - if len(opts) > 0 { - createIdxOpts = make([]table.CreateIdxOptFunc, 0, len(opts)) - for _, fn := range opts { - if raw, ok := fn.(table.CreateIdxOptFunc); ok { - createIdxOpts = append(createIdxOpts, raw) - } - } - } // Insert new entries into indices. - h, err := t.addIndices(sctx, recordID, r, txn, createIdxOpts) + h, err := t.addIndices(sctx, recordID, r, txn, opt.GetCreateIdxOpt()) if err != nil { return h, err } @@ -1016,7 +1009,7 @@ func genIndexKeyStrs(colVals []types.Datum) ([]string, error) { } // addIndices adds data into indices. If any key is duplicated, returns the original handle. -func (t *TableCommon) addIndices(sctx table.MutateContext, recordID kv.Handle, r []types.Datum, txn kv.Transaction, opts []table.CreateIdxOptFunc) (kv.Handle, error) { +func (t *TableCommon) addIndices(sctx table.MutateContext, recordID kv.Handle, r []types.Datum, txn kv.Transaction, opt *table.CreateIdxOpt) (kv.Handle, error) { writeBufs := sctx.GetMutateBuffers().GetWriteStmtBufs() indexVals := writeBufs.IndexValsBuf skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck @@ -1046,7 +1039,7 @@ func (t *TableCommon) addIndices(sctx table.MutateContext, recordID kv.Handle, r dupErr = kv.GenKeyExistsErr(colStrVals, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())) } rsData := TryGetHandleRestoredDataWrapper(t.meta, r, nil, v.Meta()) - if dupHandle, err := v.Create(sctx, txn, indexVals, recordID, rsData, opts...); err != nil { + if dupHandle, err := asIndex(v).create(sctx, txn, indexVals, recordID, rsData, false, opt); err != nil { if kv.ErrKeyExists.Equal(err) { return dupHandle, dupErr } @@ -1427,14 +1420,9 @@ func (t *TableCommon) removeRowIndex(ctx table.MutateContext, h kv.Handle, vals } // buildIndexForRow implements table.Table BuildIndexForRow interface. -func (t *TableCommon) buildIndexForRow(ctx table.MutateContext, h kv.Handle, vals []types.Datum, newData []types.Datum, idx table.Index, txn kv.Transaction, untouched bool, popts ...table.CreateIdxOptFunc) error { - var opts []table.CreateIdxOptFunc - opts = append(opts, popts...) - if untouched { - opts = append(opts, table.IndexIsUntouched) - } +func (t *TableCommon) buildIndexForRow(ctx table.MutateContext, h kv.Handle, vals []types.Datum, newData []types.Datum, idx *index, txn kv.Transaction, untouched bool, opt *table.CreateIdxOpt) error { rsData := TryGetHandleRestoredDataWrapper(t.meta, newData, nil, idx.Meta()) - if _, err := idx.Create(ctx, txn, vals, h, rsData, opts...); err != nil { + if _, err := idx.create(ctx, txn, vals, h, rsData, untouched, opt); err != nil { if kv.ErrKeyExists.Equal(err) { // Make error message consistent with MySQL. tablecodec.TruncateIndexValues(t.meta, idx.Meta(), vals) diff --git a/pkg/table/tables/tables_test.go b/pkg/table/tables/tables_test.go index 78151ac9d3628..9c9801d35921d 100644 --- a/pkg/table/tables/tables_test.go +++ b/pkg/table/tables/tables_test.go @@ -111,7 +111,7 @@ func TestBasic(t *testing.T) { _, err = tb.AddRecord(ctx.GetTableCtx(), types.MakeDatums(2, "abc")) require.Error(t, err) - require.Nil(t, tb.UpdateRecord(context.Background(), ctx.GetTableCtx(), rid, types.MakeDatums(1, "abc"), types.MakeDatums(1, "cba"), []bool{false, true})) + require.Nil(t, tb.UpdateRecord(ctx.GetTableCtx(), rid, types.MakeDatums(1, "abc"), types.MakeDatums(1, "cba"), []bool{false, true})) err = tables.IterRecords(tb, ctx, tb.Cols(), func(_ kv.Handle, data []types.Datum, cols []*table.Column) (bool, error) { return true, nil