Skip to content

Commit

Permalink
table: make AddRecordOpt/UpdateRecordOpt/CreateIdxOpt immutable (
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Aug 9, 2024
1 parent e1f2b77 commit bf7c8b1
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 69 deletions.
40 changes: 24 additions & 16 deletions pkg/table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,46 +32,54 @@ type IndexIterator interface {
// CreateIdxOpt contains the options will be used when creating an index.
type CreateIdxOpt struct {
commonMutateOpt
IgnoreAssertion bool
FromBackFill bool
ignoreAssertion bool
fromBackFill bool
}

// NewCreateIdxOpt creates a new CreateIdxOpt.
func NewCreateIdxOpt(opts ...CreateIdxOption) *CreateIdxOpt {
opt := &CreateIdxOpt{}
for _, o := range opts {
o.ApplyCreateIdxOpt(opt)
o.applyCreateIdxOpt(opt)
}
return opt
}

// IgnoreAssertion indicates whether to ignore assertion.
func (opt *CreateIdxOpt) IgnoreAssertion() bool {
return opt.ignoreAssertion
}

// FromBackFill indicates whether the index is created by DDL backfill worker.
func (opt *CreateIdxOpt) FromBackFill() bool {
return opt.fromBackFill
}

// CreateIdxOption is defined for the Create() method of the Index interface.
type CreateIdxOption interface {
ApplyCreateIdxOpt(*CreateIdxOpt)
applyCreateIdxOpt(*CreateIdxOpt)
}

// CreateIdxOptFunc is defined for the Create() method of Index interface.
// Here is a blog post about how to use this pattern:
// https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
type CreateIdxOptFunc func(*CreateIdxOpt)
type withIgnoreAssertion struct{}

// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (f CreateIdxOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
f(opt)
func (withIgnoreAssertion) applyCreateIdxOpt(opt *CreateIdxOpt) {
opt.ignoreAssertion = true
}

// WithIgnoreAssertion uses to indicate the process can ignore assertion.
var WithIgnoreAssertion CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.IgnoreAssertion = true
var WithIgnoreAssertion CreateIdxOption = withIgnoreAssertion{}

type fromBackfill struct{}

func (fromBackfill) applyCreateIdxOpt(opt *CreateIdxOpt) {
opt.fromBackFill = true
}

// FromBackfill indicates that the index is created by DDL backfill worker.
// In the backfill-merge process, the index KVs from DML will be redirected to
// the temp index. On the other hand, the index KVs from DDL backfill worker should
// never be redirected to the temp index.
var FromBackfill CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.FromBackFill = true
}
var FromBackfill CreateIdxOption = fromBackfill{}

// Index is the interface for index data on KV store.
type Index interface {
Expand Down
77 changes: 51 additions & 26 deletions pkg/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,52 +121,77 @@ type RecordIterFunc func(h kv.Handle, rec []types.Datum, cols []*Column) (more b

// commonMutateOpt is the common options for mutating a table.
type commonMutateOpt struct {
Ctx context.Context
DupKeyCheck DupKeyCheckMode
ctx context.Context
dupKeyCheck DupKeyCheckMode
}

// Ctx returns the go context in the option
func (opt *commonMutateOpt) Ctx() context.Context {
return opt.ctx
}

// DupKeyCheck returns the DupKeyCheckMode in the option
func (opt *commonMutateOpt) DupKeyCheck() DupKeyCheckMode {
return opt.dupKeyCheck
}

// AddRecordOpt contains the options will be used when adding a record.
type AddRecordOpt struct {
commonMutateOpt
IsUpdate bool
ReserveAutoID int
isUpdate bool
reserveAutoID int
}

// NewAddRecordOpt creates a new AddRecordOpt with options.
func NewAddRecordOpt(opts ...AddRecordOption) *AddRecordOpt {
opt := &AddRecordOpt{}
for _, o := range opts {
o.ApplyAddRecordOpt(opt)
o.applyAddRecordOpt(opt)
}
return opt
}

// IsUpdate indicates whether the `AddRecord` operation is in an update statement.
func (opt *AddRecordOpt) IsUpdate() bool {
return opt.isUpdate
}

// ReserveAutoID indicates the auto id count that should be reserved.
func (opt *AddRecordOpt) ReserveAutoID() int {
return opt.reserveAutoID
}

// GetCreateIdxOpt creates a CreateIdxOpt.
func (opt *AddRecordOpt) GetCreateIdxOpt() *CreateIdxOpt {
return &CreateIdxOpt{commonMutateOpt: opt.commonMutateOpt}
}

// AddRecordOption is defined for the AddRecord() method of the Table interface.
type AddRecordOption interface {
ApplyAddRecordOpt(*AddRecordOpt)
applyAddRecordOpt(*AddRecordOpt)
}

// UpdateRecordOpt contains the options will be used when updating a record.
type UpdateRecordOpt struct {
commonMutateOpt
// SkipWriteUntouchedIndices is an option to skip write untouched indices when updating a record.
SkipWriteUntouchedIndices bool
// skipWriteUntouchedIndices is an option to skip write untouched indices when updating a record.
skipWriteUntouchedIndices bool
}

// NewUpdateRecordOpt creates a new UpdateRecordOpt with options.
func NewUpdateRecordOpt(opts ...UpdateRecordOption) *UpdateRecordOpt {
opt := &UpdateRecordOpt{}
for _, o := range opts {
o.ApplyUpdateRecordOpt(opt)
o.applyUpdateRecordOpt(opt)
}
return opt
}

// SkipWriteUntouchedIndices indicates whether to skip write untouched indices when updating a record.
func (opt *UpdateRecordOpt) SkipWriteUntouchedIndices() bool {
return opt.skipWriteUntouchedIndices
}

// GetAddRecordOpt creates a AddRecordOpt.
func (opt *UpdateRecordOpt) GetAddRecordOpt() *AddRecordOpt {
return &AddRecordOpt{commonMutateOpt: opt.commonMutateOpt}
Expand All @@ -179,57 +204,57 @@ func (opt *UpdateRecordOpt) GetCreateIdxOpt() *CreateIdxOpt {

// UpdateRecordOption is defined for the UpdateRecord() method of the Table interface.
type UpdateRecordOption interface {
ApplyUpdateRecordOpt(*UpdateRecordOpt)
applyUpdateRecordOpt(*UpdateRecordOpt)
}

// CommonMutateOptFunc is a function to provide common options for mutating a table.
type CommonMutateOptFunc func(*commonMutateOpt)

// ApplyAddRecordOpt implements the AddRecordOption interface.
func (f CommonMutateOptFunc) ApplyAddRecordOpt(opt *AddRecordOpt) {
func (f CommonMutateOptFunc) applyAddRecordOpt(opt *AddRecordOpt) {
f(&opt.commonMutateOpt)
}

// ApplyUpdateRecordOpt implements the UpdateRecordOption interface.
func (f CommonMutateOptFunc) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
func (f CommonMutateOptFunc) applyUpdateRecordOpt(opt *UpdateRecordOpt) {
f(&opt.commonMutateOpt)
}

// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (f CommonMutateOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
func (f CommonMutateOptFunc) applyCreateIdxOpt(opt *CreateIdxOpt) {
f(&opt.commonMutateOpt)
}

// WithCtx returns a CommonMutateOptFunc.
// This option is used to pass context.Context.
func WithCtx(ctx context.Context) CommonMutateOptFunc {
return func(opt *commonMutateOpt) {
opt.Ctx = ctx
opt.ctx = ctx
}
}

// WithReserveAutoIDHint tells the AddRecord operation to reserve a batch of auto ID in the stmtctx.
type WithReserveAutoIDHint int

// ApplyAddRecordOpt implements the AddRecordOption interface.
func (n WithReserveAutoIDHint) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.ReserveAutoID = int(n)
func (n WithReserveAutoIDHint) applyAddRecordOpt(opt *AddRecordOpt) {
opt.reserveAutoID = int(n)
}

// IsUpdate is a defined value for AddRecordOptFunc.
var IsUpdate AddRecordOption = isUpdate{}

type isUpdate struct{}

func (i isUpdate) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.IsUpdate = true
func (i isUpdate) applyAddRecordOpt(opt *AddRecordOpt) {
opt.isUpdate = true
}

// skipWriteUntouchedIndices implements UpdateRecordOption.
type skipWriteUntouchedIndices struct{}

func (skipWriteUntouchedIndices) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.SkipWriteUntouchedIndices = true
func (skipWriteUntouchedIndices) applyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.skipWriteUntouchedIndices = true
}

// SkipWriteUntouchedIndices is an option to skip write untouched options when updating a record.
Expand Down Expand Up @@ -257,18 +282,18 @@ const (
)

// ApplyAddRecordOpt implements the AddRecordOption interface.
func (m DupKeyCheckMode) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.DupKeyCheck = m
func (m DupKeyCheckMode) applyAddRecordOpt(opt *AddRecordOpt) {
opt.dupKeyCheck = m
}

// ApplyUpdateRecordOpt implements the UpdateRecordOption interface.
func (m DupKeyCheckMode) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.DupKeyCheck = m
func (m DupKeyCheckMode) applyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.dupKeyCheck = m
}

// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (m DupKeyCheckMode) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
opt.DupKeyCheck = m
func (m DupKeyCheckMode) applyCreateIdxOpt(opt *CreateIdxOpt) {
opt.dupKeyCheck = m
}

type columnAPI interface {
Expand Down
20 changes: 10 additions & 10 deletions pkg/table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,29 @@ func TestOptions(t *testing.T) {
// NewAddRecordOpt with options
addOpt = NewAddRecordOpt(WithCtx(ctx), IsUpdate, WithReserveAutoIDHint(12))
require.Equal(t, AddRecordOpt{
commonMutateOpt: commonMutateOpt{Ctx: ctx},
IsUpdate: true,
ReserveAutoID: 12,
commonMutateOpt: commonMutateOpt{ctx: ctx},
isUpdate: true,
reserveAutoID: 12,
}, *addOpt)
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(addOpt.GetCreateIdxOpt()))
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{ctx: ctx}}, *(addOpt.GetCreateIdxOpt()))
// NewUpdateRecordOpt without option
updateOpt := NewUpdateRecordOpt()
require.Equal(t, UpdateRecordOpt{}, *updateOpt)
require.Equal(t, AddRecordOpt{}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{}, *(updateOpt.GetCreateIdxOpt()))
// NewUpdateRecordOpt with options
updateOpt = NewUpdateRecordOpt(WithCtx(ctx))
require.Equal(t, UpdateRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *updateOpt)
require.Equal(t, AddRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetCreateIdxOpt()))
require.Equal(t, UpdateRecordOpt{commonMutateOpt: commonMutateOpt{ctx: ctx}}, *updateOpt)
require.Equal(t, AddRecordOpt{commonMutateOpt: commonMutateOpt{ctx: ctx}}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{ctx: ctx}}, *(updateOpt.GetCreateIdxOpt()))
// NewCreateIdxOpt without option
createIdxOpt := NewCreateIdxOpt()
require.Equal(t, CreateIdxOpt{}, *createIdxOpt)
// NewCreateIdxOpt with options
createIdxOpt = NewCreateIdxOpt(WithCtx(ctx), WithIgnoreAssertion, FromBackfill)
require.Equal(t, CreateIdxOpt{
commonMutateOpt: commonMutateOpt{Ctx: ctx},
IgnoreAssertion: true,
FromBackFill: true,
commonMutateOpt: commonMutateOpt{ctx: ctx},
ignoreAssertion: true,
fromBackFill: true,
}, *createIdxOpt)
}
14 changes: 7 additions & 7 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
txn.CacheTableInfo(c.phyTblID, c.tblInfo)
}
indexedValues := c.getIndexedValue(indexedValue)
ctx := opt.Ctx
ctx := opt.Ctx()
if ctx != nil {
var r tracing.Region
r, ctx = tracing.StartRegionEx(ctx, "index.Create")
Expand All @@ -178,7 +178,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}
vars := sctx.GetSessionVars()
writeBufs := sctx.GetMutateBuffers().GetWriteStmtBufs()
skipCheck := opt.DupKeyCheck == table.DupKeyCheckSkip
skipCheck := opt.DupKeyCheck() == table.DupKeyCheckSkip
evalCtx := sctx.GetExprCtx().GetEvalCtx()
loc, ec := evalCtx.Location(), evalCtx.ErrCtx()
for _, value := range indexedValues {
Expand All @@ -192,7 +192,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
keyVer byte
keyIsTempIdxKey bool
)
if !opt.FromBackFill {
if !opt.FromBackFill() {
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete {
key, tempKey = tempKey, nil
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
return nil, err
}

ignoreAssertion := opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic
ignoreAssertion := opt.IgnoreAssertion() || c.idxInfo.State != model.StatePublic

if !distinct || skipCheck || untouched {
val := idxVal
Expand All @@ -272,7 +272,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}
}
if !ignoreAssertion && !untouched {
if opt.DupKeyCheck == table.DupKeyCheckLazy && !txn.IsPessimistic() {
if opt.DupKeyCheck() == table.DupKeyCheckLazy && !txn.IsPessimistic() {
err = txn.SetAssertion(key, kv.SetAssertUnknown)
} else {
err = txn.SetAssertion(key, kv.SetAssertNotExist)
Expand All @@ -288,7 +288,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
if c.tblInfo.TempTableType != model.TempTableNone {
// Always check key for temporary table because it does not write to TiKV
value, err = txn.Get(ctx, key)
} else if opt.DupKeyCheck == table.DupKeyCheckLazy && !keyIsTempIdxKey {
} else if opt.DupKeyCheck() == table.DupKeyCheckLazy && !keyIsTempIdxKey {
// For temp index keys, we can't get the temp value from memory buffer, even if the lazy check is enabled.
// Otherwise, it may cause the temp index value to be overwritten, leading to data inconsistency.
value, err = txn.GetMemBuffer().GetLocal(ctx, key)
Expand All @@ -308,7 +308,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
// The index key value is not found or deleted.
if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) {
val := idxVal
lazyCheck := opt.DupKeyCheck == table.DupKeyCheckLazy && err != nil
lazyCheck := opt.DupKeyCheck() == table.DupKeyCheckLazy && err != nil
if keyIsTempIdxKey {
tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true}
val = tempVal.Encode(value)
Expand Down
Loading

0 comments on commit bf7c8b1

Please sign in to comment.