Skip to content

Commit

Permalink
Merge branch 'master' into fix-progress
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Jun 27, 2024
2 parents bf6f902 + ab7f903 commit 15e4568
Show file tree
Hide file tree
Showing 28 changed files with 253 additions and 252 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,6 @@ issues:
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
- path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|server/api/.*\.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/schedule/schedulers/.*\.go|pkg/syncer/server.go)
- path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/syncer/server.go)
linters:
- errcheck
10 changes: 9 additions & 1 deletion pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,7 @@ const (
PendingPeerInSubTree SubTreeRegionType = "pending"
)

// GetStoreRegions gets all RegionInfo with a given storeID
// GetStoreRegionsByTypeInSubTree gets all RegionInfo with a given storeID
func (r *RegionsInfo) GetStoreRegionsByTypeInSubTree(storeID uint64, typ SubTreeRegionType) ([]*RegionInfo, error) {
r.st.RLock()
var regions []*RegionInfo
Expand Down Expand Up @@ -2210,3 +2210,11 @@ func NewTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...Regi
}
return NewRegionInfo(metaRegion, leader, opts...)
}

// TraverseRegions executes a function on all regions.
// ONLY for simulator now and function need to be self-locked.
func (r *RegionsInfo) TraverseRegions(lockedFunc func(*RegionInfo)) {
for _, item := range r.regions {
lockedFunc(item.RegionInfo)
}
}
9 changes: 7 additions & 2 deletions pkg/schedule/config/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,14 @@ func (c *StoreConfig) CheckRegionSize(size, mergeSize uint64) error {
if size < c.GetRegionMaxSize() {
return nil
}

// This could happen when the region split size is set to a value less than 1MiB,
// which is a very extreme case, we just pass the check here to prevent panic.
regionSplitSize := c.GetRegionSplitSize()
if regionSplitSize == 0 {
return nil
}
// the smallest of the split regions can not be merge again, so it's size should less merge size.
if smallSize := size % c.GetRegionSplitSize(); smallSize <= mergeSize && smallSize != 0 {
if smallSize := size % regionSplitSize; smallSize <= mergeSize && smallSize != 0 {
log.Debug("region size is too small", zap.Uint64("size", size), zap.Uint64("merge-size", mergeSize), zap.Uint64("small-size", smallSize))
return errs.ErrCheckerMergeAgain.FastGenByArgs("the smallest region of the split regions is less than max-merge-region-size, " +
"it will be merged again")
Expand Down
5 changes: 5 additions & 0 deletions pkg/schedule/config/store_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,9 @@ func TestMergeCheck(t *testing.T) {
re.Error(config.CheckRegionKeys(v.keys, v.mergeKeys))
}
}
// Test CheckRegionSize when the region split size is 0.
config.RegionSplitSize = "100KiB"
config.Adjust()
re.Empty(config.GetRegionSplitSize())
re.NoError(config.CheckRegionSize(defaultRegionMaxSize, 50))
}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ func (handler *balanceLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http
data, _ := io.ReadAll(r.Body)
r.Body.Close()
httpCode, v := handler.config.Update(data)
handler.rd.JSON(w, httpCode, v)
_ = handler.rd.JSON(w, httpCode, v)
}

func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
_ = handler.rd.JSON(w, http.StatusOK, conf)
}

type balanceLeaderScheduler struct {
Expand Down
12 changes: 8 additions & 4 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, any) {
newc, _ := json.Marshal(conf)
if !bytes.Equal(oldc, newc) {
if !conf.validateLocked() {
json.Unmarshal(oldc, conf)
if err := json.Unmarshal(oldc, conf); err != nil {
return http.StatusInternalServerError, err.Error()
}
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
conf.persistLocked()
if err := conf.persistLocked(); err != nil {
log.Warn("failed to persist config", zap.Error(err))
}
log.Info("balance-witness-scheduler config is updated", zap.ByteString("old", oldc), zap.ByteString("new", newc))
return http.StatusOK, "Config is updated."
}
Expand Down Expand Up @@ -147,12 +151,12 @@ func (handler *balanceWitnessHandler) UpdateConfig(w http.ResponseWriter, r *htt
data, _ := io.ReadAll(r.Body)
r.Body.Close()
httpCode, v := handler.config.Update(data)
handler.rd.JSON(w, httpCode, v)
_ = handler.rd.JSON(w, httpCode, v)
}

func (handler *balanceWitnessHandler) ListConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
_ = handler.rd.JSON(w, http.StatusOK, conf)
}

type balanceWitnessScheduler struct {
Expand Down
33 changes: 20 additions & 13 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -150,7 +151,9 @@ func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
conf.Lock()
defer conf.Unlock()
conf.cluster.PauseLeaderTransfer(id)
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
}

Expand Down Expand Up @@ -370,7 +373,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
handler.config.RUnlock()
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}
Expand All @@ -385,26 +388,30 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
args = append(args, handler.config.getRanges(id)...)
}

handler.config.BuildWithArgs(args)
err := handler.config.Persist()
err := handler.config.BuildWithArgs(args)
if err != nil {
_ = handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
err = handler.config.Persist()
if err != nil {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.")
_ = handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.")
}

func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
_ = handler.rd.JSON(w, http.StatusOK, conf)
}

func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["store_id"]
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
_ = handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

Expand All @@ -415,26 +422,26 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
err = handler.config.Persist()
if err != nil {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if last {
if err := handler.config.removeSchedulerCb(EvictLeaderName); err != nil {
if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
handler.rd.JSON(w, http.StatusNotFound, err.Error())
_ = handler.rd.JSON(w, http.StatusNotFound, err.Error())
} else {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
}
return
}
resp = lastStoreDeleteInfo
}
handler.rd.JSON(w, http.StatusOK, resp)
_ = handler.rd.JSON(w, http.StatusOK, resp)
return
}

handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error())
_ = handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error())
}

func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler {
Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *htt
}
recoveryDurationGapFloat, ok := input["recovery-duration"].(float64)
if !ok {
handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
return
}
handler.config.Lock()
Expand All @@ -169,17 +169,17 @@ func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *htt
recoveryDurationGap := uint64(recoveryDurationGapFloat)
handler.config.RecoveryDurationGap = recoveryDurationGap
if err := handler.config.persistLocked(); err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
handler.config.RecoveryDurationGap = prevRecoveryDurationGap
return
}
log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap))
handler.rd.JSON(w, http.StatusOK, "Config updated.")
_ = handler.rd.JSON(w, http.StatusOK, "Config updated.")
}

func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
_ = handler.rd.JSON(w, http.StatusOK, conf)
}

type evictSlowStoreScheduler struct {
Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *htt
}
recoveryDurationGapFloat, ok := input["recovery-duration"].(float64)
if !ok {
handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
return
}
handler.config.Lock()
Expand All @@ -255,17 +255,17 @@ func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *htt
recoveryDurationGap := uint64(recoveryDurationGapFloat)
handler.config.RecoveryDurationGap = recoveryDurationGap
if err := handler.config.persistLocked(); err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
handler.config.RecoveryDurationGap = prevRecoveryDurationGap
return
}
log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap))
handler.rd.JSON(w, http.StatusOK, "Config updated.")
_ = handler.rd.JSON(w, http.StatusOK, "Config updated.")
}

func (handler *evictSlowTrendHandler) ListConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
_ = handler.rd.JSON(w, http.StatusOK, conf)
}

type evictSlowTrendScheduler struct {
Expand Down
33 changes: 20 additions & 13 deletions pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -130,7 +131,9 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
func (conf *grantLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
conf.Lock()
defer conf.Unlock()
conf.cluster.PauseLeaderTransfer(id)
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
}

Expand Down Expand Up @@ -281,7 +284,7 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
handler.config.RUnlock()
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}
Expand All @@ -296,26 +299,30 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
args = append(args, handler.config.getRanges(id)...)
}

handler.config.BuildWithArgs(args)
err := handler.config.Persist()
err := handler.config.BuildWithArgs(args)
if err != nil {
_ = handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
err = handler.config.Persist()
if err != nil {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.")
_ = handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.")
}

func (handler *grantLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
_ = handler.rd.JSON(w, http.StatusOK, conf)
}

func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["store_id"]
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
_ = handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

Expand All @@ -326,26 +333,26 @@ func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
err = handler.config.Persist()
if err != nil {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if last {
if err := handler.config.removeSchedulerCb(GrantLeaderName); err != nil {
if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
handler.rd.JSON(w, http.StatusNotFound, err.Error())
_ = handler.rd.JSON(w, http.StatusNotFound, err.Error())
} else {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
_ = handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
}
return
}
resp = lastStoreDeleteInfo
}
handler.rd.JSON(w, http.StatusOK, resp)
_ = handler.rd.JSON(w, http.StatusOK, resp)
return
}

handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error())
_ = handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error())
}

func newGrantLeaderHandler(config *grantLeaderSchedulerConfig) http.Handler {
Expand Down
Loading

0 comments on commit 15e4568

Please sign in to comment.