Skip to content

Commit

Permalink
eth/filters: derive log fields after filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
s1na committed Dec 13, 2022
1 parent 250a80a commit 68c6bb3
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 96 deletions.
7 changes: 7 additions & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,13 @@ func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*t
return fb.bc.GetHeaderByHash(hash), nil
}

func (fb *filterBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
if body := fb.bc.GetBody(hash); body != nil {
return body, nil
}
return nil, errors.New("block body not found")
}

func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return fb.backend.pendingBlock, fb.backend.pendingReceipts
}
Expand Down
15 changes: 3 additions & 12 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,9 @@ func deriveLogFields(receipts []*receiptLogs, hash common.Hash, number uint64, t
return nil
}

// ReadLogs retrieves the logs for all transactions in a block. The log fields
// are populated with metadata. In case the receipts or the block body
// are not found, a nil is returned.
// ReadLogs retrieves the logs for all transactions in a block. In case
// receipts is not found, a nil is returned.
// Note: ReadLogs does not derive unstored log fields.
func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64, config *params.ChainConfig) [][]*types.Log {
// Retrieve the flattened receipt slice
data := ReadReceiptsRLP(db, hash, number)
Expand All @@ -729,15 +729,6 @@ func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64, config *params.C
return nil
}

body := ReadBody(db, hash, number)
if body == nil {
log.Error("Missing body but have receipt", "hash", hash, "number", number)
return nil
}
if err := deriveLogFields(receipts, hash, number, body.Transactions); err != nil {
log.Error("Failed to derive block receipts fields", "hash", hash, "number", number, "err", err)
return nil
}
logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
Expand Down
4 changes: 0 additions & 4 deletions core/rawdb/accessors_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,10 +750,6 @@ func TestReadLogs(t *testing.T) {
t.Fatalf("unexpected number of logs[1] returned, have %d want %d", have, want)
}

// Fill in log fields so we can compare their rlp encoding
if err := types.Receipts(receipts).DeriveFields(params.TestChainConfig, hash, 0, body.Transactions); err != nil {
t.Fatal(err)
}
for i, pr := range receipts {
for j, pl := range pr.Logs {
rlpHave, err := rlp.EncodeToBytes(newFullLogRLP(logs[i][j]))
Expand Down
11 changes: 11 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ func (b *EthAPIBackend) BlockByHash(ctx context.Context, hash common.Hash) (*typ
return b.eth.blockchain.GetBlockByHash(hash), nil
}

// GetBody returns body of a block. It does not resolve special block numbers.
func (b *EthAPIBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
if number < 0 || hash == (common.Hash{}) {
return nil, errors.New("invalid arguments; expect hash and no special block numbers")
}
if body := b.eth.blockchain.GetBody(hash); body != nil {
return body, nil
}
return nil, errors.New("block body not found")
}

func (b *EthAPIBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) {
if blockNr, ok := blockNrOrHash.Number(); ok {
return b.BlockByNumber(ctx, blockNr)
Expand Down
68 changes: 31 additions & 37 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
if header == nil {
return nil, errors.New("unknown block")
}
return f.blockLogs(ctx, header, false)
return f.blockLogs(ctx, header)
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
Expand Down Expand Up @@ -216,7 +216,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
if header == nil || err != nil {
return logs, err
}
found, err := f.blockLogs(ctx, header, true)
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
}
Expand All @@ -238,7 +238,7 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
if header == nil || err != nil {
return logs, err
}
found, err := f.blockLogs(ctx, header, false)
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
}
Expand All @@ -248,46 +248,48 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
}

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) {
// Fast track: no filtering criteria
if len(f.addresses) == 0 && len(f.topics) == 0 {
list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil, err
}
return flatten(list), nil
} else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) {
func (f *Filter) blockLogs(ctx context.Context, header *types.Header) ([]*types.Log, error) {
skipFilter := len(f.addresses) == 0 && len(f.topics) == 0
if skipFilter || bloomFilter(header.Bloom, f.addresses, f.topics) {
return f.checkMatches(ctx, header)
}
return nil, nil
}

// checkMatches checks if the receipts belonging to the given header contain any log events that
// match the filter criteria. This function is called when the bloom filter signals a potential match.
// skipFilter signals all logs of the given block are requested.
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) {
logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
hash := header.Hash()
// Logs in cache are partially filled with context data
// such as tx index, block hash, etc.
// Notably tx hash is NOT filled in because it needs
// access to block body data.
cached, err := f.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil, err
}

unfiltered := flatten(logsList)
unfiltered := cached.logs
logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
if len(logs) > 0 {
// We have matching logs, check if we need to resolve full logs via the light client
if logs[0].TxHash == (common.Hash{}) {
receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil, err
}
unfiltered = unfiltered[:0]
for _, receipt := range receipts {
unfiltered = append(unfiltered, receipt.Logs...)
}
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
}
if len(logs) == 0 {
return nil, nil
}
// Most backends will deliver un-derived logs, but check nevertheless.
if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
return logs, nil
}
return nil, nil
// Only resolve
body, err := f.sys.cachedGetBody(ctx, cached, hash, header.Number.Uint64())
if err != nil {
return nil, err
}
for i, log := range logs {
// Copy log not to modify cache elements
logcopy := *log
logcopy.TxHash = body.Transactions[logcopy.TxIndex].Hash()
logs[i] = &logcopy
}
return logs, nil
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
Expand Down Expand Up @@ -377,11 +379,3 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
}
return true
}

func flatten(list [][]*types.Log) []*types.Log {
var flat []*types.Log
for _, logs := range list {
flat = append(flat, logs...)
}
return flat
}
111 changes: 71 additions & 40 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -58,6 +59,7 @@ type Backend interface {
ChainDb() ethdb.Database
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)
Expand All @@ -77,7 +79,7 @@ type Backend interface {
// FilterSystem holds resources shared by all filters.
type FilterSystem struct {
backend Backend
logsCache *lru.Cache[common.Hash, [][]*types.Log]
logsCache *lru.Cache[common.Hash, *logCacheElem]
cfg *Config
}

Expand All @@ -86,13 +88,18 @@ func NewFilterSystem(backend Backend, config Config) *FilterSystem {
config = config.withDefaults()
return &FilterSystem{
backend: backend,
logsCache: lru.NewCache[common.Hash, [][]*types.Log](config.LogCacheSize),
logsCache: lru.NewCache[common.Hash, *logCacheElem](config.LogCacheSize),
cfg: &config,
}
}

// cachedGetLogs loads block logs from the backend and caches the result.
func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) {
type logCacheElem struct {
logs []*types.Log
body atomic.Pointer[types.Body]
}

// cachedLogElem loads block logs from the backend and caches the result.
func (sys *FilterSystem) cachedLogElem(ctx context.Context, blockHash common.Hash, number uint64) (*logCacheElem, error) {
cached, ok := sys.logsCache.Get(blockHash)
if ok {
return cached, nil
Expand All @@ -105,8 +112,35 @@ func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Has
if logs == nil {
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString())
}
sys.logsCache.Add(blockHash, logs)
return logs, nil
// Database logs are un-derived.
// Fill in whatever we can (txHash is inaccessible at this point).
flattened := make([]*types.Log, 0)
var logIdx uint
for i, txLogs := range logs {
for _, log := range txLogs {
log.BlockHash = blockHash
log.BlockNumber = number
log.TxIndex = uint(i)
log.Index = logIdx
logIdx++
flattened = append(flattened, log)
}
}
elem := &logCacheElem{logs: flattened}
sys.logsCache.Add(blockHash, elem)
return elem, nil
}

func (sys *FilterSystem) cachedGetBody(ctx context.Context, elem *logCacheElem, hash common.Hash, number uint64) (*types.Body, error) {
if body := elem.body.Load(); body != nil {
return body, nil
}
body, err := sys.backend.GetBody(ctx, hash, rpc.BlockNumber(number))
if err != nil {
return nil, err
}
elem.body.Store(body)
return body, nil
}

// Type determines the kind of filter and is used to put the filter in to
Expand Down Expand Up @@ -474,42 +508,39 @@ func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func

// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
if bloomFilter(header.Bloom, addresses, topics) {
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
var unfiltered []*types.Log
for _, logs := range logsList {
for _, log := range logs {
logcopy := *log
logcopy.Removed = remove
unfiltered = append(unfiltered, &logcopy)
}
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) {
// We have matching but non-derived logs
receipts, err := es.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil
}
unfiltered = unfiltered[:0]
for _, receipt := range receipts {
for _, log := range receipt.Logs {
logcopy := *log
logcopy.Removed = remove
unfiltered = append(unfiltered, &logcopy)
}
}
logs = filterLogs(unfiltered, nil, nil, addresses, topics)
}
if !bloomFilter(header.Bloom, addresses, topics) {
return nil
}
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64())
unfiltered := cached.logs
if err != nil {
return nil
}
for i, log := range unfiltered {
// Don't modify in-cache elements
logcopy := *log
logcopy.Removed = remove
// Swap copy in-place
unfiltered[i] = &logcopy
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
// Txhash is already resolved
if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
return logs
}
return nil
// Resolve txhash
body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
for _, log := range logs {
// logs are already copied, safe to modify
log.TxHash = body.Transactions[log.TxIndex].Hash()
}
return logs
}

// eventLoop (un)installs filters and processes mux events.
Expand Down
7 changes: 7 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ func (b *testBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*type
return rawdb.ReadHeader(b.db, hash, *number), nil
}

func (b *testBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
if body := rawdb.ReadBody(b.db, hash, uint64(number)); body != nil {
return body, nil
}
return nil, errors.New("block body not found")
}

func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
if number := rawdb.ReadHeaderNumber(b.db, hash); number != nil {
return rawdb.ReadReceipts(b.db, hash, *number, params.TestChainConfig), nil
Expand Down
1 change: 1 addition & 0 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Backend interface {
// This is copied from filters.Backend
// eth/filters needs to be initialized from this backend type, so methods needed by
// it must also be included here.
GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error)
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
Expand Down
3 changes: 3 additions & 0 deletions internal/ethapi/transaction_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ func (b *backendMock) BlockByHash(ctx context.Context, hash common.Hash) (*types
func (b *backendMock) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) {
return nil, nil
}
func (b *backendMock) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
return nil, nil
}
func (b *backendMock) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) {
return nil, nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions les/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func (b *LesApiBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash r
return nil, errors.New("invalid arguments; neither block nor hash specified")
}

func (b *LesApiBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
return light.GetBody(ctx, b.eth.odr, hash, uint64(number))
}

func (b *LesApiBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return nil, nil
}
Expand Down
Loading

0 comments on commit 68c6bb3

Please sign in to comment.