
Commit

restore: adjust PD config to speed up restore (#198) (#268)
* restore: adjust PD config to speed up restore

Signed-off-by: Neil Shen <overvenus@gmail.com>

* address comments

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tests: disable TLS test

Signed-off-by: Neil Shen <overvenus@gmail.com>

* Update pkg/task/restore.go

Co-Authored-By: kennytm <kennytm@gmail.com>

* restore run post work after restore files success

Signed-off-by: Neil Shen <overvenus@gmail.com>

* restore TiKV/PD config before validating checksum

Signed-off-by: Neil Shen <overvenus@gmail.com>

* defer splitPostWork

Signed-off-by: Neil Shen <overvenus@gmail.com>

* Update pkg/task/restore.go

Co-authored-by: kennytm <kennytm@gmail.com>

* task: remove sync

Signed-off-by: Neil Shen <overvenus@gmail.com>

Co-authored-by: kennytm <kennytm@gmail.com>

Co-authored-by: Neil Shen <overvenus@gmail.com>
Co-authored-by: kennytm <kennytm@gmail.com>
3 people authored May 9, 2020
1 parent fff9f74 commit a79b242
Showing 4 changed files with 187 additions and 48 deletions.
2 changes: 1 addition & 1 deletion docker/config/pd.toml
@@ -2,7 +2,7 @@
[schedule]
# Disable Region Merge
max-merge-region-size = 0
max-merge-region-key = 0
max-merge-region-keys = 0
merge-schedule-limit = 0

max-snapshot-count = 10
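The test config above disables region merge statically; the code in this commit makes the same change at runtime through PD's HTTP API (see pkg/conn/conn.go below). A minimal standalone sketch of that runtime equivalent, assuming a placeholder PD address; only the pd/api/v1/config/schedule endpoint and the two config keys are taken from this commit.

// Standalone sketch, not part of this commit. The PD address is an
// assumed placeholder; the endpoint matches scheduleConfigPrefix below.
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	pdAddr := "http://127.0.0.1:2379" // assumption: a locally reachable PD

	// Same keys as in pd.toml above; setting them to 0 disables region merge.
	body := []byte(`{"max-merge-region-size": 0, "max-merge-region-keys": 0}`)

	resp, err := http.Post(pdAddr+"/pd/api/v1/config/schedule",
		"application/json", bytes.NewBuffer(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("PD responded with", resp.Status)
}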
42 changes: 42 additions & 0 deletions pkg/conn/conn.go
@@ -41,6 +41,7 @@ const (
regionCountPrefix = "pd/api/v1/stats/region"
schdulerPrefix = "pd/api/v1/schedulers"
maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response
scheduleConfigPrefix = "pd/api/v1/config/schedule"
)

// Mgr manages connections to a TiDB cluster.
@@ -433,6 +434,47 @@ func (mgr *Mgr) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]st
return nil, err
}

// GetPDScheduleConfig returns the current PD schedule config.
func (mgr *Mgr) GetPDScheduleConfig(
ctx context.Context,
) (map[string]interface{}, error) {
var err error
for _, addr := range mgr.pdHTTP.addrs {
v, e := pdRequest(
ctx, addr, scheduleConfigPrefix, mgr.pdHTTP.cli, http.MethodGet, nil)
if e != nil {
err = e
continue
}
cfg := make(map[string]interface{})
err = json.Unmarshal(v, &cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
return nil, err
}

// UpdatePDScheduleConfig updates the PD schedule config with the given key-value pairs.
func (mgr *Mgr) UpdatePDScheduleConfig(
ctx context.Context, cfg map[string]interface{},
) error {
for _, addr := range mgr.pdHTTP.addrs {
reqData, err := json.Marshal(cfg)
if err != nil {
return err
}
_, e := pdRequest(ctx, addr, scheduleConfigPrefix,
mgr.pdHTTP.cli, http.MethodPost, bytes.NewBuffer(reqData))
if e == nil {
return nil
}
}
return errors.New("update PD schedule config failed")
}

// Close closes all client in Mgr.
func (mgr *Mgr) Close() {
mgr.grpcClis.mu.Lock()
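A hedged usage sketch of the two new Mgr methods above: save the current region-merge settings, zero them, and return a closure that restores the saved values, which is essentially what restorePreWork/restorePostWork in pkg/task/restore.go below do. The helper itself is hypothetical (not in this commit), and it assumes an already-initialized *conn.Mgr and the github.com/pingcap/br/pkg/conn import path.

// Hypothetical helper, not part of this commit.
package restoreutil

import (
	"context"

	"github.com/pingcap/br/pkg/conn" // assumed import path for pkg/conn
)

// disableRegionMerge saves the current values of the region-merge settings,
// sets them to 0, and returns a closure that puts the saved values back.
func disableRegionMerge(ctx context.Context, mgr *conn.Mgr) (restore func() error, err error) {
	keys := []string{"max-merge-region-keys", "max-merge-region-size"}

	current, err := mgr.GetPDScheduleConfig(ctx)
	if err != nil {
		return nil, err
	}

	disabled := make(map[string]interface{}, len(keys))
	saved := make(map[string]interface{}, len(keys))
	for _, k := range keys {
		if v, ok := current[k]; ok && v != nil {
			saved[k] = v
			disabled[k] = 0
		}
	}
	if err := mgr.UpdatePDScheduleConfig(ctx, disabled); err != nil {
		return nil, err
	}
	return func() error { return mgr.UpdatePDScheduleConfig(ctx, saved) }, nil
}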
184 changes: 143 additions & 41 deletions pkg/task/restore.go
@@ -4,6 +4,7 @@ package task

import (
"context"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
@@ -26,23 +27,32 @@ import (
const (
flagOnline = "online"
flagNoSchema = "no-schema"
)

var schedulers = map[string]struct{}{
"balance-leader-scheduler": {},
"balance-hot-region-scheduler": {},
"balance-region-scheduler": {},

"shuffle-leader-scheduler": {},
"shuffle-region-scheduler": {},
"shuffle-hot-region-scheduler": {},
}

const (
defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 256
)

var (
schedulers = map[string]struct{}{
"balance-leader-scheduler": {},
"balance-hot-region-scheduler": {},
"balance-region-scheduler": {},

"shuffle-leader-scheduler": {},
"shuffle-region-scheduler": {},
"shuffle-hot-region-scheduler": {},
}
pdRegionMergeCfg = []string{
"max-merge-region-keys",
"max-merge-region-size",
}
pdScheduleLimitCfg = []string{
"leader-schedule-limit",
"region-schedule-limit",
"max-snapshot-count",
}
)

// RestoreConfig is the configuration specific for restore tasks.
type RestoreConfig struct {
Config
@@ -200,6 +210,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err = splitPrepareWork(ctx, client, newTables); err != nil {
return err
}
defer splitPostWork(ctx, client, newTables)

ranges = restore.AttachFilesToRanges(files, ranges)

@@ -215,6 +226,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err != nil {
return err
}
// Always run the post-work even on error, so we don't get stuck in
// import mode or with emptied schedulers.
shouldRestorePostWork := true
restorePostWork := func() {
if shouldRestorePostWork {
shouldRestorePostWork = false
restorePostWork(ctx, client, mgr, clusterCfg)
}
}
defer restorePostWork()

// Do not reset timestamp if we are doing incremental restore, because
// we are not allowed to decrease timestamp.
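The shouldRestorePostWork guard a few lines above runs the PD/TiKV cleanup exactly once: eagerly on the happy path (before the checksum phase, see further down in this hunk series) and via defer as a safety net on early error returns. A standalone sketch of that run-once idiom with placeholder names; none of the identifiers below come from the BR codebase.

// Standalone sketch of the run-once deferred cleanup pattern.
package main

import "fmt"

// runOnce wraps f so that only the first call executes it; later calls,
// including a deferred one, become no-ops.
func runOnce(f func()) func() {
	done := false
	return func() {
		if !done {
			done = true
			f()
		}
	}
}

func restoreFiles() error { return nil } // placeholder for the real work

func main() {
	cleanup := runOnce(func() { fmt.Println("restore PD/TiKV settings") })
	// Safety net: still runs if we return early on error.
	defer cleanup()

	if err := restoreFiles(); err != nil {
		return // cleanup fires via defer
	}

	// Happy path: put the cluster back to normal before the checksum phase.
	cleanup()
	fmt.Println("run checksum")
}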
@@ -249,6 +270,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
err = restore.SplitRanges(ctx, client, rangeBatch, rewriteRules, updateCh)
if err != nil {
log.Error("split regions failed", zap.Error(err))
// If any error happened, return now, don't execute checksum.
return err
}

Expand All @@ -261,28 +283,17 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
// After split, we can restore backup files.
err = client.RestoreFiles(fileBatch, rewriteRules, rejectStoreMap, updateCh)
if err != nil {
break
// If any error happened, return now, don't execute checksum.
return err
}
}

// Always run the post-work even on error, so we don't stuck in the import
// mode or emptied schedulers
if errRestorePostWork := restorePostWork(ctx, client, mgr, clusterCfg); err == nil {
err = errRestorePostWork
}

if errSplitPostWork := splitPostWork(ctx, client, newTables); err == nil {
err = errSplitPostWork
}

// If any error happened, return now, don't execute checksum.
if err != nil {
return err
}

// Restore has finished.
updateCh.Close()

// Restore TiKV/PD config before validating checksum.
restorePostWork()

// Checksum
if cfg.Checksum {
updateCh = g.StartProgress(
@@ -327,27 +338,89 @@ func filterRestoreFiles(
return
}

type clusterConfig struct {
// Enable PD schedulers before restore
scheduler []string
// Original schedule configuration
scheduleCfg map[string]interface{}
}

// restorePreWork executes some prepare work before restore
func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) ([]string, error) {
func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (clusterConfig, error) {
if client.IsOnline() {
return nil, nil
return clusterConfig{}, nil
}

// Switch TiKV cluster to import mode (adjust rocksdb configuration).
if err := client.SwitchToImportMode(ctx); err != nil {
return nil, err
return clusterConfig{}, nil
}

// Remove default PD scheduler that may affect restore process.
existSchedulers, err := mgr.ListSchedulers(ctx)
if err != nil {
return nil, errors.Trace(err)
return clusterConfig{}, nil
}
needRemoveSchedulers := make([]string, 0, len(existSchedulers))
for _, s := range existSchedulers {
if _, ok := schedulers[s]; ok {
needRemoveSchedulers = append(needRemoveSchedulers, s)
}
}
return removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
if err != nil {
return clusterConfig{}, nil
}

stores, err := mgr.GetPDClient().GetAllStores(ctx)
if err != nil {
return clusterConfig{}, err
}

scheduleCfg, err := mgr.GetPDScheduleConfig(ctx)
if err != nil {
return clusterConfig{}, err
}

disableMergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}
// Disable region merge by setting config to 0.
disableMergeCfg[cfgKey] = 0
}
err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg)
if err != nil {
return clusterConfig{}, err
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}

// Speed up PD scheduling by enlarging the scheduling limits.
// Multiply the limits by the store count, but cap them at 40;
// a larger limit may make the cluster unstable.
limit := int(value.(float64))
scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
}
err = mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg)
if err != nil {
return clusterConfig{}, err
}

cluster := clusterConfig{
scheduler: scheduler,
scheduleCfg: scheduleCfg,
}
return cluster, nil
}
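To make the limit scaling above concrete, a worked example with assumed numbers (a starting leader-schedule-limit of 4 is an illustration, not a value taken from this diff): with 5 stores the limit becomes min(40, 4*5) = 20, and with 20 stores it is capped at 40.

// Standalone sketch mirroring the computation in restorePreWork above.
package main

import (
	"fmt"
	"math"
)

// scaledLimit multiplies the existing limit by the store count, but never
// lets the result exceed 40.
func scaledLimit(limit float64, storeCount int) float64 {
	return math.Min(40, limit*float64(storeCount))
}

func main() {
	fmt.Println(scaledLimit(4, 5))  // 20
	fmt.Println(scaledLimit(4, 20)) // 40 (capped)
}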

func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers []string) ([]string, error) {
Expand All @@ -363,14 +436,43 @@ func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers
}

// restorePostWork executes some post work after restore
func restorePostWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr, removedSchedulers []string) error {
func restorePostWork(
ctx context.Context, client *restore.Client, mgr *conn.Mgr, clusterCfg clusterConfig,
) {
if client.IsOnline() {
return nil
return
}
if err := client.SwitchToNormalMode(ctx); err != nil {
return err
log.Warn("fail to switch to normal mode")
}
if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil {
log.Warn("fail to add PD schedulers")
}
mergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}
mergeCfg[cfgKey] = value
}
if err := mgr.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil {
log.Warn("fail to update PD region merge config")
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}
scheduleLimitCfg[cfgKey] = value
}
if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
log.Warn("fail to update PD schedule config")
}
return addPDLeaderScheduler(ctx, mgr, removedSchedulers)
}

func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers []string) error {
@@ -398,17 +500,17 @@ func splitPrepareWork(ctx context.Context, client *restore.Client, tables []*mod
return nil
}

func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) error {
func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) {
err := client.ResetPlacementRules(ctx, tables)
if err != nil {
return errors.Trace(err)
log.Warn("reset placement rules failed", zap.Error(err))
return
}

err = client.ResetRestoreLabels(ctx)
if err != nil {
return errors.Trace(err)
log.Warn("reset store labels failed", zap.Error(err))
}
return nil
}

// RunRestoreTiflashReplica restores the replica of tiflash saved in the last restore.
7 changes: 1 addition & 6 deletions pkg/task/restore_raw.go
@@ -113,12 +113,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
if err != nil {
return errors.Trace(err)
}
defer func() {
errPostWork := restorePostWork(ctx, client, mgr, removedSchedulers)
if err == nil {
err = errPostWork
}
}()
defer restorePostWork(ctx, client, mgr, removedSchedulers)

err = client.RestoreRaw(cfg.StartKey, cfg.EndKey, files, updateCh)
if err != nil {
