Skip to content

Commit

Permalink
Merge branch 'master' into adjust-pending
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Jul 5, 2019
2 parents a6be4d6 + c7052f6 commit 309e874
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 32 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ build: pd-server pd-ctl pd-tso-bench pd-recover
pd-server: export GO111MODULE=on
pd-server:
ifeq ("$(WITH_RACE)", "1")
CGO_ENABLED=1 go build -race -ldflags '$(LDFLAGS)' -o bin/pd-server cmd/pd-server/main.go
CGO_ENABLED=1 go build -race -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o bin/pd-server cmd/pd-server/main.go
else
CGO_ENABLED=0 go build -ldflags '$(LDFLAGS)' -o bin/pd-server cmd/pd-server/main.go
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o bin/pd-server cmd/pd-server/main.go
endif

pd-ctl: export GO111MODULE=on
pd-ctl:
CGO_ENABLED=0 go build -ldflags '$(LDFLAGS)' -o bin/pd-ctl tools/pd-ctl/main.go
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o bin/pd-ctl tools/pd-ctl/main.go
pd-tso-bench: export GO111MODULE=on
pd-tso-bench:
CGO_ENABLED=0 go build -o bin/pd-tso-bench tools/pd-tso-bench/main.go
Expand Down
1 change: 1 addition & 0 deletions conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ leader-schedule-limit = 4
region-schedule-limit = 64
replica-schedule-limit = 64
merge-schedule-limit = 8
hot-region-schedule-limit = 4
#tolerant-size-ratio = 0.0
#enable-one-way-merge = false

Expand Down
6 changes: 3 additions & 3 deletions pkg/mock/mockoption/mockoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ const (
defaultSplitMergeInterval = 0
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 4
defaultReplicaScheduleLimit = 8
defaultRegionScheduleLimit = 64
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultHotRegionScheduleLimit = 4
defaultStoreBalanceRate = 60
defaultTolerantSizeRatio = 2.5
defaultLowSpaceRatio = 0.8
Expand Down
45 changes: 45 additions & 0 deletions server/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,23 @@
package checker

import (
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockhbstream"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
)

func TestChecker(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testMergeCheckerSuite{})

type testMergeCheckerSuite struct {
Expand Down Expand Up @@ -258,3 +264,42 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
},
})
}

func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
oc := schedule.NewOperatorController(s.cluster, mockhbstream.NewHeartbeatStream())
s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour
s.cluster.ScheduleOptions.StoreBalanceRate = 60
s.regions[2] = s.regions[2].Clone(
core.SetPeers([]*metapb.Peer{
{Id: 109, StoreId: 2},
{Id: 110, StoreId: 3},
{Id: 111, StoreId: 6},
}),
core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}),
)
s.cluster.PutRegion(s.regions[2])
ops := s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
// The size of Region is less or equal than 1MB.
for i := 0; i < 50; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
}
}
s.regions[2] = s.regions[2].Clone(
core.SetApproximateSize(2),
core.SetApproximateKeys(2),
)
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
// The size of Region is more than 1MB but no more than 20MB.
for i := 0; i < 5; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
}
}
c.Assert(oc.AddOperator(ops...), IsFalse)
}
2 changes: 1 addition & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func (c *RaftCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionI
// UpdateStoreLabels updates a store's location labels.
func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel) error {
c.RLock()
defer c.RUnlock()
store := c.cachedCluster.GetStore(storeID)
c.RUnlock()
if store == nil {
return errors.Errorf("invalid store ID %d, not found", storeID)
}
Expand Down
4 changes: 2 additions & 2 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,11 @@ const (
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 8
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 64
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultHotRegionScheduleLimit = 4
defaultStoreBalanceRate = 15
defaultTolerantSizeRatio = 0
defaultLowSpaceRatio = 0.8
Expand Down
23 changes: 19 additions & 4 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (
RegionOperatorWaitTime = 10 * time.Minute
// RegionInfluence represents the influence of a operator step, which is used by ratelimit.
RegionInfluence int64 = 1000
// smallRegionInfluence represents the influence of a operator step
// when the region size is smaller than smallRegionThreshold, which is used by ratelimit.
smallRegionInfluence int64 = 200
// smallRegionThreshold is used to represent a region which can be regarded as a small region once the size is small than it.
smallRegionThreshold int64 = 20
)

// OperatorStep describes the basic scheduling steps that can not be subdivided.
Expand Down Expand Up @@ -98,9 +103,14 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool {
func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(ap.ToStore)

to.RegionSize += region.GetApproximateSize()
regionSize := region.GetApproximateSize()
to.RegionSize += regionSize
to.RegionCount++
to.StepCost += RegionInfluence
if regionSize > smallRegionThreshold {
to.StepCost += RegionInfluence
} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
to.StepCost += smallRegionInfluence
}
}

// AddLearner is an OperatorStep that adds a region learner peer.
Expand Down Expand Up @@ -128,9 +138,14 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(al.ToStore)

to.RegionSize += region.GetApproximateSize()
regionSize := region.GetApproximateSize()
to.RegionSize += regionSize
to.RegionCount++
to.StepCost += RegionInfluence
if regionSize > smallRegionThreshold {
to.StepCost += RegionInfluence
} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
to.StepCost += smallRegionInfluence
}
}

// PromoteLearner is an OperatorStep that promotes a region learner peer to normal voter.
Expand Down
34 changes: 17 additions & 17 deletions server/schedule/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *testOperatorSuite) newTestRegion(regionID uint64, leaderPeer uint64, pe
leader = peer
}
}
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10))
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(50), core.SetApproximateKeys(50))
return regionInfo
}

Expand Down Expand Up @@ -124,71 +124,71 @@ func (s *testOperatorSuite) TestInfluence(c *C) {
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 0,
LeaderCount: 0,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: 0,
RegionCount: 0,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

RemovePeer{FromStore: 1}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: false}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: true}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -2,
RegionSize: -10,
RegionSize: -50,
RegionCount: -2,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 0,
StepCost: 1000,
})
Expand Down
4 changes: 2 additions & 2 deletions server/schedule/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewBalanceSelector(kind core.ResourceKind, filters []Filter) *BalanceSelect
}
}

// SelectSource selects the store that can pass all filters and has the minimal
// SelectSource selects the store that can pass all filters and has the maximal
// resource score.
func (s *BalanceSelector) SelectSource(opt Options, stores []*core.StoreInfo) *core.StoreInfo {
var result *core.StoreInfo
Expand All @@ -51,7 +51,7 @@ func (s *BalanceSelector) SelectSource(opt Options, stores []*core.StoreInfo) *c
return result
}

// SelectTarget selects the store that can pass all filters and has the maximal
// SelectTarget selects the store that can pass all filters and has the minimal
// resource score.
func (s *BalanceSelector) SelectTarget(opt Options, stores []*core.StoreInfo, filters ...Filter) *core.StoreInfo {
filters = append(filters, s.filters...)
Expand Down
17 changes: 17 additions & 0 deletions server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,29 @@ func (s *balanceRegionScheduler) hasPotentialTarget(cluster schedule.Cluster, re

for _, store := range cluster.GetStores() {
if schedule.FilterTarget(cluster, store, filters) {
log.Debug("skip target store by filters",
zap.String("scheduler", s.GetName()),
zap.Uint64("region", region.GetID()),
zap.Uint64("source", source.GetID()),
zap.Uint64("target", store.GetID()))
continue
}
if !store.IsUp() || store.DownTime() > cluster.GetMaxStoreDownTime() {
log.Debug("skip target store by status",
zap.String("scheduler", s.GetName()),
zap.Uint64("region", region.GetID()),
zap.Uint64("source", source.GetID()),
zap.Uint64("target", store.GetID()),
zap.Bool("isup", store.IsUp()),
zap.Duration("downtime", store.DownTime()))
continue
}
if !shouldBalance(cluster, source, store, region, core.RegionKind, opInfluence) {
log.Debug("skip target store for it should not balance",
zap.String("scheduler", s.GetName()),
zap.Uint64("region", region.GetID()),
zap.Uint64("source", source.GetID()),
zap.Uint64("target", store.GetID()))
continue
}
return true
Expand Down
Loading

0 comments on commit 309e874

Please sign in to comment.