Skip to content

Commit

Permalink
store/tikv: remove use of SchemaLease transaction option in store/tikv (
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Apr 28, 2021
1 parent 91ca4ea commit 75be70c
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn: txn.KVTxn,
binInfo: val.(*binloginfo.BinlogInfo), // val cannot be other type.
})
case tikvstore.SchemaChecker:
txn.SetSchemaLeaseChecker(val.(tikv.SchemaLeaseChecker))
case tikvstore.IsolationLevel:
level := getTiKVIsolationLevel(val.(kv.IsoLevel))
txn.KVTxn.GetSnapshot().SetIsolationLevel(level)
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,8 @@ type SchemaVer interface {
SchemaMetaVersion() int64
}

type schemaLeaseChecker interface {
// SchemaLeaseChecker is used to validate schema version is not changed during transaction execution.
type SchemaLeaseChecker interface {
// CheckBySchemaVer checks if the schema has changed for the transaction related tables between the startSchemaVer
// and the schema version at txnTS, all the related schema changes will be returned.
CheckBySchemaVer(txnTS uint64, startSchemaVer SchemaVer) (*RelatedSchemaChange, error)
Expand Down Expand Up @@ -1398,8 +1399,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64
err := errors.Errorf("mock check schema valid failure")
failpoint.Return(nil, false, err)
})
checker, ok := c.txn.us.GetOption(kv.SchemaChecker).(schemaLeaseChecker)
if !ok {
if c.txn.schemaLeaseChecker == nil {
if c.sessionID > 0 {
logutil.Logger(ctx).Warn("schemaLeaseChecker is not set for this transaction",
zap.Uint64("sessionID", c.sessionID),
Expand All @@ -1408,7 +1408,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64
}
return nil, false, nil
}
relatedChanges, err := checker.CheckBySchemaVer(checkTS, startInfoSchema)
relatedChanges, err := c.txn.schemaLeaseChecker.CheckBySchemaVer(checkTS, startInfoSchema)
if err != nil {
if tryAmend && relatedChanges != nil && relatedChanges.Amendable && c.txn.schemaAmender != nil {
memAmended, amendErr := c.tryAmendTxn(ctx, startInfoSchema, relatedChanges)
Expand Down
12 changes: 9 additions & 3 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ type KVTxn struct {
// commitCallback is called after current transaction gets committed
commitCallback func(info string, err error)

binlog BinlogExecutor
isPessimistic bool
kvFilter KVFilter
binlog BinlogExecutor
schemaLeaseChecker SchemaLeaseChecker
isPessimistic bool
kvFilter KVFilter
}

func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) {
Expand Down Expand Up @@ -200,6 +201,11 @@ func (txn *KVTxn) DelOption(opt int) {
txn.us.DelOption(opt)
}

// SetSchemaLeaseChecker sets a hook to check schema version.
func (txn *KVTxn) SetSchemaLeaseChecker(checker SchemaLeaseChecker) {
txn.schemaLeaseChecker = checker
}

// SetPessimistic indicates if the transaction should use pessimictic lock.
func (txn *KVTxn) SetPessimistic(b bool) {
txn.isPessimistic = b
Expand Down

0 comments on commit 75be70c

Please sign in to comment.