Skip to content

Commit

Permalink
br: fix flaky test br_txn (#56444)
Browse files Browse the repository at this point in the history
close #54979
  • Loading branch information
Tristan1900 authored Oct 11, 2024
1 parent 93cad31 commit f773b6e
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 38 deletions.
1 change: 0 additions & 1 deletion br/pkg/rtree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//br/pkg/logutil",
"//pkg/kv",
"//pkg/tablecodec",
"//pkg/util/redact",
"@com_github_google_btree//:btree",
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/rtree/rtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/pkg/errors"
Expand Down Expand Up @@ -153,8 +152,8 @@ func NeedsMerge(left, right *RangeStats, splitSizeBytes, splitKeyCount uint64) b
if leftKeys+rightKeys > splitKeyCount {
return false
}
tableID1, indexID1, isRecord1, err1 := tablecodec.DecodeKeyHead(kv.Key(left.StartKey))
tableID2, indexID2, isRecord2, err2 := tablecodec.DecodeKeyHead(kv.Key(right.StartKey))
tableID1, indexID1, isRecord1, err1 := tablecodec.DecodeKeyHead(left.StartKey)
tableID2, indexID2, isRecord2, err2 := tablecodec.DecodeKeyHead(right.StartKey)

// Failed to decode the file key head... can this happen?
if err1 != nil || err2 != nil {
Expand Down
3 changes: 1 addition & 2 deletions br/pkg/task/restore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config)
defer cancel()

// Restore raw does not need domain.
needDomain := false
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(cfg), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(cfg), cfg.CheckRequirements, false, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
76 changes: 44 additions & 32 deletions br/tests/br_txn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"hash/crc64"
"math/rand"
"sync"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -78,20 +79,13 @@ func main() {

func randGenWithDuration(client *txnkv.Client, startKey, endKey []byte,
maxLen int, concurrency int, duration int) error {
var err error
ok := make(chan struct{})
go func() {
err = randGen(client, startKey, endKey, maxLen, concurrency)
ok <- struct{}{}
}()
select {
case <-time.After(time.Second * time.Duration(duration)):
case <-ok:
}
return errors.Trace(err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(duration))
defer cancel()

return randGen(ctx, client, startKey, endKey, maxLen, concurrency)
}

func randGen(client *txnkv.Client, startKey, endKey []byte, maxLen int, concurrency int) error {
func randGen(ctx context.Context, client *txnkv.Client, startKey, endKey []byte, maxLen int, concurrency int) error {
log.Info("Start rand-gen", zap.Int("maxlen", maxLen),
zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey)))
log.Info("Rand-gen will keep running. Please Ctrl+C to stop manually.")
Expand All @@ -108,39 +102,57 @@ func randGen(client *txnkv.Client, startKey, endKey []byte, maxLen int, concurre
}

const batchSize = 32
const numBatch = 100

errCh := make(chan error, concurrency)
var wg sync.WaitGroup

for i := maxLen; i <= maxLen+concurrency; i++ {
wg.Add(1)
go func(i int) {
for {
txn, err := client.Begin()
if err != nil {
errCh <- errors.Trace(err)
}
for j := 0; j < batchSize; j++ {
key := randKey(startKey, endKey, i)
// append index to avoid write conflict
key = appendIndex(key, i)
value := randValue()
err = txn.Set(key, value)
defer wg.Done()
for k := 0; k < numBatch; k++ {
select {
case <-ctx.Done():
return
default:
txn, err := client.Begin()
if err != nil {
errCh <- errors.Trace(err)
}
for j := 0; j < batchSize; j++ {
key := randKey(startKey, endKey, i)
// append index to avoid write conflict
key = appendIndex(key, i)
value := randValue()
err = txn.Set(key, value)
if err != nil {
errCh <- errors.Trace(err)
}
}
err = txn.Commit(context.TODO())
if err != nil {
errCh <- errors.Trace(err)
}
}
err = txn.Commit(context.TODO())
if err != nil {
errCh <- errors.Trace(err)
}
}
}(i)
}

err := <-errCh
if err != nil {
return errors.Trace(err)
}
// Use a separate goroutine to wait for all workers to finish
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

return nil
select {
case <-done:
return nil
case err := <-errCh:
<-done
return err
}
}

func testRandKey(startKey, endKey []byte, maxLen int) {
Expand Down

0 comments on commit f773b6e

Please sign in to comment.