Merge branch 'master' into fix-race12
ti-chi-bot[bot] authored Nov 23, 2023
2 parents ca42385 + c0dad35 commit 45cefa7
Showing 27 changed files with 612 additions and 291 deletions.
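
The change repeated across these scheduler files is mechanical: the named mutex field `mu syncutil.RWMutex` in each config struct is replaced by an embedded `syncutil.RWMutex`, so call sites shrink from `conf.mu.Lock()` to `conf.Lock()` (and likewise for the RLock/RUnlock pairs). A minimal standalone sketch of the embedding idiom, using the standard library's sync.RWMutex as a stand-in for syncutil.RWMutex and illustrative names (embeddedConfig, setBatch, getBatch) that are not from the PD code:

package main

import (
	"fmt"
	"sync"
)

// Before: a named mutex field forces callers to spell out cfg.mu.
type namedConfig struct {
	mu    sync.RWMutex
	batch int
}

// After: embedding the mutex promotes Lock/Unlock/RLock/RUnlock onto the
// struct itself, which is the shape this diff gives the scheduler configs.
type embeddedConfig struct {
	sync.RWMutex
	batch int
}

func (c *embeddedConfig) setBatch(n int) {
	c.Lock() // was c.mu.Lock() in the named-field form
	defer c.Unlock()
	c.batch = n
}

func (c *embeddedConfig) getBatch() int {
	c.RLock()
	defer c.RUnlock()
	return c.batch
}

func main() {
	cfg := &embeddedConfig{}
	cfg.setBatch(4)
	fmt.Println(cfg.getBatch()) // prints 4
}

One trade-off of embedding is that the mutex methods become part of the struct's method set, so anything that can see the config can also lock it; that matters less here since the config types are unexported within the schedulers package.
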
22 changes: 11 additions & 11 deletions pkg/schedule/schedulers/balance_leader.go
@@ -67,16 +67,16 @@ var (
 )
 
 type balanceLeaderSchedulerConfig struct {
-	mu syncutil.RWMutex
+	syncutil.RWMutex
 	storage endpoint.ConfigStorage
 	Ranges []core.KeyRange `json:"ranges"`
 	// Batch is used to generate multiple operators by one scheduling
 	Batch int `json:"batch"`
 }
 
 func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, interface{}) {
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 
 	oldc, _ := json.Marshal(conf)
 
@@ -109,8 +109,8 @@ func (conf *balanceLeaderSchedulerConfig) validate() bool {
 }
 
 func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	ranges := make([]core.KeyRange, len(conf.Ranges))
 	copy(ranges, conf.Ranges)
 	return &balanceLeaderSchedulerConfig{
@@ -210,14 +210,14 @@ func (l *balanceLeaderScheduler) GetType() string {
 }
 
 func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
-	l.conf.mu.RLock()
-	defer l.conf.mu.RUnlock()
+	l.conf.RLock()
+	defer l.conf.RUnlock()
 	return EncodeConfig(l.conf)
 }
 
 func (l *balanceLeaderScheduler) ReloadConfig() error {
-	l.conf.mu.Lock()
-	defer l.conf.mu.Unlock()
+	l.conf.Lock()
+	defer l.conf.Unlock()
 	cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName())
 	if err != nil {
 		return err
@@ -335,8 +335,8 @@ func (cs *candidateStores) resortStoreWithPos(pos int) {
 }
 
 func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
-	l.conf.mu.RLock()
-	defer l.conf.mu.RUnlock()
+	l.conf.RLock()
+	defer l.conf.RUnlock()
 	basePlan := plan.NewBalanceSchedulerPlan()
 	var collector *plan.Collector
 	if dryRun {

1 change: 1 addition & 0 deletions pkg/schedule/schedulers/balance_region.go
@@ -51,6 +51,7 @@ var (
 type balanceRegionSchedulerConfig struct {
 	Name string `json:"name"`
 	Ranges []core.KeyRange `json:"ranges"`
+	// TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler.
 }
 
 type balanceRegionScheduler struct {

22 changes: 11 additions & 11 deletions pkg/schedule/schedulers/balance_witness.go
@@ -53,16 +53,16 @@ const (
 )
 
 type balanceWitnessSchedulerConfig struct {
-	mu syncutil.RWMutex
+	syncutil.RWMutex
 	storage endpoint.ConfigStorage
 	Ranges []core.KeyRange `json:"ranges"`
 	// Batch is used to generate multiple operators by one scheduling
 	Batch int `json:"batch"`
 }
 
 func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, interface{}) {
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 
 	oldc, _ := json.Marshal(conf)
 
@@ -95,8 +95,8 @@ func (conf *balanceWitnessSchedulerConfig) validate() bool {
 }
 
 func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfig {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	ranges := make([]core.KeyRange, len(conf.Ranges))
 	copy(ranges, conf.Ranges)
 	return &balanceWitnessSchedulerConfig{
@@ -205,14 +205,14 @@ func (b *balanceWitnessScheduler) GetType() string {
 }
 
 func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) {
-	b.conf.mu.RLock()
-	defer b.conf.mu.RUnlock()
+	b.conf.RLock()
+	defer b.conf.RUnlock()
 	return EncodeConfig(b.conf)
 }
 
 func (b *balanceWitnessScheduler) ReloadConfig() error {
-	b.conf.mu.Lock()
-	defer b.conf.mu.Unlock()
+	b.conf.Lock()
+	defer b.conf.Unlock()
 	cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName())
 	if err != nil {
 		return err
@@ -238,8 +238,8 @@ func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste
 }
 
 func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
-	b.conf.mu.RLock()
-	defer b.conf.mu.RUnlock()
+	b.conf.RLock()
+	defer b.conf.RUnlock()
 	basePlan := plan.NewBalanceSchedulerPlan()
 	var collector *plan.Collector
 	if dryRun {

72 changes: 28 additions & 44 deletions pkg/schedule/schedulers/evict_leader.go
@@ -56,16 +56,16 @@ var (
 )
 
 type evictLeaderSchedulerConfig struct {
-	mu syncutil.RWMutex
+	syncutil.RWMutex
 	storage endpoint.ConfigStorage
 	StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
 	cluster *core.BasicCluster
 	removeSchedulerCb func(string) error
 }
 
 func (conf *evictLeaderSchedulerConfig) getStores() []uint64 {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	stores := make([]uint64, 0, len(conf.StoreIDWithRanges))
 	for storeID := range conf.StoreIDWithRanges {
 		stores = append(stores, storeID)
@@ -86,15 +86,15 @@ func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
 	if err != nil {
 		return err
 	}
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 	conf.StoreIDWithRanges[id] = ranges
 	return nil
 }
 
 func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	storeIDWithRanges := make(map[uint64][]core.KeyRange)
 	for id, ranges := range conf.StoreIDWithRanges {
 		storeIDWithRanges[id] = append(storeIDWithRanges[id], ranges...)
@@ -106,8 +106,8 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
 
 func (conf *evictLeaderSchedulerConfig) Persist() error {
 	name := conf.getSchedulerName()
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	data, err := EncodeConfig(conf)
 	failpoint.Inject("persistFail", func() {
 		err = errors.New("fail to persist")
@@ -123,8 +123,8 @@ func (conf *evictLeaderSchedulerConfig) getSchedulerName() string {
 }
 
 func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	ranges := conf.StoreIDWithRanges[id]
 	res := make([]string, 0, len(ranges)*2)
 	for index := range ranges {
@@ -134,8 +134,8 @@ func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
 }
 
 func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) {
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 	_, exists := conf.StoreIDWithRanges[id]
 	succ, last = false, false
 	if exists {
@@ -148,15 +148,15 @@ func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
 }
 
 func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 	conf.cluster.PauseLeaderTransfer(id)
 	conf.StoreIDWithRanges[id] = keyRange
 }
 
 func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	if ranges, exist := conf.StoreIDWithRanges[id]; exist {
 		return ranges
 	}
@@ -199,14 +199,14 @@ func (s *evictLeaderScheduler) GetType() string {
 }
 
 func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
-	s.conf.mu.RLock()
-	defer s.conf.mu.RUnlock()
+	s.conf.RLock()
+	defer s.conf.RUnlock()
 	return EncodeConfig(s.conf)
 }
 
 func (s *evictLeaderScheduler) ReloadConfig() error {
-	s.conf.mu.Lock()
-	defer s.conf.mu.Unlock()
+	s.conf.Lock()
+	defer s.conf.Unlock()
 	cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName())
 	if err != nil {
 		return err
@@ -223,25 +223,9 @@ func (s *evictLeaderScheduler) ReloadConfig() error {
 	return nil
 }
 
-// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer.
-func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint64][]core.KeyRange) {
-	for id := range old {
-		if _, ok := new[id]; ok {
-			continue
-		}
-		cluster.ResumeLeaderTransfer(id)
-	}
-	for id := range new {
-		if _, ok := old[id]; ok {
-			continue
-		}
-		cluster.PauseLeaderTransfer(id)
-	}
-}
-
 func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
-	s.conf.mu.RLock()
-	defer s.conf.mu.RUnlock()
+	s.conf.RLock()
+	defer s.conf.RUnlock()
 	var res error
 	for id := range s.conf.StoreIDWithRanges {
 		if err := cluster.PauseLeaderTransfer(id); err != nil {
@@ -252,8 +236,8 @@ func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) erro
 }
 
 func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
-	s.conf.mu.RLock()
-	defer s.conf.mu.RUnlock()
+	s.conf.RLock()
+	defer s.conf.RUnlock()
 	for id := range s.conf.StoreIDWithRanges {
 		cluster.ResumeLeaderTransfer(id)
 	}
@@ -382,15 +366,15 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
 	idFloat, ok := input["store_id"].(float64)
 	if ok {
 		id = (uint64)(idFloat)
-		handler.config.mu.RLock()
+		handler.config.RLock()
 		if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
 			if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
-				handler.config.mu.RUnlock()
+				handler.config.RUnlock()
 				handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
 				return
 			}
 		}
-		handler.config.mu.RUnlock()
+		handler.config.RUnlock()
 		args = append(args, strconv.FormatUint(id, 10))
 	}
 

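The pauseAndResumeLeaderTransfer helper deleted from evict_leader.go above reconciles two store-ID maps: stores present only in the old map get leader transfer resumed, stores present only in the new map get it paused, and stores in both are left alone. A self-contained sketch of that set-difference logic, with a stub interface standing in for the core.BasicCluster methods it uses (the stub type, the logging, and the string key ranges are illustrative only):

package main

import "fmt"

// leaderTransferrer is a stand-in for the two core.BasicCluster methods the
// removed helper relied on; the real type has a much larger API.
type leaderTransferrer interface {
	PauseLeaderTransfer(storeID uint64) error
	ResumeLeaderTransfer(storeID uint64)
}

type loggingCluster struct{}

func (loggingCluster) PauseLeaderTransfer(id uint64) error {
	fmt.Println("pause leader transfer on store", id)
	return nil
}

func (loggingCluster) ResumeLeaderTransfer(id uint64) {
	fmt.Println("resume leader transfer on store", id)
}

// pauseAndResumeLeaderTransfer mirrors the removed helper: resume stores that
// appear only in old, pause stores that appear only in new. Like the original,
// it discards the error from PauseLeaderTransfer.
func pauseAndResumeLeaderTransfer(cluster leaderTransferrer, old, new map[uint64][]string) {
	for id := range old {
		if _, ok := new[id]; ok {
			continue
		}
		cluster.ResumeLeaderTransfer(id)
	}
	for id := range new {
		if _, ok := old[id]; ok {
			continue
		}
		cluster.PauseLeaderTransfer(id)
	}
}

func main() {
	oldStores := map[uint64][]string{1: nil, 2: nil}
	newStores := map[uint64][]string{2: nil, 3: nil}
	// Store 1 is resumed, store 3 is paused, store 2 is untouched.
	pauseAndResumeLeaderTransfer(loggingCluster{}, oldStores, newStores)
}
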