Log Backup: decouple log backup resolve locks from GCWorker. (#45904) #46893

Merged
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -3703,8 +3703,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:2W6QJ+Dh8SreeR4Z3xHXZLwdhIf1FHQwoMNBBBTn5Jg=",
version = "v2.0.4-0.20230905091602-cf19ede9ecd6",
sum = "h1:pvHtHnUDfqMAU3/F8JVpuuG86E/lemJWZq0iSCV3kKY=",
version = "v2.0.4-0.20230912041415-9c163cc8574b",
)
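This bumps the pinned client-go revision to one that ships the resolve-locks helpers used by the advancer below (`tikv.ResolveLocksForRange`, the `rangetask` runner). DEPS.bzl normally mirrors go.mod, so the matching go.mod line would presumably be the following (a sketch; the go.mod change itself is not shown in this excerpt):

```
// go.mod (hypothetical matching line, not shown in this excerpt)
require github.com/tikv/client-go/v2 v2.0.4-0.20230912041415-9c163cc8574b
```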
go_repository(
name = "com_github_tikv_pd_client",
5 changes: 5 additions & 0 deletions br/pkg/conn/conn.go
@@ -244,6 +244,11 @@ func (mgr *Mgr) GetTLSConfig() *tls.Config {
return mgr.StoreManager.TLSConfig()
}

// GetStore gets the tikvStore.
func (mgr *Mgr) GetStore() tikv.Storage {
return mgr.tikvStore
}
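Exposing the raw `tikv.Storage` is what lets the log-backup advancer run its own resolve-locks range tasks instead of borrowing GCWorker. A hypothetical sketch of how an advancer environment could be backed by `conn.Mgr` (the glue type and wiring here are assumptions for illustration, not part of this PR):

```go
package main

import (
	"github.com/pingcap/tidb/br/pkg/conn"
	"github.com/tikv/client-go/v2/tikv"
)

// mgrEnv is a hypothetical advancer environment backed by conn.Mgr.
type mgrEnv struct {
	mgr *conn.Mgr
}

// GetStore exposes the TiKV storage that the advancer's
// resolve-locks runner needs (see asyncResolveLocksForRanges below).
func (e mgrEnv) GetStore() tikv.Storage {
	return e.mgr.GetStore()
}
```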

// GetLockResolver gets the LockResolver.
func (mgr *Mgr) GetLockResolver() *txnlock.LockResolver {
return mgr.tikvStore.GetLockResolver()
9 changes: 9 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
@@ -41,6 +41,9 @@ go_library(
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/rangetask",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
@@ -82,18 +85,24 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_server_v3//embed",
"@io_etcd_go_etcd_server_v3//mvcc",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
209 changes: 177 additions & 32 deletions br/pkg/streamhelper/advancer.go
@@ -3,12 +3,16 @@
package streamhelper

import (
"bytes"
"context"
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
@@ -17,7 +21,10 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -60,7 +67,9 @@ type CheckpointAdvancer struct {

// the cached last checkpoint.
// if there is no progress, this cache helps us avoid sending useless requests.
- lastCheckpoint uint64
+ lastCheckpoint *checkpoint
+ lastCheckpointMu sync.Mutex
+ inResolvingLock atomic.Bool

checkpoints *spans.ValueSortedFull
checkpointsMu sync.Mutex
@@ -69,6 +78,53 @@ type CheckpointAdvancer struct {
subscriberMu sync.Mutex
}

// checkpoint represents the TS of a specific range.
// It is only used in advancer.go.
type checkpoint struct {
StartKey []byte
EndKey []byte
TS uint64

// It would be better to use a PD timestamp in the future; for now,
// using local time to decide when to resolve locks is good enough.
resolveLockTime time.Time
}

func newCheckpointWithTS(ts uint64) *checkpoint {
return &checkpoint{
TS: ts,
resolveLockTime: time.Now(),
}
}

func NewCheckpointWithSpan(s spans.Valued) *checkpoint {
return &checkpoint{
StartKey: s.Key.StartKey,
EndKey: s.Key.EndKey,
TS: s.Value,
resolveLockTime: time.Now(),
}
}

func (c *checkpoint) safeTS() uint64 {
return c.TS - 1
}

func (c *checkpoint) equal(o *checkpoint) bool {
return bytes.Equal(c.StartKey, o.StartKey) &&
bytes.Equal(c.EndKey, o.EndKey) && c.TS == o.TS
}

// If a checkpoint stays unchanged for too long (3 min),
// we should try to resolve locks for its range
// to keep the RPO within 5 min.
func (c *checkpoint) needResolveLocks() bool {
failpoint.Inject("NeedResolveLocks", func(val failpoint.Value) {
failpoint.Return(val.(bool))
})
return time.Since(c.resolveLockTime) > 3*time.Minute
}
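The failpoint above gives tests a hook to force lock resolution regardless of `resolveLockTime`. A hypothetical test sketch; the full failpoint path is an assumption derived from the package import path, and `NeedResolveLocks` is the name registered above:

```go
package streamhelper_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// TestForceResolveLocks (hypothetical) makes needResolveLocks() always
// return true, so every tick treats the checkpoint as stalled.
func TestForceResolveLocks(t *testing.T) {
	fp := "github.com/pingcap/tidb/br/pkg/streamhelper/NeedResolveLocks"
	require.NoError(t, failpoint.Enable(fp, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	// From here on, every checkpoint reports that its range needs
	// lock resolution, regardless of resolveLockTime.
}
```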

// NewCheckpointAdvancer creates a checkpoint advancer with the env.
func NewCheckpointAdvancer(env Env) *CheckpointAdvancer {
return &CheckpointAdvancer{
Expand All @@ -92,6 +148,13 @@ func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config)) {
c.UpdateConfig(cfg)
}

// UpdateLastCheckpoint modifies the checkpoint during ticking.
func (c *CheckpointAdvancer) UpdateLastCheckpoint(p *checkpoint) {
c.lastCheckpointMu.Lock()
c.lastCheckpoint = p
c.lastCheckpointMu.Unlock()
}

// Config returns the current config.
func (c *CheckpointAdvancer) Config() config.Config {
return c.cfg
@@ -170,16 +233,35 @@ func tsoBefore(n time.Duration) uint64 {
return oracle.ComposeTS(now.UnixMilli()-n.Milliseconds(), 0)
}

- func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context, threshold time.Duration) (uint64, error) {
+ func tsoAfter(ts uint64, n time.Duration) uint64 {
+ return oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(n))
+ }
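Both `tsoBefore` (above) and the new `tsoAfter` manipulate only the physical half of a TSO, which packs a millisecond timestamp with a logical counter (`ts = physical<<18 | logical`). A small worked sketch of the arithmetic, using only the client-go `oracle` helpers that appear in this diff:

```go
package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	now := time.Now()
	// Compose a TSO from the current wall clock, logical part zero.
	ts := oracle.ComposeTS(now.UnixMilli(), 0)
	// Equivalent to tsoAfter(ts, time.Minute): shift the physical
	// part forward by one minute.
	later := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(time.Minute))
	fmt.Println(oracle.GetTimeFromTS(later).Sub(oracle.GetTimeFromTS(ts))) // 1m0s
}
```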

+ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
+ c.checkpointsMu.Lock()
+ defer c.checkpointsMu.Unlock()
+
+ f(c.checkpoints)
+ }
+
+ // only used for tests
+ func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
+ c.checkpoints = cps
+ }
+
+ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
+ threshold time.Duration) (spans.Valued, error) {
var targets []spans.Valued
- c.checkpoints.TraverseValuesLessThan(tsoBefore(threshold), func(v spans.Valued) bool {
- targets = append(targets, v)
- return true
- })
+ var minValue spans.Valued
+ c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
+ c.checkpoints.TraverseValuesLessThan(tsoBefore(threshold), func(v spans.Valued) bool {
+ targets = append(targets, v)
+ return true
+ })
+ minValue = vsf.Min()
+ })
if len(targets) == 0 {
- c.checkpointsMu.Lock()
- defer c.checkpointsMu.Unlock()
- return c.checkpoints.MinValue(), nil
+ return minValue, nil
}
samples := targets
if len(targets) > 3 {
@@ -191,12 +273,9 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,

err := c.tryAdvance(ctx, len(targets), func(i int) kv.KeyRange { return targets[i].Key })
if err != nil {
- return 0, err
+ return minValue, err
}
- c.checkpointsMu.Lock()
- ts := c.checkpoints.MinValue()
- c.checkpointsMu.Unlock()
- return ts, nil
+ return minValue, nil
}

func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskEvent) error {
@@ -288,8 +367,9 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
- c.lastCheckpoint = e.Info.StartTs
- log.Info("added event", zap.Stringer("task", e.Info), zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
+ c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
+ log.Info("added event", zap.Stringer("task", e.Info),
+ zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
@@ -310,33 +390,39 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

- func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
- if cp < c.lastCheckpoint {
- log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
+ func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool {
+ cp := NewCheckpointWithSpan(s)
+ if cp.TS < c.lastCheckpoint.TS {
+ log.Warn("failed to update global checkpoint: stale",
+ zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
return false
}
- if cp <= c.lastCheckpoint {
+ // Locks may need resolving for a different range at the same TS,
+ // so check both the range and the TS here.
+ if cp.equal(c.lastCheckpoint) {
return false
}
- c.lastCheckpoint = cp
+ c.UpdateLastCheckpoint(cp)
+ metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint.TS))
return true
}
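The range-aware comparison matters: two checkpoints at the same TS but over different ranges must not be treated as duplicates, or a stalled range would never get its `resolveLockTime` refreshed. A sketch in the style of an in-package Go example test (the `spans` import path is assumed from this package's layout):

```go
package streamhelper

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
	"github.com/pingcap/tidb/kv"
)

// ExampleCheckpointEqual (illustrative only): same TS, different range,
// so equal() is false and setCheckpoint still refreshes the checkpoint.
func ExampleCheckpointEqual() {
	a := NewCheckpointWithSpan(spans.Valued{
		Key:   kv.KeyRange{StartKey: []byte("a"), EndKey: []byte("b")},
		Value: 42,
	})
	b := NewCheckpointWithSpan(spans.Valued{
		Key:   kv.KeyRange{StartKey: []byte("b"), EndKey: []byte("c")},
		Value: 42,
	})
	fmt.Println(a.TS == b.TS, a.equal(b))
	// Output: true false
}
```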

// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
- func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpoint func(context.Context) (uint64, error)) error {
+ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context,
+ getCheckpoint func(context.Context) (spans.Valued, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}

- if c.setCheckpoint(cp) {
+ if c.setCheckpoint(ctx, cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)),
zap.Uint64("checkpoint", cp.Value),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
- metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
+ metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint.TS))
}
return nil
}
@@ -387,28 +473,44 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {

func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
- c.setCheckpoint(c.checkpoints.MinValue())
+ c.setCheckpoint(ctx, c.checkpoints.Min())
c.checkpointsMu.Unlock()
- if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
+ if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
return nil
}

func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
+ // lastCheckpoint has not advanced for long enough;
+ // assume the cluster has expired locks for whatever reason.
+ var targets []spans.Valued
+ if c.lastCheckpoint != nil && c.lastCheckpoint.needResolveLocks() && c.inResolvingLock.CompareAndSwap(false, true) {
+ c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
+ // When collecting locks here, assume they do not all belong to the same txn,
+ // but that their start TSes are within about one minute of the checkpoint;
+ // try to resolve these locks in one pass.
+ vsf.TraverseValuesLessThan(tsoAfter(c.lastCheckpoint.TS, time.Minute), func(v spans.Valued) bool {
+ targets = append(targets, v)
+ return true
+ })
+ })
+ if len(targets) != 0 {
+ log.Info("Advancer starts to resolve locks", zap.Int("targets", len(targets)))
+ // Use a fresh context here to avoid timeouts from the tick context.
+ ctx := context.Background()
+ c.asyncResolveLocksForRanges(ctx, targets)
+ }
+ c.inResolvingLock.Store(false)
+ }
threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

- err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
+ return c.advanceCheckpointBy(cx, func(cx context.Context) (spans.Valued, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
- if err != nil {
- return err
- }
- return nil
}
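The `inResolvingLock` flag above is a single-flight guard: `CompareAndSwap` admits at most one resolve round per stall, and the flag is cleared once the round has collected its targets and launched the async work. The guard in isolation, as a minimal self-contained sketch:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var inFlight atomic.Bool // mirrors CheckpointAdvancer.inResolvingLock
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Only a goroutine that flips false->true proceeds.
			if !inFlight.CompareAndSwap(false, true) {
				return
			}
			defer inFlight.Store(false)
			fmt.Println("resolve round admitted for goroutine", id)
		}(i)
	}
	wg.Wait() // between 1 and 3 rounds run, but never concurrently
}
```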

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
@@ -437,3 +539,46 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {

return errs
}

func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, targets []spans.Valued) {
// Run in another goroutine so we do not block the main tick here.
go func() {
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Scan all locks and try to resolve them by checking txn status.
return tikv.ResolveLocksForRange(
ctx, c.env, math.MaxUint64, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
}
workerPool := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "advancer resolve locks")
var wg sync.WaitGroup
for _, r := range targets {
targetRange := r
wg.Add(1)
workerPool.Apply(func() {
defer wg.Done()
// Run lock resolution across the whole TiKV cluster.
// It uses startKey/endKey to scan regions in PD,
// but the regionCache already holds a codecPDClient, so decoded keys work here.
// A target range almost always covers a single region, so set concurrency to 1.
runner := rangetask.NewRangeTaskRunner("advancer-resolve-locks-runner",
c.env.GetStore(), 1, handler)
err := runner.RunOnRange(ctx, targetRange.Key.StartKey, targetRange.Key.EndKey)
if err != nil {
// wait for next tick
log.Warn("resolve locks failed, wait for next tick", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
zap.Error(err))
}
})
}
wg.Wait()
log.Info("finish resolve locks for checkpoint", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
logutil.Key("StartKey", c.lastCheckpoint.StartKey),
logutil.Key("EndKey", c.lastCheckpoint.EndKey),
zap.Int("targets", len(targets)))
c.lastCheckpointMu.Lock()
c.lastCheckpoint.resolveLockTime = time.Now()
c.lastCheckpointMu.Unlock()
}()
}
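For readers unfamiliar with `rangetask`, the pattern above is: one handler per key range, wrapped in a runner bound to a `tikv.Storage`. A stripped-down sketch with a no-op handler; the constructor, handler type, and `RunOnRange` signature are as used above, while the `TaskStat` field name is an assumption about client-go:

```go
package main

import (
	"context"

	"github.com/pingcap/log"
	tikvstore "github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
	"go.uber.org/zap"
)

// runExample drives a range task over [a, z) with a no-op handler.
// `store` must be a live tikv.Storage, e.g. from Mgr.GetStore().
func runExample(store tikv.Storage) {
	handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
		// Real handlers resolve locks here; this one only reports progress.
		return rangetask.TaskStat{CompletedRegions: 1}, nil
	}
	runner := rangetask.NewRangeTaskRunner("example-resolve-locks", store, 1, handler)
	if err := runner.RunOnRange(context.Background(), []byte("a"), []byte("z")); err != nil {
		log.Warn("range task failed", zap.Error(err))
	}
}
```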