Skip to content

Commit

Permalink
Merge branch 'release-4.0' into release-4.0-60508bde0819
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 10, 2021
2 parents 15e9a07 + a5addc4 commit efe8dcf
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
24 changes: 20 additions & 4 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

// errRegionIsStale is error info for region is stale.
var errRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}

Expand Down Expand Up @@ -83,9 +83,16 @@ func classifyVoterAndLearner(region *RegionInfo) {
region.voters = voters
}

// EmptyRegionApproximateSize is the region approximate size of an empty region
// (heartbeat size <= 1MB).
const EmptyRegionApproximateSize = 1
const (
// EmptyRegionApproximateSize is the region approximate size of an empty region
// (heartbeat size <= 1MB).
EmptyRegionApproximateSize = 1
// ImpossibleFlowSize is an impossible flow size (such as written_bytes, read_keys, etc.)
// It may be caused by overflow, refer to https://github.com/tikv/pd/issues/3379.
// They need to be filtered so as not to affect downstream.
// (flow size >= 1024TB)
ImpossibleFlowSize = 1 << 50
)

// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
Expand All @@ -112,6 +119,15 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
replicationStatus: heartbeat.GetReplicationStatus(),
}

if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize {
region.writtenKeys = 0
region.writtenBytes = 0
}
if region.readKeys >= ImpossibleFlowSize || region.readBytes >= ImpossibleFlowSize {
region.readKeys = 0
region.readBytes = 0
}

sort.Sort(peerStatsSlice(region.downPeers))
sort.Sort(peerSlice(region.pendingPeers))

Expand Down
11 changes: 10 additions & 1 deletion tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,8 +1025,8 @@ func (s *clusterTestSuite) TestUpgradeStoreLimit(c *C) {

func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 1)
defer tc.Destroy()
c.Assert(err, IsNil)
defer tc.Destroy()

err = tc.RunInitialServers()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -1084,6 +1084,15 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)

// issue #3379
regionReq.KeysWritten = uint64(18446744073709551615) // -1
regionReq.BytesWritten = uint64(18446744073709550602) // -1024
region = core.RegionFromHeartbeat(regionReq)
c.Assert(region.GetKeysWritten(), Equals, uint64(0))
c.Assert(region.GetBytesWritten(), Equals, uint64(0))
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)

// Stale heartbeat, update check should fail
regionReq.Term = 5
regionReq.Leader = peers[0]
Expand Down

0 comments on commit efe8dcf

Please sign in to comment.