Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pitr: add ingest recorder to repair indexes #41670

Merged
merged 18 commits into from
Mar 14, 2023
Merged
47 changes: 37 additions & 10 deletions br/pkg/glue/progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"golang.org/x/term"
)

const OnlyOneTask int = -1

var spinnerText []string = []string{".", "..", "..."}

type pbProgress struct {
bar *mpb.Bar
progress *mpb.Progress
Expand Down Expand Up @@ -113,8 +117,30 @@ func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int,

func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter {
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
bar := adjustTotal(pb, title, total, extraFields...)

// If total is zero, finish right now.
if total == 0 {
bar.SetTotal(0, true)
}

return pbProgress{
bar: bar,
ops: ops,
progress: pb,
}
}

func adjustTotal(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
if total == OnlyOneTask {
return buildOneTaskBar(pb, title, 1)
}
return buildProgressBar(pb, title, total, extraFields...)
}

func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
greenTitle := color.GreenString(title)
bar := pb.New(int64(total),
return pb.New(int64(total),
// Play as if the old BR style.
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller {
Expand All @@ -128,15 +154,16 @@ func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, ex
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))),
)
}

// If total is zero, finish right now.
if total == 0 {
bar.SetTotal(0, true)
}
var (
spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE"))
)

return pbProgress{
bar: bar,
ops: ops,
progress: pb,
}
func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
return pb.New(int64(total),
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(title)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))),
)
}
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//br/pkg/metautil",
"//br/pkg/pdutil",
"//br/pkg/redact",
"//br/pkg/restore/ingestrec",
"//br/pkg/restore/prealloc_table_id",
"//br/pkg/restore/split",
"//br/pkg/restore/tiflashrec",
Expand Down
105 changes: 105 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
Expand Down Expand Up @@ -2534,6 +2535,78 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error {
return nil
}

const (
alterTableDropIndexSQL = "ALTER TABLE %n.%n DROP INDEX %n"
alterTableAddIndexFormat = "ALTER TABLE %%n.%%n ADD INDEX %%n(%s)"
alterTableAddUniqueIndexFormat = "ALTER TABLE %%n.%%n ADD UNIQUE KEY %%n(%s)"
alterTableAddPrimaryFormat = "ALTER TABLE %%n.%%n ADD PRIMARY KEY (%s) NONCLUSTERED"
)

// RepairIngestIndex drops the indexes from IngestRecorder and re-add them.
func (rc *Client) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
ingestRecorder.UpdateIndexInfo(allSchema)
console := glue.GetConsole(g)
err = ingestRecorder.Iterate(func(_, _ int64, info *ingestrec.IngestIndexInfo) error {
var (
addSQL strings.Builder
addArgs []interface{} = make([]interface{}, 0, 5+len(info.ColumnArgs))
progressTitle string = fmt.Sprintf("repair ingest index %s for table %s.%s", info.IndexInfo.Name.O, info.SchemaName, info.TableName)
)
w := console.StartProgressBar(progressTitle, glue.OnlyOneTask)
if info.IsPrimary {
addSQL.WriteString(fmt.Sprintf(alterTableAddPrimaryFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName)
addArgs = append(addArgs, info.ColumnArgs...)
} else if info.IndexInfo.Unique {
addSQL.WriteString(fmt.Sprintf(alterTableAddUniqueIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add generate uk statement

addSQL.WriteString(fmt.Sprintf(alterTableAddIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
}
// USING BTREE/HASH/RTREE
indexTypeStr := info.IndexInfo.Tp.String()
if len(indexTypeStr) > 0 {
addSQL.WriteString(" USING ")
addSQL.WriteString(indexTypeStr)
}

// COMMENT [...]
if len(info.IndexInfo.Comment) > 0 {
addSQL.WriteString(" COMMENT %?")
addArgs = append(addArgs, info.IndexInfo.Comment)
}

if info.IndexInfo.Invisible {
addSQL.WriteString(" INVISIBLE")
} else {
addSQL.WriteString(" VISIBLE")
}

if err := rc.db.se.ExecuteInternal(ctx, alterTableDropIndexSQL, info.SchemaName, info.TableName, info.IndexInfo.Name.O); err != nil {
return errors.Trace(err)
}
if err := rc.db.se.ExecuteInternal(ctx, addSQL.String(), addArgs...); err != nil {
return errors.Trace(err)
}
w.Inc()
if err := w.Wait(ctx); err != nil {
return errors.Trace(err)
}
w.Close()
return nil
})
return errors.Trace(err)
}

const (
insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)"
Expand Down Expand Up @@ -2753,6 +2826,38 @@ func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage
})
}

// RangeFilterFromIngestRecorder rewrites the table id of items in the ingestRecorder
// TODO: need to implement the range filter out feature
func (rc *Client) RangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*RewriteRules) error {
err := recorder.RewriteTableID(func(tableID int64) (int64, error) {
rewriteRule, exists := rewriteRules[tableID]
if !exists {
return 0, errors.Errorf("rewriteRule not found, tableID: %d", tableID)
}
newTableID := GetRewriteTableID(tableID, rewriteRule)
if newTableID == 0 {
return 0, errors.Errorf("newTableID is 0, tableID: %d", tableID)
}
return newTableID, nil
})
return errors.Trace(err)
/* TODO: we can use range filter to skip restoring the index kv using accelerated indexing feature
filter := rtree.NewRangeTree()
err = recorder.Iterate(func(tableID int64, indexID int64, info *ingestrec.IngestIndexInfo) error {
// range after table ID rewritten
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
rg := rtree.Range{
StartKey: codec.EncodeBytes([]byte{}, startKey),
EndKey: codec.EncodeBytes([]byte{}, endKey),
}
filter.InsertRange(rg)
return nil
})
return errors.Trace(err)
*/
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/restore/ingestrec/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "ingestrec",
srcs = ["ingest_recorder.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/ingestrec",
visibility = ["//visibility:public"],
deps = [
"//parser/model",
"//types",
"@com_github_pingcap_errors//:errors",
],
)

go_test(
name = "ingestrec_test",
srcs = ["ingest_recorder_test.go"],
deps = [
":ingestrec",
"//parser/model",
"@com_github_pkg_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Loading