This repository has been archived by the owner on Jul 24, 2024. It is now read-only.
restore: adjust PD config to speed up restore #198
Merged
Changes from 16 commits
Commits
19 commits
76855df restore: adjust PD config to speed up restore (overvenus)
7621282 Merge branch 'master' into tweak-pd (kennytm)
3abff09 address comments (overvenus)
9da22f6 Merge branch 'tweak-pd' of https://github.com/overvenus/br-1 into twe… (overvenus)
92a74d9 tests: disable TLS test (overvenus)
ded4124 Merge branch 'master' into tweak-pd (overvenus)
08a69c3 Merge branch 'master' into tweak-pd (overvenus)
a46efee Merge branch 'master' into tweak-pd (overvenus)
483f51c Update pkg/task/restore.go (overvenus)
5bac31b Merge branch 'master' into tweak-pd (overvenus)
ca53aa7 restore run post work after restore files success (overvenus)
87c39d5 Merge branch 'tweak-pd' into tweak-pd (overvenus)
2f5632a Merge branch 'master' into tweak-pd (overvenus)
c4d970b Merge branch 'master' into tweak-pd (overvenus)
bcd0964 restore TiKV/PD config before validating checksum (overvenus)
ed8d5fb Merge branch 'master' into tweak-pd (overvenus)
f494123 defer splitPostWork (overvenus)
56f5232 Update pkg/task/restore.go (overvenus)
d505e89 task: remove sync (overvenus)
pkg/task/restore.go
@@ -4,6 +4,8 @@ package task

import (
	"context"
	"math"
	"sync"

	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/backup"
@@ -26,23 +28,32 @@ import (
const (
	flagOnline = "online"
	flagNoSchema = "no-schema"
)

var schedulers = map[string]struct{}{
	"balance-leader-scheduler": {},
	"balance-hot-region-scheduler": {},
	"balance-region-scheduler": {},

	"shuffle-leader-scheduler": {},
	"shuffle-region-scheduler": {},
	"shuffle-hot-region-scheduler": {},
}

const (
	defaultRestoreConcurrency = 128
	maxRestoreBatchSizeLimit  = 256
)

var (
	schedulers = map[string]struct{}{
		"balance-leader-scheduler": {},
		"balance-hot-region-scheduler": {},
		"balance-region-scheduler": {},

		"shuffle-leader-scheduler": {},
		"shuffle-region-scheduler": {},
		"shuffle-hot-region-scheduler": {},
	}
	pdRegionMergeCfg = []string{
		"max-merge-region-keys",
		"max-merge-region-size",
	}
	pdScheduleLimitCfg = []string{
		"leader-schedule-limit",
		"region-schedule-limit",
		"max-snapshot-count",
	}
)

// RestoreConfig is the configuration specific for restore tasks.
type RestoreConfig struct {
	Config
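The two key lists above drive the speed-up: during restore the merge keys are set to 0 and the schedule-limit keys are raised, then both are written back afterwards from a snapshot of the original values. A minimal standalone sketch of that round trip; `fetchScheduleConfig` and `updateScheduleConfig` are hypothetical stand-ins for PD's schedule-config API, not functions from this repository:

```go
package main

import "fmt"

var pdRegionMergeCfg = []string{"max-merge-region-keys", "max-merge-region-size"}

// Hypothetical stand-ins for PD's schedule-config API (e.g. /pd/api/v1/config/schedule).
func fetchScheduleConfig() map[string]interface{} {
	return map[string]interface{}{
		"max-merge-region-keys": float64(200000),
		"max-merge-region-size": float64(20),
	}
}

func updateScheduleConfig(cfg map[string]interface{}) { fmt.Println("update:", cfg) }

func main() {
	saved := fetchScheduleConfig() // snapshot the original values first

	// Disable region merge for the duration of the restore.
	disable := make(map[string]interface{})
	for _, key := range pdRegionMergeCfg {
		if saved[key] == nil {
			continue // skip keys this PD version does not expose
		}
		disable[key] = 0
	}
	updateScheduleConfig(disable)

	// ... restore files ...

	// Put the saved values back once restore is done.
	original := make(map[string]interface{})
	for _, key := range pdRegionMergeCfg {
		if v := saved[key]; v != nil {
			original[key] = v
		}
	}
	updateScheduleConfig(original)
}
```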
@@ -201,6 +212,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
	if err = splitPrepareWork(ctx, client, newTables); err != nil {
		return err
	}
	splitPostWork(ctx, client, newTables)

	ranges = restore.AttachFilesToRanges(files, ranges)
@@ -216,6 +228,15 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
	if err != nil {
		return err
	}
	// Always run the post-work even on error, so we don't stuck in the import
	// mode or emptied schedulers
	restorePostWorkOnce := sync.Once{}
	restorePostWork := func() {
		restorePostWorkOnce.Do(func() {
			restorePostWork(ctx, client, mgr, clusterCfg)
		})
	}
	defer restorePostWork()

	// Do not reset timestamp if we are doing incremental restore, because
	// we are not allowed to decrease timestamp.
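Two details in the hunk above are easy to miss. First, the `sync.Once` wrapper makes the cleanup idempotent: it can be both deferred (so error paths are covered) and called explicitly before checksum, and only the first call does any work. Second, inside the closure, `restorePostWork(ctx, client, mgr, clusterCfg)` resolves to the package-level function of the same name, because the short variable declaration only brings the local `restorePostWork` into scope after the statement ends. A small standalone sketch of the once-wrapped pattern, with illustrative names only:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

func restoreClusterConfig() { fmt.Println("cluster config restored") }

func run(fail bool) error {
	once := sync.Once{}
	postWork := func() { once.Do(restoreClusterConfig) }
	// Safety net: never leave the cluster in import mode or without schedulers.
	defer postWork()

	if fail {
		return errors.New("restore files failed") // postWork fires via the defer
	}

	// On success, restore the config eagerly so checksum runs against the
	// normal cluster state; the deferred call is then a no-op.
	postWork()
	fmt.Println("running checksum")
	return nil
}

func main() {
	_ = run(false)
	_ = run(true)
}
```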
@@ -250,6 +271,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
		err = restore.SplitRanges(ctx, client, rangeBatch, rewriteRules, updateCh)
		if err != nil {
			log.Error("split regions failed", zap.Error(err))
			// If any error happened, return now, don't execute checksum.
			return err
		}
@@ -262,28 +284,17 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
		// After split, we can restore backup files.
		err = client.RestoreFiles(fileBatch, rewriteRules, rejectStoreMap, updateCh)
		if err != nil {
			break
			// If any error happened, return now, don't execute checksum.
			return err
		}
	}

	// Always run the post-work even on error, so we don't stuck in the import
	// mode or emptied schedulers
	if errRestorePostWork := restorePostWork(ctx, client, mgr, clusterCfg); err == nil {
		err = errRestorePostWork
	}

	if errSplitPostWork := splitPostWork(ctx, client, newTables); err == nil {
		err = errSplitPostWork
	}

	// If any error happened, return now, don't execute checksum.
	if err != nil {
		return err
	}

	// Restore has finished.
	updateCh.Close()

	// Restore TiKV/PD config before validating checksum.
	restorePostWork()

	// Checksum
	if cfg.Checksum {
		updateCh = g.StartProgress(
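Taken together, these hunks change where cleanup happens in RunRestore: instead of running the post-work once at the end and folding its error into `err`, the function now returns immediately on any restore error (the deferred once-wrapped call handles cleanup), and on success it restores the TiKV/PD configuration before checksum validation starts. Roughly, the resulting control flow looks like this sketch; the helper names are placeholders, not the repository's functions:

```go
package main

import (
	"fmt"
	"sync"
)

// Placeholder steps standing in for the real restore pipeline.
func preWork() (string, error) { return "saved-config", nil }
func restoreFiles() error      { return nil }
func checksum() error          { fmt.Println("checksum ok"); return nil }
func postWork(cfg string)      { fmt.Println("restored", cfg) }

func runRestoreOutline() error {
	clusterCfg, err := preWork() // import mode, drop schedulers, tweak PD limits
	if err != nil {
		return err
	}
	once := sync.Once{}
	post := func() { once.Do(func() { postWork(clusterCfg) }) }
	defer post() // covers every error return below

	if err := restoreFiles(); err != nil {
		return err // don't run checksum after a failed restore
	}

	post() // restore TiKV/PD config before validating checksum
	return checksum()
}

func main() { _ = runRestoreOutline() }
```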
@@ -328,27 +339,89 @@ func filterRestoreFiles(
	return
}

type clusterConfig struct {
	// Enable PD schedulers before restore
	scheduler []string
	// Original scheudle configuration
	scheduleCfg map[string]interface{}
}

// restorePreWork executes some prepare work before restore
func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) ([]string, error) {
func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (clusterConfig, error) {
	if client.IsOnline() {
		return nil, nil
		return clusterConfig{}, nil
	}

	// Switch TiKV cluster to import mode (adjust rocksdb configuration).
	if err := client.SwitchToImportMode(ctx); err != nil {
		return nil, err
		return clusterConfig{}, nil
	}

	// Remove default PD scheduler that may affect restore process.
	existSchedulers, err := mgr.ListSchedulers(ctx)
	if err != nil {
		return nil, errors.Trace(err)
		return clusterConfig{}, nil
	}
	needRemoveSchedulers := make([]string, 0, len(existSchedulers))
	for _, s := range existSchedulers {
		if _, ok := schedulers[s]; ok {
			needRemoveSchedulers = append(needRemoveSchedulers, s)
		}
	}
	return removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
	scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
	if err != nil {
		return clusterConfig{}, nil
	}

	stores, err := mgr.GetPDClient().GetAllStores(ctx)
	if err != nil {
		return clusterConfig{}, err
	}

	scheduleCfg, err := mgr.GetPDScheduleConfig(ctx)
	if err != nil {
		return clusterConfig{}, err
	}

	disableMergeCfg := make(map[string]interface{})
	for _, cfgKey := range pdRegionMergeCfg {
		value := scheduleCfg[cfgKey]
		if value == nil {
			// Ignore non-exist config.
			continue
		}
		// Disable region merge by setting config to 0.
		disableMergeCfg[cfgKey] = 0
	}
	err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg)
	if err != nil {
		return clusterConfig{}, err
	}

	scheduleLimitCfg := make(map[string]interface{})
	for _, cfgKey := range pdScheduleLimitCfg {
		value := scheduleCfg[cfgKey]
		if value == nil {
			// Ignore non-exist config.
			continue
		}

		// Speed update PD scheduler by enlarging scheduling limits.
		// Multiply limits by store count but no more than 40.
		// Larger limit may make cluster unstable.
		limit := int(value.(float64))
		scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
	}
	err = mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg)
	if err != nil {
		return clusterConfig{}, err
	}

	cluster := clusterConfig{
		scheduler:   scheduler,
		scheduleCfg: scheduleCfg,
	}
	return cluster, nil
}

func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers []string) ([]string, error) {
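The limit scaling in the hunk above is simple arithmetic: each current limit is multiplied by the number of stores and capped at 40. For example, if `leader-schedule-limit` is currently 4, a 5-store cluster gets min(40, 4 * 5) = 20, while a 15-store cluster is capped at 40. A one-function sketch of the same formula:

```go
package main

import (
	"fmt"
	"math"
)

// scaledLimit mirrors the formula used in restorePreWork: multiply the current
// limit by the store count, but never go above 40 to keep the cluster stable.
func scaledLimit(current int, storeCount int) float64 {
	return math.Min(40, float64(current*storeCount))
}

func main() {
	fmt.Println(scaledLimit(4, 5))  // 20
	fmt.Println(scaledLimit(4, 15)) // 40 (capped)
}
```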
@@ -364,14 +437,43 @@ func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers
}

// restorePostWork executes some post work after restore
func restorePostWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr, removedSchedulers []string) error {
func restorePostWork(
	ctx context.Context, client *restore.Client, mgr *conn.Mgr, clusterCfg clusterConfig,
) {
	if client.IsOnline() {
		return nil
		return
	}
	if err := client.SwitchToNormalMode(ctx); err != nil {
		return err
		log.Warn("fail to switch to normal mode")
	}
	if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil {
		log.Warn("fail to add PD schedulers")
	}
	mergeCfg := make(map[string]interface{})
	for _, cfgKey := range pdRegionMergeCfg {
		value := clusterCfg.scheduleCfg[cfgKey]
		if value == nil {
			// Ignore non-exist config.
			continue
		}
		mergeCfg[cfgKey] = value
	}
	if err := mgr.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil {
		log.Warn("fail to update PD region merge config")
	}

	scheduleLimitCfg := make(map[string]interface{})
	for _, cfgKey := range pdScheduleLimitCfg {
		value := clusterCfg.scheduleCfg[cfgKey]
		if value == nil {
			// Ignore non-exist config.
			continue
		}
		scheduleLimitCfg[cfgKey] = value
	}
	if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
		log.Warn("fail to update PD schedule config")
	}
	return addPDLeaderScheduler(ctx, mgr, removedSchedulers)
}

func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers []string) error {
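Note the error-handling change in restorePostWork: it no longer returns an error at all. Each restore step (normal mode, schedulers, merge config, schedule limits) logs a warning and carries on, so one failed step cannot prevent the remaining cluster state from being put back. A tiny sketch of that best-effort style, with placeholder step names:

```go
package main

import (
	"errors"
	"log"
)

// Best-effort cleanup: log and continue instead of aborting on the first error,
// so later steps still get a chance to restore their part of the cluster state.
func postWorkBestEffort(steps map[string]func() error) {
	for name, step := range steps {
		if err := step(); err != nil {
			log.Printf("fail to %s: %v", name, err)
		}
	}
}

func main() {
	postWorkBestEffort(map[string]func() error{
		"switch to normal mode":  func() error { return errors.New("tikv unreachable") },
		"add PD schedulers":      func() error { return nil },
		"restore merge config":   func() error { return nil },
		"restore schedule limit": func() error { return nil },
	})
}
```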
@@ -399,17 +501,17 @@ func splitPrepareWork(ctx context.Context, client *restore.Client, tables []*mod
	return nil
}

func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) error {
func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) {
	err := client.ResetPlacementRules(ctx, tables)
	if err != nil {
		return errors.Trace(err)
		log.Warn("reset placement rules failed", zap.Error(err))
		return
	}

	err = client.ResetRestoreLabels(ctx)
	if err != nil {
		return errors.Trace(err)
		log.Warn("reset store labels failed", zap.Error(err))
	}
	return nil
}

// RunRestoreTiflashReplica restores the replica of tiflash saved in the last restore.
Review comment: it seems to be missing a defer?
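Commit f494123 ("defer splitPostWork") in the commit list above appears to address exactly this; presumably the call simply becomes deferred so placement rules and store labels are also reset on early returns, roughly:

```go
if err = splitPrepareWork(ctx, client, newTables); err != nil {
	return err
}
// Deferred so the split post-work also runs when RunRestore exits early.
defer splitPostWork(ctx, client, newTables)
```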