Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Fix the issue that rawkv restore's end key is incorrectly inclusive when restoring partial of backed up data (#201) #726

Merged
merged 2 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,10 @@ func (importer *FileImporter) downloadRawKVSST(
if bytes.Compare(importer.rawStartKey, sstMeta.Range.GetStart()) > 0 {
sstMeta.Range.Start = importer.rawStartKey
}
// TODO: importer.RawEndKey is exclusive but sstMeta.Range.End is inclusive. How to exclude importer.RawEndKey?
if len(importer.rawEndKey) > 0 && bytes.Compare(importer.rawEndKey, sstMeta.Range.GetEnd()) < 0 {
if len(importer.rawEndKey) > 0 &&
(len(sstMeta.Range.GetEnd()) == 0 || bytes.Compare(importer.rawEndKey, sstMeta.Range.GetEnd()) <= 0) {
sstMeta.Range.End = importer.rawEndKey
sstMeta.EndKeyExclusive = true
}
if bytes.Compare(sstMeta.Range.GetStart(), sstMeta.Range.GetEnd()) > 0 {
return nil, errors.Trace(berrors.ErrKVRangeIsEmpty)
Expand All @@ -428,6 +429,7 @@ func (importer *FileImporter) downloadRawKVSST(
StorageBackend: importer.backend,
Name: file.GetName(),
RewriteRule: rule,
IsRawKv: true,
}
log.Debug("download SST", logutil.SSTMeta(&sstMeta), logutil.Region(regionInfo.Region))
var err error
Expand Down
41 changes: 38 additions & 3 deletions tests/br_rawkv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"hash/crc64"
"math/rand"
"strings"
"time"

"github.com/pingcap/errors"
Expand All @@ -18,12 +19,14 @@ import (

var (
pdAddr = flag.String("pd", "127.0.0.1:2379", "Address of PD")
runMode = flag.String("mode", "", "Mode. One of 'rand-gen', 'checksum', 'scan' and 'diff'")
runMode = flag.String("mode", "", "Mode. One of 'rand-gen', 'checksum', 'scan', 'diff', 'delete' and 'put'")
startKeyStr = flag.String("start-key", "", "Start key in hex")
endKeyStr = flag.String("end-key", "", "End key in hex")
keyMaxLen = flag.Int("key-max-len", 32, "Max length of keys for rand-gen mode")
concurrency = flag.Int("concurrency", 32, "Concurrency to run rand-gen")
duration = flag.Int("duration", 10, "duration(second) of rand-gen")
putDataStr = flag.String("put-data", "", "Kv pairs to put to the cluster in hex. "+
"kv pairs are separated by commas, key and value in a pair are separated by a colon")
)

func createClient(addr string) (*tikv.RawKVClient, error) {
Expand All @@ -42,8 +45,8 @@ func main() {
if err != nil {
log.Panic("Invalid endKey: %v, err: %+v", zap.String("endkey", *endKeyStr), zap.Error(err))
}

if len(endKey) == 0 {
// For "put" mode, the key range is not used. So no need to throw error here.
if len(endKey) == 0 && *runMode != "put" {
log.Panic("Empty endKey is not supported yet")
}

Expand All @@ -66,6 +69,8 @@ func main() {
err = scan(client, startKey, endKey)
case "delete":
err = deleteRange(client, startKey, endKey)
case "put":
err = put(client, *putDataStr)
}

if err != nil {
Expand Down Expand Up @@ -233,6 +238,7 @@ func checksum(client *tikv.RawKVClient, startKey, endKey []byte) error {
}

log.Info("Checksum result", zap.Uint64("checksum", res))
fmt.Printf("Checksum result: %016x\n", res)
return nil
}

Expand Down Expand Up @@ -268,6 +274,35 @@ func scan(client *tikv.RawKVClient, startKey, endKey []byte) error {
return nil
}

func put(client *tikv.RawKVClient, dataStr string) error {
keys := make([][]byte, 0)
values := make([][]byte, 0)

for _, pairStr := range strings.Split(dataStr, ",") {
pair := strings.Split(pairStr, ":")
if len(pair) != 2 {
return errors.Errorf("invalid kv pair string %q", pairStr)
}

key, err := hex.DecodeString(strings.Trim(pair[0], " "))
if err != nil {
return errors.Annotatef(err, "invalid kv pair string %q", pairStr)
}
value, err := hex.DecodeString(strings.Trim(pair[1], " "))
if err != nil {
return errors.Annotatef(err, "invalid kv pair string %q", pairStr)
}

keys = append(keys, key)
values = append(values, value)
}

log.Info("Put rawkv data", zap.ByteStrings("keys", keys), zap.ByteStrings("values", values))

err := client.BatchPut(keys, values)
return errors.Trace(err)
}

const defaultScanBatchSize = 128

type rawKVScanner struct {
Expand Down
23 changes: 13 additions & 10 deletions tests/br_rawkv/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ checksum_empty=$(checksum 31 3130303030303030)
# generate raw kv randomly in range[start-key, end-key) in 10s
bin/rawkv --pd $PD_ADDR --mode rand-gen --start-key 31 --end-key 3130303030303030 --duration 10

# put some keys around 311122 to check the correctness of endKey of restoring
bin/rawkv --pd $PD_ADDR --mode put --put-data "311121:31, 31112100:32, 311122:33, 31112200:34, 3111220000:35, 311123:36"

checksum_ori=$(checksum 31 3130303030303030)
checksum_partial=$(checksum 311111 311122)

Expand Down Expand Up @@ -75,15 +78,15 @@ if [ "$checksum_new" != "$checksum_empty" ];then
fail_and_exit
fi

# FIXME restore rawkv partially after change endkey to inclusive
# echo "restore start..."
# run_br --pd $PD_ADDR restore raw -s "local://$TEST_DIR/$BACKUP_DIR" --start 311111 --end 311122 --format hex --concurrency 4
#
# checksum_new=$(checksum 31 3130303030303030)
#
# if [ "$checksum_new" != "$checksum_partial" ];then
# echo "checksum failed after restore"
# fail_and_exit
# fi
echo "partial restore start..."
run_br --pd $PD_ADDR restore raw -s "local://$TEST_DIR/$BACKUP_DIR" --start 311111 --end 311122 --format hex --concurrency 4
bin/rawkv --pd $PD_ADDR --mode scan --start-key 311121 --end-key 33

checksum_new=$(checksum 31 3130303030303030)

if [ "$checksum_new" != "$checksum_partial" ];then
echo "checksum failed after restore"
fail_and_exit
fi

echo "TEST: [$TEST_NAME] successed!"