pitr: add ingest recorder to repair indexes #41670

Merged · 18 commits · Mar 14, 2023
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
@@ -34,6 +34,7 @@ go_library(
"//br/pkg/metautil",
"//br/pkg/pdutil",
"//br/pkg/redact",
"//br/pkg/restore/ingestrec",
"//br/pkg/restore/prealloc_table_id",
"//br/pkg/restore/split",
"//br/pkg/restore/tiflashrec",
95 changes: 95 additions & 0 deletions br/pkg/restore/client.go
@@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
@@ -2535,6 +2536,68 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error {
return nil
}

const (
alterTableDropIndexFormat = "ALTER TABLE `%s`.`%s` DROP INDEX `%s`"
alterTableAddIndexFormat = "ALTER TABLE `%s`.`%s` ADD INDEX `%s`(%s)"
alterTableAddUniqueIndexFormat = "ALTER TABLE `%s`.`%s` ADD UNIQUE KEY `%s`(%s)"
alterTableAddPrimaryFormat = "ALTER TABLE `%s`.`%s` ADD PRIMARY KEY (%s) NONCLUSTERED"
)
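For illustration, with a hypothetical schema db1, table t1, and an index idx1 over column c1 (names are placeholders, not taken from the PR), these formats expand roughly to:

ALTER TABLE `db1`.`t1` DROP INDEX `idx1`
ALTER TABLE `db1`.`t1` ADD INDEX `idx1`(`c1`)
ALTER TABLE `db1`.`t1` ADD UNIQUE KEY `idx1`(`c1`)
ALTER TABLE `db1`.`t1` ADD PRIMARY KEY (`c1`) NONCLUSTERED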

// RepairIngestIndex drops the indexes from IngestRecorder and re-adds them.
func (rc *Client) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
ingestRecorder.UpdateIndexInfo(allSchema)
err = ingestRecorder.Iterate(func(_, _ int64, info *ingestrec.IngestIndexInfo) error {
var (
dropSQL string = fmt.Sprintf(alterTableDropIndexFormat, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addSQL string
indexTypeSQL string
commentSQL string
visibleSQL string = " VISIBLE"
)

if info.IsPrimary {
addSQL = fmt.Sprintf(alterTableAddPrimaryFormat, info.SchemaName, info.TableName, info.ColumnList)
Contributor:
Should we escape here? Perhaps some guys like to name their indices like (how absurd!):

CREATE INDEX `idx``; DROP DATABASE business; --`(some_row);

Contributor Author: 😂

Contributor Author: done. test updated
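A minimal sketch of the escaping the reviewer is asking for, assuming backtick-quoted identifiers as in the formats above; escapeIdent is a hypothetical helper, not code from this PR:

import "strings"

// escapeIdent backtick-quotes an identifier and doubles any embedded
// backticks, so an index named like the one above cannot break out of
// the quoting when spliced into the ALTER TABLE statements.
func escapeIdent(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}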

} else if info.IndexInfo.Unique {
addSQL = fmt.Sprintf(alterTableAddUniqueIndexFormat, info.SchemaName, info.TableName, info.IndexInfo.Name.O, info.ColumnList)
} else {
Collaborator:
Add generate uk statement

addSQL = fmt.Sprintf(alterTableAddIndexFormat, info.SchemaName, info.TableName, info.IndexInfo.Name.O, info.ColumnList)
}
// USING BTREE/HASH/RTREE
indexTypeStr := info.IndexInfo.Tp.String()
if len(indexTypeStr) > 0 {
indexTypeSQL = fmt.Sprintf(" USING %s", indexTypeStr)
}

// COMMENT [...]
if len(info.IndexInfo.Comment) > 0 {
commentSQL = " COMMENT %?"
}

if info.IndexInfo.Invisible {
visibleSQL = " INVISIBLE"
}

addSQL = fmt.Sprintf("%s%s%s%s", addSQL, indexTypeSQL, commentSQL, visibleSQL)

log.Debug("repair ingest sql", zap.String("drop", dropSQL))
if err := rc.db.se.Execute(ctx, dropSQL); err != nil {
return errors.Trace(err)
}
log.Debug("repair ingest sql", zap.String("add", addSQL))
if err := rc.db.se.ExecuteInternal(ctx, addSQL, info.IndexInfo.Comment); err != nil {
return errors.Trace(err)
}
Contributor:
Is the debug log left here on purpose? If not, please remove it.
Why does dropSQL use Execute() and addSQL use ExecuteInternal()?

Contributor Author:
Because the Comment may contain escape characters such as " and \n, ExecuteInternal is used here to send the comment as an argument instead of splicing it into the SQL as plaintext.
The function signatures are as follows:
Execute(context, sql string)
ExecuteInternal(context, sql string, args...)

Contributor Author: done, both use ExecuteInternal.
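A rough sketch of the distinction discussed above, using the signatures quoted in the thread; se, schema, table, index, cols, and comment are placeholders:

// The DROP statement splices in nothing beyond identifiers, so the
// formatted string can be sent as-is.
dropSQL := fmt.Sprintf("ALTER TABLE `%s`.`%s` DROP INDEX `%s`", schema, table, index)
if err := se.ExecuteInternal(ctx, dropSQL); err != nil {
	return errors.Trace(err)
}

// The ADD statement carries the user comment as an argument bound to the
// %? placeholder, so quotes and newlines in it need no manual escaping.
addSQL := fmt.Sprintf("ALTER TABLE `%s`.`%s` ADD INDEX `%s`(%s) COMMENT %%?",
	schema, table, index, cols)
if err := se.ExecuteInternal(ctx, addSQL, comment); err != nil {
	return errors.Trace(err)
}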

return nil
})
return errors.Trace(err)
}

const (
insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)"
@@ -2754,6 +2817,38 @@ func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage
})
}

// RangeFilterFromIngestRecorder rewrites the table IDs of the items in the ingestRecorder
// and collects their key ranges into a range filter.
// TODO: need to implement the range filter-out feature
func (rc *Client) RangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*RewriteRules) error {
filter := rtree.NewRangeTree()
Contributor:
What is the filter used for?

err := recorder.RewriteTableID(func(tableID int64) (int64, error) {
rewriteRule, exists := rewriteRules[tableID]
if !exists {
return 0, errors.Errorf("rewriteRule not found, tableID: %d", tableID)
}
newTableID := GetRewriteTableID(tableID, rewriteRule)
if newTableID == 0 {
return 0, errors.Errorf("newTableID is 0, tableID: %d", tableID)
}
return newTableID, nil
})
if err != nil {
return errors.Trace(err)
}
err = recorder.Iterate(func(tableID int64, indexID int64, info *ingestrec.IngestIndexInfo) error {
// range after table ID rewritten
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
rg := rtree.Range{
StartKey: codec.EncodeBytes([]byte{}, startKey),
EndKey: codec.EncodeBytes([]byte{}, endKey),
}
filter.InsertRange(rg)
return nil
})
return errors.Trace(err)
}
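For context on the range built above: a TiDB index key starts with a prefix that is, in its human-readable form, roughly t{tableID}_i{indexID}, so the half-open range [EncodeTableIndexPrefix(tableID, indexID), EncodeTableIndexPrefix(tableID, indexID+1)) covers every key of that single index and nothing else. With hypothetical IDs tableID=100 and indexID=2:

StartKey prefix: t100_i2   // first key of index 2 (inclusive)
EndKey prefix:   t100_i3   // first key past index 2 (exclusive)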

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
24 changes: 24 additions & 0 deletions br/pkg/restore/ingestrec/BUILD.bazel
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "ingestrec",
srcs = ["ingest_recorder.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/ingestrec",
visibility = ["//visibility:public"],
deps = [
"//parser/model",
"//types",
"@com_github_pingcap_errors//:errors",
],
)

go_test(
name = "ingestrec_test",
srcs = ["ingest_recorder_test.go"],
deps = [
":ingestrec",
"//parser/model",
"@com_github_pkg_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
182 changes: 182 additions & 0 deletions br/pkg/restore/ingestrec/ingest_recorder.go
@@ -0,0 +1,182 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ingestrec

import (
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/types"
)

// IngestIndexInfo records the information used to generate index drop/re-add SQL.
type IngestIndexInfo struct {
SchemaName string
TableName string
ColumnList string
IsPrimary bool
IndexInfo *model.IndexInfo
Updated bool
}

// IngestRecorder records the information of indexes that use ingest mode to construct KVs.
// Currently log backup cannot back up these ingest KVs, so they need to be re-constructed.
type IngestRecorder struct {
// Table ID -> Index ID -> Index info
items map[int64]map[int64]*IngestIndexInfo
}

// New returns an empty IngestRecorder.
func New() *IngestRecorder {
return &IngestRecorder{
items: make(map[int64]map[int64]*IngestIndexInfo),
}
}

func notIngestJob(job *model.Job) bool {
return job.ReorgMeta == nil ||
job.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge
}

func notAddIndexJob(job *model.Job) bool {
/* support new index using accelerated indexing in future:
* // 1. skip if the new index didn't generate new kvs
* // 2. shall the ReorgTp of ModifyColumnJob be ReorgTypeLitMerge if use accelerated indexing?
* if job.RowCount > 0 && notIngestJob(job) {
* // ASSERT: select new indexes, which have the highest IDs in this job's BinlogInfo
* newIndexesInfo := getIndexesWithTheHighestIDs(len(indexIDs))
* for _, newIndexInfo := range newIndexesInfo {
* tableindexes[newIndexInfo.ID] = ...
* }
* }
*/
return job.Type != model.ActionAddIndex &&
job.Type != model.ActionAddPrimaryKey
}

func notSynced(job *model.Job) bool {
return job.State != model.JobStateSynced
}

// AddJob filters out jobs that are not ingest-mode add-index jobs and records the remaining ones into the IngestRecorder.
func (i *IngestRecorder) AddJob(job *model.Job) error {
if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job) {
return nil
}

// If the add-index operation affects no row, the index doesn't need to be repaired.
if job.RowCount == 0 {
return nil
}

var indexID int64 = 0
if err := job.DecodeArgs(&indexID); err != nil {
return errors.Trace(err)
}

tableindexes, exists := i.items[job.TableID]
if !exists {
tableindexes = make(map[int64]*IngestIndexInfo)
i.items[job.TableID] = tableindexes
}

// the current information of table/index might be modified by other ddl jobs,
// therefore update the index information at last
tableindexes[indexID] = &IngestIndexInfo{
IsPrimary: job.Type == model.ActionAddPrimaryKey,
Updated: false,
}

return nil
}

// RewriteTableID rewrites the table ID of the items in the IngestRecorder.
func (i *IngestRecorder) RewriteTableID(rewriteFunc func(tableID int64) (int64, error)) error {
newItems := make(map[int64]map[int64]*IngestIndexInfo)
for tableID, item := range i.items {
newTableID, err := rewriteFunc(tableID)
if err != nil {
return errors.Annotatef(err, "failed to rewrite table id: %d", tableID)
}
newItems[newTableID] = item
}
i.items = newItems
return nil
}

// UpdateIndexInfo uses the newest schemas to update the ingest index's information
func (i *IngestRecorder) UpdateIndexInfo(dbInfos []*model.DBInfo) {
for _, dbInfo := range dbInfos {
for _, tblInfo := range dbInfo.Tables {
tableindexes, tblexists := i.items[tblInfo.ID]
if !tblexists {
continue
}
for _, indexInfo := range tblInfo.Indices {
index, idxexists := tableindexes[indexInfo.ID]
if !idxexists {
continue
}
var columnListBuilder strings.Builder
var isFirst bool = true
for _, column := range indexInfo.Columns {
if !isFirst {
columnListBuilder.WriteByte(',')
}
isFirst = false

// expression / column
col := tblInfo.Columns[column.Offset]
if col.Hidden {
// (expression)
columnListBuilder.WriteByte('(')
columnListBuilder.WriteString(col.GeneratedExprString)
columnListBuilder.WriteByte(')')
} else {
// `columnName`
columnListBuilder.WriteByte('`')
columnListBuilder.WriteString(column.Name.O)
columnListBuilder.WriteByte('`')
if column.Length != types.UnspecifiedLength {
columnListBuilder.WriteString(fmt.Sprintf("(%d)", column.Length))
}
}
}
index.ColumnList = columnListBuilder.String()
index.IndexInfo = indexInfo
index.SchemaName = dbInfo.Name.O
index.TableName = tblInfo.Name.O
index.Updated = true
}
}
}
}
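As a hypothetical example of the column list this builds: for an index over a plain column a, a prefix column b(10), and a hidden column generated from the expression lower(c), ColumnList would come out as something like

`a`,`b`(10),(lower(`c`))

which is then substituted into the %s column-list slot of the ADD INDEX / ADD UNIQUE KEY / ADD PRIMARY KEY formats in client.go.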

// Iterate iterates all the ingest index.
func (i *IngestRecorder) Iterate(f func(tableID int64, indexID int64, info *IngestIndexInfo) error) error {
for tableID, is := range i.items {
for indexID, info := range is {
if !info.Updated {
continue
}
if err := f(tableID, indexID, info); err != nil {
return errors.Trace(err)
}
}
}
return nil
}
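A condensed sketch of how the pieces in this file fit together during PITR, based on the call sites added in client.go; error handling is elided, job, newID, and dom are placeholders, and the point where AddJob is invoked during log replay is not shown in this hunk:

rec := ingestrec.New()

// While replaying DDL jobs from the log backup, remember finished
// ingest-mode ADD INDEX / ADD PRIMARY KEY jobs.
_ = rec.AddJob(job)

// Once table IDs have been rewritten for the restored cluster,
// repoint the recorded entries at the new table IDs ...
_ = rec.RewriteTableID(func(oldID int64) (int64, error) { return newID, nil })

// ... refresh each entry with the restored cluster's schema ...
rec.UpdateIndexInfo(dom.InfoSchema().AllSchemas())

// ... and finally walk the records to drop and re-add every index.
_ = rec.Iterate(func(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error {
	return nil // build and run the ALTER TABLE statements here
})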