Skip to content

Commit 08b93fd

Browse files
authored
br: fix the split issue in txn restore (#45441) (#45548)
close #45476
1 parent c349b18 commit 08b93fd

File tree

6 files changed

+429
-2
lines changed

6 files changed

+429
-2
lines changed

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ build_for_br_integration_test:
327327
$(GOBUILD) $(RACE_FLAG) -o bin/gc br/tests/br_z_gc_safepoint/*.go && \
328328
$(GOBUILD) $(RACE_FLAG) -o bin/oauth br/tests/br_gcs/*.go && \
329329
$(GOBUILD) $(RACE_FLAG) -o bin/rawkv br/tests/br_rawkv/*.go && \
330+
$(GOBUILD) $(RACE_FLAG) -o bin/txnkv br/tests/br_txn/*.go && \
330331
$(GOBUILD) $(RACE_FLAG) -o bin/parquet_gen br/tests/lightning_checkpoint_parquet/*.go \
331332
) || (make failpoint-disable && exit 1)
332333
@make failpoint-disable

br/pkg/task/restore_txn.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config)
8787
!cfg.LogProgress)
8888

8989
// RawKV restore does not need to rewrite keys.
90-
err = restore.SplitRanges(ctx, client, ranges, nil, updateCh, true)
90+
err = restore.SplitRanges(ctx, client, ranges, nil, updateCh, false)
9191
if err != nil {
9292
return errors.Trace(err)
9393
}

br/tests/br_txn/BUILD.bazel

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
2+
3+
go_library(
4+
name = "br_txn_lib",
5+
srcs = ["client.go"],
6+
importpath = "github.com/pingcap/tidb/br/tests/br_txn",
7+
visibility = ["//visibility:private"],
8+
deps = [
9+
"@com_github_pingcap_errors//:errors",
10+
"@com_github_pingcap_log//:log",
11+
"@com_github_tikv_client_go_v2//config",
12+
"@com_github_tikv_client_go_v2//txnkv",
13+
"@org_uber_go_zap//:zap",
14+
],
15+
)
16+
17+
go_binary(
18+
name = "br_txn",
19+
embed = [":br_txn_lib"],
20+
visibility = ["//visibility:public"],
21+
)

br/tests/br_txn/client.go

+268
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/hex"
7+
"flag"
8+
"fmt"
9+
"hash/crc64"
10+
"math/rand"
11+
"time"
12+
13+
"github.com/pingcap/errors"
14+
"github.com/pingcap/log"
15+
"github.com/tikv/client-go/v2/config"
16+
"github.com/tikv/client-go/v2/txnkv"
17+
"go.uber.org/zap"
18+
)
19+
20+
var (
21+
ca = flag.String("ca", "", "CA certificate path for TLS connection")
22+
cert = flag.String("cert", "", "certificate path for TLS connection")
23+
key = flag.String("key", "", "private key path for TLS connection")
24+
pdAddr = flag.String("pd", "127.0.0.1:2379", "Address of PD")
25+
runMode = flag.String("mode", "", "Mode. One of 'rand-gen', 'checksum', 'scan', 'delete'")
26+
startKeyStr = flag.String("start-key", "", "Start key in hex")
27+
endKeyStr = flag.String("end-key", "", "End key in hex")
28+
keyMaxLen = flag.Int("key-max-len", 32, "Max length of keys for rand-gen mode")
29+
concurrency = flag.Int("concurrency", 32, "Concurrency to run rand-gen")
30+
duration = flag.Int("duration", 10, "duration(second) of rand-gen")
31+
)
32+
33+
func createClient(addr string) (*txnkv.Client, error) {
34+
if *ca != "" {
35+
conf := config.GetGlobalConfig()
36+
conf.Security.ClusterSSLCA = *ca
37+
conf.Security.ClusterSSLCert = *cert
38+
conf.Security.ClusterSSLKey = *key
39+
config.StoreGlobalConfig(conf)
40+
}
41+
42+
cli, err := txnkv.NewClient([]string{addr})
43+
return cli, errors.Trace(err)
44+
}
45+
46+
func main() {
47+
flag.Parse()
48+
49+
startKey := []byte(*startKeyStr)
50+
endKey := []byte(*endKeyStr)
51+
if len(endKey) == 0 {
52+
log.Panic("Empty endKey is not supported yet")
53+
}
54+
55+
if *runMode == "test-rand-key" {
56+
testRandKey(startKey, endKey, *keyMaxLen)
57+
return
58+
}
59+
60+
client, err := createClient(*pdAddr)
61+
if err != nil {
62+
log.Panic("Failed to create client", zap.String("pd", *pdAddr), zap.Error(err))
63+
}
64+
65+
switch *runMode {
66+
case "rand-gen":
67+
err = randGenWithDuration(client, startKey, endKey, *keyMaxLen, *concurrency, *duration)
68+
case "checksum":
69+
err = checksum(client, startKey, endKey)
70+
case "delete":
71+
err = deleteRange(client, startKey, endKey)
72+
}
73+
74+
if err != nil {
75+
log.Panic("Error", zap.Error(err))
76+
}
77+
}
78+
79+
func randGenWithDuration(client *txnkv.Client, startKey, endKey []byte,
80+
maxLen int, concurrency int, duration int) error {
81+
var err error
82+
ok := make(chan struct{})
83+
go func() {
84+
err = randGen(client, startKey, endKey, maxLen, concurrency)
85+
ok <- struct{}{}
86+
}()
87+
select {
88+
case <-time.After(time.Second * time.Duration(duration)):
89+
case <-ok:
90+
}
91+
return errors.Trace(err)
92+
}
93+
94+
func randGen(client *txnkv.Client, startKey, endKey []byte, maxLen int, concurrency int) error {
95+
log.Info("Start rand-gen", zap.Int("maxlen", maxLen),
96+
zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey)))
97+
log.Info("Rand-gen will keep running. Please Ctrl+C to stop manually.")
98+
99+
// Cannot generate shorter key than commonPrefix
100+
commonPrefixLen := 0
101+
for ; commonPrefixLen < len(startKey) && commonPrefixLen < len(endKey) &&
102+
startKey[commonPrefixLen] == endKey[commonPrefixLen]; commonPrefixLen++ {
103+
continue
104+
}
105+
106+
if maxLen < commonPrefixLen {
107+
return errors.Errorf("maxLen (%v) < commonPrefixLen (%v)", maxLen, commonPrefixLen)
108+
}
109+
110+
const batchSize = 32
111+
112+
errCh := make(chan error, concurrency)
113+
for i := maxLen; i <= maxLen+concurrency; i++ {
114+
go func(i int) {
115+
for {
116+
txn, err := client.Begin()
117+
if err != nil {
118+
errCh <- errors.Trace(err)
119+
}
120+
for j := 0; j < batchSize; j++ {
121+
key := randKey(startKey, endKey, i)
122+
// append index to avoid write conflict
123+
key = appendIndex(key, i)
124+
value := randValue()
125+
err = txn.Set(key, value)
126+
if err != nil {
127+
errCh <- errors.Trace(err)
128+
}
129+
}
130+
err = txn.Commit(context.TODO())
131+
if err != nil {
132+
errCh <- errors.Trace(err)
133+
}
134+
}
135+
}(i)
136+
}
137+
138+
err := <-errCh
139+
if err != nil {
140+
return errors.Trace(err)
141+
}
142+
143+
return nil
144+
}
145+
146+
func testRandKey(startKey, endKey []byte, maxLen int) {
147+
for {
148+
k := randKey(startKey, endKey, maxLen)
149+
if bytes.Compare(k, startKey) < 0 || bytes.Compare(k, endKey) >= 0 {
150+
panic(hex.EncodeToString(k))
151+
}
152+
}
153+
}
154+
155+
//nolint:gosec
156+
func randKey(startKey, endKey []byte, maxLen int) []byte {
157+
Retry:
158+
for { // Regenerate on fail
159+
result := make([]byte, 0, maxLen)
160+
161+
upperUnbounded := false
162+
lowerUnbounded := false
163+
164+
for i := 0; i < maxLen; i++ {
165+
upperBound := 256
166+
if !upperUnbounded {
167+
if i >= len(endKey) {
168+
// The generated key is the same as endKey which is invalid. Regenerate it.
169+
continue Retry
170+
}
171+
upperBound = int(endKey[i]) + 1
172+
}
173+
174+
lowerBound := 0
175+
if !lowerUnbounded {
176+
if i >= len(startKey) {
177+
lowerUnbounded = true
178+
} else {
179+
lowerBound = int(startKey[i])
180+
}
181+
}
182+
183+
if lowerUnbounded {
184+
if rand.Intn(257) == 0 {
185+
return result
186+
}
187+
}
188+
189+
value := rand.Intn(upperBound - lowerBound)
190+
value += lowerBound
191+
192+
if value < upperBound-1 {
193+
upperUnbounded = true
194+
}
195+
if value > lowerBound {
196+
lowerUnbounded = true
197+
}
198+
199+
result = append(result, uint8(value))
200+
}
201+
202+
return result
203+
}
204+
}
205+
206+
//nolint:gosec
207+
func appendIndex(key []byte, i int) []byte {
208+
return append(key, uint8(i))
209+
}
210+
211+
//nolint:gosec
212+
func randValue() []byte {
213+
result := make([]byte, 0, 512)
214+
for i := 0; i < 512; i++ {
215+
value := rand.Intn(257)
216+
if value == 256 {
217+
if i > 0 {
218+
return result
219+
}
220+
value--
221+
}
222+
result = append(result, uint8(value))
223+
}
224+
return result
225+
}
226+
227+
func checksum(client *txnkv.Client, startKey, endKey []byte) error {
228+
log.Info("Start checkcum on range",
229+
zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey)))
230+
231+
txn, err := client.Begin()
232+
if err != nil {
233+
return errors.Trace(err)
234+
}
235+
iter, err := txn.Iter(startKey, endKey)
236+
if err != nil {
237+
return errors.Trace(err)
238+
}
239+
240+
digest := crc64.New(crc64.MakeTable(crc64.ECMA))
241+
242+
var res uint64
243+
244+
for iter.Valid() {
245+
err := iter.Next()
246+
if err != nil {
247+
return errors.Trace(err)
248+
}
249+
if len(iter.Key()) == 0 {
250+
break
251+
}
252+
_, _ = digest.Write(iter.Key())
253+
_, _ = digest.Write(iter.Value())
254+
res ^= digest.Sum64()
255+
}
256+
_ = txn.Commit(context.TODO())
257+
258+
log.Info("Checksum result", zap.Uint64("checksum", res))
259+
fmt.Printf("Checksum result: %016x\n", res)
260+
return nil
261+
}
262+
263+
func deleteRange(client *txnkv.Client, startKey, endKey []byte) error {
264+
log.Info("Start delete data in range",
265+
zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey)))
266+
_, err := client.DeleteRange(context.TODO(), startKey, endKey, *concurrency)
267+
return err
268+
}

0 commit comments

Comments
 (0)