Skip to content

Commit

Permalink
grafana add store infomation
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Mar 30, 2022
1 parent b20dd37 commit a78e632
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 12 deletions.
5 changes: 5 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,11 @@ func (c *RaftCluster) GetStores() []*core.StoreInfo {
return c.core.GetStores()
}

// GetLeaderStoreByRegionID returns the leader store of the given region.
func (c *RaftCluster) GetLeaderStoreByRegionID(regionID uint64) *core.StoreInfo {
return c.core.GetLeaderStoreByRegionID(regionID)
}

// GetStore gets store from cluster.
func (c *RaftCluster) GetStore(storeID uint64) *core.StoreInfo {
return c.core.GetStore(storeID)
Expand Down
1 change: 0 additions & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,6 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())

cluster.coordinator = newCoordinator(s.ctx, cluster, nil)

n, np := uint64(3), uint64(3)
Expand Down
11 changes: 11 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo {
return Stores
}

// GetLeaderStoreByRegionID returns the leader store of the given region.
func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo {
bc.RLock()
defer bc.RUnlock()
region := bc.Regions.GetRegion(regionID)
if region == nil || region.GetLeader() == nil {
return nil
}
return bc.Stores.GetStore(region.GetLeader().GetStoreId())
}

// GetLeaderStore returns all Stores that contains the region's leader peer.
func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo {
bc.RLock()
Expand Down
3 changes: 2 additions & 1 deletion server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type RegionInfo struct {
replicationStatus *replication_modepb.RegionReplicationStatus
QueryStats *pdpb.QueryStats
flowRoundDivisor uint64
buckets unsafe.Pointer
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
buckets unsafe.Pointer
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down
19 changes: 13 additions & 6 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,19 +788,26 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error {
if err := s.validateRequest(request.GetHeader()); err != nil {
return err
}
start := time.Now()
bucketReportCounter.WithLabelValues("report", "recv").Inc()
buckets := request.GetBuckets()
if buckets == nil || len(buckets.Keys) == 0 {
continue
}
err = rc.HandleBucketHeartbeat(request.Buckets)
store := rc.GetLeaderStoreByRegionID(buckets.GetRegionId())
if store == nil {
return errors.Errorf("the store of the bucket in region %v is not found ", buckets.GetRegionId())
}
storeLabel := strconv.FormatUint(store.GetID(), 10)
storeAddress := store.GetAddress()
bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "recv").Inc()

start := time.Now()
err = rc.HandleBucketHeartbeat(buckets)
if err != nil {
bucketReportCounter.WithLabelValues("report", "err").Inc()
bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc()
continue
}
bucketReportHandleDuration.WithLabelValues().Observe(time.Since(start).Seconds())
bucketReportCounter.WithLabelValues("report", "ok").Inc()
bucketReportLatency.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()
}
}

Expand Down
8 changes: 4 additions & 4 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
Subsystem: "server",
Name: "bucket_report",
Help: "Counter of bucket report.",
}, []string{"type", "status"})
}, []string{"address", "store", "type", "status"})
regionHeartbeatCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Expand Down Expand Up @@ -91,14 +91,14 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

bucketReportHandleDuration = prometheus.NewHistogramVec(
bucketReportLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "server",
Name: "handle_bucket_report_duration_seconds",
Help: "Bucketed histogram of processing time (s) of handled bucket report requests.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 29), // 0.1ms ~ 7hours
}, []string{})
}, []string{"address", "store"})

regionHeartbeatHandleDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -149,6 +149,6 @@ func init() {
prometheus.MustRegister(storeHeartbeatHandleDuration)
prometheus.MustRegister(serverInfo)
prometheus.MustRegister(bucketReportCounter)
prometheus.MustRegister(bucketReportHandleDuration)
prometheus.MustRegister(bucketReportLatency)
prometheus.MustRegister(serviceAuditHistogram)
}

0 comments on commit a78e632

Please sign in to comment.