Skip to content

Commit

Permalink
Problem: gasUsed, gasLimit and bloom are not aligned in subscribe new…
Browse files Browse the repository at this point in the history
…Heads
  • Loading branch information
mmsqe committed Oct 17, 2024
1 parent a2ad87c commit 4be8c27
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 42 deletions.
30 changes: 4 additions & 26 deletions rpc/backend/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package backend

import (
"bytes"
"fmt"
"math/big"
"strconv"
Expand Down Expand Up @@ -397,18 +396,7 @@ func (b *Backend) HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error)

// BlockBloom query block bloom filter from block results
func (b *Backend) BlockBloom(blockRes *tmrpctypes.ResultBlockResults) (ethtypes.Bloom, error) {
for _, event := range blockRes.FinalizeBlockEvents {
if event.Type != evmtypes.EventTypeBlockBloom {
continue
}

for _, attr := range event.Attributes {
if bytes.Equal([]byte(attr.Key), bAttributeKeyEthereumBloom) {
return ethtypes.BytesToBloom([]byte(attr.Value)), nil
}
}
}
return ethtypes.Bloom{}, errors.New("block bloom event is not found")
return GetBlockBloom(blockRes)
}

// RPCBlockFromTendermintBlock returns a JSON-RPC compatible Ethereum block from a
Expand Down Expand Up @@ -491,20 +479,10 @@ func (b *Backend) RPCBlockFromTendermintBlock(
b.logger.Error("failed to query consensus params", "error", err.Error())
}

var gasUsed uint64
for _, txsResult := range blockRes.TxsResults {
// workaround for cosmos-sdk bug. https://github.com/cosmos/cosmos-sdk/issues/10832
if ShouldIgnoreGasUsed(txsResult) {
// block gas limit has exceeded, other txs must have failed with same reason.
break
}
gas, err := ethermint.SafeUint64(txsResult.GetGasUsed())
if err != nil {
return nil, err
}
gasUsed += gas
gasUsed, err := GetGasUsed(blockRes)
if err != nil {
return nil, err
}

formattedBlock := rpctypes.FormatBlock(
block.Header, block.Size(),
gasLimit, new(big.Int).SetUint64(gasUsed),
Expand Down
33 changes: 33 additions & 0 deletions rpc/backend/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package backend

import (
"bytes"
"fmt"
"math/big"
"sort"
Expand Down Expand Up @@ -339,3 +340,35 @@ func (b *Backend) getValidatorAccount(header *cmttypes.Header) (sdk.AccAddress,
}
return sdk.AccAddressFromBech32(res.AccountAddress)
}

func GetBlockBloom(blockRes *tmrpctypes.ResultBlockResults) (ethtypes.Bloom, error) {
for _, event := range blockRes.FinalizeBlockEvents {
if event.Type != evmtypes.EventTypeBlockBloom {
continue
}

for _, attr := range event.Attributes {
if bytes.Equal([]byte(attr.Key), bAttributeKeyEthereumBloom) {
return ethtypes.BytesToBloom([]byte(attr.Value)), nil
}
}
}
return ethtypes.Bloom{}, errors.New("block bloom event is not found")
}

func GetGasUsed(blockRes *tmrpctypes.ResultBlockResults) (uint64, error) {
var gasUsed uint64
for _, txsResult := range blockRes.TxsResults {
// workaround for cosmos-sdk bug. https://github.com/cosmos/cosmos-sdk/issues/10832
if ShouldIgnoreGasUsed(txsResult) {
// block gas limit has exceeded, other txs must have failed with same reason.
break
}
gas, err := ethermint.SafeUint64(txsResult.GetGasUsed())
if err != nil {
return 0, err
}
gasUsed += gas
}
return gasUsed, nil
}
58 changes: 46 additions & 12 deletions rpc/stream/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"cosmossdk.io/log"
cmtquery "github.com/cometbft/cometbft/libs/pubsub/query"
rpcclient "github.com/cometbft/cometbft/rpc/client"
tmrpcclient "github.com/cometbft/cometbft/rpc/client"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
tmtypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/cosmos-sdk/client"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/evmos/ethermint/rpc/types"
"github.com/evmos/ethermint/rpc/backend"
rpctypes "github.com/evmos/ethermint/rpc/types"
ethermint "github.com/evmos/ethermint/types"
evmtypes "github.com/evmos/ethermint/x/evm/types"
"google.golang.org/grpc"
Expand Down Expand Up @@ -54,7 +57,7 @@ type validatorAccountFunc func(
type RPCStream struct {
evtClient rpcclient.EventsClient
logger log.Logger
txDecoder sdk.TxDecoder
clientCtx client.Context

headerStream *Stream[RPCHeader]
pendingTxStream *Stream[common.Hash]
Expand All @@ -67,14 +70,13 @@ type RPCStream struct {
func NewRPCStreams(
evtClient rpcclient.EventsClient,
logger log.Logger,
txDecoder sdk.TxDecoder,
clientCtx client.Context,
validatorAccount validatorAccountFunc,
) (*RPCStream, error) {
s := &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,

evtClient: evtClient,
logger: logger,
clientCtx: clientCtx,
headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity),
pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity),
Expand Down Expand Up @@ -153,9 +155,9 @@ func (s *RPCStream) start(
continue
}

baseFee := types.BaseFeeFromEvents(data.ResultFinalizeBlock.Events)
baseFee := rpctypes.BaseFeeFromEvents(data.ResultFinalizeBlock.Events)
res, err := s.validatorAccount(
types.ContextWithHeight(data.Block.Height),
rpctypes.ContextWithHeight(data.Block.Height),
&evmtypes.QueryValidatorAccountRequest{
ConsAddress: sdk.ConsAddress(data.Block.Header.ProposerAddress).String(),
},
Expand All @@ -169,10 +171,42 @@ func (s *RPCStream) start(
s.logger.Error("failed to convert validator account", "err", err)
continue
}
// TODO: fetch bloom from events
header := types.EthHeaderFromTendermint(data.Block.Header, ethtypes.Bloom{}, baseFee, validator)
height := data.Block.Header.Height
ctx := rpctypes.ContextWithHeight(height)
gasLimit, err := rpctypes.BlockMaxGasFromConsensusParams(ctx, s.clientCtx, height)
if err != nil {
s.logger.Error("failed to query consensus params", "error", err.Error())
continue
}
sc, ok := s.clientCtx.Client.(tmrpcclient.SignClient)
if !ok {
fmt.Println("failed to fetch block result from Tendermint", "height", height, "error", "invalid rpc client")
continue
}
blockRes, err := sc.BlockResults(ctx, &height)
if err != nil {
s.logger.Error("failed to fetch block result from Tendermint", "height", height, "error", err.Error())
continue
}
gasUsed, err := backend.GetGasUsed(blockRes)
if err != nil {
s.logger.Error("failed to get gasUsed", "height", height, "error", err.Error())
continue
}
bloom, err := backend.GetBlockBloom(blockRes)
if err != nil {
s.logger.Error("HeaderByHash BlockBloom failed", "height", height, "error", err.Error())
continue
}
header := rpctypes.EthHeaderFromTendermint(data.Block.Header, bloom, baseFee, validator)
limit, err := ethermint.SafeUint64(gasLimit)
if err != nil {
s.logger.Error("invalid gasLimit", "height", height, "error", err.Error())
continue
}
header.GasLimit = limit
header.GasUsed = gasUsed
s.headerStream.Add(RPCHeader{EthHeader: header, Hash: common.BytesToHash(data.Block.Header.Hash())})

case ev, ok := <-chLogs:
if !ok {
chLogs = nil
Expand Down
2 changes: 1 addition & 1 deletion server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func StartJSONRPC(srvCtx *server.Context,
var err error
queryClient := rpctypes.NewQueryClient(clientCtx)
for i := 0; i < MaxRetry; i++ {
rpcStream, err = stream.NewRPCStreams(evtClient, logger, clientCtx.TxConfig.TxDecoder(), queryClient.ValidatorAccount)
rpcStream, err = stream.NewRPCStreams(evtClient, logger, clientCtx, queryClient.ValidatorAccount)
if err == nil {
break
}
Expand Down
14 changes: 11 additions & 3 deletions tests/integration_tests/test_websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,17 @@ async def subscriber_test(c: Client, w3):
assert int(msgs[1]["number"], 0) == int(msgs[0]["number"], 0) + 1
assert int(msgs[2]["number"], 0) == int(msgs[1]["number"], 0) + 1
for msg in msgs:
b = w3.eth.get_block(msg["number"])
assert HexBytes(msg["hash"]) == b["hash"]
assert msg["miner"] == b["miner"]
b = w3.provider.make_request(
"eth_getBlockByNumber", [msg["number"], True]
)["result"]
for key in b:
if not msg.get(key):
continue
if key == "miner":
eq = msg[key].lower() == b[key]
else:
eq = msg[key] == b[key]
assert eq, key
await assert_unsubscribe(c, sub_id)

async def logs_test(c: Client, w3, contract):
Expand Down

0 comments on commit 4be8c27

Please sign in to comment.