Skip to content

Commit

Permalink
store/tikv:use tikv.error.ErrNotExist instead of kv.ErrNotExist (#24136)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche authored Apr 28, 2021
1 parent 22dbcc5 commit a1d4d9d
Show file tree
Hide file tree
Showing 19 changed files with 83 additions and 61 deletions.
10 changes: 10 additions & 0 deletions store/driver/txn/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ func extractKeyErr(err error) error {
notFoundDetail := prettyLockNotFoundKey(e.Retryable)
return kv.ErrTxnRetryable.GenWithStackByArgs(e.Retryable + " " + notFoundDetail)
}
return toTiDBErr(err)
}

func toTiDBErr(err error) error {
if err == nil {
return nil
}
if tikverr.IsErrNotFound(err) {
return kv.ErrNotExist
}
return errors.Trace(err)
}

Expand Down
5 changes: 2 additions & 3 deletions store/driver/txn/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
)
Expand Down Expand Up @@ -48,7 +47,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
scanner, err := s.KVSnapshot.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
return nil, toTiDBErr(err)
}
return &tikvScanner{scanner.(*tikv.Scanner)}, err
}
Expand All @@ -57,7 +56,7 @@ func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
scanner, err := s.KVSnapshot.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
return nil, toTiDBErr(err)
}
return &tikvScanner{scanner.(*tikv.Scanner)}, err
}
Expand Down
13 changes: 8 additions & 5 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (txn *tikvTxn) GetSnapshot() kv.Snapshot {
// The Iterator must be Closed after use.
func (txn *tikvTxn) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
it, err := txn.KVTxn.Iter(k, upperBound)
return newKVIterator(it), errors.Trace(err)
return newKVIterator(it), toTiDBErr(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
Expand All @@ -84,7 +84,7 @@ func (txn *tikvTxn) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
// TODO: Add lower bound limit
func (txn *tikvTxn) IterReverse(k kv.Key) (kv.Iterator, error) {
it, err := txn.KVTxn.IterReverse(k)
return newKVIterator(it), errors.Trace(err)
return newKVIterator(it), toTiDBErr(err)
}

// BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage.
Expand All @@ -100,15 +100,18 @@ func (txn *tikvTxn) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]b
}

func (txn *tikvTxn) Delete(k kv.Key) error {
return txn.KVTxn.Delete(k)
err := txn.KVTxn.Delete(k)
return toTiDBErr(err)
}

func (txn *tikvTxn) Get(ctx context.Context, k kv.Key) ([]byte, error) {
return txn.KVTxn.Get(ctx, k)
data, err := txn.KVTxn.Get(ctx, k)
return data, toTiDBErr(err)
}

func (txn *tikvTxn) Set(k kv.Key, v []byte) error {
return txn.KVTxn.Set(k, v)
err := txn.KVTxn.Set(k, v)
return toTiDBErr(err)
}

func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer {
Expand Down
30 changes: 18 additions & 12 deletions store/driver/txn/unionstore_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package txn
import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/unionstore"
Expand All @@ -39,15 +38,18 @@ func (m *memBuffer) Delete(k kv.Key) error {
}

func (m *memBuffer) DeleteWithFlags(k kv.Key, ops ...tikvstore.FlagsOp) error {
return m.MemDB.DeleteWithFlags(k, ops...)
err := m.MemDB.DeleteWithFlags(k, ops...)
return toTiDBErr(err)
}

func (m *memBuffer) Get(_ context.Context, key kv.Key) ([]byte, error) {
return m.MemDB.Get(key)
data, err := m.MemDB.Get(key)
return data, toTiDBErr(err)
}

func (m *memBuffer) GetFlags(key kv.Key) (tikvstore.KeyFlags, error) {
return m.MemDB.GetFlags(key)
data, err := m.MemDB.GetFlags(key)
return data, toTiDBErr(err)
}

func (m *memBuffer) Staging() kv.StagingHandle {
Expand All @@ -70,11 +72,13 @@ func (m *memBuffer) InspectStage(handle kv.StagingHandle, f func(kv.Key, tikvsto
}

func (m *memBuffer) Set(key kv.Key, value []byte) error {
return m.MemDB.Set(key, value)
err := m.MemDB.Set(key, value)
return toTiDBErr(err)
}

func (m *memBuffer) SetWithFlags(key kv.Key, value []byte, ops ...kv.FlagsOp) error {
return m.MemDB.SetWithFlags(key, value, ops...)
err := m.MemDB.SetWithFlags(key, value, ops...)
return toTiDBErr(err)
}

// Iter creates an Iterator positioned on the first entry that k <= entry's key.
Expand All @@ -83,7 +87,7 @@ func (m *memBuffer) SetWithFlags(key kv.Key, value []byte, ops ...kv.FlagsOp) er
// The Iterator must be Closed after use.
func (m *memBuffer) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
it, err := m.MemDB.Iter(k, upperBound)
return &tikvIterator{Iterator: it}, errors.Trace(err)
return &tikvIterator{Iterator: it}, toTiDBErr(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
Expand All @@ -92,7 +96,7 @@ func (m *memBuffer) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
// TODO: Add lower bound limit
func (m *memBuffer) IterReverse(k kv.Key) (kv.Iterator, error) {
it, err := m.MemDB.IterReverse(k)
return &tikvIterator{Iterator: it}, errors.Trace(err)
return &tikvIterator{Iterator: it}, toTiDBErr(err)
}

// SnapshotIter returns a Iterator for a snapshot of MemBuffer.
Expand All @@ -116,7 +120,8 @@ func (u *tikvUnionStore) GetMemBuffer() kv.MemBuffer {
}

func (u *tikvUnionStore) Get(ctx context.Context, k kv.Key) ([]byte, error) {
return u.KVUnionStore.Get(ctx, k)
data, err := u.KVUnionStore.Get(ctx, k)
return data, toTiDBErr(err)
}

func (u *tikvUnionStore) HasPresumeKeyNotExists(k kv.Key) bool {
Expand All @@ -129,7 +134,7 @@ func (u *tikvUnionStore) UnmarkPresumeKeyNotExists(k kv.Key) {

func (u *tikvUnionStore) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
it, err := u.KVUnionStore.Iter(k, upperBound)
return newKVIterator(it), errors.Trace(err)
return newKVIterator(it), toTiDBErr(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
Expand All @@ -138,7 +143,7 @@ func (u *tikvUnionStore) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error)
// TODO: Add lower bound limit
func (u *tikvUnionStore) IterReverse(k kv.Key) (kv.Iterator, error) {
it, err := u.KVUnionStore.IterReverse(k)
return newKVIterator(it), errors.Trace(err)
return newKVIterator(it), toTiDBErr(err)
}

type tikvGetter struct {
Expand All @@ -150,7 +155,8 @@ func newKVGetter(getter unionstore.Getter) kv.Getter {
}

func (g *tikvGetter) Get(_ context.Context, k kv.Key) ([]byte, error) {
return g.Getter.Get(k)
data, err := g.Getter.Get(k)
return data, toTiDBErr(err)
}

// tikvIterator wraps unionstore.Iterator as kv.Iterator
Expand Down
3 changes: 0 additions & 3 deletions store/tikv/error/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ const (
CodeDataOutOfRange = 1690
CodeLockAcquireFailAndNoWaitSet = 3572

// Codeor codes used by TiDB ddl package
CodeLockExpire = 8229

// TiKV/PD/TiFlash errors.
CodePDServerTimeout = 9001
CodeTiKVServerTimeout = 9002
Expand Down
7 changes: 7 additions & 0 deletions store/tikv/error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var (
ErrBodyMissing = errors.New("response body is missing")
// ErrTiDBShuttingDown is returned when TiDB is closing and send request to tikv fail, do not retry.
ErrTiDBShuttingDown = errors.New("tidb server shutting down")
// ErrNotExist means the related data not exist.
ErrNotExist = errors.New("not exist")
)

// MismatchClusterID represents the message that the cluster ID of the PD client does not match the PD.
Expand Down Expand Up @@ -56,6 +58,11 @@ var (
_ = dbterror.ClassTiKV.NewStd(CodeDivisionByZero)
)

// IsErrNotFound checks if err is a kind of NotFound error.
func IsErrNotFound(err error) bool {
return errors.ErrorEqual(err, ErrNotExist)
}

// ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface.
// It also marks if the deadlock is retryable.
type ErrDeadlock struct {
Expand Down
3 changes: 1 addition & 2 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
Expand Down Expand Up @@ -59,7 +58,7 @@ func newScanner(snapshot *KVSnapshot, startKey []byte, endKey []byte, batchSize
nextEndKey: endKey,
}
err := scanner.Next()
if tidbkv.IsErrNotFound(err) {
if tikverr.IsErrNotFound(err) {
return scanner, nil
}
return scanner, errors.Trace(err)
Expand Down
3 changes: 1 addition & 2 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
tidbkv "github.com/pingcap/tidb/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
Expand Down Expand Up @@ -385,7 +384,7 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) {
}

if len(val) == 0 {
return nil, tidbkv.ErrNotExist
return nil, tikverr.ErrNotExist
}
return val, nil
}
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("a1"))
_, err = txn.Get(context.TODO(), []byte("b"))
errMsgMustContain(c, err, "key not exist")
c.Assert(tikverr.IsErrNotFound(err), IsTrue)

// clean again, shouldn't be failed when a rollback already exist.
ctx := context.Background()
Expand Down Expand Up @@ -605,7 +605,7 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
txn1, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, committer.GetStartTS()+2)
c.Assert(err, IsNil)
_, err = txn1.Get(bo.GetCtx(), []byte("x"))
c.Assert(tidbkv.IsErrNotFound(err), IsTrue)
c.Assert(tikverr.IsErrNotFound(err), IsTrue)

txn2, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, math.MaxUint64)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -1136,7 +1136,7 @@ func (s *testCommitterSuite) TestPushPessimisticLock(c *C) {
elapsed := time.Since(start)
// The optimistic lock shouldn't block reads.
c.Assert(elapsed, Less, 500*time.Millisecond)
c.Assert(tidbkv.IsErrNotFound(err), IsTrue)
c.Assert(tikverr.IsErrNotFound(err), IsTrue)

txn1.Rollback()
txn2.Rollback()
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -122,7 +122,7 @@ func (s *testAsyncCommitCommon) mustGetFromSnapshot(c *C, version uint64, key, e
func (s *testAsyncCommitCommon) mustGetNoneFromSnapshot(c *C, version uint64, key []byte) {
snap := s.store.GetSnapshot(version)
_, err := snap.Get(context.Background(), key)
c.Assert(errors.Cause(err), Equals, tidbkv.ErrNotExist)
c.Assert(errors.Cause(err), Equals, tikverr.ErrNotExist)
}

func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) tikv.TxnProbe {
Expand Down
14 changes: 7 additions & 7 deletions store/tikv/tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -506,9 +506,9 @@ func (s *testLockSuite) TestBatchResolveLocks(c *C) {
c.Assert(err, IsNil)
// transaction 1 is rolled back
_, err = txn.Get(context.Background(), []byte("k1"))
c.Assert(err, Equals, tidbkv.ErrNotExist)
c.Assert(err, Equals, tikverr.ErrNotExist)
_, err = txn.Get(context.Background(), []byte("k2"))
c.Assert(err, Equals, tidbkv.ErrNotExist)
c.Assert(err, Equals, tikverr.ErrNotExist)
// transaction 2 is committed
v, err := txn.Get(context.Background(), []byte("k3"))
c.Assert(err, IsNil)
Expand Down Expand Up @@ -617,9 +617,9 @@ func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit(c *C) {
t3, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = t3.Get(context.Background(), []byte("fb1"))
errMsgMustContain(c, err, "key not exist")
c.Assert(tikverr.IsErrNotFound(err), IsTrue)
_, err = t3.Get(context.Background(), []byte("fb2"))
errMsgMustContain(c, err, "key not exist")
c.Assert(tikverr.IsErrNotFound(err), IsTrue)
}

func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) {
Expand All @@ -637,7 +637,7 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) {
t3, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = t3.Get(context.Background(), []byte("fb1"))
errMsgMustContain(c, err, "key not exist")
c.Assert(tikverr.IsErrNotFound(err), IsTrue)
_, err = t3.Get(context.Background(), []byte("fb2"))
errMsgMustContain(c, err, "key not exist")
c.Assert(tikverr.IsErrNotFound(err), IsTrue)
}
3 changes: 2 additions & 1 deletion store/tikv/tests/snapshot_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
)

Expand Down Expand Up @@ -230,7 +231,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) {
// Should get nothing with max version, and **not pushing forward minCommitTS** of the primary lock
snapshot := s.store.GetSnapshot(math.MaxUint64)
_, err = snapshot.Get(context.Background(), []byte("k2"))
c.Assert(err, ErrorMatches, ".*key not exist")
c.Assert(tikverr.IsErrNotFound(err), IsTrue)

initialCommitTS := committer.GetCommitTS()
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommit"), IsNil)
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/tests/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -143,7 +143,7 @@ func (s *testSnapshotSuite) TestSnapshotCache(c *C) {
c.Assert(err, IsNil)

_, err = snapshot.Get(ctx, []byte("y"))
c.Assert(tidbkv.IsErrNotFound(err), IsTrue)
c.Assert(tikverr.IsErrNotFound(err), IsTrue)

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshot-get-cache-fail"), IsNil)
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) {
txn1 := s.beginTxn(c)
// txn1 is not blocked by txn in the large txn protocol.
_, err = txn1.Get(ctx, x)
c.Assert(tidbkv.IsErrNotFound(errors.Trace(err)), IsTrue)
c.Assert(tikverr.IsErrNotFound(errors.Trace(err)), IsTrue)

res, err := toTiDBTxn(&txn1).BatchGet(ctx, toTiDBKeys([][]byte{x, y, []byte("z")}))
c.Assert(err, IsNil)
Expand Down Expand Up @@ -224,7 +224,7 @@ func (s *testSnapshotSuite) TestPointGetSkipTxnLock(c *C) {
c.Assert(committer.GetPrimaryKey(), BytesEquals, x)
// Point get secondary key. Shouldn't be blocked by the lock and read old data.
_, err = snapshot.Get(ctx, y)
c.Assert(tidbkv.IsErrNotFound(errors.Trace(err)), IsTrue)
c.Assert(tikverr.IsErrNotFound(errors.Trace(err)), IsTrue)
c.Assert(time.Since(start), Less, 500*time.Millisecond)

// Commit the primary key
Expand Down
Loading

0 comments on commit a1d4d9d

Please sign in to comment.