Skip to content

Commit

Permalink
mcs: add service role (#7175)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Oct 9, 2023
1 parent 2556b5b commit 097f25a
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 188 deletions.
305 changes: 136 additions & 169 deletions metrics/grafana/pd.json

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import (

var _ bs.Server = (*Server)(nil)

// serviceName is the display name of the resource manager service. It is
// passed to versioninfo.Log at startup and used as the "service" label value
// of member.ServiceMemberGauge.
const serviceName = "Resource Manager"

// Server is the resource manager server, and it implements bs.Server.
type Server struct {
*server.BaseServer
Expand Down Expand Up @@ -168,6 +170,7 @@ func (s *Server) campaignLeader() {
defer resetLeaderOnce.Do(func() {
cancel()
s.participant.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(0)
})

// maintain the leadership; after this, Resource Manager could be ready to provide service.
Expand All @@ -180,6 +183,7 @@ func (s *Server) campaignLeader() {
}

s.participant.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("resource manager primary is ready to serve", zap.String("resource-manager-primary-name", s.participant.Name()))

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
Expand Down Expand Up @@ -382,8 +386,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
// Flushing any buffered log entries
defer log.Sync()

versioninfo.Log("Resource Manager")
log.Info("Resource Manager config", zap.Reflect("config", cfg))
versioninfo.Log(serviceName)
log.Info("resource manager config", zap.Reflect("config", cfg))

grpcprometheus.EnableHandlingTimeHistogram()
metricutil.Push(&cfg.Metric)
Expand Down
12 changes: 9 additions & 3 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ import (

var _ bs.Server = (*Server)(nil)

const memberUpdateInterval = time.Minute
const (
// serviceName is the display name of the scheduling service. It is passed
// to versioninfo.Log at startup and used as the "service" label value of
// member.ServiceMemberGauge.
serviceName = "Scheduling Service"

memberUpdateInterval = time.Minute
)

// Server is the scheduling server, and it implements bs.Server.
type Server struct {
Expand Down Expand Up @@ -255,6 +259,7 @@ func (s *Server) campaignLeader() {
defer resetLeaderOnce.Do(func() {
cancel()
s.participant.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(0)
})

// maintain the leadership; after this, Scheduling could be ready to provide service.
Expand All @@ -274,6 +279,7 @@ func (s *Server) campaignLeader() {
}
}()
s.participant.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
Expand Down Expand Up @@ -533,8 +539,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
// Flushing any buffered log entries
defer log.Sync()

versioninfo.Log("Scheduling")
log.Info("Scheduling config", zap.Reflect("config", cfg))
versioninfo.Log(serviceName)
log.Info("scheduling service config", zap.Reflect("config", cfg))

grpcprometheus.EnableHandlingTimeHistogram()
metricutil.Push(&cfg.Metric)
Expand Down
6 changes: 4 additions & 2 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ import (
var _ bs.Server = (*Server)(nil)
var _ tso.ElectionMember = (*member.Participant)(nil)

// serviceName is the display name of the TSO service, passed to
// versioninfo.Log at startup.
const serviceName = "TSO Service"

// Server is the TSO server, and it implements bs.Server.
type Server struct {
*server.BaseServer
Expand Down Expand Up @@ -450,8 +452,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
// Flushing any buffered log entries
defer log.Sync()

versioninfo.Log("TSO")
log.Info("TSO config", zap.Reflect("config", cfg))
versioninfo.Log(serviceName)
log.Info("TSO service config", zap.Reflect("config", cfg))

grpcprometheus.EnableHandlingTimeHistogram()
metricutil.Push(&cfg.Metric)
Expand Down
32 changes: 32 additions & 0 deletions pkg/member/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package member

import "github.com/prometheus/client_golang/prometheus"

// ServiceMemberGauge tracks which services this process currently serves as
// leader/primary. It is exported as the Prometheus gauge
// "service_member_role" with a single "service" label holding the service
// name. Election code sets the labelled value to 1 once leadership is enabled
// and back to 0 when leadership is reset.
var ServiceMemberGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "service",
		Subsystem: "member",
		Name:      "role",
		Help:      "The leader/primary of services",
	},
	[]string{"service"},
)

// init registers ServiceMemberGauge with the default Prometheus registerer so
// the metric is exposed as soon as this package is imported. MustRegister
// panics if the registration fails.
func init() {
	prometheus.DefaultRegisterer.MustRegister(ServiceMemberGauge)
}
18 changes: 9 additions & 9 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c *Coordinator) PatrolRegions() {
ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
defer ticker.Stop()

log.Info("Coordinator starts patrol regions")
log.Info("coordinator starts patrol regions")
start := time.Now()
var (
key []byte
Expand Down Expand Up @@ -252,7 +252,7 @@ func (c *Coordinator) checkPriorityRegions() {
func (c *Coordinator) checkSuspectRanges() {
defer logutil.LogPanic()
defer c.wg.Done()
log.Info("Coordinator begins to check suspect key ranges")
log.Info("coordinator begins to check suspect key ranges")
ticker := time.NewTicker(checkSuspectRangesInterval)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -316,7 +316,7 @@ func (c *Coordinator) drivePushOperator() {
defer logutil.LogPanic()

defer c.wg.Done()
log.Info("Coordinator begins to actively drive push operator")
log.Info("coordinator begins to actively drive push operator")
ticker := time.NewTicker(pushOperatorTickInterval)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -370,10 +370,10 @@ func (c *Coordinator) driveSlowNodeScheduler() {
func (c *Coordinator) RunUntilStop() {
c.Run()
<-c.ctx.Done()
log.Info("Coordinator is stopping")
log.Info("coordinator is stopping")
c.GetSchedulersController().Wait()
c.wg.Wait()
log.Info("Coordinator has been stopped")
log.Info("coordinator has been stopped")
}

// Run starts coordinator.
Expand All @@ -383,20 +383,20 @@ func (c *Coordinator) Run() {
ticker = time.NewTicker(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("Coordinator starts to collect cluster information")
log.Info("coordinator starts to collect cluster information")
for {
if c.ShouldRun() {
log.Info("Coordinator has finished cluster information preparation")
log.Info("coordinator has finished cluster information preparation")
break
}
select {
case <-ticker.C:
case <-c.ctx.Done():
log.Info("Coordinator stops running")
log.Info("coordinator stops running")
return
}
}
log.Info("Coordinator starts to run schedulers")
log.Info("coordinator starts to run schedulers")
c.InitSchedulers(true)

c.wg.Add(4)
Expand Down
4 changes: 4 additions & 0 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -619,10 +620,13 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
gta.am.ResetAllocatorGroup(GlobalDCLocation)
}()

tsoLabel := fmt.Sprintf("TSO Service Group %d", gta.getGroupID())
gta.member.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(tsoLabel).Set(1)
defer resetLeaderOnce.Do(func() {
cancel()
gta.member.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(tsoLabel).Set(0)
})

// TODO: if enable-local-tso is true, check the cluster dc-location after the primary is elected
Expand Down
4 changes: 2 additions & 2 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (u *Controller) CollectReport(heartbeat *pdpb.StoreHeartbeatRequest) (bool,
}

if heartbeat.StoreReport.GetStep() != u.step {
log.Info("Unsafe recovery receives invalid store report",
log.Info("unsafe recovery receives invalid store report",
zap.Uint64("store-id", storeID), zap.Uint64("expected-step", u.step), zap.Uint64("obtained-step", heartbeat.StoreReport.GetStep()))
// invalid store report, ignore
return false, nil
Expand Down Expand Up @@ -891,7 +891,7 @@ func (t *regionTree) insert(item *regionItem) (bool, error) {
}

for _, newer := range overlaps {
log.Info("Unsafe recovery found overlap regions", logutil.ZapRedactStringer("newer-region-meta", core.RegionToHexMeta(newer.Region())), logutil.ZapRedactStringer("older-region-meta", core.RegionToHexMeta(item.Region())))
log.Info("unsafe recovery found overlap regions", logutil.ZapRedactStringer("newer-region-meta", core.RegionToHexMeta(newer.Region())), logutil.ZapRedactStringer("older-region-meta", core.RegionToHexMeta(item.Region())))
// it's ensured by the `buildUpFromReports` that peers are inserted in epoch descending order.
if newer.IsEpochStale(item) {
return false, errors.Errorf("region %v's epoch shouldn't be staler than old ones %v", item, newer)
Expand Down
4 changes: 3 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ const (
// PDMode represents that server is in PD mode.
PDMode = "PD"
// APIServiceMode represents that server is in API service mode.
APIServiceMode = "API service"
APIServiceMode = "API Service"

// maxRetryTimesGetServicePrimary is the max retry times for getting primary addr.
// Note: it needs to be less than client.defaultPDTimeout
Expand Down Expand Up @@ -1721,6 +1721,7 @@ func (s *Server) campaignLeader() {
}
// EnableLeader to accept the remaining service, such as GetStore, GetRegion.
s.member.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(1)
if !s.IsAPIServiceMode() {
// Check the cluster dc-location after the PD leader is elected.
go s.tsoAllocatorManager.ClusterDCLocationChecker()
Expand All @@ -1730,6 +1731,7 @@ func (s *Server) campaignLeader() {
// to be new leader.
cancel()
s.member.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(0)
})

CheckPDVersion(s.persistOptions)
Expand Down

0 comments on commit 097f25a

Please sign in to comment.