Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

apply bloom filter when query ethlogs with range of blocks #587

Merged
merged 7 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

* (evm) [tharsis#461](https://github.com/tharsis/ethermint/pull/461) Increase performance of `StateDB` transaction log storage (r/w).
* (evm) [tharsis#566](https://github.com/tharsis/ethermint/pull/566) Introduce `stateErr` store in `StateDB` to avoid meaningless operations if any error happened before
* (rpc, evm) [tharsis#587](https://github.com/tharsis/ethermint/pull/587) Apply bloom filter when query ethlogs with range of blocks
* (evm) [tharsis#586](https://github.com/tharsis/ethermint/pull/586) Benchmark evm keeper

## [v0.5.0] - 2021-08-20
Expand Down
64 changes: 64 additions & 0 deletions ethereum/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/tharsis/ethermint/ethereum/rpc/namespaces/eth/filters"
"github.com/tharsis/ethermint/ethereum/rpc/types"
"github.com/tharsis/ethermint/server/config"
ethermint "github.com/tharsis/ethermint/types"
Expand Down Expand Up @@ -57,6 +58,7 @@ type Backend interface {
EstimateGas(args evmtypes.CallArgs, blockNrOptional *types.BlockNumber) (hexutil.Uint64, error)
RPCGasCap() uint64
RPCMinGasPrice() int64
GetFilteredBlocks(from int64, to int64, filter [][]filters.BloomIV, filterAddresses bool) ([]int64, error)
}

var _ Backend = (*EVMBackend)(nil)
Expand Down Expand Up @@ -713,3 +715,65 @@ func (e *EVMBackend) RPCMinGasPrice() int64 {

return ethermint.DefaultGasPrice
}

// GetFilteredBlocks returns the block height list match the given bloom filters.
func (e *EVMBackend) GetFilteredBlocks(
from int64,
to int64,
filters [][]filters.BloomIV,
filterAddresses bool,
) ([]int64, error) {
matchedBlocks := make([]int64, 0)

BLOCKS:
for height := from; height <= to; height++ {
if err := e.ctx.Err(); err != nil {
e.logger.Error("EVMBackend context error", "err", err)
return nil, err
}

h := height
bloom, err := e.BlockBloom(&h)
if err != nil {
e.logger.Error("retrieve header failed", "blockHeight", height, "err", err)
return nil, err
}

for i, filter := range filters {
// filter the header bloom with the addresses
if filterAddresses && i == 0 {
if !checkMatches(bloom, filter) {
continue BLOCKS
}

// the filter doesn't have any topics
if len(filters) == 1 {
matchedBlocks = append(matchedBlocks, height)
continue BLOCKS
}
continue
}

// filter the bloom with topics
if len(filter) > 0 && !checkMatches(bloom, filter) {
continue BLOCKS
}
}
matchedBlocks = append(matchedBlocks, height)
}

return matchedBlocks, nil
}

// checkMatches revised the function from
// https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L88
func checkMatches(bloom ethtypes.Bloom, filter []filters.BloomIV) bool {
for _, bloomIV := range filter {
if bloomIV.V[0] == bloomIV.V[0]&bloom[bloomIV.I[0]] &&
bloomIV.V[1] == bloomIV.V[1]&bloom[bloomIV.I[1]] &&
bloomIV.V[2] == bloomIV.V[2]&bloom[bloomIV.I[2]] {
return true
}
}
return false
}
2 changes: 2 additions & 0 deletions ethereum/rpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Backend interface {

GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error)
BloomStatus() (uint64, uint64)

GetFilteredBlocks(from int64, to int64, bloomIndexes [][]BloomIV, filterAddresses bool) ([]int64, error)
}

// consider a filter inactive if it has not been polled for within deadline
Expand Down
148 changes: 85 additions & 63 deletions ethereum/rpc/namespaces/eth/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package filters

import (
"context"
"fmt"
"encoding/binary"
"math/big"

"github.com/tharsis/ethermint/ethereum/rpc/types"
Expand All @@ -11,17 +11,25 @@ import (
"github.com/tendermint/tendermint/libs/log"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/bloombits"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/filters"
)

// BloomIV represents the bit indexes and value inside the bloom filter that belong
// to some key.
type BloomIV struct {
I [3]uint
V [3]byte
}

// Filter can be used to retrieve and filter logs.
type Filter struct {
logger log.Logger
backend Backend
criteria filters.FilterCriteria
matcher *bloombits.Matcher

bloomFilters [][]BloomIV // Filter the system is matching for
}

// NewBlockFilter creates a new filter which directly inspects the contents of
Expand Down Expand Up @@ -54,8 +62,6 @@ func NewRangeFilter(logger log.Logger, backend Backend, begin, end int64, addres
filtersBz = append(filtersBz, filter)
}

size, _ := backend.BloomStatus()

// Create a generic filter and convert it into a range filter
criteria := filters.FilterCriteria{
FromBlock: big.NewInt(begin),
Expand All @@ -64,16 +70,16 @@ func NewRangeFilter(logger log.Logger, backend Backend, begin, end int64, addres
Topics: topics,
}

return newFilter(logger, backend, criteria, bloombits.NewMatcher(size, filtersBz))
return newFilter(logger, backend, criteria, createBloomFilters(filtersBz, logger))
}

// newFilter returns a new Filter
func newFilter(logger log.Logger, backend Backend, criteria filters.FilterCriteria, matcher *bloombits.Matcher) *Filter {
func newFilter(logger log.Logger, backend Backend, criteria filters.FilterCriteria, bloomFilters [][]BloomIV) *Filter {
return &Filter{
logger: logger,
backend: backend,
criteria: criteria,
matcher: matcher,
logger: logger,
backend: backend,
criteria: criteria,
bloomFilters: bloomFilters,
}
}

Expand Down Expand Up @@ -132,52 +138,25 @@ func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) {
f.criteria.ToBlock = big.NewInt(head + maxToOverhang)
}

for i := f.criteria.FromBlock.Int64(); i <= f.criteria.ToBlock.Int64(); i++ {
block, err := f.backend.GetBlockByNumber(types.BlockNumber(i), false)
if err != nil {
return logs, errors.Wrapf(err, "failed to fetch block by number %d", i)
}

if block["transactions"] == nil {
continue
}

var txHashes []common.Hash
from := f.criteria.FromBlock.Int64()
to := f.criteria.ToBlock.Int64()

txs, ok := block["transactions"].([]interface{})
if !ok {
_, ok = block["transactions"].([]common.Hash)
if !ok {
f.logger.Error(
"reading transactions from block data failed",
"type", fmt.Sprintf("%T", block["transactions"]),
)
}
blocks, err := f.backend.GetFilteredBlocks(from, to, f.bloomFilters, len(f.criteria.Addresses) > 0)
if err != nil {
return nil, err
}

continue
for _, height := range blocks {
ethLogs, err := f.backend.GetLogsByNumber(types.BlockNumber(height))
if err != nil {
return logs, errors.Wrapf(err, "failed to fetch block by number %d", height)
}

if len(txs) == 0 {
continue
for _, ethLog := range ethLogs {
filtered := FilterLogs(ethLog, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
logs = append(logs, filtered...)
}

for _, tx := range txs {
txHash, ok := tx.(common.Hash)
if !ok {
f.logger.Error(
"transactions list contains non-hash element",
"type", fmt.Sprintf("%T", tx),
)
continue
}

txHashes = append(txHashes, txHash)
}

logsMatched := f.checkMatches(txHashes)
logs = append(logs, logsMatched...)
}

return logs, nil
}

Expand Down Expand Up @@ -207,21 +186,64 @@ func (f *Filter) blockLogs(header *ethtypes.Header) ([]*ethtypes.Log, error) {
return logs, nil
}

// checkMatches checks if the logs from the a list of transactions transaction
// contain any log events that match the filter criteria. This function is
// called when the bloom filter signals a potential match.
func (f *Filter) checkMatches(transactions []common.Hash) []*ethtypes.Log {
unfiltered := []*ethtypes.Log{}
for _, tx := range transactions {
logs, err := f.backend.GetTransactionLogs(tx)
if err != nil {
// ignore error if transaction didn't set any logs (eg: when tx type is not
// MsgEthereumTx or MsgEthermint)
func createBloomFilters(filters [][][]byte, logger log.Logger) [][]BloomIV {
bloomFilters := make([][]BloomIV, 0)
for _, filter := range filters {
// Gather the bit indexes of the filter rule, special casing the nil filter
if len(filter) == 0 {
continue
}
bloomIVs := make([]BloomIV, len(filter))

// Transform the filter rules (the addresses and topics) to the bloom index and value arrays
// So it can be used to compare with the bloom of the block header. If the rule has any nil
// clauses. The rule will be ignored.
for i, clause := range filter {
if clause == nil {
bloomIVs = nil
break
}

unfiltered = append(unfiltered, logs...)
iv, err := calcBloomIVs(clause)
if err != nil {
bloomIVs = nil
logger.Error("calcBloomIVs error: %v", err)
break
}

bloomIVs[i] = iv
}
// Accumulate the filter rules if no nil rule was within
if bloomIVs != nil {
bloomFilters = append(bloomFilters, bloomIVs)
}
}
return bloomFilters
}

// calcBloomIVs returns BloomIV for the given data,
// revised from https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L139
func calcBloomIVs(data []byte) (BloomIV, error) {
hashbuf := make([]byte, 6)
biv := BloomIV{}

sha := crypto.NewKeccakState()
sha.Reset()
if _, err := sha.Write(data); err != nil {
return BloomIV{}, err
}
if _, err := sha.Read(hashbuf); err != nil {
return BloomIV{}, err
}

// The actual bits to flip
biv.V[0] = byte(1 << (hashbuf[1] & 0x7))
biv.V[1] = byte(1 << (hashbuf[3] & 0x7))
biv.V[2] = byte(1 << (hashbuf[5] & 0x7))
// The indices for the bytes to OR in
biv.I[0] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf)&0x7ff)>>3) - 1
biv.I[1] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[2:])&0x7ff)>>3) - 1
biv.I[2] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[4:])&0x7ff)>>3) - 1

return FilterLogs(unfiltered, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
return biv, nil
}
38 changes: 36 additions & 2 deletions tests/rpc/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ const (
)

var (
MODE = os.Getenv("MODE")

MODE = os.Getenv("MODE")
zeroString = "0x0"
from = []byte{}
)
Expand Down Expand Up @@ -911,3 +910,38 @@ func TestEth_GetBlockByNumber(t *testing.T) {
require.Equal(t, "0x", block["extraData"].(string))
require.Equal(t, []interface{}{}, block["uncles"].([]interface{}))
}

func TestEth_GetLogs(t *testing.T) {
time.Sleep(time.Second)

rpcRes := call(t, "eth_blockNumber", []string{})

var res hexutil.Uint64
err := res.UnmarshalJSON(rpcRes.Result)
require.NoError(t, err)

param := make([]map[string]interface{}, 1)
param[0] = make(map[string]interface{})
param[0]["topics"] = []string{helloTopic, worldTopic}
param[0]["fromBlock"] = res.String()

deployTestContractWithFunction(t)

// get filter changes
logRes := call(t, "eth_getLogs", param)

var logs []*ethtypes.Log
err = json.Unmarshal(logRes.Result, &logs)
require.NoError(t, err)

require.Equal(t, 1, len(logs))

// filter log with address
param[0] = make(map[string]interface{})
param[0]["address"] = "0x" + fmt.Sprintf("%x", from)
param[0]["fromBlock"] = res.String()
err = json.Unmarshal(logRes.Result, &logs)
require.NoError(t, err)

require.Equal(t, 1, len(logs))
}