Skip to content

Commit

Permalink
pitr: Revert "pitr: add ingest recorder to repair indexes (#41670) (#…
Browse files Browse the repository at this point in the history
…46418)" (#46785)

ref #38045, close #41668
  • Loading branch information
Leavrth authored Sep 8, 2023
1 parent f096450 commit 8f340f8
Show file tree
Hide file tree
Showing 14 changed files with 37 additions and 788 deletions.
47 changes: 10 additions & 37 deletions br/pkg/glue/progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ import (
"golang.org/x/term"
)

const OnlyOneTask int = -1

var spinnerText []string = []string{".", "..", "..."}

type pbProgress struct {
bar *mpb.Bar
progress *mpb.Progress
Expand Down Expand Up @@ -117,30 +113,8 @@ func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int,

func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter {
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
bar := adjustTotal(pb, title, total, extraFields...)

// If total is zero, finish right now.
if total == 0 {
bar.SetTotal(0, true)
}

return pbProgress{
bar: bar,
ops: ops,
progress: pb,
}
}

func adjustTotal(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
if total == OnlyOneTask {
return buildOneTaskBar(pb, title, 1)
}
return buildProgressBar(pb, title, total, extraFields...)
}

func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
greenTitle := color.GreenString(title)
return pb.New(int64(total),
bar := pb.New(int64(total),
// Play as if the old BR style.
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller {
Expand All @@ -154,16 +128,15 @@ func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))),
)
}

var (
spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE"))
)
// If total is zero, finish right now.
if total == 0 {
bar.SetTotal(0, true)
}

func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
return pb.New(int64(total),
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(title)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))),
)
return pbProgress{
bar: bar,
ops: ops,
progress: pb,
}
}
1 change: 0 additions & 1 deletion br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ go_library(
"//br/pkg/metautil",
"//br/pkg/pdutil",
"//br/pkg/redact",
"//br/pkg/restore/ingestrec",
"//br/pkg/restore/prealloc_table_id",
"//br/pkg/restore/split",
"//br/pkg/restore/tiflashrec",
Expand Down
106 changes: 0 additions & 106 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
Expand Down Expand Up @@ -2618,78 +2617,6 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error {
return nil
}

const (
alterTableDropIndexSQL = "ALTER TABLE %n.%n DROP INDEX %n"
alterTableAddIndexFormat = "ALTER TABLE %%n.%%n ADD INDEX %%n(%s)"
alterTableAddUniqueIndexFormat = "ALTER TABLE %%n.%%n ADD UNIQUE KEY %%n(%s)"
alterTableAddPrimaryFormat = "ALTER TABLE %%n.%%n ADD PRIMARY KEY (%s) NONCLUSTERED"
)

// RepairIngestIndex drops the indexes from IngestRecorder and re-add them.
func (rc *Client) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
ingestRecorder.UpdateIndexInfo(allSchema)
console := glue.GetConsole(g)
err = ingestRecorder.Iterate(func(_, _ int64, info *ingestrec.IngestIndexInfo) error {
var (
addSQL strings.Builder
addArgs []interface{} = make([]interface{}, 0, 5+len(info.ColumnArgs))
progressTitle string = fmt.Sprintf("repair ingest index %s for table %s.%s", info.IndexInfo.Name.O, info.SchemaName, info.TableName)
)
w := console.StartProgressBar(progressTitle, glue.OnlyOneTask)
if info.IsPrimary {
addSQL.WriteString(fmt.Sprintf(alterTableAddPrimaryFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName)
addArgs = append(addArgs, info.ColumnArgs...)
} else if info.IndexInfo.Unique {
addSQL.WriteString(fmt.Sprintf(alterTableAddUniqueIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
} else {
addSQL.WriteString(fmt.Sprintf(alterTableAddIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
}
// USING BTREE/HASH/RTREE
indexTypeStr := info.IndexInfo.Tp.String()
if len(indexTypeStr) > 0 {
addSQL.WriteString(" USING ")
addSQL.WriteString(indexTypeStr)
}

// COMMENT [...]
if len(info.IndexInfo.Comment) > 0 {
addSQL.WriteString(" COMMENT %?")
addArgs = append(addArgs, info.IndexInfo.Comment)
}

if info.IndexInfo.Invisible {
addSQL.WriteString(" INVISIBLE")
} else {
addSQL.WriteString(" VISIBLE")
}

if err := rc.db.se.ExecuteInternal(ctx, alterTableDropIndexSQL, info.SchemaName, info.TableName, info.IndexInfo.Name.O); err != nil {
return errors.Trace(err)
}
if err := rc.db.se.ExecuteInternal(ctx, addSQL.String(), addArgs...); err != nil {
return errors.Trace(err)
}
w.Inc()
if err := w.Wait(ctx); err != nil {
return errors.Trace(err)
}
w.Close()
return nil
})
return errors.Trace(err)
}

const (
insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)"
Expand Down Expand Up @@ -2909,39 +2836,6 @@ func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage
})
}

// RangeFilterFromIngestRecorder rewrites the table id of items in the ingestRecorder
// TODO: need to implement the range filter out feature
func (rc *Client) RangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*RewriteRules) error {
err := recorder.RewriteTableID(func(tableID int64) (int64, bool, error) {
rewriteRule, exists := rewriteRules[tableID]
if !exists {
// since the table's files will be skipped restoring, here also skips.
return 0, true, nil
}
newTableID := GetRewriteTableID(tableID, rewriteRule)
if newTableID == 0 {
return 0, false, errors.Errorf("newTableID is 0, tableID: %d", tableID)
}
return newTableID, false, nil
})
return errors.Trace(err)
/* TODO: we can use range filter to skip restoring the index kv using accelerated indexing feature
filter := rtree.NewRangeTree()
err = recorder.Iterate(func(tableID int64, indexID int64, info *ingestrec.IngestIndexInfo) error {
// range after table ID rewritten
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
rg := rtree.Range{
StartKey: codec.EncodeBytes([]byte{}, startKey),
EndKey: codec.EncodeBytes([]byte{}, endKey),
}
filter.InsertRange(rg)
return nil
})
return errors.Trace(err)
*/
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
Expand Down
24 changes: 0 additions & 24 deletions br/pkg/restore/ingestrec/BUILD.bazel

This file was deleted.

Loading

0 comments on commit 8f340f8

Please sign in to comment.