diff --git a/cmd/debug.go b/cmd/debug.go
index aa3e271b3..35f6e502d 100644
--- a/cmd/debug.go
+++ b/cmd/debug.go
@@ -10,7 +10,6 @@ import (
 	"encoding/json"
 	"path"
 	"reflect"
-	"strings"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/pingcap/errors"
@@ -347,7 +346,7 @@ func encodeBackupMetaCommand() *cobra.Command {
 func setPDConfigCommand() *cobra.Command {
 	pdConfigCmd := &cobra.Command{
 		Use:   "reset-pd-config-as-default",
-		Short: "reset pd scheduler and config adjusted by BR to default value",
+		Short: "reset pd config adjusted by BR to the default value",
 		RunE: func(cmd *cobra.Command, args []string) error {
 			ctx, cancel := context.WithCancel(GetDefaultContext())
 			defer cancel()
@@ -363,17 +362,6 @@ func setPDConfigCommand() *cobra.Command {
 			}
 			defer mgr.Close()
 
-			for scheduler := range pdutil.Schedulers {
-				if strings.HasPrefix(scheduler, "balance") {
-					err := mgr.AddScheduler(ctx, scheduler)
-					if err != nil {
-						return err
-					}
-					log.Info("add pd schedulers succeed",
-						zap.String("schedulers", scheduler))
-				}
-			}
-
 			if err := mgr.UpdatePDScheduleConfig(ctx, pdutil.DefaultPDCfg); err != nil {
 				return errors.Annotate(err, "fail to update PD merge config")
 			}
diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go
index eafeb1cf3..2b921cd16 100644
--- a/pkg/pdutil/pd.go
+++ b/pkg/pdutil/pd.go
@@ -34,6 +34,7 @@ const (
 	schedulerPrefix      = "pd/api/v1/schedulers"
 	maxMsgSize           = int(128 * utils.MB) // pd.ScanRegion may return a large response
 	scheduleConfigPrefix = "pd/api/v1/config/schedule"
+	pauseTimeout         = 5 * time.Minute
 )
 
 // clusterConfig represents a set of scheduler whose config have been modified
@@ -45,6 +46,10 @@ type clusterConfig struct {
 	scheduleCfg map[string]interface{}
 }
 
+// pauseSchedulerBody is the request body of PD's pause-scheduler API.
+// Delay is the pause duration in seconds; a Delay of 0 resumes the scheduler.
+type pauseSchedulerBody struct {
+	Delay int64 `json:"delay"`
+}
+
 var (
 	// Schedulers represent region/leader schedulers which can impact on performance.
 	Schedulers = map[string]struct{}{
@@ -115,6 +120,9 @@ type PdController struct {
 	addrs    []string
 	cli      *http.Client
 	pdClient pd.Client
+
+	// schedulerPauseCh stops the pause-scheduler keep-alive goroutine
+	schedulerPauseCh chan struct{}
 }
 
 // NewPdController creates a new PdController.
@@ -167,9 +175,10 @@ func NewPdController(
 	}
 
 	return &PdController{
-		addrs:    processedAddrs,
-		cli:      cli,
-		pdClient: pdClient,
+		addrs:            processedAddrs,
+		cli:              cli,
+		pdClient:         pdClient,
+		schedulerPauseCh: make(chan struct{}),
 	}, nil
 }
 
@@ -242,38 +251,101 @@ func (p *PdController) getRegionCountWith(
 	return 0, err
 }
 
-// RemoveScheduler remove pd scheduler.
-func (p *PdController) RemoveScheduler(ctx context.Context, scheduler string) error {
-	return p.removeSchedulerWith(ctx, scheduler, pdRequest)
+// PauseSchedulers pauses PD schedulers temporarily instead of removing them.
+func (p *PdController) PauseSchedulers(ctx context.Context, schedulers []string) ([]string, error) {
+	return p.pauseSchedulersWith(ctx, schedulers, pdRequest)
 }
 
-func (p *PdController) removeSchedulerWith(ctx context.Context, scheduler string, delete pdHTTPRequest) (err error) {
-	for _, addr := range p.addrs {
+func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
+	removedSchedulers := make([]string, 0, len(schedulers))
+	// pause each scheduler for 300 seconds (pauseTimeout); PD expects the delay in seconds
+	body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout.Seconds())})
+	if err != nil {
+		return nil, err
+	}
+
+	// Pause every scheduler synchronously first, so an initial failure is returned to the caller.
+	// The keep-alive goroutine below only refreshes the pauses and may ignore transient failures.
+	for _, scheduler := range schedulers {
 		prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
-		_, err = delete(ctx, addr, prefix, p.cli, http.MethodDelete, nil)
+		for _, addr := range p.addrs {
+			_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
+			if err == nil {
+				removedSchedulers = append(removedSchedulers, scheduler)
+				break
+			}
+		}
 		if err != nil {
-			continue
+			log.Error("failed to pause schedulers at the beginning",
+				zap.Strings("schedulers", schedulers), zap.Error(err))
+			return nil, err
 		}
-		return nil
 	}
-	return err
+	log.Info("paused schedulers at the beginning", zap.Strings("schedulers", schedulers))
+
+	// Refresh the pauses periodically so they never expire while BR is running,
+	// yet expire on their own (after pauseTimeout) if BR is killed.
+	go func() {
+		tick := time.NewTicker(pauseTimeout / 3)
+		defer tick.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-tick.C:
+				for _, scheduler := range schedulers {
+					prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
+					for _, addr := range p.addrs {
+						_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
+						if err == nil {
+							break
+						}
+					}
+					if err == nil {
+						log.Info("pause scheduler", zap.String("name", scheduler))
+					} else {
+						log.Warn("failed to refresh scheduler pause, will retry on the next tick", zap.Error(err))
+					}
+				}
+			case <-p.schedulerPauseCh:
+				log.Info("stop refreshing scheduler pauses")
+				return
+			}
+		}
+	}()
+	return removedSchedulers, nil
 }
 
-// AddScheduler add pd scheduler.
-func (p *PdController) AddScheduler(ctx context.Context, scheduler string) error {
-	return p.addSchedulerWith(ctx, scheduler, pdRequest)
+// ResumeSchedulers resumes paused PD schedulers.
+func (p *PdController) ResumeSchedulers(ctx context.Context, schedulers []string) error {
+	return p.resumeSchedulerWith(ctx, schedulers, pdRequest)
 }
 
-func (p *PdController) addSchedulerWith(ctx context.Context, scheduler string, post pdHTTPRequest) (err error) {
-	for _, addr := range p.addrs {
-		body := bytes.NewBuffer([]byte(`{"name":"` + scheduler + `"}`))
-		_, err = post(ctx, addr, schedulerPrefix, p.cli, http.MethodPost, body)
+func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []string, post pdHTTPRequest) (err error) {
+	log.Info("resume schedulers", zap.Strings("schedulers", schedulers))
+	// stop the pause keep-alive goroutine first
+	p.schedulerPauseCh <- struct{}{}
+
+	// a delay of 0 resumes the scheduler immediately
+	body, err := json.Marshal(pauseSchedulerBody{Delay: 0})
+	if err != nil {
+		return err
+	}
+	for _, scheduler := range schedulers {
+		prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
+		for _, addr := range p.addrs {
+			_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
+			if err == nil {
+				break
+			}
+		}
 		if err != nil {
-			continue
+			log.Error("failed to resume scheduler after retry, you may resume it manually "+
+				"or just wait for the scheduler pause to time out",
+				zap.String("scheduler", scheduler), zap.Error(err))
+		} else {
+			log.Info("resumed scheduler", zap.String("scheduler", scheduler))
 		}
-		return nil
 	}
-	return err
+	// No need to return an error here: even if resuming failed, the pause will time out on its own.
+	return nil
 }
 
 // ListSchedulers list all pd scheduler.
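Editor's note: the pause endpoint driven above is plain HTTP, which makes the change easy to verify in isolation. Below is a minimal, self-contained sketch of that call; the endpoint path and the `{"delay": seconds}` body shape come straight from this patch, while the PD address (`http://127.0.0.1:2379`) and the scheduler name are illustrative placeholders only.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// pauseSchedulerBody mirrors the struct added in this patch: Delay is the
// pause duration in seconds, and Delay: 0 resumes the scheduler.
type pauseSchedulerBody struct {
	Delay int64 `json:"delay"`
}

// pauseScheduler POSTs {"delay": delaySec} to pd/api/v1/schedulers/<name>,
// the same endpoint pauseSchedulersWith uses.
func pauseScheduler(pdAddr, scheduler string, delaySec int64) error {
	body, err := json.Marshal(pauseSchedulerBody{Delay: delaySec})
	if err != nil {
		return err
	}
	url := fmt.Sprintf("%s/pd/api/v1/schedulers/%s", pdAddr, scheduler)
	resp, err := http.Post(url, "application/json", bytes.NewBuffer(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("pause %s: unexpected status %s", scheduler, resp.Status)
	}
	return nil
}

func main() {
	// Pause balance-leader for 300 seconds, matching pauseTimeout above.
	if err := pauseScheduler("http://127.0.0.1:2379", "balance-leader-scheduler", 300); err != nil {
		fmt.Println("pause failed:", err)
	}
}
```

Re-posting the same request before the delay elapses extends the pause, which is exactly what the keep-alive goroutine relies on.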
@@ -340,18 +412,8 @@ func (p *PdController) UpdatePDScheduleConfig(
 	return errors.Annotate(berrors.ErrPDUpdateFailed, "failed to update PD schedule config")
 }
 
-func addPDLeaderScheduler(ctx context.Context, pd *PdController, removedSchedulers []string) error {
-	for _, scheduler := range removedSchedulers {
-		err := pd.AddScheduler(ctx, scheduler)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg clusterConfig) error {
-	if err := addPDLeaderScheduler(ctx, pd, clusterCfg.scheduler); err != nil {
-		return errors.Annotate(err, "fail to add PD schedulers")
+	if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil {
+		return errors.Annotate(err, "fail to resume PD schedulers")
 	}
 	mergeCfg := make(map[string]interface{})
@@ -404,12 +466,11 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
 			needRemoveSchedulers = append(needRemoveSchedulers, s)
 		}
 	}
-	scheduler, err := removePDLeaderScheduler(ctx, p, needRemoveSchedulers)
+	removedSchedulers, err := p.PauseSchedulers(ctx, needRemoveSchedulers)
 	if err != nil {
 		return
 	}
-
-	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler})
+	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers})
 
 	stores, err := p.pdClient.GetAllStores(ctx)
 	if err != nil {
@@ -420,7 +481,7 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
 		return
 	}
 
-	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler, scheduleCfg: scheduleCfg})
+	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})
 
 	disableMergeCfg := make(map[string]interface{})
 	for _, cfgKey := range pdRegionMergeCfg {
@@ -457,16 +518,5 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
 
 // Close close the connection to pd.
 func (p *PdController) Close() {
 	p.pdClient.Close()
-}
-
-func removePDLeaderScheduler(ctx context.Context, pd *PdController, existSchedulers []string) ([]string, error) {
-	removedSchedulers := make([]string, 0, len(existSchedulers))
-	for _, scheduler := range existSchedulers {
-		err := pd.RemoveScheduler(ctx, scheduler)
-		if err != nil {
-			return nil, err
-		}
-		removedSchedulers = append(removedSchedulers, scheduler)
-	}
-	return removedSchedulers, nil
+	close(p.schedulerPauseCh)
 }
diff --git a/pkg/pdutil/pd_test.go b/pkg/pdutil/pd_test.go
index be442bbbc..bbfb59e9c 100644
--- a/pkg/pdutil/pd_test.go
+++ b/pkg/pdutil/pd_test.go
@@ -36,12 +36,16 @@ func (s *testPDControllerSuite) TestScheduler(c *C) {
 	mock := func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
 		return nil, errors.New("failed")
 	}
-	pdController := &PdController{addrs: []string{"", ""}}
-	err := pdController.removeSchedulerWith(ctx, scheduler, mock)
+	schedulerPauseCh := make(chan struct{})
+	pdController := &PdController{addrs: []string{"", ""}, schedulerPauseCh: schedulerPauseCh}
+	_, err := pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock)
 	c.Assert(err, ErrorMatches, "failed")
 
-	err = pdController.addSchedulerWith(ctx, scheduler, mock)
-	c.Assert(err, ErrorMatches, "failed")
+	go func() {
+		<-schedulerPauseCh
+	}()
+	err = pdController.resumeSchedulerWith(ctx, []string{scheduler}, mock)
+	c.Assert(err, IsNil)
 
 	_, err = pdController.listSchedulersWith(ctx, mock)
 	c.Assert(err, ErrorMatches, "failed")
@@ -49,10 +53,13 @@ func (s *testPDControllerSuite) TestScheduler(c *C) {
 	mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
 		return []byte(`["` + scheduler + `"]`), nil
 	}
-	err = pdController.removeSchedulerWith(ctx, scheduler, mock)
+	_, err = pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock)
 	c.Assert(err, IsNil)
 
-	err = pdController.addSchedulerWith(ctx, scheduler, mock)
+	go func() {
+		<-schedulerPauseCh
+	}()
+	err = pdController.resumeSchedulerWith(ctx, []string{scheduler}, mock)
 	c.Assert(err, IsNil)
 
 	schedulers, err := pdController.listSchedulersWith(ctx, mock)
diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh
index 5b252239e..e37088884 100644
--- a/tests/br_other/run.sh
+++ b/tests/br_other/run.sh
@@ -78,7 +78,7 @@ sleep 1
 curl "http://localhost:$PPROF_PORT/debug/pprof/trace?seconds=1" 2>&1 > /dev/null
 echo "pprof started..."
 
-curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": true'
+curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": false'
 
 backup_fail=0
 echo "another backup start expect to fail due to last backup add a lockfile"
@@ -88,6 +88,13 @@ if [ "$backup_fail" -ne "1" ];then
     exit 1
 fi
 
+# check that all the schedulers BR paused are really in the paused state.
+pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
+if [ "$pause_schedulers" -ne "3" ];then
+    echo "TEST: [$TEST_NAME] failed because not all schedulers are paused"
+    exit 1
+fi
+
 if ps -p $_pid > /dev/null
 then
     echo "$_pid is running"
@@ -101,22 +108,21 @@ fi
 
 # make sure we won't stuck in non-scheduler state, even we send a SIGTERM to it.
 # give enough time to BR so it can gracefully stop.
 sleep 5
-if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": false'
+if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": true'
 then
-    echo "TEST: [$TEST_NAME] failed because scheduler has not been removed"
+    echo "TEST: [$TEST_NAME] failed because scheduler has been disabled"
     exit 1
 fi
 
 pd_settings=5
 
-# we need reset pd scheduler/config to default
-# until pd has the solution to temporary set these scheduler/configs.
-run_br validate reset-pd-config-as-default
-
-# max-merge-region-size set to default 20
-curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-size"' | grep "20" || ((pd_settings--))
+# check that the schedulers are still paused; they recover automatically once the pause times out.
+pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
+if [ "$pause_schedulers" -ne "3" ];then
+    echo "TEST: [$TEST_NAME] failed because the set of paused schedulers changed"
+    exit 1
+fi
 
-# max-merge-region-keys set to default 200000
-curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep "200000" || ((pd_settings--))
 # balance-region scheduler enabled
 curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="balance-region")}' | grep '"disable": false' || ((pd_settings--))
 # balance-leader scheduler enabled
@@ -124,6 +130,16 @@ curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disab
 # hot region scheduler enabled
 curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="hot-region")}' | grep '"disable": false' || ((pd_settings--))
 
+# we need to reset pd config to default
+# until pd has a solution to set these configs temporarily.
+run_br validate reset-pd-config-as-default --pd $PD_ADDR
+
+# max-merge-region-size set to default 20
+curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-size"' | grep "20" || ((pd_settings--))
+
+# max-merge-region-keys set to default 200000
+curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep "200000" || ((pd_settings--))
+
 if [ "$pd_settings" -ne "5" ];then
     echo "TEST: [$TEST_NAME] test validate reset pd config failed!"
     exit 1
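Editor's note on the design: the safety property this patch (and the test above) depends on is the keep-alive loop, which re-arms each time-limited pause at one third of its timeout, so up to three refresh attempts can fail before an in-flight pause lapses, and an abrupt kill of BR still lets the cluster recover by itself. The sketch below isolates that pattern; the names (`keepAlive`, `refresh`, `stopCh`) and the short demo timeout are illustrative, not part of the patch.

```go
package main

import (
	"context"
	"log"
	"time"
)

// keepAlive re-arms a time-limited pause at one third of its timeout: the
// pause never expires while the process is healthy, but expires on its own
// if the process dies without calling resume.
func keepAlive(ctx context.Context, stopCh <-chan struct{}, timeout time.Duration, refresh func() error) {
	tick := time.NewTicker(timeout / 3)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-stopCh: // explicit resume requested, like schedulerPauseCh
			return
		case <-tick.C:
			if err := refresh(); err != nil {
				// Not fatal: the previous pause stays in effect until its
				// delay expires, so just retry on the next tick.
				log.Println("refresh failed, will retry:", err)
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		keepAlive(context.Background(), stop, 3*time.Second, func() error {
			log.Println("re-posting pause request")
			return nil
		})
	}()
	time.Sleep(2500 * time.Millisecond) // let a couple of 1s ticks fire
	close(stop)                         // stop refreshing, as ResumeSchedulers does
	<-done
}
```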