From 75be70cd4cd33eacb95df5c567158ec60dcd4bd2 Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 29 Apr 2021 02:39:56 +0800 Subject: [PATCH] store/tikv: remove use of SchemaLease transaction option in store/tikv (#24331) --- store/driver/txn/txn_driver.go | 2 ++ store/tikv/2pc.go | 8 ++++---- store/tikv/txn.go | 12 +++++++++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 66413e6bdef28..9a5731c14973e 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -129,6 +129,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn: txn.KVTxn, binInfo: val.(*binloginfo.BinlogInfo), // val cannot be other type. }) + case tikvstore.SchemaChecker: + txn.SetSchemaLeaseChecker(val.(tikv.SchemaLeaseChecker)) case tikvstore.IsolationLevel: level := getTiKVIsolationLevel(val.(kv.IsoLevel)) txn.KVTxn.GetSnapshot().SetIsolationLevel(level) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index dafd0847af797..b01e18eebd5d1 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -1252,7 +1252,8 @@ type SchemaVer interface { SchemaMetaVersion() int64 } -type schemaLeaseChecker interface { +// SchemaLeaseChecker is used to validate schema version is not changed during transaction execution. +type SchemaLeaseChecker interface { // CheckBySchemaVer checks if the schema has changed for the transaction related tables between the startSchemaVer // and the schema version at txnTS, all the related schema changes will be returned. CheckBySchemaVer(txnTS uint64, startSchemaVer SchemaVer) (*RelatedSchemaChange, error) @@ -1398,8 +1399,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64 err := errors.Errorf("mock check schema valid failure") failpoint.Return(nil, false, err) }) - checker, ok := c.txn.us.GetOption(kv.SchemaChecker).(schemaLeaseChecker) - if !ok { + if c.txn.schemaLeaseChecker == nil { if c.sessionID > 0 { logutil.Logger(ctx).Warn("schemaLeaseChecker is not set for this transaction", zap.Uint64("sessionID", c.sessionID), @@ -1408,7 +1408,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64 } return nil, false, nil } - relatedChanges, err := checker.CheckBySchemaVer(checkTS, startInfoSchema) + relatedChanges, err := c.txn.schemaLeaseChecker.CheckBySchemaVer(checkTS, startInfoSchema) if err != nil { if tryAmend && relatedChanges != nil && relatedChanges.Amendable && c.txn.schemaAmender != nil { memAmended, amendErr := c.tryAmendTxn(ctx, startInfoSchema, relatedChanges) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index d1eae7c6e2b05..67477c510cee4 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -74,9 +74,10 @@ type KVTxn struct { // commitCallback is called after current transaction gets committed commitCallback func(info string, err error) - binlog BinlogExecutor - isPessimistic bool - kvFilter KVFilter + binlog BinlogExecutor + schemaLeaseChecker SchemaLeaseChecker + isPessimistic bool + kvFilter KVFilter } func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) { @@ -200,6 +201,11 @@ func (txn *KVTxn) DelOption(opt int) { txn.us.DelOption(opt) } +// SetSchemaLeaseChecker sets a hook to check schema version. +func (txn *KVTxn) SetSchemaLeaseChecker(checker SchemaLeaseChecker) { + txn.schemaLeaseChecker = checker +} + // SetPessimistic indicates if the transaction should use pessimictic lock. func (txn *KVTxn) SetPessimistic(b bool) { txn.isPessimistic = b