Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
backup,restore: support backing up / restore system databases (#1048) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored May 11, 2021
1 parent 75e95a2 commit bfeea3f
Show file tree
Hide file tree
Showing 17 changed files with 319 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func newFullBackupCommand() *cobra.Command {
return runBackupCommand(command, "Full backup")
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, acceptAllTables)
return command
}

Expand Down
14 changes: 14 additions & 0 deletions cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ var (
hasLogFile uint64
tidbGlue = gluetidb.New()
envLogToTermKey = "BR_LOG_TO_TERM"

filterOutSysAndMemTables = []string{
"*.*",
fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")),
"!mysql.*",
"!sys.*",
"!INFORMATION_SCHEMA.*",
"!PERFORMANCE_SCHEMA.*",
"!METRICS_SCHEMA.*",
"!INSPECTION_SCHEMA.*",
}
acceptAllTables = []string{
"*.*",
}
)

const (
Expand Down
4 changes: 2 additions & 2 deletions cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func newFullRestoreCommand() *cobra.Command {
return runRestoreCommand(cmd, "Full restore")
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, filterOutSysAndMemTables)
return command
}

Expand Down Expand Up @@ -136,7 +136,7 @@ func newLogRestoreCommand() *cobra.Command {
return runLogRestoreCommand(cmd)
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, filterOutSysAndMemTables)
task.DefineLogRestoreFlags(command)
return command
}
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,8 @@ error = '''
failed to write and ingest
'''

["BR:Restore:ErrUnsupportedSysTable"]
error = '''
the system table isn't supported for restoring yet
'''

14 changes: 13 additions & 1 deletion pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func BuildBackupRangeAndSchema(

for _, dbInfo := range dbs {
// skip system databases
if util.IsMemOrSysDB(dbInfo.Name.L) {
if !tableFilter.MatchSchema(dbInfo.Name.O) || isMemDB(dbInfo.Name.L) {
continue
}

Expand Down Expand Up @@ -1030,3 +1030,15 @@ func CollectChecksums(backupMeta *backuppb.BackupMeta) ([]Checksum, error) {
func isRetryableError(err error) bool {
return status.Code(err) == codes.Unavailable
}

// isMemDB checks whether dbLowerName is memory database.
// Remove it when tidb.utils has this function
func isMemDB(dbLowerName string) bool {
switch dbLowerName {
case util.InformationSchemaName.L,
util.PerformanceSchemaName.L,
util.MetricSchemaName.L:
return true
}
return false
}
3 changes: 3 additions & 0 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func (ss *Schemas) BackupSchemas(

schemas := make([]*backuppb.Schema, 0, len(ss.schemas))
for name, schema := range ss.schemas {
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
dbBytes, err := json.Marshal(schema.dbInfo)
if err != nil {
return nil, errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
c.Assert(backupSchemas, IsNil)

// Empty database.
noFilter, err := filter.Parse([]string{"*.*"})
// Filter out system tables manually.
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"})
c.Assert(err, IsNil)
_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
s.mock.Storage, noFilter, math.MaxUint64)
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
ErrRestoreInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Restore:ErrRestoreInvalidRange"))
ErrRestoreWriteAndIngest = errors.Normalize("failed to write and ingest", errors.RFCCodeText("BR:Restore:ErrRestoreWriteAndIngest"))
ErrRestoreSchemaNotExists = errors.Normalize("schema not exists", errors.RFCCodeText("BR:Restore:ErrRestoreSchemaNotExists"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))

// TODO maybe it belongs to PiTR.
ErrRestoreRTsConstrain = errors.Normalize("resolved ts constrain violation", errors.RFCCodeText("BR:Restore:ErrRestoreResolvedTsConstrain"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (g Glue) GetVersion() string {

// Execute implements glue.Session.
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.Execute(ctx, sql)
_, err := gs.se.ExecuteInternal(ctx, sql)
return errors.Trace(err)
}

Expand Down
185 changes: 185 additions & 0 deletions pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"go.uber.org/multierr"
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/utils"
)

var statsTables = map[string]struct{}{
"stats_buckets": {},
"stats_extended": {},
"stats_feedback": {},
"stats_fm_sketch": {},
"stats_histograms": {},
"stats_meta": {},
"stats_top_n": {},
}

func isStatsTable(tableName string) bool {
_, ok := statsTables[tableName]
return ok
}

// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema).
// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.
func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) {
sysDB := mysql.SystemDB

temporaryDB := utils.TemporaryDBName(sysDB)
defer rc.cleanTemporaryDatabase(ctx, sysDB)

if !f.MatchSchema(temporaryDB.O) {
log.Debug("system database filtered out", zap.String("database", sysDB))
return
}
originDatabase, ok := rc.databases[temporaryDB.O]
if !ok {
log.Info("system database not backed up, skipping", zap.String("database", sysDB))
return
}
db, ok := rc.getDatabaseByName(sysDB)
if !ok {
// Or should we create the database here?
log.Warn("target database not exist, aborting", zap.String("database", sysDB))
return
}

tablesRestored := make([]string, 0, len(originDatabase.Tables))
for _, table := range originDatabase.Tables {
tableName := table.Info.Name
if f.MatchTable(sysDB, tableName.O) {
if err := rc.replaceTemporaryTableToSystable(ctx, tableName.L, db); err != nil {
logutil.WarnTerm("error during merging temporary tables into system tables",
logutil.ShortError(err),
zap.Stringer("table", tableName),
)
}
}
tablesRestored = append(tablesRestored, tableName.L)
}
if err := rc.afterSystemTablesReplaced(ctx, tablesRestored); err != nil {
for _, e := range multierr.Errors(err) {
logutil.WarnTerm("error during reconfigurating the system tables", zap.String("database", sysDB), logutil.ShortError(e))
}
}
}

// database is a record of a database.
// For fast querying whether a table exists and the temporary database of it.
type database struct {
ExistingTables map[string]*model.TableInfo
Name model.CIStr
TemporaryName model.CIStr
}

// getDatabaseByName make a record of a database from info schema by its name.
func (rc *Client) getDatabaseByName(name string) (*database, bool) {
infoSchema := rc.dom.InfoSchema()
schema, ok := infoSchema.SchemaByName(model.NewCIStr(name))
if !ok {
return nil, false
}
db := &database{
ExistingTables: map[string]*model.TableInfo{},
Name: model.NewCIStr(name),
TemporaryName: utils.TemporaryDBName(name),
}
for _, t := range schema.Tables {
db.ExistingTables[t.Name.L] = t
}
return db, true
}

// afterSystemTablesReplaced do some extra work for special system tables.
// e.g. after inserting to the table mysql.user, we must execute `FLUSH PRIVILEGES` to allow it take effect.
func (rc *Client) afterSystemTablesReplaced(ctx context.Context, tables []string) error {
var err error
for _, table := range tables {
switch {
case table == "user":
// We cannot execute `rc.dom.NotifyUpdatePrivilege` here, because there isn't
// sessionctx.Context provided by the glue.
// TODO: update the glue type and allow we retrive a session context from it.
err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable,
"restored user info may not take effect, until you should execute `FLUSH PRIVILEGES` manually"))
}
}
return err
}

// replaceTemporaryTableToSystable replaces the temporary table to real system table.
func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, tableName string, db *database) error {
execSQL := func(sql string) error {
// SQLs here only contain table name and database name, seems it is no need to redact them.
if err := rc.db.se.Execute(ctx, sql); err != nil {
log.Warn("failed to execute SQL restore system database",
zap.String("table", tableName),
zap.Stringer("database", db.Name),
zap.String("sql", sql),
zap.Error(err),
)
return berrors.ErrUnknown.Wrap(err).GenWithStack("failed to execute %s", sql)
}
log.Info("successfully restore system database",
zap.String("table", tableName),
zap.Stringer("database", db.Name),
zap.String("sql", sql),
)
return nil
}

// The newly created tables have different table IDs with original tables,
// hence the old statistics are invalid.
//
// TODO:
// 1 ) Rewrite the table IDs via `UPDATE _temporary_mysql.stats_xxx SET table_id = new_table_id WHERE table_id = old_table_id`
// BEFORE replacing into and then execute `rc.statsHandler.Update(rc.dom.InfoSchema())`.
// 1.5 ) (Optional) The UPDATE statement sometimes costs, the whole system tables restore step can be place into the restore pipeline.
// 2 ) Deprecate the origin interface for backing up statistics.
if isStatsTable(tableName) {
return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring stats via `mysql` schema isn't support yet: " +
"the table ID is out-of-date and may corrupt existing statistics")
}

if db.ExistingTables[tableName] != nil {
log.Info("table existing, using replace into for restore",
zap.String("table", tableName),
zap.Stringer("schema", db.Name))
replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s SELECT * FROM %s;",
utils.EncloseDBAndTable(db.Name.L, tableName),
utils.EncloseDBAndTable(db.TemporaryName.L, tableName))
return execSQL(replaceIntoSQL)
}

renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;",
utils.EncloseDBAndTable(db.TemporaryName.L, tableName),
utils.EncloseDBAndTable(db.Name.L, tableName),
)
return execSQL(renameSQL)
}

func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) {
database := utils.TemporaryDBName(originDB)
log.Debug("dropping temporary database", zap.Stringer("database", database))
sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L))
if err := rc.db.se.Execute(ctx, sql); err != nil {
logutil.WarnTerm("failed to drop temporary database, it should be dropped manually",
zap.Stringer("database", database),
logutil.ShortError(err),
)
}
}
4 changes: 2 additions & 2 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ func DefineTableFlags(command *cobra.Command) {
}

// DefineFilterFlags defines the --filter and --case-sensitive flags for `full` subcommand.
func DefineFilterFlags(command *cobra.Command) {
func DefineFilterFlags(command *cobra.Command, defaultFilter []string) {
flags := command.Flags()
flags.StringArrayP(flagFilter, "f", []string{"*.*"}, "select tables to process")
flags.StringArrayP(flagFilter, "f", defaultFilter, "select tables to process")
flags.Bool(flagCaseSensitive, false, "whether the table names used in --filter should be case-sensitive")
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Trace(err)
}

// The cost of rename user table / replace into system table wouldn't be so high.
// So leave it out of the pipeline for easier implementation.
client.RestoreSystemSchemas(ctx, cfg.TableFilter)

// Set task summary to success status.
summary.SetSuccessStatus(true)
return nil
Expand Down
18 changes: 18 additions & 0 deletions pkg/utils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ package utils

import (
"encoding/json"
"fmt"
"strings"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/tablecodec"

Expand Down Expand Up @@ -155,3 +157,19 @@ func ArchiveSize(meta *backuppb.BackupMeta) uint64 {
func EncloseName(name string) string {
return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// EncloseDBAndTable formats the database and table name in sql.
func EncloseDBAndTable(database, table string) string {
return fmt.Sprintf("%s.%s", EncloseName(database), EncloseName(table))
}

// IsSysDB tests whether the database is system DB.
// Currently, the only system DB is mysql.
func IsSysDB(dbLowerName string) bool {
return dbLowerName == mysql.SystemDB
}

// TemporaryDBName makes a 'private' database name.
func TemporaryDBName(db string) model.CIStr {
return model.NewCIStr("__TiDB_BR_Temporary_" + db)
}
2 changes: 1 addition & 1 deletion tests/br_full_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ cluster_index_before_backup=$(run_sql "show variables like '%cluster%';" | awk '
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG --ignore-stats=false || cat $LOG
checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs)

if [ "${checksum_count}" != "1" ];then
if [ "${checksum_count}" -lt "1" ];then
echo "TEST: [$TEST_NAME] fail on fast checksum"
echo $(cat $LOG | grep checksum)
exit 1
Expand Down
4 changes: 2 additions & 2 deletions tests/br_full_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ BR_LOG_TO_TERM=1

checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs)

if [ "${checksum_count}" != "$DB_COUNT" ];then
echo "TEST: [$TEST_NAME] fail on fast checksum"
if [ "${checksum_count}" -lt "$DB_COUNT" ];then
echo "TEST: [$TEST_NAME] fail on fast checksum: required $DB_COUNT databases checked, but only ${checksum_count} dbs checked"
echo $(cat $LOG | grep checksum)
exit 1
fi
Expand Down
Loading

0 comments on commit bfeea3f

Please sign in to comment.