Skip to content

Commit

Permalink
This is an automated cherry-pick of #41670
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Leavrth authored and ti-chi-bot committed Mar 14, 2023
1 parent 113e2cc commit 13adb2c
Show file tree
Hide file tree
Showing 13 changed files with 1,071 additions and 11 deletions.
169 changes: 169 additions & 0 deletions br/pkg/glue/progressing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package glue

import (
"context"
"fmt"
"io"
"os"
"time"

"github.com/fatih/color"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/vbauerster/mpb/v7"
"github.com/vbauerster/mpb/v7/decor"
"golang.org/x/term"
)

const OnlyOneTask int = -1

var spinnerText []string = []string{".", "..", "..."}

type pbProgress struct {
bar *mpb.Bar
progress *mpb.Progress
ops ConsoleOperations
}

// Inc increases the progress. This method must be goroutine-safe, and can
// be called from any goroutine.
func (p pbProgress) Inc() {
p.bar.Increment()
}

// IncBy increases the progress by n.
func (p pbProgress) IncBy(n int64) {
p.bar.IncrBy(int(n))
}

func (p pbProgress) GetCurrent() int64 {
return p.bar.Current()
}

// Close marks the progress as 100% complete and that Inc() can no longer be
// called.
func (p pbProgress) Close() {
if p.bar.Completed() || p.bar.Aborted() {
return
}
p.bar.Abort(false)
}

// Wait implements the ProgressWaiter interface.
func (p pbProgress) Wait(ctx context.Context) error {
ch := make(chan struct{})
go func() {
p.progress.Wait()
close(ch)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-ch:
return nil
}
}

// ProgressWaiter is the extended `Progress“: which provides a `wait` method to
// allow caller wait until all unit in the progress finished.
type ProgressWaiter interface {
Progress
Wait(context.Context) error
}

type noOPWaiter struct {
Progress
}

func (nw noOPWaiter) Wait(context.Context) error {
return nil
}

// cbOnComplete like `decor.OnComplete`, however allow the message provided by a function.
func cbOnComplete(decl decor.Decorator, cb func() string) decor.DecorFunc {
return func(s decor.Statistics) string {
if s.Completed {
return cb()
}
return decl.Decor(s)
}
}

func (ops ConsoleOperations) OutputIsTTY() bool {
f, ok := ops.Out().(*os.File)
if !ok {
return false
}
return term.IsTerminal(int(f.Fd()))
}

// StartProgressBar starts a progress bar with the console operations.
// Note: This function has overlapped function with `glue.StartProgress`, however this supports display extra fields
//
// after success, and implement by `mpb` (instead of `pb`).
//
// Note': Maybe replace the old `StartProgress` with `mpb` too.
func (ops ConsoleOperations) StartProgressBar(title string, total int, extraFields ...ExtraField) ProgressWaiter {
if !ops.OutputIsTTY() {
return ops.startProgressBarOverDummy(title, total, extraFields...)
}
return ops.startProgressBarOverTTY(title, total, extraFields...)
}

func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int, extraFields ...ExtraField) ProgressWaiter {
return noOPWaiter{utils.StartProgress(context.TODO(), title, int64(total), true, nil)}
}

func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter {
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
bar := adjustTotal(pb, title, total, extraFields...)

// If total is zero, finish right now.
if total == 0 {
bar.SetTotal(0, true)
}

return pbProgress{
bar: bar,
ops: ops,
progress: pb,
}
}

func adjustTotal(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
if total == OnlyOneTask {
return buildOneTaskBar(pb, title, 1)
}
return buildProgressBar(pb, title, total, extraFields...)
}

func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
greenTitle := color.GreenString(title)
return pb.New(int64(total),
// Play as if the old BR style.
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller {
return mpb.BarFillerFunc(func(w io.Writer, reqWidth int, stat decor.Statistics) {
if stat.Aborted || stat.Completed {
return
}
bf.Fill(w, reqWidth, stat)
})
}),
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))),
)
}

var (
spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE"))
)

func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
return pb.New(int64(total),
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(title)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))),
)
}
5 changes: 5 additions & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ go_library(
"//br/pkg/metautil",
"//br/pkg/pdutil",
"//br/pkg/redact",
<<<<<<< HEAD
=======
"//br/pkg/restore/ingestrec",
"//br/pkg/restore/prealloc_table_id",
>>>>>>> c8e68765b3 (pitr: add ingest recorder to repair indexes (#41670))
"//br/pkg/restore/split",
"//br/pkg/restore/tiflashrec",
"//br/pkg/rtree",
Expand Down
180 changes: 180 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
<<<<<<< HEAD
=======
"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
>>>>>>> c8e68765b3 (pitr: add ingest recorder to repair indexes (#41670))
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/rtree"
Expand Down Expand Up @@ -2440,6 +2445,78 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error {
return nil
}

const (
alterTableDropIndexSQL = "ALTER TABLE %n.%n DROP INDEX %n"
alterTableAddIndexFormat = "ALTER TABLE %%n.%%n ADD INDEX %%n(%s)"
alterTableAddUniqueIndexFormat = "ALTER TABLE %%n.%%n ADD UNIQUE KEY %%n(%s)"
alterTableAddPrimaryFormat = "ALTER TABLE %%n.%%n ADD PRIMARY KEY (%s) NONCLUSTERED"
)

// RepairIngestIndex drops the indexes from IngestRecorder and re-add them.
func (rc *Client) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
ingestRecorder.UpdateIndexInfo(allSchema)
console := glue.GetConsole(g)
err = ingestRecorder.Iterate(func(_, _ int64, info *ingestrec.IngestIndexInfo) error {
var (
addSQL strings.Builder
addArgs []interface{} = make([]interface{}, 0, 5+len(info.ColumnArgs))
progressTitle string = fmt.Sprintf("repair ingest index %s for table %s.%s", info.IndexInfo.Name.O, info.SchemaName, info.TableName)
)
w := console.StartProgressBar(progressTitle, glue.OnlyOneTask)
if info.IsPrimary {
addSQL.WriteString(fmt.Sprintf(alterTableAddPrimaryFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName)
addArgs = append(addArgs, info.ColumnArgs...)
} else if info.IndexInfo.Unique {
addSQL.WriteString(fmt.Sprintf(alterTableAddUniqueIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
} else {
addSQL.WriteString(fmt.Sprintf(alterTableAddIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
}
// USING BTREE/HASH/RTREE
indexTypeStr := info.IndexInfo.Tp.String()
if len(indexTypeStr) > 0 {
addSQL.WriteString(" USING ")
addSQL.WriteString(indexTypeStr)
}

// COMMENT [...]
if len(info.IndexInfo.Comment) > 0 {
addSQL.WriteString(" COMMENT %?")
addArgs = append(addArgs, info.IndexInfo.Comment)
}

if info.IndexInfo.Invisible {
addSQL.WriteString(" INVISIBLE")
} else {
addSQL.WriteString(" VISIBLE")
}

if err := rc.db.se.ExecuteInternal(ctx, alterTableDropIndexSQL, info.SchemaName, info.TableName, info.IndexInfo.Name.O); err != nil {
return errors.Trace(err)
}
if err := rc.db.se.ExecuteInternal(ctx, addSQL.String(), addArgs...); err != nil {
return errors.Trace(err)
}
w.Inc()
if err := w.Wait(ctx); err != nil {
return errors.Trace(err)
}
w.Close()
return nil
})
return errors.Trace(err)
}

const (
insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)"
Expand Down Expand Up @@ -2591,6 +2668,109 @@ func (rc *Client) SetWithSysTable(withSysTable bool) {
rc.withSysTable = withSysTable
}

<<<<<<< HEAD
=======
func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
recorder := tiflashrec.New()

expectTiFlashStoreCount := uint64(0)
needTiFlash := false
for _, s := range allSchema {
for _, t := range s.Tables {
if t.TiFlashReplica != nil {
expectTiFlashStoreCount = mathutil.Max(expectTiFlashStoreCount, t.TiFlashReplica.Count)
recorder.AddTable(t.ID, *t.TiFlashReplica)
needTiFlash = true
}
}
}
if !needTiFlash {
log.Info("no need to set tiflash replica, since there is no tables enable tiflash replica")
return nil
}
// we wait for ten minutes to wait tiflash starts.
// since tiflash only starts when set unmark recovery mode finished.
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = utils.WithRetry(timeoutCtx, func() error {
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
log.Info("get tiflash store count for resetting TiFlash Replica",
zap.Uint64("count", tiFlashStoreCount))
if err != nil {
return errors.Trace(err)
}
if tiFlashStoreCount < expectTiFlashStoreCount {
log.Info("still waiting for enough tiflash store start",
zap.Uint64("expect", expectTiFlashStoreCount),
zap.Uint64("actual", tiFlashStoreCount),
)
return errors.New("tiflash store count is less than expected")
}
return nil
}, &waitTiFlashBackoffer{
Attempts: 30,
BaseBackoff: 4 * time.Second,
})
if err != nil {
return err
}

sqls := recorder.GenerateResetAlterTableDDLs(info)
log.Info("Generating SQLs for resetting tiflash replica",
zap.Strings("sqls", sqls))

return g.UseOneShotSession(storage, false, func(se glue.Session) error {
for _, sql := range sqls {
if errExec := se.ExecuteInternal(ctx, sql); errExec != nil {
logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.",
logutil.ShortError(errExec),
zap.String("sql", sql),
)
}
}
return nil
})
}

// RangeFilterFromIngestRecorder rewrites the table id of items in the ingestRecorder
// TODO: need to implement the range filter out feature
func (rc *Client) RangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*RewriteRules) error {
err := recorder.RewriteTableID(func(tableID int64) (int64, error) {
rewriteRule, exists := rewriteRules[tableID]
if !exists {
return 0, errors.Errorf("rewriteRule not found, tableID: %d", tableID)
}
newTableID := GetRewriteTableID(tableID, rewriteRule)
if newTableID == 0 {
return 0, errors.Errorf("newTableID is 0, tableID: %d", tableID)
}
return newTableID, nil
})
return errors.Trace(err)
/* TODO: we can use range filter to skip restoring the index kv using accelerated indexing feature
filter := rtree.NewRangeTree()
err = recorder.Iterate(func(tableID int64, indexID int64, info *ingestrec.IngestIndexInfo) error {
// range after table ID rewritten
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
rg := rtree.Range{
StartKey: codec.EncodeBytes([]byte{}, startKey),
EndKey: codec.EncodeBytes([]byte{}, endKey),
}
filter.InsertRange(rg)
return nil
})
return errors.Trace(err)
*/
}

>>>>>>> c8e68765b3 (pitr: add ingest recorder to repair indexes (#41670))
// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
Expand Down
Loading

0 comments on commit 13adb2c

Please sign in to comment.