Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: operator's timeout depends on steps #4548

Merged
merged 33 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8e8b375
operator add region size properties and close #4457
bufferflies Dec 14, 2021
98e6c9a
lint check
bufferflies Dec 14, 2021
ff4e389
adds timeoutfactor property to operator struct, adds max-region-size …
bufferflies Jan 5, 2022
de8f984
merge master
bufferflies Jan 5, 2022
59a0dff
pass ut
bufferflies Jan 5, 2022
b792225
replace max-region-size
bufferflies Jan 13, 2022
33933f9
resolved confict
bufferflies Jan 24, 2022
8a5a203
check all in step
bufferflies Jan 24, 2022
155a53d
remove unnecessary change
bufferflies Jan 24, 2022
0a9d9cc
remove unnecessary change
bufferflies Jan 24, 2022
b5413d6
resolved conflict
bufferflies Jan 24, 2022
62340cf
resolved conflict
bufferflies Jan 24, 2022
1c7bdc8
resolved conflict
bufferflies Jan 25, 2022
1a796f9
rename TimeOut to timeout
bufferflies Jan 25, 2022
bb427c8
add and pass unit test
bufferflies Jan 25, 2022
2a10af6
dead lock
bufferflies Jan 25, 2022
99488c0
remove sleep
bufferflies Feb 8, 2022
685b900
merge timeout
bufferflies Feb 10, 2022
bfe9788
remove config changes
bufferflies Feb 11, 2022
be1c56b
update deadlock
bufferflies Feb 11, 2022
8395b99
remove prepare lock
bufferflies Feb 11, 2022
22dfc3f
move executor rate to operator control
bufferflies Feb 15, 2022
e696959
operator remove executor rate
bufferflies Feb 15, 2022
76f35d2
Merge branch 'master' into feature/operator_factor
bufferflies Feb 16, 2022
e07edf6
address comment
bufferflies Feb 16, 2022
d2906f1
Merge branch 'feature/operator_factor' of github.com:bufferflies/pd i…
bufferflies Feb 16, 2022
1866b51
remove operator executor rate
bufferflies Feb 16, 2022
55ac82a
resolv conflict
bufferflies Feb 17, 2022
38cdf2e
change timeout signature
bufferflies Feb 18, 2022
ceee2b7
address comment
bufferflies Feb 18, 2022
e74e2f7
change comment
bufferflies Feb 23, 2022
d57e796
Merge branch 'master' into feature/operator_factor
bufferflies Feb 24, 2022
21cebb9
Merge branch 'master' into feature/operator_factor
ti-chi-bot Mar 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
)

func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind operator.OpKind, steps ...operator.OpStep) *operator.Operator {
return operator.NewOperator("test", "test", regionID, regionEpoch, kind, steps...)
return operator.NewTestOperator("test", "test", regionID, regionEpoch, kind, steps...)
}

func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
Expand Down
17 changes: 17 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,15 @@ type ScheduleConfig struct {

// The number of days for which hot regions data is reserved. 0 means disabled.
HotRegionsReservedDays int64 `toml:"hot-regions-reserved-days" json:"hot-regions-reserved-days"`

// OperatorTimeFactor is the time factor for operators.
// The max duration of one operator step is region_size * factor.
// Default: 6 s/MB
OperatorTimeFactor uint64 `toml:"operator-time-factor" json:"operator-time-factor"`
bufferflies marked this conversation as resolved.
Show resolved Hide resolved

// MaxRegionSize is the maximum size of a region.
// Default: 96 MB
MaxRegionSize uint64 `toml:"max-region-size" json:"max-region-size"`
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
}

// Clone returns a cloned scheduling configuration.
Expand Down Expand Up @@ -787,6 +796,8 @@ const (
defaultEnableCrossTableMerge = true
defaultHotRegionsWriteInterval = 10 * time.Minute
defaultHotRegionsReservedDays = 7
defaultOperatorTimeFactor = 6
defaultMaxRegionSize = 96
)

func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
Expand Down Expand Up @@ -841,6 +852,12 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
if !meta.IsDefined("enable-cross-table-merge") {
c.EnableCrossTableMerge = defaultEnableCrossTableMerge
}
if !meta.IsDefined("operator-time-factor") {
adjustUint64(&c.OperatorTimeFactor, defaultOperatorTimeFactor)
}
if !meta.IsDefined("max-region-size") {
adjustUint64(&c.MaxRegionSize, defaultMaxRegionSize)
}
adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio)
adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio)

Expand Down
12 changes: 12 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ const (
hotRegionScheduleLimitKey = "schedule.hot-region-schedule-limit"
schedulerMaxWaitingOperatorKey = "schedule.scheduler-max-waiting-operator"
enableLocationReplacement = "schedule.enable-location-replacement"
operatorTimeFactorKey = "schedule.operator-time-factor"
maxRegionSizeKey = "schedule.max-region-size"
)

var supportedTTLConfigs = []string{
Expand Down Expand Up @@ -245,6 +247,16 @@ func (o *PersistOptions) GetSplitMergeInterval() time.Duration {
return o.GetScheduleConfig().SplitMergeInterval.Duration
}

// GetOperatorTimeFactor returns the operator time factor.
func (o *PersistOptions) GetOperatorTimeFactor() uint64 {
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
return o.getTTLUintOr(operatorTimeFactorKey, o.GetScheduleConfig().OperatorTimeFactor)
}

// GetMaxRegionSize returns the max size of regions. It is the result of
// getTTLUintOr for maxRegionSizeKey, with the schedule config's MaxRegionSize
// passed as the fallback value (presumably used when no TTL override exists).
func (o *PersistOptions) GetMaxRegionSize() uint64 {
	fallback := o.GetScheduleConfig().MaxRegionSize
	return o.getTTLUintOr(maxRegionSizeKey, fallback)
}

// SetSplitMergeInterval sets the interval between finishing a split and starting a merge. It is only used in tests.
func (o *PersistOptions) SetSplitMergeInterval(splitMergeInterval time.Duration) {
v := o.GetScheduleConfig().Clone()
Expand Down
4 changes: 2 additions & 2 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,8 @@ func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys
splitKeys = append(splitKeys, k)
}
}

op, err := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys)
factor := h.opt.GetOperatorTimeFactor()
op, err := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys, factor)
if err != nil {
return err
}
Expand Down
11 changes: 9 additions & 2 deletions server/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
"github.com/tikv/pd/server/schedule/placement"
)

const maxTargetRegionSize = 500
const (
maxTargetRegionSize = 500
maxRegionFactor = 5
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
)

// When a region has label `merge_option=deny`, skip merging the region.
// If label value is `allow` or other value, it will be treated as `allow`.
Expand Down Expand Up @@ -146,7 +149,11 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
return nil
}

if target.GetApproximateSize() > maxTargetRegionSize {
maxSize := int64(maxRegionFactor * m.cluster.GetOpts().GetMaxRegionSize())
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
if maxSize < maxTargetRegionSize {
maxSize = maxTargetRegionSize
}
if target.GetApproximateSize() > maxSize {
checkerCounter.WithLabelValues("merge_checker", "target-too-large").Inc()
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/checker/split_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *SplitChecker) Check(region *core.RegionInfo) *operator.Operator {
return nil
}

op, err := operator.CreateSplitRegionOperator(desc, region, 0, pdpb.CheckPolicy_USEKEY, keys)
op, err := operator.CreateSplitRegionOperator(desc, region, 0, pdpb.CheckPolicy_USEKEY, keys, c.cluster.GetOpts().GetOperatorTimeFactor())
if err != nil {
log.Debug("create split region operator failed", errs.ZapError(err))
return nil
Expand Down
24 changes: 13 additions & 11 deletions server/schedule/operator/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ import (
// according to various constraints.
type Builder struct {
// basic info
desc string
cluster opt.Cluster
regionID uint64
regionEpoch *metapb.RegionEpoch
rules []*placement.Rule
expectedRoles map[uint64]placement.PeerRoleType
desc string
cluster opt.Cluster
regionID uint64
regionEpoch *metapb.RegionEpoch
rules []*placement.Rule
expectedRoles map[uint64]placement.PeerRoleType
approximateSize int64

// operation record
originPeers peersMap
Expand Down Expand Up @@ -84,10 +85,11 @@ func SkipOriginJointStateCheck(b *Builder) {
// NewBuilder creates a Builder.
func NewBuilder(desc string, cluster opt.Cluster, region *core.RegionInfo, opts ...BuilderOption) *Builder {
b := &Builder{
desc: desc,
cluster: cluster,
regionID: region.GetID(),
regionEpoch: region.GetRegionEpoch(),
desc: desc,
cluster: cluster,
regionID: region.GetID(),
regionEpoch: region.GetRegionEpoch(),
approximateSize: region.GetApproximateSize(),
}

// options
Expand Down Expand Up @@ -332,7 +334,7 @@ func (b *Builder) Build(kind OpKind) (*Operator, error) {
return nil, b.err
}

return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps...), nil
return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.approximateSize, b.cluster.GetOpts().GetOperatorTimeFactor(), b.steps...), nil
}

// Initialize intermediate states.
Expand Down
11 changes: 6 additions & 5 deletions server/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func CreateMoveLeaderOperator(desc string, cluster opt.Cluster, region *core.Reg
}

// CreateSplitRegionOperator creates an operator that splits a region.
func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind, policy pdpb.CheckPolicy, keys [][]byte) (*Operator, error) {
func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind, policy pdpb.CheckPolicy, keys [][]byte, factor uint64) (*Operator, error) {
if core.IsInJointState(region.GetPeers()...) {
return nil, errors.Errorf("cannot split region which is in joint state")
}
Expand All @@ -133,7 +133,7 @@ func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind
}
brief += fmt.Sprintf(" and keys %v", hexKeys)
}
return NewOperator(desc, brief, region.GetID(), region.GetRegionEpoch(), kind|OpSplit, step), nil
return NewOperator(desc, brief, region.GetID(), region.GetRegionEpoch(), kind|OpSplit, region.GetApproximateSize(), factor, step), nil
}

// CreateMergeRegionOperator creates an operator that merges two regions into one.
Expand Down Expand Up @@ -169,8 +169,9 @@ func CreateMergeRegionOperator(desc string, cluster opt.Cluster, source *core.Re
})

brief := fmt.Sprintf("merge: region %v to %v", source.GetID(), target.GetID())
op1 := NewOperator(desc, brief, source.GetID(), source.GetRegionEpoch(), kind|OpMerge, steps...)
op2 := NewOperator(desc, brief, target.GetID(), target.GetRegionEpoch(), kind|OpMerge, MergeRegion{
factor := cluster.GetOpts().GetOperatorTimeFactor()
op1 := NewOperator(desc, brief, source.GetID(), source.GetRegionEpoch(), kind|OpMerge, source.GetApproximateSize(), factor, steps...)
op2 := NewOperator(desc, brief, target.GetID(), target.GetRegionEpoch(), kind|OpMerge, target.GetApproximateSize(), factor, MergeRegion{
FromRegion: source.GetMeta(),
ToRegion: target.GetMeta(),
IsPassive: true,
Expand Down Expand Up @@ -274,5 +275,5 @@ func CreateLeaveJointStateOperator(desc string, cluster opt.Cluster, origin *cor
}

b.execChangePeerV2(false, true)
return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps...), nil
return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, origin.GetApproximateSize(), cluster.GetOpts().GetOperatorTimeFactor(), b.steps...), nil
}
2 changes: 1 addition & 1 deletion server/schedule/operator/create_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (s *testCreateOperatorSuite) TestCreateSplitRegionOperator(c *C) {
EndKey: tc.endKey,
Peers: tc.originPeers,
}, tc.originPeers[0])
op, err := CreateSplitRegionOperator("test", region, 0, tc.policy, tc.keys)
op, err := CreateSplitRegionOperator("test", region, 0, tc.policy, tc.keys, 6)
if tc.expectedError {
c.Assert(err, NotNil)
continue
Expand Down
21 changes: 19 additions & 2 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ type Operator struct {
Counters []prometheus.Counter
FinishedCounters []prometheus.Counter
AdditionalInfos map[string]string
ApproximateSize int64
TimeFactor uint64
}

// NewOperator creates a new operator.
func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, steps ...OpStep) *Operator {
func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, approximateSize int64, factor uint64, steps ...OpStep) *Operator {
level := core.NormalPriority
if kind&OpAdmin != 0 {
level = core.HighPriority
Expand All @@ -73,6 +75,8 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
status: NewOpStatusTracker(),
level: level,
AdditionalInfos: make(map[string]string),
ApproximateSize: approximateSize,
TimeFactor: factor,
}
}

Expand All @@ -81,7 +85,9 @@ func (o *Operator) String() string {
for i := range o.steps {
stepStrs[i] = o.steps[i].String()
}
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v,%v), createAt:%s, startAt:%s, currentStep:%v, steps:[%s])", o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(), o.GetStartTime(), atomic.LoadInt32(&o.currentStep), strings.Join(stepStrs, ", "))
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v,%v), createAt:%s, startAt:%s, currentStep:%v, size:%v,steps:[%s])",
o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(),
o.GetStartTime(), atomic.LoadInt32(&o.currentStep), o.ApproximateSize, strings.Join(stepStrs, ", "))
if o.CheckSuccess() {
s += " finished"
}
Expand Down Expand Up @@ -363,3 +369,14 @@ func (o *Operator) GetAdditionalInfo() string {
}
return ""
}

// Mock values for test operators: a default region size of 96 MiB (96 * 2^20) and a time factor of 6.
const (
mockRegionSize = 96 * (1 << 20)
mockFactor = 6
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
)

// NewTestOperator creates an operator for tests. It delegates to NewOperator,
// supplying mockRegionSize as the approximate size and mockFactor as the time
// factor; all other arguments are forwarded unchanged.
func NewTestOperator(desc, brief string, regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, steps ...OpStep) *Operator {
	op := NewOperator(
		desc, brief,
		regionID, regionEpoch,
		kind,
		mockRegionSize, mockFactor,
		steps...,
	)
	return op
}
2 changes: 1 addition & 1 deletion server/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *testOperatorSuite) TestOperatorStep(c *C) {

//nolint
func (s *testOperatorSuite) newTestOperator(regionID uint64, kind OpKind, steps ...OpStep) *Operator {
return NewOperator("test", "test", regionID, &metapb.RegionEpoch{}, kind, steps...)
return NewTestOperator("test", "test", regionID, &metapb.RegionEpoch{}, kind, steps...)
}

func (s *testOperatorSuite) checkSteps(c *C, op *Operator, steps []OpStep) {
Expand Down
Loading