Skip to content

Commit

Permalink
chore(v2): add raft node metrics (#3748)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Dec 11, 2024
1 parent d071a31 commit de17ff2
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 30 deletions.
23 changes: 13 additions & 10 deletions pkg/experiment/metastore/raftnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func (cfg *Config) Validate() error {
}

type Node struct {
logger log.Logger
config Config
reg prometheus.Registerer
fsm raft.FSM
logger log.Logger
config Config
metrics *metrics
reg prometheus.Registerer
fsm raft.FSM

walDir string
wal *raftwal.WAL
Expand All @@ -110,10 +111,11 @@ func NewNode(
fsm raft.FSM,
) (_ *Node, err error) {
n := Node{
logger: logger,
config: config,
reg: reg,
fsm: fsm,
logger: logger,
config: config,
metrics: newMetrics(reg),
reg: reg,
fsm: fsm,
}

defer func() {
Expand Down Expand Up @@ -159,7 +161,7 @@ func (n *Node) Init() (err error) {
if err != nil {
return fmt.Errorf("starting raft node: %w", err)
}
n.observer = NewRaftStateObserver(n.logger, n.raft, n.reg)
n.observer = NewRaftStateObserver(n.logger, n.raft, n.metrics.state)
n.service = NewRaftNodeService(n)

hasState, err := raft.HasExistingState(n.logStore, n.stableStore, n.snapshotStore)
Expand Down Expand Up @@ -272,11 +274,12 @@ func (n *Node) TransferLeadership() (err error) {
// Propose makes an attempt to apply the given command to the FSM.
// The function returns an error if node is not the leader.
func (n *Node) Propose(t fsm.RaftLogEntryType, m proto.Message) (resp proto.Message, err error) {
// TODO: logs, stats?
raw, err := fsm.MarshalEntry(t, m)
if err != nil {
return nil, err
}
timer := prometheus.NewTimer(n.metrics.apply)
defer timer.ObserveDuration()
future := n.raft.Apply(raw, n.config.ApplyTimeout)
if err = future.Error(); err != nil {
return nil, WithRaftLeaderStatusDetails(err, n.raft)
Expand Down
57 changes: 57 additions & 0 deletions pkg/experiment/metastore/raftnode/node_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package raftnode

import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/pyroscope/pkg/util"
)

// metrics groups the Prometheus collectors that newMetrics creates for a
// raft Node.
type metrics struct {
// apply is observed by Node.Propose: the timer starts right before the
// raft Apply call and is recorded when Propose returns.
apply prometheus.Histogram
// read is observed by Node.ReadIndex for the whole duration of the call.
read prometheus.Histogram
// state is handed to the raft state observer, which resets the vector and
// sets the series labelled with the current raft state to 1.
state *prometheus.GaugeVec
}

// newMetrics builds the collectors used by the raft node: the apply and read
// duration histograms and the raft state gauge vector.
//
// When reg is non-nil every collector is passed to util.RegisterOrGet, in the
// order apply, read, state. With a nil reg the collectors are returned
// without being registered anywhere.
func newMetrics(reg prometheus.Registerer) *metrics {
	// The two histograms differ only in name and help text, so the shared
	// bucket and native-histogram settings live in one place.
	durationHistogram := func(name, help string) prometheus.Histogram {
		return prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:                            name,
			Help:                            help,
			Buckets:                         prometheus.DefBuckets,
			NativeHistogramBucketFactor:     1.1,
			NativeHistogramZeroThreshold:    0,
			NativeHistogramMaxBucketNumber:  50,
			NativeHistogramMinResetDuration: time.Hour,
			NativeHistogramMaxZeroThreshold: 0,
		})
	}

	collectors := new(metrics)
	collectors.apply = durationHistogram(
		"raft_apply_command_duration_seconds",
		"Duration of applying a command to the Raft log",
	)
	collectors.read = durationHistogram(
		"raft_read_index_wait_duration_seconds",
		"Duration of the Raft log read index wait",
	)
	collectors.state = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "raft_state",
			Help: "Current Raft state",
		},
		[]string{"state"},
	)

	if reg == nil {
		return collectors
	}

	// NOTE(review): whatever util.RegisterOrGet returns is ignored here. If it
	// hands back a previously registered collector, this struct would keep
	// using the fresh, unregistered one — confirm against the helper.
	util.RegisterOrGet(reg, collectors.apply)
	util.RegisterOrGet(reg, collectors.read)
	util.RegisterOrGet(reg, collectors.state)

	return collectors
}
3 changes: 3 additions & 0 deletions pkg/experiment/metastore/raftnode/node_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/hashicorp/raft"
"github.com/prometheus/client_golang/prometheus"
)

var (
Expand Down Expand Up @@ -162,6 +163,8 @@ func (r *StateReader[tx]) WaitLeaderCommitIndexApplied(ctx context.Context) (Rea
}

// ReadIndex obtains a read index via n.readIndex and returns it together with
// the error passed through WithRaftLeaderStatusDetails. The full duration of
// the call is recorded in the read histogram.
func (n *Node) ReadIndex() (ReadIndex, error) {
	// NewTimer is evaluated now; only ObserveDuration is deferred to return.
	defer prometheus.NewTimer(n.metrics.read).ObserveDuration()

	index, err := n.readIndex()
	// The error is wrapped unconditionally, exactly as readIndex produced it.
	return index, WithRaftLeaderStatusDetails(err, n.raft)
}
Expand Down
29 changes: 9 additions & 20 deletions pkg/experiment/metastore/raftnode/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,16 @@ type Observer struct {
done chan struct{}
}

func NewRaftStateObserver(logger log.Logger, r *raft.Raft, reg prometheus.Registerer) *Observer {
func NewRaftStateObserver(logger log.Logger, r *raft.Raft, state *prometheus.GaugeVec) *Observer {
o := &Observer{
logger: logger,
raft: r,
c: make(chan raft.Observation, 1),
stop: make(chan struct{}),
done: make(chan struct{}),
state: state,
}
o.state = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "raft_state",
Help: "Current Raft state",
},
[]string{"state"},
)
if reg != nil {
reg.MustRegister(o.state)
}
_ = level.Debug(o.logger).Log("msg", "registering raft state observer")
level.Debug(o.logger).Log("msg", "registering raft state observer")
o.observer = raft.NewObserver(o.c, true, func(o *raft.Observation) bool {
_, ok := o.Data.(raft.RaftState)
return ok
Expand All @@ -58,14 +49,10 @@ func (o *Observer) RegisterHandler(h StateHandler) {
o.updateRaftState()
}

// OnLeader wraps the given activity in a leaderStateHandler and registers it
// with the observer through RegisterHandler.
func (o *Observer) OnLeader(a LeaderActivity) {
	handler := &leaderStateHandler{activity: a}
	o.RegisterHandler(handler)
}

// Deregister shuts the observer down: it closes the stop channel, waits on
// the done channel, logs at debug level, and finally removes the observer
// from the raft instance.
func (o *Observer) Deregister() {
close(o.stop)
// Blocks until o.done yields; presumably the run loop closes it on exit —
// confirm against run().
<-o.done
// NOTE(review): the next two lines appear to be the before/after forms of the
// same log statement in this diff view (with and without the `_ =` discard);
// only one of them is expected in the actual file.
_ = level.Debug(o.logger).Log("msg", "deregistering raft observer")
level.Debug(o.logger).Log("msg", "deregistering raft observer")
o.raft.DeregisterObserver(o.observer)
}

Expand All @@ -85,9 +72,11 @@ func (o *Observer) run() {

func (o *Observer) updateRaftState() {
state := o.raft.State()
o.state.Reset()
o.state.WithLabelValues(state.String()).Set(1)
_ = level.Debug(o.logger).Log("msg", "raft state changed", "raft_state", state)
level.Debug(o.logger).Log("msg", "raft state changed", "raft_state", state)
if o.state != nil {
o.state.Reset()
o.state.WithLabelValues(state.String()).Set(1)
}
for _, h := range o.handlers {
h.Observe(state)
}
Expand Down

0 comments on commit de17ff2

Please sign in to comment.