Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional metrics regarding log dispatching and committal #316

Merged
merged 1 commit into from
Apr 9, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ func (r *Raft) leaderLoop() {
}
}

var numProcessed int
start := time.Now()

for {
e := r.leaderState.inflight.Front()
if e == nil {
Expand All @@ -532,10 +535,19 @@ func (r *Raft) leaderLoop() {
}
// Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)

r.processLogs(idx, commitLog)

r.leaderState.inflight.Remove(e)
numProcessed++
}

// Measure the time to enqueue batch of logs for FSM to apply
metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start)

// Count the number of logs enqueued
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(numProcessed))

if stepDown {
if r.conf.ShutdownOnRemove {
r.logger.Printf("[INFO] raft: Removed ourself, shutting down")
Expand Down Expand Up @@ -848,7 +860,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {

term := r.getCurrentTerm()
lastIndex := r.getLastIndex()
logs := make([]*Log, len(applyLogs))

n := len(applyLogs)
logs := make([]*Log, n)
metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))

for idx, applyLog := range applyLogs {
applyLog.dispatch = now
Expand Down Expand Up @@ -879,10 +894,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
}
}

// processLogs is used to apply all the committed entires that haven't been
// processLogs is used to apply all the committed entries that haven't been
// applied up to the given index limit.
// This can be called from both leaders and followers.
// Followers call this from AppendEntires, for n entires at a time, and always
// Followers call this from AppendEntries, for n entries at a time, and always
// pass future=nil.
// Leaders call this once per inflight when entries are committed. They pass
// the future from inflights.
Expand All @@ -899,7 +914,6 @@ func (r *Raft) processLogs(index uint64, future *logFuture) {
// Get the log, either from the future or from our log store
if future != nil && future.log.Index == idx {
r.processLog(&future.log, future)

} else {
l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil {
Expand Down