Skip to content

Commit

Permalink
add prometheus metrics server and config (#1015)
Browse files Browse the repository at this point in the history
* add prometheus metrics server and config

* verify json-rpc req counter
  • Loading branch information
jchappelow authored Sep 24, 2024
1 parent 65f74c8 commit 162cabe
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 16 deletions.
5 changes: 5 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ var DefaultConfig = func() *commonConfig.KwildConfig {
MaxLogSizeKB: 100_000, // 100 MB uncompressed threshold
MaxLogRolls: 0, // the zero value means retain all (don't delete oldest archived logs)
},
Instrumentation: &commonConfig.InstrumentationConfig{
Prometheus: false,
PromListenAddr: "0.0.0.0:26660",
MaxConnections: 1,
},

ChainConfig: &commonConfig.ChainConfig{
P2P: &commonConfig.P2PConfig{
Expand Down
21 changes: 11 additions & 10 deletions cmd/kwild/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,27 +365,28 @@ func DefaultEmptyConfig() *config.KwildConfig {
StateSync: &config.StateSyncConfig{},
Consensus: &config.ConsensusConfig{},
},
Logging: &config.Logging{},
Logging: &config.Logging{},
Instrumentation: &config.InstrumentationConfig{},
}
}

// EmptyConfig returns a config with all fields set to their zero values.
// This is useful for guaranteeing that all fields are set when merging
// EmptyConfig returns a config with all fields set to their zero values (except
// no nil pointers for the sub-sections structs). This is useful for
// guaranteeing that all fields are set when merging.
func EmptyConfig() *config.KwildConfig {
return &config.KwildConfig{
AppConfig: &config.AppConfig{
ExtensionEndpoints: []string{},
},
ChainConfig: &config.ChainConfig{
P2P: &config.P2PConfig{},
RPC: &config.ChainRPCConfig{},
Mempool: &config.MempoolConfig{},
StateSync: &config.StateSyncConfig{
RPCServers: "",
},
P2P: &config.P2PConfig{},
RPC: &config.ChainRPCConfig{},
Mempool: &config.MempoolConfig{},
StateSync: &config.StateSyncConfig{},
Consensus: &config.ConsensusConfig{},
},
Logging: &config.Logging{},
Logging: &config.Logging{},
Instrumentation: &config.InstrumentationConfig{},
}
}

Expand Down
4 changes: 4 additions & 0 deletions cmd/kwild/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func Test_Config_Toml(t *testing.T) {
assert.Equal(t, "localhost:50052", cfg.AppConfig.ExtensionEndpoints[0])
assert.Equal(t, "localhost:50053", cfg.AppConfig.ExtensionEndpoints[1])

assert.Equal(t, true, cfg.Instrumentation.Prometheus)
assert.Equal(t, 6, cfg.Instrumentation.MaxConnections)
assert.Equal(t, "9.8.7.6:20660", cfg.Instrumentation.PromListenAddr)

// TODO: Add bunch of other validations for different types
}

Expand Down
14 changes: 13 additions & 1 deletion cmd/kwild/config/default_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,16 @@ discovery_time = "15s"
chunk_request_timeout = "10s"

# Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default),
# the state sync process is aborted and the node will fall back to the regular block sync process.
# the state sync process is aborted and the node will fall back to the regular block sync process.

[instrumentation]

# When true, prometheus metrics are collected and served under /metrics
prometheus = false

# listen address for prometheus metrics
prometheus_listen_addr = "0.0.0.0:26660"

# Maximum number of simultaneous connections.
# 0 - unlimited.
max_open_connections = 1
5 changes: 5 additions & 0 deletions cmd/kwild/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ to instead run a dedicated seeder like https://github.com/kwilteam/cometseed.`))
flagSet.Var(&cfg.ChainConfig.StateSync.DiscoveryTime, "chain.statesync.discovery-time", "Chain state sync discovery time")
flagSet.Var(&cfg.ChainConfig.StateSync.ChunkRequestTimeout, "chain.statesync.chunk-request-timeout", "Chain state sync chunk request timeout")

// Instrumentation flags
flagSet.BoolVar(&cfg.Instrumentation.Prometheus, "instrumentation.prometheus", cfg.Instrumentation.Prometheus, "collect and serve prometheus metrics")
flagSet.StringVar(&cfg.Instrumentation.PromListenAddr, "instrumentation.prometheus-listen-addr", cfg.Instrumentation.PromListenAddr, "listen address for prometheus metrics")
flagSet.IntVar(&cfg.Instrumentation.MaxConnections, "instrumentation.max-open-connections", cfg.Instrumentation.MaxConnections, "maximum number of simultaneous connections")

// TODO: delete in v0.10.0
flagSet.String("app.snapshots.snapshot-dir", "", "Snapshot directory path")
flagSet.MarkDeprecated("app.snapshots.snapshot-dir", "this value is no longer configurable")
Expand Down
12 changes: 12 additions & 0 deletions cmd/kwild/config/test_data/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,15 @@ chunk_request_timeout = "10s"

# Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default),
# the state sync process is aborted and the node will fall back to the regular block sync process.

[instrumentation]

# When true, prometheus metrics are collected and served under /metrics
prometheus = true

# listen address for prometheus metrics
prometheus_listen_addr = "9.8.7.6:20660"

# Maximum number of simultaneous connections.
# 0 - unlimited.
max_open_connections = 6
10 changes: 9 additions & 1 deletion cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
abciTypes "github.com/cometbft/cometbft/abci/types"
cmtEd "github.com/cometbft/cometbft/crypto/ed25519"
cmtlocal "github.com/cometbft/cometbft/rpc/client/local"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"

kwildcfg "github.com/kwilteam/kwil-db/cmd/kwild/config"
"github.com/kwilteam/kwil-db/common"
Expand Down Expand Up @@ -112,6 +114,11 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// FinalizeBlock+Commit. This is not just a constructor, sadly.
cometBftNode := buildCometNode(d, closers, abciApp)

prometheus.MustRegister(
collectors.NewBuildInfoCollector(),
// collectors.NewDBStatsCollector() // TODO: do something like this for pg.DB
)

// Give abci p2p module access to removing peers
p2p.SetRemovePeerFn(cometBftNode.RemovePeer)

Expand Down Expand Up @@ -156,7 +163,8 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
jsonRPCServer, err := rpcserver.NewServer(d.cfg.AppConfig.JSONRPCListenAddress,
*rpcServerLogger, rpcserver.WithTimeout(time.Duration(d.cfg.AppConfig.RPCTimeout)),
rpcserver.WithReqSizeLimit(d.cfg.AppConfig.RPCMaxReqSize),
rpcserver.WithCORS(), rpcserver.WithServerInfo(&usersvc.SpecInfo))
rpcserver.WithCORS(), rpcserver.WithServerInfo(&usersvc.SpecInfo),
rpcserver.WithMetricsNamespace("kwil_json_rpc_user_server"))
if err != nil {
failBuild(err, "unable to create json-rpc server")
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/kwild/server/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func newCometConfig(cfg *config.KwildConfig) *cmtCfg.Config {
nodeCfg.StateSync.DiscoveryTime = time.Duration(userChainCfg.StateSync.DiscoveryTime)
nodeCfg.StateSync.ChunkRequestTimeout = time.Duration(userChainCfg.StateSync.ChunkRequestTimeout)

nodeCfg.Instrumentation = &cmtCfg.InstrumentationConfig{
Prometheus: cfg.Instrumentation.Prometheus,
PrometheusListenAddr: cfg.Instrumentation.PromListenAddr,
MaxOpenConnections: cfg.Instrumentation.MaxConnections,
Namespace: "cometbft",
}

// Light client verification
nodeCfg.StateSync.TrustPeriod = 36000 * time.Second // 10 hours (6s block time)

Expand Down
13 changes: 10 additions & 3 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
type KwildConfig struct {
RootDir string

AppConfig *AppConfig `mapstructure:"app"`
ChainConfig *ChainConfig `mapstructure:"chain"`
Logging *Logging `mapstructure:"log"`
AppConfig *AppConfig `mapstructure:"app"`
ChainConfig *ChainConfig `mapstructure:"chain"`
Logging *Logging `mapstructure:"log"`
Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"`
}

type Logging struct {
Expand All @@ -36,6 +37,12 @@ type Logging struct {
MaxLogRolls int `mapstructure:"retain_max_rolls"`
}

// InstrumentationConfig is the `[instrumentation]` section of the kwild
// config, controlling the prometheus metrics server.
type InstrumentationConfig struct {
	// Prometheus enables collection of prometheus metrics; when true the
	// metrics are served under /metrics on PromListenAddr.
	Prometheus bool `mapstructure:"prometheus"`
	// PromListenAddr is the listen address for the prometheus metrics
	// server (default "0.0.0.0:26660").
	PromListenAddr string `mapstructure:"prometheus_listen_addr"`
	// MaxConnections is the maximum number of simultaneous connections to
	// the metrics server; 0 means unlimited.
	MaxConnections int `mapstructure:"max_open_connections"`
}

type AppConfig struct {
JSONRPCListenAddress string `mapstructure:"jsonrpc_listen_addr"`
AdminListenAddress string `mapstructure:"admin_listen_addr"`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/near/borsh-go v0.3.1
github.com/olekukonko/tablewriter v0.0.6-0.20230925090304-df64c4bbad77
github.com/pelletier/go-toml/v2 v2.2.2
github.com/prometheus/client_golang v1.20.1
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
Expand Down Expand Up @@ -116,7 +117,6 @@ require (
github.com/pganalyze/pg_query_go/v5 v5.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.20.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand Down
51 changes: 51 additions & 0 deletions internal/services/jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/kwilteam/kwil-db/core/log"
jsonrpc "github.com/kwilteam/kwil-db/core/rpc/json"
"github.com/kwilteam/kwil-db/internal/services/jsonrpc/openrpc"
Expand Down Expand Up @@ -56,6 +58,14 @@ type Server struct {
spec json.RawMessage
authSHA []byte
tlsCfg *tls.Config

// UNSTABLE: this is not much more than a placeholder to ensure we can add
// our own metrics to the global prometheus metrics registry.
metrics map[string]Metrics
}

// Metrics is the minimal interface for a metric stored in the Server's
// metrics map. It is satisfied by prometheus.Counter (via its Inc method).
// UNSTABLE: placeholder until a structured metrics system exists.
type Metrics interface {
	// Inc increments the metric by one.
	Inc()
}

type serverConfig struct {
Expand All @@ -66,6 +76,7 @@ type serverConfig struct {
specInfo *openrpc.Info
reqSzLimit int
proxyCount int
namespace string
}

type Opt func(*serverConfig)
Expand Down Expand Up @@ -93,6 +104,13 @@ func WithTrustedProxyCount(trustedProxyCount int) Opt {
}
}

// WithMetricsNamespace enables metrics collection for the server, with all
// metric names prefixed by the provided namespace.
func WithMetricsNamespace(namespace string) Opt {
	return func(c *serverConfig) { c.namespace = namespace }
}

// WithServerInfo sets the OpenRPC "info" section to use when serving the
// OpenRPC JSON specification either via a spec REST endpoint or the
// rpc.discover JSON-RPC method.
Expand Down Expand Up @@ -171,6 +189,11 @@ var (
}
)

const (
	// reqCounterName is the name of the counter for all JSON-RPC requests
	// (on /rpc/v1).
	reqCounterName = "jsonrpc_request_counter_UNSTABLE"
)

// NewServer creates a new JSON-RPC server. Use RegisterMethodHandler or
// RegisterSvc to add method handlers.
func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
Expand Down Expand Up @@ -201,6 +224,22 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
opt(cfg)
}

// A more complete and structured metrics system should be created, but
// this is a start to ensure we are accessing the global metrics system used
// by cometbft. In Grafana or another prom dash,
// 'kwil_json_rpc_user_server_request_counter_UNSTABLE' will be graphable.
var metrics map[string]Metrics
if cfg.namespace != "" {
counter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: cfg.namespace,
Name: reqCounterName,
})
prometheus.MustRegister(counter)
metrics = map[string]Metrics{
reqCounterName: counter,
}
}

mux := http.NewServeMux() // http.DefaultServeMux has the pprof endpoints mounted

disconnectTimeout := cfg.timeout + 5*time.Second // for jsonRPCTimeoutHandler to respond, don't disconnect immediately
Expand Down Expand Up @@ -228,6 +267,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
services: make(map[string]Svc),
specInfo: cfg.specInfo,
tlsCfg: cfg.tlsConfig,
metrics: metrics,
}

if cfg.pass != "" {
Expand All @@ -246,6 +286,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
if cfg.enableCORS {
h = corsHandler(h)
}
h = reqCounter(h, metrics[reqCounterName])
h = realIPHandler(h, cfg.proxyCount) // for effective rate limiting
h = recoverer(h, log) // first, wrap with defer and call next ^

Expand Down Expand Up @@ -356,6 +397,16 @@ func corsHandler(h http.Handler) http.Handler {
})
}

// reqCounter wraps h so that counter is incremented once per request before
// the request is served. A nil counter disables counting: h is returned
// unwrapped.
func reqCounter(h http.Handler, counter Metrics) http.Handler {
	if counter == nil {
		return h
	}
	count := func(w http.ResponseWriter, r *http.Request) {
		counter.Inc()
		h.ServeHTTP(w, r)
	}
	return http.HandlerFunc(count)
}

func recoverer(h http.Handler, log log.Logger) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
Expand Down

0 comments on commit 162cabe

Please sign in to comment.