Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: rafactor persist config #7598

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ type balanceLeaderSchedulerConfig struct {
Batch int `json:"batch"`
}

func (conf *balanceLeaderSchedulerConfig) getStorage() endpoint.ConfigStorage {
return conf.storage
}

func (*balanceLeaderSchedulerConfig) getSchedulerName() string {
return BalanceLeaderName
}

func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, any) {
conf.Lock()
defer conf.Unlock()
Expand All @@ -89,7 +97,7 @@ func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, any) {
json.Unmarshal(oldConfig, conf)
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
conf.persistLocked()
saveSchedulerConfig(conf)
log.Info("balance-leader-scheduler config is updated", zap.ByteString("old", oldConfig), zap.ByteString("new", newConfig))
return http.StatusOK, "Config is updated."
}
Expand Down Expand Up @@ -119,14 +127,6 @@ func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig
}
}

func (conf *balanceLeaderSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(BalanceLeaderName, data)
}

func (conf *balanceLeaderSchedulerConfig) getBatch() int {
conf.RLock()
defer conf.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,7 @@ func TestConcurrencyUpdateConfig(t *testing.T) {
default:
}
sche.config.BuildWithArgs(args)
re.NoError(sche.config.Persist())
re.NoError(saveSchedulerConfig(sche.config))
}
}()
for i := 0; i < 1000; i++ {
Expand Down
22 changes: 13 additions & 9 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ type balanceWitnessSchedulerConfig struct {
Batch int `json:"batch"`
}

func (conf *balanceWitnessSchedulerConfig) getStorage() endpoint.ConfigStorage {
return conf.storage
}

func (*balanceWitnessSchedulerConfig) getSchedulerName() string {
return BalanceWitnessName
}

func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, any) {
conf.Lock()
defer conf.Unlock()
Expand All @@ -75,7 +83,11 @@ func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, any) {
json.Unmarshal(oldc, conf)
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
conf.persistLocked()
err := saveSchedulerConfig(conf)
if err != nil {
log.Error("failed to save balance-witness-scheduler config", errs.ZapError(err))
return http.StatusInternalServerError, err.Error()
}
log.Info("balance-witness-scheduler config is updated", zap.ByteString("old", oldc), zap.ByteString("new", newc))
return http.StatusOK, "Config is updated."
}
Expand Down Expand Up @@ -105,14 +117,6 @@ func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfi
}
}

func (conf *balanceWitnessSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(BalanceWitnessName, data)
}

func (conf *balanceWitnessSchedulerConfig) getBatch() int {
conf.RLock()
defer conf.RUnlock()
Expand Down
27 changes: 10 additions & 17 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
Expand Down Expand Up @@ -104,24 +103,14 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
}
}

func (conf *evictLeaderSchedulerConfig) Persist() error {
name := conf.getSchedulerName()
conf.RLock()
defer conf.RUnlock()
data, err := EncodeConfig(conf)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(name, data)
}

func (*evictLeaderSchedulerConfig) getSchedulerName() string {
return EvictLeaderName
}

func (conf *evictLeaderSchedulerConfig) getStorage() endpoint.ConfigStorage {
return conf.storage
}

func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
conf.RLock()
defer conf.RUnlock()
Expand Down Expand Up @@ -386,7 +375,9 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
}

handler.config.BuildWithArgs(args)
err := handler.config.Persist()
handler.config.RLock()
defer handler.config.RUnlock()
err := saveSchedulerConfig(handler.config)
if err != nil {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
Expand All @@ -412,7 +403,9 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
keyRanges := handler.config.getKeyRangesByID(id)
succ, last := handler.config.removeStore(id)
if succ {
err = handler.config.Persist()
handler.config.RLock()
defer handler.config.RUnlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it have a dead lock problem?

err = saveSchedulerConfig(handler.config)
if err != nil {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
Expand Down
22 changes: 9 additions & 13 deletions pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,12 @@ func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfi
}
}

func (conf *evictSlowStoreSchedulerConfig) persistLocked() error {
name := EvictSlowStoreName
data, err := EncodeConfig(conf)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(name, data)
func (conf *evictSlowStoreSchedulerConfig) getStorage() endpoint.ConfigStorage {
return conf.storage
}

func (*evictSlowStoreSchedulerConfig) getSchedulerName() string {
return EvictSlowStoreName
}

func (conf *evictSlowStoreSchedulerConfig) getStores() []uint64 {
Expand Down Expand Up @@ -122,7 +118,7 @@ func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error {
defer conf.Unlock()
conf.EvictedStores = []uint64{id}
conf.lastSlowStoreCaptureTS = time.Now()
return conf.persistLocked()
return saveSchedulerConfig(conf)
}

func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) {
Expand All @@ -132,7 +128,7 @@ func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err
if oldID > 0 {
conf.EvictedStores = []uint64{}
conf.lastSlowStoreCaptureTS = time.Time{}
err = conf.persistLocked()
err = saveSchedulerConfig(conf)
}
return
}
Expand Down Expand Up @@ -168,7 +164,7 @@ func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *htt
prevRecoveryDurationGap := handler.config.RecoveryDurationGap
recoveryDurationGap := uint64(recoveryDurationGapFloat)
handler.config.RecoveryDurationGap = recoveryDurationGap
if err := handler.config.persistLocked(); err != nil {
if err := saveSchedulerConfig(handler.config); err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
handler.config.RecoveryDurationGap = prevRecoveryDurationGap
return
Expand Down
26 changes: 11 additions & 15 deletions pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlo
}
}

func (conf *evictSlowTrendSchedulerConfig) getStorage() endpoint.ConfigStorage {
return conf.storage
}

func (*evictSlowTrendSchedulerConfig) getSchedulerName() string {
return EvictSlowTrendName
}

func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig {
conf.RLock()
defer conf.RUnlock()
Expand All @@ -85,18 +93,6 @@ func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfi
}
}

func (conf *evictSlowTrendSchedulerConfig) persistLocked() error {
name := EvictSlowTrendName
data, err := EncodeConfig(conf)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(name, data)
}

func (conf *evictSlowTrendSchedulerConfig) getStores() []uint64 {
conf.RLock()
defer conf.RUnlock()
Expand Down Expand Up @@ -203,7 +199,7 @@ func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error {
conf.Lock()
defer conf.Unlock()
conf.EvictedStores = []uint64{id}
return conf.persistLocked()
return saveSchedulerConfig(conf)
}

func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.SchedulerCluster) (oldID uint64, err error) {
Expand All @@ -220,7 +216,7 @@ func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.Schedule
conf.Lock()
defer conf.Unlock()
conf.EvictedStores = []uint64{}
return oldID, conf.persistLocked()
return oldID, saveSchedulerConfig(conf)
}

type evictSlowTrendHandler struct {
Expand Down Expand Up @@ -254,7 +250,7 @@ func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *htt
prevRecoveryDurationGap := handler.config.RecoveryDurationGap
recoveryDurationGap := uint64(recoveryDurationGapFloat)
handler.config.RecoveryDurationGap = recoveryDurationGap
if err := handler.config.persistLocked(); err != nil {
if err := saveSchedulerConfig(handler.config); err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
handler.config.RecoveryDurationGap = prevRecoveryDurationGap
return
Expand Down
16 changes: 5 additions & 11 deletions pkg/schedule/schedulers/grant_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,8 @@ func (conf *grantHotRegionSchedulerConfig) Clone() *grantHotRegionSchedulerConfi
}
}

func (conf *grantHotRegionSchedulerConfig) Persist() error {
name := conf.getSchedulerName()
conf.RLock()
defer conf.RUnlock()
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(name, data)
func (conf *grantHotRegionSchedulerConfig) getStorage() endpoint.ConfigStorage {
return conf.storage
}

func (*grantHotRegionSchedulerConfig) getSchedulerName() string {
Expand Down Expand Up @@ -231,8 +224,9 @@ func (handler *grantHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *htt
_ = handler.rd.JSON(w, http.StatusBadRequest, errs.ErrSchedulerConfig)
return
}

if err = handler.config.Persist(); err != nil {
handler.config.RLock()
defer handler.config.RUnlock()
if err = saveSchedulerConfig(handler.config); err != nil {
handler.config.SetStoreLeaderID(0)
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down
19 changes: 8 additions & 11 deletions pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,8 @@ func (conf *grantLeaderSchedulerConfig) Clone() *grantLeaderSchedulerConfig {
}
}

func (conf *grantLeaderSchedulerConfig) Persist() error {
name := conf.getSchedulerName()
conf.RLock()
defer conf.RUnlock()
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(name, data)
func (conf *grantLeaderSchedulerConfig) getStorage() endpoint.ConfigStorage {
return conf.storage
}

func (*grantLeaderSchedulerConfig) getSchedulerName() string {
Expand Down Expand Up @@ -297,7 +290,9 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
}

handler.config.BuildWithArgs(args)
err := handler.config.Persist()
handler.config.RLock()
defer handler.config.RUnlock()
err := saveSchedulerConfig(handler.config)
if err != nil {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
Expand All @@ -323,7 +318,9 @@ func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
keyRanges := handler.config.getKeyRangesByID(id)
succ, last := handler.config.removeStore(id)
if succ {
err = handler.config.Persist()
handler.config.RLock()
defer handler.config.RUnlock()
err = saveSchedulerConfig(handler.config)
if err != nil {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
Expand Down
14 changes: 7 additions & 7 deletions pkg/schedule/schedulers/hot_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *
}
newc, _ := json.Marshal(conf)
if !bytes.Equal(oldc, newc) {
conf.persistLocked()
saveSchedulerConfig(conf)
log.Info("hot-region-scheduler config is updated", zap.String("old", string(oldc)), zap.String("new", string(newc)))
rd.Text(w, http.StatusOK, "Config is updated.")
return
Expand All @@ -473,12 +473,12 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *
rd.Text(w, http.StatusBadRequest, "Config item is not found.")
}

func (conf *hotRegionSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(HotRegionName, data)
func (conf *hotRegionSchedulerConfig) getStorage() endpoint.ConfigStorage {
return conf.storage
}

func (*hotRegionSchedulerConfig) getSchedulerName() string {
return HotRegionName
}

func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster sche.SchedulerCluster) bool {
Expand Down
19 changes: 7 additions & 12 deletions pkg/schedule/schedulers/scatter_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,6 @@ func (conf *scatterRangeSchedulerConfig) Clone() *scatterRangeSchedulerConfig {
}
}

func (conf *scatterRangeSchedulerConfig) Persist() error {
name := conf.getSchedulerName()
conf.RLock()
defer conf.RUnlock()
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(name, data)
}

func (conf *scatterRangeSchedulerConfig) GetRangeName() string {
conf.RLock()
defer conf.RUnlock()
Expand All @@ -114,6 +103,10 @@ func (conf *scatterRangeSchedulerConfig) getSchedulerName() string {
return fmt.Sprintf("scatter-range-%s", conf.RangeName)
}

func (conf *scatterRangeSchedulerConfig) getStorage() endpoint.ConfigStorage {
return conf.storage
}

type scatterRangeScheduler struct {
*BaseScheduler
name string
Expand Down Expand Up @@ -275,7 +268,9 @@ func (handler *scatterRangeHandler) UpdateConfig(w http.ResponseWriter, r *http.
args = append(args, string(handler.config.GetEndKey()))
}
handler.config.BuildWithArgs(args)
err := handler.config.Persist()
handler.config.RLock()
defer handler.config.RUnlock()
err := saveSchedulerConfig(handler.config)
if err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
}
Expand Down
Loading
Loading