Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

apply bloom filter when query ethlogs with range of blocks #587

Merged
merged 7 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions ethereum/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/tharsis/ethermint/ethereum/rpc/namespaces/eth/filters"
"github.com/tharsis/ethermint/ethereum/rpc/types"
"github.com/tharsis/ethermint/server/config"
ethermint "github.com/tharsis/ethermint/types"
Expand Down Expand Up @@ -57,6 +58,7 @@ type Backend interface {
EstimateGas(args evmtypes.CallArgs, blockNrOptional *types.BlockNumber) (hexutil.Uint64, error)
RPCGasCap() uint64
RPCMinGasPrice() int64
GetFilteredBlocks(from int64, to int64, filter [][]filters.BloomIV, filterAddresses bool) ([]int64, error)
}

var _ Backend = (*EVMBackend)(nil)
Expand Down Expand Up @@ -713,3 +715,65 @@ func (e *EVMBackend) RPCMinGasPrice() int64 {

return ethermint.DefaultGasPrice
}

// GetFilteredBlocks returns the block height list match the given bloom filters.
func (e *EVMBackend) GetFilteredBlocks(
from int64,
to int64,
filters [][]filters.BloomIV,
filterAddresses bool,
) ([]int64, error) {
matchedBlocks := make([]int64, 0)

BLOCKS:
for height := from; height <= to; height++ {
if err := e.ctx.Err(); err != nil {
e.logger.Error("EVMBackend context error", "err", err)
return nil, err
}

h := height
bloom, err := e.BlockBloom(&h)
if err != nil {
e.logger.Error("retrieve header failed", "blockHeight", height, "err", err)
return nil, err
}

for i, filter := range filters {
// filter the header bloom with the addresses
if filterAddresses && i == 0 {
if !checkMatches(bloom, filter) {
continue BLOCKS
}

// the filter doesn't have any topics
if len(filters) == 1 {
matchedBlocks = append(matchedBlocks, height)
continue BLOCKS
}
continue
}

// filter the bloom with topics
if len(filter) > 0 && !checkMatches(bloom, filter) {
continue BLOCKS
}
}
matchedBlocks = append(matchedBlocks, height)
}

return matchedBlocks, nil
}

// checkMatches revised the function from
// https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L88
func checkMatches(bloom ethtypes.Bloom, filter []filters.BloomIV) bool {
for _, bloomIV := range filter {
if bloomIV.V[0] == bloomIV.V[0]&bloom[bloomIV.I[0]] &&
bloomIV.V[1] == bloomIV.V[1]&bloom[bloomIV.I[1]] &&
bloomIV.V[2] == bloomIV.V[2]&bloom[bloomIV.I[2]] {
return true
}
}
return false
}
2 changes: 2 additions & 0 deletions ethereum/rpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Backend interface {

GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error)
BloomStatus() (uint64, uint64)

GetFilteredBlocks(from int64, to int64, bloomIndexes [][]BloomIV, filterAddresses bool) ([]int64, error)
}

// consider a filter inactive if it has not been polled for within deadline
Expand Down
136 changes: 73 additions & 63 deletions ethereum/rpc/namespaces/eth/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package filters

import (
"context"
"fmt"
"encoding/binary"
"math/big"

"github.com/tharsis/ethermint/ethereum/rpc/types"
Expand All @@ -11,17 +11,25 @@ import (
"github.com/tendermint/tendermint/libs/log"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/bloombits"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/filters"
)

// BloomIV represents the bit indexes and value inside the bloom filter that belong
// to some key.
type BloomIV struct {
I [3]uint
V [3]byte
}

// Filter can be used to retrieve and filter logs.
type Filter struct {
logger log.Logger
backend Backend
criteria filters.FilterCriteria
matcher *bloombits.Matcher

bloomFilters [][]BloomIV // Filter the system is matching for
}

// NewBlockFilter creates a new filter which directly inspects the contents of
Expand Down Expand Up @@ -54,8 +62,6 @@ func NewRangeFilter(logger log.Logger, backend Backend, begin, end int64, addres
filtersBz = append(filtersBz, filter)
}

size, _ := backend.BloomStatus()

// Create a generic filter and convert it into a range filter
criteria := filters.FilterCriteria{
FromBlock: big.NewInt(begin),
Expand All @@ -64,16 +70,16 @@ func NewRangeFilter(logger log.Logger, backend Backend, begin, end int64, addres
Topics: topics,
}

return newFilter(logger, backend, criteria, bloombits.NewMatcher(size, filtersBz))
return newFilter(logger, backend, criteria, createBloomFilters(filtersBz))
}

// newFilter returns a new Filter
func newFilter(logger log.Logger, backend Backend, criteria filters.FilterCriteria, matcher *bloombits.Matcher) *Filter {
func newFilter(logger log.Logger, backend Backend, criteria filters.FilterCriteria, bloomFilters [][]BloomIV) *Filter {
return &Filter{
logger: logger,
backend: backend,
criteria: criteria,
matcher: matcher,
logger: logger,
backend: backend,
criteria: criteria,
bloomFilters: bloomFilters,
}
}

Expand Down Expand Up @@ -132,52 +138,25 @@ func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) {
f.criteria.ToBlock = big.NewInt(head + maxToOverhang)
}

for i := f.criteria.FromBlock.Int64(); i <= f.criteria.ToBlock.Int64(); i++ {
block, err := f.backend.GetBlockByNumber(types.BlockNumber(i), false)
if err != nil {
return logs, errors.Wrapf(err, "failed to fetch block by number %d", i)
}

if block["transactions"] == nil {
continue
}

var txHashes []common.Hash

txs, ok := block["transactions"].([]interface{})
if !ok {
_, ok = block["transactions"].([]common.Hash)
if !ok {
f.logger.Error(
"reading transactions from block data failed",
"type", fmt.Sprintf("%T", block["transactions"]),
)
}
from := f.criteria.FromBlock.Int64()
to := f.criteria.ToBlock.Int64()

continue
}
blocks, err := f.backend.GetFilteredBlocks(from, to, f.bloomFilters, len(f.criteria.Addresses) > 0)
if err != nil {
return nil, err
}

if len(txs) == 0 {
continue
for _, height := range blocks {
ethLogs, err := f.backend.GetLogsByNumber(types.BlockNumber(height))
if err != nil {
return logs, errors.Wrapf(err, "failed to fetch block by number %d", height)
}

for _, tx := range txs {
txHash, ok := tx.(common.Hash)
if !ok {
f.logger.Error(
"transactions list contains non-hash element",
"type", fmt.Sprintf("%T", tx),
)
continue
}

txHashes = append(txHashes, txHash)
for _, ethLog := range ethLogs {
filtered := FilterLogs(ethLog, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
logs = append(logs, filtered...)
}

logsMatched := f.checkMatches(txHashes)
logs = append(logs, logsMatched...)
}

return logs, nil
}

Expand Down Expand Up @@ -207,21 +186,52 @@ func (f *Filter) blockLogs(header *ethtypes.Header) ([]*ethtypes.Log, error) {
return logs, nil
}

// checkMatches checks if the logs from the a list of transactions transaction
// contain any log events that match the filter criteria. This function is
// called when the bloom filter signals a potential match.
func (f *Filter) checkMatches(transactions []common.Hash) []*ethtypes.Log {
unfiltered := []*ethtypes.Log{}
for _, tx := range transactions {
logs, err := f.backend.GetTransactionLogs(tx)
if err != nil {
// ignore error if transaction didn't set any logs (eg: when tx type is not
// MsgEthereumTx or MsgEthermint)
func createBloomFilters(filters [][][]byte) [][]BloomIV {
bloomFilters := make([][]BloomIV, 0)
for _, filter := range filters {
// Gather the bit indexes of the filter rule, special casing the nil filter
if len(filter) == 0 {
continue
}
bloomIVs := make([]BloomIV, len(filter))
for i, clause := range filter {
if clause == nil {
bloomIVs = nil
break
}
bloomIVs[i] = calcBloomIVs(clause)
JayT106 marked this conversation as resolved.
Show resolved Hide resolved
}
// Accumulate the filter rules if no nil rule was within
if bloomIVs != nil {
bloomFilters = append(bloomFilters, bloomIVs)
}
}
return bloomFilters
}

unfiltered = append(unfiltered, logs...)
// calcBloomIVs returns BloomIV for the given data,
// revised from https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L139
func calcBloomIVs(data []byte) BloomIV {
hashbuf := make([]byte, 6)
biv := BloomIV{}

sha := crypto.NewKeccakState()
sha.Reset()
if _, err := sha.Write(data); err != nil {
panic(err)
}
if _, err := sha.Read(hashbuf); err != nil {
panic(err)
fedekunze marked this conversation as resolved.
Show resolved Hide resolved
}

// The actual bits to flip
biv.V[0] = byte(1 << (hashbuf[1] & 0x7))
biv.V[1] = byte(1 << (hashbuf[3] & 0x7))
biv.V[2] = byte(1 << (hashbuf[5] & 0x7))
// The indices for the bytes to OR in
biv.I[0] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf)&0x7ff)>>3) - 1
biv.I[1] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[2:])&0x7ff)>>3) - 1
biv.I[2] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[4:])&0x7ff)>>3) - 1

return FilterLogs(unfiltered, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
return biv
}
38 changes: 36 additions & 2 deletions tests/rpc/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ const (
)

var (
MODE = os.Getenv("MODE")

MODE = os.Getenv("MODE")
zeroString = "0x0"
from = []byte{}
)
Expand Down Expand Up @@ -911,3 +910,38 @@ func TestEth_GetBlockByNumber(t *testing.T) {
require.Equal(t, "0x", block["extraData"].(string))
require.Equal(t, []interface{}{}, block["uncles"].([]interface{}))
}

func TestEth_GetLogs(t *testing.T) {
time.Sleep(time.Second)

rpcRes := call(t, "eth_blockNumber", []string{})

var res hexutil.Uint64
err := res.UnmarshalJSON(rpcRes.Result)
require.NoError(t, err)

param := make([]map[string]interface{}, 1)
param[0] = make(map[string]interface{})
param[0]["topics"] = []string{helloTopic, worldTopic}
param[0]["fromBlock"] = res.String()

deployTestContractWithFunction(t)

// get filter changes
logRes := call(t, "eth_getLogs", param)

var logs []*ethtypes.Log
err = json.Unmarshal(logRes.Result, &logs)
require.NoError(t, err)

require.Equal(t, 1, len(logs))

// filter log with address
param[0] = make(map[string]interface{})
param[0]["address"] = "0x" + fmt.Sprintf("%x", from)
param[0]["fromBlock"] = res.String()
err = json.Unmarshal(logRes.Result, &logs)
require.NoError(t, err)

require.Equal(t, 1, len(logs))
}