Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filters: retrieve logs in async #27135

Merged
merged 26 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
78b2136
eth/filters: pendingLogs will not fail
jsvisa Apr 20, 2023
58b35f7
eth/filters: pass a channel to recv logs
jsvisa Apr 20, 2023
4cdd665
eth/filters: retrive logs in async mode
jsvisa Apr 20, 2023
4d33109
eth/filters: backfilling it
jsvisa Apr 21, 2023
ece83c7
eth/filters: make LogsAsync private
jsvisa Apr 22, 2023
fc883c2
Revert "eth/filters: backfilling it"
jsvisa Apr 24, 2023
9654751
eth/filters: close logChan and errChan inside producer's side
jsvisa Apr 24, 2023
be1f183
eth/filters: let's resolve special in the begining of the goroutine
jsvisa Apr 24, 2023
7bcf58b
eth/filters: rename logsAsync to rangeLogsAsync
jsvisa May 5, 2023
9093799
eth/filters: resolve block number in Logs
jsvisa May 5, 2023
368d09c
eth/filters: handle pending logs outside async
jsvisa May 6, 2023
a2fcf1e
eth/filters: handle error
jsvisa May 6, 2023
4ebecb2
add tests for single block and pending
s1na May 8, 2023
23e1876
use logger contract in tests
s1na May 12, 2023
5625439
test full logs output
s1na May 12, 2023
9cf858e
expand tests
s1na May 12, 2023
4353b09
add timeout case
s1na May 12, 2023
aa2cf79
Update eth/filters/filter.go
jsvisa May 18, 2023
a014cd3
Update eth/filters/filter.go
jsvisa May 18, 2023
eda6214
Update eth/filters/filter.go
jsvisa May 18, 2023
da62eac
eth/filters: fix typo
jsvisa May 18, 2023
0ba4db0
eth/filters: no for loop
jsvisa May 18, 2023
2df3c84
eth/filters: no need to check 10th context error
jsvisa May 19, 2023
5000eb7
minor
s1na May 22, 2023
5167b95
Update eth/filters/filter_test.go
s1na May 23, 2023
f54f906
eth/filters: pending logs maybe null
jsvisa May 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 103 additions & 65 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,32 +106,32 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return f.blockLogs(ctx, header)
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
if f.end != rpc.PendingBlockNumber.Int64() {
return nil, errors.New("invalid block range")
}
return f.pendingLogs()
}
// Figure out the limits of the filter range
header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
return nil, nil
}

var (
err error
head = header.Number.Int64()
pending = f.end == rpc.PendingBlockNumber.Int64()
beginPending = f.begin == rpc.PendingBlockNumber.Int64()
endPending = f.end == rpc.PendingBlockNumber.Int64()
)

// special case for pending logs
if beginPending && !endPending {
return nil, errors.New("invalid block range")
}

// Short-cut if all we care about is pending logs
if beginPending && endPending {
return f.pendingLogs(), nil
}

resolveSpecial := func(number int64) (int64, error) {
var hdr *types.Header
switch number {
case rpc.LatestBlockNumber.Int64():
return head, nil
case rpc.PendingBlockNumber.Int64():
case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64():
// we should return head here since we've already captured
// that we need to get the pending logs in the pending boolean above
return head, nil
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if hdr == nil {
return 0, errors.New("latest header not found")
}
case rpc.FinalizedBlockNumber.Int64():
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
if hdr == nil {
Expand All @@ -147,57 +147,92 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return hdr.Number.Int64(), nil
}

var err error
// range query need to resolve the special begin/end block number
if f.begin, err = resolveSpecial(f.begin); err != nil {
return nil, err
}
if f.end, err = resolveSpecial(f.end); err != nil {
return nil, err
}
// Gather all indexed logs, and finish with non indexed ones

logChan, errChan := f.rangeLogsAsync(ctx)
var logs []*types.Log
for {
select {
case log := <-logChan:
logs = append(logs, log)
case err := <-errChan:
if err != nil {
// if an error occurs during extraction, we do return the extracted data
return logs, err
}
// Append the pending ones
if endPending {
pendingLogs := f.pendingLogs()
logs = append(logs, pendingLogs...)
}
return logs, nil
}
}
}

// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
// it creates and returns two channels: one for delivering log data, and one for reporting errors.
func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan error) {
var (
logs []*types.Log
end = uint64(f.end)
size, sections = f.sys.backend.BloomStatus()
logChan = make(chan *types.Log)
errChan = make(chan error)
)
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
logs, err = f.indexedLogs(ctx, end)
} else {
logs, err = f.indexedLogs(ctx, indexed-1)
}
if err != nil {
return logs, err

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't reviewed this in any depth, so maybe I'm missing something... but it looks to me like this spins up two routines that may be stuck forever trying to deliver, in case the other side is no longer reading, for whatever reason.
Wouldn't it make sense it use the ctx to check if it's time to abort these routines?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. But seems we have handled this issue inside the indexedLogs https://github.com/ethereum/go-ethereum/blob/101ac683e447a2c23fc0141d33bbb3f2e0e32607/eth/filters/filter.go#L266

I wonder if I've lost something

Copy link
Contributor

@s1na s1na May 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to protect against potential receiver failure, we'd have to wrap every channel send with a select, as in:

select {
  case logchan <- log:
  case ctx.Done():
    return ctx.Error()
}

I guess I'm just wondering if this has an impact on performance or that should be negligible

defer func() {
close(errChan)
close(logChan)
}()

// Gather all indexed logs, and finish with non indexed ones
var (
end = uint64(f.end)
size, sections = f.sys.backend.BloomStatus()
err error
)
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
indexed = end + 1
}
if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil {
errChan <- err
return
}
}
}
rest, err := f.unindexedLogs(ctx, end)
logs = append(logs, rest...)
if pending {
pendingLogs, err := f.pendingLogs()
if err != nil {
return nil, err

if err := f.unindexedLogs(ctx, end, logChan); err != nil {
errChan <- err
return
}
logs = append(logs, pendingLogs...)
}
return logs, err

errChan <- nil
}()

return logChan, errChan
}

// indexedLogs returns the logs matching the filter criteria based on the bloom
// bits indexed available locally or via the network.
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
// Create a matcher session and request servicing from the backend
matches := make(chan uint64, 64)

session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
if err != nil {
return nil, err
return err
}
defer session.Close()

f.sys.backend.ServiceFilter(ctx, session)

// Iterate over the matches until exhausted or context closed
var logs []*types.Log

for {
select {
case number, ok := <-matches:
Expand All @@ -207,47 +242,50 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
if err == nil {
f.begin = int64(end) + 1
}
return logs, err
return err
}
f.begin = int64(number) + 1

// Retrieve the suggested block and pull any truly matching logs
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
if header == nil || err != nil {
return logs, err
return err
}
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
return err
}
for _, log := range found {
logChan <- log
}
logs = append(logs, found...)

case <-ctx.Done():
return logs, ctx.Err()
return ctx.Err()
}
}
}

// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
var logs []*types.Log

func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
for ; f.begin <= int64(end); f.begin++ {
if f.begin%10 == 0 && ctx.Err() != nil {
return logs, ctx.Err()
}
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
return logs, err
return err
}
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
return err
}
for _, log := range found {
select {
case logChan <- log:
case <-ctx.Done():
return ctx.Err()
}
}
logs = append(logs, found...)
}
return logs, nil
return nil
}

// blockLogs returns the logs matching the filter criteria within a single block.
Expand Down Expand Up @@ -294,19 +332,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
func (f *Filter) pendingLogs() ([]*types.Log, error) {
func (f *Filter) pendingLogs() []*types.Log {
block, receipts := f.sys.backend.PendingBlockAndReceipts()
if block == nil {
return nil, errors.New("pending state not available")
return nil
}
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
var unfiltered []*types.Log
for _, r := range receipts {
unfiltered = append(unfiltered, r.Logs...)
}
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
}
return nil, nil
return nil
}

func includes(addresses []common.Address, a common.Address) bool {
Expand Down
4 changes: 3 additions & 1 deletion eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type testBackend struct {
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
pendingBlock *types.Block
pendingReceipts types.Receipts
}

func (b *testBackend) ChainConfig() *params.ChainConfig {
Expand Down Expand Up @@ -124,7 +126,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint
}

func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return nil, nil
return b.pendingBlock, b.pendingReceipts
}

func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
Expand Down
Loading