This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

restore: adjust PD config to speed up restore #198

Merged: 19 commits, May 6, 2020
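
This PR speeds up restore by temporarily rewriting PD's schedule config through its HTTP endpoint (`pd/api/v1/config/schedule`, registered below as `scheduleConfigPrefix`): region merge is disabled and the scheduling limits are raised for the duration of the restore, then the original values are put back. As a rough standalone illustration of that endpoint (not the PR's own code; the PD address is an assumption and error handling is kept minimal):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Assumed PD endpoint; adjust for your deployment.
	const pdAddr = "http://127.0.0.1:2379"

	// Fetch the current schedule config.
	resp, err := http.Get(pdAddr + "/pd/api/v1/config/schedule")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	cfg := make(map[string]interface{})
	if err := json.NewDecoder(resp.Body).Decode(&cfg); err != nil {
		panic(err)
	}
	fmt.Println("max-merge-region-size =", cfg["max-merge-region-size"])

	// Post back an override disabling region merge, mirroring the
	// restore pre-work in this PR.
	body, _ := json.Marshal(map[string]interface{}{
		"max-merge-region-size": 0,
		"max-merge-region-keys": 0,
	})
	if _, err := http.Post(pdAddr+"/pd/api/v1/config/schedule",
		"application/json", bytes.NewReader(body)); err != nil {
		panic(err)
	}
}
```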
2 changes: 1 addition & 1 deletion docker/config/pd.toml
@@ -2,7 +2,7 @@
[schedule]
# Disable Region Merge
max-merge-region-size = 0
max-merge-region-key = 0
max-merge-region-keys = 0
merge-schedule-limit = 0

max-snapshot-count = 10
42 changes: 42 additions & 0 deletions pkg/conn/conn.go
@@ -41,6 +41,7 @@ const (
regionCountPrefix = "pd/api/v1/stats/region"
schdulerPrefix = "pd/api/v1/schedulers"
maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response
scheduleConfigPrefix = "pd/api/v1/config/schedule"
)

// Mgr manages connections to a TiDB cluster.
@@ -433,6 +434,47 @@ func (mgr *Mgr) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]st
return nil, err
}

// GetPDScheduleConfig returns PD schedule config value associated with the key.
// It returns nil if there is no such config item.
func (mgr *Mgr) GetPDScheduleConfig(
ctx context.Context,
) (map[string]interface{}, error) {
var err error
for _, addr := range mgr.pdHTTP.addrs {
v, e := pdRequest(
ctx, addr, scheduleConfigPrefix, mgr.pdHTTP.cli, http.MethodGet, nil)
if e != nil {
err = e
continue
}
cfg := make(map[string]interface{})
err = json.Unmarshal(v, &cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
return nil, err
}

// UpdatePDScheduleConfig updates PD schedule config value associated with the key.
func (mgr *Mgr) UpdatePDScheduleConfig(
ctx context.Context, cfg map[string]interface{},
) error {
for _, addr := range mgr.pdHTTP.addrs {
reqData, err := json.Marshal(cfg)
if err != nil {
return err
}
_, e := pdRequest(ctx, addr, scheduleConfigPrefix,
mgr.pdHTTP.cli, http.MethodPost, bytes.NewBuffer(reqData))
if e == nil {
return nil
}
}
return errors.New("update PD schedule config failed")
}

// Close closes all client in Mgr.
func (mgr *Mgr) Close() {
mgr.grpcClis.mu.Lock()
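Taken together, the two new helpers support a save, mutate, restore round trip on PD's schedule config. Below is a minimal sketch of how they might compose, not code from this PR: `withRegionMergeDisabled` is a hypothetical name, `mgr` is assumed to be an already-constructed `*conn.Mgr`, and the keys match `pdRegionMergeCfg` defined in `pkg/task/restore.go` below.

```go
package restoreutil

import (
	"context"

	"github.com/pingcap/br/pkg/conn"
)

// withRegionMergeDisabled snapshots the schedule config, disables region
// merge, runs f, and then restores the original values best-effort.
func withRegionMergeDisabled(ctx context.Context, mgr *conn.Mgr, f func() error) error {
	original, err := mgr.GetPDScheduleConfig(ctx)
	if err != nil {
		return err
	}
	// Same keys as pdRegionMergeCfg in pkg/task/restore.go.
	keys := []string{"max-merge-region-keys", "max-merge-region-size"}
	disabled := make(map[string]interface{}, len(keys))
	for _, k := range keys {
		if original[k] != nil {
			disabled[k] = 0 // 0 disables merging for that dimension
		}
	}
	if err := mgr.UpdatePDScheduleConfig(ctx, disabled); err != nil {
		return err
	}
	defer func() {
		restored := make(map[string]interface{}, len(keys))
		for _, k := range keys {
			if original[k] != nil {
				restored[k] = original[k]
			}
		}
		// Best effort: in the PR, failures at this stage are only logged.
		_ = mgr.UpdatePDScheduleConfig(ctx, restored)
	}()
	return f()
}
```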
184 changes: 143 additions & 41 deletions pkg/task/restore.go
@@ -4,6 +4,7 @@ package task

import (
"context"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
@@ -26,23 +27,32 @@ import (
const (
flagOnline = "online"
flagNoSchema = "no-schema"
)

var schedulers = map[string]struct{}{
"balance-leader-scheduler": {},
"balance-hot-region-scheduler": {},
"balance-region-scheduler": {},

"shuffle-leader-scheduler": {},
"shuffle-region-scheduler": {},
"shuffle-hot-region-scheduler": {},
}

const (
defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 256
)

var (
schedulers = map[string]struct{}{
"balance-leader-scheduler": {},
"balance-hot-region-scheduler": {},
"balance-region-scheduler": {},

"shuffle-leader-scheduler": {},
"shuffle-region-scheduler": {},
"shuffle-hot-region-scheduler": {},
}
pdRegionMergeCfg = []string{
"max-merge-region-keys",
"max-merge-region-size",
}
pdScheduleLimitCfg = []string{
"leader-schedule-limit",
"region-schedule-limit",
"max-snapshot-count",
}
)

// RestoreConfig is the configuration specific for restore tasks.
type RestoreConfig struct {
Config
@@ -201,6 +211,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err = splitPrepareWork(ctx, client, newTables); err != nil {
return err
}
defer splitPostWork(ctx, client, newTables)

ranges = restore.AttachFilesToRanges(files, ranges)

@@ -216,6 +227,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err != nil {
return err
}
// Always run the post-work even on error, so we don't get stuck in
// import mode or leave the schedulers removed.
shouldRestorePostWork := true
restorePostWork := func() {
if shouldRestorePostWork {
shouldRestorePostWork = false
restorePostWork(ctx, client, mgr, clusterCfg)
}
}
defer restorePostWork()

// Do not reset timestamp if we are doing incremental restore, because
// we are not allowed to decrease timestamp.
@@ -250,6 +271,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
err = restore.SplitRanges(ctx, client, rangeBatch, rewriteRules, updateCh)
if err != nil {
log.Error("split regions failed", zap.Error(err))
// If any error happened, return now, don't execute checksum.
return err
Comment on lines +274 to 275

Collaborator: no need to restorePostWork?

Member Author: It's now deferred.

Collaborator: @overvenus But this means the restorePostWork is done after checksum

}

@@ -262,28 +284,17 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
// After split, we can restore backup files.
err = client.RestoreFiles(fileBatch, rewriteRules, rejectStoreMap, updateCh)
if err != nil {
break
// If any error happened, return now, don't execute checksum.
return err
}
}

// Always run the post-work even on error, so we don't stuck in the import
// mode or emptied schedulers
if errRestorePostWork := restorePostWork(ctx, client, mgr, clusterCfg); err == nil {
err = errRestorePostWork
}

if errSplitPostWork := splitPostWork(ctx, client, newTables); err == nil {
err = errSplitPostWork
}

// If any error happened, return now, don't execute checksum.
if err != nil {
return err
}

// Restore has finished.
updateCh.Close()

// Restore TiKV/PD config before validating checksum.
restorePostWork()

// Checksum
if cfg.Checksum {
updateCh = g.StartProgress(
@@ -328,27 +339,89 @@ func filterRestoreFiles(
return
}

type clusterConfig struct {
// PD schedulers that need to be re-enabled after restore
scheduler []string
// Original schedule configuration
scheduleCfg map[string]interface{}
}

// restorePreWork executes some prepare work before restore
func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) ([]string, error) {
func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (clusterConfig, error) {
if client.IsOnline() {
return nil, nil
return clusterConfig{}, nil
}

// Switch TiKV cluster to import mode (adjust rocksdb configuration).
if err := client.SwitchToImportMode(ctx); err != nil {
return nil, err
return clusterConfig{}, err
}

// Remove default PD scheduler that may affect restore process.
existSchedulers, err := mgr.ListSchedulers(ctx)
if err != nil {
return nil, errors.Trace(err)
return clusterConfig{}, errors.Trace(err)
}
needRemoveSchedulers := make([]string, 0, len(existSchedulers))
for _, s := range existSchedulers {
if _, ok := schedulers[s]; ok {
needRemoveSchedulers = append(needRemoveSchedulers, s)
}
}
return removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
if err != nil {
return clusterConfig{}, err
}

stores, err := mgr.GetPDClient().GetAllStores(ctx)
if err != nil {
return clusterConfig{}, err
}

scheduleCfg, err := mgr.GetPDScheduleConfig(ctx)
if err != nil {
return clusterConfig{}, err
}

disableMergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := scheduleCfg[cfgKey]
if value == nil {
// Ignore nonexistent config.
continue
}
// Disable region merge by setting config to 0.
disableMergeCfg[cfgKey] = 0
}
err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg)
if err != nil {
return clusterConfig{}, err
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := scheduleCfg[cfgKey]
if value == nil {
// Ignore nonexistent config.
continue
}

// Speed up PD scheduling by enlarging the scheduling limits.
// Multiply each limit by the store count, but cap it at 40;
// a larger limit may make the cluster unstable.
limit := int(value.(float64))
scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
}
err = mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg)
if err != nil {
return clusterConfig{}, err
}

cluster := clusterConfig{
scheduler: scheduler,
scheduleCfg: scheduleCfg,
}
return cluster, nil
}
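
The limit bump in `restorePreWork` above is plain arithmetic: each existing limit is multiplied by the number of stores and capped at 40. For example, `leader-schedule-limit = 4` on a 5-store cluster becomes 20, while 15 stores would hit the cap. A tiny standalone restatement of that formula (the helper name is hypothetical):

```go
package restoreutil

import "math"

// scaledLimit mirrors the capped scaling used in restorePreWork:
// min(40, limit * storeCount).
func scaledLimit(limit float64, storeCount int) float64 {
	return math.Min(40, limit*float64(storeCount))
}

// scaledLimit(4, 5) == 20; scaledLimit(4, 15) == 40 (capped).
```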

func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers []string) ([]string, error) {
@@ -364,14 +437,43 @@ func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers
}

// restorePostWork executes some post work after restore
func restorePostWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr, removedSchedulers []string) error {
func restorePostWork(
ctx context.Context, client *restore.Client, mgr *conn.Mgr, clusterCfg clusterConfig,
) {
if client.IsOnline() {
return nil
return
}
if err := client.SwitchToNormalMode(ctx); err != nil {
return err
log.Warn("fail to switch to normal mode")
}
if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil {
log.Warn("fail to add PD schedulers")
}
mergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore nonexistent config.
continue
}
mergeCfg[cfgKey] = value
}
if err := mgr.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil {
log.Warn("fail to update PD region merge config")
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore nonexistent config.
continue
}
scheduleLimitCfg[cfgKey] = value
}
if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
log.Warn("fail to update PD schedule config")
}
return addPDLeaderScheduler(ctx, mgr, removedSchedulers)
}

func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers []string) error {
@@ -399,17 +501,17 @@ func splitPrepareWork(ctx context.Context, client *restore.Client, tables []*mod
return nil
}

func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) error {
func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) {
err := client.ResetPlacementRules(ctx, tables)
if err != nil {
return errors.Trace(err)
log.Warn("reset placement rules failed", zap.Error(err))
return
}

err = client.ResetRestoreLabels(ctx)
if err != nil {
return errors.Trace(err)
log.Warn("reset store labels failed", zap.Error(err))
}
return nil
}

// RunRestoreTiflashReplica restores the replica of tiflash saved in the last restore.
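The review thread above comes down to when `restorePostWork` runs: it must fire even on an early error return, yet on the success path it should run before checksum validation, not after. The diff resolves this with a closure that is both deferred and invoked explicitly, guarded so it executes at most once. Here is the pattern in isolation, as a sketch with hypothetical placeholders (`doRestore`, `verifyChecksum`):

```go
package restoreutil

// runRestore sketches the run-once post-work pattern from RunRestore.
func runRestore(doRestore, verifyChecksum func() error) error {
	done := false
	postWork := func() {
		if done {
			return
		}
		done = true
		// Restore PD/TiKV configuration here.
	}
	// Any early `return err` below still triggers the post-work.
	defer postWork()

	if err := doRestore(); err != nil {
		return err
	}

	// Success path: put the cluster config back before checksumming,
	// so validation observes normal-mode settings.
	postWork()
	return verifyChecksum()
}
```

A `sync.Once` would serve equally well here; the plain boolean is fine because everything runs on a single goroutine.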
7 changes: 1 addition & 6 deletions pkg/task/restore_raw.go
@@ -114,12 +114,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
if err != nil {
return errors.Trace(err)
}
defer func() {
errPostWork := restorePostWork(ctx, client, mgr, removedSchedulers)
if err == nil {
err = errPostWork
}
}()
defer restorePostWork(ctx, client, mgr, removedSchedulers)

err = client.RestoreRaw(cfg.StartKey, cfg.EndKey, files, updateCh)
if err != nil {