Skip to content

Commit

Permalink
Merge branch 'master' into txn-scope
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed May 19, 2021
2 parents cb8d445 + 49d1eaa commit 797fd21
Show file tree
Hide file tree
Showing 16 changed files with 145 additions and 126 deletions.
4 changes: 3 additions & 1 deletion kv/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package kv
import (
"context"
"sync"

"github.com/pingcap/tidb/store/tikv"
)

// InjectionConfig is used for fault injections for KV components.
Expand Down Expand Up @@ -64,7 +66,7 @@ func (s *InjectedStore) Begin() (Transaction, error) {
}

// BeginWithOption creates an injected Transaction with given option.
func (s *InjectedStore) BeginWithOption(option TransactionOption) (Transaction, error) {
func (s *InjectedStore) BeginWithOption(option tikv.StartTSOption) (Transaction, error) {
txn, err := s.Storage.BeginWithOption(option)
return &InjectedTransaction{
Transaction: txn,
Expand Down
3 changes: 2 additions & 1 deletion kv/fault_injection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/store/tikv"
)

type testFaultInjectionSuite struct{}
Expand All @@ -34,7 +35,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) {
storage := NewInjectedStore(newMockStorage(), &cfg)
txn, err := storage.Begin()
c.Assert(err, IsNil)
_, err = storage.BeginWithOption(TransactionOption{}.SetTxnScope(GlobalTxnScope).SetStartTs(0))
_, err = storage.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(GlobalTxnScope).SetStartTs(0))
c.Assert(err, IsNil)
ver := Version{Ver: 1}
snap := storage.GetSnapshot(ver)
Expand Down
3 changes: 2 additions & 1 deletion kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
)

Expand Down Expand Up @@ -154,7 +155,7 @@ func (s *mockStorage) Begin() (Transaction, error) {
return newMockTxn(), nil
}

func (s *mockStorage) BeginWithOption(option TransactionOption) (Transaction, error) {
func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (Transaction, error) {
return newMockTxn(), nil
}

Expand Down
49 changes: 2 additions & 47 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -339,59 +340,13 @@ type Driver interface {
Open(path string) (Storage, error)
}

// TransactionOption indicates the option when beginning a transaction
// `TxnScope` must be set for each object
// Every other fields are optional, but currently at most one of them can be set
type TransactionOption struct {
TxnScope string
StartTS *uint64
PrevSec *uint64
MinStartTS *uint64
MaxPrevSec *uint64
}

// DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used
func DefaultTransactionOption() TransactionOption {
return TransactionOption{TxnScope: GlobalTxnScope}
}

// SetMaxPrevSec set maxPrevSec
func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption {
to.MaxPrevSec = &maxPrevSec
return to
}

// SetMinStartTS set minStartTS
func (to TransactionOption) SetMinStartTS(minStartTS uint64) TransactionOption {
to.MinStartTS = &minStartTS
return to
}

// SetStartTs set startTS
func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption {
to.StartTS = &startTS
return to
}

// SetPrevSec set prevSec
func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption {
to.PrevSec = &prevSec
return to
}

// SetTxnScope set txnScope
func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption {
to.TxnScope = txnScope
return to
}

// Storage defines the interface for storage.
// Isolation should be at least SI(SNAPSHOT ISOLATION)
type Storage interface {
// Begin a global transaction
Begin() (Transaction, error)
// Begin a transaction with given option
BeginWithOption(option TransactionOption) (Transaction, error)
BeginWithOption(option tikv.StartTSOption) (Transaction, error)
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
GetSnapshot(ver Version) Snapshot
Expand Down
12 changes: 6 additions & 6 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1974,7 +1974,7 @@ func (s *session) NewTxn(ctx context.Context) error {
zap.String("txnScope", txnScope))
}

txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope()))
txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope()))
if err != nil {
return err
}
Expand Down Expand Up @@ -2767,7 +2767,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
}

// no need to get txn from txnFutureCh since txn should init with startTs
txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS))
txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS))
if err != nil {
return err
}
Expand Down Expand Up @@ -2800,22 +2800,22 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc
txnScope := s.GetSessionVars().CheckAndGetTxnScope()
switch option.Mode {
case ast.TimestampBoundReadTimestamp:
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS))
txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS))
if err != nil {
return err
}
case ast.TimestampBoundExactStaleness:
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec))
txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec))
if err != nil {
return err
}
case ast.TimestampBoundMaxStaleness:
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec))
txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec))
if err != nil {
return err
}
case ast.TimestampBoundMinReadTimestamp:
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS))
txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS))
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -436,14 +437,14 @@ type txnFuture struct {
func (tf *txnFuture) wait() (kv.Transaction, error) {
startTS, err := tf.future.Wait()
if err == nil {
return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS))
return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope).SetStartTs(startTS))
} else if config.GetGlobalConfig().Store == "unistore" {
return nil, err
}

logutil.BgLogger().Warn("wait tso failed", zap.Error(err))
// It would retry get timestamp.
return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope))
return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope))
}

func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
Expand Down
2 changes: 1 addition & 1 deletion store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) {
}

// BeginWithOption begins a transaction with given option
func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) {
func (s *tikvStore) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) {
txn, err := s.KVStore.BeginWithOption(option)
if err != nil {
return nil, derr.ToTiDBErr(err)
Expand Down
2 changes: 1 addition & 1 deletion store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (
// Methods copied from kv.Storage and tikv.Storage due to limitation of go1.13.
type Storage interface {
Begin() (kv.Transaction, error)
BeginWithOption(option kv.TransactionOption) (kv.Transaction, error)
BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error)
GetSnapshot(ver kv.Version) kv.Snapshot
GetClient() kv.Client
GetMPPClient() kv.MPPClient
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mockstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *mockStorage) ShowStatus(ctx context.Context, key string) (interface{},
}

// BeginWithOption begins a transaction with given option
func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) {
func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) {
return newTiKVTxn(s.KVStore.BeginWithOption(option))
}

Expand Down
8 changes: 4 additions & 4 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/config"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
Expand Down Expand Up @@ -189,11 +188,11 @@ func (s *KVStore) runSafePointChecker() {

// Begin a global transaction.
func (s *KVStore) Begin() (*KVTxn, error) {
return s.BeginWithOption(tidbkv.DefaultTransactionOption())
return s.BeginWithOption(DefaultStartTSOption())
}

// BeginWithOption begins a transaction with the given TransactionOption
func (s *KVStore) BeginWithOption(options tidbkv.TransactionOption) (*KVTxn, error) {
// BeginWithOption begins a transaction with the given StartTSOption
func (s *KVStore) BeginWithOption(options StartTSOption) (*KVTxn, error) {
return newTiKVTxnWithOptions(s, options)
}

Expand Down Expand Up @@ -389,6 +388,7 @@ func (s *KVStore) getSafeTS(storeID uint64) uint64 {
return safeTS.(uint64)
}

// setSafeTS sets safeTs for store storeID, export for testing
func (s *KVStore) setSafeTS(storeID, safeTS uint64) {
s.safeTSMap.Store(storeID, safeTS)
}
Expand Down
18 changes: 18 additions & 0 deletions store/tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/unionstore"
Expand Down Expand Up @@ -81,6 +82,23 @@ func (s StoreProbe) SaveSafePoint(v uint64) error {
return saveSafePoint(s.GetSafePointKV(), v)
}

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
s.regionCache.storeMu.Lock()
defer s.regionCache.storeMu.Unlock()
s.regionCache.storeMu.stores[id] = &Store{
storeID: id,
storeType: storeType,
state: state,
labels: labels,
}
}

// SetSafeTS is used to set safeTS for the store with `storeID`
func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) {
s.setSafeTS(storeID, safeTS)
}

// TxnProbe wraps a txn and exports internal states for testing purpose.
type TxnProbe struct {
*KVTxn
Expand Down
5 changes: 2 additions & 3 deletions store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
drivertxn "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
Expand Down Expand Up @@ -603,12 +602,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
// Use max.Uint64 to read the data and success.
// That means the final commitTS > startTS+2, it's not the one we provide.
// So we cover the rety commitTS logic.
txn1, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2))
txn1, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(committer.GetStartTS() + 2))
c.Assert(err, IsNil)
_, err = txn1.Get(bo.GetCtx(), []byte("x"))
c.Assert(tikverr.IsErrNotFound(err), IsTrue)

txn2, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(math.MaxUint64))
txn2, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(math.MaxUint64))
c.Assert(err, IsNil)
val, err := txn2.Get(bo.GetCtx(), []byte("x"))
c.Assert(err, IsNil)
Expand Down
Loading

0 comments on commit 797fd21

Please sign in to comment.