Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dashboard part 2: Show all the nodes' status on the dashboard #268

Merged
merged 49 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
02319a0
add private dashboard
ping-ke Apr 1, 2024
7cd74fa
small change
ping-ke Apr 2, 2024
ddba647
Merge branch 'main' into priv-dashboard
ping-ke Apr 2, 2024
a0687c1
fix bug
ping-ke Apr 2, 2024
1ec0481
change base
ping-ke Apr 3, 2024
dc4a3ff
fix bug
ping-ke Apr 3, 2024
7ff49a4
resolve comments
ping-ke Apr 4, 2024
a6cafe2
change metrics name
ping-ke Apr 4, 2024
79b7d9c
change key
ping-ke Apr 5, 2024
2c1cf26
add upload url flag
ping-ke Apr 5, 2024
2c84ebf
fix bug
ping-ke Apr 6, 2024
ae3fd3a
add comments
ping-ke Apr 7, 2024
519de6d
update metrics
ping-ke Apr 7, 2024
1b1f56c
fix bug
ping-ke Apr 7, 2024
8493dc1
fix bug
ping-ke Apr 7, 2024
07109ba
resolve comments
ping-ke Apr 7, 2024
6558940
fix bug
ping-ke Apr 7, 2024
3a76112
add log for private dashboard
ping-ke Apr 7, 2024
212628a
change default value
ping-ke Apr 7, 2024
4110b6c
fix bug and test cases
ping-ke Apr 8, 2024
a00a58d
fix test
ping-ke Apr 8, 2024
ab718f4
small change
ping-ke Apr 9, 2024
8ccc8d0
Merge branch 'main' into priv-dashboard
ping-ke Apr 9, 2024
38b001d
Merge branch 'main' into priv-dashboard
qzhodl Apr 15, 2024
3346d8c
Merge branch 'main' into priv-dashboard
ping-ke Apr 15, 2024
81fbb20
Merge branch 'priv-dashboard' of https://github.com/ethstorage/es-nod…
ping-ke Apr 15, 2024
6fc75a0
Merge branch 'main' into priv-dashboard
ping-ke Apr 15, 2024
c7a2e74
Merge branch 'main' into priv-dashboard
ping-ke Apr 16, 2024
62b7c25
Merge branch 'main' into priv-dashboard
ping-ke Apr 19, 2024
11cb311
remove useless code
ping-ke Apr 19, 2024
1698fa7
fix build
ping-ke Apr 19, 2024
aa3e350
Merge branch 'main' into priv-dashboard
ping-ke Apr 19, 2024
f40eecf
resolve comments
ping-ke Apr 19, 2024
0816e5b
Merge branch 'priv-dashboard' of https://github.com/ethstorage/es-nod…
ping-ke Apr 19, 2024
15953ab
Merge branch 'main' into priv-dashboard
ping-ke Apr 22, 2024
d397251
merge
ping-ke Apr 22, 2024
365da52
fix bug
ping-ke Apr 22, 2024
996de56
Merge branch 'main' into priv-dashboard
ping-ke Apr 23, 2024
0fcc6cc
Merge branch 'main' into priv-dashboard
ping-ke Apr 23, 2024
ec50baa
bug fix
ping-ke Apr 23, 2024
367e4c1
Merge branch 'priv-dashboard' of https://github.com/ethstorage/es-nod…
ping-ke Apr 23, 2024
3bc1f41
resolve comments
ping-ke Apr 23, 2024
c0bc54c
delete items
ping-ke Apr 23, 2024
5f961ac
fix bug
ping-ke Apr 23, 2024
776525c
add DeletePeerInfo
ping-ke Apr 23, 2024
f2d63fd
add DeletePeerInfo
ping-ke Apr 23, 2024
0350659
bug fix
ping-ke Apr 23, 2024
302880b
resolve comments
ping-ke Apr 23, 2024
0b0f78f
fix bug
ping-ke Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/es-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
Rollup: *rollupConfig,
Downloader: *dlConfig,

DataDir: datadir,
DBConfig: db.DefaultDBConfig(),
DataDir: datadir,
StateUploadURL: ctx.GlobalString(flags.StateUploadURL.Name),
DBConfig: db.DefaultDBConfig(),
// Driver: *driverConfig,
RPC: node.RPCConfig{
ListenAddr: ctx.GlobalString(flags.RPCListenAddr.Name),
Expand Down
192 changes: 192 additions & 0 deletions cmd/priv-dashboard/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2022-2023, EthStorage.
// For license information, see https://github.com/ethstorage/es-node/blob/main/LICENSE

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"math"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethstorage/go-ethstorage/ethstorage/metrics"
	"github.com/ethstorage/go-ethstorage/ethstorage/node"
)

// timeoutTime is how long a node's last report stays valid. Report drops
// nodes whose most recent report is older than this.
const timeoutTime = 10 * time.Minute

// Command-line flags of the dashboard binary.
var (
	// listenAddrFlag is the host the metrics server binds to (see main).
	listenAddrFlag = flag.String("address", "0.0.0.0", "Listener address")
	// portFlag is the port on which es-nodes report their state.
	portFlag = flag.Int("port", 8080, "Listener port for the es-node to report node status")
	// grafanaPortFlag is the port on which the collected metrics are served.
	grafanaPortFlag = flag.Int("grafana", 9500, "Listener port for the metrics report")
	// logFlag is the verbosity passed to the root log handler.
	logFlag = flag.Int("loglevel", 3, "Log level to use for the dashboard")
)

// record is the most recent state reported by one node, together with the
// local time at which the report arrived.
type record struct {
	// receivedTime is compared against timeoutTime to expire silent nodes.
	receivedTime time.Time
	// state is the decoded report body.
	state *node.NodeState
}

type dashboard struct {
ctx context.Context
nodes map[string]*record
ping-ke marked this conversation as resolved.
Show resolved Hide resolved
m *metrics.NetworkMetrics
logger log.Logger
}

func newDashboard() (*dashboard, error) {
var (
m = metrics.NewNetworkMetrics()
logger = log.New("app", "Dashboard")
ctx = context.Background()
)

return &dashboard{
ctx: ctx,
nodes: make(map[string]*record),
m: m,
logger: logger,
}, nil
}

func (d *dashboard) HelloHandler(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
d.logger.Warn("Read Hello body failed", "err", err.Error())
return
}
d.logger.Info("Get hello from node", "id", string(body))
answer := `{"status":"ok"}`
w.Write([]byte(answer))
}

func (d *dashboard) ReportStateHandler(w http.ResponseWriter, r *http.Request) {
state := node.NodeState{}

body, err := io.ReadAll(r.Body)
if err != nil {
d.logger.Warn("Read ReportState body failed", "err", err.Error())
return
}
err = json.Unmarshal(body, &state)
if err != nil {
log.Warn("Parse node state failed", "error", err.Error())
w.Write([]byte(fmt.Sprintf(`{"status":"error", "err message":"%s"}`, err.Error())))
return
}

log.Info("Get state from peer", "peer id", state.Id, "state", string(body))
d.nodes[state.Id] = &record{receivedTime: time.Now(), state: &state}
for _, shard := range state.Shards {
d.m.SetPeerInfo(state.Id, state.Version, state.Address, shard.ShardId, shard.Miner)
sync, mining, submission := shard.SyncState, shard.MiningState, shard.SubmissionState
d.m.SetSyncState(state.Id, state.Version, state.Address, shard.ShardId, shard.Miner, sync.PeerCount, sync.SyncProgress,
sync.SyncedSeconds, sync.FillEmptyProgress, sync.FillEmptySeconds, shard.ProvidedBlob)
d.m.SetMiningState(state.Id, state.Version, state.Address, shard.ShardId, shard.Miner, mining.MiningPower, mining.SamplingTime)
d.m.SetSubmissionState(state.Id, state.Version, state.Address, shard.ShardId, shard.Miner, submission.Succeeded,
submission.Failed, submission.Dropped, submission.LastSucceededTime)
}
w.Write([]byte(`{"status":"ok"}`))
}

func (d *dashboard) Report() {
peersTotal := len(d.nodes)
minerOfShards := make(map[uint64]map[common.Address]struct{})
versions := make(map[string]int)
shards := make(map[uint64]int)
phasesOfShard := make(map[uint64]map[string]int)
for id, r := range d.nodes {
if time.Since(r.receivedTime) > timeoutTime {
delete(d.nodes, id)
continue
}

if _, ok := versions[r.state.Version]; !ok {
versions[r.state.Version] = 0
}
versions[r.state.Version] = versions[r.state.Version] + 1

for _, s := range r.state.Shards {
if _, ok := shards[s.ShardId]; !ok {
shards[s.ShardId] = 0
}
shards[s.ShardId] = shards[s.ShardId] + 1

if _, ok := minerOfShards[s.ShardId]; !ok {
minerOfShards[s.ShardId] = make(map[common.Address]struct{})
}
minerOfShards[s.ShardId][s.Miner] = struct{}{}

if _, ok := phasesOfShard[s.ShardId]; !ok {
phases := make(map[string]int)
phases["syncing"] = 0
phases["mining"] = 0
phases["mined"] = 0
phasesOfShard[s.ShardId] = phases
}
if s.SyncState.SyncProgress < 10000 || s.SyncState.FillEmptyProgress < 10000 {
phasesOfShard[s.ShardId]["syncing"] = phasesOfShard[s.ShardId]["syncing"] + 1
} else if s.SubmissionState.Succeeded > 0 {
phasesOfShard[s.ShardId]["mined"] = phasesOfShard[s.ShardId]["mined"] + 1
} else {
phasesOfShard[s.ShardId]["mining"] = phasesOfShard[s.ShardId]["mining"] + 1
}
}
}

d.m.SetStaticMetrics(peersTotal, minerOfShards, versions, shards, phasesOfShard)
}

// loop calls Report once per minute until the dashboard context is done.
func (d *dashboard) loop() {
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	done := d.ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-tick.C:
			d.Report()
		}
	}
}

// listenAndServe starts the periodic Report loop, registers the /hello and
// /reportstate handlers on the default serve mux and serves HTTP on the given
// port on all interfaces. It blocks and returns the error from
// http.ListenAndServe.
func (d *dashboard) listenAndServe(port int) error {
	go d.loop()

	http.HandleFunc("/hello", d.HelloHandler)
	http.HandleFunc("/reportstate", d.ReportStateHandler)

	addr := fmt.Sprintf(":%d", port)
	return http.ListenAndServe(addr, nil)
}

// main parses the flags, configures logging, starts the HTTP server that
// receives node reports and then serves the metrics endpoint in the foreground.
func main() {
	// Parse the flags and set up the logger to print everything requested
	flag.Parse()
	log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*logFlag), log.StreamHandler(os.Stderr, log.TerminalFormat(true))))

	if *portFlag < 0 || *portFlag > math.MaxUint16 {
		log.Crit("Invalid port")
	}

	if *grafanaPortFlag < 0 || *grafanaPortFlag > math.MaxUint16 {
		log.Crit("Invalid grafana port")
	}
	d, err := newDashboard()
	if err != nil {
		log.Crit("New dashboard fail", "err", err)
	}

	// The report server runs in the background. Surface its error instead of
	// dropping it, otherwise a failed bind leaves a dashboard that never
	// receives any report.
	go func() {
		if err := d.listenAndServe(*portFlag); err != nil {
			log.Crit("Error starting report server", "err", err)
		}
	}()

	if err := d.m.Serve(d.ctx, *listenAddrFlag, *grafanaPortFlag); err != nil {
		log.Crit("Error starting metrics server", "err", err)
	}
}
6 changes: 6 additions & 0 deletions ethstorage/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ var (
EnvVar: prefixEnvVar("RPC_ESCALL_URL"),
Value: "http://127.0.0.1:8545",
}
StateUploadURL = cli.StringFlag{
Name: "state.upload.url",
Usage: "API that update es-node state to, the node will upload state to API for statistic if it has been set correctly.",
EnvVar: prefixEnvVar("STATE_UPLOAD_URL"),
}
)

// Not use 'Required' field in order to avoid unnecessary check when use 'init' subcommand
Expand Down Expand Up @@ -234,6 +239,7 @@ var optionalFlags = []cli.Flag{
RPCListenAddr,
RPCListenPort,
RPCESCallURL,
StateUploadURL,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
164 changes: 164 additions & 0 deletions ethstorage/metrics/network_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package metrics

import (
"context"
"fmt"
"net"
"strconv"
"time"

ophttp "github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// SubsystemName is the Prometheus subsystem under which all network metrics
// are published.
const SubsystemName = "network"

// NetworkMetrics holds the gauges that describe the whole network as seen by
// the dashboard, plus the registry they are served from.
type NetworkMetrics struct {
	// Aggregated metrics, written by SetStaticMetrics.

	// PeersTotal is the total number of peers.
	PeersTotal prometheus.Gauge
	// MinersOfShards is the number of miners, labelled by shard_id.
	MinersOfShards *prometheus.GaugeVec
	// PeersOfShards is the number of peers, labelled by shard_id.
	PeersOfShards *prometheus.GaugeVec
	// PeersOfVersions is the number of peers, labelled by version.
	PeersOfVersions *prometheus.GaugeVec
	// PeersOfPhase is the number of peers, labelled by shard_id and phase.
	PeersOfPhase *prometheus.GaugeVec

	// Per-peer metrics.

	// PeerState holds one value per (key, type) pair, where key identifies a
	// peer/shard and type names the reported quantity.
	PeerState *prometheus.GaugeVec

	// registry is the Prometheus registry exposed by Serve.
	registry *prometheus.Registry
	// factory is the metrics factory the gauges were created with.
	factory metrics.Factory
}

// NewMetrics creates a new [NetworkMetrics] instance with the given process name.
func NewNetworkMetrics() *NetworkMetrics {
ns := Namespace

registry := prometheus.NewRegistry()
registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registry.MustRegister(collectors.NewGoCollector())
factory := metrics.With(registry)
return &NetworkMetrics{

PeersTotal: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: SubsystemName,
Name: "peers",
Help: "The number of peers existed in the last 10 minutes",
}),

MinersOfShards: factory.NewGaugeVec(prometheus.GaugeOpts{
ping-ke marked this conversation as resolved.
Show resolved Hide resolved
Namespace: ns,
Subsystem: SubsystemName,
Name: "miners",
Help: "The number of miners existed in each shard",
}, []string{
"shard_id",
}),

PeersOfShards: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: SubsystemName,
Name: "peers_of_shards",
Help: "The number of peers in each shard",
}, []string{
"shard_id",
}),

PeersOfVersions: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: SubsystemName,
Name: "peers_of_versions",
Help: "The number of peers for each version",
}, []string{
"version",
}),

PeersOfPhase: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: SubsystemName,
Name: "peers_of_phases",
Help: "The number of peers for each phase",
}, []string{
"shard_id",
"phase",
}),

PeerState: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: SubsystemName,
Name: "peer_state",
Help: "The sync state of each peer",
}, []string{
"key",
"type",
}),

registry: registry,
factory: factory,
}
}

// SetPeerInfo stamps the "UpdateTime" value of the given peer/shard with the
// current wall-clock time in Unix milliseconds.
func (m *NetworkMetrics) SetPeerInfo(id, version, address string, shardId uint64, miner common.Address) {
	key := fmt.Sprintf(`{"id":"%s", "version":"%s", "address":"%s", "shardid":%d, "miner":"%s"}`,
		id, version, address, shardId, miner.Hex())
	now := time.Now().UnixMilli()
	m.PeerState.WithLabelValues(key, "UpdateTime").Set(float64(now))
}

// SetSyncState publishes the sync-related values of the given peer/shard.
// The two progress values are divided by 100 and the two second counters by
// 3600 (i.e. exported as hours); peer count and provided blobs are exported
// unchanged.
func (m *NetworkMetrics) SetSyncState(id, version, address string, shardId uint64, miner common.Address, peerCount int, syncProgress, syncedSeconds,
	fillEmptyProgress, fillEmptySeconds, providedBlobs uint64) {
	key := fmt.Sprintf(`{"id":"%s", "version":"%s", "address":"%s", "shardid":%d, "miner":"%s"}`,
		id, version, address, shardId, miner.Hex())

	values := []struct {
		typ string
		val float64
	}{
		{"PeerCount", float64(peerCount)},
		{"SyncProgress", float64(syncProgress) / 100},
		{"SyncedSeconds", float64(syncedSeconds) / 3600},
		{"FillEmptyProgress", float64(fillEmptyProgress) / 100},
		{"FillEmptySeconds", float64(fillEmptySeconds) / 3600},
		{"ProvidedBlobs", float64(providedBlobs)},
	}
	for _, v := range values {
		m.PeerState.WithLabelValues(key, v.typ).Set(v.val)
	}
}

// SetMiningState publishes the mining values of the given peer/shard:
// miningPower divided by 100 and samplingTime divided by 1000.
func (m *NetworkMetrics) SetMiningState(id, version, address string, shardId uint64, miner common.Address, miningPower, samplingTime uint64) {
	key := fmt.Sprintf(`{"id":"%s", "version":"%s", "address":"%s", "shardid":%d, "miner":"%s"}`,
		id, version, address, shardId, miner.Hex())

	power := float64(miningPower) / 100
	sampling := float64(samplingTime) / 1000
	m.PeerState.WithLabelValues(key, "MiningPower").Set(power)
	m.PeerState.WithLabelValues(key, "SamplingTime").Set(sampling)
}

// SetSubmissionState publishes the submission counters (succeeded, failed,
// dropped) and the last-succeeded time of the given peer/shard, unscaled.
func (m *NetworkMetrics) SetSubmissionState(id, version, address string, shardId uint64, miner common.Address, succeeded, failed, dropped int, lastSucceededTime int64) {
	key := fmt.Sprintf(`{"id":"%s", "version":"%s", "address":"%s", "shardid":%d, "miner":"%s"}`,
		id, version, address, shardId, miner.Hex())

	values := []struct {
		typ string
		val float64
	}{
		{"Succeeded", float64(succeeded)},
		{"Failed", float64(failed)},
		{"Dropped", float64(dropped)},
		{"LastSucceededTime", float64(lastSucceededTime)},
	}
	for _, v := range values {
		m.PeerState.WithLabelValues(key, v.typ).Set(v.val)
	}
}

// SetStaticMetrics publishes the aggregated network view: the total peer
// count, the number of distinct miners and of peers per shard, the number of
// peers per version, and the number of peers per shard and phase.
//
// The labelled gauges are reset before being filled, so a shard, version or
// phase that is absent from the arguments disappears from the output instead
// of keeping the value of an earlier call. A scrape that lands between the
// reset and the refill can briefly miss those series.
func (m *NetworkMetrics) SetStaticMetrics(peersTotal int, minerOfShards map[uint64]map[common.Address]struct{},
	versions map[string]int, shards map[uint64]int, phasesOfShard map[uint64]map[string]int) {
	m.PeersTotal.Set(float64(peersTotal))

	m.MinersOfShards.Reset()
	for shardId, miners := range minerOfShards {
		m.MinersOfShards.WithLabelValues(strconv.FormatUint(shardId, 10)).Set(float64(len(miners)))
	}

	m.PeersOfShards.Reset()
	for shardId, count := range shards {
		m.PeersOfShards.WithLabelValues(strconv.FormatUint(shardId, 10)).Set(float64(count))
	}

	m.PeersOfVersions.Reset()
	for version, count := range versions {
		m.PeersOfVersions.WithLabelValues(version).Set(float64(count))
	}

	m.PeersOfPhase.Reset()
	for shardId, phases := range phasesOfShard {
		shard := strconv.FormatUint(shardId, 10)
		for phase, count := range phases {
			m.PeersOfPhase.WithLabelValues(shard, phase).Set(float64(count))
		}
	}
}

// Serve exposes the registry over HTTP on hostname:port. It blocks until the
// server stops and returns the error from ListenAndServe; the server is
// closed when ctx is done.
func (m *NetworkMetrics) Serve(ctx context.Context, hostname string, port int) error {
	handler := promhttp.InstrumentMetricHandler(
		m.registry,
		promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
	)

	srv := ophttp.NewHttpServer(handler)
	srv.Addr = net.JoinHostPort(hostname, strconv.Itoa(port))

	// Shut the server down once the caller's context is done.
	go func() {
		<-ctx.Done()
		srv.Close()
	}()

	return srv.ListenAndServe()
}
Loading
Loading