Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(v2): raft wait-committed #4072

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ require (
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-metrics v0.5.4 // indirect
github.com/hashicorp/go-msgpack v1.1.5 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
Expand Down Expand Up @@ -267,6 +268,8 @@ replace (
// merged upstream yet.
github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91

github.com/hashicorp/raft => github.com/kolesnikovae/raft v0.0.0-20250403080330-9f32d71ebd58

// gopkg.in/yaml.v3
// + https://github.com/go-yaml/yaml/pull/691
// + https://github.com/go-yaml/yaml/pull/876
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVH
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-metrics v0.5.4 h1:8mmPiIJkTPPEbAiV97IxdAGNdRdaWwVap1BU6elejKY=
github.com/hashicorp/go-metrics v0.5.4/go.mod h1:CG5yz4NZ/AI/aQt9Ucm/vdBnbh7fvmv4lxZ350i+QQI=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs=
github.com/hashicorp/go-msgpack v1.1.5/go.mod h1:gWVc3sv/wbDmR3rQsj1CAktEZzoz1YNK9NfGLXJ69/4=
Expand Down Expand Up @@ -487,8 +489,6 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
github.com/hashicorp/nomad/api v0.0.0-20240306004928-3e7191ccb702 h1:fI1LXuBaS1d9z1kmb++Og6YD8uMRwadXorCwE+xgOFA=
github.com/hashicorp/nomad/api v0.0.0-20240306004928-3e7191ccb702/go.mod h1:z71gkJdrkAt/Rl6C7Q79VE7AwJ5lUF+M+fzFTyIHYB0=
github.com/hashicorp/raft v1.7.2-0.20241119084901-7e8e836fe2e8 h1:d2HabIDMkwzIKw+w82mZYelwMy4giCbpX4mjDQxmeuk=
github.com/hashicorp/raft v1.7.2-0.20241119084901-7e8e836fe2e8/go.mod h1:hUeiEwQQR/Nk2iKDD0dkEhklSsu3jcAcqvPzPoZSAEM=
github.com/hashicorp/raft-wal v0.4.1 h1:aU8XZ6x8R9BAIB/83Z1dTDtXvDVmv9YVYeXxd/1QBSA=
github.com/hashicorp/raft-wal v0.4.1/go.mod h1:A6vP5o8hGOs1LHfC1Okh9xPwWDcmb6Vvuz/QyqUXlOE=
github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY=
Expand Down Expand Up @@ -536,6 +536,8 @@ github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY=
github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8=
github.com/kolesnikovae/raft v0.0.0-20250403080330-9f32d71ebd58 h1:36aPY4Bo+Qb7Vdk/YMPX8mjdew9UG7k887pqOv2pds0=
github.com/kolesnikovae/raft v0.0.0-20250403080330-9f32d71ebd58/go.mod h1:DfvCGFxpAUPE0L4Uc8JLlTPtc3GzSbdH0MTJCLgnmJQ=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -668,6 +670,7 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
Expand Down
10 changes: 4 additions & 6 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -957,8 +957,6 @@ github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-yaml v1.9.5 h1:Eh/+3uk9kLxG4koCX6lRMAPS1OaMSAi+FJcya0INdB0=
github.com/goccy/go-yaml v1.9.5/go.mod h1:U/jl18uSupI5rdI2jmuCswEA2htH9eXfferR3KfscvA=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
Expand Down Expand Up @@ -1020,6 +1018,8 @@ github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtng
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-hclog v0.12.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ=
github.com/hashicorp/go-hclog v1.2.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ=
github.com/hashicorp/go-metrics v0.5.4 h1:8mmPiIJkTPPEbAiV97IxdAGNdRdaWwVap1BU6elejKY=
github.com/hashicorp/go-metrics v0.5.4/go.mod h1:CG5yz4NZ/AI/aQt9Ucm/vdBnbh7fvmv4lxZ350i+QQI=
github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE=
github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
Expand Down Expand Up @@ -1062,6 +1062,8 @@ github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kolesnikovae/raft v0.0.0-20250403080330-9f32d71ebd58 h1:36aPY4Bo+Qb7Vdk/YMPX8mjdew9UG7k887pqOv2pds0=
github.com/kolesnikovae/raft v0.0.0-20250403080330-9f32d71ebd58/go.mod h1:DfvCGFxpAUPE0L4Uc8JLlTPtc3GzSbdH0MTJCLgnmJQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
Expand Down Expand Up @@ -1099,12 +1101,8 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpsp
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/minio/crc64nvme v1.0.1 h1:DHQPrYPdqK7jQG/Ls5CTBZWeex/2FMS3G5XGkycuFrY=
github.com/minio/crc64nvme v1.0.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/minio-go/v7 v7.0.88 h1:v8MoIJjwYxOkehp+eiLIuvXk87P2raUtoU5klrAAshs=
github.com/minio/minio-go/v7 v7.0.88/go.mod h1:33+O8h0tO7pCeCWwBVa07RhVVfB/3vS4kEX7rwYKmIg=
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/mitchellh/cli v1.1.0 h1:tEElEatulEHDeedTxwckzyYMA5c86fbmNIUL1hBIiTg=
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/metastore/compaction_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (svc *CompactionService) PollCompactionJobs(
}

cmd := fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE)
resp, err := svc.raft.Propose(cmd, req)
resp, err := svc.raft.Apply(cmd, req)
if err != nil {
level.Error(svc.logger).Log("msg", "failed to prepare compaction plan", "err", err)
return nil, err
Expand Down Expand Up @@ -140,7 +140,7 @@ func (svc *CompactionService) PollCompactionJobs(
// scenario, and we don't want to stop the node/cluster). Instead, an
// empty response would indicate that the plan is rejected.
proposal := &raft_log.UpdateCompactionPlanRequest{Term: prepared.Term, PlanUpdate: planUpdate}
if resp, err = svc.raft.Propose(cmd, proposal); err != nil {
if resp, err = svc.raft.Apply(cmd, proposal); err != nil {
level.Error(svc.logger).Log("msg", "failed to update compaction plan", "err", err)
return nil, err
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/experiment/metastore/fsm/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"io"
"os"
"path/filepath"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/bbolt"
)

Expand Down Expand Up @@ -98,10 +98,8 @@ func (db *boltdb) shutdown() {
}

func (db *boltdb) restore(snapshot io.Reader) error {
start := time.Now()
defer func() {
db.metrics.boltDBRestoreSnapshotDuration.Observe(time.Since(start).Seconds())
}()
timer := prometheus.NewTimer(db.metrics.boltDBRestoreSnapshotDuration)
defer timer.ObserveDuration()
// Snapshot is a full copy of the database, therefore we copy
// it on disk, compact, and use it instead of the current database.
// Compacting the snapshot is necessary to reclaim the space
Expand Down
11 changes: 4 additions & 7 deletions pkg/experiment/metastore/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"strconv"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -147,11 +146,11 @@ func (fsm *FSM) restore() error {

// Restore restores the FSM state from a snapshot.
func (fsm *FSM) Restore(snapshot io.ReadCloser) (err error) {
start := time.Now()
timer := prometheus.NewTimer(fsm.db.metrics.fsmRestoreSnapshotDuration)
level.Info(fsm.logger).Log("msg", "restoring snapshot")
defer func() {
_ = snapshot.Close()
fsm.db.metrics.fsmRestoreSnapshotDuration.Observe(time.Since(start).Seconds())
timer.ObserveDuration()
}()

level.Info(fsm.logger).Log("msg", "restoring snapshot")
Expand Down Expand Up @@ -225,7 +224,6 @@ func (fsm *FSM) Apply(log *raft.Log) any {
// and calls the corresponding handler on the _local_ FSM, based on
// the command type.
func (fsm *FSM) applyCommand(cmd *raft.Log) any {
start := time.Now()
var e RaftLogEntry
if err := e.UnmarshalBinary(cmd.Data); err != nil {
return errResponse(cmd, err)
Expand All @@ -238,9 +236,8 @@ func (fsm *FSM) applyCommand(cmd *raft.Log) any {

cmdType := strconv.FormatUint(uint64(e.Type), 10)
fsm.db.metrics.fsmApplyCommandSize.WithLabelValues(cmdType).Observe(float64(len(cmd.Data)))
defer func() {
fsm.db.metrics.fsmApplyCommandDuration.WithLabelValues(cmdType).Observe(time.Since(start).Seconds())
}()
timer := prometheus.NewTimer(fsm.db.metrics.fsmApplyCommandDuration.WithLabelValues(cmdType))
defer timer.ObserveDuration()

handle, ok := fsm.handlers[e.Type]
if !ok {
Expand Down
7 changes: 3 additions & 4 deletions pkg/experiment/metastore/fsm/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"context"
"io"
"runtime/pprof"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/raft"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/bbolt"
)

Expand All @@ -28,12 +28,11 @@ func (s *snapshotWriter) Persist(sink raft.SnapshotSink) (err error) {
pprof.SetGoroutineLabels(pprof.WithLabels(ctx, pprof.Labels("metastore_op", "persist")))
defer pprof.SetGoroutineLabels(ctx)

start := time.Now()
timer := prometheus.NewTimer(s.metrics.boltDBPersistSnapshotDuration)
level.Debug(s.logger).Log("msg", "persisting snapshot", "sink_id", sink.ID())
defer func() {
s.metrics.boltDBPersistSnapshotDuration.Observe(time.Since(start).Seconds())
if err == nil {
level.Info(s.logger).Log("msg", "persisted snapshot", "sink_id", sink.ID(), "duration", time.Since(start))
level.Info(s.logger).Log("msg", "persisted snapshot", "sink_id", sink.ID(), "duration", timer.ObserveDuration())
if err = sink.Close(); err != nil {
level.Error(s.logger).Log("msg", "failed to close sink", "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/experiment/metastore/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (svc *IndexService) addBlockMetadata(
level.Warn(svc.logger).Log("invalid metadata", "block", req.Block.Id, "err", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
_, err := svc.raft.Propose(
err := svc.raft.Commit(
fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_ADD_BLOCK_METADATA),
&raft_log.AddBlockMetadataRequest{Metadata: req.Block},
)
Expand Down
10 changes: 9 additions & 1 deletion pkg/experiment/metastore/metastore_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,16 @@ import (

// Raft represents a Raft consensus protocol interface. Any modifications to
// the state should be proposed through the Raft interface.
//
// The methods return an error if node is not the leader.
type Raft interface {
Propose(fsm.RaftLogEntryType, proto.Message) (proto.Message, error)
// Apply makes an attempt to apply the given command to the FSM:
// it returns when the command is applied to the local FSM.
Apply(fsm.RaftLogEntryType, proto.Message) (proto.Message, error)

// Commit makes an attempt to commit the given command to the raft log:
// it returns once the command is replicated to the quorum.
Commit(fsm.RaftLogEntryType, proto.Message) error
}

// State represents a consistent read-only view of the metastore.
Expand Down
24 changes: 21 additions & 3 deletions pkg/experiment/metastore/raftnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ func (n *Node) Init() (err error) {
raftConfig.SnapshotInterval = n.config.SnapshotInterval
raftConfig.LocalID = raft.ServerID(n.config.ServerID)

// Maximum number of buffered commands (default: 64).
// This sets the size of the channel used to queue commands
// for the FSM to apply.
raftConfig.MaxAppendEntries = 1 << 10
raftConfig.BatchApplyCh = true

n.raft, err = raft.NewRaft(raftConfig, n.fsm, n.logStore, n.stableStore, n.snapshotStore, n.transport)
if err != nil {
return fmt.Errorf("starting raft node: %w", err)
Expand Down Expand Up @@ -271,9 +277,7 @@ func (n *Node) TransferLeadership() (err error) {
return err
}

// Propose makes an attempt to apply the given command to the FSM.
// The function returns an error if node is not the leader.
func (n *Node) Propose(t fsm.RaftLogEntryType, m proto.Message) (resp proto.Message, err error) {
func (n *Node) Apply(t fsm.RaftLogEntryType, m proto.Message) (resp proto.Message, err error) {
raw, err := fsm.MarshalEntry(t, m)
if err != nil {
return nil, err
Expand All @@ -290,3 +294,17 @@ func (n *Node) Propose(t fsm.RaftLogEntryType, m proto.Message) (resp proto.Mess
}
return resp, r.Err
}

func (n *Node) Commit(t fsm.RaftLogEntryType, m proto.Message) error {
raw, err := fsm.MarshalEntry(t, m)
if err != nil {
return err
}
timer := prometheus.NewTimer(n.metrics.apply)
defer timer.ObserveDuration()
future := n.raft.Apply(raw, n.config.ApplyTimeout)
if err = future.WaitCommitted(); err != nil {
return WithRaftLeaderStatusDetails(err, n.raft)
}
return nil
}
21 changes: 17 additions & 4 deletions pkg/experiment/metastore/raftnode/node_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,28 @@ import (
)

type metrics struct {
apply prometheus.Histogram
read prometheus.Histogram
state *prometheus.GaugeVec
apply prometheus.Histogram
commit prometheus.Histogram
read prometheus.Histogram
state *prometheus.GaugeVec
}

func newMetrics(reg prometheus.Registerer) *metrics {
m := &metrics{
apply: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "raft_apply_command_duration_seconds",
Help: "Duration of applying a command to the Raft log",
Help: "Duration of applying a command to the Raft FSM",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramZeroThreshold: 0,
NativeHistogramMaxBucketNumber: 50,
NativeHistogramMinResetDuration: time.Hour,
NativeHistogramMaxZeroThreshold: 0,
}),

commit: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "raft_commit_command_duration_seconds",
Help: "Duration of committing a command to the Raft log",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramZeroThreshold: 0,
Expand Down Expand Up @@ -49,6 +61,7 @@ func newMetrics(reg prometheus.Registerer) *metrics {

if reg != nil {
util.RegisterOrGet(reg, m.apply)
util.RegisterOrGet(reg, m.commit)
util.RegisterOrGet(reg, m.read)
util.RegisterOrGet(reg, m.state)
}
Expand Down
Loading