Skip to content

Commit

Permalink
table: refactor options for `Table.AddRecord/Table.UpdateRecord/Index…
Browse files Browse the repository at this point in the history
….Create`
  • Loading branch information
lcwangchao committed Aug 1, 2024
1 parent b5555c3 commit edac28e
Show file tree
Hide file tree
Showing 15 changed files with 230 additions and 125 deletions.
4 changes: 2 additions & 2 deletions pkg/ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t
if err != nil {
return errors.Trace(err)
}
err = writeOnlyTable.UpdateRecord(context.Background(), ctx.GetTableCtx(), h, types.MakeDatums(1, 2, 3), types.MakeDatums(2, 2, 3), touchedSlice(writeOnlyTable))
err = writeOnlyTable.UpdateRecord(ctx.GetTableCtx(), h, types.MakeDatums(1, 2, 3), types.MakeDatums(2, 2, 3), touchedSlice(writeOnlyTable))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -317,7 +317,7 @@ func checkAddPublic(sctx sessionctx.Context, writeOnlyTable, publicTable table.T
return errors.Errorf("%v", oldRow)
}
newRow := types.MakeDatums(3, 4, oldRow[2].GetValue())
err = writeOnlyTable.UpdateRecord(context.Background(), sctx.GetTableCtx(), h, oldRow, newRow, touchedSlice(writeOnlyTable))
err = writeOnlyTable.UpdateRecord(sctx.GetTableCtx(), h, oldRow, newRow, touchedSlice(writeOnlyTable))
if err != nil {
return errors.Trace(err)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT
}

// WriteOnlyTable: update t set c2 = 1 where c1 = 4 and c2 = 4
err = writeOnlyTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 4), types.MakeDatums(4, 1), touchedSlice(writeOnlyTbl))
err = writeOnlyTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 4), types.MakeDatums(4, 1), touchedSlice(writeOnlyTbl))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -183,7 +183,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT
}

// DeleteOnlyTable: update t set c2 = 3 where c1 = 4 and c2 = 1
err = delOnlyTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 1), types.MakeDatums(4, 3), touchedSlice(writeOnlyTbl))
err = delOnlyTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(4), types.MakeDatums(4, 1), types.MakeDatums(4, 3), touchedSlice(writeOnlyTbl))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
}

// WriteOnlyTable: update t set c2 = 5 where c1 = 7 and c2 = 7
err = writeTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(7), types.MakeDatums(7, 7), types.MakeDatums(7, 5), touchedSlice(writeTbl))
err = writeTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(7), types.MakeDatums(7, 7), types.MakeDatums(7, 5), touchedSlice(writeTbl))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -328,7 +328,7 @@ func checkDropWriteOnly(ctx sessionctx.Context, publicTbl, writeTbl table.Table)
}

// WriteOnlyTable update t set c2 = 7 where c1 = 8 and c2 = 8
err = writeTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(8), types.MakeDatums(8, 8), types.MakeDatums(8, 7), touchedSlice(writeTbl))
err = writeTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(8), types.MakeDatums(8, 8), types.MakeDatums(8, 7), touchedSlice(writeTbl))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -383,7 +383,7 @@ func checkDropDeleteOnly(ctx sessionctx.Context, writeTbl, delTbl table.Table) e
}

// DeleteOnlyTable update t set c2 = 10 where c1 = 9
err = delTbl.UpdateRecord(context.Background(), ctx.GetTableCtx(), kv.IntHandle(9), types.MakeDatums(9, 9), types.MakeDatums(9, 10), touchedSlice(delTbl))
err = delTbl.UpdateRecord(ctx.GetTableCtx(), kv.IntHandle(9), types.MakeDatums(9, 9), types.MakeDatums(9, 10), touchedSlice(delTbl))
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func updateRecord(
}
} else {
// Update record to new value and update index.
if err := t.UpdateRecord(ctx, sctx.GetTableCtx(), h, oldData, newData, modified); err != nil {
if err := t.UpdateRecord(sctx.GetTableCtx(), h, oldData, newData, modified, table.WithCtx(ctx)); err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); ok && (terr.Code() == errno.ErrNoPartitionForGivenValue || terr.Code() == errno.ErrRowDoesNotMatchGivenPartitionSet) {
ec := sctx.GetSessionVars().StmtCtx.ErrCtx()
return false, ec.HandleError(err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -2436,7 +2436,7 @@ func (it *infoschemaTable) RemoveRecord(ctx table.MutateContext, h kv.Handle, r
}

// UpdateRecord implements table.Table UpdateRecord interface.
func (it *infoschemaTable) UpdateRecord(gctx context.Context, ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
func (it *infoschemaTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error {
return table.ErrUnsupportedOp
}

Expand Down Expand Up @@ -2529,7 +2529,7 @@ func (vt *VirtualTable) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []t
}

// UpdateRecord implements table.Table UpdateRecord interface.
func (vt *VirtualTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
func (vt *VirtualTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error {
return table.ErrUnsupportedOp
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/table/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ go_test(
embed = [":table"],
flaky = True,
race = "on",
shard_count = 9,
shard_count = 10,
deps = [
"//pkg/errctx",
"//pkg/errno",
Expand Down
38 changes: 21 additions & 17 deletions pkg/table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package table

import (
"context"
"time"

"github.com/pingcap/tidb/pkg/errctx"
Expand All @@ -32,51 +31,56 @@ type IndexIterator interface {

// CreateIdxOpt contains the options will be used when creating an index.
type CreateIdxOpt struct {
Ctx context.Context
Untouched bool // If true, the index key/value is no need to commit.
commonMutateOpt
IgnoreAssertion bool
FromBackFill bool
}

// NewCreateIdxOpt creates a new CreateIdxOpt.
func NewCreateIdxOpt(opts ...CreateIdxOption) *CreateIdxOpt {
opt := &CreateIdxOpt{}
for _, o := range opts {
o.ApplyCreateIdxOpt(opt)
}
return opt
}

// CreateIdxOption is defined for the Create() method of the Index interface.
type CreateIdxOption interface {
ApplyCreateIdxOpt(*CreateIdxOpt)
}

// CreateIdxOptFunc is defined for the Create() method of Index interface.
// Here is a blog post about how to use this pattern:
// https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
type CreateIdxOptFunc func(*CreateIdxOpt)

// IndexIsUntouched uses to indicate the index kv is untouched.
var IndexIsUntouched CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.Untouched = true
// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (f CreateIdxOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
f(opt)
}

// WithIgnoreAssertion uses to indicate the process can ignore assertion.
var WithIgnoreAssertion = func(opt *CreateIdxOpt) {
var WithIgnoreAssertion CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.IgnoreAssertion = true
}

// FromBackfill indicates that the index is created by DDL backfill worker.
// In the backfill-merge process, the index KVs from DML will be redirected to
// the temp index. On the other hand, the index KVs from DDL backfill worker should
// never be redirected to the temp index.
var FromBackfill = func(opt *CreateIdxOpt) {
var FromBackfill CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.FromBackFill = true
}

// WithCtx returns a CreateIdxFunc.
// This option is used to pass context.Context.
func WithCtx(ctx context.Context) CreateIdxOptFunc {
return func(opt *CreateIdxOpt) {
opt.Ctx = ctx
}
}

// Index is the interface for index data on KV store.
type Index interface {
// Meta returns IndexInfo.
Meta() *model.IndexInfo
// TableMeta returns TableInfo
TableMeta() *model.TableInfo
// Create supports insert into statement.
Create(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error)
Create(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOption) (kv.Handle, error)
// Delete supports delete from statement.
Delete(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error
// GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation.
Expand Down
92 changes: 80 additions & 12 deletions pkg/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,38 +119,106 @@ var (
// RecordIterFunc is used for low-level record iteration.
type RecordIterFunc func(h kv.Handle, rec []types.Datum, cols []*Column) (more bool, err error)

// commonMutateOpt is the common options for mutating a table.
type commonMutateOpt struct {
Ctx context.Context
}

// AddRecordOpt contains the options will be used when adding a record.
type AddRecordOpt struct {
CreateIdxOpt
commonMutateOpt
IsUpdate bool
ReserveAutoID int
}

// NewAddRecordOpt creates a new AddRecordOpt with options.
func NewAddRecordOpt(opts ...AddRecordOption) *AddRecordOpt {
opt := &AddRecordOpt{}
for _, o := range opts {
o.ApplyAddRecordOpt(opt)
}
return opt
}

// GetCreateIdxOpt creates a CreateIdxOpt.
func (opt *AddRecordOpt) GetCreateIdxOpt() *CreateIdxOpt {
return &CreateIdxOpt{commonMutateOpt: opt.commonMutateOpt}
}

// AddRecordOption is defined for the AddRecord() method of the Table interface.
type AddRecordOption interface {
ApplyOn(*AddRecordOpt)
ApplyAddRecordOpt(*AddRecordOpt)
}

// UpdateRecordOpt contains the options will be used when updating a record.
type UpdateRecordOpt struct {
commonMutateOpt
}

// NewUpdateRecordOpt creates a new UpdateRecordOpt with options.
func NewUpdateRecordOpt(opts ...UpdateRecordOption) *UpdateRecordOpt {
opt := &UpdateRecordOpt{}
for _, o := range opts {
o.ApplyUpdateRecordOpt(opt)
}
return opt
}

// GetAddRecordOpt creates a AddRecordOpt.
func (opt *UpdateRecordOpt) GetAddRecordOpt() *AddRecordOpt {
return &AddRecordOpt{commonMutateOpt: opt.commonMutateOpt}
}

// GetCreateIdxOpt creates a CreateIdxOpt.
func (opt *UpdateRecordOpt) GetCreateIdxOpt() *CreateIdxOpt {
return &CreateIdxOpt{commonMutateOpt: opt.commonMutateOpt}
}

// UpdateRecordOption is defined for the UpdateRecord() method of the Table interface.
type UpdateRecordOption interface {
ApplyUpdateRecordOpt(*UpdateRecordOpt)
}

// CommonMutateOptFunc is a function to provide common options for mutating a table.
type CommonMutateOptFunc func(*commonMutateOpt)

// ApplyAddRecordOpt implements the AddRecordOption interface.
func (f CommonMutateOptFunc) ApplyAddRecordOpt(opt *AddRecordOpt) {
f(&opt.commonMutateOpt)
}

// ApplyUpdateRecordOpt implements the UpdateRecordOption interface.
func (f CommonMutateOptFunc) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
f(&opt.commonMutateOpt)
}

// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (f CommonMutateOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
f(&opt.commonMutateOpt)
}

// WithCtx returns a CommonMutateOptFunc.
// This option is used to pass context.Context.
func WithCtx(ctx context.Context) CommonMutateOptFunc {
return func(opt *commonMutateOpt) {
opt.Ctx = ctx
}
}

// WithReserveAutoIDHint tells the AddRecord operation to reserve a batch of auto ID in the stmtctx.
type WithReserveAutoIDHint int

// ApplyOn implements the AddRecordOption interface.
func (n WithReserveAutoIDHint) ApplyOn(opt *AddRecordOpt) {
// ApplyAddRecordOpt implements the AddRecordOption interface.
func (n WithReserveAutoIDHint) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.ReserveAutoID = int(n)
}

// ApplyOn implements the AddRecordOption interface, so any CreateIdxOptFunc
// can be passed as the optional argument to the table.AddRecord method.
func (f CreateIdxOptFunc) ApplyOn(opt *AddRecordOpt) {
f(&opt.CreateIdxOpt)
}

// IsUpdate is a defined value for AddRecordOptFunc.
var IsUpdate AddRecordOption = isUpdate{}

type isUpdate struct{}

func (i isUpdate) ApplyOn(opt *AddRecordOpt) {
func (i isUpdate) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.IsUpdate = true
}

Expand Down Expand Up @@ -202,7 +270,7 @@ type Table interface {
AddRecord(ctx MutateContext, r []types.Datum, opts ...AddRecordOption) (recordID kv.Handle, err error)

// UpdateRecord updates a row which should contain only writable columns.
UpdateRecord(gctx context.Context, ctx MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool) error
UpdateRecord(ctx MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool, opts ...UpdateRecordOption) error

// RemoveRecord removes a row in the table.
RemoveRecord(ctx MutateContext, h kv.Handle, r []types.Datum) error
Expand Down
37 changes: 37 additions & 0 deletions pkg/table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package table

import (
"context"
"testing"

mysql "github.com/pingcap/tidb/pkg/errno"
Expand All @@ -41,3 +42,39 @@ func TestErrorCode(t *testing.T) {
require.Equal(t, mysql.ErrNoPartitionForGivenValue, int(terror.ToSQLError(ErrNoPartitionForGivenValue).Code))
require.Equal(t, mysql.ErrLockOrActiveTransaction, int(terror.ToSQLError(ErrLockOrActiveTransaction).Code))
}

func TestOptions(t *testing.T) {
ctx := context.WithValue(context.Background(), "test", "test")
// NewAddRecordOpt without option
addOpt := NewAddRecordOpt()
require.Equal(t, AddRecordOpt{}, *addOpt)
require.Equal(t, CreateIdxOpt{}, *(addOpt.GetCreateIdxOpt()))
// NewAddRecordOpt with options
addOpt = NewAddRecordOpt(WithCtx(ctx), IsUpdate, WithReserveAutoIDHint(12))
require.Equal(t, AddRecordOpt{
commonMutateOpt: commonMutateOpt{Ctx: ctx},
IsUpdate: true,
ReserveAutoID: 12,
}, *addOpt)
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(addOpt.GetCreateIdxOpt()))
// NewUpdateRecordOpt without option
updateOpt := NewUpdateRecordOpt()
require.Equal(t, UpdateRecordOpt{}, *updateOpt)
require.Equal(t, AddRecordOpt{}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{}, *(updateOpt.GetCreateIdxOpt()))
// NewUpdateRecordOpt with options
updateOpt = NewUpdateRecordOpt(WithCtx(ctx))
require.Equal(t, UpdateRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *updateOpt)
require.Equal(t, AddRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetCreateIdxOpt()))
// NewCreateIdxOpt without option
createIdxOpt := NewCreateIdxOpt()
require.Equal(t, CreateIdxOpt{}, *createIdxOpt)
// NewCreateIdxOpt with options
createIdxOpt = NewCreateIdxOpt(WithCtx(ctx), WithIgnoreAssertion, FromBackfill)
require.Equal(t, CreateIdxOpt{
commonMutateOpt: commonMutateOpt{Ctx: ctx},
IgnoreAssertion: true,
FromBackFill: true,
}, *createIdxOpt)
}
1 change: 1 addition & 0 deletions pkg/table/tables/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//pkg/util/dbterror",
"//pkg/util/generatedexpr",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/ranger",
"//pkg/util/rowcodec",
Expand Down
3 changes: 2 additions & 1 deletion pkg/table/tables/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
_ "github.com/pingcap/tidb/pkg/util/context"
Expand Down Expand Up @@ -198,7 +199,7 @@ func BenchmarkUpdateRecordInPipelinedDML(b *testing.B) {
for j := 0; j < batchSize; j++ {
// Update record
handle := kv.IntHandle(j)
err := tb.UpdateRecord(context.TODO(), se.GetTableCtx(), handle, records[j], newData[j], touched)
err := tb.UpdateRecord(se.GetTableCtx(), handle, records[j], newData[j], touched, table.WithCtx(context.TODO()))
if err != nil {
b.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,13 @@ func txnCtxAddCachedTable(sctx table.MutateContext, tid int64, handle *cachedTab
}

// UpdateRecord implements table.Table
func (c *cachedTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
func (c *cachedTable) UpdateRecord(ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool, opts ...table.UpdateRecordOption) error {
// Prevent furthur writing when the table is already too large.
if atomic.LoadInt64(&c.totalSize) > cachedTableSizeLimit {
return table.ErrOptOnCacheTable.GenWithStackByArgs("table too large")
}
txnCtxAddCachedTable(sctx, c.Meta().ID, c)
return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched)
txnCtxAddCachedTable(ctx, c.Meta().ID, c)
return c.TableCommon.UpdateRecord(ctx, h, oldData, newData, touched, opts...)
}

// RemoveRecord implements table.Table RemoveRecord interface.
Expand Down
Loading

0 comments on commit edac28e

Please sign in to comment.