mcs: forward region requests to scheduling server (tikv#7023)
ref tikv#5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Dec 1, 2023
1 parent f327342 commit 643c2c3
Showing 24 changed files with 673 additions and 206 deletions.
65 changes: 65 additions & 0 deletions pkg/cluster/cluster.go
@@ -0,0 +1,65 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
    "github.com/tikv/pd/pkg/core"
    "github.com/tikv/pd/pkg/schedule"
    "github.com/tikv/pd/pkg/schedule/placement"
    "github.com/tikv/pd/pkg/statistics"
)

// Cluster provides an overview of a cluster's basic information.
type Cluster interface {
    GetHotStat() *statistics.HotStat
    GetRegionStats() *statistics.RegionStatistics
    GetLabelStats() *statistics.LabelStatistics
    GetCoordinator() *schedule.Coordinator
    GetRuleManager() *placement.RuleManager
}

// HandleStatsAsync handles the flow statistics asynchronously.
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
    c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
    c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
    reportInterval := region.GetInterval()
    interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
    for _, peer := range region.GetPeers() {
        peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
        c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
    }
    c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

// HandleOverlaps handles the overlapped regions.
func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
    for _, item := range overlaps {
        if c.GetRegionStats() != nil {
            c.GetRegionStats().ClearDefunctRegion(item.GetID())
        }
        c.GetLabelStats().ClearDefunctRegion(item.GetID())
        c.GetRuleManager().InvalidCache(item.GetID())
    }
}

// Collect collects the cluster information.
func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
    if hasRegionStats {
        c.GetRegionStats().Observe(region, stores)
    }
    if !isPrepared && isNew {
        c.GetCoordinator().GetPrepareChecker().Collect(region)
    }
}
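Taken together, these helpers define the heartbeat bookkeeping contract that both the PD server and the new scheduling server can share. A minimal sketch of the intended call order (the handleHeartbeat wrapper below is hypothetical; only the Cluster interface and the three exported helpers are from this commit):

// handleHeartbeat sketches how a type satisfying cluster.Cluster drives the
// helpers when a region heartbeat arrives; overlaps, isNew, and isPrepared
// are assumed to come from the caller's own region-cache logic.
func handleHeartbeat(c cluster.Cluster, region *core.RegionInfo, stores []*core.StoreInfo, overlaps []*core.RegionInfo, isNew, isPrepared bool) {
    // Feed hot-region and flow statistics asynchronously.
    cluster.HandleStatsAsync(c, region)
    // Clear statistics and rule caches for regions replaced in the cache.
    cluster.HandleOverlaps(c, overlaps)
    // Record the region for stats and the prepare checker.
    cluster.Collect(c, region, stores, c.GetRegionStats() != nil, isNew, isPrepared)
}

This mirrors the order used by processRegionHeartbeat in pkg/mcs/scheduling/server/cluster.go later in this commit.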
62 changes: 44 additions & 18 deletions pkg/core/region.go
@@ -143,34 +143,60 @@ const (
    InitClusterRegionThreshold = 100
)

+// RegionHeartbeatResponse is the interface for region heartbeat response.
+type RegionHeartbeatResponse interface {
+    GetTargetPeer() *metapb.Peer
+    GetRegionId() uint64
+}
+
+// RegionHeartbeatRequest is the interface for region heartbeat request.
+type RegionHeartbeatRequest interface {
+    GetTerm() uint64
+    GetRegion() *metapb.Region
+    GetLeader() *metapb.Peer
+    GetDownPeers() []*pdpb.PeerStats
+    GetPendingPeers() []*metapb.Peer
+    GetBytesWritten() uint64
+    GetKeysWritten() uint64
+    GetBytesRead() uint64
+    GetKeysRead() uint64
+    GetInterval() *pdpb.TimeInterval
+    GetQueryStats() *pdpb.QueryStats
+    GetApproximateSize() uint64
+    GetApproximateKeys() uint64
+}

// RegionFromHeartbeat constructs a Region from region heartbeat.
-func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
+func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
    // Convert unit to MB.
    // If region isn't empty and less than 1MB, use 1MB instead.
    // The size of an empty region will be corrected by the previous RegionInfo.
    regionSize := heartbeat.GetApproximateSize() / units.MiB
    if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
        regionSize = EmptyRegionApproximateSize
    }
-   regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB

    region := &RegionInfo{
-       term:              heartbeat.GetTerm(),
-       meta:              heartbeat.GetRegion(),
-       leader:            heartbeat.GetLeader(),
-       downPeers:         heartbeat.GetDownPeers(),
-       pendingPeers:      heartbeat.GetPendingPeers(),
-       cpuUsage:          heartbeat.GetCpuUsage(),
-       writtenBytes:      heartbeat.GetBytesWritten(),
-       writtenKeys:       heartbeat.GetKeysWritten(),
-       readBytes:         heartbeat.GetBytesRead(),
-       readKeys:          heartbeat.GetKeysRead(),
-       approximateSize:   int64(regionSize),
-       approximateKvSize: int64(regionKvSize),
-       approximateKeys:   int64(heartbeat.GetApproximateKeys()),
-       interval:          heartbeat.GetInterval(),
-       replicationStatus: heartbeat.GetReplicationStatus(),
-       queryStats:        heartbeat.GetQueryStats(),
+       term:            heartbeat.GetTerm(),
+       meta:            heartbeat.GetRegion(),
+       leader:          heartbeat.GetLeader(),
+       downPeers:       heartbeat.GetDownPeers(),
+       pendingPeers:    heartbeat.GetPendingPeers(),
+       writtenBytes:    heartbeat.GetBytesWritten(),
+       writtenKeys:     heartbeat.GetKeysWritten(),
+       readBytes:       heartbeat.GetBytesRead(),
+       readKeys:        heartbeat.GetKeysRead(),
+       approximateSize: int64(regionSize),
+       approximateKeys: int64(heartbeat.GetApproximateKeys()),
+       interval:        heartbeat.GetInterval(),
+       queryStats:      heartbeat.GetQueryStats(),
    }

+   // The scheduling service doesn't need the following fields.
+   if h, ok := heartbeat.(*pdpb.RegionHeartbeatRequest); ok {
+       region.approximateKvSize = int64(h.GetApproximateKvSize() / units.MiB)
+       region.replicationStatus = h.GetReplicationStatus()
+       region.cpuUsage = h.GetCpuUsage()
+   }

    for _, opt := range opts {
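The effect of this refactor is that core.RegionFromHeartbeat no longer depends on the concrete PD proto: any request type with these getters works, and the type assertion restores the PD-only fields. A hedged sketch of the two call paths (it assumes schedulingpb.RegionHeartbeatRequest implements the same getters, which is the premise of this change; buildRegions itself is illustrative):

// buildRegions shows both sources flowing through one constructor. The PD
// request keeps KV size, replication status, and CPU usage via the type
// assertion; the scheduling request simply skips those fields.
func buildRegions(fromPD *pdpb.RegionHeartbeatRequest, fromScheduling *schedulingpb.RegionHeartbeatRequest) (*core.RegionInfo, *core.RegionInfo) {
    return core.RegionFromHeartbeat(fromPD), core.RegionFromHeartbeat(fromScheduling)
}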
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace.go
@@ -358,7 +358,7 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (err error) {

    start := time.Now()
    skipRaw := manager.config.GetDisableRawKVRegionSplit()
-   keyspaceRule := MakeLabelRule(id, skipRaw)
+   keyspaceRule := makeLabelRule(id, skipRaw)
    cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler })
    if !ok {
        return errors.New("cluster does not support region label")
4 changes: 2 additions & 2 deletions pkg/keyspace/util.go
@@ -203,8 +203,8 @@ func getRegionLabelID(id uint32) string {
    return regionLabelIDPrefix + strconv.FormatUint(uint64(id), endpoint.SpaceIDBase)
}

-// MakeLabelRule makes the label rule for the given keyspace id.
-func MakeLabelRule(id uint32, skipRaw bool) *labeler.LabelRule {
+// makeLabelRule makes the label rule for the given keyspace id.
+func makeLabelRule(id uint32, skipRaw bool) *labeler.LabelRule {
    return &labeler.LabelRule{
        ID:    getRegionLabelID(id),
        Index: 0,
2 changes: 1 addition & 1 deletion pkg/keyspace/util_test.go
@@ -166,6 +166,6 @@ func TestMakeLabelRule(t *testing.T) {
        },
    }
    for _, testCase := range testCases {
-       re.Equal(testCase.expectedLabelRule, MakeLabelRule(testCase.id, testCase.skipRaw))
+       re.Equal(testCase.expectedLabelRule, makeLabelRule(testCase.id, testCase.skipRaw))
    }
}
78 changes: 75 additions & 3 deletions pkg/mcs/scheduling/server/cluster.go
@@ -10,13 +10,15 @@ import (
    "github.com/pingcap/kvproto/pkg/pdpb"
    "github.com/pingcap/kvproto/pkg/schedulingpb"
    "github.com/pingcap/log"
+   "github.com/tikv/pd/pkg/cluster"
    "github.com/tikv/pd/pkg/core"
    "github.com/tikv/pd/pkg/errs"
    "github.com/tikv/pd/pkg/mcs/scheduling/server/config"
    "github.com/tikv/pd/pkg/schedule"
    sc "github.com/tikv/pd/pkg/schedule/config"
    "github.com/tikv/pd/pkg/schedule/hbstream"
    "github.com/tikv/pd/pkg/schedule/labeler"
+   "github.com/tikv/pd/pkg/schedule/operator"
    "github.com/tikv/pd/pkg/schedule/placement"
    "github.com/tikv/pd/pkg/schedule/schedulers"
    "github.com/tikv/pd/pkg/slice"
@@ -38,7 +40,7 @@ type Cluster struct {
    ruleManager     *placement.RuleManager
    labelerManager  *labeler.RegionLabeler
    regionStats     *statistics.RegionStatistics
-   labelLevelStats *statistics.LabelStatistics
+   labelStats      *statistics.LabelStatistics
    hotStat         *statistics.HotStat
    storage         storage.Storage
    coordinator     *schedule.Coordinator
@@ -66,8 +68,8 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
        labelerManager:    labelerManager,
        persistConfig:     persistConfig,
        hotStat:           statistics.NewHotStat(ctx),
+       labelStats:        statistics.NewLabelStatistics(),
        regionStats:       statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
-       labelLevelStats:   statistics.NewLabelStatistics(),
        storage:           storage,
        clusterID:         clusterID,
        checkMembershipCh: checkMembershipCh,
@@ -86,6 +88,21 @@ func (c *Cluster) GetCoordinator() *schedule.Coordinator {
    return c.coordinator
}

// GetHotStat gets hot stat.
func (c *Cluster) GetHotStat() *statistics.HotStat {
    return c.hotStat
}

// GetRegionStats gets region statistics.
func (c *Cluster) GetRegionStats() *statistics.RegionStatistics {
    return c.regionStats
}

// GetLabelStats gets label statistics.
func (c *Cluster) GetLabelStats() *statistics.LabelStatistics {
    return c.labelStats
}

// GetBasicCluster returns the basic cluster.
func (c *Cluster) GetBasicCluster() *core.BasicCluster {
    return c.BasicCluster
@@ -287,7 +304,7 @@ func (c *Cluster) waitSchedulersInitialized() {
// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
    for _, region := range regions {
-       c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
+       c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
    }
}

@@ -389,3 +406,58 @@ func (c *Cluster) StopBackgroundJobs() {
    c.cancel()
    c.wg.Wait()
}

// HandleRegionHeartbeat processes RegionInfo reports from the client.
func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
    if err := c.processRegionHeartbeat(region); err != nil {
        return err
    }

    c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
    return nil
}

// processRegionHeartbeat updates the region information.
func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
    origin, _, err := c.PreCheckPutRegion(region)
    if err != nil {
        return err
    }

    cluster.HandleStatsAsync(c, region)

    hasRegionStats := c.regionStats != nil
    // Save to storage if meta is updated, except for flashback.
    // Save to cache if meta or leader is updated, or if the region contains any down/pending peer.
    // Mark isNew if the region in cache does not have a leader.
    isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
    if !saveCache && !isNew {
        // Some config changes also require the region stats to be updated,
        // so we do some extra checks here.
        if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
            c.regionStats.Observe(region, c.GetRegionStores(region))
        }
        return nil
    }

    var overlaps []*core.RegionInfo
    if saveCache {
        // To prevent a concurrent heartbeat of another region from overriding the up-to-date
        // region info with a stale one, check its validity again here.
        //
        // However, it can't solve the race condition of concurrent heartbeats from the same region.
        if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
            return err
        }

        cluster.HandleOverlaps(c, overlaps)
    }

    cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
    return nil
}

// IsPrepared returns true if the prepare checker is ready.
func (c *Cluster) IsPrepared() bool {
    return c.coordinator.GetPrepareChecker().IsPrepared()
}
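HandleRegionHeartbeat above is where the forwarded region requests land. The commit's actual gRPC entry point lives in a file not rendered on this page, so the receive loop below is only a sketch under stated assumptions (recv stands in for a real stream's Recv method; the loop and logging are invented for illustration):

// runHeartbeatLoop drains a scheduling-server heartbeat stream into the
// cluster. It assumes schedulingpb.RegionHeartbeatRequest satisfies
// core.RegionHeartbeatRequest, as introduced in pkg/core/region.go above.
func runHeartbeatLoop(c *Cluster, recv func() (*schedulingpb.RegionHeartbeatRequest, error)) error {
    for {
        req, err := recv()
        if err != nil {
            return err
        }
        region := core.RegionFromHeartbeat(req)
        if err := c.HandleRegionHeartbeat(region); err != nil {
            log.Error("process region heartbeat failed", errs.ZapError(err))
        }
    }
}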
1 change: 0 additions & 1 deletion pkg/mcs/scheduling/server/config/config.go
@@ -110,7 +110,6 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
    configutil.AdjustCommandLineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
    configutil.AdjustCommandLineString(flagSet, &c.ListenAddr, "listen-addr")
    configutil.AdjustCommandLineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr")
-
    return c.adjust(meta)
}

