Skip to content

Commit

Permalink
Merge pull request #163 from ipfs-force-community/feat/add-more-metrics
Browse files Browse the repository at this point in the history
Feat/add more metrics
  • Loading branch information
simlecode authored Dec 7, 2023
2 parents 08ab3ab + c970795 commit 52578e5
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 123 deletions.
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ issues:
linters-settings:
goconst:
min-occurrences: 6
revive:
rules:
- name: unused-parameter
disabled: true
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/filecoin-project/venus v1.14.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/ipfs-force-community/metrics v1.0.1-0.20231011024528-8c881d456601
github.com/ipfs-force-community/metrics v1.0.1-0.20231207081445-30178e706d09
github.com/ipfs-force-community/sophon-auth v1.14.0
github.com/ipfs-force-community/sophon-miner v1.14.0
github.com/ipfs/go-cid v0.4.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,8 @@ github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/ipfs-force-community/go-jsonrpc v0.1.9 h1:5QavBltfvV6fz/+EbYsCkVxJ1MSJncZm6YuPs1SLdZU=
github.com/ipfs-force-community/go-jsonrpc v0.1.9/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/ipfs-force-community/metrics v1.0.1-0.20231011024528-8c881d456601 h1:zxKQ30KAD6KfvSFAx9tuqQXLDsEHyF+eVaUBXXYC2bU=
github.com/ipfs-force-community/metrics v1.0.1-0.20231011024528-8c881d456601/go.mod h1:wM6EmkEcnJgWOFcVytgvK0u15awEmt8He0f2kAdsFDA=
github.com/ipfs-force-community/metrics v1.0.1-0.20231207081445-30178e706d09 h1:qEI6ItxKtgOupMMuGJwqK5zEzztKKPUP1QKq9g+X5bM=
github.com/ipfs-force-community/metrics v1.0.1-0.20231207081445-30178e706d09/go.mod h1:wM6EmkEcnJgWOFcVytgvK0u15awEmt8He0f2kAdsFDA=
github.com/ipfs-force-community/sophon-auth v1.14.0 h1:ctBJ6UHkcytEzfVPgiiHo0cW4FGQrE7r1H3Um0FcHbo=
github.com/ipfs-force-community/sophon-auth v1.14.0/go.mod h1:d6J6u3zyIwcEajRho5BhVBcoIChEf0K76wP4yJEfEhc=
github.com/ipfs-force-community/sophon-miner v1.14.0 h1:3c+EoHBM4Ir0EnbPQCJpkNmgAoBR8chDZO0MPRYpYT4=
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ func RunMain(ctx context.Context, repoPath string, cfg *config.Config) error {
return err
}

metrics2.ApiState.Set(ctx, 1)
defer func() {
metrics2.ApiState.Set(ctx, 0)
}()
if err = srv.Serve(manet.NetListener(nl)); err != nil && err != http.ErrServerClosed {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions marketevent/market_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func (m *MarketEventStream) ListenMarketEvent(ctx context.Context, policy *gtype

ctx, _ = tag.New(ctx, tag.Upsert(metrics.IPKey, ip), tag.Upsert(metrics.MinerAddressKey, mAddr.String()),
tag.Upsert(metrics.MinerTypeKey, "market"))
stats.Record(ctx, metrics.MinerRegister.M(1))
stats.Record(ctx, metrics.MinerSource.M(1))
metrics.MinerRegister.Tick(ctx)
metrics.MinerSource.Tick(ctx)

out <- &gtypes.RequestEvent{
ID: sharedTypes.NewUUID(),
Expand Down
34 changes: 3 additions & 31 deletions metrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,20 @@ package metrics

import (
"context"
"fmt"

"github.com/ipfs-force-community/metrics"
logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/stats/view"

v2API "github.com/filecoin-project/venus/venus-shared/api/gateway/v2"
)

var log = logging.Logger("metrics")

func SetupMetrics(ctx context.Context, metricsConfig *metrics.MetricsConfig, api v2API.IGateway) error {
log.Infof("metrics config: enabled: %v, exporter type: %s, prometheus: %+v, graphite: %+v",
metricsConfig.Enabled, metricsConfig.Exporter.Type, metricsConfig.Exporter.Prometheus,
metricsConfig.Exporter.Graphite)

if !metricsConfig.Enabled {
return nil
}

if err := view.Register(views...); err != nil {
return fmt.Errorf("cannot register the view: %w", err)
}

switch metricsConfig.Exporter.Type {
case metrics.ETPrometheus:
go func() {
if err := metrics.RegisterPrometheusExporter(ctx, metricsConfig.Exporter.Prometheus); err != nil {
log.Errorf("failed to register prometheus exporter err: %v", err)
}
log.Infof("prometheus exporter server graceful shutdown successful")
}()

case metrics.ETGraphite:
if err := metrics.RegisterGraphiteExporter(ctx, metricsConfig.Exporter.Graphite); err != nil {
log.Errorf("failed to register graphite exporter: %v", err)
}
default:
log.Warnf("invalid exporter type: %s", metricsConfig.Exporter.Type)
err := metrics.SetupMetrics(ctx, metricsConfig)
if err != nil {
return err
}

go recordMetricsLoop(ctx, api)

return nil
}
89 changes: 23 additions & 66 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package metrics
import (
"time"

"github.com/filecoin-project/go-jsonrpc/metrics"
rpcMetrics "github.com/filecoin-project/go-jsonrpc/metrics"
"github.com/ipfs-force-community/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
Expand All @@ -26,26 +27,28 @@ var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.

var (
// wallet
WalletRegister = stats.Int64("wallet_register", "Wallet register", stats.UnitDimensionless)
WalletUnregister = stats.Int64("wallet_unregister", "Wallet unregister", stats.UnitDimensionless)
WalletNum = stats.Int64("wallet_num", "Wallet count", stats.UnitDimensionless)
WalletAddressNum = stats.Int64("wallet_address_num", "Address owned by wallet", stats.UnitDimensionless)
WalletAddAddr = stats.Int64("wallet_add_addr", "Wallet add a new address", stats.UnitDimensionless)
WalletRemoveAddr = stats.Int64("wallet_remove_addr", "Wallet remove a new address", stats.UnitDimensionless)
WalletConnNum = stats.Int64("wallet_conn_num", "Wallet connection count", stats.UnitDimensionless)
WalletNum = metrics.NewInt64("wallet/num", "Wallet count", stats.UnitDimensionless)
WalletAddressNum = metrics.NewInt64("wallet/address_num", "Address owned by wallet", stats.UnitDimensionless)
WalletConnNum = metrics.NewInt64("wallet/conn_num", "Wallet connection count", stats.UnitDimensionless)
WalletRegister = stats.Int64("wallet/register", "Wallet register", stats.UnitDimensionless)
WalletUnregister = stats.Int64("wallet/unregister", "Wallet unregister", stats.UnitDimensionless)
WalletAddAddr = stats.Int64("wallet/add_addr", "Wallet add a new address", stats.UnitDimensionless)
WalletRemoveAddr = stats.Int64("wallet/remove_addr", "Wallet remove a new address", stats.UnitDimensionless)

// miner
MinerRegister = stats.Int64("miner_register", "Miner register", stats.UnitDimensionless)
MinerUnregister = stats.Int64("miner_unregister", "Miner unregister", stats.UnitDimensionless)
MinerNum = stats.Int64("miner_num", "Wallet count", stats.UnitDimensionless)
MinerSource = stats.Int64("wallet_source", "Miner IP", stats.UnitDimensionless)
MinerConnNum = stats.Int64("miner_conn_num", "Miner connection count", stats.UnitDimensionless)
MinerRegister = metrics.NewCounter("miner/register", "Miner register", MinerAddressKey, IPKey, MinerTypeKey)
MinerUnregister = metrics.NewCounter("miner/unregister", "Miner unregister", MinerAddressKey, IPKey, MinerTypeKey)
MinerSource = metrics.NewCounter("miner/source", "Miner IP", MinerAddressKey, MinerTypeKey)
MinerNum = metrics.NewInt64("miner/num", "Wallet count", "", MinerTypeKey)
MinerConnNum = metrics.NewInt64("miner/conn_num", "Miner connection count", "", MinerTypeKey)

// method call
WalletSign = stats.Float64("wallet_sign", "Call WalletSign spent time", stats.UnitMilliseconds)
WalletList = stats.Float64("wallet_list", "Call WalletList spent time", stats.UnitMilliseconds)
ComputeProof = stats.Float64("compute_proof", "Call ComputeProof spent time", stats.UnitMilliseconds)
SectorsUnsealPiece = stats.Float64("sectors_unseal_piece", "Call SectorsUnsealPiece spent time", stats.UnitMilliseconds)

ApiState = metrics.NewInt64("api/state", "api service state. 0: down, 1: up", "")
)

var (
Expand All @@ -60,16 +63,7 @@ var (
Aggregation: view.Count(),
TagKeys: []tag.Key{WalletAccountKey, IPKey},
}
walletNumView = &view.View{
Measure: WalletNum,
Aggregation: view.Count(),
TagKeys: []tag.Key{WalletAccountKey},
}
walletAddressNumView = &view.View{
Measure: WalletAddressNum,
Aggregation: view.Count(),
TagKeys: []tag.Key{WalletAccountKey, WalletAddressKey},
}

walletAddAddrView = &view.View{
Measure: WalletAddAddr,
Aggregation: view.Count(),
Expand All @@ -80,38 +74,6 @@ var (
Aggregation: view.Count(),
TagKeys: []tag.Key{WalletAccountKey, WalletAddressKey},
}
walletConnNumView = &view.View{
Measure: WalletConnNum,
Aggregation: view.Count(),
TagKeys: []tag.Key{WalletAccountKey, IPKey},
}

// miner
minerRegisterView = &view.View{
Measure: MinerRegister,
Aggregation: view.Count(),
TagKeys: []tag.Key{MinerAddressKey, MinerTypeKey, IPKey},
}
minerUnregisterView = &view.View{
Measure: MinerUnregister,
Aggregation: view.Count(),
TagKeys: []tag.Key{MinerAddressKey, MinerTypeKey, IPKey},
}
minerNumView = &view.View{
Measure: MinerNum,
Aggregation: view.Count(),
TagKeys: []tag.Key{MinerAddressKey, MinerTypeKey},
}
minerSourceView = &view.View{
Measure: MinerSource,
Aggregation: view.Count(),
TagKeys: []tag.Key{MinerAddressKey, MinerTypeKey},
}
minerConnNumView = &view.View{
Measure: WalletConnNum,
Aggregation: view.Count(),
TagKeys: []tag.Key{MinerAddressKey, IPKey, MinerTypeKey},
}

// method call
walletSignView = &view.View{
Expand Down Expand Up @@ -139,25 +101,20 @@ var (
var views = append([]*view.View{
walletRegisterView,
walletUnregisterView,
walletNumView,
walletAddressNumView,
walletAddAddrView,
walletRemoveAddrView,
walletConnNumView,

minerRegisterView,
minerUnregisterView,
minerNumView,
minerSourceView,
minerConnNumView,

walletSignView,
walletListView,
computeProofView,
sectorsUnsealPieceView,
}, metrics.DefaultViews...)
}, rpcMetrics.DefaultViews...)

// SinceInMilliseconds returns the duration of time since the provide time as a float64.
func SinceInMilliseconds(startTime time.Time) float64 {
return float64(time.Since(startTime).Nanoseconds()) / 1e6
}

func init() {
// register metrics
_ = view.Register(views...)
}
35 changes: 19 additions & 16 deletions metrics/record_metrics_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/filecoin-project/go-address"
"go.opencensus.io/stats"
"go.opencensus.io/tag"

v2API "github.com/filecoin-project/venus/venus-shared/api/gateway/v2"
Expand Down Expand Up @@ -35,60 +34,64 @@ func recordWalletConnectionInfo(ctx context.Context, api v2API.IGateway) {
return
}

var walletNum, connNum int64
addrs := make(map[address.Address]struct{})
for _, detail := range walletDetails {
ctx, _ = tag.New(ctx, tag.Upsert(WalletAccountKey, detail.Account))
stats.Record(ctx, WalletNum.M(1))
walletNum++

for _, conn := range detail.ConnectStates {
_ = stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(IPKey, conn.IP)}, WalletConnNum.M(1))
connNum++
for _, addr := range conn.Addrs {
if _, ok := addrs[addr]; ok {
continue
}
addrs[addr] = struct{}{}
_ = stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Upsert(WalletAddressKey, addr.String())}, WalletAddressNum.M(1))
}
}
addrs = make(map[address.Address]struct{})
}

WalletNum.Set(ctx, walletNum)
WalletConnNum.Set(ctx, connNum)
WalletAddressNum.Set(ctx, int64(len(addrs)))
}

func recordMarketConnectionInfo(ctx context.Context, api v2API.IGateway) {
ctx, _ = tag.New(ctx, tag.Upsert(MinerTypeKey, "market"))
connsState, err := api.ListMarketConnectionsState(ctx)
if err != nil {
log.Warnf("failed to get market connections state %v", err)
return
}

var connNum int64
for _, state := range connsState {
ctx, _ = tag.New(ctx, tag.Upsert(MinerAddressKey, state.Addr.String()), tag.Upsert(MinerTypeKey, "market"))
for _, conn := range state.Conn.Connections {
_ = stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(IPKey, conn.IP)}, MinerConnNum.M(1))
}
stats.Record(ctx, MinerNum.M(1))

connNum += int64(len(state.Conn.Connections))
}
MinerConnNum.Set(ctx, connNum)
MinerNum.Set(ctx, int64(len(connsState)))
}

func recordMinerConnectionInfo(ctx context.Context, api v2API.IGateway) {
ctx, _ = tag.New(ctx, tag.Upsert(MinerTypeKey, "pprof"))

miners, err := api.ListConnectedMiners(ctx)
if err != nil {
log.Warnf("faield to list connected miners %v", err)
return
}

var connNum int64
for _, miner := range miners {
state, err := api.ListMinerConnection(ctx, miner)
if err != nil {
log.Warnf("failed to list miner connection %v", err)
return
}

ctx, _ = tag.New(ctx, tag.Upsert(MinerTypeKey, "pprof"), tag.Upsert(MinerAddressKey, miner.String()))
for _, conn := range state.Connections {
_ = stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(IPKey, conn.IP)}, MinerConnNum.M(1))
}
stats.Record(ctx, MinerNum.M(1))
connNum += int64(len(state.Connections))
}
MinerConnNum.Set(ctx, connNum)
MinerNum.Set(ctx, int64(len(miners)))
}
6 changes: 3 additions & 3 deletions proofevent/proof_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (e *ProofEventStream) ListenProofEvent(ctx context.Context, policy *sharedG

ctx, _ = tag.New(ctx, tag.Upsert(metrics.IPKey, ip), tag.Upsert(metrics.MinerAddressKey, mAddr.String()),
tag.Upsert(metrics.MinerTypeKey, "pprof"))
stats.Record(ctx, metrics.MinerRegister.M(1))
stats.Record(ctx, metrics.MinerSource.M(1))
metrics.MinerRegister.Tick(ctx)
metrics.MinerSource.Tick(ctx)

reqEventChan <- &sharedGatewayTypes.RequestEvent{
ID: sharedTypes.NewUUID(),
Expand All @@ -120,7 +120,7 @@ func (e *ProofEventStream) ListenProofEvent(ctx context.Context, policy *sharedG
case <-ctx.Done():
removeChannel()
close(out)
stats.Record(ctx, metrics.MinerUnregister.M(1))
metrics.MinerUnregister.Tick(ctx)
return
case c := <-reqEventChan:
out <- c
Expand Down
2 changes: 0 additions & 2 deletions walletevent/wallet_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,6 @@ func (w *WalletEventStream) getValidatedAddress(ctx context.Context, channel *ty
return nil, err
}
validAddrs = append(validAddrs, addr)
_ = stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(metrics.WalletAddressKey, addr.String())},
metrics.WalletAddressNum.M(1))
}

return validAddrs, nil
Expand Down

0 comments on commit 52578e5

Please sign in to comment.