diff --git a/raft.go b/raft.go index 395ecf745..a6e0d72c0 100644 --- a/raft.go +++ b/raft.go @@ -520,6 +520,9 @@ func (r *Raft) leaderLoop() { } } + var numProcessed int + start := time.Now() + for { e := r.leaderState.inflight.Front() if e == nil { @@ -532,10 +535,19 @@ func (r *Raft) leaderLoop() { } // Measure the commit time metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) + r.processLogs(idx, commitLog) + r.leaderState.inflight.Remove(e) + numProcessed++ } + // Measure the time to enqueue batch of logs for FSM to apply + metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start) + + // Count the number of logs enqueued + metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(numProcessed)) + if stepDown { if r.conf.ShutdownOnRemove { r.logger.Printf("[INFO] raft: Removed ourself, shutting down") @@ -848,7 +860,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { term := r.getCurrentTerm() lastIndex := r.getLastIndex() - logs := make([]*Log, len(applyLogs)) + + n := len(applyLogs) + logs := make([]*Log, n) + metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n)) for idx, applyLog := range applyLogs { applyLog.dispatch = now @@ -879,10 +894,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { } } -// processLogs is used to apply all the committed entires that haven't been +// processLogs is used to apply all the committed entries that haven't been // applied up to the given index limit. // This can be called from both leaders and followers. -// Followers call this from AppendEntires, for n entires at a time, and always +// Followers call this from AppendEntries, for n entries at a time, and always // pass future=nil. // Leaders call this once per inflight when entries are committed. They pass // the future from inflights. @@ -899,7 +914,6 @@ func (r *Raft) processLogs(index uint64, future *logFuture) { // Get the log, either from the future or from our log store if future != nil && future.log.Index == idx { r.processLog(&future.log, future) - } else { l := new(Log) if err := r.logs.GetLog(idx, l); err != nil {