Log Backup: decouple log backup resolve locks from GCWorker. #45904

Merged · 56 commits · Sep 12, 2023

Changes from 7 commits

Commits
c99737f
update checkpoint
3pointer Aug 7, 2023
6ac2d25
refactor GCWorker for lockResolver
3pointer Aug 8, 2023
418bd47
decouple logbackup in GCWorker
3pointer Aug 8, 2023
1bf06cd
use GCLockResolver in advancer
3pointer Aug 8, 2023
a0742dc
avoid cycle import
3pointer Aug 8, 2023
885425f
expose tikvStore for lockResolver
3pointer Aug 8, 2023
e0df995
update test
3pointer Aug 8, 2023
a209893
address comment
3pointer Aug 8, 2023
ae12e5c
fix bazel
3pointer Aug 9, 2023
5e4ba62
remove LocateKey/SendReq in GCLockResolver interface
3pointer Aug 9, 2023
713b16f
fix gc worker unit test
3pointer Aug 9, 2023
8d821a7
add ut for resolve lock
3pointer Aug 9, 2023
e76a325
fix
3pointer Aug 10, 2023
2adf397
precise resolve lock ranges
3pointer Aug 11, 2023
bf8b1d5
update log
3pointer Aug 14, 2023
1fe03aa
udpate go mod for client-go
3pointer Aug 16, 2023
7e02b09
update test
3pointer Aug 16, 2023
8dc9ce3
remove useless code
3pointer Aug 17, 2023
29e65d9
update test
3pointer Aug 17, 2023
ba1a4d1
fix test
3pointer Aug 17, 2023
c1628f7
update check time
3pointer Aug 21, 2023
99d8d10
resolve conflicts
3pointer Aug 21, 2023
e58f0fa
adapt new lockResolver
3pointer Aug 28, 2023
74b4459
Merge branch 'master' into decouple_pitr_for_gc
3pointer Aug 28, 2023
f22ece2
make bazel_prepare
3pointer Aug 28, 2023
3a0db58
update go mod
3pointer Aug 29, 2023
effa59b
Merge branch 'decouple_pitr_for_gc' of https://github.com/3pointer/ti…
3pointer Aug 29, 2023
9e6fcea
update
3pointer Aug 29, 2023
0e4c9d3
fix bazel build
3pointer Aug 29, 2023
b7c2912
fix race test
3pointer Aug 29, 2023
38c95e4
fix bazel
3pointer Aug 29, 2023
012bdec
fix unit test
3pointer Aug 29, 2023
3b9e7e6
Merge branch 'decouple_pitr_for_gc' of https://github.com/3pointer/ti…
3pointer Aug 29, 2023
d32ed0e
fix unit test
3pointer Aug 29, 2023
0d69d7b
fix unit test
3pointer Aug 30, 2023
22ddfc5
udpate
3pointer Aug 30, 2023
88e8092
fix test
3pointer Aug 30, 2023
75fb8fd
fix gc job test
3pointer Sep 1, 2023
e903cc0
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 1, 2023
3a60000
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 1, 2023
5d7c382
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 4, 2023
8d94a94
address comments
3pointer Sep 6, 2023
6f551b6
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 6, 2023
d92df68
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 6, 2023
9802295
address comment
3pointer Sep 6, 2023
ce3726f
address comment
3pointer Sep 6, 2023
e581709
address comment
3pointer Sep 6, 2023
d69e652
address comment
3pointer Sep 7, 2023
a545c59
address comment
3pointer Sep 7, 2023
c41accc
address comment
3pointer Sep 7, 2023
ba6c52a
fix ut
3pointer Sep 7, 2023
78ace34
fix unstable test
3pointer Sep 8, 2023
be0c535
address comment
3pointer Sep 11, 2023
b55796f
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 11, 2023
2ac41e7
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 12, 2023
5ae1830
address comments
3pointer Sep 12, 2023
5 changes: 5 additions & 0 deletions br/pkg/conn/conn.go
@@ -245,6 +245,11 @@ func (mgr *Mgr) GetTLSConfig() *tls.Config {
return mgr.StoreManager.TLSConfig()
}

// GetStore gets the tikvStore.
func (mgr *Mgr) GetStore() tikv.Storage {
return mgr.tikvStore
}

// GetLockResolver gets the LockResolver.
func (mgr *Mgr) GetLockResolver() *txnlock.LockResolver {
return mgr.tikvStore.GetLockResolver()
115 changes: 91 additions & 24 deletions br/pkg/streamhelper/advancer.go
@@ -17,7 +17,11 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/gcutil"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -60,7 +64,7 @@ type CheckpointAdvancer struct {

// the cached last checkpoint.
// if there is no progress, this cache helps us avoid sending useless requests.
lastCheckpoint uint64
lastCheckpoint checkpoint

checkpoints *spans.ValueSortedFull
checkpointsMu sync.Mutex
@@ -69,6 +73,45 @@ type CheckpointAdvancer struct {
subscriberMu sync.Mutex
}

// checkpoint represents the TS of a specific range.
// It's only used in advancer.go.
type checkpoint struct {
StartKey []byte
EndKey []byte
TS uint64

// It would be better to use a PD timestamp in the future; for now,
// using local time to decide when to resolve locks is good enough.
generateTime time.Time
}

func newCheckpointWithTS(ts uint64) checkpoint {
return checkpoint{
TS: ts,
generateTime: time.Now(),
}
}

func newCheckpointWithSpan(s spans.Valued) checkpoint {
return checkpoint{
StartKey: s.Key.StartKey,
EndKey: s.Key.EndKey,
TS: s.Value,
generateTime: time.Now(),
}
}

func (c checkpoint) safeTS() uint64 {
return c.TS - 1
}

// If a checkpoint stays unchanged for too long (1 min),
// we should try to resolve locks for its range
// to keep the RPO short.
func (c checkpoint) needResolveLocks() bool {
return time.Since(c.generateTime) > time.Minute
}

// NewCheckpointAdvancer creates a checkpoint advancer with the env.
func NewCheckpointAdvancer(env Env) *CheckpointAdvancer {
return &CheckpointAdvancer{
@@ -180,7 +223,7 @@ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
}

func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
threshold time.Duration) (uint64, error) {
threshold time.Duration) (spans.Valued, error) {
var targets []spans.Valued
var minValue spans.Valued
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
@@ -194,13 +237,13 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
zap.Stringer("min", minValue), zap.Int("for-polling", len(targets)),
zap.String("min-ts", oracle.GetTimeFromTS(minValue.Value).Format(time.RFC3339)))
if len(targets) == 0 {
return minValue.Value, nil
return minValue, nil
}
err := c.tryAdvance(ctx, len(targets), func(i int) kv.KeyRange { return targets[i].Key })
if err != nil {
return 0, err
return minValue, err
}
return minValue.Value, nil
return minValue, nil
}

func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskEvent) error {
@@ -293,7 +336,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.lastCheckpoint = e.Info.StartTs
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
if err != nil {
log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err))
@@ -323,33 +366,57 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
if cp < c.lastCheckpoint {
func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool {
cp := newCheckpointWithSpan(s)
if cp.TS < c.lastCheckpoint.TS {
log.Warn("failed to update global checkpoint: stale",
zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
return false
}
// lastCheckpoint has not advanced for too long;
// assume the cluster has expired locks for whatever reason.
if c.lastCheckpoint.needResolveLocks() {
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Scan all locks before cp.TS and try to resolve them by checking the txn status.
return gcutil.ResolveLocksForRange(ctx, "log backup advancer", c.env, cp.TS, r.StartKey, r.EndKey)
}
runner := rangetask.NewRangeTaskRunner("advancer-resolve-locks-runner", c.env.GetStore(), config.DefaultMaxConcurrencyAdvance, handler)
// Run resolve locks on the whole TiKV cluster. It uses startKey/endKey to scan regions in PD, so the keys need to be encoded here.
encodedStartKey := codec.EncodeBytes([]byte{}, []byte(c.lastCheckpoint.StartKey))
encodedEndKey := codec.EncodeBytes([]byte{}, []byte(c.lastCheckpoint.EndKey))
err := runner.RunOnRange(ctx, encodedStartKey, encodedEndKey)
if err != nil {
log.Error("resolve locks failed", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
zap.Error(err))
return false
}
log.Info("finish resolve locks", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
zap.Int("regions", runner.CompletedRegions()))
return false
}
if cp <= c.lastCheckpoint {
if cp.TS <= c.lastCheckpoint.TS {
return false
}
c.lastCheckpoint = cp
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint.TS))
return true
}

// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context,
getCheckpoint func(context.Context) (uint64, error)) error {
getCheckpoint func(context.Context) (spans.Valued, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}

if c.setCheckpoint(cp) {
if c.setCheckpoint(ctx, cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)),
zap.Uint64("checkpoint", cp.Value),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
}
@@ -403,24 +470,24 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {

func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(c.checkpoints.MinValue())
c.setCheckpoint(ctx, c.checkpoints.Min())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint-1)
p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS())
if err != nil {
return errors.Annotatef(err,
"failed to update service GC safe point, current checkpoint is %d, target checkpoint is %d",
c.lastCheckpoint-1, p)
c.lastCheckpoint.safeTS(), p)
}
if p <= c.lastCheckpoint-1 {
if p <= c.lastCheckpoint.safeTS() {
log.Info("updated log backup GC safe point.",
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint-1))
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS()))
}
if p > c.lastCheckpoint-1 {
if p > c.lastCheckpoint.safeTS() {
log.Warn("update log backup GC safe point failed: stale.",
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint-1))
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS()))
}
return nil
}
@@ -433,7 +500,7 @@ func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
err := c.advanceCheckpointBy(cx, func(cx context.Context) (spans.Valued, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
if err != nil {
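The heart of this change is the new gating logic in setCheckpoint: a checkpoint now carries its key range and creation time, and once it fails to advance for more than a minute the advancer resolves locks over that range before accepting further progress. The following minimal, self-contained Go sketch illustrates that control flow under simplified assumptions; resolveLocksForRange is a hypothetical stand-in for the real gcutil.ResolveLocksForRange / rangetask.RangeTaskRunner path shown above, and the types are trimmed to what the gating needs.

package main

import (
	"fmt"
	"time"
)

// checkpoint mirrors the shape introduced in advancer.go: a TS bound to a
// key range plus the local time at which it was produced.
type checkpoint struct {
	startKey, endKey []byte
	ts               uint64
	generateTime     time.Time
}

// needResolveLocks reports whether the checkpoint has been stuck long enough
// (one minute in the PR) that expired locks are the likely cause.
func (c checkpoint) needResolveLocks() bool {
	return time.Since(c.generateTime) > time.Minute
}

// resolveLocksForRange is a hypothetical placeholder for the real
// range-task based resolver; it only logs the range it would scan.
func resolveLocksForRange(start, end []byte, beforeTS uint64) error {
	fmt.Printf("resolving locks in [%q, %q) before ts %d\n", start, end, beforeTS)
	return nil
}

// trySetCheckpoint sketches the gating in setCheckpoint: stale or
// non-advancing checkpoints are rejected, and a long-stuck one first
// triggers a resolve-locks pass over its range.
func trySetCheckpoint(last *checkpoint, next checkpoint) bool {
	if next.ts < last.ts {
		return false // stale update
	}
	if last.needResolveLocks() {
		_ = resolveLocksForRange(last.startKey, last.endKey, next.ts)
		return false // retry on a later tick, after locks are resolved
	}
	if next.ts <= last.ts {
		return false // no progress
	}
	*last = next
	return true
}

func main() {
	last := checkpoint{ts: 100, generateTime: time.Now().Add(-2 * time.Minute)}
	// The checkpoint has been stuck for two minutes, so this first attempt
	// resolves locks and reports no advance.
	fmt.Println(trySetCheckpoint(&last, checkpoint{ts: 110, generateTime: time.Now()}))
}

As in the PR, the resolve pass itself does not move the checkpoint; a later tick re-reads the per-region checkpoints and advances normally once the locks are gone.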
55 changes: 48 additions & 7 deletions br/pkg/streamhelper/advancer_env.go
@@ -6,10 +6,15 @@ import (
"context"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/gcutil"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
@@ -29,6 +34,8 @@ type Env interface {
LogBackupService
// StreamMeta connects to the metadata service (normally PD).
StreamMeta
// GCLockResolver tries to resolve locks when the region checkpoint has stopped advancing.
gcutil.GCLockResolver
}

// PDRegionScanner is a simple wrapper over PD
@@ -83,6 +90,7 @@ type clusterEnv struct {
clis *utils.StoreManager
*AdvancerExt
PDRegionScanner
*AdvancerLockResolver
}

// GetLogBackupClient gets the log backup client.
@@ -98,16 +106,17 @@ func (t clusterEnv) GetLogBackupClient(ctx context.Context, storeID uint64) (log
}

// CliEnv creates the Env for CLI usage.
func CliEnv(cli *utils.StoreManager, etcdCli *clientv3.Client) Env {
func CliEnv(cli *utils.StoreManager, tikvStore tikv.Storage, etcdCli *clientv3.Client) Env {
return clusterEnv{
clis: cli,
AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)},
PDRegionScanner: PDRegionScanner{cli.PDClient()},
clis: cli,
AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)},
PDRegionScanner: PDRegionScanner{cli.PDClient()},
AdvancerLockResolver: &AdvancerLockResolver{TiKvStore: tikvStore},
}
}

// TiDBEnv creates the Env by TiDB config.
func TiDBEnv(pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (Env, error) {
func TiDBEnv(tikvStore tikv.Storage, pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (Env, error) {
tconf, err := conf.GetTiKVConfig().Security.ToTLSConfig()
if err != nil {
return nil, err
@@ -117,8 +126,9 @@ func TiDBEnv(pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (En
Time: time.Duration(conf.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(conf.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
}, tconf),
AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)},
PDRegionScanner: PDRegionScanner{Client: pdCli},
AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)},
PDRegionScanner: PDRegionScanner{Client: pdCli},
AdvancerLockResolver: &AdvancerLockResolver{TiKvStore: tikvStore},
}, nil
}

@@ -137,3 +147,34 @@ type StreamMeta interface {
// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
}

type AdvancerLockResolver struct {
TiKvStore tikv.Storage
}

func (w *AdvancerLockResolver) LocateKey(bo *tikv.Backoffer, key []byte) (*tikv.KeyLocation, error) {
return w.TiKvStore.GetRegionCache().LocateKey(bo, key)
}

// ResolveLocks tries to resolve expired locks.
// It checks the txn status and resolves the lock if the txn has expired; otherwise it does nothing.
func (w *AdvancerLockResolver) ResolveLocks(bo *tikv.Backoffer, locks []*txnlock.Lock, loc tikv.RegionVerID) (bool, error) {
if len(locks) == 0 {
return true, nil
}

_, err := w.TiKvStore.GetLockResolver().ResolveLocks(bo, 0, locks)
return err == nil, errors.Trace(err)
}

func (w *AdvancerLockResolver) ScanLocks(key []byte, regionID uint64) []*txnlock.Lock {
return nil
}

func (w *AdvancerLockResolver) SendReq(bo *tikv.Backoffer, req *tikvrpc.Request, regionID tikv.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
return w.TiKvStore.SendReq(bo, req, regionID, timeout)
}

func (w *AdvancerLockResolver) GetStore() tikv.Storage {
return w.TiKvStore
}
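This file carries the decoupling itself: the advancer's Env now satisfies the new gcutil.GCLockResolver interface through the embedded AdvancerLockResolver, so lock resolution no longer has to go through GCWorker. Here is a minimal sketch of the same embedding-based composition, using simplified, hypothetical interface and method signatures in place of the real tikv and gcutil types.

package main

import "fmt"

// lockResolver is a simplified stand-in for gcutil.GCLockResolver: the
// operations the shared resolve-locks routine needs from its caller.
type lockResolver interface {
	ResolveLocks(region string, locks []string) (bool, error)
	GetStore() string
}

// advancerLockResolver mirrors AdvancerLockResolver: it answers the
// interface directly from the TiKV storage handle it wraps.
type advancerLockResolver struct {
	store string // stands in for tikv.Storage
}

func (r *advancerLockResolver) ResolveLocks(region string, locks []string) (bool, error) {
	fmt.Printf("resolving %d locks in region %s via store %s\n", len(locks), region, r.store)
	return true, nil
}

func (r *advancerLockResolver) GetStore() string { return r.store }

// env mirrors clusterEnv: by embedding the resolver it gains its methods,
// so env satisfies lockResolver without re-implementing anything.
type env struct {
	*advancerLockResolver
	// ...other capabilities (log backup client, PD scanner, metadata client)
}

func main() {
	var e lockResolver = env{advancerLockResolver: &advancerLockResolver{store: "tikv-store"}}
	ok, err := e.ResolveLocks("r1", []string{"k1", "k2"})
	fmt.Println(ok, err)
}

With this composition, CliEnv and TiDBEnv only need to thread the tikv.Storage handle through, which is why their signatures each gain one parameter in the hunks above.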
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
@@ -876,7 +876,7 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
if err != nil {
return err
}
env := streamhelper.CliEnv(mgr.StoreManager, etcdCLI)
env := streamhelper.CliEnv(mgr.StoreManager, mgr.GetStore(), etcdCLI)
advancer := streamhelper.NewCheckpointAdvancer(env)
advancer.UpdateConfig(cfg.AdvancerCfg)
advancerd := daemon.New(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI), cfg.AdvancerCfg.TickDuration)
7 changes: 6 additions & 1 deletion domain/domain.go
@@ -1274,7 +1274,12 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
log.Warn("pd / etcd client not provided, won't begin Advancer.")
return nil
}
env, err := streamhelper.TiDBEnv(pdClient, do.etcdClient, cfg)
tikvStore, ok := do.Store().(tikv.Storage)
if !ok {
log.Warn("non tikv store, stop begin Advancer.")
return nil
}
env, err := streamhelper.TiDBEnv(tikvStore, pdClient, do.etcdClient, cfg)
if err != nil {
return err
}
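The domain hook only starts the advancer when the backing store is actually TiKV; a non-TiKV store (for example a mock store in tests) simply skips it. A tiny sketch of the same comma-ok type-assertion guard, with hypothetical interfaces standing in for kv.Storage and tikv.Storage:

package main

import "fmt"

// storage and tikvStorage are hypothetical stand-ins; only the narrower
// interface exposes what the advancer needs.
type storage interface{ Name() string }

type tikvStorage interface {
	storage
	LockResolver() string
}

type mockStore struct{}

func (mockStore) Name() string { return "mock" }

type realStore struct{}

func (realStore) Name() string         { return "tikv" }
func (realStore) LockResolver() string { return "resolver" }

// startAdvancer mirrors the guard in initLogBackup: bail out gracefully
// when the store does not support the TiKV-specific capabilities.
func startAdvancer(s storage) {
	ts, ok := s.(tikvStorage)
	if !ok {
		fmt.Printf("non-TiKV store %q, skipping advancer\n", s.Name())
		return
	}
	fmt.Printf("starting advancer on %q with %s\n", ts.Name(), ts.LockResolver())
}

func main() {
	startAdvancer(mockStore{})
	startAdvancer(realStore{})
}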