Skip to content

Commit

Permalink
Merge branch 'master' into exchange-partition-GA
Browse files Browse the repository at this point in the history
  • Loading branch information
ymkzpx authored Jul 18, 2022
2 parents 913641d + 9859512 commit ae31db1
Show file tree
Hide file tree
Showing 111 changed files with 3,559 additions and 220 deletions.
1 change: 0 additions & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ build:release --config=ci
build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution
build:ci --remote_cache=http://172.16.4.21:8080/tidb --remote_timeout="10s"
test:ci --verbose_failures
test:ci --test_timeout=180
test:ci --test_env=GO_TEST_WRAP_TESTV=1 --test_verbose_timeout_warnings
test:ci --remote_cache=http://172.16.4.21:8080/tidb --remote_timeout="10s"
test:ci --test_env=TZ=Asia/Shanghai --test_output=errors --experimental_ui_max_stdouterr_bytes=104857600
16 changes: 8 additions & 8 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2347,8 +2347,8 @@ def go_deps():
name = "com_github_nxadm_tail",
build_file_proto_mode = "disable_global",
importpath = "github.com/nxadm/tail",
sum = "h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=",
version = "v1.4.8",
sum = "h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=",
version = "v1.4.4",
)
go_repository(
name = "com_github_oklog_ulid",
Expand Down Expand Up @@ -2510,8 +2510,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:VKMmvYhtG28j1sCCBdq4s+V9UOYqNgQ6CQviQwOgTeg=",
version = "v0.0.0-20220705090230-a5d4ffd2ba33",
sum = "h1:PAXtUVMJnyQQS8t9GzihIFmh6FBXu0JziWbIVknLniA=",
version = "v0.0.0-20220711062932-08b02befd813",
)
go_repository(
name = "com_github_pingcap_log",
Expand All @@ -2531,8 +2531,8 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sum = "h1:XaTE4ZhQbQtQZtAVzlZh/Pf6SjFfMSTe1ia2nGcl36Y=",
version = "v0.0.0-20220706024432-7be3cc83a7d5",
sum = "h1:hE1dQdnvxWCHhD0snX67paV9y6inq8TxVFbsKqjaTQk=",
version = "v0.0.0-20220714100504-7d3474676bc9",
)
go_repository(
name = "com_github_pkg_browser",
Expand Down Expand Up @@ -3012,8 +3012,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:RpH/obpgyNKkXV4Wt8PqSdcUTnqWyExPcla+qdTVgi0=",
version = "v2.0.1-0.20220711061028-1c198aab9585",
sum = "h1:OSxo1R2y6iyAAxbUPL7h1HC/17CNuwNkW2lF1JySJ7k=",
version = "v2.0.1-0.20220718080214-86d51ba7eb02",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
8 changes: 8 additions & 0 deletions br/cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ var (
"*.*",
fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")),
"!mysql.*",
"mysql.user",
"mysql.db",
"mysql.tables_priv",
"mysql.columns_priv",
"mysql.global_priv",
"mysql.global_grants",
"mysql.default_roles",
"mysql.role_edges",
"!sys.*",
"!INFORMATION_SCHEMA.*",
"!PERFORMANCE_SCHEMA.*",
Expand Down
23 changes: 23 additions & 0 deletions br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
package main

import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/gluetikv"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/task"
Expand Down Expand Up @@ -38,11 +41,31 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {
}
if err := task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg); err != nil {
log.Error("failed to restore", zap.Error(err))
printWorkaroundOnFullRestoreError(command, err)
return errors.Trace(err)
}
return nil
}

// print workaround when we met not fresh or incompatible cluster error on full cluster restore
func printWorkaroundOnFullRestoreError(command *cobra.Command, err error) {
if !errors.ErrorEqual(err, berrors.ErrRestoreNotFreshCluster) &&
!errors.ErrorEqual(err, berrors.ErrRestoreIncompatibleSys) {
return
}
fmt.Println("#######################################################################")
switch {
case errors.ErrorEqual(err, berrors.ErrRestoreNotFreshCluster):
fmt.Println("# the target cluster is not fresh, br cannot restore system tables.")
case errors.ErrorEqual(err, berrors.ErrRestoreIncompatibleSys):
fmt.Println("# the target cluster is not compatible with the backup data,")
fmt.Println("# br cannot restore system tables.")
}
fmt.Println("# you can use the following command to skip restoring system tables:")
fmt.Printf("# br restore %s --filter '*.*' --filter '!mysql.*' \n", command.Use)
fmt.Println("#######################################################################")
}

func runRestoreRawCommand(command *cobra.Command, cmdName string) error {
cfg := task.RestoreRawConfig{
RawKvConfig: task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}},
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var (
ErrRestoreInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Restore:ErrRestoreInvalidRange"))
ErrRestoreWriteAndIngest = errors.Normalize("failed to write and ingest", errors.RFCCodeText("BR:Restore:ErrRestoreWriteAndIngest"))
ErrRestoreSchemaNotExists = errors.Normalize("schema not exists", errors.RFCCodeText("BR:Restore:ErrRestoreSchemaNotExists"))
ErrRestoreNotFreshCluster = errors.Normalize("cluster is not fresh", errors.RFCCodeText("BR:Restore:ErrRestoreNotFreshCluster"))
ErrRestoreIncompatibleSys = errors.Normalize("incompatible system table", errors.RFCCodeText("BR:Restore:ErrRestoreIncompatibleSys"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))
ErrDatabasesAlreadyExisted = errors.Normalize("databases already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrDatabasesAlreadyExisted"))

Expand Down
1 change: 1 addition & 0 deletions br/pkg/glue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//domain",
"//kv",
"//parser/model",
"//sessionctx",
"@com_github_fatih_color//:color",
"@com_github_pingcap_log//:log",
"@com_github_tikv_pd_client//:client",
Expand Down
34 changes: 31 additions & 3 deletions br/pkg/glue/console_glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,40 @@ type ConsoleOperations struct {
ConsoleGlue
}

// StartTask prints a task start information, and mark as finished when the returned function called.
func (ops ConsoleOperations) StartTask(message string) func() {
// An extra field appending to the task.
// return type is a {key: string, value: string} tuple.
type ExtraField func() [2]string

// NOTE:
// Perhaps we'd better move these modifiers and terminal function to another package
// like `glue/termutil?`

// WithTimeCost adds the task information of time costing for `ShowTask`.
func WithTimeCost() ExtraField {
start := time.Now()
return func() [2]string {
return [2]string{"take", time.Since(start).String()}
}
}

// WithConstExtraField adds an extra field with constant values.
func WithConstExtraField(key string, value interface{}) ExtraField {
return func() [2]string {
return [2]string{key, fmt.Sprint(value)}
}
}

// ShowTask prints a task start information, and mark as finished when the returned function called.
// This is for TUI presenting.
func (ops ConsoleOperations) ShowTask(message string, extraFields ...ExtraField) func() {
ops.Print(message)
return func() {
ops.Printf("%s; take = %s\n", color.HiGreenString("DONE"), time.Since(start))
fields := make([]string, 0, len(extraFields))
for _, fieldFunc := range extraFields {
field := fieldFunc()
fields = append(fields, fmt.Sprintf("%s = %s", field[0], color.New(color.Bold).Sprint(field[1])))
}
ops.Printf("%s; %s\n", color.HiGreenString("DONE"), strings.Join(fields, ", "))
}
}

Expand Down
31 changes: 20 additions & 11 deletions br/pkg/logutil/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,27 @@ func (kr StringifyKeys) String() string {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString("[")
sb.WriteString(redact.Key(rng.StartKey))
sb.WriteString(", ")
var endKey string
if len(rng.EndKey) == 0 {
endKey = "inf"
} else {
endKey = redact.Key(rng.EndKey)
}
sb.WriteString(redact.String(endKey))
sb.WriteString(")")
sb.WriteString(StringifyRange(rng).String())
}
sb.WriteString("}")
return sb.String()
}

// StringifyRange is the wrapper for displaying a key range.
type StringifyRange kv.KeyRange

func (rng StringifyRange) String() string {
sb := new(strings.Builder)
sb.WriteString("[")
sb.WriteString(redact.Key(rng.StartKey))
sb.WriteString(", ")
var endKey string
if len(rng.EndKey) == 0 {
endKey = "inf"
} else {
endKey = redact.Key(rng.EndKey)
}
sb.WriteString(redact.String(endKey))
sb.WriteString(")")
return sb.String()
}
139 changes: 139 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ import (
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/pdtypes"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -147,6 +149,16 @@ type Client struct {

storage storage.ExternalStorage

// if fullClusterRestore = true:
// - if there's system tables in the backup(backup data since br 5.1.0), the cluster should be a fresh cluster
// without user database or table. and system tables about privileges is restored together with user data.
// - if there no system tables in the backup(backup data from br < 5.1.0), restore all user data just like
// previous version did.
// if fullClusterRestore = false, restore all user data just like previous version did.
// fullClusterRestore = true when there is no explicit filter setting, and it's full restore or point command
// with a full backup data.
// todo: maybe change to an enum
fullClusterRestore bool
// the query to insert rows into table `gc_delete_range`, lack of ts.
deleteRangeQuery []string
deleteRangeQueryCh chan string
Expand Down Expand Up @@ -494,6 +506,14 @@ func (rc *Client) GetDatabase(name string) *utils.Database {
return rc.databases[name]
}

// HasBackedUpSysDB whether we have backed up system tables
// br backs system tables up since 5.1.0
func (rc *Client) HasBackedUpSysDB() bool {
temporaryDB := utils.TemporaryDBName(mysql.SystemDB)
_, backedUp := rc.databases[temporaryDB.O]
return backedUp
}

// GetPlacementPolicies returns policies.
func (rc *Client) GetPlacementPolicies() (*sync.Map, error) {
policies := &sync.Map{}
Expand Down Expand Up @@ -834,6 +854,101 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma
return eg.Wait()
}

// CheckTargetClusterFresh check whether the target cluster is fresh or not
// if there's no user dbs or tables, we take it as a fresh cluster, although
// user may have created some users or made other changes.
func (rc *Client) CheckTargetClusterFresh(ctx context.Context) error {
log.Info("checking whether target cluster is fresh")
userDBs := GetExistedUserDBs(rc.dom)
if len(userDBs) == 0 {
return nil
}

const maxPrintCount = 10
userTableOrDBNames := make([]string, 0, maxPrintCount+1)
addName := func(name string) bool {
if len(userTableOrDBNames) == maxPrintCount {
userTableOrDBNames = append(userTableOrDBNames, "...")
return false
}
userTableOrDBNames = append(userTableOrDBNames, name)
return true
}
outer:
for _, db := range userDBs {
if !addName(db.Name.L) {
break outer
}
for _, tbl := range db.Tables {
if !addName(tbl.Name.L) {
break outer
}
}
}
log.Error("not fresh cluster", zap.Strings("user tables", userTableOrDBNames))
return errors.Annotate(berrors.ErrRestoreNotFreshCluster, "user db/tables: "+strings.Join(userTableOrDBNames, ", "))
}

func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metautil.Table) error {
log.Info("checking target cluster system table compatibility with backed up data")
privilegeTablesInBackup := make([]*metautil.Table, 0)
for _, table := range tables {
decodedSysDBName, ok := utils.GetSysDBCIStrName(table.DB.Name)
if ok && utils.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableSet[table.Info.Name.L] {
privilegeTablesInBackup = append(privilegeTablesInBackup, table)
}
}
sysDB := model.NewCIStr(mysql.SystemDB)
for _, table := range privilegeTablesInBackup {
ti, err := rc.GetTableSchema(dom, sysDB, table.Info.Name)
if err != nil {
log.Error("missing table on target cluster", zap.Stringer("table", table.Info.Name))
return errors.Annotate(berrors.ErrRestoreIncompatibleSys, "missed system table: "+table.Info.Name.O)
}
backupTi := table.Info
if len(ti.Columns) != len(backupTi.Columns) {
log.Error("column count mismatch",
zap.Stringer("table", table.Info.Name),
zap.Int("col in cluster", len(ti.Columns)),
zap.Int("col in backup", len(backupTi.Columns)))
return errors.Annotatef(berrors.ErrRestoreIncompatibleSys,
"column count mismatch, table: %s, col in cluster: %d, col in backup: %d",
table.Info.Name.O, len(ti.Columns), len(backupTi.Columns))
}
backupColMap := make(map[string]*model.ColumnInfo)
for i := range backupTi.Columns {
col := backupTi.Columns[i]
backupColMap[col.Name.L] = col
}
// order can be different but type must compatible
for i := range ti.Columns {
col := ti.Columns[i]
backupCol := backupColMap[col.Name.L]
if backupCol == nil {
log.Error("missing column in backup data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
return errors.Annotatef(berrors.ErrRestoreIncompatibleSys,
"missing column in backup data, table: %s, col: %s %s",
table.Info.Name.O,
col.Name, col.FieldType.String())
}
if !utils.IsTypeCompatible(backupCol.FieldType, col.FieldType) {
log.Error("incompatible column",
zap.Stringer("table", table.Info.Name),
zap.String("col in cluster", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())),
zap.String("col in backup", fmt.Sprintf("%s %s", backupCol.Name, backupCol.FieldType.String())))
return errors.Annotatef(berrors.ErrRestoreIncompatibleSys,
"incompatible column, table: %s, col in cluster: %s %s, col in backup: %s %s",
table.Info.Name.O,
col.Name, col.FieldType.String(),
backupCol.Name, backupCol.FieldType.String())
}
}
}
return nil
}

// ExecDDLs executes the queries of the ddl jobs.
func (rc *Client) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error {
// Sort the ddl jobs by schema version in ascending order.
Expand Down Expand Up @@ -1467,6 +1582,14 @@ func (rc *Client) getRuleID(tableID int64) string {
return "restore-t" + strconv.FormatInt(tableID, 10)
}

// IsFull returns whether this backup is full.
func (rc *Client) IsFull() bool {
failpoint.Inject("mock-incr-backup-data", func() {
failpoint.Return(false)
})
return !rc.IsIncremental()
}

// IsIncremental returns whether this backup is incremental.
func (rc *Client) IsIncremental() bool {
return !(rc.backupMeta.StartVersion == rc.backupMeta.EndVersion ||
Expand Down Expand Up @@ -2188,6 +2311,22 @@ func (rc *Client) SaveSchemas(
return nil
}

// InitFullClusterRestore init fullClusterRestore and set SkipGrantTable as needed
func (rc *Client) InitFullClusterRestore(explicitFilter bool) {
rc.fullClusterRestore = !explicitFilter && rc.IsFull()

log.Info("full cluster restore", zap.Bool("value", rc.fullClusterRestore))

if rc.fullClusterRestore {
// have to skip grant table, in order to NotifyUpdatePrivilege
config.GetGlobalConfig().Security.SkipGrantTable = true
}
}

func (rc *Client) IsFullClusterRestore() bool {
return rc.fullClusterRestore
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
Expand Down
Loading

0 comments on commit ae31db1

Please sign in to comment.