Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ethstats: Fix Issue with Unreported Pending Transaction Information #9371

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,14 +872,6 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error {
if gpoParams.Default == nil {
gpoParams.Default = config.Miner.GasPrice
}
//eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
if config.Ethstats != "" {
var headCh chan [][]byte
headCh, s.unsubscribeEthstat = s.notifications.Events.AddHeaderSubscription()
if err := ethstats.New(stack, s.sentryServers, chainKv, s.blockReader, s.engine, config.Ethstats, s.networkID, ctx.Done(), headCh); err != nil {
return err
}
}
// start HTTP API
httpRpcCfg := stack.Config().Http
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC,
Expand All @@ -888,6 +880,15 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error {
return err
}

//eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
if config.Ethstats != "" {
var headCh chan [][]byte
headCh, s.unsubscribeEthstat = s.notifications.Events.AddHeaderSubscription()
if err := ethstats.New(stack, s.sentryServers, chainKv, s.blockReader, s.engine, config.Ethstats, s.networkID, ctx.Done(), headCh, txPoolRpcClient); err != nil {
return err
}
}

s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, &httpRpcCfg, s.engine, s.logger)

if config.SilkwormRpcDaemon && httpRpcCfg.Enabled {
Expand Down
44 changes: 27 additions & 17 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
"math/big"
"net/http"
"regexp"
Expand Down Expand Up @@ -70,6 +71,7 @@ type Service struct {
histCh chan []uint64 // History request block numbers are fed into this channel

blockReader services.FullBlockReader
txPool txpool.TxpoolClient
}

// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
Expand Down Expand Up @@ -122,7 +124,8 @@ func (w *connWrapper) Close() error {
}

// New returns a monitoring service ready for stats reporting.
func New(node *node.Node, servers []*sentry.GrpcServer, chainDB kv.RoDB, blockReader services.FullBlockReader, engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan [][]byte) error {
func New(node *node.Node, servers []*sentry.GrpcServer, chainDB kv.RoDB, blockReader services.FullBlockReader,
engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan [][]byte, txPoolRpcClient txpool.TxpoolClient) error {
// Parse the netstats connection url
re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)")
parts := re.FindStringSubmatch(url)
Expand All @@ -142,6 +145,7 @@ func New(node *node.Node, servers []*sentry.GrpcServer, chainDB kv.RoDB, blockRe
chaindb: chainDB,
headCh: headCh,
quitCh: quitCh,
txPool: txPoolRpcClient,
}

node.RegisterLifecycle(ethstats)
Expand Down Expand Up @@ -635,25 +639,31 @@ func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
return conn.WriteJSON(report)
}

// pendStats is the information to report about pending transactions.
type pendStats struct {
Pending int `json:"pending"`
}

// reportPending retrieves the current number of pending transactions and reports
// it to the stats server.
func (s *Service) reportPending(conn *connWrapper) error {
/* // Retrieve the pending count from the local blockchain
pending, _ := s.backend.Stats()
// Assemble the transaction stats and send it to the server
log.Trace("Sending pending transactions to ethstats", "count", pending)

stats := map[string]interface{}{
"id": s.node,
"stats": &pendStats{
Pending: pending,
},
}
report := map[string][]interface{}{
"emit": {"pending", stats},
}
return conn.WriteJSON(report)*/
return nil
in := new(txpool.StatusRequest)
status, err := s.txPool.Status(context.Background(), in)
if err != nil {
return err
}
log.Trace("Sending pending transactions to ethstats", "count", status.PendingCount)

stats := map[string]interface{}{
"id": s.node,
"stats": &pendStats{
Pending: int(status.PendingCount),
},
}
report := map[string][]interface{}{
"emit": {"pending", stats},
}
return conn.WriteJSON(report)
}

// nodeStats is the information to report about the local node.
Expand Down
Loading