From e35ec2fcc72ce87284506f7ae82342a023f29307 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Tue, 29 Mar 2022 15:17:48 +0800 Subject: [PATCH] address comment Signed-off-by: bufferflies <1045931706@qq.com> --- server/cluster/cluster.go | 4 ---- server/core/region.go | 7 +++---- server/grpc_service.go | 12 ++++++------ server/metrics.go | 4 ++-- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index afb041539f3..56e20ee2b86 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -609,9 +609,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { // processBucketHeartbeat update the bucket information. func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error { - c.RLock() region := c.core.GetRegion(buckets.GetRegionId()) - c.RUnlock() if region == nil { bucketEventCounter.WithLabelValues("region_cache_miss").Inc() return errors.Errorf("region %v not found", buckets.GetRegionId()) @@ -625,8 +623,6 @@ func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error { return nil } if ok := region.UpdateBuckets(buckets); ok { - log.Info("update buckets successful", zap.Uint64("region-id", buckets.GetRegionId()), - zap.Uint64("version", buckets.Version)) bucketEventCounter.WithLabelValues("update_cache").Inc() return nil } diff --git a/server/core/region.go b/server/core/region.go index 5eaa0ddfe8e..7fc259fd329 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -40,8 +40,8 @@ func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error { } // RegionInfo records detail region info. -// most properties is Read-Only once created exclude buckets. -// buckets should be modified through report buckets and the version is greater than the current. +// the properties are Read-Only once created except buckets. +// the `buckets` could be modified by the request `report buckets` with greater version. type RegionInfo struct { term uint64 meta *metapb.Region @@ -169,8 +169,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC } // Inherit inherits the buckets and region size from the parent region. -// CorrectApproximateSize correct approximate size by the previous size if here exists an reported RegionInfo. -// +// correct approximate size and buckets by the previous size if here exists a reported RegionInfo. // See https://github.com/tikv/tikv/issues/11114 func (r *RegionInfo) Inherit(origin *RegionInfo) { // regionSize should not be zero if region is not empty. diff --git a/server/grpc_service.go b/server/grpc_service.go index e58686b1e77..698576f0b4b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -652,14 +652,14 @@ const heartbeatSendTimeout = 5 * time.Second var errSendHeartbeatTimeout = errors.New("send heartbeat timeout") -// bucketHeartServer wraps PD_ReportBucketsServer to ensure when any error +// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error // occurs on SendAndClose() or Recv(), both endpoints will be closed. -type bucketHeartServer struct { +type bucketHeartbeatServer struct { stream pdpb.PD_ReportBucketsServer closed int32 } -func (b *bucketHeartServer) Send(bucket *pdpb.ReportBucketsResponse) error { +func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error { if atomic.LoadInt32(&b.closed) == 1 { return status.Errorf(codes.Canceled, "stream is closed") } @@ -679,7 +679,7 @@ func (b *bucketHeartServer) Send(bucket *pdpb.ReportBucketsResponse) error { } } -func (b *bucketHeartServer) Recv() (*pdpb.ReportBucketsRequest, error) { +func (b *bucketHeartbeatServer) Recv() (*pdpb.ReportBucketsRequest, error) { if atomic.LoadInt32(&b.closed) == 1 { return nil, io.EOF } @@ -731,7 +731,7 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { // ReportBuckets implements gRPC PDServer func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { var ( - server = &bucketHeartServer{stream: stream} + server = &bucketHeartbeatServer{stream: stream} forwardStream pdpb.PD_ReportBucketsClient cancel context.CancelFunc lastForwardedHost string @@ -1836,7 +1836,7 @@ func (s *GrpcServer) createReportBucketsForwardStream(client *grpc.ClientConn) ( return forwardStream, cancel, err } -func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient, server *bucketHeartServer, errCh chan error) { +func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient, server *bucketHeartbeatServer, errCh chan error) { defer close(errCh) for { resp, err := forwardStream.CloseAndRecv() diff --git a/server/metrics.go b/server/metrics.go index 2517dec3c94..46b699190dc 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -27,7 +27,7 @@ var ( bucketReportCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", - Subsystem: "scheduler", + Subsystem: "server", Name: "bucket_report", Help: "Counter of bucket report.", }, []string{"type", "status"}) @@ -94,7 +94,7 @@ var ( bucketReportHandleDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "pd", - Subsystem: "scheduler", + Subsystem: "server", Name: "handle_bucket_report_duration_seconds", Help: "Bucketed histogram of processing time (s) of handled bucket report requests.", Buckets: prometheus.ExponentialBuckets(0.0001, 2, 29), // 0.1ms ~ 7hours