Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: Add raw delete range API #6157

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1946e20
Update kvproto
Mar 21, 2018
9a925e1
Add raw delete range to rawkv
Mar 21, 2018
ae1392b
Merge branch 'master' into misono/add-raw-delete-range-interface
Mar 23, 2018
c195b3d
Fix format of Gopkg.lock
Mar 23, 2018
220ef01
make check
Mar 23, 2018
fcd208c
Merge branch 'master' into misono/add-raw-delete-range-interface
Mar 26, 2018
0d44811
format
Mar 26, 2018
6696040
Move position of RawDeleteRange to just after RawDelete
Mar 27, 2018
a9b68ca
store/mockstore: Add raw delete range to mock store
Mar 27, 2018
fec48ad
Merge branch 'misono/mock-tikv-raw-delete-range' into misono/add-raw-…
Mar 27, 2018
cd74685
Add test for raw delete range
Mar 27, 2018
7c28d87
Merge branch 'master' into misono/add-raw-delete-range-interface
Mar 27, 2018
28b9499
Merge branch 'master' into misono/mock-tikv-raw-delete-range
MyonKeminta Mar 27, 2018
e5f809b
Merge branch 'master' into misono/mock-tikv-raw-delete-range
zhangjinpeng87 Mar 28, 2018
485e8f2
Merge remote-tracking branch 'upstream/master' into misono/mock-tikv-…
Apr 2, 2018
019fda6
Merge branch 'master' into misono/add-raw-delete-range-interface
Apr 2, 2018
a81270b
Merge branch 'master' into misono/add-raw-delete-range-interface
Apr 2, 2018
c88dac2
Add test for mock raw delete range of MvccStore
Apr 3, 2018
05d88e4
Merge branch 'master' into misono/mock-tikv-raw-delete-range
Apr 3, 2018
6132848
make check
Apr 3, 2018
6bfd6e3
Merge branch 'master' into misono/add-raw-delete-range-interface
Apr 3, 2018
17771f2
Merge branch 'misono/mock-tikv-raw-delete-range' into misono/add-raw-…
Apr 3, 2018
2021743
fmt
Apr 3, 2018
006d18e
Merge branch 'misono/mock-tikv-raw-delete-range' into misono/add-raw-…
Apr 3, 2018
fc7786d
Address comments
Apr 3, 2018
13d3a93
Separate success and error metrics
Apr 4, 2018
6214b7d
Merge branch 'master' into misono/add-raw-delete-range-interface
coocood Apr 4, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 66 additions & 8 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package mocktikv

import (
"bytes"
"strings"
"testing"

Expand All @@ -33,24 +34,81 @@ type testMockTiKVSuite struct {

type testMarshal struct{}

// testMvccStore is used to test MvccStore implementation.
type testMvccStore struct {
testMockTiKVSuite
}

// testMVCCLevelDB is used to test MVCCLevelDB implementation.
type testMVCCLevelDB struct {
testMockTiKVSuite
}

var (
_ = Suite(&testMvccStore{})
_ = Suite(&testMVCCLevelDB{})
_ = Suite(testMarshal{})
)

// testMvccStore is used to test MvccStore implementation.
type testMvccStore struct {
testMockTiKVSuite
}

func (s *testMvccStore) SetUpTest(c *C) {
s.store = NewMvccStore()
}

// testMVCCLevelDB is used to test MVCCLevelDB implementation.
type testMVCCLevelDB struct {
testMockTiKVSuite
func (s *testMvccStore) mustRawGet(c *C, key []byte, value []byte) {
rawkv := s.store.(RawKV)
actualValue := rawkv.RawGet(key)
c.Assert(actualValue, DeepEquals, value)
}

func (s *testMvccStore) rawPut(c *C, key []byte, value []byte) {
rawkv := s.store.(RawKV)
rawkv.RawPut(key, value)
}

func (s *testMvccStore) checkRawData(c *C, expected map[string]string) {
rawkv := s.store.(RawKV)
pairs := rawkv.RawScan([]byte(""), []byte("zzzzzzzz"), len(expected)+1)
c.Assert(len(pairs), Equals, len(expected))

actual := map[string]string{}
for _, pair := range pairs {
actual[string(pair.Key)] = string(pair.Value)
}

c.Assert(actual, DeepEquals, expected)
}

func (s *testMvccStore) mustRawDeleteRange(c *C, startKey, endKey []byte, expectedData map[string]string) {
rawkv := s.store.(RawKV)
rawkv.RawDeleteRange(startKey, endKey)

for strKey := range expectedData {
key := []byte(strKey)
if bytes.Compare(startKey, key) <= 0 && bytes.Compare(key, endKey) < 0 {
delete(expectedData, strKey)
}
}

s.checkRawData(c, expectedData)
}

func (s *testMvccStore) TestRawDeleteRange(c *C) {
testData := map[string]string{}

for i := byte('a'); i <= byte('g'); i++ {
key := []byte{i}
value := []byte{'v', i}
s.rawPut(c, key, value)
testData[string(key)] = string(value)
}

s.checkRawData(c, testData)

s.mustRawDeleteRange(c, []byte("b"), []byte("b"), testData)
s.mustRawDeleteRange(c, []byte("d"), []byte("f"), testData)
s.mustRawDeleteRange(c, []byte("b"), []byte("c"), testData)
s.mustRawDeleteRange(c, []byte("c11"), []byte("c99"), testData)
s.mustRawDeleteRange(c, []byte("a"), []byte("z"), testData)
}

func (s *testMockTiKVSuite) SetUpTest(c *C) {
Expand Down
18 changes: 18 additions & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ type RawKV interface {
RawScan(startKey, endKey []byte, limit int) []Pair
RawPut(key, value []byte)
RawDelete(key []byte)
RawDeleteRange(startKey, endKey []byte)
}

// MVCCDebugger is for debugging.
Expand Down Expand Up @@ -780,6 +781,23 @@ func (s *MvccStore) RawDelete(key []byte) {
s.rawkv.Delete(newRawEntry(key))
}

// RawDeleteRange deletes all key-value pairs in a given range
func (s *MvccStore) RawDeleteRange(startKey, endKey []byte) {
s.Lock()
defer s.Unlock()

var entriesToDelete []*rawEntry
iterator := func(item btree.Item) bool {
entriesToDelete = append(entriesToDelete, item.(*rawEntry))
return true
}
s.rawkv.AscendRange(newRawEntry(startKey), newRawEntry(endKey), iterator)

for _, entry := range entriesToDelete {
s.rawkv.Delete(entry)
}
}

// RawScan reads up to a limited number of rawkv Pairs.
func (s *MvccStore) RawScan(startKey, endKey []byte, limit int) []Pair {
s.RLock()
Expand Down
18 changes: 18 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,17 @@ func (h *rpcHandler) handleKvRawDelete(req *kvrpcpb.RawDeleteRequest) *kvrpcpb.R
return &kvrpcpb.RawDeleteResponse{}
}

func (h *rpcHandler) handleKvRawDeleteRange(req *kvrpcpb.RawDeleteRangeRequest) *kvrpcpb.RawDeleteRangeResponse {
rawKV, ok := h.mvccStore.(RawKV)
if !ok {
return &kvrpcpb.RawDeleteRangeResponse{
Error: "not implemented",
}
}
rawKV.RawDeleteRange(req.GetStartKey(), req.GetEndKey())
return &kvrpcpb.RawDeleteRangeResponse{}
}

func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScanResponse {
rawKV, ok := h.mvccStore.(RawKV)
if !ok {
Expand Down Expand Up @@ -598,6 +609,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.RawDelete = handler.handleKvRawDelete(r)
case tikvrpc.CmdRawDeleteRange:
r := req.RawDeleteRange
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawDeleteRange = &kvrpcpb.RawDeleteRangeResponse{RegionError: err}
return resp, nil
}
resp.RawDeleteRange = handler.handleKvRawDeleteRange(r)
case tikvrpc.CmdRawScan:
r := req.RawScan
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
Expand Down
79 changes: 79 additions & 0 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tikv

import (
"bytes"
"time"

"github.com/juju/errors"
Expand Down Expand Up @@ -155,6 +156,39 @@ func (c *RawKVClient) Delete(key []byte) error {
return nil
}

// DeleteRange deletes all key-value pairs in a range from TiKV
func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {
start := time.Now()
var err error
defer func() {
var label = "delete_range"
if err != nil {
label += "_error"
}
metrics.TiKVRawkvCmdHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())
}()

// Process each affected region respectively
for !bytes.Equal(startKey, endKey) {
var resp *tikvrpc.Response
var actualEndKey []byte
resp, actualEndKey, err = c.sendDeleteRangeReq(startKey, endKey)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.RawDeleteRange
if cmdResp == nil {
return errors.Trace(ErrBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
startKey = actualEndKey
}

return nil
}

// Scan queries continuous kv pairs, starts from startKey, up to limit pairs.
// If you want to exclude the startKey, append a '\0' to the key: `Scan(append(startKey, '\0'), limit)`.
func (c *RawKVClient) Scan(startKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
Expand Down Expand Up @@ -219,3 +253,48 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request) (*tikvrpc.Respon
return resp, loc, nil
}
}

// sendDeleteRangeReq sends a raw delete range request and returns the response and the actual endKey.
// If the given range spans over more than one regions, the actual endKey is the end of the first region.
// We can't use sendReq directly, because we need to know the end of the region before we send the request
// TODO: Is there any better way to avoid duplicating code with func `sendReq` ?
func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*tikvrpc.Response, []byte, error) {
bo := NewBackoffer(context.Background(), rawkvMaxBackoff)
sender := NewRegionRequestSender(c.regionCache, c.rpcClient)
for {
loc, err := c.regionCache.LocateKey(bo, startKey)
if err != nil {
return nil, nil, errors.Trace(err)
}

actualEndKey := endKey
if len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, endKey) < 0 {
actualEndKey = loc.EndKey
}

req := &tikvrpc.Request{
Type: tikvrpc.CmdRawDeleteRange,
RawDeleteRange: &kvrpcpb.RawDeleteRangeRequest{
StartKey: startKey,
EndKey: actualEndKey,
},
}

resp, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return nil, nil, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, nil, errors.Trace(err)
}
if regionErr != nil {
err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, nil, errors.Trace(err)
}
continue
}
return resp, actualEndKey, nil
}
}
86 changes: 73 additions & 13 deletions store/tikv/rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package tikv

import (
"bytes"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a newline after this


. "github.com/pingcap/check"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"golang.org/x/net/context"
Expand Down Expand Up @@ -78,6 +80,41 @@ func (s *testRawKVSuite) mustScan(c *C, startKey string, limit int, expect ...st
}
}

func (s *testRawKVSuite) mustDeleteRange(c *C, startKey, endKey []byte, expected map[string]string) {
err := s.client.DeleteRange(startKey, endKey)
c.Assert(err, IsNil)

for keyStr := range expected {
key := []byte(keyStr)
if bytes.Compare(startKey, key) <= 0 && bytes.Compare(key, endKey) < 0 {
delete(expected, keyStr)
}
}

s.checkData(c, expected)
}

func (s *testRawKVSuite) checkData(c *C, expected map[string]string) {
keys, values, err := s.client.Scan([]byte(""), len(expected)+1)
c.Assert(err, IsNil)

c.Assert(len(expected), Equals, len(keys))
for i, key := range keys {
c.Assert(expected[string(key)], Equals, string(values[i]))
}
}

func (s *testRawKVSuite) split(c *C, regionKey, splitKey string) error {
loc, err := s.client.regionCache.LocateKey(s.bo, []byte(regionKey))
if err != nil {
return err
}

newRegionID, peerID := s.cluster.AllocID(), s.cluster.AllocID()
s.cluster.SplitRaw(loc.Region.id, newRegionID, []byte(splitKey), []uint64{peerID}, peerID)
return nil
}

func (s *testRawKVSuite) TestSimple(c *C) {
s.mustNotExist(c, []byte("key"))
s.mustPut(c, []byte("key"), []byte("value"))
Expand All @@ -89,13 +126,11 @@ func (s *testRawKVSuite) TestSimple(c *C) {
}

func (s *testRawKVSuite) TestSplit(c *C) {
loc, err := s.client.regionCache.LocateKey(s.bo, []byte("k"))
c.Assert(err, IsNil)
s.mustPut(c, []byte("k1"), []byte("v1"))
s.mustPut(c, []byte("k3"), []byte("v3"))

newRegionID, peerID := s.cluster.AllocID(), s.cluster.AllocID()
s.cluster.SplitRaw(loc.Region.id, newRegionID, []byte("k2"), []uint64{peerID}, peerID)
err := s.split(c, "k", "k2")
c.Assert(err, IsNil)

s.mustGet(c, []byte("k1"), []byte("v1"))
s.mustGet(c, []byte("k3"), []byte("v3"))
Expand All @@ -115,16 +150,41 @@ func (s *testRawKVSuite) TestScan(c *C) {
s.mustScan(c, "k2", 3, "k3", "v3", "k5", "v5", "k7", "v7")
}

split := func(regionKey, splitKey string) {
loc, err := s.client.regionCache.LocateKey(s.bo, []byte(regionKey))
c.Assert(err, IsNil)
newRegionID, peerID := s.cluster.AllocID(), s.cluster.AllocID()
s.cluster.SplitRaw(loc.Region.id, newRegionID, []byte(splitKey), []uint64{peerID}, peerID)
}

check()
split("k", "k2")

err := s.split(c, "k", "k2")
c.Assert(err, IsNil)
check()
split("k2", "k5")

err = s.split(c, "k2", "k5")
c.Assert(err, IsNil)
check()
}

func (s *testRawKVSuite) TestDeleteRange(c *C) {
// Init data
testData := map[string]string{}
for _, i := range []byte("abcd") {
for j := byte('0'); j <= byte('9'); j++ {
key := []byte{i, j}
value := []byte{'v', i, j}
s.mustPut(c, key, value)

testData[string(key)] = string(value)
}
}

err := s.split(c, "b", "b")
c.Assert(err, IsNil)
err = s.split(c, "c", "c")
c.Assert(err, IsNil)
err = s.split(c, "d", "d")
c.Assert(err, IsNil)

s.checkData(c, testData)
s.mustDeleteRange(c, []byte("b"), []byte("c0"), testData)
s.mustDeleteRange(c, []byte("c11"), []byte("c12"), testData)
s.mustDeleteRange(c, []byte("d0"), []byte("d0"), testData)
s.mustDeleteRange(c, []byte("c5"), []byte("d5"), testData)
s.mustDeleteRange(c, []byte("a"), []byte("z"), testData)
}
Loading