Skip to content

Commit

Permalink
store/tikv: Make GC's tests more strict (#11084)
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta authored Jul 16, 2019
1 parent 2640d6d commit f772318
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 40 deletions.
48 changes: 48 additions & 0 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ func (s *testMockTiKVSuite) mustBatchResolveLock(c *C, txnInfos map[uint64]uint6
c.Assert(s.store.BatchResolveLock(nil, nil, txnInfos), IsNil)
}

func (s *testMockTiKVSuite) mustGC(c *C, safePoint uint64) {
c.Assert(s.store.GC(nil, nil, safePoint), IsNil)
}

func (s *testMockTiKVSuite) mustDeleteRange(c *C, startKey, endKey string) {
err := s.store.DeleteRange([]byte(startKey), []byte(endKey))
c.Assert(err, IsNil)
Expand Down Expand Up @@ -488,6 +492,50 @@ func (s *testMockTiKVSuite) TestBatchResolveLock(c *C) {
s.mustScanLock(c, 30, nil)
}

func (s *testMockTiKVSuite) TestGC(c *C) {
var safePoint uint64 = 100

// Prepare data
s.mustPutOK(c, "k1", "v1", 1, 2)
s.mustPutOK(c, "k1", "v2", 11, 12)

s.mustPutOK(c, "k2", "v1", 1, 2)
s.mustPutOK(c, "k2", "v2", 11, 12)
s.mustPutOK(c, "k2", "v3", 101, 102)

s.mustPutOK(c, "k3", "v1", 1, 2)
s.mustPutOK(c, "k3", "v2", 11, 12)
s.mustDeleteOK(c, "k3", 101, 102)

s.mustPutOK(c, "k4", "v1", 1, 2)
s.mustDeleteOK(c, "k4", 11, 12)

// Check prepared data
s.mustGetOK(c, "k1", 5, "v1")
s.mustGetOK(c, "k1", 15, "v2")
s.mustGetOK(c, "k2", 5, "v1")
s.mustGetOK(c, "k2", 15, "v2")
s.mustGetOK(c, "k2", 105, "v3")
s.mustGetOK(c, "k3", 5, "v1")
s.mustGetOK(c, "k3", 15, "v2")
s.mustGetNone(c, "k3", 105)
s.mustGetOK(c, "k4", 5, "v1")
s.mustGetNone(c, "k4", 105)

s.mustGC(c, safePoint)

s.mustGetNone(c, "k1", 5)
s.mustGetOK(c, "k1", 15, "v2")
s.mustGetNone(c, "k2", 5)
s.mustGetOK(c, "k2", 15, "v2")
s.mustGetOK(c, "k2", 105, "v3")
s.mustGetNone(c, "k3", 5)
s.mustGetOK(c, "k3", 15, "v2")
s.mustGetNone(c, "k3", 105)
s.mustGetNone(c, "k4", 5)
s.mustGetNone(c, "k4", 105)
}

func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) {
s.mustPutOK(c, "test", "test", 1, 3)
req := &kvrpcpb.PrewriteRequest{
Expand Down
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ type MVCCStore interface {
ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
Close() error
}
Expand Down
66 changes: 66 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,72 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
return mvcc.db.Write(batch, nil)
}

// GC implements the MVCCStore interface
func (mvcc *MVCCLevelDB) GC(startKey, endKey []byte, safePoint uint64) error {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release()
if err != nil {
return errors.Trace(err)
}

// Mock TiKV usually doesn't need to process large amount of data. So write it in a single batch.
batch := &leveldb.Batch{}

for iter.Valid() {
lockDec := lockDecoder{expectKey: currKey}
ok, err := lockDec.Decode(iter)
if err != nil {
return errors.Trace(err)
}
if ok && lockDec.lock.startTS <= safePoint {
return errors.Errorf(
"key %+q has lock with startTs %v which is under safePoint %v",
currKey,
lockDec.lock.startTS,
safePoint)
}

keepNext := true
dec := valueDecoder{expectKey: currKey}

for iter.Valid() {
ok, err := dec.Decode(iter)
if err != nil {
return errors.Trace(err)
}

if !ok {
// Go to the next key
currKey, _, err = mvccDecode(iter.Key())
if err != nil {
return errors.Trace(err)
}
break
}

if dec.value.commitTS > safePoint {
continue
}

if dec.value.valueType == typePut || dec.value.valueType == typeDelete {
// Keep the latest version if it's `typePut`
if !keepNext || dec.value.valueType == typeDelete {
batch.Delete(mvccEncode(currKey, dec.value.commitTS))
}
keepNext = false
} else {
// Delete all other types
batch.Delete(mvccEncode(currKey, dec.value.commitTS))
}
}
}

return mvcc.db.Write(batch, nil)
}

// DeleteRange implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) DeleteRange(startKey, endKey []byte) error {
return mvcc.doRawDeleteRange(codec.EncodeBytes(nil, startKey), codec.EncodeBytes(nil, endKey))
Expand Down
11 changes: 10 additions & 1 deletion store/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ var tsMu = struct {

type pdClient struct {
cluster *Cluster
// SafePoint set by `UpdateGCSafePoint`. Not to be confused with SafePointKV.
gcSafePoint uint64
gcSafePointMu sync.Mutex
}

// NewPDClient creates a mock pd.Client that uses local timestamp and meta data
Expand Down Expand Up @@ -108,7 +111,13 @@ func (c *pdClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption)
}

func (c *pdClient) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
return 0, nil
c.gcSafePointMu.Lock()
defer c.gcSafePointMu.Unlock()

if safePoint > c.gcSafePoint {
c.gcSafePoint = safePoint
}
return c.gcSafePoint, nil
}

func (c *pdClient) Close() {
Expand Down
14 changes: 13 additions & 1 deletion store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,18 @@ func (h *rpcHandler) handleKvResolveLock(req *kvrpcpb.ResolveLockRequest) *kvrpc
return &kvrpcpb.ResolveLockResponse{}
}

func (h *rpcHandler) handleKvGC(req *kvrpcpb.GCRequest) *kvrpcpb.GCResponse {
startKey := MvccKey(h.startKey).Raw()
endKey := MvccKey(h.endKey).Raw()
err := h.mvccStore.GC(startKey, endKey, req.GetSafePoint())
if err != nil {
return &kvrpcpb.GCResponse{
Error: convertToKeyError(err),
}
}
return &kvrpcpb.GCResponse{}
}

func (h *rpcHandler) handleKvDeleteRange(req *kvrpcpb.DeleteRangeRequest) *kvrpcpb.DeleteRangeResponse {
if !h.checkKeyInRegion(req.StartKey) {
panic("KvDeleteRange: key not in region")
Expand Down Expand Up @@ -773,7 +785,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
resp.Resp = &kvrpcpb.GCResponse{RegionError: err}
return resp, nil
}
resp.Resp = &kvrpcpb.GCResponse{}
resp.Resp = handler.handleKvGC(r)
case tikvrpc.CmdDeleteRange:
r := req.DeleteRange()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
Expand Down
Loading

0 comments on commit f772318

Please sign in to comment.