From a78e6325fe2c93e93a2eb83f182b96b5c3261225 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 30 Mar 2022 11:20:23 +0800 Subject: [PATCH] grafana add store information Signed-off-by: bufferflies <1045931706@qq.com> --- server/cluster/cluster.go | 5 +++++ server/cluster/cluster_test.go | 1 - server/core/basic_cluster.go | 11 +++++++++++ server/core/region.go | 3 ++- server/grpc_service.go | 19 +++++++++++++------ server/metrics.go | 8 ++++---- 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 0906ff9e302..c437e8b9aab 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -894,6 +894,11 @@ func (c *RaftCluster) GetStores() []*core.StoreInfo { return c.core.GetStores() } +// GetLeaderStoreByRegionID returns the leader store of the given region. +func (c *RaftCluster) GetLeaderStoreByRegionID(regionID uint64) *core.StoreInfo { + return c.core.GetLeaderStoreByRegionID(regionID) +} + // GetStore gets store from cluster. 
func (c *RaftCluster) GetStore(storeID uint64) *core.StoreInfo { return c.core.GetStore(storeID) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 6a49a743582..59de3e0c23c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -471,7 +471,6 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) - cluster.coordinator = newCoordinator(s.ctx, cluster, nil) n, np := uint64(3), uint64(3) diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index fe5eeec8420..9af92795cc8 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -116,6 +116,17 @@ func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo { return Stores } +// GetLeaderStoreByRegionID returns the leader store of the given region. +func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo { + bc.RLock() + defer bc.RUnlock() + region := bc.Regions.GetRegion(regionID) + if region == nil || region.GetLeader() == nil { + return nil + } + return bc.Stores.GetStore(region.GetLeader().GetStoreId()) +} + // GetLeaderStore returns all Stores that contains the region's leader peer. func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo { bc.RLock() diff --git a/server/core/region.go b/server/core/region.go index 14fb421b87b..ebf34a3a5dd 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -60,7 +60,8 @@ type RegionInfo struct { replicationStatus *replication_modepb.RegionReplicationStatus QueryStats *pdpb.QueryStats flowRoundDivisor uint64 - buckets unsafe.Pointer + // buckets is not thread safe, it should be accessed by the request `report buckets` with greater version. 
+ buckets unsafe.Pointer } // NewRegionInfo creates RegionInfo with region's meta and leader peer. diff --git a/server/grpc_service.go b/server/grpc_service.go index 0e7bd2a7f7e..0c45b7c2a66 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -788,19 +788,26 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { if err := s.validateRequest(request.GetHeader()); err != nil { return err } - start := time.Now() - bucketReportCounter.WithLabelValues("report", "recv").Inc() buckets := request.GetBuckets() if buckets == nil || len(buckets.Keys) == 0 { continue } - err = rc.HandleBucketHeartbeat(request.Buckets) + store := rc.GetLeaderStoreByRegionID(buckets.GetRegionId()) + if store == nil { + return errors.Errorf("the store of the bucket in region %v is not found ", buckets.GetRegionId()) + } + storeLabel := strconv.FormatUint(store.GetID(), 10) + storeAddress := store.GetAddress() + bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "recv").Inc() + + start := time.Now() + err = rc.HandleBucketHeartbeat(buckets) if err != nil { - bucketReportCounter.WithLabelValues("report", "err").Inc() + bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc() continue } - bucketReportHandleDuration.WithLabelValues().Observe(time.Since(start).Seconds()) - bucketReportCounter.WithLabelValues("report", "ok").Inc() + bucketReportLatency.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) + bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc() } } diff --git a/server/metrics.go b/server/metrics.go index 46b699190dc..aad97ee0bf5 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -30,7 +30,7 @@ var ( Subsystem: "server", Name: "bucket_report", Help: "Counter of bucket report.", - }, []string{"type", "status"}) + }, []string{"address", "store", "type", "status"}) regionHeartbeatCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ 
Namespace: "pd", @@ -91,14 +91,14 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), }) - bucketReportHandleDuration = prometheus.NewHistogramVec( + bucketReportLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "pd", Subsystem: "server", Name: "handle_bucket_report_duration_seconds", Help: "Bucketed histogram of processing time (s) of handled bucket report requests.", Buckets: prometheus.ExponentialBuckets(0.0001, 2, 29), // 0.1ms ~ 7hours - }, []string{}) + }, []string{"address", "store"}) regionHeartbeatHandleDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -149,6 +149,6 @@ func init() { prometheus.MustRegister(storeHeartbeatHandleDuration) prometheus.MustRegister(serverInfo) prometheus.MustRegister(bucketReportCounter) - prometheus.MustRegister(bucketReportHandleDuration) + prometheus.MustRegister(bucketReportLatency) prometheus.MustRegister(serviceAuditHistogram) }