diff --git a/cmd/rpc.go b/cmd/rpc.go index 00490cc5..c554cc99 100644 --- a/cmd/rpc.go +++ b/cmd/rpc.go @@ -115,7 +115,7 @@ func startNativeSpaceRpcServer(ctx context.Context, wg *sync.WaitGroup, storeCtx } // initialize gas station handler - gasHandler := handler.NewGasStationHandler(storeCtx.CfxDB, storeCtx.CfxCache) + gasHandler := handler.MustNewCfxGasStationHandlerFromViper(clientProvider) if storeCtx.CfxDB != nil { // initialize pruned logs handler diff --git a/config/config.yml b/config/config.yml index 8989858b..918c4cdd 100644 --- a/config/config.yml +++ b/config/config.yml @@ -88,6 +88,15 @@ eth: # The cold interval before the circuit breaker turns to be half-open since being turned open. openColdTime: 15s +# # Gas station configurations +# gasstation: +# # Whether to enable gas station. +# enabled: true +# # The number of blocks (or epochs) from the latest block (or epoch) to peek for gas price estimation. +# historicalPeekCount: 100 +# # Percentiles of average txn gas price mapped to three levels of urgency (`low`, `medium` and `high`). +# percentiles: [1, 50, 99] + # Blockchain sync configurations sync: # Core space sync configurations diff --git a/node/cfxclient.go b/node/cfxclient.go index c4a25878..ba6a6652 100644 --- a/node/cfxclient.go +++ b/node/cfxclient.go @@ -6,6 +6,7 @@ import ( "github.com/Conflux-Chain/confura/store/mysql" "github.com/Conflux-Chain/confura/util/rpc" sdk "github.com/Conflux-Chain/go-conflux-sdk" + "github.com/pkg/errors" ) // CfxClientProvider provides core space client by router. @@ -23,7 +24,7 @@ func NewCfxClientProvider(db *mysql.MysqlStore, router Router) *CfxClientProvide } } -// GetClient gets client of specific group (or use normal HTTP group as default. +// GetClient gets client of specific group (or use normal HTTP group as default). func (p *CfxClientProvider) GetClient(key string, groups ...Group) (sdk.ClientOperator, error) { client, err := p.getClient(key, cfxNodeGroup(groups...)) if err != nil { @@ -44,6 +45,25 @@ func (p *CfxClientProvider) GetClientByIP(ctx context.Context, groups ...Group) return client.(sdk.ClientOperator), nil } +// GetClientsByGroup gets all clients of specific group. +func (p *CfxClientProvider) GetClientsByGroup(grp Group) (clients []sdk.ClientOperator, err error) { + np := locateNodeProvider(p.router) + if np == nil { + return nil, errors.New("unsupported router type") + } + + nodeUrls := np.ListNodesByGroup(grp) + for _, url := range nodeUrls { + if c, err := p.getOrRegisterClient(string(url), grp); err == nil { + clients = append(clients, c.(sdk.ClientOperator)) + } else { + return nil, err + } + } + + return clients, nil +} + func cfxNodeGroup(groups ...Group) Group { grp := GroupCfxHttp if len(groups) > 0 { @@ -52,3 +72,20 @@ func cfxNodeGroup(groups ...Group) Group { return grp } + +// locateNodeProvider finds node provider from the router chain or nil. +func locateNodeProvider(r Router) NodeProvider { + if np, ok := r.(NodeProvider); ok { + return np + } + + if cr, ok := r.(*chainedRouter); ok { + for _, r := range cr.routers { + if np := locateNodeProvider(r); np != nil { + return np + } + } + } + + return nil +} diff --git a/node/client.go b/node/client.go index 1ec0e405..0874c15a 100644 --- a/node/client.go +++ b/node/client.go @@ -132,24 +132,27 @@ func (p *clientProvider) populateCache(token string) (grp Group, ok bool) { // getClient gets client based on keyword and node group type. func (p *clientProvider) getClient(key string, group Group) (interface{}, error) { - clients := p.getOrRegisterGroup(group) - - logger := logrus.WithFields(logrus.Fields{ - "key": key, - "group": group, - }) - url := p.router.Route(group, []byte(key)) if len(url) == 0 { - logger.WithError(ErrClientUnavailable).Error("Failed to get full node client from provider") + logrus.WithFields(logrus.Fields{ + "key": key, + "group": group, + }).Error("No full node client available from router") return nil, ErrClientUnavailable } + return p.getOrRegisterClient(url, group) +} + +// getOrRegisterClient gets or registers RPC client for fullnode proxy. +func (p *clientProvider) getOrRegisterClient(url string, group Group) (interface{}, error) { + clients := p.getOrRegisterGroup(group) nodeName := rpc.Url2NodeName(url) - logger = logger.WithFields(logrus.Fields{ - "node": nodeName, - "url": url, + logger := logrus.WithFields(logrus.Fields{ + "node": nodeName, + "url": url, + "group": group, }) logger.Trace("Route RPC requests") diff --git a/node/router.go b/node/router.go index 73addbe2..de13067c 100644 --- a/node/router.go +++ b/node/router.go @@ -57,6 +57,11 @@ type Router interface { Route(group Group, key []byte) string } +// NodeProvider provides full node URLs by group. +type NodeProvider interface { + ListNodesByGroup(group Group) (urls []string) +} + // MustNewRouter creates an instance of Router. func MustNewRouter(redisURL string, nodeRPCURL string, groupConf map[Group]UrlConfig) Router { var routers []Router @@ -235,7 +240,6 @@ func NewLocalRouter(group2Urls map[Group][]string) *LocalRouter { for k, v := range group2Urls { groups[k] = newLocalNodeGroup(v) } - return &LocalRouter{groups: groups} } @@ -255,6 +259,23 @@ func (r *LocalRouter) Route(group Group, key []byte) string { return "" } +// ListNodesByGroup returns all node URLs in a group. +func (r *LocalRouter) ListNodesByGroup(group Group) (urls []string) { + r.mu.Lock() + defer r.mu.Unlock() + + item, ok := r.groups[group] + if !ok { + return nil + } + + for _, v := range item.nodes { + urls = append(urls, v.String()) + } + + return urls +} + func NewLocalRouterFromNodeRPC(client *rpc.Client) (*LocalRouter, error) { router := &LocalRouter{ groups: make(map[Group]*localNodeGroup), diff --git a/rpc/apis.go b/rpc/apis.go index cb30d083..98557295 100644 --- a/rpc/apis.go +++ b/rpc/apis.go @@ -54,7 +54,7 @@ func filterExposedApis(allApis []API, exposedModules []string) (map[string]inter // nativeSpaceApis returns the collection of built-in RPC APIs for core space. func nativeSpaceApis( - clientProvider *node.CfxClientProvider, gashandler *handler.GasStationHandler, option ...CfxAPIOption, + clientProvider *node.CfxClientProvider, gashandler *handler.CfxGasStationHandler, option ...CfxAPIOption, ) []API { return []API{ { diff --git a/rpc/gastation_api.go b/rpc/gastation_api.go index 799eebdb..178426e7 100644 --- a/rpc/gastation_api.go +++ b/rpc/gastation_api.go @@ -2,23 +2,70 @@ package rpc import ( "context" + "math/big" "github.com/Conflux-Chain/confura/rpc/handler" "github.com/Conflux-Chain/confura/types" + cfxtypes "github.com/Conflux-Chain/go-conflux-sdk/types" + logutil "github.com/Conflux-Chain/go-conflux-util/log" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/sirupsen/logrus" ) // gasStationAPI provides core space gasstation API. type gasStationAPI struct { - handler *handler.GasStationHandler + handler *handler.CfxGasStationHandler + etLogger *logutil.ErrorTolerantLogger } -func newGasStationAPI(handler *handler.GasStationHandler) *gasStationAPI { - return &gasStationAPI{handler: handler} +// newGasStationAPI creates a new instance of gasStationAPI. +func newGasStationAPI(handler *handler.CfxGasStationHandler) *gasStationAPI { + return &gasStationAPI{ + handler: handler, + etLogger: logutil.NewErrorTolerantLogger(logutil.DefaultETConfig), + } } -func (api *gasStationAPI) Price(ctx context.Context) (*types.GasStationPrice, error) { - //return api.handler.GetPrice() +// SuggestedGasFees retrieves the suggested gas fees. +func (api *gasStationAPI) SuggestedGasFees(ctx context.Context) (*types.SuggestedGasFees, error) { + cfx := GetCfxClientFromContext(ctx) + + // Attempt to get suggested gas fees from the handler if available. + if api.handler != nil { + gasFee, err := api.handler.Suggest(cfx) + api.etLogger.Log( + logrus.StandardLogger(), err, "Failed to get suggested gas fees from handler", + ) + return gasFee, err + } + // Fallback to fetching gas fees directly from the blockchain. + latestBlock, err := cfx.GetBlockSummaryByEpoch(cfxtypes.EpochLatestState) + if err != nil { + return nil, err + } + + priorityFee, err := cfx.GetMaxPriorityFeePerGas() + if err != nil { + return nil, err + } + + baseFeePerGas := latestBlock.BaseFeePerGas.ToInt() + gasFeeEstimation := types.GasFeeEstimation{ + SuggestedMaxPriorityFeePerGas: priorityFee, + SuggestedMaxFeePerGas: (*hexutil.Big)(big.NewInt(0).Add(baseFeePerGas, priorityFee.ToInt())), + } + + return &types.SuggestedGasFees{ + Low: gasFeeEstimation, + Medium: gasFeeEstimation, + High: gasFeeEstimation, + EstimatedBaseFee: (*hexutil.Big)(baseFeePerGas), + }, nil +} + +// TODO: Deprecate it if not used by the community any more. +func (api *gasStationAPI) Price(ctx context.Context) (*types.GasStationPrice, error) { // Use oracle gas price from the blockchain. cfx := GetCfxClientFromContext(ctx) price, err := cfx.GetGasPrice() diff --git a/rpc/handler/cfx_gasstation.go b/rpc/handler/cfx_gasstation.go new file mode 100644 index 00000000..2800b46f --- /dev/null +++ b/rpc/handler/cfx_gasstation.go @@ -0,0 +1,390 @@ +package handler + +import ( + "container/list" + "errors" + "math/big" + "math/rand" + "sync/atomic" + "time" + + "github.com/Conflux-Chain/confura/node" + "github.com/Conflux-Chain/confura/types" + "github.com/Conflux-Chain/confura/util" + sdk "github.com/Conflux-Chain/go-conflux-sdk" + cfxtypes "github.com/Conflux-Chain/go-conflux-sdk/types" + logutil "github.com/Conflux-Chain/go-conflux-util/log" + "github.com/Conflux-Chain/go-conflux-util/viper" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/openweb3/go-rpc-provider/utils" + "github.com/sirupsen/logrus" +) + +const ( + // maxCachedBlockHashEpochs is the max number of epochs to cache their block hashes. + maxCachedBlockHashEpochs = 100 +) + +// CfxGasStationHandler handles RPC requests for gas price estimation. +type CfxGasStationHandler struct { + config *GasStationConfig // Gas station configuration + status atomic.Value // Gas station status + clientProvider *node.CfxClientProvider // Client provider to get full node clients + clients []sdk.ClientOperator // Clients used to get historical data + cliIndex int // Index of the main client + fromEpoch uint64 // Start epoch number to sync from + epochBlockHashList *list.List // Linked list to store epoch block hashes + window *PriorityFeeWindow // Block priority fee window +} + +func MustNewCfxGasStationHandlerFromViper(cp *node.CfxClientProvider) *CfxGasStationHandler { + var cfg GasStationConfig + viper.MustUnmarshalKey("gasStation", &cfg) + + if !cfg.Enabled { + return nil + } + + // Get all clients in the group. + clients, err := cp.GetClientsByGroup(node.GroupCfxHttp) + if err != nil { + logrus.WithError(err).Fatal("Failed to get fullnode cluster") + } + + if len(clients) == 0 { + logrus.Fatal("No full node client available") + } + + // Select a random client as the main client. + cliIndex := rand.Int() % len(clients) + // Get the latest epoch number with the main client. + latestEpoch, err := clients[cliIndex].GetEpochNumber(cfxtypes.EpochLatestState) + if err != nil { + logrus.WithError(err).Fatal("Failed to get latest epoch number") + } + + fromEpoch := latestEpoch.ToInt().Uint64() - uint64(cfg.HistoricalPeekCount) + h := &CfxGasStationHandler{ + config: &cfg, + clientProvider: cp, + clients: clients, + cliIndex: cliIndex, + epochBlockHashList: list.New(), + fromEpoch: fromEpoch, + window: NewPriorityFeeWindow(cfg.HistoricalPeekCount), + } + + go h.run() + return h +} + +// run starts to sync historical data and refresh cluster nodes. +func (h *CfxGasStationHandler) run() { + syncTicker := time.NewTimer(0) + defer syncTicker.Stop() + + refreshTicker := time.NewTicker(clusterUpdateInterval) + defer refreshTicker.Stop() + + etLogger := logutil.NewErrorTolerantLogger(logutil.DefaultETConfig) + for { + select { + case <-syncTicker.C: + complete, err := h.sync() + etLogger.Log( + logrus.WithFields(logrus.Fields{ + "status": h.status.Load(), + "fromEpoch": h.fromEpoch, + "clients": h.clients, + }), err, "Gas Station handler sync error", + ) + h.updateStatus(err) + h.resetSyncTicker(syncTicker, complete, err) + case <-refreshTicker.C: + if err := h.refreshClusterNodes(); err != nil { + logrus.WithError(err).Error("Gas station handler cluster refresh error") + } + } + } +} + +// sync synchronizes historical data from the full node cluster. +func (h *CfxGasStationHandler) sync() (complete bool, err error) { + if len(h.clients) == 0 { + return false, StationStatusClientUnavailable + } + + h.cliIndex %= len(h.clients) + for idx := h.cliIndex; ; { + complete, err = h.trySync(h.clients[idx]) + if err != nil { + logrus.WithFields(logrus.Fields{ + "cliIndex": idx, + "nodeUrl": h.clients[idx].GetNodeURL(), + }).WithError(err).Debug("Gas station handler sync once error") + } + + if err == nil || utils.IsRPCJSONError(err) { + h.cliIndex = idx + break + } + + idx = (idx + 1) % len(h.clients) + if idx == h.cliIndex { // failed all nodes? + break + } + } + + return complete, err +} + +func (h *CfxGasStationHandler) trySync(cfx sdk.ClientOperator) (bool, error) { + logrus.WithFields(logrus.Fields{ + "fromEpoch": h.fromEpoch, + "nodeUrl": cfx.GetNodeURL(), + }).Debug("Gas station handler syncing once") + + // Get the latest epoch number. + latestEpoch, err := cfx.GetEpochNumber(cfxtypes.EpochLatestState) + if err != nil { + return false, err + } + + latestEpochNo := latestEpoch.ToInt().Uint64() + if h.fromEpoch > latestEpochNo { // already catch-up? + return true, nil + } + + // Get the pivot block. + epoch := cfxtypes.NewEpochNumberUint64(h.fromEpoch) + pivotBlock, err := cfx.GetBlockByEpoch(epoch) + if err != nil { + return false, err + } + + prevEpochBh := h.prevEpochPivotBlockHash() + if len(prevEpochBh) > 0 && prevEpochBh != pivotBlock.ParentHash { + logrus.WithFields(logrus.Fields{ + "prevEpochBh": prevEpochBh, + "pivotBlockHash": pivotBlock.Hash, + "pivotBlockParentHash": pivotBlock.ParentHash, + }).Debug("Gas station handler detected reorg") + + // Reorg due to parent hash not match, remove the last epoch. + h.handleReorg() + h.fromEpoch-- + return false, nil + } + + blockHashes, blocks, err := h.fetchBlocks(cfx, epoch, pivotBlock) + if err != nil { + logrus.WithFields(logrus.Fields{ + "pivotBlockHash": pivotBlock.Hash, + "epoch": epoch, + }).WithError(err).Debug("Gas station handler fetch blocks error") + return false, err + } + + for i := range blocks { + h.handleBlock(blocks[i]) + } + + h.push(blockHashes) + h.fromEpoch++ + return false, nil +} + +func (h *CfxGasStationHandler) fetchBlocks( + cfx sdk.ClientOperator, epoch *cfxtypes.Epoch, pivotBlock *cfxtypes.Block, +) ([]cfxtypes.Hash, []*cfxtypes.Block, error) { + // Get epoch block hashes. + blockHashes, err := cfx.GetBlocksByEpoch(epoch) + if err != nil { + return nil, nil, err + } + + pivotHash := blockHashes[len(blockHashes)-1] + if pivotBlock.Hash != pivotHash { // abandon this epoch due to pivot switched + return nil, nil, errors.New("pivot switched") + } + + var blocks []*cfxtypes.Block + for i := 0; i < len(blockHashes)-1; i++ { + block, err := cfx.GetBlockByHashWithPivotAssumption(blockHashes[i], pivotHash, hexutil.Uint64(h.fromEpoch)) + if err != nil { + return nil, nil, err + } + blocks = append(blocks, &block) + } + + blocks = append(blocks, pivotBlock) + return blockHashes, blocks, nil +} + +func (h *CfxGasStationHandler) handleReorg() { + var blockHashes []string + for _, bh := range h.pop() { + blockHashes = append(blockHashes, bh.String()) + } + h.window.Remove(blockHashes...) + + logrus.WithField("blockHashes", blockHashes).Info("Gas station handler removed blocks due to reorg") +} + +func (h *CfxGasStationHandler) handleBlock(block *cfxtypes.Block) { + ratio, _ := big.NewInt(0).Div(block.GasUsed.ToInt(), block.GasLimit.ToInt()).Float64() + blockFee := &BlockPriorityFee{ + number: block.BlockNumber.ToInt().Uint64(), + hash: block.Hash.String(), + baseFee: block.BaseFeePerGas.ToInt(), + gasUsedRatio: ratio, + } + + var txnTips []*TxnPriorityFee + for i := range block.Transactions { + txn := block.Transactions[i] + + // Skip unexecuted transaction, e.g. + // 1) already executed in previous block + // 2) never executed, e.g. nonce mismatch + if !util.IsTxExecutedInBlock(&txn) { + continue + } + + // Calculate max priority fee per gas if not set + if txn.MaxPriorityFeePerGas == nil { + maxFeePerGas := txn.MaxFeePerGas.ToInt() + if maxFeePerGas == nil { + maxFeePerGas = txn.GasPrice.ToInt() + } + + baseFeePerGas := block.BaseFeePerGas.ToInt() + txn.MaxPriorityFeePerGas = (*hexutil.Big)(big.NewInt(0).Sub(maxFeePerGas, baseFeePerGas)) + } + logrus.WithFields(logrus.Fields{ + "txnHash": txn.Hash, + "maxPriorityFeePerGas": txn.MaxPriorityFeePerGas, + "maxFeePerGas": txn.MaxFeePerGas, + "baseFeePerGas": block.BaseFeePerGas, + "gasPrice": txn.GasPrice, + }).Debug("Gas station handler calculated txn priority fee") + + txnTips = append(txnTips, &TxnPriorityFee{ + hash: txn.Hash.String(), + tip: txn.MaxPriorityFeePerGas.ToInt(), + }) + } + + logrus.WithFields(logrus.Fields{ + "blockFeeInfo": blockFee, + "execTxnCount": len(txnTips), + }).Debug("Gas station handler pushing block") + + blockFee.Append(txnTips...) + h.window.Push(blockFee) +} + +func (h *CfxGasStationHandler) pop() []cfxtypes.Hash { + if h.epochBlockHashList.Len() == 0 { + return nil + } + + lastElement := h.epochBlockHashList.Back() + return h.epochBlockHashList.Remove(lastElement).([]cfxtypes.Hash) +} + +func (h *CfxGasStationHandler) prevEpochPivotBlockHash() cfxtypes.Hash { + if h.epochBlockHashList.Len() == 0 { + return cfxtypes.Hash("") + } + + blockHashes := h.epochBlockHashList.Back().Value.([]cfxtypes.Hash) + return blockHashes[len(blockHashes)-1] +} + +func (h *CfxGasStationHandler) push(blockHashes []cfxtypes.Hash) { + h.epochBlockHashList.PushBack(blockHashes) + if h.epochBlockHashList.Len() > maxCachedBlockHashEpochs { + // Remove old epoch block hashes if capacity is reached + h.epochBlockHashList.Remove(h.epochBlockHashList.Front()) + } +} + +func (h *CfxGasStationHandler) updateStatus(err error) { + if err != nil && !utils.IsRPCJSONError(err) { + // Set the gas station as unavailable due to network error. + h.status.Store(err) + } else { + h.status.Store(StationStatusOk) + } +} + +func (h *CfxGasStationHandler) refreshClusterNodes() error { + clients, err := h.clientProvider.GetClientsByGroup(node.GroupCfxHttp) + if err != nil { + return err + } + + h.clients = clients + return nil +} + +func (h *CfxGasStationHandler) resetSyncTicker(syncTicker *time.Timer, complete bool, err error) { + switch { + case err != nil: + syncTicker.Reset(syncIntervalNormal) + case complete: + syncTicker.Reset(syncIntervalNormal) + default: + syncTicker.Reset(syncIntervalCatchUp) + } +} + +func (h *CfxGasStationHandler) Suggest(cfx sdk.ClientOperator) (*types.SuggestedGasFees, error) { + if status := h.status.Load(); status != StationStatusOk { + return nil, status.(error) + } + + latestBlock, err := cfx.GetBlockSummaryByEpoch(cfxtypes.EpochLatestState) + if err != nil { + return nil, err + } + + baseFeePerGas := latestBlock.BaseFeePerGas.ToInt() + // Calculate the gas fee stats from the priority fee window. + stats := h.window.Calculate(h.config.Percentiles[:]) + + priorityFees := stats.AvgPercentiledPriorityFee + if priorityFees == nil { // use gas fees directly from the blockchain if no estimation made + oracleFee, err := cfx.GetMaxPriorityFeePerGas() + if err != nil { + return nil, err + } + + for i := 0; i < 3; i++ { + priorityFees = append(priorityFees, oracleFee.ToInt()) + } + } + + return &types.SuggestedGasFees{ + Low: types.GasFeeEstimation{ + SuggestedMaxPriorityFeePerGas: (*hexutil.Big)(priorityFees[0]), + SuggestedMaxFeePerGas: (*hexutil.Big)(big.NewInt(0).Add(baseFeePerGas, priorityFees[0])), + }, + Medium: types.GasFeeEstimation{ + SuggestedMaxPriorityFeePerGas: (*hexutil.Big)(priorityFees[1]), + SuggestedMaxFeePerGas: (*hexutil.Big)(big.NewInt(0).Add(baseFeePerGas, priorityFees[1])), + }, + High: types.GasFeeEstimation{ + SuggestedMaxPriorityFeePerGas: (*hexutil.Big)(priorityFees[2]), + SuggestedMaxFeePerGas: (*hexutil.Big)(big.NewInt(0).Add(baseFeePerGas, priorityFees[2])), + }, + EstimatedBaseFee: latestBlock.BaseFeePerGas, + NetworkCongestion: stats.NetworkCongestion, + LatestPriorityFeeRange: ToHexBigSlice(stats.LatestPriorityFeeRange), + HistoricalPriorityFeeRange: ToHexBigSlice(stats.HistoricalPriorityFeeRange), + HistoricalBaseFeeRange: ToHexBigSlice(stats.HistoricalBaseFeeRange), + PriorityFeeTrend: stats.PriorityFeeTrend, + BaseFeeTrend: stats.BaseFeeTrend, + }, nil +} diff --git a/rpc/handler/gasstation.go b/rpc/handler/gasstation.go index 03f015f5..d51772f0 100644 --- a/rpc/handler/gasstation.go +++ b/rpc/handler/gasstation.go @@ -1,146 +1,362 @@ package handler import ( + "container/list" "math/big" + "slices" + "sync" + "time" - "github.com/Conflux-Chain/confura/store" - itypes "github.com/Conflux-Chain/confura/types" - "github.com/Conflux-Chain/confura/util" + "github.com/Conflux-Chain/confura/node" + "github.com/Conflux-Chain/confura/types" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/sirupsen/logrus" ) -const ( // gas station price configs - ConfGasStationPriceFast = "gasstation_price_fast" - ConfGasStationPriceFastest = "gasstation_price_fastest" - ConfGasStationPriceSafeLow = "gasstation_price_safe_low" - ConfGasStationPriceAverage = "gasstation_price_average" +const ( + // The interval to sync data in normal mode. + syncIntervalNormal time.Duration = time.Second + // The interval to sync data in catch-up mode. + syncIntervalCatchUp time.Duration = time.Millisecond + // The interval to update the node cluster. + clusterUpdateInterval = time.Minute ) +// GasStationStatus represents the status of gas station. +type GasStationStatus struct{ error } + var ( - defaultGasStationPriceFastest = big.NewInt(1_000_000_000) // (1G) - defaultGasStationPriceFast = big.NewInt(1_000_000_000) // (1G) - defaultGasStationPriceAverage = big.NewInt(1_000_000_000) // (1G) - defaultGasStationPriceSafeLow = big.NewInt(1_000_000_000) // (1G) - - maxGasStationPriceFastest = big.NewInt(10_000_000_000) // (10G) - maxGasStationPriceFast = big.NewInt(10_000_000_000) // (10G) - maxGasStationPriceAverage = big.NewInt(10_000_000_000) // (10G) - maxGasStationPriceSafeLow = big.NewInt(10_000_000_000) // (10G) + StationStatusOk GasStationStatus = GasStationStatus{} + StationStatusClientUnavailable GasStationStatus = GasStationStatus{node.ErrClientUnavailable} ) -// GasStationHandler RPC handler to serve gas price estimation etc., -type GasStationHandler struct { - db, cache store.Configurable +type GasStationConfig struct { + // Whether to enable gas station. + Enabled bool + // Number of blocks/epochs to peek for gas price estimation. + HistoricalPeekCount int `default:"100"` + // Percentiles for average txn gas price mapped to three levels of urgency (`low`, `medium` and `high`). + Percentiles [3]float64 `default:"[1, 50, 99]"` +} + +// BlockPriorityFee holds the gas fees of transactions within a single block. +type BlockPriorityFee struct { + number uint64 // Block number + hash string // Block hash + baseFee *big.Int // Base fee per gas of the block + gasUsedRatio float64 // Gas used ratio of the block + txnTips []*TxnPriorityFee // Slice of ordered transaction priority fees } -func NewGasStationHandler(db, cache store.Configurable) *GasStationHandler { - return &GasStationHandler{db: db, cache: cache} +// Append appends transaction priority fees to the block and sorts them in ascending order. +func (b *BlockPriorityFee) Append(txnTips ...*TxnPriorityFee) { + b.txnTips = append(b.txnTips, txnTips...) + slices.SortFunc(b.txnTips, func(l, r *TxnPriorityFee) int { + return l.tip.Cmp(r.tip) + }) } -func (handler *GasStationHandler) GetPrice() (*itypes.GasStationPrice, error) { - gasStationPriceConfs := []string{ // order is important !!! - ConfGasStationPriceFast, - ConfGasStationPriceFastest, - ConfGasStationPriceSafeLow, - ConfGasStationPriceAverage, +// Percentile calculates the percentile of transaction priority fees +func (b *BlockPriorityFee) Percentile(p float64) *big.Int { + // Return nil if there are no transaction tips + if len(b.txnTips) == 0 { + return nil } - maxGasStationPrices := []*big.Int{ // order is important !!! - maxGasStationPriceFast, - maxGasStationPriceFastest, - maxGasStationPriceSafeLow, - maxGasStationPriceAverage, + // Ensure the percentile is between 0 and 100 + if p < 0 || p > 100 { + return nil } - var gasPriceConf map[string]interface{} - var err error + // Calculate the index corresponding to the given percentile + index := int(p * float64(len(b.txnTips)) / 100) + // Ensure the index is within bounds + if index >= len(b.txnTips) { + index = len(b.txnTips) - 1 + } - useCache := false - if !util.IsInterfaceValNil(handler.cache) { // load from cache first - useCache = true + return b.txnTips[index].tip +} - gasPriceConf, err = handler.cache.LoadConfig(gasStationPriceConfs...) - if err != nil { - logrus.WithError(err).Error("Failed to get gasstation price config from cache") - useCache = false - } else { - logrus.WithField("gasPriceConf", gasPriceConf).Debug("Loaded gasstation price config from cache") +// TipRange returns the range of transaction priority fees in the block. +func (b *BlockPriorityFee) TipRange() []*big.Int { + if len(b.txnTips) == 0 { + return nil + } + + return []*big.Int{b.txnTips[0].tip, b.txnTips[len(b.txnTips)-1].tip} +} + +// TxnPriorityFee holds the priority fee information of a single transaction. +type TxnPriorityFee struct { + hash string // Transaction hash + tip *big.Int // Priority fee of the transaction +} + +// PriorityFeeWindow holds priority fees of the latest blocks using a sliding window mechanism. +type PriorityFeeWindow struct { + mu sync.Mutex + feeChain *list.List // List of chronologically ordered blocks + hashToFee map[string]*list.Element // Map of block hash to linked list element + capacity int // Number of blocks to maintain in the window + historicalBaseFeeRanges []*big.Int // Range of base fees per gas over a historical period + historicalPriorityFeeRange []*big.Int // Range of priority fees per gas over a historical period +} + +// NewPriorityFeeWindow creates a new `PriorityFeeWindow` with the specified capacity. +func NewPriorityFeeWindow(capacity int) *PriorityFeeWindow { + return &PriorityFeeWindow{ + feeChain: list.New(), + hashToFee: make(map[string]*list.Element), + capacity: capacity, + } +} + +// Size returns the number of blocks in the window. +func (w *PriorityFeeWindow) Size() int { + w.mu.Lock() + defer w.mu.Unlock() + + return w.feeChain.Len() +} + +// Remove removes blocks from the window. +func (w *PriorityFeeWindow) Remove(blockHashes ...string) { + w.mu.Lock() + defer w.mu.Unlock() + + for i := range blockHashes { + if e, ok := w.hashToFee[blockHashes[i]]; ok { + w.feeChain.Remove(e) } } +} + +// Push adds a new block to the window. +func (w *PriorityFeeWindow) Push(blockFee *BlockPriorityFee) { + w.mu.Lock() + defer w.mu.Unlock() + + if _, ok := w.hashToFee[blockFee.hash]; ok { // Block already exists + return + } + + w.insertBlock(blockFee) + w.updateHistoricalBaseFeeRange(blockFee) + w.updateHistoricalPriorityFeeRange(blockFee) - if len(gasPriceConf) != len(gasStationPriceConfs) && !util.IsInterfaceValNil(handler.db) { // load from db - gasPriceConf, err = handler.db.LoadConfig(gasStationPriceConfs...) - if err != nil { - logrus.WithError(err).Error("Failed to get gasstation price config from db") + // If the window is full, prune the oldest block. + for w.feeChain.Len() > w.capacity { + w.feeChain.Remove(w.feeChain.Front()) + } +} - goto defaultR +// insertBlock inserts a block to the linked list. +func (w *PriorityFeeWindow) insertBlock(blockFee *BlockPriorityFee) { + // Locate the postion where this block should be inserted + var e *list.Element + for e = w.feeChain.Back(); e != nil; e = e.Prev() { + if e.Value.(*BlockPriorityFee).number < blockFee.number { + break } + } - logrus.WithField("gasPriceConf", gasPriceConf).Debug("Gasstation price loaded from db") - - if useCache { // update cache - for confName, confVal := range gasPriceConf { - if err := handler.cache.StoreConfig(confName, confVal); err != nil { - logrus.WithError(err).Error("Failed to update gas station price config in cache") - } else { - logrus.WithFields(logrus.Fields{ - "confName": confName, "confVal": confVal, - }).Debug("Update gas station price config in cache") - } - } + if e == nil { + w.hashToFee[blockFee.hash] = w.feeChain.PushFront(blockFee) + } else { + w.hashToFee[blockFee.hash] = w.feeChain.InsertAfter(blockFee, e) + } +} + +func (w *PriorityFeeWindow) updateHistoricalBaseFeeRange(blockFee *BlockPriorityFee) { + // Update the historical base fee range + if w.historicalBaseFeeRanges == nil { // initial setup + w.historicalBaseFeeRanges = []*big.Int{ + big.NewInt(0).Set(blockFee.baseFee), big.NewInt(0).Set(blockFee.baseFee), } + return + } + + if blockFee.baseFee.Cmp(w.historicalBaseFeeRanges[0]) < 0 { // update min + w.historicalBaseFeeRanges[0].Set(blockFee.baseFee) + } + + if blockFee.baseFee.Cmp(w.historicalBaseFeeRanges[1]) > 0 { // update max + w.historicalBaseFeeRanges[1].Set(blockFee.baseFee) } +} - if len(gasPriceConf) == len(gasStationPriceConfs) { - var gsp itypes.GasStationPrice +func (w *PriorityFeeWindow) updateHistoricalPriorityFeeRange(blockFee *BlockPriorityFee) { + tipRange := blockFee.TipRange() + if tipRange == nil { + return + } - setPtrs := []**hexutil.Big{ // order is important !!! - &gsp.Fast, &gsp.Fastest, &gsp.SafeLow, &gsp.Average, + if w.historicalPriorityFeeRange == nil { // initial setup + w.historicalPriorityFeeRange = []*big.Int{ + big.NewInt(0).Set(tipRange[0]), big.NewInt(0).Set(tipRange[1]), } + return + } + + if tipRange[0].Cmp(w.historicalPriorityFeeRange[0]) < 0 { // update min + w.historicalPriorityFeeRange[0] = big.NewInt(0).Set(tipRange[0]) + } - for i, gpc := range gasStationPriceConfs { - var bigV hexutil.Big + if tipRange[1].Cmp(w.historicalPriorityFeeRange[1]) > 0 { // update max + w.historicalPriorityFeeRange[1] = big.NewInt(0).Set(tipRange[1]) + } +} + +type GasStats struct { + NetworkCongestion float64 // Current congestion on the network (0 to 1) + LatestPriorityFeeRange []*big.Int // Range of priority fees for recent transactions + HistoricalPriorityFeeRange []*big.Int // Range of priority fees over a historical period + HistoricalBaseFeeRange []*big.Int // Range of base fees over a historical period + AvgPercentiledPriorityFee []*big.Int // Average priority fee per gas by given percentiles + PriorityFeeTrend types.GasFeeTrend // Current trend in priority fees + BaseFeeTrend types.GasFeeTrend // Current trend in base fees +} - gasPriceHex, ok := gasPriceConf[gpc].(string) - if !ok { - logrus.WithFields(logrus.Fields{ - "gasPriceConfig": gpc, "gasPriceConf": gasPriceConf, - }).Error("Invalid gas statation gas price config") +// Calculate calculates the gas fee statistics from the data within the window. +func (w *PriorityFeeWindow) Calculate(percentiles []float64) (stats GasStats) { + w.mu.Lock() + defer w.mu.Unlock() - goto defaultR - } + // Calculate the average priority fee of the given percentiles. + stats.AvgPercentiledPriorityFee = w.calculateAvgPriorityFees(percentiles) - if err := bigV.UnmarshalText([]byte(gasPriceHex)); err != nil { - logrus.WithFields(logrus.Fields{ - "gasPriceConfig": gpc, "gasPriceHex": gasPriceHex, - }).Error("Failed to unmarshal gas price from hex string") + // Calculate latest priority fee range. + stats.LatestPriorityFeeRange = w.calculateLatestPriorityFeeRange() + stats.HistoricalPriorityFeeRange = w.historicalPriorityFeeRange + stats.HistoricalBaseFeeRange = w.historicalBaseFeeRanges - goto defaultR - } + // Calculate the network congestion. + stats.NetworkCongestion = w.calculateNetworkCongestion() - if maxGasStationPrices[i].Cmp((*big.Int)(&bigV)) < 0 { - logrus.WithFields(logrus.Fields{ - "gasPriceConfig": gpc, "bigV": bigV.ToInt(), - }).Warn("Configured gas statation price overflows max limit, pls double check") + // Calculate the trend of priority fee and base fee. + stats.PriorityFeeTrend, stats.BaseFeeTrend = w.calculateFeeTrend() - *setPtrs[i] = (*hexutil.Big)(maxGasStationPrices[i]) - } else { - *setPtrs[i] = &bigV + return stats +} + +func (w *PriorityFeeWindow) calculateAvgPriorityFees(percentiles []float64) (res []*big.Int) { + for _, p := range percentiles { + if fee := w.calculateAvgPriorityFee(p); fee == nil { + return nil + } else { + res = append(res, fee) + } + } + + return res +} + +func (w *PriorityFeeWindow) calculateAvgPriorityFee(p float64) *big.Int { + totalFee := big.NewInt(0) + totalSize := int64(0) + + for e := w.feeChain.Front(); e != nil; e = e.Next() { + blockFee := e.Value.(*BlockPriorityFee) + if v := blockFee.Percentile(p); v != nil { + totalFee.Add(totalFee, v) + totalSize++ + } + } + + if totalSize != 0 { // Return the average priority fee per gas per block in the window. + return totalFee.Div(totalFee, big.NewInt(totalSize)) + } + + return nil +} + +func (w *PriorityFeeWindow) calculateFeeTrend() (priorityFeeTrend, baseFeeTrend types.GasFeeTrend) { + if w.feeChain.Len() < 2 { + return types.GasFeeTrendUp, types.GasFeeTrendUp + } + + latestBlockFee := w.feeChain.Back().Value.(*BlockPriorityFee) + prevBlockFee := w.feeChain.Back().Prev().Value.(*BlockPriorityFee) + + baseFeeTrend = w.determineTrend(prevBlockFee.baseFee, latestBlockFee.baseFee) + priorityFeeTrend = w.determinePriorityFeeTrend(prevBlockFee, latestBlockFee) + + return priorityFeeTrend, baseFeeTrend +} + +func (w *PriorityFeeWindow) determineTrend(prevFee, latestFee *big.Int) types.GasFeeTrend { + if cmp := prevFee.Cmp(latestFee); cmp < 0 { + return types.GasFeeTrendUp + } else if cmp > 0 { + return types.GasFeeTrendDown + } + + return "" +} + +func (w *PriorityFeeWindow) determinePriorityFeeTrend(prevBlockFee, latestBlockFee *BlockPriorityFee) types.GasFeeTrend { + prevAvgP50 := prevBlockFee.Percentile(50) + if prevAvgP50 == nil { + prevAvgP50 = big.NewInt(0) + } + + latestAvgP50 := latestBlockFee.Percentile(50) + if latestAvgP50 == nil { + latestAvgP50 = big.NewInt(0) + } + + if cmp := prevAvgP50.Cmp(latestAvgP50); cmp < 0 { + return types.GasFeeTrendUp + } else if cmp > 0 { + return types.GasFeeTrendDown + } + + return "" +} + +func (w *PriorityFeeWindow) calculateNetworkCongestion() float64 { + if e := w.feeChain.Back(); e != nil { + return e.Value.(*BlockPriorityFee).gasUsedRatio + } + + return 0 +} + +func (w *PriorityFeeWindow) calculateLatestPriorityFeeRange() (res []*big.Int) { + for e := w.feeChain.Front(); e != nil; e = e.Next() { + tipRange := e.Value.(*BlockPriorityFee).TipRange() + if tipRange == nil { // skip empty range + continue + } + + if res == nil { // initial setup + res = []*big.Int{ + big.NewInt(0).Set(tipRange[0]), big.NewInt(0).Set(tipRange[1]), } + continue + } + + if tipRange[0].Cmp(res[0]) < 0 { + res[0] = big.NewInt(0).Set(tipRange[0]) } - return &gsp, nil + if tipRange[1].Cmp(res[1]) > 0 { + res[1] = big.NewInt(0).Set(tipRange[1]) + } } -defaultR: - logrus.Debug("Gas station uses default as final gas price") + return res +} - // use default gas price - return &itypes.GasStationPrice{ - Fast: (*hexutil.Big)(defaultGasStationPriceFast), - Fastest: (*hexutil.Big)(defaultGasStationPriceFastest), - SafeLow: (*hexutil.Big)(defaultGasStationPriceSafeLow), - Average: (*hexutil.Big)(defaultGasStationPriceAverage), - }, nil +// ToHexBigSlice converts a slice of `*big.Int` to a slice of `*hexutil.Big`. +func ToHexBigSlice(arr []*big.Int) []*hexutil.Big { + if len(arr) == 0 { + return nil + } + + res := make([]*hexutil.Big, len(arr)) + for i, v := range arr { + res[i] = (*hexutil.Big)(v) + } + return res } diff --git a/rpc/server.go b/rpc/server.go index b93bf995..b9ec645c 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -24,7 +24,7 @@ const ( func MustNewNativeSpaceServer( registry *rate.Registry, clientProvider *infuraNode.CfxClientProvider, - gashandler *handler.GasStationHandler, + gashandler *handler.CfxGasStationHandler, exposedModules []string, option ...CfxAPIOption, ) *rpc.Server { diff --git a/types/gastation.go b/types/gastation.go index 02753d21..713bb3db 100644 --- a/types/gastation.go +++ b/types/gastation.go @@ -4,6 +4,48 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" ) +// GasFeeTrend represents the trend of gas fees. +type GasFeeTrend string + +const ( + GasFeeTrendUp GasFeeTrend = "up" + GasFeeTrendDown GasFeeTrend = "down" +) + +// Get estimated CIP-1559 gas fees +type SuggestedGasFees struct { + // Estimated values for transactions by level of urgency. + Low, Medium, High GasFeeEstimation + // The current estimated base fee per gas on the network. + EstimatedBaseFee *hexutil.Big `json:"estimatedBaseFee,omitempty"` + // The current congestion on the network, represented as a number between 0 and 1. + // A lower network congestion score (eg., 0.1), indicates that fewer transactions are being submitted, + // so it’s cheaper to validate transactions. + NetworkCongestion float64 `json:"networkCongestion,omitempty"` + // The range of priority fees per gas for recent transactions on the network. + LatestPriorityFeeRange []*hexutil.Big `json:"latestPriorityFeeRange,omitempty"` + // The range of priority fees per gas for transactions on the network over a historical period. + HistoricalPriorityFeeRange []*hexutil.Big `json:"historicalPriorityFeeRange,omitempty"` + // The range of base fees per gas on the network over a historical period. + HistoricalBaseFeeRange []*hexutil.Big `json:"historicalBaseFeeRange,omitempty"` + // The current trend in priority fees or base fees on the network, either up or down (whether + // it’s getting more expensive or cheaper). + PriorityFeeTrend GasFeeTrend `json:"priorityFeeTrend,omitempty"` + BaseFeeTrend GasFeeTrend `json:"baseFeeTrend,omitempty"` +} + +// GasFeeEstimation represents the estimated gas fees for a transaction. +type GasFeeEstimation struct { + // The maximum suggested priority fee per gas to pay to have transactions included in a block. + SuggestedMaxPriorityFeePerGas *hexutil.Big + // The maximum suggested total fee per gas to pay, including both the base fee and the priority fee. + SuggestedMaxFeePerGas *hexutil.Big + // The minimum and maximum estimated wait time (in milliseconds) for a transaction to be included + // in a block at the suggested gas price. + MinWaitTimeEstimate uint64 `json:"minWaitTimeEstimate,omitempty"` + MaxWaitTimeEstimate uint64 `json:"maxWaitTimeEstimate,omitempty"` +} + type GasStationPrice struct { Fast *hexutil.Big `json:"fast"` // Recommended fast gas price in drip Fastest *hexutil.Big `json:"fastest"` // Recommended fastest gas price in drip