@@ -22,6 +22,7 @@ import (
22
22
"context"
23
23
"fmt"
24
24
"sync"
25
+ "sync/atomic"
25
26
"time"
26
27
27
28
"github.com/ethereum/go-ethereum"
@@ -58,6 +59,7 @@ type Backend interface {
58
59
ChainDb () ethdb.Database
59
60
HeaderByNumber (ctx context.Context , blockNr rpc.BlockNumber ) (* types.Header , error )
60
61
HeaderByHash (ctx context.Context , blockHash common.Hash ) (* types.Header , error )
62
+ GetBody (ctx context.Context , hash common.Hash , number rpc.BlockNumber ) (* types.Body , error )
61
63
GetReceipts (ctx context.Context , blockHash common.Hash ) (types.Receipts , error )
62
64
GetLogs (ctx context.Context , blockHash common.Hash , number uint64 ) ([][]* types.Log , error )
63
65
PendingBlockAndReceipts () (* types.Block , types.Receipts )
@@ -77,7 +79,7 @@ type Backend interface {
77
79
// FilterSystem holds resources shared by all filters.
78
80
type FilterSystem struct {
79
81
backend Backend
80
- logsCache * lru.Cache [common.Hash , [][] * types. Log ]
82
+ logsCache * lru.Cache [common.Hash , * logCacheElem ]
81
83
cfg * Config
82
84
}
83
85
@@ -86,13 +88,18 @@ func NewFilterSystem(backend Backend, config Config) *FilterSystem {
86
88
config = config .withDefaults ()
87
89
return & FilterSystem {
88
90
backend : backend ,
89
- logsCache : lru.NewCache [common.Hash , [][] * types. Log ](config .LogCacheSize ),
91
+ logsCache : lru.NewCache [common.Hash , * logCacheElem ](config .LogCacheSize ),
90
92
cfg : & config ,
91
93
}
92
94
}
93
95
94
- // cachedGetLogs loads block logs from the backend and caches the result.
95
- func (sys * FilterSystem ) cachedGetLogs (ctx context.Context , blockHash common.Hash , number uint64 ) ([][]* types.Log , error ) {
96
+ type logCacheElem struct {
97
+ logs []* types.Log
98
+ body atomic.Pointer [types.Body ]
99
+ }
100
+
101
+ // cachedLogElem loads block logs from the backend and caches the result.
102
+ func (sys * FilterSystem ) cachedLogElem (ctx context.Context , blockHash common.Hash , number uint64 ) (* logCacheElem , error ) {
96
103
cached , ok := sys .logsCache .Get (blockHash )
97
104
if ok {
98
105
return cached , nil
@@ -105,8 +112,35 @@ func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Has
105
112
if logs == nil {
106
113
return nil , fmt .Errorf ("failed to get logs for block #%d (0x%s)" , number , blockHash .TerminalString ())
107
114
}
108
- sys .logsCache .Add (blockHash , logs )
109
- return logs , nil
115
+ // Database logs are un-derived.
116
+ // Fill in whatever we can (txHash is inaccessible at this point).
117
+ flattened := make ([]* types.Log , 0 )
118
+ var logIdx uint
119
+ for i , txLogs := range logs {
120
+ for _ , log := range txLogs {
121
+ log .BlockHash = blockHash
122
+ log .BlockNumber = number
123
+ log .TxIndex = uint (i )
124
+ log .Index = logIdx
125
+ logIdx ++
126
+ flattened = append (flattened , log )
127
+ }
128
+ }
129
+ elem := & logCacheElem {logs : flattened }
130
+ sys .logsCache .Add (blockHash , elem )
131
+ return elem , nil
132
+ }
133
+
134
+ func (sys * FilterSystem ) cachedGetBody (ctx context.Context , elem * logCacheElem , hash common.Hash , number uint64 ) (* types.Body , error ) {
135
+ if body := elem .body .Load (); body != nil {
136
+ return body , nil
137
+ }
138
+ body , err := sys .backend .GetBody (ctx , hash , rpc .BlockNumber (number ))
139
+ if err != nil {
140
+ return nil , err
141
+ }
142
+ elem .body .Store (body )
143
+ return body , nil
110
144
}
111
145
112
146
// Type determines the kind of filter and is used to put the filter in to
@@ -431,6 +465,12 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
431
465
if es .lightMode && len (filters [LogsSubscription ]) > 0 {
432
466
es .lightFilterNewHead (ev .Block .Header (), func (header * types.Header , remove bool ) {
433
467
for _ , f := range filters [LogsSubscription ] {
468
+ if f .logsCrit .FromBlock != nil && header .Number .Cmp (f .logsCrit .FromBlock ) < 0 {
469
+ continue
470
+ }
471
+ if f .logsCrit .ToBlock != nil && header .Number .Cmp (f .logsCrit .ToBlock ) > 0 {
472
+ continue
473
+ }
434
474
if matchedLogs := es .lightFilterLogs (header , f .logsCrit .Addresses , f .logsCrit .Topics , remove ); len (matchedLogs ) > 0 {
435
475
f .logs <- matchedLogs
436
476
}
@@ -474,42 +514,39 @@ func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func
474
514
475
515
// filter logs of a single header in light client mode
476
516
func (es * EventSystem ) lightFilterLogs (header * types.Header , addresses []common.Address , topics [][]common.Hash , remove bool ) []* types.Log {
477
- if bloomFilter (header .Bloom , addresses , topics ) {
478
- // Get the logs of the block
479
- ctx , cancel := context .WithTimeout (context .Background (), time .Second * 5 )
480
- defer cancel ()
481
- logsList , err := es .sys .cachedGetLogs (ctx , header .Hash (), header .Number .Uint64 ())
482
- if err != nil {
483
- return nil
484
- }
485
- var unfiltered []* types.Log
486
- for _ , logs := range logsList {
487
- for _ , log := range logs {
488
- logcopy := * log
489
- logcopy .Removed = remove
490
- unfiltered = append (unfiltered , & logcopy )
491
- }
492
- }
493
- logs := filterLogs (unfiltered , nil , nil , addresses , topics )
494
- if len (logs ) > 0 && logs [0 ].TxHash == (common.Hash {}) {
495
- // We have matching but non-derived logs
496
- receipts , err := es .backend .GetReceipts (ctx , header .Hash ())
497
- if err != nil {
498
- return nil
499
- }
500
- unfiltered = unfiltered [:0 ]
501
- for _ , receipt := range receipts {
502
- for _ , log := range receipt .Logs {
503
- logcopy := * log
504
- logcopy .Removed = remove
505
- unfiltered = append (unfiltered , & logcopy )
506
- }
507
- }
508
- logs = filterLogs (unfiltered , nil , nil , addresses , topics )
509
- }
517
+ if ! bloomFilter (header .Bloom , addresses , topics ) {
518
+ return nil
519
+ }
520
+ // Get the logs of the block
521
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 5 )
522
+ defer cancel ()
523
+ cached , err := es .sys .cachedLogElem (ctx , header .Hash (), header .Number .Uint64 ())
524
+ if err != nil {
525
+ return nil
526
+ }
527
+ unfiltered := append ([]* types.Log {}, cached .logs ... )
528
+ for i , log := range unfiltered {
529
+ // Don't modify in-cache elements
530
+ logcopy := * log
531
+ logcopy .Removed = remove
532
+ // Swap copy in-place
533
+ unfiltered [i ] = & logcopy
534
+ }
535
+ logs := filterLogs (unfiltered , nil , nil , addresses , topics )
536
+ // Txhash is already resolved
537
+ if len (logs ) > 0 && logs [0 ].TxHash != (common.Hash {}) {
510
538
return logs
511
539
}
512
- return nil
540
+ // Resolve txhash
541
+ body , err := es .sys .cachedGetBody (ctx , cached , header .Hash (), header .Number .Uint64 ())
542
+ if err != nil {
543
+ return nil
544
+ }
545
+ for _ , log := range logs {
546
+ // logs are already copied, safe to modify
547
+ log .TxHash = body .Transactions [log .TxIndex ].Hash ()
548
+ }
549
+ return logs
513
550
}
514
551
515
552
// eventLoop (un)installs filters and processes mux events.
0 commit comments