log-backup: added more metrics and hints; fixed a bug that may cause an infinite loop (#36228)

ref #29501
YuJuncen authored Jul 18, 2022
1 parent 8b30e52 commit 9859512
Showing 13 changed files with 338 additions and 26 deletions.
34 changes: 31 additions & 3 deletions br/pkg/glue/console_glue.go
@@ -23,12 +23,40 @@ type ConsoleOperations struct {
ConsoleGlue
}

// StartTask prints a task start information, and mark as finished when the returned function called.
func (ops ConsoleOperations) StartTask(message string) func() {
// An extra field appending to the task.
// return type is a {key: string, value: string} tuple.
type ExtraField func() [2]string

// NOTE:
// Perhaps we'd better move these modifiers and terminal function to another package
// like `glue/termutil?`

// WithTimeCost adds the task information of time costing for `ShowTask`.
func WithTimeCost() ExtraField {
start := time.Now()
return func() [2]string {
return [2]string{"take", time.Since(start).String()}
}
}

// WithConstExtraField adds an extra field with constant values.
func WithConstExtraField(key string, value interface{}) ExtraField {
return func() [2]string {
return [2]string{key, fmt.Sprint(value)}
}
}

// ShowTask prints a task start information, and mark as finished when the returned function called.
// This is for TUI presenting.
func (ops ConsoleOperations) ShowTask(message string, extraFields ...ExtraField) func() {
ops.Print(message)
return func() {
ops.Printf("%s; take = %s\n", color.HiGreenString("DONE"), time.Since(start))
fields := make([]string, 0, len(extraFields))
for _, fieldFunc := range extraFields {
field := fieldFunc()
fields = append(fields, fmt.Sprintf("%s = %s", field[0], color.New(color.Bold).Sprint(field[1])))
}
ops.Printf("%s; %s\n", color.HiGreenString("DONE"), strings.Join(fields, ", "))
}
}

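For context, here is a minimal usage sketch of the new ShowTask API together with the extra-field helpers added above. It is not part of the diff; the showTaskExample function, its console argument, and the field values are hypothetical.

package glueexample

import (
	"github.com/pingcap/tidb/br/pkg/glue"
)

// showTaskExample is a hypothetical caller (not in this commit) showing how
// ShowTask composes with WithTimeCost and WithConstExtraField.
func showTaskExample(console glue.ConsoleOperations) {
	done := console.ShowTask(
		"Full backup restore",
		glue.WithTimeCost(),
		glue.WithConstExtraField("ranges", 42),
	)
	defer done()
	// ... real work happens here; when done() runs it prints something like:
	// DONE; take = 1.2s, ranges = 42
}
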
31 changes: 20 additions & 11 deletions br/pkg/logutil/logging.go
@@ -282,18 +282,27 @@ func (kr StringifyKeys) String() string {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString("[")
sb.WriteString(redact.Key(rng.StartKey))
sb.WriteString(", ")
var endKey string
if len(rng.EndKey) == 0 {
endKey = "inf"
} else {
endKey = redact.Key(rng.EndKey)
}
sb.WriteString(redact.String(endKey))
sb.WriteString(")")
sb.WriteString(StringifyRange(rng).String())
}
sb.WriteString("}")
return sb.String()
}

// StringifyRange is the wrapper for displaying a key range.
type StringifyRange kv.KeyRange

func (rng StringifyRange) String() string {
sb := new(strings.Builder)
sb.WriteString("[")
sb.WriteString(redact.Key(rng.StartKey))
sb.WriteString(", ")
var endKey string
if len(rng.EndKey) == 0 {
endKey = "inf"
} else {
endKey = redact.Key(rng.EndKey)
}
sb.WriteString(redact.String(endKey))
sb.WriteString(")")
return sb.String()
}
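
As a quick illustration of the refactor above (not part of the commit), the extracted StringifyRange wrapper renders an unbounded range as "inf" instead of an empty hex string. The hypothetical snippet below assumes redaction is disabled, so redact.Key prints hex-encoded keys.

package logexample

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/logutil"
	"github.com/pingcap/tidb/kv"
)

// printRanges is a hypothetical sketch of how StringifyKeys and the new
// StringifyRange wrapper are meant to be used when logging key ranges.
func printRanges() {
	full := kv.KeyRange{StartKey: []byte("a"), EndKey: []byte("b")}
	open := kv.KeyRange{StartKey: []byte("a"), EndKey: nil}
	// With redaction disabled this prints something like: {[61, 62), [61, inf)}
	fmt.Println(logutil.StringifyKeys([]kv.KeyRange{full, open}))
	// A single range can be rendered directly via the new wrapper:
	fmt.Println(logutil.StringifyRange(open)) // e.g. [61, inf)
}
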
17 changes: 13 additions & 4 deletions br/pkg/streamhelper/advancer.go
@@ -109,14 +109,14 @@ func NewCheckpointAdvancer(env Env) *CheckpointAdvancer {
// you may need to change the config `AdvancingByCache`.
func (c *CheckpointAdvancer) disableCache() {
c.cache = NoOPCheckpointCache{}
c.state = fullScan{}
c.state = &fullScan{}
}

// enable the cache.
// also check `AdvancingByCache` in the config.
func (c *CheckpointAdvancer) enableCache() {
c.cache = NewCheckpoints()
c.state = fullScan{}
c.state = &fullScan{}
}

// UpdateConfig updates the config for the advancer.
@@ -185,6 +185,7 @@ func (c *CheckpointAdvancer) tryAdvance(ctx context.Context, rst RangesSharesTS)
defer c.recordTimeCost("try advance", zap.Uint64("checkpoint", rst.TS), zap.Int("len", len(rst.Ranges)))()
defer func() {
if err != nil {
log.Warn("failed to advance", logutil.ShortError(err), zap.Object("target", rst.Zap()))
c.cache.InsertRanges(rst)
}
}()
@@ -225,11 +226,19 @@ func (c *CheckpointAdvancer) tryAdvance(ctx context.Context, rst RangesSharesTS)

// CalculateGlobalCheckpointLight tries to advance the global checkpoint by the cache.
func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context) (uint64, error) {
log.Info("advancer with cache: current tree", zap.Stringer("ct", c.cache))
log.Info("[log backup advancer hint] advancer with cache: current tree", zap.Stringer("ct", c.cache))
rsts := c.cache.PopRangesWithGapGT(config.DefaultTryAdvanceThreshold)
if len(rsts) == 0 {
return 0, nil
}
samples := rsts
if len(rsts) > 3 {
samples = rsts[:3]
}
for _, sample := range samples {
log.Info("[log backup advancer hint] sample range.", zap.Object("range", sample.Zap()), zap.Int("total-len", len(rsts)))
}

workers := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "regions")
eg, cx := errgroup.WithContext(ctx)
for _, rst := range rsts {
@@ -242,7 +251,6 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context)
if err != nil {
return 0, err
}
log.Info("advancer with cache: new tree", zap.Stringer("cache", c.cache))
ts := c.cache.CheckpointTS()
return ts, nil
}
@@ -420,6 +428,7 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpo
if err != nil {
return err
}
log.Info("get checkpoint", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
}
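The error handling added to tryAdvance (and reused in regioniter.go below) relies on a named return value inspected from a deferred closure, so every early return is logged and its ranges are put back into the cache without repeating that logic at each return site. A generic, hypothetical sketch of the pattern, with illustrative names and a simulated error:

package main

import (
	"errors"
	"log"
)

// doSomething demonstrates the named-return + defer pattern in isolation.
func doSomething() (err error) {
	defer func() {
		if err != nil {
			// In the advancer this is where the failure is logged and the
			// ranges are re-inserted into the checkpoint cache.
			log.Printf("operation failed: %v", err)
		}
	}()
	return errors.New("simulated failure")
}

func main() {
	_ = doSomething()
}
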
8 changes: 8 additions & 0 deletions br/pkg/streamhelper/collector.go
@@ -192,8 +192,16 @@ func (c *storeCollector) sendPendingRequests(ctx context.Context) error {
for _, checkpoint := range cps.Checkpoints {
if checkpoint.Err != nil {
log.Debug("failed to get region checkpoint", zap.Stringer("err", checkpoint.Err))
if checkpoint.Err.EpochNotMatch != nil {
metrics.RegionCheckpointFailure.WithLabelValues("epoch-not-match").Inc()
}
if checkpoint.Err.NotLeader != nil {
metrics.RegionCheckpointFailure.WithLabelValues("not-leader").Inc()
}
metrics.RegionCheckpointRequest.WithLabelValues("fail").Inc()
c.inconsistent = append(c.inconsistent, c.regionMap[checkpoint.Region.Id])
} else {
metrics.RegionCheckpointRequest.WithLabelValues("success").Inc()
if c.onSuccess != nil {
c.onSuccess(checkpoint.Checkpoint, c.regionMap[checkpoint.Region.Id])
}
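The counters referenced here (metrics.RegionCheckpointFailure, metrics.RegionCheckpointRequest) live in the tidb metrics package. The snippet below is only a hypothetical sketch of how such a reason-labelled counter is typically declared with prometheus/client_golang; the real names, namespace, and help text may differ.

package metricsexample

import "github.com/prometheus/client_golang/prometheus"

// regionCheckpointFailure is an illustrative stand-in for the counter used
// above; the actual metric is defined in the tidb metrics package.
var regionCheckpointFailure = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "tidb",
		Subsystem: "log_backup",
		Name:      "region_checkpoint_failure",
		Help:      "Reasons of failed region checkpoint requests.",
	},
	[]string{"reason"}, // e.g. "epoch-not-match", "not-leader", "retryable-scan-region"
)

func init() {
	prometheus.MustRegister(regionCheckpointFailure)
	// Incrementing mirrors the calls in collector.go:
	// regionCheckpointFailure.WithLabelValues("not-leader").Inc()
}
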
30 changes: 27 additions & 3 deletions br/pkg/streamhelper/regioniter.go
@@ -5,13 +5,18 @@ package streamhelper
import (
"bytes"
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
)

const (
@@ -44,6 +49,13 @@ type RegionIter struct {
PageSize int
}

func (r *RegionIter) String() string {
return fmt.Sprintf("RegionIter:%s;%v;from=%s",
logutil.StringifyKeys([]kv.KeyRange{{StartKey: r.currentStartKey, EndKey: r.endKey}}),
r.infScanFinished,
redact.Key(r.startKey))
}

// IterateRegion creates an iterater over the region range.
func IterateRegion(cli RegionScanner, startKey, endKey []byte) *RegionIter {
return &RegionIter{
@@ -85,8 +97,17 @@ func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader)
// Next get the next page of regions.
func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error) {
var rs []RegionWithLeader
state := utils.InitialRetryState(30, 500*time.Millisecond, 500*time.Millisecond)
err := utils.WithRetry(ctx, func() error {
state := utils.InitialRetryState(8, 500*time.Millisecond, 500*time.Millisecond)
err := utils.WithRetry(ctx, func() (retErr error) {
defer func() {
if retErr != nil {
log.Warn("failed with trying to scan regions", logutil.ShortError(retErr),
logutil.Key("start", r.currentStartKey),
logutil.Key("end", r.endKey),
)
}
metrics.RegionCheckpointFailure.WithLabelValues("retryable-scan-region").Inc()
}()
regions, err := r.cli.RegionScan(ctx, r.currentStartKey, r.endKey, r.PageSize)
if err != nil {
return err
@@ -115,8 +136,11 @@ func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error) {

// Done checks whether the iteration is done.
func (r *RegionIter) Done() bool {
// special case: we want to scan to the end of key space.
// at this time, comparing currentStartKey and endKey may be misleading when
// they are both "".
if len(r.endKey) == 0 {
return r.infScanFinished
}
return bytes.Compare(r.currentStartKey, r.endKey) >= 0
return r.infScanFinished || bytes.Compare(r.currentStartKey, r.endKey) >= 0
}
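
To make the Done change above concrete, here is a hypothetical driver loop (not part of this commit) that walks the whole key space; with both keys empty, the loop now terminates via infScanFinished rather than the misleading byte comparison. Names other than RegionIter, RegionScanner, RegionWithLeader, IterateRegion, Next, and Done are illustrative.

package streamhelper

import "context"

// collectAllRegions is a hypothetical helper showing the intended calling
// pattern for RegionIter.
func collectAllRegions(ctx context.Context, cli RegionScanner) ([]RegionWithLeader, error) {
	// Empty start and end keys mean "scan the entire key space".
	iter := IterateRegion(cli, []byte(""), []byte(""))
	iter.PageSize = 128 // explicit page size; the real default may differ
	var all []RegionWithLeader
	for !iter.Done() {
		page, err := iter.Next(ctx)
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
	}
	return all, nil
}
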