From 7a2b3c40fe5fd672757f72d5de8f877ce1a3b3df Mon Sep 17 00:00:00 2001 From: longfangsong Date: Thu, 13 May 2021 20:59:10 +0800 Subject: [PATCH 1/5] break dependency --- kv/fault_injection.go | 4 +- kv/kv.go | 49 +------------------------ session/session.go | 12 +++--- session/txn.go | 5 ++- store/driver/tikv_driver.go | 2 +- store/helper/helper.go | 2 +- store/mockstore/mockstorage/storage.go | 2 +- store/tikv/extract_start_ts_test.go | 21 +++++------ store/tikv/kv.go | 5 +-- store/tikv/txn.go | 51 ++++++++++++++++++++++++-- util/mock/context.go | 3 +- util/mock/store.go | 3 +- 12 files changed, 81 insertions(+), 78 deletions(-) diff --git a/kv/fault_injection.go b/kv/fault_injection.go index 218ca9cbd6966..25e4e8d6c6d6c 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -16,6 +16,8 @@ package kv import ( "context" "sync" + + "github.com/pingcap/tidb/store/tikv" ) // InjectionConfig is used for fault injections for KV components. @@ -64,7 +66,7 @@ func (s *InjectedStore) Begin() (Transaction, error) { } // BeginWithOption creates an injected Transaction with given option. -func (s *InjectedStore) BeginWithOption(option TransactionOption) (Transaction, error) { +func (s *InjectedStore) BeginWithOption(option tikv.TransactionOption) (Transaction, error) { txn, err := s.Storage.BeginWithOption(option) return &InjectedTransaction{ Transaction: txn, diff --git a/kv/kv.go b/kv/kv.go index 1fad79d641009..9b04ff33e21c6 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/memory" @@ -341,59 +342,13 @@ type Driver interface { Open(path string) (Storage, error) } -// TransactionOption indicates the option when beginning a transaction -// `TxnScope` must be set for each object -// Every other fields are optional, but currently at most one of them can be set -type TransactionOption struct { - TxnScope string - StartTS *uint64 - PrevSec *uint64 - MinStartTS *uint64 - MaxPrevSec *uint64 -} - -// DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used -func DefaultTransactionOption() TransactionOption { - return TransactionOption{TxnScope: oracle.GlobalTxnScope} -} - -// SetMaxPrevSec set maxPrevSec -func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption { - to.MaxPrevSec = &maxPrevSec - return to -} - -// SetMinStartTS set minStartTS -func (to TransactionOption) SetMinStartTS(minStartTS uint64) TransactionOption { - to.MinStartTS = &minStartTS - return to -} - -// SetStartTs set startTS -func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption { - to.StartTS = &startTS - return to -} - -// SetPrevSec set prevSec -func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption { - to.PrevSec = &prevSec - return to -} - -// SetTxnScope set txnScope -func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption { - to.TxnScope = txnScope - return to -} - // Storage defines the interface for storage. // Isolation should be at least SI(SNAPSHOT ISOLATION) type Storage interface { // Begin a global transaction Begin() (Transaction, error) // Begin a transaction with given option - BeginWithOption(option TransactionOption) (Transaction, error) + BeginWithOption(option tikv.TransactionOption) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. GetSnapshot(ver Version) Snapshot diff --git a/session/session.go b/session/session.go index 8fc7c4c37eac8..12b277ca8edbe 100644 --- a/session/session.go +++ b/session/session.go @@ -1949,7 +1949,7 @@ func (s *session) NewTxn(ctx context.Context) error { zap.String("txnScope", txnScope)) } - txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) + txn, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) if err != nil { return err } @@ -2746,7 +2746,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { } // no need to get txn from txnFutureCh since txn should init with startTs - txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) + txn, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) if err != nil { return err } @@ -2779,22 +2779,22 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc txnScope := s.GetSessionVars().CheckAndGetTxnScope() switch option.Mode { case ast.TimestampBoundReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) if err != nil { return err } case ast.TimestampBoundExactStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMaxStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMinReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) if err != nil { return err } diff --git a/session/txn.go b/session/txn.go index 133cafb976aae..aac693822b0e4 100644 --- a/session/txn.go +++ b/session/txn.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" @@ -436,14 +437,14 @@ type txnFuture struct { func (tf *txnFuture) wait() (kv.Transaction, error) { startTS, err := tf.future.Wait() if err == nil { - return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) + return tf.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) } else if config.GetGlobalConfig().Store == "unistore" { return nil, err } logutil.BgLogger().Warn("wait tso failed", zap.Error(err)) // It would retry get timestamp. - return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope)) + return tf.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(tf.txnScope)) } func (s *session) getTxnFuture(ctx context.Context) *txnFuture { diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index cc0f217280f31..2b847654936c8 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -307,7 +307,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { } // BeginWithOption begins a transaction with given option -func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *tikvStore) BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) { txn, err := s.KVStore.BeginWithOption(option) if err != nil { return nil, derr.ToTiDBErr(err) diff --git a/store/helper/helper.go b/store/helper/helper.go index e96ad4ae21851..337b398468abc 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -48,7 +48,7 @@ import ( // Methods copied from kv.Storage and tikv.Storage due to limitation of go1.13. type Storage interface { Begin() (kv.Transaction, error) - BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) + BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) GetSnapshot(ver kv.Version) kv.Snapshot GetClient() kv.Client GetMPPClient() kv.MPPClient diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index 36ded5e434817..2080a9dfe65c4 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -83,7 +83,7 @@ func (s *mockStorage) ShowStatus(ctx context.Context, key string) (interface{}, } // BeginWithOption begins a transaction with given option -func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) { return newTiKVTxn(s.KVStore.BeginWithOption(option)) } diff --git a/store/tikv/extract_start_ts_test.go b/store/tikv/extract_start_ts_test.go index b392ca365cde8..22d1b04a60f02 100644 --- a/store/tikv/extract_start_ts_test.go +++ b/store/tikv/extract_start_ts_test.go @@ -17,7 +17,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -69,22 +68,22 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { cases := []struct { expectedTS uint64 - option kv.TransactionOption + option TransactionOption }{ // StartTS setted - {100, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {100, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, // PrevSec setted - {200, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, + {200, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, // MinStartTS setted, global - {101, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {101, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MinStartTS setted, local - {102, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {102, TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MaxPrevSec setted // however we need to add more cases to check the behavior when it fall backs to MinStartTS setted // see `TestMaxPrevSecFallback` - {200, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {200, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, // nothing setted - {300, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {300, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, } for _, cs := range cases { expected := cs.expectedTS @@ -102,10 +101,10 @@ func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { i := uint64(100) cases := []struct { expectedTS uint64 - option kv.TransactionOption + option TransactionOption }{ - {0x8000000000000001, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, - {0x8000000000000002, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000001, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000002, TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, } for _, cs := range cases { result, _ := extractStartTs(s.store, cs.option) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index f61db4168ef7d..a465493694106 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" - tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/config" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" @@ -184,11 +183,11 @@ func (s *KVStore) runSafePointChecker() { // Begin a global transaction. func (s *KVStore) Begin() (*KVTxn, error) { - return s.BeginWithOption(tidbkv.DefaultTransactionOption()) + return s.BeginWithOption(DefaultTransactionOption()) } // BeginWithOption begins a transaction with the given TransactionOption -func (s *KVStore) BeginWithOption(options tidbkv.TransactionOption) (*KVTxn, error) { +func (s *KVStore) BeginWithOption(options TransactionOption) (*KVTxn, error) { return newTiKVTxnWithOptions(s, options) } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index cba091cbdc8da..2c3ca3ce4200a 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/kv" tikverr "github.com/pingcap/tidb/store/tikv/error" tikv "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" @@ -55,6 +54,52 @@ type SchemaAmender interface { AmendTxn(ctx context.Context, startInfoSchema SchemaVer, change *RelatedSchemaChange, mutations CommitterMutations) (CommitterMutations, error) } +// TransactionOption indicates the option when beginning a transaction +// `TxnScope` must be set for each object +// Every other fields are optional, but currently at most one of them can be set +type TransactionOption struct { + TxnScope string + StartTS *uint64 + PrevSec *uint64 + MinStartTS *uint64 + MaxPrevSec *uint64 +} + +// DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used +func DefaultTransactionOption() TransactionOption { + return TransactionOption{TxnScope: oracle.GlobalTxnScope} +} + +// SetMaxPrevSec set maxPrevSec +func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption { + to.MaxPrevSec = &maxPrevSec + return to +} + +// SetMinStartTS set minStartTS +func (to TransactionOption) SetMinStartTS(minStartTS uint64) TransactionOption { + to.MinStartTS = &minStartTS + return to +} + +// SetStartTs set startTS +func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption { + to.StartTS = &startTS + return to +} + +// SetPrevSec set prevSec +func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption { + to.PrevSec = &prevSec + return to +} + +// SetTxnScope set txnScope +func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption { + to.TxnScope = txnScope + return to +} + // KVTxn contains methods to interact with a TiKV transaction. type KVTxn struct { snapshot *KVSnapshot @@ -90,7 +135,7 @@ type KVTxn struct { kvFilter KVFilter } -func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error) { +func extractStartTs(store *KVStore, options TransactionOption) (uint64, error) { var startTs uint64 var err error if options.StartTS != nil { @@ -137,7 +182,7 @@ func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error return startTs, err } -func newTiKVTxnWithOptions(store *KVStore, options kv.TransactionOption) (*KVTxn, error) { +func newTiKVTxnWithOptions(store *KVStore, options TransactionOption) (*KVTxn, error) { if options.TxnScope == "" { options.TxnScope = oracle.GlobalTxnScope } diff --git a/util/mock/context.go b/util/mock/context.go index 4b329e0ff1f55..e7739417335f4 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/disk" @@ -212,7 +213,7 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } if c.Store != nil { - txn, err := c.Store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) + txn, err := c.Store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) if err != nil { return errors.Trace(err) } diff --git a/util/mock/store.go b/util/mock/store.go index 804f3d6a3f2d3..acde0f9836309 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -38,7 +39,7 @@ func (s *Store) GetOracle() oracle.Oracle { return nil } func (s *Store) Begin() (kv.Transaction, error) { return nil, nil } // BeginWithOption implements kv.Storage interface. -func (s *Store) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *Store) BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) { return s.Begin() } From d5f3e0864235d8e465af1067240cb259366ed7f1 Mon Sep 17 00:00:00 2001 From: longfangsong Date: Fri, 14 May 2021 12:07:01 +0800 Subject: [PATCH 2/5] Move unit tests --- kv/fault_injection_test.go | 3 +- kv/interface_mock_test.go | 3 +- store/tikv/kv.go | 18 ++++- store/tikv/tests/2pc_test.go | 5 +- .../tikv/{ => tests}/extract_start_ts_test.go | 67 +++++++++---------- store/tikv/txn.go | 31 ++++----- 6 files changed, 68 insertions(+), 59 deletions(-) mode change 100644 => 100755 store/tikv/kv.go rename store/tikv/{ => tests}/extract_start_ts_test.go (56%) mode change 100644 => 100755 store/tikv/txn.go diff --git a/kv/fault_injection_test.go b/kv/fault_injection_test.go index 33b6535214b2c..39279a7490987 100644 --- a/kv/fault_injection_test.go +++ b/kv/fault_injection_test.go @@ -19,6 +19,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -35,7 +36,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { storage := NewInjectedStore(newMockStorage(), &cfg) txn, err := storage.Begin() c.Assert(err, IsNil) - _, err = storage.BeginWithOption(TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) + _, err = storage.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) c.Assert(err, IsNil) ver := Version{Ver: 1} snap := storage.GetSnapshot(ver) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 2388c4f48b9f3..a5deba32e465a 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -158,7 +159,7 @@ func (s *mockStorage) Begin() (Transaction, error) { return newMockTxn(), nil } -func (s *mockStorage) BeginWithOption(option TransactionOption) (Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.TransactionOption) (Transaction, error) { return newMockTxn(), nil } diff --git a/store/tikv/kv.go b/store/tikv/kv.go old mode 100644 new mode 100755 index a465493694106..13317b3feda40 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/tikv/config" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" @@ -149,6 +150,18 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client return store, nil } +// SetRegionCacheStore is used to set a store in region cache, for testing only +func (s *KVStore) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { + s.regionCache.storeMu.Lock() + defer s.regionCache.storeMu.Unlock() + s.regionCache.storeMu.stores[id] = &Store{ + storeID: id, + storeType: storeType, + state: state, + labels: labels, + } +} + // EnableTxnLocalLatches enables txn latch. It should be called before using // the store to serve any requests. func (s *KVStore) EnableTxnLocalLatches(size uint) { @@ -358,7 +371,8 @@ func (s *KVStore) getSafeTS(storeID uint64) uint64 { return safeTS.(uint64) } -func (s *KVStore) setSafeTS(storeID, safeTS uint64) { +// SetSafeTS sets safeTs for store storeID, export for testing +func (s *KVStore) SetSafeTS(storeID, safeTS uint64) { s.safeTSMap.Store(storeID, safeTS) } @@ -416,7 +430,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { return } safeTSResp := resp.Resp.(*kvrpcpb.StoreSafeTSResponse) - s.setSafeTS(storeID, safeTSResp.GetSafeTs()) + s.SetSafeTS(storeID, safeTSResp.GetSafeTs()) }(ctx, wg, storeID, storeAddr) } wg.Wait() diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index 5589752043b2b..3165509af8f03 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" - tidbkv "github.com/pingcap/tidb/kv" drivertxn "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" @@ -603,12 +602,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) { // Use max.Uint64 to read the data and success. // That means the final commitTS > startTS+2, it's not the one we provide. // So we cover the rety commitTS logic. - txn1, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2)) + txn1, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2)) c.Assert(err, IsNil) _, err = txn1.Get(bo.GetCtx(), []byte("x")) c.Assert(tikverr.IsErrNotFound(err), IsTrue) - txn2, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(math.MaxUint64)) + txn2, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetStartTs(math.MaxUint64)) c.Assert(err, IsNil) val, err := txn2.Get(bo.GetCtx(), []byte("x")) c.Assert(err, IsNil) diff --git a/store/tikv/extract_start_ts_test.go b/store/tikv/tests/extract_start_ts_test.go similarity index 56% rename from store/tikv/extract_start_ts_test.go rename to store/tikv/tests/extract_start_ts_test.go index 22d1b04a60f02..4ad1c207da755 100644 --- a/store/tikv/extract_start_ts_test.go +++ b/store/tikv/tests/extract_start_ts_test.go @@ -11,19 +11,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package tikv_test import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) type extractStartTsSuite struct { - store *KVStore + store *tikv.KVStore } var _ = SerialSuites(&extractStartTsSuite{}) @@ -32,30 +33,22 @@ func (s *extractStartTsSuite) SetUpTest(c *C) { client, pdClient, cluster, err := unistore.New("") c.Assert(err, IsNil) unistore.BootstrapWithSingleStore(cluster) - store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0) + store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0) c.Assert(err, IsNil) - store.regionCache.storeMu.stores[2] = &Store{ - storeID: 2, - storeType: tikvrpc.TiKV, - state: uint64(resolved), - labels: []*metapb.StoreLabel{ - { - Key: DCLabelKey, - Value: oracle.LocalTxnScope, - }, + store.SetRegionCacheStore(2, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, + Value: oracle.LocalTxnScope, }, - } - store.regionCache.storeMu.stores[3] = &Store{ - storeID: 3, - storeType: tikvrpc.TiKV, - state: uint64(resolved), - labels: []*metapb.StoreLabel{{ - Key: DCLabelKey, + }) + store.SetRegionCacheStore(3, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, Value: "Some Random Label", - }}, - } - store.setSafeTS(2, 102) - store.setSafeTS(3, 101) + }, + }) + store.SetSafeTS(2, 102) + store.SetSafeTS(3, 101) s.store = store } @@ -68,26 +61,26 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { cases := []struct { expectedTS uint64 - option TransactionOption + option tikv.TransactionOption }{ // StartTS setted - {100, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {100, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, // PrevSec setted - {200, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, + {200, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, // MinStartTS setted, global - {101, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {101, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MinStartTS setted, local - {102, TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {102, tikv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MaxPrevSec setted // however we need to add more cases to check the behavior when it fall backs to MinStartTS setted // see `TestMaxPrevSecFallback` - {200, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {200, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, // nothing setted - {300, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {300, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, } for _, cs := range cases { expected := cs.expectedTS - result, _ := extractStartTs(s.store, cs.option) + result, _ := tikv.ExtractStartTs(s.store, cs.option) c.Assert(result, Equals, expected) } @@ -96,18 +89,18 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { } func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { - s.store.setSafeTS(2, 0x8000000000000002) - s.store.setSafeTS(3, 0x8000000000000001) + s.store.SetSafeTS(2, 0x8000000000000002) + s.store.SetSafeTS(3, 0x8000000000000001) i := uint64(100) cases := []struct { expectedTS uint64 - option TransactionOption + option tikv.TransactionOption }{ - {0x8000000000000001, TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, - {0x8000000000000002, TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000001, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000002, tikv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, } for _, cs := range cases { - result, _ := extractStartTs(s.store, cs.option) + result, _ := tikv.ExtractStartTs(s.store, cs.option) c.Assert(result, Equals, cs.expectedTS) } } diff --git a/store/tikv/txn.go b/store/tikv/txn.go old mode 100644 new mode 100755 index 2c3ca3ce4200a..8bf3d3b0f31f5 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -135,23 +135,24 @@ type KVTxn struct { kvFilter KVFilter } -func extractStartTs(store *KVStore, options TransactionOption) (uint64, error) { +// ExtractStartTs use `option` to get the proper startTS for a transaction +func ExtractStartTs(store *KVStore, option TransactionOption) (uint64, error) { var startTs uint64 var err error - if options.StartTS != nil { - startTs = *options.StartTS - } else if options.PrevSec != nil { + if option.StartTS != nil { + startTs = *option.StartTS + } else if option.PrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTs, err = store.getStalenessTimestamp(bo, options.TxnScope, *options.PrevSec) - } else if options.MinStartTS != nil { + startTs, err = store.getStalenessTimestamp(bo, option.TxnScope, *option.PrevSec) + } else if option.MinStartTS != nil { stores := make([]*Store, 0) allStores := store.regionCache.getStoresByType(tikvrpc.TiKV) - if options.TxnScope != oracle.GlobalTxnScope { + if option.TxnScope != oracle.GlobalTxnScope { for _, store := range allStores { if store.IsLabelsMatch([]*metapb.StoreLabel{ { Key: DCLabelKey, - Value: options.TxnScope, + Value: option.TxnScope, }, }) { stores = append(stores, store) @@ -161,23 +162,23 @@ func extractStartTs(store *KVStore, options TransactionOption) (uint64, error) { stores = allStores } safeTS := store.getMinSafeTSByStores(stores) - startTs = *options.MinStartTS + startTs = *option.MinStartTS // If the safeTS is larger than the minStartTS, we will use safeTS as StartTS, otherwise we will use // minStartTS directly. if oracle.CompareTS(startTs, safeTS) < 0 { startTs = safeTS } - } else if options.MaxPrevSec != nil { + } else if option.MaxPrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - minStartTS, err := store.getStalenessTimestamp(bo, options.TxnScope, *options.MaxPrevSec) + minStartTS, err := store.getStalenessTimestamp(bo, option.TxnScope, *option.MaxPrevSec) if err != nil { return 0, errors.Trace(err) } - options.MinStartTS = &minStartTS - return extractStartTs(store, options) + option.MinStartTS = &minStartTS + return ExtractStartTs(store, option) } else { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTs, err = store.getTimestampWithRetry(bo, options.TxnScope) + startTs, err = store.getTimestampWithRetry(bo, option.TxnScope) } return startTs, err } @@ -186,7 +187,7 @@ func newTiKVTxnWithOptions(store *KVStore, options TransactionOption) (*KVTxn, e if options.TxnScope == "" { options.TxnScope = oracle.GlobalTxnScope } - startTs, err := extractStartTs(store, options) + startTs, err := ExtractStartTs(store, options) if err != nil { return nil, errors.Trace(err) } From 72cb181cdf45d60652c02e32a3ddb518f8775634 Mon Sep 17 00:00:00 2001 From: longfangsong Date: Fri, 14 May 2021 18:54:08 +0800 Subject: [PATCH 3/5] Address some comments --- store/tikv/kv.go | 19 +++---------------- store/tikv/test_probe.go | 18 ++++++++++++++++++ store/tikv/tests/extract_start_ts_test.go | 16 +++++++++------- store/tikv/txn.go | 0 4 files changed, 30 insertions(+), 23 deletions(-) mode change 100755 => 100644 store/tikv/kv.go mode change 100755 => 100644 store/tikv/txn.go diff --git a/store/tikv/kv.go b/store/tikv/kv.go old mode 100755 new mode 100644 index 13317b3feda40..5e46cb530d110 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/tikv/config" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" @@ -150,18 +149,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client return store, nil } -// SetRegionCacheStore is used to set a store in region cache, for testing only -func (s *KVStore) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { - s.regionCache.storeMu.Lock() - defer s.regionCache.storeMu.Unlock() - s.regionCache.storeMu.stores[id] = &Store{ - storeID: id, - storeType: storeType, - state: state, - labels: labels, - } -} - // EnableTxnLocalLatches enables txn latch. It should be called before using // the store to serve any requests. func (s *KVStore) EnableTxnLocalLatches(size uint) { @@ -371,8 +358,8 @@ func (s *KVStore) getSafeTS(storeID uint64) uint64 { return safeTS.(uint64) } -// SetSafeTS sets safeTs for store storeID, export for testing -func (s *KVStore) SetSafeTS(storeID, safeTS uint64) { +// setSafeTS sets safeTs for store storeID, export for testing +func (s *KVStore) setSafeTS(storeID, safeTS uint64) { s.safeTSMap.Store(storeID, safeTS) } @@ -430,7 +417,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { return } safeTSResp := resp.Resp.(*kvrpcpb.StoreSafeTSResponse) - s.SetSafeTS(storeID, safeTSResp.GetSafeTs()) + s.setSafeTS(storeID, safeTSResp.GetSafeTs()) }(ctx, wg, storeID, storeAddr) } wg.Wait() diff --git a/store/tikv/test_probe.go b/store/tikv/test_probe.go index 1a8dc5062218d..3e80e6310fe4b 100644 --- a/store/tikv/test_probe.go +++ b/store/tikv/test_probe.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/unionstore" @@ -81,6 +82,23 @@ func (s StoreProbe) SaveSafePoint(v uint64) error { return saveSafePoint(s.GetSafePointKV(), v) } +// SetRegionCacheStore is used to set a store in region cache, for testing only +func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { + s.regionCache.storeMu.Lock() + defer s.regionCache.storeMu.Unlock() + s.regionCache.storeMu.stores[id] = &Store{ + storeID: id, + storeType: storeType, + state: state, + labels: labels, + } +} + +// SetSafeTS is used to set safeTS for the store with `storeID` +func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) { + s.setSafeTS(storeID, safeTS) +} + // TxnProbe wraps a txn and exports internal states for testing purpose. type TxnProbe struct { *KVTxn diff --git a/store/tikv/tests/extract_start_ts_test.go b/store/tikv/tests/extract_start_ts_test.go index 4ad1c207da755..606e97afbc50a 100644 --- a/store/tikv/tests/extract_start_ts_test.go +++ b/store/tikv/tests/extract_start_ts_test.go @@ -35,21 +35,22 @@ func (s *extractStartTsSuite) SetUpTest(c *C) { unistore.BootstrapWithSingleStore(cluster) store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0) c.Assert(err, IsNil) - store.SetRegionCacheStore(2, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + probe := tikv.StoreProbe{KVStore: store} + probe.SetRegionCacheStore(2, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ { Key: tikv.DCLabelKey, Value: oracle.LocalTxnScope, }, }) - store.SetRegionCacheStore(3, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + probe.SetRegionCacheStore(3, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ { Key: tikv.DCLabelKey, Value: "Some Random Label", }, }) - store.SetSafeTS(2, 102) - store.SetSafeTS(3, 101) - s.store = store + probe.SetSafeTS(2, 102) + probe.SetSafeTS(3, 101) + s.store = probe.KVStore } func (s *extractStartTsSuite) TestExtractStartTs(c *C) { @@ -89,8 +90,9 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { } func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { - s.store.SetSafeTS(2, 0x8000000000000002) - s.store.SetSafeTS(3, 0x8000000000000001) + probe := tikv.StoreProbe{KVStore: s.store} + probe.SetSafeTS(2, 0x8000000000000002) + probe.SetSafeTS(3, 0x8000000000000001) i := uint64(100) cases := []struct { expectedTS uint64 diff --git a/store/tikv/txn.go b/store/tikv/txn.go old mode 100755 new mode 100644 From ee60d0efba2499126578c1dea5b33a36e174747f Mon Sep 17 00:00:00 2001 From: longfangsong Date: Fri, 14 May 2021 18:56:39 +0800 Subject: [PATCH 4/5] Update comments --- store/tikv/txn.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 8bf3d3b0f31f5..ed78d2ae5244c 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -70,31 +70,31 @@ func DefaultTransactionOption() TransactionOption { return TransactionOption{TxnScope: oracle.GlobalTxnScope} } -// SetMaxPrevSec set maxPrevSec +// SetMaxPrevSec returns a new TransactionOption with MaxPrevSec set to maxPrevSec func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption { to.MaxPrevSec = &maxPrevSec return to } -// SetMinStartTS set minStartTS +// SetMinStartTS returns a new TransactionOption with MinStartTS set to minStartTS func (to TransactionOption) SetMinStartTS(minStartTS uint64) TransactionOption { to.MinStartTS = &minStartTS return to } -// SetStartTs set startTS +// SetStartTs returns a new TransactionOption with StartTS set to startTS func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption { to.StartTS = &startTS return to } -// SetPrevSec set prevSec +// SetPrevSec returns a new TransactionOption with PrevSec set to prevSec func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption { to.PrevSec = &prevSec return to } -// SetTxnScope set txnScope +// SetTxnScope returns a new TransactionOption with TxnScope set to txnScope func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption { to.TxnScope = txnScope return to From a52ba0468b63663627429c5631c8a56289bbbe75 Mon Sep 17 00:00:00 2001 From: longfangsong Date: Wed, 19 May 2021 10:42:44 +0800 Subject: [PATCH 5/5] Rename TransactionOption to StartTSOption, which might be less misunderstanding --- kv/fault_injection.go | 2 +- kv/fault_injection_test.go | 2 +- kv/interface_mock_test.go | 2 +- kv/kv.go | 2 +- session/session.go | 12 ++++---- session/txn.go | 4 +-- store/driver/tikv_driver.go | 2 +- store/helper/helper.go | 2 +- store/mockstore/mockstorage/storage.go | 2 +- store/tikv/kv.go | 6 ++-- store/tikv/tests/2pc_test.go | 4 +-- store/tikv/tests/extract_start_ts_test.go | 20 ++++++------- store/tikv/txn.go | 34 +++++++++++------------ util/mock/context.go | 2 +- util/mock/store.go | 2 +- 15 files changed, 49 insertions(+), 49 deletions(-) diff --git a/kv/fault_injection.go b/kv/fault_injection.go index 25e4e8d6c6d6c..d61685a7f8a71 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -66,7 +66,7 @@ func (s *InjectedStore) Begin() (Transaction, error) { } // BeginWithOption creates an injected Transaction with given option. -func (s *InjectedStore) BeginWithOption(option tikv.TransactionOption) (Transaction, error) { +func (s *InjectedStore) BeginWithOption(option tikv.StartTSOption) (Transaction, error) { txn, err := s.Storage.BeginWithOption(option) return &InjectedTransaction{ Transaction: txn, diff --git a/kv/fault_injection_test.go b/kv/fault_injection_test.go index 39279a7490987..4979dbf4268cd 100644 --- a/kv/fault_injection_test.go +++ b/kv/fault_injection_test.go @@ -36,7 +36,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { storage := NewInjectedStore(newMockStorage(), &cfg) txn, err := storage.Begin() c.Assert(err, IsNil) - _, err = storage.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) + _, err = storage.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) c.Assert(err, IsNil) ver := Version{Ver: 1} snap := storage.GetSnapshot(ver) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index c353772952f18..9e41832678294 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -155,7 +155,7 @@ func (s *mockStorage) Begin() (Transaction, error) { return newMockTxn(), nil } -func (s *mockStorage) BeginWithOption(option tikv.TransactionOption) (Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (Transaction, error) { return newMockTxn(), nil } diff --git a/kv/kv.go b/kv/kv.go index 30d0253e5dc9f..471dfe09a110b 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -346,7 +346,7 @@ type Storage interface { // Begin a global transaction Begin() (Transaction, error) // Begin a transaction with given option - BeginWithOption(option tikv.TransactionOption) (Transaction, error) + BeginWithOption(option tikv.StartTSOption) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. GetSnapshot(ver Version) Snapshot diff --git a/session/session.go b/session/session.go index 9dafd276a2045..c9b3f8f7a8abd 100644 --- a/session/session.go +++ b/session/session.go @@ -1975,7 +1975,7 @@ func (s *session) NewTxn(ctx context.Context) error { zap.String("txnScope", txnScope)) } - txn, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) + txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) if err != nil { return err } @@ -2768,7 +2768,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { } // no need to get txn from txnFutureCh since txn should init with startTs - txn, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) + txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) if err != nil { return err } @@ -2801,22 +2801,22 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc txnScope := s.GetSessionVars().CheckAndGetTxnScope() switch option.Mode { case ast.TimestampBoundReadTimestamp: - txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) if err != nil { return err } case ast.TimestampBoundExactStaleness: - txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMaxStaleness: - txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMinReadTimestamp: - txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) if err != nil { return err } diff --git a/session/txn.go b/session/txn.go index aac693822b0e4..294725f8efaa0 100644 --- a/session/txn.go +++ b/session/txn.go @@ -437,14 +437,14 @@ type txnFuture struct { func (tf *txnFuture) wait() (kv.Transaction, error) { startTS, err := tf.future.Wait() if err == nil { - return tf.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) + return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) } else if config.GetGlobalConfig().Store == "unistore" { return nil, err } logutil.BgLogger().Warn("wait tso failed", zap.Error(err)) // It would retry get timestamp. - return tf.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(tf.txnScope)) + return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope)) } func (s *session) getTxnFuture(ctx context.Context) *txnFuture { diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index 2b847654936c8..2d93b7eda4abb 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -307,7 +307,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { } // BeginWithOption begins a transaction with given option -func (s *tikvStore) BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) { +func (s *tikvStore) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { txn, err := s.KVStore.BeginWithOption(option) if err != nil { return nil, derr.ToTiDBErr(err) diff --git a/store/helper/helper.go b/store/helper/helper.go index 3a8cf64256d14..8eb9b9d7db828 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -48,7 +48,7 @@ import ( // Methods copied from kv.Storage and tikv.Storage due to limitation of go1.13. type Storage interface { Begin() (kv.Transaction, error) - BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) + BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) GetSnapshot(ver kv.Version) kv.Snapshot GetClient() kv.Client GetMPPClient() kv.MPPClient diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index 7edfc3cf3e6d6..7d78d1a9b7418 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -83,7 +83,7 @@ func (s *mockStorage) ShowStatus(ctx context.Context, key string) (interface{}, } // BeginWithOption begins a transaction with given option -func (s *mockStorage) BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { return newTiKVTxn(s.KVStore.BeginWithOption(option)) } diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 8fc55994fc8e8..622313f382abd 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -188,11 +188,11 @@ func (s *KVStore) runSafePointChecker() { // Begin a global transaction. func (s *KVStore) Begin() (*KVTxn, error) { - return s.BeginWithOption(DefaultTransactionOption()) + return s.BeginWithOption(DefaultStartTSOption()) } -// BeginWithOption begins a transaction with the given TransactionOption -func (s *KVStore) BeginWithOption(options TransactionOption) (*KVTxn, error) { +// BeginWithOption begins a transaction with the given StartTSOption +func (s *KVStore) BeginWithOption(options StartTSOption) (*KVTxn, error) { return newTiKVTxnWithOptions(s, options) } diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index 3165509af8f03..43b682160c514 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -602,12 +602,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) { // Use max.Uint64 to read the data and success. // That means the final commitTS > startTS+2, it's not the one we provide. // So we cover the rety commitTS logic. - txn1, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2)) + txn1, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(committer.GetStartTS() + 2)) c.Assert(err, IsNil) _, err = txn1.Get(bo.GetCtx(), []byte("x")) c.Assert(tikverr.IsErrNotFound(err), IsTrue) - txn2, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetStartTs(math.MaxUint64)) + txn2, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(math.MaxUint64)) c.Assert(err, IsNil) val, err := txn2.Get(bo.GetCtx(), []byte("x")) c.Assert(err, IsNil) diff --git a/store/tikv/tests/extract_start_ts_test.go b/store/tikv/tests/extract_start_ts_test.go index 606e97afbc50a..566211006b66c 100644 --- a/store/tikv/tests/extract_start_ts_test.go +++ b/store/tikv/tests/extract_start_ts_test.go @@ -62,22 +62,22 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { cases := []struct { expectedTS uint64 - option tikv.TransactionOption + option tikv.StartTSOption }{ // StartTS setted - {100, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {100, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, // PrevSec setted - {200, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, + {200, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, // MinStartTS setted, global - {101, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {101, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MinStartTS setted, local - {102, tikv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {102, tikv.StartTSOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MaxPrevSec setted // however we need to add more cases to check the behavior when it fall backs to MinStartTS setted // see `TestMaxPrevSecFallback` - {200, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {200, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, // nothing setted - {300, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {300, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, } for _, cs := range cases { expected := cs.expectedTS @@ -96,10 +96,10 @@ func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { i := uint64(100) cases := []struct { expectedTS uint64 - option tikv.TransactionOption + option tikv.StartTSOption }{ - {0x8000000000000001, tikv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, - {0x8000000000000002, tikv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000001, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000002, tikv.StartTSOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, } for _, cs := range cases { result, _ := tikv.ExtractStartTs(s.store, cs.option) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 87159ea40246f..d228b834e00dc 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -54,10 +54,10 @@ type SchemaAmender interface { AmendTxn(ctx context.Context, startInfoSchema SchemaVer, change *RelatedSchemaChange, mutations CommitterMutations) (CommitterMutations, error) } -// TransactionOption indicates the option when beginning a transaction +// StartTSOption indicates the option when beginning a transaction // `TxnScope` must be set for each object // Every other fields are optional, but currently at most one of them can be set -type TransactionOption struct { +type StartTSOption struct { TxnScope string StartTS *uint64 PrevSec *uint64 @@ -65,37 +65,37 @@ type TransactionOption struct { MaxPrevSec *uint64 } -// DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used -func DefaultTransactionOption() TransactionOption { - return TransactionOption{TxnScope: oracle.GlobalTxnScope} +// DefaultStartTSOption creates a default StartTSOption, ie. Work in GlobalTxnScope and get start ts when got used +func DefaultStartTSOption() StartTSOption { + return StartTSOption{TxnScope: oracle.GlobalTxnScope} } -// SetMaxPrevSec returns a new TransactionOption with MaxPrevSec set to maxPrevSec -func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption { +// SetMaxPrevSec returns a new StartTSOption with MaxPrevSec set to maxPrevSec +func (to StartTSOption) SetMaxPrevSec(maxPrevSec uint64) StartTSOption { to.MaxPrevSec = &maxPrevSec return to } -// SetMinStartTS returns a new TransactionOption with MinStartTS set to minStartTS -func (to TransactionOption) SetMinStartTS(minStartTS uint64) TransactionOption { +// SetMinStartTS returns a new StartTSOption with MinStartTS set to minStartTS +func (to StartTSOption) SetMinStartTS(minStartTS uint64) StartTSOption { to.MinStartTS = &minStartTS return to } -// SetStartTs returns a new TransactionOption with StartTS set to startTS -func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption { +// SetStartTs returns a new StartTSOption with StartTS set to startTS +func (to StartTSOption) SetStartTs(startTS uint64) StartTSOption { to.StartTS = &startTS return to } -// SetPrevSec returns a new TransactionOption with PrevSec set to prevSec -func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption { +// SetPrevSec returns a new StartTSOption with PrevSec set to prevSec +func (to StartTSOption) SetPrevSec(prevSec uint64) StartTSOption { to.PrevSec = &prevSec return to } -// SetTxnScope returns a new TransactionOption with TxnScope set to txnScope -func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption { +// SetTxnScope returns a new StartTSOption with TxnScope set to txnScope +func (to StartTSOption) SetTxnScope(txnScope string) StartTSOption { to.TxnScope = txnScope return to } @@ -136,7 +136,7 @@ type KVTxn struct { } // ExtractStartTs use `option` to get the proper startTS for a transaction -func ExtractStartTs(store *KVStore, option TransactionOption) (uint64, error) { +func ExtractStartTs(store *KVStore, option StartTSOption) (uint64, error) { var startTs uint64 var err error if option.StartTS != nil { @@ -183,7 +183,7 @@ func ExtractStartTs(store *KVStore, option TransactionOption) (uint64, error) { return startTs, err } -func newTiKVTxnWithOptions(store *KVStore, options TransactionOption) (*KVTxn, error) { +func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error) { if options.TxnScope == "" { options.TxnScope = oracle.GlobalTxnScope } diff --git a/util/mock/context.go b/util/mock/context.go index b90459a84b1ca..6df2c9c10d356 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -205,7 +205,7 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } if c.Store != nil { - txn, err := c.Store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) + txn, err := c.Store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) if err != nil { return errors.Trace(err) } diff --git a/util/mock/store.go b/util/mock/store.go index 68e4e3f223d14..3adba59e115e5 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -39,7 +39,7 @@ func (s *Store) GetOracle() oracle.Oracle { return nil } func (s *Store) Begin() (kv.Transaction, error) { return nil, nil } // BeginWithOption implements kv.Storage interface. -func (s *Store) BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) { +func (s *Store) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { return s.Begin() }