Skip to content

Commit

Permalink
store/tikv: remove kv.Version from package store/tikv (#23282)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche authored Mar 13, 2021
1 parent 2bea06e commit b534ec7
Show file tree
Hide file tree
Showing 19 changed files with 87 additions and 68 deletions.
4 changes: 2 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er
}

func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) {
snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
Expand All @@ -1166,7 +1166,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err

func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
defer e.wg.Done()
snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
snapshot.SetOption(kv.NotFillCache, true)
snapshot.SetOption(kv.IsolationLevel, kv.RC)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
Expand Down
12 changes: 12 additions & 0 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,15 @@ func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction
}
return txn_driver.NewTiKVTxn(txn), err
}

// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
func (s *tikvStore) GetSnapshot(ver kv.Version) kv.Snapshot {
return s.KVStore.GetSnapshot(ver.Ver)
}

// CurrentVersion returns current max committed version with the given txnScope (local or global).
func (s *tikvStore) CurrentVersion(txnScope string) (kv.Version, error) {
ver, err := s.KVStore.CurrentTimestamp(txnScope)
return kv.NewVersion(ver), err
}
1 change: 1 addition & 0 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Storage interface {
Close() error
UUID() string
CurrentVersion(txnScope string) (kv.Version, error)
CurrentTimestamp(txnScop string) (uint64, error)
GetOracle() oracle.Oracle
SupportDeleteRange() (supported bool)
Name() string
Expand Down
12 changes: 12 additions & 0 deletions store/mockstore/unistore.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transacti
return newTiKVTxn(txn, err)
}

// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
func (s *mockStorage) GetSnapshot(ver kv.Version) kv.Snapshot {
return s.KVStore.GetSnapshot(ver.Ver)
}

// CurrentVersion returns current max committed version with the given txnScope (local or global).
func (s *mockStorage) CurrentVersion(txnScope string) (kv.Version, error) {
ver, err := s.KVStore.CurrentTimestamp(txnScope)
return kv.NewVersion(ver), err
}

func newTiKVTxn(txn *tikv.KVTxn, err error) (kv.Transaction, error) {
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (s *testOnePCSuite) Test1PC(c *C) {
// Check all keys
keys := [][]byte{k1, k2, k3, k4, k5, k6}
values := [][]byte{v1, v2, v3, v4, v5, v6New}
ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
c.Assert(err, IsNil)
snap := s.store.GetSnapshot(ver)
for i, k := range keys {
Expand Down Expand Up @@ -214,7 +214,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion(c *C) {
c.Assert(txn.committer.onePCCommitTS, Equals, uint64(0))
c.Assert(txn.committer.commitTS, Greater, txn.startTS)

ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
c.Assert(err, IsNil)
snap := s.store.GetSnapshot(ver)
for i, k := range keys {
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1277,12 +1277,12 @@ func (c *twoPhaseCommitter) amendPessimisticLock(ctx context.Context, addMutatio
if err != nil {
// KeysNeedToLock won't change, so don't async rollback pessimistic locks here for write conflict.
if terror.ErrorEqual(kv.ErrWriteConflict, err) {
newForUpdateTSVer, err := c.store.CurrentVersion(oracle.GlobalTxnScope)
newForUpdateTSVer, err := c.store.CurrentTimestamp(oracle.GlobalTxnScope)
if err != nil {
return errors.Trace(err)
}
lCtx.ForUpdateTS = newForUpdateTSVer.Ver
c.forUpdateTS = newForUpdateTSVer.Ver
lCtx.ForUpdateTS = newForUpdateTSVer
c.forUpdateTS = newForUpdateTSVer
logutil.Logger(ctx).Info("amend pessimistic lock pessimistic retry lock",
zap.Uint("tryTimes", tryTimes), zap.Uint64("startTS", c.startTS),
zap.Uint64("newForUpdateTS", c.forUpdateTS))
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,12 @@ func (s *testCommitterSuite) mustGetRegionID(c *C, key []byte) uint64 {
}

func (s *testCommitterSuite) isKeyLocked(c *C, key []byte) bool {
ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
c.Assert(err, IsNil)
bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: key,
Version: ver.Ver,
Version: ver,
})
loc, err := s.store.regionCache.LocateKey(bo, key)
c.Assert(err, IsNil)
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ func (s *testAsyncCommitCommon) mustGetFromTxn(c *C, txn *KVTxn, key, expectedVa
}

func (s *testAsyncCommitCommon) mustGetLock(c *C, key []byte) *Lock {
ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
c.Assert(err, IsNil)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: key,
Version: ver.Ver,
Version: ver,
})
bo := NewBackofferWithVars(context.Background(), 5000, nil)
loc, err := s.store.regionCache.LocateKey(bo, key)
Expand All @@ -96,21 +96,21 @@ func (s *testAsyncCommitCommon) mustGetLock(c *C, key []byte) *Lock {
}

func (s *testAsyncCommitCommon) mustPointGet(c *C, key, expectedValue []byte) {
snap := s.store.GetSnapshot(kv.MaxVersion)
snap := s.store.GetSnapshot(maxTimestamp)
value, err := snap.Get(context.Background(), key)
c.Assert(err, IsNil)
c.Assert(value, BytesEquals, expectedValue)
}

func (s *testAsyncCommitCommon) mustGetFromSnapshot(c *C, version uint64, key, expectedValue []byte) {
snap := s.store.GetSnapshot(kv.Version{Ver: version})
snap := s.store.GetSnapshot(version)
value, err := snap.Get(context.Background(), key)
c.Assert(err, IsNil)
c.Assert(value, BytesEquals, expectedValue)
}

func (s *testAsyncCommitCommon) mustGetNoneFromSnapshot(c *C, version uint64, key []byte) {
snap := s.store.GetSnapshot(kv.Version{Ver: version})
snap := s.store.GetSnapshot(version)
_, err := snap.Get(context.Background(), key)
c.Assert(errors.Cause(err), Equals, kv.ErrNotExist)
}
Expand Down
8 changes: 2 additions & 6 deletions store/tikv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)
Expand Down Expand Up @@ -51,15 +50,12 @@ type Storage interface {
// Closed returns the closed channel.
Closed() <-chan struct{}

// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
GetSnapshot(ver kv.Version) kv.Snapshot
// Close store
Close() error
// UUID return a unique ID which represents a Storage.
UUID() string
// CurrentVersion returns current max committed version with the given txnScope (local or global).
CurrentVersion(txnScope string) (kv.Version, error)
// CurrentTimestamp returns current timestamp with the given txnScope (local or global).
CurrentTimestamp(txnScope string) (uint64, error)
// GetOracle gets a timestamp oracle client.
GetOracle() oracle.Oracle
// SupportDeleteRange gets the storage support delete range or not.
Expand Down
14 changes: 7 additions & 7 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ func (s *KVStore) beginWithExactStaleness(txnScope string, prevSec uint64) (*KVT
}

// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
func (s *KVStore) GetSnapshot(ver kv.Version) kv.Snapshot {
snapshot := newTiKVSnapshot(s, ver, s.nextReplicaReadSeed())
// if ts is MaxVersion or > current max committed version, we will use current version for this snapshot.
func (s *KVStore) GetSnapshot(ts uint64) kv.Snapshot {
snapshot := newTiKVSnapshot(s, ts, s.nextReplicaReadSeed())
return snapshot
}

Expand Down Expand Up @@ -244,14 +244,14 @@ func (s *KVStore) UUID() string {
return s.uuid
}

// CurrentVersion returns current max committed version with the given txnScope (local or global).
func (s *KVStore) CurrentVersion(txnScope string) (kv.Version, error) {
// CurrentTimestamp returns current timestamp with the given txnScope (local or global).
func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) {
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
startTS, err := s.getTimestampWithRetry(bo, txnScope)
if err != nil {
return kv.NewVersion(0), errors.Trace(err)
return 0, errors.Trace(err)
}
return kv.NewVersion(startTS), nil
return startTS, nil
}

func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, error) {
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet(c *C) {
keys = append(keys, []byte{ch})
}

ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(s.store, ver, 0)
m, err := snapshot.BatchGet(context.Background(), keys)
Expand Down Expand Up @@ -401,12 +401,12 @@ func (s *testLockSuite) prewriteTxnWithTTL(c *C, txn *KVTxn, ttl uint64) {
}

func (s *testLockSuite) mustGetLock(c *C, key []byte) *Lock {
ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
c.Assert(err, IsNil)
bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: key,
Version: ver.Ver,
Version: ver,
})
loc, err := s.store.regionCache.LocateKey(bo, key)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *testSafePointSuite) TestSafePoint(c *C) {

s.waitUntilErrorPlugIn(txn4.startTS)

snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn4.StartTS()}, 0)
snapshot := newTiKVSnapshot(s.store, txn4.StartTS(), 0)
_, batchgeterr := snapshot.BatchGet(context.Background(), keys)
c.Assert(batchgeterr, NotNil)
isFallBehind = terror.ErrorEqual(errors.Cause(geterr2), ErrGCTooEarly)
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *Scanner) Value() []byte {

// Next return next element.
func (s *Scanner) Next() error {
bo := NewBackofferWithVars(context.WithValue(context.Background(), TxnStartKey, s.snapshot.version.Ver), scannerNextMaxBackoff, s.snapshot.vars)
bo := NewBackofferWithVars(context.WithValue(context.Background(), TxnStartKey, s.snapshot.version), scannerNextMaxBackoff, s.snapshot.vars)
if !s.valid {
return errors.New("scanner iterator is invalid")
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *Scanner) Close() {
}

func (s *Scanner) startTS() uint64 {
return s.snapshot.version.Ver
return s.snapshot.version
}

func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
Expand Down Expand Up @@ -245,7 +245,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
if err != nil {
return errors.Trace(err)
}
msBeforeExpired, _, err := newLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version.Ver, []*Lock{lock})
msBeforeExpired, _, err := newLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version, []*Lock{lock})
if err != nil {
return errors.Trace(err)
}
Expand Down
5 changes: 2 additions & 3 deletions store/tikv/scan_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
)

type testScanMockSuite struct {
Expand All @@ -41,7 +40,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {

txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}, 0)
snapshot := newTiKVSnapshot(store, txn.StartTS(), 0)
scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
Expand Down Expand Up @@ -74,7 +73,7 @@ func (s *testScanMockSuite) TestReverseScan(c *C) {

txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}, 0)
snapshot := newTiKVSnapshot(store, txn.StartTS(), 0)
scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true)
c.Assert(err, IsNil)
for ch := byte('y'); ch >= byte('a'); ch-- {
Expand Down
Loading

0 comments on commit b534ec7

Please sign in to comment.