Skip to content

Commit

Permalink
improve flow update
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Feb 1, 2024
1 parent cd0ffba commit 0e4e8fb
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 49 deletions.
11 changes: 11 additions & 0 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ func (h *Handler) RemoveOperator(regionID uint64) error {
return nil
}

// RemoveOperators removes all operators through the operator controller,
// passing operator.AdminStop as the cancel reason.
//
// It returns the error reported by GetOperatorController when the controller
// cannot be obtained, and nil otherwise.
func (h *Handler) RemoveOperators() error {
	controller, err := h.GetOperatorController()
	if err == nil {
		controller.RemoveOperators(operator.AdminStop)
	}
	return err
}

// GetOperators returns the running operators.
func (h *Handler) GetOperators() ([]*operator.Operator, error) {
c, err := h.GetOperatorController()
Expand Down
35 changes: 35 additions & 0 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,41 @@ func (oc *Controller) ack(op *Operator) {
}
}

// RemoveOperators removes all operators from the running operators.
//
// Only the first element of reasons is used as the cancel reason; when no
// reason is given the zero value of CancelReasonType is passed to Cancel.
// Every removed operator is handed to buryOperator, and an info log is
// written for each operator whose Cancel call reports true.
func (oc *Controller) RemoveOperators(reasons ...CancelReasonType) {
	// The lock is held only while the operators are detached from the
	// controller; cancelling and burying happen after it is released.
	oc.Lock()
	detached := oc.removeOperatorsLocked()
	oc.Unlock()

	var reason CancelReasonType
	if len(reasons) != 0 {
		reason = reasons[0]
	}

	for i := range detached {
		current := detached[i]
		if canceled := current.Cancel(reason); canceled {
			log.Info("operator removed",
				zap.Uint64("region-id", current.RegionID()),
				zap.Duration("takes", current.RunningTime()),
				zap.Reflect("operator", current))
		}
		oc.buryOperator(current)
	}
}

// removeOperatorsLocked deletes every entry of oc.operators and returns the
// deleted operators. As the name suggests, the caller is expected to hold the
// controller lock.
//
// For each operator it bumps the "remove" counter, calls ack, and, for
// operators whose kind includes OpMerge, calls removeRelatedMergeOperator.
// The counts are refreshed once after the loop.
func (oc *Controller) removeOperatorsLocked() []*Operator {
	var detached []*Operator
	for id, current := range oc.operators {
		delete(oc.operators, id)
		operatorCounter.WithLabelValues(current.Desc(), "remove").Inc()
		oc.ack(current)
		if isMerge := current.Kind()&OpMerge != 0; isMerge {
			oc.removeRelatedMergeOperator(current)
		}
		detached = append(detached, current)
	}
	oc.updateCounts(oc.operators)
	return detached
}

// RemoveOperator removes an operator from the running operators.
func (oc *Controller) RemoveOperator(op *Operator, reasons ...CancelReasonType) bool {
oc.Lock()
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/rangelist/range_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ func (l List) GetSplitKeys(start, end []byte) [][]byte {
}
return keys
}
-
15 changes: 15 additions & 0 deletions server/api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ func (h *operatorHandler) GetOperators(w http.ResponseWriter, r *http.Request) {
}
}

// DeleteOperators cancels all operators by calling RemoveOperators and
// reports the outcome as JSON: 500 with the error text on failure, 200 with a
// confirmation message on success.
//
// The method is named DeleteOperators because the router registers
// operatorHandler.DeleteOperators for DELETE /operators, while
// DeleteOperatorByRegion is bound to DELETE /operators/{region_id}.
// @Tags operator
// @Summary Cancel all pending operators.
// @Produce json
// @Success 200 {string} string "All pending operators are canceled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [delete]
func (h *operatorHandler) DeleteOperators(w http.ResponseWriter, r *http.Request) {
	if err := h.RemoveOperators(); err != nil {
		h.r.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}

	h.r.JSON(w, http.StatusOK, "All pending operators are canceled.")
}

// FIXME: details of input json body params
// @Tags operator
// @Summary Create an operator.
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
operatorHandler := newOperatorHandler(handler, rd)
registerFunc(apiRouter, "/operators", operatorHandler.GetOperators, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/operators", operatorHandler.CreateOperator, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/operators", operatorHandler.DeleteOperators, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/operators/records", operatorHandler.GetOperatorRecords, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/operators/{region_id}", operatorHandler.GetOperatorsByRegion, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/operators/{region_id}", operatorHandler.DeleteOperatorByRegion, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
Expand Down
1 change: 0 additions & 1 deletion tools/pd-heartbeat-bench/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ round = 0
store-count = 100
region-count = 2000000

key-length = 56
replica = 3

leader-update-ratio = 0.06
Expand Down
28 changes: 19 additions & 9 deletions tools/pd-heartbeat-bench/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
const (
defaultStoreCount = 50
defaultRegionCount = 1000000
defaultKeyLength = 56
defaultHotStoreCount = 0
defaultReplica = 3
defaultLeaderUpdateRatio = 0.06
defaultEpochUpdateRatio = 0.04
defaultSpaceUpdateRatio = 0.15
defaultFlowUpdateRatio = 0.35
defaultEpochUpdateRatio = 0.0
defaultSpaceUpdateRatio = 0.0
defaultFlowUpdateRatio = 0.0
defaultReportRatio = 1
defaultRound = 0
defaultSample = false
Expand All @@ -41,8 +41,8 @@ type Config struct {
Security configutil.SecurityConfig `toml:"security" json:"security"`

StoreCount int `toml:"store-count" json:"store-count"`
HotStoreCount int `toml:"hot-store-count" json:"hot-store-count"`
RegionCount int `toml:"region-count" json:"region-count"`
KeyLength int `toml:"key-length" json:"key-length"`
Replica int `toml:"replica" json:"replica"`
LeaderUpdateRatio float64 `toml:"leader-update-ratio" json:"leader-update-ratio"`
EpochUpdateRatio float64 `toml:"epoch-update-ratio" json:"epoch-update-ratio"`
Expand Down Expand Up @@ -117,10 +117,9 @@ func (c *Config) Adjust(meta *toml.MetaData) {
configutil.AdjustInt(&c.RegionCount, defaultRegionCount)
}

if !meta.IsDefined("key-length") {
configutil.AdjustInt(&c.KeyLength, defaultKeyLength)
if !meta.IsDefined("hot-store-count") {
configutil.AdjustInt(&c.HotStoreCount, defaultHotStoreCount)
}

if !meta.IsDefined("replica") {
configutil.AdjustInt(&c.Replica, defaultReplica)
}
Expand All @@ -147,6 +146,9 @@ func (c *Config) Adjust(meta *toml.MetaData) {

// Validate is used to validate configurations
func (c *Config) Validate() error {
if c.HotStoreCount < 0 || c.HotStoreCount > c.StoreCount {
return errors.Errorf("hot-store-count must be in [0, store-count]")
}
if c.ReportRatio < 0 || c.ReportRatio > 1 {
return errors.Errorf("report-ratio must be in [0, 1]")
}
Expand Down Expand Up @@ -174,7 +176,8 @@ func (c *Config) Clone() *Config {

// Options is the option of the heartbeat-bench.
type Options struct {
ReportRatio atomic.Value
HotStoreCount atomic.Value
ReportRatio atomic.Value

LeaderUpdateRatio atomic.Value
EpochUpdateRatio atomic.Value
Expand All @@ -185,6 +188,7 @@ type Options struct {
// NewOptions creates a new option.
func NewOptions(cfg *Config) *Options {
o := &Options{}
o.HotStoreCount.Store(cfg.HotStoreCount)
o.LeaderUpdateRatio.Store(cfg.LeaderUpdateRatio)
o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio)
o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio)
Expand All @@ -193,6 +197,11 @@ func NewOptions(cfg *Config) *Options {
return o
}

// GetHotStoreCount returns the hot store count currently held in the options.
// The loaded value is type-asserted to int, so this panics if the field holds
// no value or a value of another type.
func (o *Options) GetHotStoreCount() int {
	stored := o.HotStoreCount.Load()
	return stored.(int)
}

// GetLeaderUpdateRatio returns the leader update ratio.
func (o *Options) GetLeaderUpdateRatio() float64 {
return o.LeaderUpdateRatio.Load().(float64)
Expand Down Expand Up @@ -220,6 +229,7 @@ func (o *Options) GetReportRatio() float64 {

// SetOptions sets the option.
func (o *Options) SetOptions(cfg *Config) {
o.HotStoreCount.Store(cfg.HotStoreCount)
o.LeaderUpdateRatio.Store(cfg.LeaderUpdateRatio)
o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio)
o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio)
Expand Down
84 changes: 45 additions & 39 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/tools/pd-heartbeat-bench/config"
Expand All @@ -46,9 +47,12 @@ import (
)

const (
bytesUnit = 8 * units.MiB
keysUint = 8 * units.KiB
queryUnit = 1 * units.KiB
bytesUnit = 128
keysUint = 8
queryUnit = 8
hotByteUnit = 16 * units.KiB
hotKeysUint = 256
hotQueryUnit = 256
regionReportInterval = 60 // 60s
storeReportInterval = 10 // 10s
capacity = 4 * units.TiB
Expand Down Expand Up @@ -168,18 +172,6 @@ func putStores(ctx context.Context, cfg *config.Config, cli pdpb.PDClient, store
}
}

// newStartKey builds a key of exactly keyLen bytes for the given id.
//
// The id is rendered as a zero-padded decimal string of at least ten digits
// and copied to the front of the key. Any remaining bytes stay zero, and the
// digits are truncated when keyLen is shorter than the rendered id.
func newStartKey(id uint64, keyLen int) []byte {
	digits := fmt.Sprintf("%010d", id)
	key := make([]byte, keyLen)
	copy(key, digits)
	return key
}

// newEndKey returns the key produced by newStartKey for the same id and
// length, with its final byte incremented by one.
//
// It indexes the last byte unconditionally, so keyLen must be at least 1.
func newEndKey(id uint64, keyLen int) []byte {
	end := newStartKey(id, keyLen)
	last := len(end) - 1
	end[last]++
	return end
}

// Regions simulates all regions to heartbeat.
type Regions struct {
regions []*pdpb.RegionHeartbeatRequest
Expand All @@ -193,22 +185,21 @@ type Regions struct {
updateFlow []int
}

func (rs *Regions) init(cfg *config.Config, options *config.Options) []int {
func (rs *Regions) init(cfg *config.Config, options *config.Options) {
rs.regions = make([]*pdpb.RegionHeartbeatRequest, 0, cfg.RegionCount)
rs.updateRound = 0

// Generate regions
id := uint64(1)
now := uint64(time.Now().Unix())

keyLen := cfg.KeyLength
for i := 0; i < cfg.RegionCount; i++ {
region := &pdpb.RegionHeartbeatRequest{
Header: header(),
Region: &metapb.Region{
Id: id,
StartKey: newStartKey(id, keyLen),
EndKey: newEndKey(id, keyLen),
StartKey: codec.GenerateTableKey(int64(i)),
EndKey: codec.GenerateTableKey(int64(i + 1)),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1},
},
ApproximateSize: bytesUnit,
Expand Down Expand Up @@ -238,25 +229,23 @@ func (rs *Regions) init(cfg *config.Config, options *config.Options) []int {
region.Leader = peers[0]
rs.regions = append(rs.regions, region)
}
}

func (rs *Regions) update(cfg *config.Config, options *config.Options) {
rs.updateRound += 1

// Generate sample index
indexes := make([]int, cfg.RegionCount)
for i := range indexes {
indexes[i] = i
}

return indexes
}

func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes []int) {
rs.updateRound += 1

reportRegions := pick(indexes, cfg.RegionCount, options.GetReportRatio())

reportCount := len(reportRegions)
rs.updateLeader = pick(reportRegions, reportCount, options.GetLeaderUpdateRatio())
rs.updateEpoch = pick(reportRegions, reportCount, options.GetEpochUpdateRatio())
rs.updateSpace = pick(reportRegions, reportCount, options.GetSpaceUpdateRatio())
rs.updateFlow = pick(reportRegions, reportCount, options.GetFlowUpdateRatio())
rs.updateLeader = randomPick(reportRegions, reportCount, options.GetLeaderUpdateRatio())
rs.updateEpoch = randomPick(reportRegions, reportCount, options.GetEpochUpdateRatio())
rs.updateSpace = randomPick(reportRegions, reportCount, options.GetSpaceUpdateRatio())
var (
updatedStatisticsMap = make(map[int]*pdpb.RegionHeartbeatRequest)
awakenRegions []*pdpb.RegionHeartbeatRequest
Expand All @@ -281,13 +270,24 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [
// update flow
for _, i := range rs.updateFlow {
region := rs.regions[i]
region.BytesWritten = uint64(bytesUnit * rand.Float64())
region.BytesRead = uint64(bytesUnit * rand.Float64())
region.KeysWritten = uint64(keysUint * rand.Float64())
region.KeysRead = uint64(keysUint * rand.Float64())
region.QueryStats = &pdpb.QueryStats{
Get: uint64(queryUnit * rand.Float64()),
Put: uint64(queryUnit * rand.Float64()),
if region.Leader.StoreId <= uint64(options.GetHotStoreCount()) {
region.BytesWritten = uint64(hotByteUnit * (1 + rand.Float64()))
region.BytesRead = uint64(hotByteUnit * (1 + rand.Float64()))
region.KeysWritten = uint64(hotKeysUint * (1 + rand.Float64()))
region.KeysRead = uint64(hotKeysUint * (1 + rand.Float64()))
region.QueryStats = &pdpb.QueryStats{
Get: uint64(hotQueryUnit * (1 + rand.Float64())),
Put: uint64(hotQueryUnit * (1 + rand.Float64())),
}
} else {
region.BytesWritten = uint64(bytesUnit * rand.Float64())
region.BytesRead = uint64(bytesUnit * rand.Float64())
region.KeysWritten = uint64(keysUint * rand.Float64())
region.KeysRead = uint64(keysUint * rand.Float64())
region.QueryStats = &pdpb.QueryStats{
Get: uint64(queryUnit * rand.Float64()),
Put: uint64(queryUnit * rand.Float64()),
}
}
updatedStatisticsMap[i] = region
}
Expand Down Expand Up @@ -438,13 +438,17 @@ func (s *Stores) update(rs *Regions) {
}
}

func pick(slice []int, total int, ratio float64) []int {
// randomPick shuffles the first total elements of slice in place and returns
// a copy of the leading int(total*ratio) elements after the shuffle.
//
// The returned slice does not share storage with slice. The shuffle uses the
// package-level math/rand source.
func randomPick(slice []int, total int, ratio float64) []int {
	swap := func(a, b int) {
		slice[a], slice[b] = slice[b], slice[a]
	}
	rand.Shuffle(total, swap)

	count := int(float64(total) * ratio)
	head := slice[:count]
	// Appending to a zero-capacity slice forces a fresh backing array.
	return append(head[:0:0], head...)
}

// pick returns a copy of the leading int(total*ratio) elements of slice,
// keeping their order. The input is left untouched and the result does not
// share storage with it.
func pick(slice []int, total int, ratio float64) []int {
	count := int(float64(total) * ratio)
	head := slice[:count]
	// Appending to a zero-capacity slice forces a fresh backing array.
	return append(head[:0:0], head...)
}

func main() {
rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times
cfg := config.NewConfig()
Expand Down Expand Up @@ -490,7 +494,7 @@ func main() {
initClusterID(ctx, cli)
go runHTTPServer(cfg, options)
regions := new(Regions)
indexes := regions.init(cfg, options)
regions.init(cfg, options)
log.Info("finish init regions")
stores := newStores(cfg.StoreCount)
stores.update(regions)
Expand Down Expand Up @@ -533,7 +537,7 @@ func main() {
zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
)
log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since)))
regions.update(cfg, options, indexes)
regions.update(cfg, options)
go stores.update(regions) // update stores in background, unusually region heartbeat is slower than store update.
case <-ctx.Done():
log.Info("got signal to exit")
Expand Down Expand Up @@ -599,6 +603,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
pprof.Register(engine)
engine.PUT("config", func(c *gin.Context) {
newCfg := cfg.Clone()
newCfg.HotStoreCount = options.GetHotStoreCount()
newCfg.FlowUpdateRatio = options.GetFlowUpdateRatio()
newCfg.LeaderUpdateRatio = options.GetLeaderUpdateRatio()
newCfg.EpochUpdateRatio = options.GetEpochUpdateRatio()
Expand All @@ -617,6 +622,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
})
engine.GET("config", func(c *gin.Context) {
output := cfg.Clone()
output.HotStoreCount = options.GetHotStoreCount()
output.FlowUpdateRatio = options.GetFlowUpdateRatio()
output.LeaderUpdateRatio = options.GetLeaderUpdateRatio()
output.EpochUpdateRatio = options.GetEpochUpdateRatio()
Expand Down

0 comments on commit 0e4e8fb

Please sign in to comment.