Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

Apply the bloom filter when query the ethlogs with range of blocks #570

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions ethereum/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/tharsis/ethermint/ethereum/rpc/namespaces/eth/filters"
"github.com/tharsis/ethermint/ethereum/rpc/types"
"github.com/tharsis/ethermint/server/config"
ethermint "github.com/tharsis/ethermint/types"
Expand Down Expand Up @@ -57,6 +58,7 @@ type Backend interface {
EstimateGas(args evmtypes.CallArgs, blockNrOptional *types.BlockNumber) (hexutil.Uint64, error)
RPCGasCap() uint64
RPCMinGasPrice() int64
GetFilteredBlocks(from int64, to int64, filter [][]filters.BloomIV, filterAddresses bool) ([]int64, error)
}

var _ Backend = (*EVMBackend)(nil)
Expand Down Expand Up @@ -713,3 +715,65 @@ func (e *EVMBackend) RPCMinGasPrice() int64 {

return ethermint.DefaultGasPrice
}

// GetFilteredBlocks returns the block height list match the given bloom filters.
func (e *EVMBackend) GetFilteredBlocks(
from int64,
to int64,
filters [][]filters.BloomIV,
filterAddresses bool,
) ([]int64, error) {
matchedBlocks := make([]int64, 0)

BLOCKS:
for height := from; height <= to; height++ {
if err := e.ctx.Err(); err != nil {
e.logger.Error("EVMBackend context error", "err", err)
return nil, err
}

h := height
bloom, err := e.BlockBloom(&h)
if err != nil {
e.logger.Error("retrieve header failed", "blockHeight", height, "err", err)
return nil, err
}

for i, filter := range filters {
// filter the header bloom with the addresses
if filterAddresses && i == 0 {
if !checkMatches(bloom, filter) {
continue BLOCKS
}

// the filter doesn't have any topics
if len(filters) == 1 {
matchedBlocks = append(matchedBlocks, height)
continue BLOCKS
}
continue
}

// filter the bloom with topics
if len(filter) > 0 && !checkMatches(bloom, filter) {
continue BLOCKS
}
}
matchedBlocks = append(matchedBlocks, height)
}

return matchedBlocks, nil
}

// checkMatches revised the function from
// https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L88
func checkMatches(bloom ethtypes.Bloom, filter []filters.BloomIV) bool {
for _, bloomIV := range filter {
if bloomIV.V[0] == bloomIV.V[0]&bloom[bloomIV.I[0]] &&
bloomIV.V[1] == bloomIV.V[1]&bloom[bloomIV.I[1]] &&
bloomIV.V[2] == bloomIV.V[2]&bloom[bloomIV.I[2]] {
return true
}
}
return false
}
Comment on lines +768 to +779
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use the BloomLookup func instead?

Copy link
Contributor Author

@JayT106 JayT106 Sep 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I didn't use BloomLookup is trying to avoid the calculation in https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L139
If we use it, the bloom for the addresses and the topics in the filter will be calculated against every block.

Also this PR I can put the one-time bloom calculation when the RPC layer creates the filter.

2 changes: 2 additions & 0 deletions ethereum/rpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Backend interface {

GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error)
BloomStatus() (uint64, uint64)

GetFilteredBlocks(from int64, to int64, bloomIndexes [][]BloomIV, filterAddresses bool) ([]int64, error)
}

// consider a filter inactive if it has not been polled for within deadline
Expand Down
136 changes: 73 additions & 63 deletions ethereum/rpc/namespaces/eth/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package filters

import (
"context"
"fmt"
"encoding/binary"
"math/big"

"github.com/tharsis/ethermint/ethereum/rpc/types"
Expand All @@ -11,17 +11,25 @@ import (
"github.com/tendermint/tendermint/libs/log"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/bloombits"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/filters"
)

// BloomIV represents the bit indexes and value inside the bloom filter that belong
// to some key.
type BloomIV struct {
I [3]uint
V [3]byte
}

// Filter can be used to retrieve and filter logs.
type Filter struct {
logger log.Logger
backend Backend
criteria filters.FilterCriteria
matcher *bloombits.Matcher

bloomFilters [][]BloomIV // Filter the system is matching for
}

// NewBlockFilter creates a new filter which directly inspects the contents of
Expand Down Expand Up @@ -54,8 +62,6 @@ func NewRangeFilter(logger log.Logger, backend Backend, begin, end int64, addres
filtersBz = append(filtersBz, filter)
}

size, _ := backend.BloomStatus()

// Create a generic filter and convert it into a range filter
criteria := filters.FilterCriteria{
FromBlock: big.NewInt(begin),
Expand All @@ -64,16 +70,16 @@ func NewRangeFilter(logger log.Logger, backend Backend, begin, end int64, addres
Topics: topics,
}

return newFilter(logger, backend, criteria, bloombits.NewMatcher(size, filtersBz))
return newFilter(logger, backend, criteria, createBloomFilters(filtersBz))
}

// newFilter returns a new Filter
func newFilter(logger log.Logger, backend Backend, criteria filters.FilterCriteria, matcher *bloombits.Matcher) *Filter {
func newFilter(logger log.Logger, backend Backend, criteria filters.FilterCriteria, bloomFilters [][]BloomIV) *Filter {
return &Filter{
logger: logger,
backend: backend,
criteria: criteria,
matcher: matcher,
logger: logger,
backend: backend,
criteria: criteria,
bloomFilters: bloomFilters,
}
}

Expand Down Expand Up @@ -132,52 +138,25 @@ func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) {
f.criteria.ToBlock = big.NewInt(head + maxToOverhang)
}

for i := f.criteria.FromBlock.Int64(); i <= f.criteria.ToBlock.Int64(); i++ {
block, err := f.backend.GetBlockByNumber(types.BlockNumber(i), false)
if err != nil {
return logs, errors.Wrapf(err, "failed to fetch block by number %d", i)
}

if block["transactions"] == nil {
continue
}

var txHashes []common.Hash

txs, ok := block["transactions"].([]interface{})
if !ok {
_, ok = block["transactions"].([]common.Hash)
if !ok {
f.logger.Error(
"reading transactions from block data failed",
"type", fmt.Sprintf("%T", block["transactions"]),
)
}
from := f.criteria.FromBlock.Int64()
to := f.criteria.ToBlock.Int64()

continue
}
blocks, err := f.backend.GetFilteredBlocks(from, to, f.bloomFilters, len(f.criteria.Addresses) > 0)
if err != nil {
return nil, err
}

if len(txs) == 0 {
continue
for _, height := range blocks {
ethLogs, err := f.backend.GetLogsByNumber(types.BlockNumber(height))
if err != nil {
return logs, errors.Wrapf(err, "failed to fetch block by number %d", height)
}

for _, tx := range txs {
txHash, ok := tx.(common.Hash)
if !ok {
f.logger.Error(
"transactions list contains non-hash element",
"type", fmt.Sprintf("%T", tx),
)
continue
}

txHashes = append(txHashes, txHash)
for _, ethLog := range ethLogs {
filtered := FilterLogs(ethLog, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
logs = append(logs, filtered...)
}

logsMatched := f.checkMatches(txHashes)
logs = append(logs, logsMatched...)
}

return logs, nil
}

Expand Down Expand Up @@ -207,21 +186,52 @@ func (f *Filter) blockLogs(header *ethtypes.Header) ([]*ethtypes.Log, error) {
return logs, nil
}

// checkMatches checks if the logs from the a list of transactions transaction
// contain any log events that match the filter criteria. This function is
// called when the bloom filter signals a potential match.
func (f *Filter) checkMatches(transactions []common.Hash) []*ethtypes.Log {
unfiltered := []*ethtypes.Log{}
for _, tx := range transactions {
logs, err := f.backend.GetTransactionLogs(tx)
if err != nil {
// ignore error if transaction didn't set any logs (eg: when tx type is not
// MsgEthereumTx or MsgEthermint)
func createBloomFilters(filters [][][]byte) [][]BloomIV {
bloomFilters := make([][]BloomIV, 0)
for _, filter := range filters {
// Gather the bit indexes of the filter rule, special casing the nil filter
if len(filter) == 0 {
continue
}
bloomIVs := make([]BloomIV, len(filter))
for i, clause := range filter {
if clause == nil {
bloomIVs = nil
break
}
bloomIVs[i] = calcBloomIVs(clause)
}
// Accumulate the filter rules if no nil rule was within
if bloomIVs != nil {
bloomFilters = append(bloomFilters, bloomIVs)
}
}
return bloomFilters
}

unfiltered = append(unfiltered, logs...)
// calcBloomIVs returns BloomIV for the given data,
// revised from https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L139
func calcBloomIVs(data []byte) BloomIV {
hashbuf := make([]byte, 6)
biv := BloomIV{}

sha := crypto.NewKeccakState()
sha.Reset()
if _, err := sha.Write(data); err != nil {
panic(err)
}
if _, err := sha.Read(hashbuf); err != nil {
panic(err)
}

// The actual bits to flip
biv.V[0] = byte(1 << (hashbuf[1] & 0x7))
biv.V[1] = byte(1 << (hashbuf[3] & 0x7))
biv.V[2] = byte(1 << (hashbuf[5] & 0x7))
// The indices for the bytes to OR in
biv.I[0] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf)&0x7ff)>>3) - 1
biv.I[1] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[2:])&0x7ff)>>3) - 1
biv.I[2] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[4:])&0x7ff)>>3) - 1

return FilterLogs(unfiltered, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
return biv
}
38 changes: 36 additions & 2 deletions tests/rpc/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ const (
)

var (
MODE = os.Getenv("MODE")

MODE = os.Getenv("MODE")
zeroString = "0x0"
from = []byte{}
)
Expand Down Expand Up @@ -911,3 +910,38 @@ func TestEth_GetBlockByNumber(t *testing.T) {
require.Equal(t, "0x", block["extraData"].(string))
require.Equal(t, []interface{}{}, block["uncles"].([]interface{}))
}

func TestEth_GetLogs(t *testing.T) {
time.Sleep(time.Second)

rpcRes := call(t, "eth_blockNumber", []string{})

var res hexutil.Uint64
err := res.UnmarshalJSON(rpcRes.Result)
require.NoError(t, err)

param := make([]map[string]interface{}, 1)
param[0] = make(map[string]interface{})
param[0]["topics"] = []string{helloTopic, worldTopic}
param[0]["fromBlock"] = res.String()

deployTestContractWithFunction(t)

// get filter changes
logRes := call(t, "eth_getLogs", param)

var logs []*ethtypes.Log
err = json.Unmarshal(logRes.Result, &logs)
require.NoError(t, err)

require.Equal(t, 1, len(logs))

// filter log with address
param[0] = make(map[string]interface{})
param[0]["address"] = "0x" + fmt.Sprintf("%x", from)
param[0]["fromBlock"] = res.String()
err = json.Unmarshal(logRes.Result, &logs)
require.NoError(t, err)

require.Equal(t, 1, len(logs))
}