Skip to content

Commit

Permalink
fix(server): reduce the screaming heartbeat logs
Browse files Browse the repository at this point in the history
Currently the only way we know that a peer isn't getting a heartbeat is
an edge triggered event from go raft on every missed heartbeat. This
means that we need to do some book keeping in order to do exponential
backoff.

The upside is that instead of screaming thousands of log lines before a
machine hits the default removal of 30 minutes it is only ~100.
  • Loading branch information
Brandon Philips committed Jun 7, 2014
1 parent 79c650d commit b71282c
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 2 deletions.
71 changes: 70 additions & 1 deletion server/peer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
)

const (
// MaxHeartbeatTimeoutBackoff is the maximum number of seconds before we warn
// the user again about a peer not accepting heartbeats.
MaxHeartbeatTimeoutBackoff = 15 * time.Second

// ThresholdMonitorTimeout is the time between log notifications that the
// Raft heartbeat is too close to the election timeout.
ThresholdMonitorTimeout = 5 * time.Second
Expand Down Expand Up @@ -70,10 +74,18 @@ type PeerServer struct {
routineGroup sync.WaitGroup
timeoutThresholdChan chan interface{}

logBackoffs map[string]*logBackoff

metrics *metrics.Bucket
sync.Mutex
}

type logBackoff struct {
next time.Time
backoff time.Duration
count int
}

// TODO: find a good policy to do snapshot
type snapshotConf struct {
// Etcd will check if snapshot is need every checkingInterval
Expand All @@ -97,6 +109,7 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry
serverStats: serverStats,

timeoutThresholdChan: make(chan interface{}, 1),
logBackoffs: make(map[string]*logBackoff),

metrics: mb,
}
Expand Down Expand Up @@ -279,6 +292,7 @@ func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error {
s.startRoutine(s.monitorTimeoutThreshold)
s.startRoutine(s.monitorActiveSize)
s.startRoutine(s.monitorPeerActivity)
s.startRoutine(s.monitorLogBackoff)

// open the snapshot
if snapshot {
Expand Down Expand Up @@ -691,7 +705,7 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
if peer, ok := value.(*raft.Peer); ok {
name = peer.Name
}
log.Infof("%s: warning: heartbeat timed out: '%v'", s.Config.Name, name)
s.logHeartbeatTimeout(name)
case raft.ElectionTimeoutThresholdEventType:
select {
case s.timeoutThresholdChan <- value:
Expand All @@ -701,6 +715,61 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
}
}

// logHeartbeatTimeout logs about the edge triggered heartbeat timeout event
// only if we haven't warned within a reasonable interval.
func (s *PeerServer) logHeartbeatTimeout(name string) {
b, ok := s.logBackoffs[name]
if !ok {
b = &logBackoff{time.Time{}, time.Second, 1}
s.logBackoffs[name] = b
}

if b.next.After(time.Now()) {
b.count++
return
}

b.backoff = 2 * b.backoff
if b.backoff > MaxHeartbeatTimeoutBackoff {
b.backoff = MaxHeartbeatTimeoutBackoff
}
b.next = time.Now().Add(b.backoff)

log.Infof("%s: warning: heartbeat time out peer=%q missed=%d backoff=%q", s.Config.Name, name, b.count, b.backoff)
}

// monitorLogBackoff cleans up any logBackoff entries for peers that have had activity.
func (s *PeerServer) monitorLogBackoff() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
// Ignore while this peer is not a leader.
if s.raftServer.State() != raft.Leader {
continue
}

select {
case <-s.closeChan:
return
case <-ticker.C:
if len(s.logBackoffs) == 0 {
continue
}
peers := s.raftServer.Peers()
for _, peer := range peers {
b, ok := s.logBackoffs[peer.Name]
if !ok {
continue
}

if peer.LastActivity().After(b.next) {
delete(s.logBackoffs, peer.Name)
}
}
}
}
}

func (s *PeerServer) recordMetricEvent(event raft.Event) {
name := fmt.Sprintf("raft.event.%s", event.Type())
value := event.Value().(time.Duration)
Expand Down
2 changes: 1 addition & 1 deletion server/peer_server_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request)

// Response to the name request
func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/version/ ", ps.Config.URL)
log.Debugf("[recv] Get %s/version", ps.Config.URL)
w.WriteHeader(http.StatusOK)
w.Write([]byte(strconv.Itoa(ps.store.Version())))
}
Expand Down

0 comments on commit b71282c

Please sign in to comment.