
scheduler: use pause instead of remove schedulers #551

Merged (12 commits) on Oct 15, 2020
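
In short: instead of deleting the balance/hot-region schedulers before a backup and re-adding them afterwards, BR now pauses them through PD's scheduler API and resumes them when it is done (or simply lets the pause expire). As the diff below shows, a pause is a POST to pd/api/v1/schedulers/<name> with a JSON body carrying a delay in seconds, and a delay of 0 ends the pause. A minimal sketch of that request shape, outside BR's pdutil plumbing (the standalone helper and its name are illustrative, not part of the PR):

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
)

// pauseScheduler mirrors the call the PR makes through pdHTTPRequest:
// POST pd/api/v1/schedulers/<name> with {"delay": <seconds>}.
// A delaySec of 0 resumes the scheduler immediately.
func pauseScheduler(ctx context.Context, cli *http.Client, pdAddr, name string, delaySec int64) error {
    body, err := json.Marshal(map[string]int64{"delay": delaySec})
    if err != nil {
        return err
    }
    url := fmt.Sprintf("http://%s/pd/api/v1/schedulers/%s", pdAddr, name)
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
    if err != nil {
        return err
    }
    resp, err := cli.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("pause %s: unexpected status %s", name, resp.Status)
    }
    return nil
}
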
14 changes: 1 addition & 13 deletions cmd/debug.go
@@ -10,7 +10,6 @@ import (
    "encoding/json"
    "path"
    "reflect"
-   "strings"

    "github.com/gogo/protobuf/proto"
    "github.com/pingcap/errors"
@@ -347,7 +346,7 @@ func encodeBackupMetaCommand() *cobra.Command {
func setPDConfigCommand() *cobra.Command {
    pdConfigCmd := &cobra.Command{
        Use:   "reset-pd-config-as-default",
-       Short: "reset pd scheduler and config adjusted by BR to default value",
+       Short: "reset pd config adjusted by BR to default value",
        RunE: func(cmd *cobra.Command, args []string) error {
            ctx, cancel := context.WithCancel(GetDefaultContext())
            defer cancel()
@@ -363,17 +362,6 @@ func setPDConfigCommand() *cobra.Command {
            }
            defer mgr.Close()

-           for scheduler := range pdutil.Schedulers {
-               if strings.HasPrefix(scheduler, "balance") {
-                   err := mgr.AddScheduler(ctx, scheduler)
-                   if err != nil {
-                       return err
-                   }
-                   log.Info("add pd schedulers succeed",
-                       zap.String("schedulers", scheduler))
-               }
-           }
-
            if err := mgr.UpdatePDScheduleConfig(ctx, pdutil.DefaultPDCfg); err != nil {
                return errors.Annotate(err, "fail to update PD merge config")
            }
148 changes: 99 additions & 49 deletions pkg/pdutil/pd.go
@@ -34,6 +34,7 @@ const (
    schedulerPrefix      = "pd/api/v1/schedulers"
    maxMsgSize           = int(128 * utils.MB) // pd.ScanRegion may return a large response
    scheduleConfigPrefix = "pd/api/v1/config/schedule"
+   pauseTimeout         = 5 * time.Minute
)

// clusterConfig represents a set of scheduler whose config have been modified
@@ -45,6 +46,10 @@ type clusterConfig struct {
    scheduleCfg map[string]interface{}
}

+ type pauseSchedulerBody struct {
+     Delay int64 `json:"delay"`
+ }
+
var (
    // Schedulers represent region/leader schedulers which can impact on performance.
    Schedulers = map[string]struct{}{
Expand Down Expand Up @@ -115,6 +120,9 @@ type PdController struct {
addrs []string
cli *http.Client
pdClient pd.Client

// control the pause schedulers goroutine
schedulerPauseCh chan struct{}
}

// NewPdController creates a new PdController.
@@ -167,9 +175,10 @@ func NewPdController(
    }

    return &PdController{
-       addrs:    processedAddrs,
-       cli:      cli,
-       pdClient: pdClient,
+       addrs:            processedAddrs,
+       cli:              cli,
+       pdClient:         pdClient,
+       schedulerPauseCh: make(chan struct{}),
    }, nil
}

@@ -242,38 +251,101 @@ func (p *PdController) getRegionCountWith(
    return 0, err
}

- // RemoveScheduler remove pd scheduler.
- func (p *PdController) RemoveScheduler(ctx context.Context, scheduler string) error {
-     return p.removeSchedulerWith(ctx, scheduler, pdRequest)
+ // PauseSchedulers pauses PD schedulers temporarily.
+ func (p *PdController) PauseSchedulers(ctx context.Context, schedulers []string) ([]string, error) {
+     return p.pauseSchedulersWith(ctx, schedulers, pdRequest)
}

- func (p *PdController) removeSchedulerWith(ctx context.Context, scheduler string, delete pdHTTPRequest) (err error) {
-     for _, addr := range p.addrs {
+ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
+     removedSchedulers := make([]string, 0, len(schedulers))
+     // pause each scheduler for 300 seconds; PD's delay field is in seconds
+     body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout.Seconds())})
+     if err != nil {
+         return nil, err
+     }
+
+     // The first pause of each scheduler must succeed, so it is done here and any
+     // failure is returned; re-pauses in the goroutine below may fail and are only logged.
+     for _, scheduler := range schedulers {
          prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
-         _, err = delete(ctx, addr, prefix, p.cli, http.MethodDelete, nil)
+         for _, addr := range p.addrs {
+             _, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
+             if err == nil {
+                 removedSchedulers = append(removedSchedulers, scheduler)
+                 break
+             }
+         }
          if err != nil {
-             continue
+             log.Error("failed to pause scheduler at beginning",
+                 zap.Strings("name", schedulers), zap.Error(err))
+             return nil, err
          }
-         return nil
      }
-     return err
+     log.Info("pause scheduler successful at beginning", zap.Strings("name", schedulers))
+
+     go func() {
+         tick := time.NewTicker(pauseTimeout / 3)
+         defer tick.Stop()
+
+         for {
+             select {
+             case <-ctx.Done():
+                 return
+             case <-tick.C:
+                 for _, scheduler := range schedulers {
+                     prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
+                     for _, addr := range p.addrs {
+                         _, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
+                         if err == nil {
+                             break
+                         }
+                     }
+                     if err == nil {
+                         log.Info("pause scheduler", zap.String("name", scheduler))
+                     } else {
+                         log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err))
+                     }
+                 }
+             case <-p.schedulerPauseCh:
+                 log.Info("exit pause scheduler successful")
+                 return
+             }
+         }
+     }()
+     return removedSchedulers, nil
}
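
The pause only holds for pauseTimeout (5 minutes) on the PD side, so the goroutine above acts as a keep-alive: every pauseTimeout / 3 (100 seconds) it re-posts the pause for every scheduler, and it exits when the context is cancelled or when schedulerPauseCh signals a resume. Reduced to its essentials, the pattern looks like this (the names and the repause callback are illustrative, not part of the PR):

package main

import (
    "context"
    "log"
    "time"
)

// keepPaused re-applies a lease-like pause every interval until ctx is
// cancelled or stop is signalled (or closed). Failures are tolerated:
// the next tick retries, and at worst the pause expires on its own.
func keepPaused(ctx context.Context, stop <-chan struct{}, interval time.Duration, repause func() error) {
    tick := time.NewTicker(interval)
    defer tick.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-tick.C:
            if err := repause(); err != nil {
                log.Printf("re-pause failed, will retry next tick: %v", err)
            }
        case <-stop:
            return
        }
    }
}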

- // AddScheduler add pd scheduler.
- func (p *PdController) AddScheduler(ctx context.Context, scheduler string) error {
-     return p.addSchedulerWith(ctx, scheduler, pdRequest)
+ // ResumeSchedulers resumes PD schedulers.
+ func (p *PdController) ResumeSchedulers(ctx context.Context, schedulers []string) error {
+     return p.resumeSchedulerWith(ctx, schedulers, pdRequest)
}

- func (p *PdController) addSchedulerWith(ctx context.Context, scheduler string, post pdHTTPRequest) (err error) {
-     for _, addr := range p.addrs {
-         body := bytes.NewBuffer([]byte(`{"name":"` + scheduler + `"}`))
-         _, err = post(ctx, addr, schedulerPrefix, p.cli, http.MethodPost, body)
+ func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []string, post pdHTTPRequest) (err error) {
+     log.Info("resume scheduler", zap.Strings("schedulers", schedulers))
+     p.schedulerPauseCh <- struct{}{}
+
+     // 0 means stop pause.
+     body, err := json.Marshal(pauseSchedulerBody{Delay: 0})
+     if err != nil {
+         return err
+     }
+     for _, scheduler := range schedulers {
+         prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
+         for _, addr := range p.addrs {
+             _, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
+             if err == nil {
+                 break
+             }
+         }
         if err != nil {
-             continue
+             log.Error("failed to resume scheduler after retry, you may reset this scheduler manually "+
+                 "or just wait this scheduler pause timeout", zap.String("scheduler", scheduler))
+         } else {
+             log.Info("resume scheduler successful", zap.String("scheduler", scheduler))
         }
-         return nil
     }
-     return err
+     // no need to return an error, because the pause will time out on its own.
+     return nil
}
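
Two details worth noting: ResumeSchedulers starts with an unbuffered send on schedulerPauseCh, so it blocks until the keep-alive goroutine receives it (the tests below drain the channel from a helper goroutine for exactly this reason), and it deliberately swallows resume failures because the pause expires by itself. A hedged sketch of how the pair is meant to bracket heavy work, written as if it lived alongside pd.go (the wrapper is hypothetical, not part of the PR):

package pdutil

import (
    "context"

    "github.com/pingcap/log"
    "go.uber.org/zap"
)

// withSchedulersPaused is a hypothetical convenience wrapper: pause the given
// schedulers, run work, then resume only the schedulers that were actually paused.
func withSchedulersPaused(ctx context.Context, p *PdController, schedulers []string, work func() error) error {
    paused, err := p.PauseSchedulers(ctx, schedulers)
    if err != nil {
        return err
    }
    defer func() {
        // On failure the pause simply times out after pauseTimeout, so log and move on.
        if err := p.ResumeSchedulers(ctx, paused); err != nil {
            log.Warn("failed to resume schedulers, waiting for pause timeout", zap.Error(err))
        }
    }()
    return work()
}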

// ListSchedulers list all pd scheduler.
@@ -340,18 +412,8 @@ func (p *PdController) UpdatePDScheduleConfig(
    return errors.Annotate(berrors.ErrPDUpdateFailed, "failed to update PD schedule config")
}

- func addPDLeaderScheduler(ctx context.Context, pd *PdController, removedSchedulers []string) error {
-     for _, scheduler := range removedSchedulers {
-         err := pd.AddScheduler(ctx, scheduler)
-         if err != nil {
-             return err
-         }
-     }
-     return nil
- }
-
func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg clusterConfig) error {
-     if err := addPDLeaderScheduler(ctx, pd, clusterCfg.scheduler); err != nil {
+     if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil {
        return errors.Annotate(err, "fail to add PD schedulers")
    }
    mergeCfg := make(map[string]interface{})
@@ -404,12 +466,11 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
            needRemoveSchedulers = append(needRemoveSchedulers, s)
        }
    }
-   scheduler, err := removePDLeaderScheduler(ctx, p, needRemoveSchedulers)
+   removedSchedulers, err := p.PauseSchedulers(ctx, needRemoveSchedulers)
    if err != nil {
        return
    }
-
-   undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler})
+   undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers})

    stores, err := p.pdClient.GetAllStores(ctx)
    if err != nil {
@@ -420,7 +481,7 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
        return
    }

-   undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler, scheduleCfg: scheduleCfg})
+   undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})

    disableMergeCfg := make(map[string]interface{})
    for _, cfgKey := range pdRegionMergeCfg {
@@ -457,16 +518,5 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
// Close close the connection to pd.
func (p *PdController) Close() {
    p.pdClient.Close()
-}
-
- func removePDLeaderScheduler(ctx context.Context, pd *PdController, existSchedulers []string) ([]string, error) {
-     removedSchedulers := make([]string, 0, len(existSchedulers))
-     for _, scheduler := range existSchedulers {
-         err := pd.RemoveScheduler(ctx, scheduler)
-         if err != nil {
-             return nil, err
-         }
-         removedSchedulers = append(removedSchedulers, scheduler)
-     }
-     return removedSchedulers, nil
+   close(p.schedulerPauseCh)
}
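
Closing schedulerPauseCh in Close doubles as a shutdown signal: a receive from a closed channel succeeds immediately, so the keep-alive goroutine's <-p.schedulerPauseCh case fires even if ResumeSchedulers is never called. A self-contained illustration of that channel property:

package main

import (
    "fmt"
    "sync"
)

func main() {
    done := make(chan struct{})
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        defer wg.Done()
        <-done // unblocks on an explicit send OR when the channel is closed
        fmt.Println("worker: exiting")
    }()

    close(done) // no send ever happens; closing alone releases the receiver
    wg.Wait()

    _, ok := <-done // receives on a closed channel keep succeeding immediately
    fmt.Println("receive after close, ok =", ok) // prints: ok = false
}
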
19 changes: 13 additions & 6 deletions pkg/pdutil/pd_test.go
@@ -36,23 +36,30 @@ func (s *testPDControllerSuite) TestScheduler(c *C) {
    mock := func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
        return nil, errors.New("failed")
    }
-   pdController := &PdController{addrs: []string{"", ""}}
-   err := pdController.removeSchedulerWith(ctx, scheduler, mock)
+   schedulerPauseCh := make(chan struct{})
+   pdController := &PdController{addrs: []string{"", ""}, schedulerPauseCh: schedulerPauseCh}
+   _, err := pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock)
    c.Assert(err, ErrorMatches, "failed")

-   err = pdController.addSchedulerWith(ctx, scheduler, mock)
-   c.Assert(err, ErrorMatches, "failed")
+   go func() {
+       <-schedulerPauseCh
+   }()
+   err = pdController.resumeSchedulerWith(ctx, []string{scheduler}, mock)
+   c.Assert(err, IsNil)

    _, err = pdController.listSchedulersWith(ctx, mock)
    c.Assert(err, ErrorMatches, "failed")

    mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
        return []byte(`["` + scheduler + `"]`), nil
    }
-   err = pdController.removeSchedulerWith(ctx, scheduler, mock)
+   _, err = pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock)
    c.Assert(err, IsNil)

-   err = pdController.addSchedulerWith(ctx, scheduler, mock)
+   go func() {
+       <-schedulerPauseCh
+   }()
+   err = pdController.resumeSchedulerWith(ctx, []string{scheduler}, mock)
    c.Assert(err, IsNil)

    schedulers, err := pdController.listSchedulersWith(ctx, mock)
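
The drain goroutines in this test exist because resumeSchedulerWith's send on schedulerPauseCh is unbuffered and would otherwise block the test forever. A tiny self-contained illustration of that pattern (names are illustrative):

package main

import "fmt"

func main() {
    ch := make(chan struct{}) // unbuffered: a send blocks until someone receives

    // Without this receiver the send below would block forever,
    // which is exactly why the test above drains schedulerPauseCh.
    go func() { <-ch }()

    ch <- struct{}{} // completes only once the goroutine has received
    fmt.Println("send completed")
}
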
36 changes: 26 additions & 10 deletions tests/br_other/run.sh
@@ -78,7 +78,7 @@ sleep 1
curl "http://localhost:$PPROF_PORT/debug/pprof/trace?seconds=1" 2>&1 > /dev/null
echo "pprof started..."

- curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": true'
+ curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": false'

backup_fail=0
echo "another backup start expect to fail due to last backup add a lockfile"
@@ -88,6 +88,13 @@ if [ "$backup_fail" -ne "1" ];then
    exit 1
fi

+ # check that the schedulers BR pauses are indeed paused
+ pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
+ if [ "$pause_schedulers" -ne "3" ];then
+     echo "TEST: [$TEST_NAME] failed because not enough schedulers are paused"
+     exit 1
+ fi
+
if ps -p $_pid > /dev/null
then
    echo "$_pid is running"
@@ -101,29 +108,38 @@ fi
# make sure we won't stuck in non-scheduler state, even we send a SIGTERM to it.
# give enough time to BR so it can gracefully stop.
sleep 5
- if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": false'
+ if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": true'
then
-     echo "TEST: [$TEST_NAME] failed because scheduler has not been removed"
+     echo "TEST: [$TEST_NAME] failed because scheduler has been removed"
    exit 1
fi

pd_settings=5
- # we need reset pd scheduler/config to default
- # until pd has the solution to temporary set these scheduler/configs.
- run_br validate reset-pd-config-as-default

- # max-merge-region-size set to default 20
- curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-size"' | grep "20" || ((pd_settings--))
+ # check that the set of paused schedulers has not changed
+ pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
+ if [ "$pause_schedulers" -ne "3" ];then
+     echo "TEST: [$TEST_NAME] failed because the set of paused schedulers has changed"
+     exit 1
+ fi

- # max-merge-region-keys set to default 200000
- curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep "200000" || ((pd_settings--))
# balance-region scheduler enabled
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="balance-region")}' | grep '"disable": false' || ((pd_settings--))
# balance-leader scheduler enabled
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="balance-leader")}' | grep '"disable": false' || ((pd_settings--))
# hot region scheduler enabled
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="hot-region")}' | grep '"disable": false' || ((pd_settings--))

+ # we need to reset pd config to default
+ # until pd has the solution to temporarily set these configs.
+ run_br validate reset-pd-config-as-default --pd $PD_ADDR

+ # max-merge-region-size set to default 20
+ curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-size"' | grep "20" || ((pd_settings--))

+ # max-merge-region-keys set to default 200000
+ curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep "200000" || ((pd_settings--))

if [ "$pd_settings" -ne "5" ];then
    echo "TEST: [$TEST_NAME] test validate reset pd config failed!"
    exit 1