Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: Enable checkpoint advancer to pause tasks lagged too large (#51441) #52555

Merged
Prev Previous commit
Next Next commit
ut finished
RidRisR authored and ti-chi-bot committed Apr 12, 2024
commit ef92b6bc7c50c5f81c7460b08d092cf92a057c8f
5 changes: 2 additions & 3 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
@@ -551,11 +551,10 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool,error

now,err := c.env.FetchCurrentTS(ctx)
if err != nil {
return false,err
return true,err
}

lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))

if lagDuration > c.cfg.CheckPointLagLimit {
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
zap.Stringer("lag", lagDuration))
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
}

// FetchCurrentTS returns a value derived from the local wall clock (time.Now)
// together with a nil error; ctx is not used.
// NOTE(review): two return statements appear back to back. This text looks like
// a rendered diff (the file header above reports 1 addition & 1 deletion), so
// presumably the UnixNano line is the removed version and the Unix line is its
// replacement — confirm against the repository. Read literally as Go, only the
// first return would execute and the second is unreachable.
// NOTE(review): isCheckpointLagged passes the result of an env.FetchCurrentTS
// call to oracle.GetTimeFromTS; verify that a raw Unix value in seconds (or
// nanoseconds) is a valid input for that decoder.
func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error) {
return uint64(time.Now().UnixNano()), nil
return uint64(time.Now().Unix()), nil
}

// RegionScan gets a list of regions, starts from the region that contains key.
27 changes: 23 additions & 4 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
@@ -478,11 +478,30 @@ func TestEnableCheckPointLimit(t *testing.T) {
c.CheckPointLagLimit = 1*time.Minute
})
adv.StartTaskListener(ctx)
c.FetchCurrentTS();
require.NoError(t, adv.OnTick(ctx))
for i := 0; i < 5; i++ {
cp := c.advanceCheckpointBy()
c.advanceClusterTimeBy(30 * time.Second)
c.advanceCheckpointBy(20 * time.Second)
require.NoError(t, adv.OnTick(ctx))
require.Equal(t, env.getCheckpoint(), cp)
}
}

// TestCheckPointLaggedFailure configures a one-minute CheckPointLagLimit on a
// fake cluster and drives two ticks: the first OnTick must succeed, and the
// second, after the cluster time advances again without the checkpoint moving,
// must return an error containing "lagged too large".
func TestCheckPointLaggedFailure(t *testing.T) {
	cluster := createFakeCluster(t, 4, false)
	// Print the fake cluster when the test function returns.
	defer func() {
		fmt.Println(cluster)
	}()
	cluster.splitAndScatter("01", "02", "022", "023", "033", "04", "043")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	advancer := streamhelper.NewCheckpointAdvancer(&testEnv{fakeCluster: cluster, testCtx: t})
	advancer.UpdateConfigWith(func(cfg *config.Config) {
		cfg.CheckPointLagLimit = time.Minute
	})
	advancer.StartTaskListener(ctx)

	// First round: cluster time +60s, checkpoints +10s; the tick is expected to pass.
	cluster.advanceClusterTimeBy(60 * time.Second)
	cluster.advanceCheckpointBy(10 * time.Second)
	require.NoError(t, advancer.OnTick(ctx))

	// Second round: cluster time +60s with no checkpoint advance; the tick is expected to fail.
	cluster.advanceClusterTimeBy(60 * time.Second)
	require.ErrorContains(t, advancer.OnTick(ctx), "lagged too large")
}
37 changes: 22 additions & 15 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
@@ -494,21 +495,27 @@ func (f *fakeCluster) advanceCheckpoints() uint64 {
return minCheckpoint
}

func (f *fakeCluster) advanceCheckpointBy(checkPoint uint64) uint64 {
minCheckpoint := uint64(math.MaxUint64)
for _, r := range f.regions {
f.updateRegion(r.id, func(r *region) {
// The current implementation assumes that the server never returns checkpoint with value 0.
// This assumption is true for the TiKV implementation, simulating it here.
cp := r.checkpoint.Add(checkPoint)
if cp < minCheckpoint {
minCheckpoint = cp
}
r.fsim.flushedEpoch.Store(0)
})
}
log.Info("checkpoint updated", zap.Uint64("to", minCheckpoint))
return minCheckpoint
// advanceCheckpointBy walks f.regions and, through updateRegion, shifts each
// region's checkpoint forward by duration: the stored TS is decoded to a time,
// moved, and encoded back. Each region's flushedEpoch is reset to 0.
// It logs and returns the smallest new checkpoint observed; the result stays at
// math.MaxUint64 when no region was updated.
func (f *fakeCluster) advanceCheckpointBy(duration time.Duration) uint64 {
	lowest := uint64(math.MaxUint64)
	for _, reg := range f.regions {
		f.updateRegion(reg.id, func(target *region) {
			shifted := oracle.GetTimeFromTS(target.checkpoint.Load()).Add(duration)
			next := oracle.GoTimeToTS(shifted)
			target.checkpoint.Store(next)
			target.fsim.flushedEpoch.Store(0)
			// Track the minimum across all regions for the return value.
			if next < lowest {
				lowest = next
			}
		})
	}
	log.Info("checkpoint updated", zap.Uint64("to", lowest))
	return lowest
}

// advanceClusterTimeBy decodes f.currentTS to a time, adds duration, encodes
// the result back to a TS, stores it in f.currentTS, and returns the new TS.
func (f *fakeCluster) advanceClusterTimeBy(duration time.Duration) uint64 {
	current := oracle.GetTimeFromTS(f.currentTS)
	advanced := oracle.GoTimeToTS(current.Add(duration))
	f.currentTS = advanced
	return advanced
}

func createFakeCluster(t *testing.T, n int, simEnabled bool) *fakeCluster {
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/regioniter_test.go
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
}

// FetchCurrentTS returns a value derived from the local wall clock (time.Now)
// together with a nil error; ctx is not used.
// NOTE(review): two return statements appear back to back. This text looks like
// a rendered diff (the file header above reports 1 addition & 1 deletion), so
// presumably the UnixNano line is the removed version and the Unix line is its
// replacement — confirm against the repository. Read literally as Go, only the
// first return would execute and the second is unreachable.
func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) {
return uint64(time.Now().UnixNano()), nil
return uint64(time.Now().Unix()), nil
}

func makeSubrangeRegions(keys ...string) constantRegions {