Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Mar 29, 2022
1 parent 5c951c8 commit e35ec2f
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 16 deletions.
4 changes: 0 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {

// processBucketHeartbeat update the bucket information.
func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error {
c.RLock()
region := c.core.GetRegion(buckets.GetRegionId())
c.RUnlock()
if region == nil {
bucketEventCounter.WithLabelValues("region_cache_miss").Inc()
return errors.Errorf("region %v not found", buckets.GetRegionId())
Expand All @@ -625,8 +623,6 @@ func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error {
return nil
}
if ok := region.UpdateBuckets(buckets); ok {
log.Info("update buckets successful", zap.Uint64("region-id", buckets.GetRegionId()),
zap.Uint64("version", buckets.Version))
bucketEventCounter.WithLabelValues("update_cache").Inc()
return nil
}
Expand Down
7 changes: 3 additions & 4 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
}

// RegionInfo records detail region info.
// most properties is Read-Only once created exclude buckets.
// buckets should be modified through report buckets and the version is greater than the current.
// the properties are Read-Only once created except buckets.
// the `buckets` could be modified by the request `report buckets` with greater version.
type RegionInfo struct {
term uint64
meta *metapb.Region
Expand Down Expand Up @@ -169,8 +169,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
}

// Inherit inherits the buckets and region size from the parent region.
// CorrectApproximateSize correct approximate size by the previous size if here exists an reported RegionInfo.
//
// correct approximate size and buckets by the previous size if here exists a reported RegionInfo.
// See https://github.com/tikv/tikv/issues/11114
func (r *RegionInfo) Inherit(origin *RegionInfo) {
// regionSize should not be zero if region is not empty.
Expand Down
12 changes: 6 additions & 6 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,14 +652,14 @@ const heartbeatSendTimeout = 5 * time.Second

var errSendHeartbeatTimeout = errors.New("send heartbeat timeout")

// bucketHeartServer wraps PD_ReportBucketsServer to ensure when any error
// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error
// occurs on SendAndClose() or Recv(), both endpoints will be closed.
type bucketHeartServer struct {
type bucketHeartbeatServer struct {
stream pdpb.PD_ReportBucketsServer
closed int32
}

func (b *bucketHeartServer) Send(bucket *pdpb.ReportBucketsResponse) error {
func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error {
if atomic.LoadInt32(&b.closed) == 1 {
return status.Errorf(codes.Canceled, "stream is closed")
}
Expand All @@ -679,7 +679,7 @@ func (b *bucketHeartServer) Send(bucket *pdpb.ReportBucketsResponse) error {
}
}

func (b *bucketHeartServer) Recv() (*pdpb.ReportBucketsRequest, error) {
func (b *bucketHeartbeatServer) Recv() (*pdpb.ReportBucketsRequest, error) {
if atomic.LoadInt32(&b.closed) == 1 {
return nil, io.EOF
}
Expand Down Expand Up @@ -731,7 +731,7 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
// ReportBuckets implements gRPC PDServer
func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error {
var (
server = &bucketHeartServer{stream: stream}
server = &bucketHeartbeatServer{stream: stream}
forwardStream pdpb.PD_ReportBucketsClient
cancel context.CancelFunc
lastForwardedHost string
Expand Down Expand Up @@ -1836,7 +1836,7 @@ func (s *GrpcServer) createReportBucketsForwardStream(client *grpc.ClientConn) (
return forwardStream, cancel, err
}

func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient, server *bucketHeartServer, errCh chan error) {
func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient, server *bucketHeartbeatServer, errCh chan error) {
defer close(errCh)
for {
resp, err := forwardStream.CloseAndRecv()
Expand Down
4 changes: 2 additions & 2 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
bucketReportCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "scheduler",
Subsystem: "server",
Name: "bucket_report",
Help: "Counter of bucket report.",
}, []string{"type", "status"})
Expand Down Expand Up @@ -94,7 +94,7 @@ var (
bucketReportHandleDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "scheduler",
Subsystem: "server",
Name: "handle_bucket_report_duration_seconds",
Help: "Bucketed histogram of processing time (s) of handled bucket report requests.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 29), // 0.1ms ~ 7hours
Expand Down

0 comments on commit e35ec2f

Please sign in to comment.