Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: Add raw delete range API #6157

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1946e20
Update kvproto
Mar 21, 2018
9a925e1
Add raw delete range to rawkv
Mar 21, 2018
ae1392b
Merge branch 'master' into misono/add-raw-delete-range-interface
Mar 23, 2018
c195b3d
Fix format of Gopkg.lock
Mar 23, 2018
220ef01
make check
Mar 23, 2018
fcd208c
Merge branch 'master' into misono/add-raw-delete-range-interface
Mar 26, 2018
0d44811
format
Mar 26, 2018
6696040
Move position of RawDeleteRange to just after RawDelete
Mar 27, 2018
a9b68ca
store/mockstore: Add raw delete range to mock store
Mar 27, 2018
fec48ad
Merge branch 'misono/mock-tikv-raw-delete-range' into misono/add-raw-…
Mar 27, 2018
cd74685
Add test for raw delete range
Mar 27, 2018
7c28d87
Merge branch 'master' into misono/add-raw-delete-range-interface
Mar 27, 2018
28b9499
Merge branch 'master' into misono/mock-tikv-raw-delete-range
MyonKeminta Mar 27, 2018
e5f809b
Merge branch 'master' into misono/mock-tikv-raw-delete-range
zhangjinpeng87 Mar 28, 2018
485e8f2
Merge remote-tracking branch 'upstream/master' into misono/mock-tikv-…
Apr 2, 2018
019fda6
Merge branch 'master' into misono/add-raw-delete-range-interface
Apr 2, 2018
a81270b
Merge branch 'master' into misono/add-raw-delete-range-interface
Apr 2, 2018
c88dac2
Add test for mock raw delete range of MvccStore
Apr 3, 2018
05d88e4
Merge branch 'master' into misono/mock-tikv-raw-delete-range
Apr 3, 2018
6132848
make check
Apr 3, 2018
6bfd6e3
Merge branch 'master' into misono/add-raw-delete-range-interface
Apr 3, 2018
17771f2
Merge branch 'misono/mock-tikv-raw-delete-range' into misono/add-raw-…
Apr 3, 2018
2021743
fmt
Apr 3, 2018
006d18e
Merge branch 'misono/mock-tikv-raw-delete-range' into misono/add-raw-…
Apr 3, 2018
fc7786d
Address comments
Apr 3, 2018
13d3a93
Separate success and error metrics
Apr 4, 2018
6214b7d
Merge branch 'master' into misono/add-raw-delete-range-interface
coocood Apr 4, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ type RawKV interface {
RawScan(startKey, endKey []byte, limit int) []Pair
RawPut(key, value []byte)
RawDelete(key []byte)
RawDeleteRange(startKey, endKey []byte)
}

// MVCCDebugger is for debugging.
Expand Down Expand Up @@ -780,6 +781,23 @@ func (s *MvccStore) RawDelete(key []byte) {
s.rawkv.Delete(newRawEntry(key))
}

// RawDeleteRange deletes all key-value pairs in a given range
func (s *MvccStore) RawDeleteRange(startKey, endKey []byte) {
s.Lock()
defer s.Unlock()

var entriesToDelete []*rawEntry
iterator := func(item btree.Item) bool {
entriesToDelete = append(entriesToDelete, item.(*rawEntry))
return true
}
s.rawkv.AscendRange(newRawEntry(startKey), newRawEntry(endKey), iterator)

for _, entry := range entriesToDelete {
s.rawkv.Delete(entry)
}
}

// RawScan reads up to a limited number of rawkv Pairs.
func (s *MvccStore) RawScan(startKey, endKey []byte, limit int) []Pair {
s.RLock()
Expand Down
18 changes: 18 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,17 @@ func (h *rpcHandler) handleKvRawDelete(req *kvrpcpb.RawDeleteRequest) *kvrpcpb.R
return &kvrpcpb.RawDeleteResponse{}
}

func (h *rpcHandler) handleKvRawDeleteRange(req *kvrpcpb.RawDeleteRangeRequest) *kvrpcpb.RawDeleteRangeResponse {
rawKV, ok := h.mvccStore.(RawKV)
if !ok {
return &kvrpcpb.RawDeleteRangeResponse{
Error: "not implemented",
}
}
rawKV.RawDeleteRange(req.GetStartKey(), req.GetEndKey())
return &kvrpcpb.RawDeleteRangeResponse{}
}

func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScanResponse {
rawKV, ok := h.mvccStore.(RawKV)
if !ok {
Expand Down Expand Up @@ -598,6 +609,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.RawDelete = handler.handleKvRawDelete(r)
case tikvrpc.CmdRawDeleteRange:
r := req.RawDeleteRange
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawDeleteRange = &kvrpcpb.RawDeleteRangeResponse{RegionError: err}
return resp, nil
}
resp.RawDeleteRange = handler.handleKvRawDeleteRange(r)
case tikvrpc.CmdRawScan:
r := req.RawScan
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
Expand Down
72 changes: 72 additions & 0 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tikv

import (
"bytes"
"time"

"github.com/juju/errors"
Expand Down Expand Up @@ -155,6 +156,32 @@ func (c *RawKVClient) Delete(key []byte) error {
return nil
}

// DeleteRange deletes all key-value pairs in a range from TiKV
func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {
start := time.Now()
defer func() {
metrics.TiKVRawkvCmdHistogram.WithLabelValues("delete_range").Observe(time.Since(start).Seconds())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add succ or error label?

}()

// Process each affected region respectively
for !bytes.Equal(startKey, endKey) {
resp, actualEndKey, err := c.sendDeleteRangeReq(startKey, endKey)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.RawDeleteRange
if cmdResp == nil {
return errors.Trace(ErrBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
startKey = actualEndKey
}

return nil
}

// Scan queries continuous kv pairs, starts from startKey, up to limit pairs.
// If you want to exclude the startKey, append a '\0' to the key: `Scan(append(startKey, '\0'), limit)`.
func (c *RawKVClient) Scan(startKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
Expand Down Expand Up @@ -219,3 +246,48 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request) (*tikvrpc.Respon
return resp, loc, nil
}
}

// sendDeleteRangeReq sends a raw delete range request and returns the response and the actual endKey.
// If the given range spans over more than one regions, the actual endKey is the end of the first region.
// We can't use sendReq directly, because we need to know the end of the region before we send the request
// TODO: Is there any better way to avoid duplicating code with func `sendReq` ?
func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*tikvrpc.Response, []byte, error) {
bo := NewBackoffer(context.Background(), rawkvMaxBackoff)
sender := NewRegionRequestSender(c.regionCache, c.rpcClient)
for {
loc, err := c.regionCache.LocateKey(bo, startKey)
if err != nil {
return nil, nil, errors.Trace(err)
}

actualEndKey := endKey
if len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, endKey) < 0 {
actualEndKey = loc.EndKey
}

req := &tikvrpc.Request{
Type: tikvrpc.CmdRawDeleteRange,
RawDeleteRange: &kvrpcpb.RawDeleteRangeRequest{
StartKey: startKey,
EndKey: actualEndKey,
},
}

resp, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return nil, nil, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, nil, errors.Trace(err)
}
if regionErr != nil {
err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, nil, errors.Trace(err)
}
continue
}
return resp, actualEndKey, nil
}
}
74 changes: 61 additions & 13 deletions store/tikv/rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tikv

import (
"bytes"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a newline after this

. "github.com/pingcap/check"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"golang.org/x/net/context"
Expand Down Expand Up @@ -78,6 +79,38 @@ func (s *testRawKVSuite) mustScan(c *C, startKey string, limit int, expect ...st
}
}

func (s *testRawKVSuite) mustDeleteRange(c *C, startKey, endKey []byte, expected map[string]string) {
err := s.client.DeleteRange(startKey, endKey)
c.Assert(err, IsNil)

for keyStr := range expected {
key := []byte(keyStr)
if bytes.Compare(startKey, key) <= 0 && bytes.Compare(key, endKey) < 0 {
delete(expected, keyStr)
}
}

s.checkData(c, expected)
}

func (s *testRawKVSuite) checkData(c *C, expected map[string]string) {
keys, values, err := s.client.Scan([]byte(""), len(expected)+1)
c.Assert(err, IsNil)

c.Assert(len(expected), Equals, len(keys))
for i, key := range keys {
c.Assert(expected[string(key)], Equals, string(values[i]))
}
}

func (s *testRawKVSuite) split(c *C, regionKey, splitKey string) {
loc, err := s.client.regionCache.LocateKey(s.bo, []byte(regionKey))
c.Assert(err, IsNil)

newRegionID, peerID := s.cluster.AllocID(), s.cluster.AllocID()
s.cluster.SplitRaw(loc.Region.id, newRegionID, []byte(splitKey), []uint64{peerID}, peerID)
}

func (s *testRawKVSuite) TestSimple(c *C) {
s.mustNotExist(c, []byte("key"))
s.mustPut(c, []byte("key"), []byte("value"))
Expand All @@ -89,13 +122,10 @@ func (s *testRawKVSuite) TestSimple(c *C) {
}

func (s *testRawKVSuite) TestSplit(c *C) {
loc, err := s.client.regionCache.LocateKey(s.bo, []byte("k"))
c.Assert(err, IsNil)
s.mustPut(c, []byte("k1"), []byte("v1"))
s.mustPut(c, []byte("k3"), []byte("v3"))

newRegionID, peerID := s.cluster.AllocID(), s.cluster.AllocID()
s.cluster.SplitRaw(loc.Region.id, newRegionID, []byte("k2"), []uint64{peerID}, peerID)
s.split(c, "k", "k2")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let split return error and c.Assert(s.split(c, "k", "k2"), IsNil) here,
or rename split to mustSplit

I prefer former, because when there are many split() call and assert failed, we won't lost the error stack.


s.mustGet(c, []byte("k1"), []byte("v1"))
s.mustGet(c, []byte("k3"), []byte("v3"))
Expand All @@ -115,16 +145,34 @@ func (s *testRawKVSuite) TestScan(c *C) {
s.mustScan(c, "k2", 3, "k3", "v3", "k5", "v5", "k7", "v7")
}

split := func(regionKey, splitKey string) {
loc, err := s.client.regionCache.LocateKey(s.bo, []byte(regionKey))
c.Assert(err, IsNil)
newRegionID, peerID := s.cluster.AllocID(), s.cluster.AllocID()
s.cluster.SplitRaw(loc.Region.id, newRegionID, []byte(splitKey), []uint64{peerID}, peerID)
}

check()
split("k", "k2")
s.split(c, "k", "k2")
check()
split("k2", "k5")
s.split(c, "k2", "k5")
check()
}

func (s *testRawKVSuite) TestDeleteRange(c *C) {
// Init data
testData := map[string]string{}
for _, i := range []byte("abcd") {
for j := byte('0'); j <= byte('9'); j++ {
key := []byte{i, j}
value := []byte{'v', i, j}
s.mustPut(c, key, value)

testData[string(key)] = string(value)
}
}

s.split(c, "b", "b")
s.split(c, "c", "c")
s.split(c, "d", "d")

s.checkData(c, testData)
s.mustDeleteRange(c, []byte("b"), []byte("c0"), testData)
s.mustDeleteRange(c, []byte("c11"), []byte("c12"), testData)
s.mustDeleteRange(c, []byte("d0"), []byte("d0"), testData)
s.mustDeleteRange(c, []byte("c5"), []byte("d5"), testData)
s.mustDeleteRange(c, []byte("a"), []byte("z"), testData)
}
3 changes: 3 additions & 0 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ func (s *mockTikvGrpcServer) RawPut(context.Context, *kvrpcpb.RawPutRequest) (*k
func (s *mockTikvGrpcServer) RawDelete(context.Context, *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) {
return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeRequest) (*kvrpcpb.RawDeleteRangeResponse, error) {
return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawScan(context.Context, *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) {
return nil, errors.New("unreachable")
}
Expand Down
15 changes: 15 additions & 0 deletions store/tikv/tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
CmdRawGet CmdType = 256 + iota
CmdRawPut
CmdRawDelete
CmdRawDeleteRange
CmdRawScan

CmdCop CmdType = 512 + iota
Expand Down Expand Up @@ -87,6 +88,8 @@ func (t CmdType) String() string {
return "RawPut"
case CmdRawDelete:
return "RawDelete"
case CmdRawDeleteRange:
return "RawDeleteRange"
case CmdRawScan:
return "RawScan"
case CmdCop:
Expand Down Expand Up @@ -121,6 +124,7 @@ type Request struct {
RawGet *kvrpcpb.RawGetRequest
RawPut *kvrpcpb.RawPutRequest
RawDelete *kvrpcpb.RawDeleteRequest
RawDeleteRange *kvrpcpb.RawDeleteRangeRequest
RawScan *kvrpcpb.RawScanRequest
Cop *coprocessor.Request
MvccGetByKey *kvrpcpb.MvccGetByKeyRequest
Expand All @@ -145,6 +149,7 @@ type Response struct {
RawGet *kvrpcpb.RawGetResponse
RawPut *kvrpcpb.RawPutResponse
RawDelete *kvrpcpb.RawDeleteResponse
RawDeleteRange *kvrpcpb.RawDeleteRangeResponse
RawScan *kvrpcpb.RawScanResponse
Cop *coprocessor.Response
CopStream *CopStreamResponse
Expand Down Expand Up @@ -199,6 +204,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
req.RawPut.Context = ctx
case CmdRawDelete:
req.RawDelete.Context = ctx
case CmdRawDeleteRange:
req.RawDeleteRange.Context = ctx
case CmdRawScan:
req.RawScan.Context = ctx
case CmdCop:
Expand Down Expand Up @@ -279,6 +286,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
resp.RawDelete = &kvrpcpb.RawDeleteResponse{
RegionError: e,
}
case CmdRawDeleteRange:
resp.RawDeleteRange = &kvrpcpb.RawDeleteRangeResponse{
RegionError: e,
}
case CmdRawScan:
resp.RawScan = &kvrpcpb.RawScanResponse{
RegionError: e,
Expand Down Expand Up @@ -343,6 +354,8 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
e = resp.RawPut.GetRegionError()
case CmdRawDelete:
e = resp.RawDelete.GetRegionError()
case CmdRawDeleteRange:
e = resp.RawDeleteRange.GetRegionError()
case CmdRawScan:
e = resp.RawScan.GetRegionError()
case CmdCop:
Expand Down Expand Up @@ -397,6 +410,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.RawPut, err = client.RawPut(ctx, req.RawPut)
case CmdRawDelete:
resp.RawDelete, err = client.RawDelete(ctx, req.RawDelete)
case CmdRawDeleteRange:
resp.RawDeleteRange, err = client.RawDeleteRange(ctx, req.RawDeleteRange)
case CmdRawScan:
resp.RawScan, err = client.RawScan(ctx, req.RawScan)
case CmdCop:
Expand Down
Loading