Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Sep 18, 2023
1 parent 06fce54 commit bf547b5
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 129 deletions.
65 changes: 65 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
)

// Cluster provides an overview of a cluster's basic information.
type Cluster interface {
GetHotStat() *statistics.HotStat
GetRegionStats() *statistics.RegionStatistics
GetLabelStats() *statistics.LabelStatistics
GetCoordinator() *schedule.Coordinator
GetRuleManager() *placement.RuleManager
}

// HandleStatsAsync handles the flow asynchronously.
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

// HandleOverlaps handles the overlap regions.
func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
for _, item := range overlaps {
if c.GetRegionStats() != nil {
c.GetRegionStats().ClearDefunctRegion(item.GetID())
}
c.GetLabelStats().ClearDefunctRegion(item.GetID())
c.GetRuleManager().InvalidCache(item.GetID())
}
}

// Collect collects the cluster information.
func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
if hasRegionStats {
c.GetRegionStats().Observe(region, stores)
}
if !isPrepared && isNew {
c.GetCoordinator().GetPrepareChecker().Collect(region)
}
}
83 changes: 30 additions & 53 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -144,59 +143,31 @@ const (
InitClusterRegionThreshold = 100
)

// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
// Convert unit to MB.
// If region isn't empty and less than 1MB, use 1MB instead.
regionSize := heartbeat.GetApproximateSize() / units.MiB
// Due to https://github.com/tikv/tikv/pull/11170, if region size is not initialized,
// approximate size will be zero, and region size is zero not EmptyRegionApproximateSize
if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
regionSize = EmptyRegionApproximateSize
}
regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB

region := &RegionInfo{
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
cpuUsage: heartbeat.GetCpuUsage(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKvSize: int64(regionKvSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
replicationStatus: heartbeat.GetReplicationStatus(),
queryStats: heartbeat.GetQueryStats(),
}

for _, opt := range opts {
opt(region)
}

if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize {
region.writtenKeys = 0
region.writtenBytes = 0
}
if region.readKeys >= ImpossibleFlowSize || region.readBytes >= ImpossibleFlowSize {
region.readKeys = 0
region.readBytes = 0
}

sort.Sort(peerStatsSlice(region.downPeers))
sort.Sort(peerSlice(region.pendingPeers))

classifyVoterAndLearner(region)
return region
// RegionHeartbeatResponse is the interface for region heartbeat response.
type RegionHeartbeatResponse interface {
GetTargetPeer() *metapb.Peer
GetRegionId() uint64
}

// RegionHeartbeatRequest is the interface for region heartbeat request.
type RegionHeartbeatRequest interface {
GetTerm() uint64
GetRegion() *metapb.Region
GetLeader() *metapb.Peer
GetDownPeers() []*pdpb.PeerStats
GetPendingPeers() []*metapb.Peer
GetBytesWritten() uint64
GetKeysWritten() uint64
GetBytesRead() uint64
GetKeysRead() uint64
GetInterval() *pdpb.TimeInterval
GetQueryStats() *pdpb.QueryStats
GetApproximateSize() uint64
GetApproximateKeys() uint64
}

// RegionFromForward constructs a Region from forwarded region heartbeat.
func RegionFromForward(heartbeat *schedulingpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
// Convert unit to MB.
// If region isn't empty and less than 1MB, use 1MB instead.
regionSize := heartbeat.GetApproximateSize() / units.MiB
Expand All @@ -212,7 +183,6 @@ func RegionFromForward(heartbeat *schedulingpb.RegionHeartbeatRequest, opts ...R
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
cpuUsage: heartbeat.GetCpuUsage(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
Expand All @@ -223,6 +193,13 @@ func RegionFromForward(heartbeat *schedulingpb.RegionHeartbeatRequest, opts ...R
queryStats: heartbeat.GetQueryStats(),
}

// scheduling service doesn't need the following fields.
if h, ok := heartbeat.(*pdpb.RegionHeartbeatRequest); ok {
region.approximateKvSize = int64(h.GetApproximateKvSize() / units.MiB)
region.replicationStatus = h.GetReplicationStatus()
region.cpuUsage = h.GetCpuUsage()
}

for _, opt := range opts {
opt(region)
}
Expand Down
49 changes: 22 additions & 27 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cluster"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
Expand Down Expand Up @@ -39,7 +40,7 @@ type Cluster struct {
ruleManager *placement.RuleManager
labelerManager *labeler.RegionLabeler
regionStats *statistics.RegionStatistics
labelLevelStats *statistics.LabelStatistics
labelStats *statistics.LabelStatistics
hotStat *statistics.HotStat
storage storage.Storage
coordinator *schedule.Coordinator
Expand Down Expand Up @@ -67,7 +68,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
labelerManager: labelerManager,
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
labelLevelStats: statistics.NewLabelStatistics(),
labelStats: statistics.NewLabelStatistics(),
regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
storage: storage,
clusterID: clusterID,
Expand All @@ -87,6 +88,21 @@ func (c *Cluster) GetCoordinator() *schedule.Coordinator {
return c.coordinator
}

// GetHotStat gets hot stat.
func (c *Cluster) GetHotStat() *statistics.HotStat {
return c.hotStat
}

// GetRegionStats gets region statistics.
func (c *Cluster) GetRegionStats() *statistics.RegionStatistics {
return c.regionStats
}

// GetLabelStats gets label statistics.
func (c *Cluster) GetLabelStats() *statistics.LabelStatistics {
return c.labelStats

Check warning on line 103 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L103

Added line #L103 was not covered by tests
}

// GetBasicCluster returns the basic cluster.
func (c *Cluster) GetBasicCluster() *core.BasicCluster {
return c.BasicCluster
Expand Down Expand Up @@ -287,7 +303,7 @@ func (c *Cluster) waitSchedulersInitialized() {
// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
for _, region := range regions {
c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
}
}

Expand Down Expand Up @@ -410,15 +426,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
region.InheritBuckets(origin)

Check warning on line 426 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L426

Added line #L426 was not covered by tests
}

c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region)
cluster.HandleStatsAsync(c, region)

hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
Expand All @@ -444,23 +452,10 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
return err

Check warning on line 452 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L452

Added line #L452 was not covered by tests
}

for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
}
c.labelLevelStats.ClearDefunctRegion(item.GetID())
c.ruleManager.InvalidCache(item.GetID())
}
}

if hasRegionStats {
c.regionStats.Observe(region, c.GetRegionStores(region))
}

if !c.IsPrepared() && isNew {
c.coordinator.GetPrepareChecker().Collect(region)
cluster.HandleOverlaps(c, overlaps)
}

cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
return nil
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,7 +80,7 @@ type heartbeatServer struct {
closed int32
}

func (s *heartbeatServer) Send(m hbstream.RegionHeartbeatResponse) error {
func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error {
if atomic.LoadInt32(&s.closed) == 1 {
return io.EOF

Check warning on line 85 in pkg/mcs/scheduling/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/grpc_service.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}
Expand Down Expand Up @@ -162,7 +161,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
region := core.RegionFromForward(request, core.SetFromHeartbeat(true))
region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true))
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.
Expand Down
8 changes: 4 additions & 4 deletions pkg/mock/mockhbstream/mockhbstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ import (

// HeartbeatStream is used to mock HeartbeatStream for test use.
type HeartbeatStream struct {
ch chan hbstream.RegionHeartbeatResponse
ch chan core.RegionHeartbeatResponse
}

// NewHeartbeatStream creates a new HeartbeatStream.
func NewHeartbeatStream() HeartbeatStream {
return HeartbeatStream{
ch: make(chan hbstream.RegionHeartbeatResponse),
ch: make(chan core.RegionHeartbeatResponse),
}
}

// Send mocks method.
func (s HeartbeatStream) Send(m hbstream.RegionHeartbeatResponse) error {
func (s HeartbeatStream) Send(m core.RegionHeartbeatResponse) error {
select {
case <-time.After(time.Second):
return errors.New("timeout")
Expand All @@ -52,7 +52,7 @@ func (s HeartbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartb
func (s HeartbeatStream) BindStream(storeID uint64, stream hbstream.HeartbeatStream) {}

// Recv mocks method.
func (s HeartbeatStream) Recv() hbstream.RegionHeartbeatResponse {
func (s HeartbeatStream) Recv() core.RegionHeartbeatResponse {
select {
case <-time.After(time.Millisecond * 10):
return nil
Expand Down
16 changes: 5 additions & 11 deletions pkg/schedule/hbstream/heartbeat_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ import (
"go.uber.org/zap"
)

// RegionHeartbeatResponse is the interface for region heartbeat response.
type RegionHeartbeatResponse interface {
GetTargetPeer() *metapb.Peer
GetRegionId() uint64
}

// Operation is detailed scheduling step of a region.
type Operation struct {
ChangePeer *pdpb.ChangePeer
Expand All @@ -52,7 +46,7 @@ type Operation struct {

// HeartbeatStream is an interface.
type HeartbeatStream interface {
Send(RegionHeartbeatResponse) error
Send(core.RegionHeartbeatResponse) error
}

const (
Expand All @@ -72,7 +66,7 @@ type HeartbeatStreams struct {
hbStreamCancel context.CancelFunc
clusterID uint64
streams map[uint64]HeartbeatStream
msgCh chan RegionHeartbeatResponse
msgCh chan core.RegionHeartbeatResponse
streamCh chan streamUpdate
storeInformer core.StoreSetInformer
typ string
Expand All @@ -97,7 +91,7 @@ func newHbStreams(ctx context.Context, clusterID uint64, typ string, storeInform
hbStreamCancel: hbStreamCancel,
clusterID: clusterID,
streams: make(map[uint64]HeartbeatStream),
msgCh: make(chan RegionHeartbeatResponse, heartbeatChanCapacity),
msgCh: make(chan core.RegionHeartbeatResponse, heartbeatChanCapacity),
streamCh: make(chan streamUpdate, 1),
storeInformer: storeInformer,
typ: typ,
Expand All @@ -118,7 +112,7 @@ func (s *HeartbeatStreams) run() {
keepAliveTicker := time.NewTicker(heartbeatStreamKeepAliveInterval)
defer keepAliveTicker.Stop()

var keepAlive RegionHeartbeatResponse
var keepAlive core.RegionHeartbeatResponse
switch s.typ {
case utils.SchedulingServiceName:
keepAlive = &schedulingpb.RegionHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}
Expand Down Expand Up @@ -208,7 +202,7 @@ func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, op *Operation) {
}

// TODO: use generic
var resp RegionHeartbeatResponse
var resp core.RegionHeartbeatResponse
switch s.typ {
case utils.SchedulingServiceName:
resp = &schedulingpb.RegionHeartbeatResponse{
Expand Down
Loading

0 comments on commit bf547b5

Please sign in to comment.