Skip to content

Commit

Permalink
Merge branch 'main' into metrics-base
Browse files Browse the repository at this point in the history
  • Loading branch information
ping-ke committed Jan 7, 2024
2 parents 9372b64 + 26c220f commit 13763e8
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 78 deletions.
13 changes: 6 additions & 7 deletions cmd/es-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
ListenPort: ctx.GlobalInt(flags.RPCListenPort.Name),
ESCallURL: ctx.GlobalString(flags.RPCESCallURL.Name),
},
// Metrics: node.MetricsConfig{
// Enabled: ctx.GlobalBool(flags.MetricsEnabledFlag.Name),
// ListenAddr: ctx.GlobalString(flags.MetricsAddrFlag.Name),
// ListenPort: ctx.GlobalInt(flags.MetricsPortFlag.Name),
// },
Metrics: node.MetricsConfig{
Enabled: ctx.GlobalBool(flags.MetricsEnabledFlag.Name),
ListenAddr: ctx.GlobalString(flags.MetricsAddrFlag.Name),
ListenPort: ctx.GlobalInt(flags.MetricsPortFlag.Name),
},
Pprof: oppprof.CLIConfig{
Enabled: ctx.GlobalBool(flags.PprofEnabledFlag.Name),
ListenAddr: ctx.GlobalString(flags.PprofAddrFlag.Name),
Expand Down Expand Up @@ -212,8 +212,7 @@ func NewRollupConfig(ctx *cli.Context) (*rollup.EsConfig, error) {
// return nil, err
// }
config := rollup.EsConfig{
L2ChainID: new(big.Int).SetUint64(ctx.GlobalUint64(flags.L2ChainId.Name)),
MetricsEnable: ctx.Bool(flags.MetricsEnable.Name),
L2ChainID: new(big.Int).SetUint64(ctx.GlobalUint64(flags.L2ChainId.Name)),
}

return &config, nil
Expand Down
9 changes: 8 additions & 1 deletion cmd/es-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/flags"
eslog "github.com/ethstorage/go-ethstorage/ethstorage/log"
"github.com/ethstorage/go-ethstorage/ethstorage/metrics"
"github.com/ethstorage/go-ethstorage/ethstorage/node"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -118,7 +119,11 @@ func EsNodeMain(ctx *cli.Context) error {
return err
}

n, err := node.New(context.Background(), cfg, log, VersionWithMeta)
var m metrics.Metricer = metrics.NoopMetrics
if cfg.Metrics.Enabled {
m = metrics.NewMetrics("default")
}
n, err := node.New(context.Background(), cfg, log, VersionWithMeta, m)
if err != nil {
log.Error("Unable to create the storage node", "error", err)
return err
Expand All @@ -131,6 +136,8 @@ func EsNodeMain(ctx *cli.Context) error {
}
defer n.Close()

m.RecordInfo(VersionWithMeta)
m.RecordUp()
// TODO: heartbeat
if cfg.Pprof.Enabled {
pprofCtx, pprofCancel := context.WithCancel(context.Background())
Expand Down
24 changes: 19 additions & 5 deletions ethstorage/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,22 @@ var (
Value: 3333,
EnvVar: prefixEnvVar("L2_CHAIN_ID"),
}
MetricsEnable = cli.BoolFlag{
Name: "metrics.enable",
Usage: "Enable metrics",
EnvVar: prefixEnvVar("METRICS_ENABLE"),
MetricsEnabledFlag = cli.BoolFlag{
Name: "metrics.enabled",
Usage: "Enable the metrics server",
EnvVar: prefixEnvVar("METRICS_ENABLED"),
}
MetricsAddrFlag = cli.StringFlag{
Name: "metrics.addr",
Usage: "Metrics listening address",
Value: "0.0.0.0",
EnvVar: prefixEnvVar("METRICS_ADDR"),
}
MetricsPortFlag = cli.IntFlag{
Name: "metrics.port",
Usage: "Metrics listening port",
Value: 7300,
EnvVar: prefixEnvVar("METRICS_PORT"),
}
PprofEnabledFlag = cli.BoolFlag{
Name: "pprof.enabled",
Expand Down Expand Up @@ -205,7 +217,9 @@ var optionalFlags = []cli.Flag{
L1BeaconSlotTime,
L1MinDurationForBlobsRequest,
L2ChainId,
MetricsEnable,
MetricsEnabledFlag,
MetricsAddrFlag,
MetricsPortFlag,
PprofEnabledFlag,
PprofAddrFlag,
PprofPortFlag,
Expand Down
44 changes: 22 additions & 22 deletions ethstorage/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Metricer interface {
ClientGetBlobsByRangeEvent(peerID string, resultCode byte, duration time.Duration)
ClientGetBlobsByListEvent(peerID string, resultCode byte, duration time.Duration)
ClientFillEmptyBlobsEvent(count uint64, duration time.Duration)
ClientOnBlobsByRange(peerID string, reqCount, retBlobCount, insertedCount uint64, duration time.Duration)
ClientOnBlobsByList(peerID string, reqCount, retBlobCount, insertedCount uint64, duration time.Duration)
ClientOnBlobsByRange(peerID string, reqCount, getBlobCount, insertedCount uint64, duration time.Duration)
ClientOnBlobsByList(peerID string, reqCount, getBlobCount, insertedCount uint64, duration time.Duration)
ClientRecordTimeUsed(method string) func()
IncDropPeerCount()
IncPeerCount()
Expand Down Expand Up @@ -483,51 +483,51 @@ func (m *Metrics) SetPeerScores(scores map[string]float64) {
func (m *Metrics) ClientGetBlobsByRangeEvent(peerID string, resultCode byte, duration time.Duration) {
code := strconv.FormatUint(uint64(resultCode), 10)
m.SyncClientRequestsTotal.WithLabelValues("get_blobs_by_range", code).Inc()
m.SyncClientRequestDurationSeconds.WithLabelValues("get_blobs_by_range", code).Observe(float64(duration) / float64(time.Second))
m.SyncClientRequestDurationSeconds.WithLabelValues("get_blobs_by_range", code).Observe(duration.Seconds())
m.SyncClientPeerRequestsTotal.WithLabelValues(peerID, "get_blobs_by_range", code).Inc()
m.SyncClientPeerRequestDurationSeconds.WithLabelValues(peerID, "get_blobs_by_range", code).Observe(float64(duration) / float64(time.Second))
m.SyncClientPeerRequestDurationSeconds.WithLabelValues(peerID, "get_blobs_by_range", code).Observe(duration.Seconds())
}

func (m *Metrics) ClientGetBlobsByListEvent(peerID string, resultCode byte, duration time.Duration) {
code := strconv.FormatUint(uint64(resultCode), 10)
m.SyncClientRequestsTotal.WithLabelValues("get_blobs_by_list", code).Inc()
m.SyncClientRequestDurationSeconds.WithLabelValues("get_blobs_by_list", code).Observe(float64(duration) / float64(time.Second))
m.SyncClientRequestDurationSeconds.WithLabelValues("get_blobs_by_list", code).Observe(duration.Seconds())
m.SyncClientPeerRequestsTotal.WithLabelValues(peerID, "get_blobs_by_list", code).Inc()
m.SyncClientPeerRequestDurationSeconds.WithLabelValues(peerID, "get_blobs_by_list", code).Observe(float64(duration) / float64(time.Second))
m.SyncClientPeerRequestDurationSeconds.WithLabelValues(peerID, "get_blobs_by_list", code).Observe(duration.Seconds())
}

func (m *Metrics) ClientFillEmptyBlobsEvent(count uint64, duration time.Duration) {
method := "fillEmpty"
m.SyncClientPerfCallTotal.WithLabelValues(method).Add(float64(count))
m.SyncClientPerfCallDurationSeconds.WithLabelValues(method).Observe(float64(duration) / float64(time.Second) / float64(count))
m.SyncClientPerfCallDurationSeconds.WithLabelValues(method).Observe(duration.Seconds() / float64(count))
}

func (m *Metrics) ClientOnBlobsByRange(peerID string, reqBlobCount, retBlobCount, insertedCount uint64, duration time.Duration) {
func (m *Metrics) ClientOnBlobsByRange(peerID string, reqBlobCount, getBlobCount, insertedCount uint64, duration time.Duration) {
m.SyncClientState.WithLabelValues("reqBlobCount").Add(float64(reqBlobCount))
m.SyncClientState.WithLabelValues("retBlobCount").Add(float64(retBlobCount))
m.SyncClientState.WithLabelValues("getBlobCount").Add(float64(getBlobCount))
m.SyncClientState.WithLabelValues("insertedBlobCount").Add(float64(insertedCount))

m.SyncClientPeerState.WithLabelValues(peerID, "reqBlobCount").Add(float64(reqBlobCount))
m.SyncClientPeerState.WithLabelValues(peerID, "retBlobCount").Add(float64(retBlobCount))
m.SyncClientPeerState.WithLabelValues(peerID, "getBlobCount").Add(float64(getBlobCount))
m.SyncClientPeerState.WithLabelValues(peerID, "insertedBlobCount").Add(float64(insertedCount))

method := "onBlobsByRange"
m.SyncClientPerfCallTotal.WithLabelValues(method).Inc()
m.SyncClientPerfCallDurationSeconds.WithLabelValues(method).Observe(float64(duration) / float64(time.Second))
m.SyncClientPerfCallDurationSeconds.WithLabelValues(method).Observe(duration.Seconds())
}

func (m *Metrics) ClientOnBlobsByList(peerID string, reqCount, retBlobCount, insertedCount uint64, duration time.Duration) {
func (m *Metrics) ClientOnBlobsByList(peerID string, reqCount, getBlobCount, insertedCount uint64, duration time.Duration) {
m.SyncClientState.WithLabelValues("reqBlobCount").Add(float64(reqCount))
m.SyncClientState.WithLabelValues("retBlobCount").Add(float64(retBlobCount))
m.SyncClientState.WithLabelValues("getBlobCount").Add(float64(getBlobCount))
m.SyncClientState.WithLabelValues("insertedBlobCount").Add(float64(insertedCount))

m.SyncClientPeerState.WithLabelValues(peerID, "reqBlobCount").Add(float64(reqCount))
m.SyncClientPeerState.WithLabelValues(peerID, "retBlobCount").Add(float64(retBlobCount))
m.SyncClientPeerState.WithLabelValues(peerID, "getBlobCount").Add(float64(getBlobCount))
m.SyncClientPeerState.WithLabelValues(peerID, "insertedBlobCount").Add(float64(insertedCount))

method := "onBlobsByList"
m.SyncClientPerfCallTotal.WithLabelValues(method).Inc()
m.SyncClientPerfCallDurationSeconds.WithLabelValues(method).Observe(float64(duration) / float64(time.Second))
m.SyncClientPerfCallDurationSeconds.WithLabelValues(method).Observe(duration.Seconds())
}

func (m *Metrics) ClientRecordTimeUsed(method string) func() {
Expand All @@ -553,19 +553,19 @@ func (m *Metrics) DecPeerCount() {
func (m *Metrics) ServerGetBlobsByRangeEvent(peerID string, resultCode byte, duration time.Duration) {
code := strconv.FormatUint(uint64(resultCode), 10)
m.SyncServerHandleReqTotal.WithLabelValues("get_blobs_by_range", code).Inc()
m.SyncServerHandleReqDurationSeconds.WithLabelValues("get_blobs_by_range", code).Observe(float64(duration) / float64(time.Second))
m.SyncServerHandleReqDurationSeconds.WithLabelValues("get_blobs_by_range", code).Observe(duration.Seconds())

m.SyncServerHandleReqTotalPerPeer.WithLabelValues(peerID, "get_blobs_by_range", code).Inc()
m.SyncServerHandleReqDurationSecondsPerPeer.WithLabelValues(peerID, "get_blobs_by_range", code).Observe(float64(duration) / float64(time.Second))
m.SyncServerHandleReqDurationSecondsPerPeer.WithLabelValues(peerID, "get_blobs_by_range", code).Observe(duration.Seconds())
}

func (m *Metrics) ServerGetBlobsByListEvent(peerID string, resultCode byte, duration time.Duration) {
code := strconv.FormatUint(uint64(resultCode), 10)
m.SyncServerHandleReqTotal.WithLabelValues("get_blobs_by_list", code).Inc()
m.SyncServerHandleReqDurationSeconds.WithLabelValues("get_blobs_by_list", code).Observe(float64(duration) / float64(time.Second))
m.SyncServerHandleReqDurationSeconds.WithLabelValues("get_blobs_by_list", code).Observe(duration.Seconds())

m.SyncServerHandleReqTotalPerPeer.WithLabelValues(peerID, "get_blobs_by_list", code).Inc()
m.SyncServerHandleReqDurationSecondsPerPeer.WithLabelValues(peerID, "get_blobs_by_list", code).Observe(float64(duration) / float64(time.Second))
m.SyncServerHandleReqDurationSecondsPerPeer.WithLabelValues(peerID, "get_blobs_by_list", code).Observe(duration.Seconds())
}

func (m *Metrics) ServerReadBlobs(peerID string, read, sucRead uint64, timeUse time.Duration) {
Expand All @@ -576,7 +576,7 @@ func (m *Metrics) ServerReadBlobs(peerID string, read, sucRead uint64, timeUse t

method := "readBlobs"
m.SyncServerPerfCallTotal.WithLabelValues(method).Inc()
m.SyncServerPerfCallDurationSeconds.WithLabelValues(method).Observe(float64(timeUse) / float64(time.Second))
m.SyncServerPerfCallDurationSeconds.WithLabelValues(method).Observe(timeUse.Seconds())
}

func (m *Metrics) ServerRecordTimeUsed(method string) func() {
Expand Down Expand Up @@ -642,11 +642,11 @@ func (n *noopMetricer) ClientGetBlobsByListEvent(peerID string, resultCode byte,
// ClientFillEmptyBlobsEvent is a no-op; noopMetricer discards all metrics
// when the metrics server is disabled.
func (n *noopMetricer) ClientFillEmptyBlobsEvent(count uint64, duration time.Duration) {
}

func (n *noopMetricer) ClientOnBlobsByRange(peerID string, reqCount, retBlobCount, insertedCount uint64, duration time.Duration) {
func (n *noopMetricer) ClientOnBlobsByRange(peerID string, reqCount, getBlobCount, insertedCount uint64, duration time.Duration) {

}

func (n *noopMetricer) ClientOnBlobsByList(peerID string, reqCount, retBlobCount, insertedCount uint64, duration time.Duration) {
func (n *noopMetricer) ClientOnBlobsByList(peerID string, reqCount, getBlobCount, insertedCount uint64, duration time.Duration) {
}

func (n *noopMetricer) ClientRecordTimeUsed(method string) func() {
Expand Down
28 changes: 24 additions & 4 deletions ethstorage/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package node

import (
"errors"
"fmt"
"math"
"path/filepath"
"time"

Expand Down Expand Up @@ -41,7 +43,7 @@ type Config struct {

Storage storage.StorageConfig

// Metrics MetricsConfig
Metrics MetricsConfig

Pprof oppprof.CLIConfig

Expand All @@ -54,6 +56,24 @@ type Config struct {
Mining *miner.Config
}

// MetricsConfig holds the settings for the node's embedded metrics server.
type MetricsConfig struct {
	Enabled    bool   // whether the metrics server should be started at all
	ListenAddr string // address the metrics HTTP server binds to
	ListenPort int    // TCP port the metrics HTTP server listens on
}

// Check reports whether the metrics configuration is usable. A disabled
// configuration is always considered valid; the listen port is only
// validated when metrics are enabled.
func (m MetricsConfig) Check() error {
	if !m.Enabled {
		return nil
	}
	if p := m.ListenPort; p < 0 || p > math.MaxUint16 {
		return errors.New("invalid metrics port")
	}
	return nil
}

type RPCConfig struct {
ListenAddr string
ListenPort int
Expand All @@ -71,9 +91,9 @@ func (cfg *Config) Check() error {
// if err := cfg.Rollup.Check(); err != nil {
// return fmt.Errorf("rollup config error: %w", err)
// }
// if err := cfg.Metrics.Check(); err != nil {
// return fmt.Errorf("metrics config error: %w", err)
// }
if err := cfg.Metrics.Check(); err != nil {
return fmt.Errorf("metrics config error: %w", err)
}
if err := cfg.Pprof.Check(); err != nil {
return fmt.Errorf("pprof config error: %w", err)
}
Expand Down
16 changes: 9 additions & 7 deletions ethstorage/node/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package node

import (
"context"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -15,28 +16,29 @@ import (
)

type ethAPI struct {
rpcCfg *RPCConfig
log log.Logger
rpcCfg *RPCConfig
chainId *big.Int
log log.Logger
}

const (
ESChainID = 333
defaultCallTimeout = 2 * time.Second
)

var (
rpcCli *rpc.Client
)

func NewETHAPI(config *RPCConfig, log log.Logger) *ethAPI {
func NewETHAPI(config *RPCConfig, chainId *big.Int, log log.Logger) *ethAPI {
return &ethAPI{
rpcCfg: config,
log: log,
rpcCfg: config,
chainId: chainId,
log: log,
}
}

func (api *ethAPI) ChainId() hexutil.Uint64 {
return hexutil.Uint64(ESChainID)
return hexutil.Uint64(api.chainId.Uint64())
}

type TransactionArgs struct {
Expand Down
35 changes: 25 additions & 10 deletions ethstorage/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (
type EsNode struct {
log log.Logger
appVersion string
metrics *metrics.Metrics
metrics metrics.Metricer

l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error)
l1SafeSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
l1FinalizedSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
l1FinalizedSub ethereum.Subscription // Subscription to get L1 Finalized blocks, a.k.a. justified data (polling)

l1Source *eth.PollingClient // L1 Client to fetch data from
l1Beacon *eth.BeaconClient // L1 Beacon Chain to fetch blobs from
Expand All @@ -56,14 +56,15 @@ type EsNode struct {
feed *event.Feed
}

func New(ctx context.Context, cfg *Config, log log.Logger, appVersion string) (*EsNode, error) {
// if err := cfg.Check(); err != nil {
// return nil, err
// }
func New(ctx context.Context, cfg *Config, log log.Logger, appVersion string, m metrics.Metricer) (*EsNode, error) {
if err := cfg.Check(); err != nil {
return nil, err
}

n := &EsNode{
log: log,
appVersion: appVersion,
metrics: m,
}
// not a context leak, gossipsub is closed with a context.
n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())
Expand Down Expand Up @@ -113,9 +114,9 @@ func (n *EsNode) init(ctx context.Context, cfg *Config) error {
if err := n.initRPCServer(ctx, cfg); err != nil {
return err
}
// if err := n.initMetricsServer(ctx, cfg); err != nil {
// return err
// }
if err := n.initMetricsServer(ctx, cfg); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -215,7 +216,7 @@ func (n *EsNode) initStorageManager(ctx context.Context, cfg *Config) error {
}

func (n *EsNode) initRPCServer(ctx context.Context, cfg *Config) error {
server, err := newRPCServer(ctx, &cfg.RPC, n.storageManager, n.downloader, n.log, n.appVersion)
server, err := newRPCServer(ctx, &cfg.RPC, cfg.Rollup.L2ChainID, n.storageManager, n.downloader, n.log, n.appVersion)
if err != nil {
return err
}
Expand All @@ -227,6 +228,20 @@ func (n *EsNode) initRPCServer(ctx context.Context, cfg *Config) error {
return nil
}

// initMetricsServer starts the metrics HTTP server in a background goroutine
// when metrics are enabled in cfg; otherwise it logs and returns immediately.
// NOTE(review): the goroutine's lifetime is bounded only by ctx, and a serve
// failure calls the package-level log.Crit (which exits the process) rather
// than n.log — confirm that is intended.
func (n *EsNode) initMetricsServer(ctx context.Context, cfg *Config) error {
	mc := cfg.Metrics
	if !mc.Enabled {
		n.log.Info("Metrics disabled")
		return nil
	}
	n.log.Info("Starting metrics server", "addr", mc.ListenAddr, "port", mc.ListenPort)
	go func() {
		serveErr := n.metrics.Serve(ctx, mc.ListenAddr, mc.ListenPort)
		if serveErr != nil {
			log.Crit("Error starting metrics server", "err", serveErr)
		}
	}()
	return nil
}

func (n *EsNode) initMiner(ctx context.Context, cfg *Config) error {
if cfg.Mining == nil {
// not enabled
Expand Down
Loading

0 comments on commit 13763e8

Please sign in to comment.