From 051f12c884ee0ad6fafa62e1f396a0a41e795bef Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 11 Nov 2024 21:37:56 +0800
Subject: [PATCH] log backup: set a proper maxVersion to resolve lock (#57178)
 (#57280)

close pingcap/tidb#57134
---
 br/pkg/streamhelper/BUILD.bazel           |  1 +
 br/pkg/streamhelper/advancer.go           | 14 ++++--
 br/pkg/streamhelper/advancer_test.go      | 60 +++++++++++++----------
 br/pkg/streamhelper/basic_lib_for_test.go | 36 ++++++++++++--
 4 files changed, 78 insertions(+), 33 deletions(-)

diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel
index ef4ca75797449..163a264d42440 100644
--- a/br/pkg/streamhelper/BUILD.bazel
+++ b/br/pkg/streamhelper/BUILD.bazel
@@ -81,6 +81,7 @@ go_test(
        "//kv",
        "//tablecodec",
        "//util/codec",
+       "//util/mathutil",
        "@com_github_pingcap_errors//:errors",
        "@com_github_pingcap_failpoint//:failpoint",
        "@com_github_pingcap_kvproto//pkg/brpb",
diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go
index 0222ecd9570ac..c1b776a6b45b8 100644
--- a/br/pkg/streamhelper/advancer.go
+++ b/br/pkg/streamhelper/advancer.go
@@ -6,7 +6,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"math"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -22,6 +21,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
+	"github.com/pingcap/tidb/util/mathutil"
 	tikvstore "github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/tikv"
@@ -471,7 +471,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 	return nil
 }
 
-func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool {
+func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool {
 	cp := NewCheckpointWithSpan(s)
 	if cp.TS < c.lastCheckpoint.TS {
 		log.Warn("failed to update global checkpoint: stale",
@@ -497,7 +497,7 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context,
 		return err
 	}
 
-	if c.setCheckpoint(ctx, cp) {
+	if c.setCheckpoint(cp) {
 		log.Info("uploading checkpoint for task",
 			zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)),
 			zap.Uint64("checkpoint", cp.Value),
@@ -583,7 +583,7 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
 
 func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
 	c.checkpointsMu.Lock()
-	c.setCheckpoint(ctx, c.checkpoints.Min())
+	c.setCheckpoint(c.checkpoints.Min())
 	c.checkpointsMu.Unlock()
 	if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
 		return errors.Annotate(err, "failed to upload global checkpoint")
@@ -683,10 +683,14 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar
 	// do not block main tick here
 	go func() {
 		failpoint.Inject("AsyncResolveLocks", func() {})
+		maxTs := uint64(0)
+		for _, t := range targets {
+			maxTs = mathutil.Max(maxTs, t.Value)
+		}
 		handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
 			// we will scan all locks and try to resolve them by checking txn status.
 			return tikv.ResolveLocksForRange(
-				ctx, c.env, math.MaxUint64, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
+				ctx, c.env, maxTs+1, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
 		}
 		workerPool := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "advancer resolve locks")
 		var wg sync.WaitGroup
diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go
index dc7988524dbb1..8921a1c4661b6 100644
--- a/br/pkg/streamhelper/advancer_test.go
+++ b/br/pkg/streamhelper/advancer_test.go
@@ -3,6 +3,7 @@
 package streamhelper_test
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"strings"
@@ -20,6 +21,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/streamhelper/config"
 	"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/util/mathutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/oracle"
@@ -357,15 +359,20 @@ func TestResolveLock(t *testing.T) {
 	lockRegion := c.findRegionByKey([]byte("01"))
 	allLocks := []*txnlock.Lock{
 		{
-			Key: []byte{1},
+			Key: []byte("011"),
 			// TxnID == minCheckpoint
 			TxnID: minCheckpoint,
 		},
 		{
-			Key: []byte{2},
+			Key: []byte("012"),
 			// TxnID > minCheckpoint
 			TxnID: minCheckpoint + 1,
 		},
+		{
+			Key: []byte("013"),
+			// this lock cannot be resolved: its TxnID is beyond the scan's max version
+			TxnID: oracle.GoTimeToTS(oracle.GetTimeFromTS(minCheckpoint).Add(2 * time.Minute)),
+		},
 	}
 	c.LockRegion(lockRegion, allLocks)
 
 	resolveLockRef := atomic.NewBool(false)
 	env.resolveLocks = func(locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) {
 		resolveLockRef.Store(true)
-		require.ElementsMatch(t, locks, allLocks)
+		// The third lock is skipped: its TxnID is not below the max version.
+		require.ElementsMatch(t, locks, allLocks[:2])
 		return loc, nil
 	}
 	adv := streamhelper.NewCheckpointAdvancer(env)
-	// make lastCheckpoint stuck at 123
-	adv.UpdateLastCheckpoint(streamhelper.NewCheckpointWithSpan(spans.Valued{
-		Key: kv.KeyRange{
-			StartKey: kv.Key([]byte("1")),
-			EndKey:   kv.Key([]byte("2")),
-		},
-		Value: 123,
-	}))
-	adv.NewCheckpoints(
-		spans.Sorted(spans.NewFullWith([]kv.KeyRange{
-			{
-				StartKey: kv.Key([]byte("1")),
-				EndKey:   kv.Key([]byte("2")),
-			},
-		}, 0)),
-	)
 	adv.StartTaskListener(ctx)
-	require.Eventually(t, func() bool { return adv.OnTick(ctx) == nil },
-		time.Second, 50*time.Millisecond)
+
+	maxTargetTs := uint64(0)
 	coll := streamhelper.NewClusterCollector(ctx, env)
+	coll.SetOnSuccessHook(func(u uint64, kr kv.KeyRange) {
+		adv.WithCheckpoints(func(s *spans.ValueSortedFull) {
+			for _, lock := range allLocks {
+				// if there is any lock key in the range
+				if bytes.Compare(kr.StartKey, lock.Key) <= 0 && (bytes.Compare(lock.Key, kr.EndKey) < 0 || len(kr.EndKey) == 0) {
+					// mock lock behavior: do not update the checkpoint
+					s.Merge(spans.Valued{Key: kr, Value: minCheckpoint})
+					return
+				}
+			}
+			s.Merge(spans.Valued{Key: kr, Value: u})
+			maxTargetTs = mathutil.Max(maxTargetTs, u)
+		})
+	})
 	err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll)
 	require.NoError(t, err)
+	r, err := coll.Finish(ctx)
+	require.NoError(t, err)
+	require.Len(t, r.FailureSubRanges, 0)
+	require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint)
+
+	env.maxTs = maxTargetTs + 1
 	require.Eventually(t, func() bool { return adv.OnTick(ctx) == nil },
 		time.Second, 50*time.Millisecond)
 	// now the lock state must be true,
 	// because tick finished and asyncResolveLocks got stuck.
 	require.True(t, adv.GetInResolvingLock())
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/AsyncResolveLocks"))
@@ -407,10 +421,6 @@ func TestResolveLock(t *testing.T) {
 	// state must be set to false after tick
 	require.Eventually(t, func() bool { return !adv.GetInResolvingLock() },
 		8*time.Second, 50*time.Microsecond)
-	r, err := coll.Finish(ctx)
-	require.NoError(t, err)
-	require.Len(t, r.FailureSubRanges, 0)
-	require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint)
 }
 
 func TestOwnerDropped(t *testing.T) {
diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go
index 1aa7c56829ab9..7b18f01f355de 100644
--- a/br/pkg/streamhelper/basic_lib_for_test.go
+++ b/br/pkg/streamhelper/basic_lib_for_test.go
@@ -17,6 +17,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/errors"
 	backup "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -98,6 +99,7 @@ type fakeCluster struct {
 	idAlloced uint64
 	stores    map[uint64]*fakeStore
 	regions   []*region
+	maxTs     uint64
 	testCtx   *testing.T
 
 	onGetClient func(uint64) error
@@ -765,17 +767,31 @@ func (t *testEnv) putTask() {
 }
 
 func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if t.maxTs != maxVersion {
+		return nil, nil, errors.Errorf("unexpected max version in scan lock, expected %d, actual %d", t.maxTs, maxVersion)
+	}
 	for _, r := range t.regions {
 		if len(r.locks) != 0 {
-			return r.locks, &tikv.KeyLocation{
+			locks := make([]*txnlock.Lock, 0, len(r.locks))
+			for _, l := range r.locks {
+				// skip locks whose TxnID is not below maxVersion
+				if l.TxnID < maxVersion {
+					locks = append(locks, l)
+				}
+			}
+			return locks, &tikv.KeyLocation{
 				Region: tikv.NewRegionVerID(r.id, 0, 0),
 			}, nil
 		}
 	}
-	return nil, nil, nil
+	return nil, &tikv.KeyLocation{}, nil
 }
 
 func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
 	for _, r := range t.regions {
 		if loc != nil && loc.Region.GetID() == r.id {
 			// reset locks
@@ -783,7 +799,7 @@ func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.L
 			return t.resolveLocks(locks, loc)
 		}
 	}
-	return nil, nil
+	return loc, nil
 }
 
 func (t *testEnv) Identifier() string {
@@ -848,6 +864,20 @@ func (p *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Stor
 	}, nil
 }
 
+func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
+	// only used once by GetRegionCache during resolve lock
+	return []*metapb.Store{
+		{
+			Id:      1,
+			Address: "127.0.0.1",
+		},
+	}, nil
+}
+
+func (p *mockPDClient) GetClusterID(ctx context.Context) uint64 {
+	return 1
+}
+
 func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *pd.Region {
 	leader := &metapb.Peer{
 		Id: regionID,
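
The gist of the change, as a minimal standalone Go sketch: before this patch, asyncResolveLocksForRanges scanned with maxVersion = math.MaxUint64, so it could resolve locks held by transactions far ahead of the log-backup checkpoint; now it derives the bound from the largest checkpoint TS among the target ranges and scans with maxTs+1. The types, helper, and numbers below are hypothetical simplifications for illustration, not the real txnlock/tikv API.

package main

import "fmt"

// lock is a simplified stand-in for txnlock.Lock.
type lock struct {
	key   string
	txnID uint64
}

// scanMaxVersion mirrors the patched logic: take the largest checkpoint TS
// among the target ranges, then scan with maxTs+1 so exactly the locks with
// TxnID <= maxTs are considered.
func scanMaxVersion(targetTS []uint64) uint64 {
	maxTs := uint64(0)
	for _, ts := range targetTS {
		if ts > maxTs {
			maxTs = ts
		}
	}
	return maxTs + 1
}

func main() {
	const checkpoint uint64 = 100
	locks := []lock{
		{key: "011", txnID: checkpoint},       // == checkpoint: resolved
		{key: "012", txnID: checkpoint + 1},   // == maxTs: still below the bound, resolved
		{key: "013", txnID: checkpoint + 900}, // ongoing txn far ahead: skipped
	}
	maxVersion := scanMaxVersion([]uint64{checkpoint, checkpoint + 1})
	for _, l := range locks {
		if l.txnID < maxVersion { // same predicate as ScanLocksInOneRegion above
			fmt.Println("resolve", l.key)
		} else {
			fmt.Println("skip", l.key)
		}
	}
}

A lock only blocks checkpoint advance if its TxnID is at or below some target range's checkpoint, so maxTs+1 is the smallest scan bound that still covers every blocking lock; anything newer belongs to a live transaction that lock resolution must not disturb.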